risingwave_meta/barrier/info.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

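//! In-flight barrier bookkeeping for the meta node: shared fragment/actor
//! info (published to frontends via fragment-mapping notifications),
//! per-database in-flight streaming job state, and the per-fragment changes
//! applied around each barrier command.
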
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RawRwLock;
use parking_lot::lock_api::RwLockReadGuard;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::WorkerId;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
use risingwave_pb::meta::subscribe_response::Operation;
use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{info, warn};

use crate::MetaResult;
use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::rebuild_fragment_mapping;
use crate::manager::NotificationManagerRef;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};

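/// A cloneable snapshot of a fragment's runtime info (actors, distribution,
/// vnode count), kept in [`SharedActorInfos`] so it can be read outside the
/// barrier worker, e.g. for rebuilding fragment worker-slot mappings and
/// listing split assignments.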
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    pub actors: HashMap<ActorId, InflightActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}

impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
        let (info, job_id) = pair;

        let InflightFragmentInfo {
            fragment_id,
            distribution_type,
            fragment_type_mask,
            actors,
            vnode_count,
            ..
        } = info;

        Self {
            fragment_id: *fragment_id,
            job_id,
            distribution_type: *distribution_type,
            fragment_type_mask: *fragment_type_mask,
            actors: actors.clone(),
            vnode_count: *vnode_count,
        }
    }
}

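/// Fragment info for all databases, indexed by database id and fragment id.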
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}

impl SharedActorInfosInner {
    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
        self.info
            .values()
            .find_map(|database| database.get(&fragment_id))
    }

    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
        self.info.values().flatten()
    }
}

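/// A shared, lock-protected view of [`SharedActorInfosInner`]. All mutations
/// go through a [`SharedActorInfoWriter`], which notifies frontend nodes of
/// fragment-mapping changes via the notification manager.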
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}

impl SharedActorInfos {
    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
        self.inner.read()
    }

    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
        let core = self.inner.read();
        core.iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
            })
            .collect()
    }

    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
    ///
    /// Very occasionally, split removal may happen during scaling, in which case we need to
    /// use the old splits for reallocation instead of the latest splits (which may be missing),
    /// so that we can resolve the split removal in the next command.
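    ///
    /// A usage sketch (hypothetical bindings for the ids and actor lists):
    ///
    /// ```ignore
    /// let new_assignment = shared_actor_infos.migrate_splits_for_source_actors(
    ///     fragment_id,
    ///     &prev_actor_ids,
    ///     &curr_actor_ids,
    /// )?;
    /// ```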
    pub fn migrate_splits_for_source_actors(
        &self,
        fragment_id: FragmentId,
        prev_actor_ids: &[ActorId],
        curr_actor_ids: &[ActorId],
    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
        let guard = self.read_guard();

        let prev_splits = prev_actor_ids
            .iter()
            .flat_map(|actor_id| {
                // Note: File Source / Iceberg Source don't have splits assigned by meta.
                guard
                    .get_fragment(fragment_id)
                    .and_then(|info| info.actors.get(actor_id))
                    .map(|actor| actor.splits.clone())
                    .unwrap_or_default()
            })
            .map(|split| (split.id(), split))
            .collect();

        let empty_actor_splits = curr_actor_ids
            .iter()
            .map(|actor_id| (*actor_id, vec![]))
            .collect();

        let diff = crate::stream::source_manager::reassign_splits(
            fragment_id,
            empty_actor_splits,
            &prev_splits,
            // Pre-allocation happens only the first time splits are assigned, so there is
            // no scale-in scenario here.
            std::default::Default::default(),
        )
        .unwrap_or_default();

        Ok(diff)
    }
}

impl SharedActorInfos {
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        // Delete fragments that existed previously but are not included in the recovered fragments.
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_info = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

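    /// Begins a buffered write session for this database. Mutations made
    /// through the returned writer are applied under the write lock, and the
    /// corresponding fragment-mapping notifications are sent in one pass by
    /// `finish`. A sketch of the lifecycle (hypothetical call site):
    ///
    /// ```ignore
    /// let mut writer = shared_actor_infos.start_writer(database_id);
    /// writer.upsert(fragment_infos); // buffers Add/Update mappings
    /// writer.remove(&dropped_fragment); // buffers a Delete mapping
    /// writer.finish(); // sends the buffered notifications
    /// ```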
    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}

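/// A write transaction over [`SharedActorInfos`]: holds the write lock,
/// buffers fragment-mapping changes, and sends the corresponding Add/Update/
/// Delete notifications when `finish` is called.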
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}

impl SharedActorInfoWriter<'_> {
    pub(super) fn upsert(
        &mut self,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let database = self.write_guard.info.entry(self.database_id).or_default();
        for info @ (fragment, _) in infos {
            match database.entry(fragment.fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info = info.into();
                    self.updated_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
                Entry::Vacant(entry) => {
                    let info = info.into();
                    self.added_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
            }
        }
    }

    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
            && let Some(fragment) = database.remove(&info.fragment_id)
        {
            self.deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
    }

    pub(super) fn finish(self) {
        if let Some(mapping) = self.added_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Add, mapping);
        }
        if let Some(mapping) = self.updated_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Update, mapping);
        }
        if let Some(mapping) = self.deleted_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }
}

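/// The epoch pair and kind of a barrier that is being injected or collected.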
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}

impl BarrierInfo {
    pub(super) fn prev_epoch(&self) -> u64 {
        self.prev_epoch.value().0
    }
}

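/// Per-fragment changes derived from a barrier [`Command`]. Additions are
/// applied in `pre_apply` before the barrier is issued; removals are applied
/// in `post_apply` after the barrier is collected.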
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
        /// Whether the fragment already existed before being added. This is
        /// used when snapshot backfill finishes and its fragment info is
        /// added back to the database.
        is_existing: bool,
    },
    AddNodeUpstream(PbUpstreamSinkInfo),
    DropNodeUpstream(Vec<FragmentId>),
    ReplaceNodeUpstream(
        /// old `fragment_id` -> new `fragment_id`
        HashMap<FragmentId, FragmentId>,
    ),
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    RemoveFragment,
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}

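/// The kind of subscriber registered on a streaming job: either a
/// subscription carrying its retention value, or an ongoing snapshot
/// backfill.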
#[derive(Clone, Debug)]
pub enum SubscriberType {
    Subscription(u64),
    SnapshotBackfill,
}

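/// Creation status of a streaming job: `Init` until the first barrier is
/// collected, `Creating` while backfill progress is tracked, and `Created`
/// once the job has finished creation.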
#[derive(Debug, Clone)]
pub(super) enum CreateStreamingJobStatus {
    Init,
    Creating(CreateMviewProgressTracker),
    Created,
}

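/// In-flight info of a single streaming job: its fragments, its subscribers,
/// and its creation status.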
#[derive(Debug, Clone)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub subscribers: HashMap<u32, SubscriberType>,
    pub status: CreateStreamingJobStatus,
}

impl InflightStreamingJobInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.fragment_infos.values()
    }

    pub fn snapshot_backfill_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> impl Iterator<Item = ActorId> + '_ {
        fragment_infos
            .values()
            .filter(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
            })
            .flat_map(|fragment| fragment.actors.keys().copied())
    }

    pub fn tracking_progress_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        )
    }
}

impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}

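/// In-flight info of all streaming jobs in a database, plus a reverse index
/// from fragment id to the owning job. Kept in sync with the shared actor
/// info via `pre_apply` / `post_apply` around each barrier command.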
#[derive(Debug, Clone)]
pub struct InflightDatabaseInfo {
    database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}

impl InflightDatabaseInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.jobs.values().flat_map(|job| job.fragment_infos())
    }

    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }

    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs
            .get(&job_id)
            .expect("should exist")
            .fragment_infos
            .get(&fragment_id)
            .expect("should exist")
    }

    pub fn gen_ddl_progress(&self) -> impl Iterator<Item = (JobId, DdlProgress)> + '_ {
        self.jobs
            .iter()
            .filter_map(|(job_id, job)| match &job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating(tracker) => {
                    Some((*job_id, tracker.gen_ddl_progress()))
                }
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        resps: impl Iterator<Item = &BarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) {
        if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::new(
                                info,
                                version_stats,
                            )),
                        ) else {
                            unreachable!("should be Init before collecting the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly created job was cancelled before the first barrier was collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // The progress of SnapshotBackfill won't be tracked here
                }
            }
        }
        for progress in resps.flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which may have been cancelled"
                );
                continue;
            };
            let CreateStreamingJobStatus::Creating(tracker) =
                &mut self.jobs.get_mut(job_id).expect("should exist").status
            else {
                warn!("update the progress of an already-created streaming job: {progress:?}");
                continue;
            };
            tracker.apply_progress(progress, version_stats);
        }
    }


    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
        self.jobs.values().filter_map(|job| match &job.status {
            CreateStreamingJobStatus::Init => None,
            CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
            CreateStreamingJobStatus::Created => None,
        })
    }

    fn iter_mut_creating_job_tracker(
        &mut self,
    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
        self.jobs
            .values_mut()
            .filter_map(|job| match &mut job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        self.iter_creating_job_tracker()
            .any(|tracker| tracker.is_finished())
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        self.iter_mut_creating_job_tracker()
            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
            .collect()
    }

    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        for job in self.jobs.values_mut() {
            if let CreateStreamingJobStatus::Creating(tracker) = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    let CreateStreamingJobStatus::Creating(tracker) =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
        }
    }

    pub fn fragment_subscribers(&self, fragment_id: FragmentId) -> impl Iterator<Item = u32> + '_ {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = u32> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
        self.jobs
            .iter()
            .filter_map(|(job_id, info)| {
                info.subscribers
                    .values()
                    .filter_map(|subscriber| match subscriber {
                        SubscriberType::Subscription(retention) => Some(*retention),
                        SubscriberType::SnapshotBackfill => None,
                    })
                    .max()
                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
            })
            .collect()
    }

    pub fn register_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: u32,
        subscriber: SubscriberType,
    ) {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .try_insert(subscriber_id, subscriber)
            .expect("non duplicate");
    }

    pub fn unregister_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: u32,
    ) -> Option<SubscriberType> {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .remove(&subscriber_id)
    }

    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
        let job_id = self.fragment_location[&fragment_id];
        let fragment = self
            .jobs
            .get_mut(&job_id)
            .expect("should exist")
            .fragment_infos
            .get_mut(&fragment_id)
            .expect("should exist");
        (fragment, job_id)
    }

    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            jobs: Default::default(),
            fragment_location: Default::default(),
            shared_actor_infos,
        }
    }

    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        // remove the database because it's empty.
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }

    pub fn recover(
        database_id: DatabaseId,
        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
        shared_actor_infos: SharedActorInfos,
    ) -> Self {
        let mut info = Self::empty_inner(database_id, shared_actor_infos);
        for job in jobs {
            info.add_existing(job);
        }
        info
    }

    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }

    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
        let InflightStreamingJobInfo {
            job_id,
            fragment_infos,
            subscribers,
            status,
        } = job;
        // Note: use the destructured `job_id` below; `job` itself has been moved
        // by the destructuring above.
        self.jobs
            .try_insert(
                job_id,
                InflightStreamingJobInfo {
                    job_id,
                    subscribers,
                    fragment_infos: Default::default(), // filled in later by apply_add
                    status,
                },
            )
            .expect("non-duplicate");
        self.apply_add(fragment_infos.into_iter().map(|(fragment_id, info)| {
            (
                fragment_id,
                CommandFragmentChanges::NewFragment {
                    job_id,
                    info,
                    is_existing: true,
                },
            )
        }))
    }

    /// Applies actor changes before a barrier command is issued. If the
    /// command contains any newly added actors, the info is updated
    /// accordingly.
    pub(crate) fn pre_apply(
        &mut self,
        new_job_id: Option<JobId>,
        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
    ) {
        if let Some(job_id) = new_job_id {
            self.jobs
                .try_insert(
                    job_id,
                    InflightStreamingJobInfo {
                        job_id,
                        fragment_infos: Default::default(),
                        subscribers: Default::default(), // no subscribers for a newly created job
                        status: CreateStreamingJobStatus::Init,
                    },
                )
                .expect("non-duplicate");
        }
        self.apply_add(
            fragment_changes
                .iter()
                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
        )
    }

    fn apply_add(
        &mut self,
        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
    ) {
        {
            let shared_infos = self.shared_actor_infos.clone();
            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
            for (fragment_id, change) in fragment_changes {
                match change {
                    CommandFragmentChanges::NewFragment {
                        job_id,
                        info,
                        is_existing,
                    } => {
                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
                        if !is_existing {
                            shared_actor_writer.upsert([(&info, job_id)]);
                        }
                        fragment_infos
                            .fragment_infos
                            .try_insert(fragment_id, info)
                            .expect("non duplicate");
                        self.fragment_location
                            .try_insert(fragment_id, job_id)
                            .expect("non duplicate");
                    }
                    CommandFragmentChanges::Reschedule {
                        new_actors,
                        actor_update_vnode_bitmap,
                        actor_splits,
                        ..
                    } => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
                            actors
                                .get_mut(&actor_id)
                                .expect("should exist")
                                .vnode_bitmap = Some(new_vnodes);
                        }
                        for (actor_id, actor) in new_actors {
                            actors
                                .try_insert(actor_id as _, actor)
                                .expect("non-duplicate");
                        }
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }

                        // The info will be upserted into shared_actor_infos in the post_apply stage.
                    }
                    CommandFragmentChanges::RemoveFragment => {}
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
                        let mut remaining_fragment_ids: HashSet<_> =
                            replace_map.keys().cloned().collect();
                        let (info, _) = self.fragment_mut(fragment_id);
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::Merge(m) = node
                                && let Some(new_upstream_fragment_id) =
                                    replace_map.get(&m.upstream_fragment_id)
                            {
                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
                                    if cfg!(debug_assertions) {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            m, replace_map
                                        );
                                    } else {
                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
                                    }
                                }
                                m.upstream_fragment_id = *new_upstream_fragment_id;
                            }
                        });
                        if cfg!(debug_assertions) {
                            assert!(
                                remaining_fragment_ids.is_empty(),
                                "non-existing fragment to replace: {:?} {:?} {:?}",
                                remaining_fragment_ids,
                                info.nodes,
                                replace_map
                            );
                        } else if !remaining_fragment_ids.is_empty() {
                            // Only warn when some replacement target was actually missing.
                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
                        }
                    }
                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut injected = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<_>>();
                                    if current_upstream_fragment_ids
                                        .contains(&new_upstream_info.upstream_fragment_id)
                                    {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            u, new_upstream_info
                                        );
                                    }
                                }
                                u.init_upstreams.push(new_upstream_info.clone());
                                injected = true;
                            }
                        });
                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut removed = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<FragmentId>>();
                                    for drop_fragment_id in &drop_upstream_fragment_ids {
                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
                                        {
                                            panic!(
                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
                                                u, drop_upstream_fragment_ids, drop_fragment_id
                                            );
                                        }
                                    }
                                }
                                u.init_upstreams.retain(|upstream| {
                                    !drop_upstream_fragment_ids
                                        .contains(&upstream.upstream_fragment_id)
                                });
                                removed = true;
                            }
                        });
                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
                        let (info, job_id) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                }
            }
            shared_actor_writer.finish();
        }
    }

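    /// Builds fragment edges for commands that introduce new fragments
    /// (`CreateStreamingJob` and `ReplaceStreamJob`); returns `None` for all
    /// other commands.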
    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        let (info, replace_job, new_upstream_sink) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::MergeSnapshotBackfillStreamingJobs(_)
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit { .. }
                | Command::Throttle(_)
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::StartFragmentBackfill { .. }
                | Command::Refresh { .. }
                | Command::ListFinish { .. }
                | Command::LoadFinish { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
                        new_upstream_sink,
                    ) = job_type
                    {
                        Some(new_upstream_sink)
                    } else {
                        None
                    };
                    (Some(info), None, new_upstream_sink)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
            },
        };
        // `existing_fragment_ids` consists of:
        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id`s of the
        //    upstream fragments of the newly created job
        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id`s of
        //    the upstream fragments of the replace_job, if the upstream fragment previously exists
        //  - keys of `replace_upstream`, which are the `fragment_id`s of downstream fragments that
        //    will update their upstream fragments
        //  - if creating a new sink-into-table, the `fragment_id` of the downstream table
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|info| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|info| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|info| {
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
            })
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .new_fragments
                    .new_fragment_info(&replace_job.init_split_assignment)
                    .chain(
                        replace_job
                            .auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().map(|sink| {
                                    (sink.new_fragment.fragment_id, sink.new_fragment_info())
                                })
                            }),
                    )
            }))
            .collect_vec();
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| self.fragment(fragment_id))
                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
            control_stream_manager,
        );
        if let Some(info) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }

    /// Applies actor changes after the barrier command is collected. If the
    /// command drops any actors, they are removed from the snapshot
    /// accordingly.
    pub(crate) fn post_apply(
        &mut self,
        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
    ) {
        let inner = self.shared_actor_infos.clone();
        let mut shared_actor_writer = inner.start_writer(self.database_id);
        {
            for (fragment_id, changes) in fragment_changes {
                match changes {
                    CommandFragmentChanges::NewFragment { .. } => {}
                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
                        let job_id = self.fragment_location[fragment_id];
                        let info = self
                            .jobs
                            .get_mut(&job_id)
                            .expect("should exist")
                            .fragment_infos
                            .get_mut(fragment_id)
                            .expect("should exist");
                        for actor_id in to_remove {
                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                    CommandFragmentChanges::RemoveFragment => {
                        let job_id = self
                            .fragment_location
                            .remove(fragment_id)
                            .expect("should exist");
                        let job = self.jobs.get_mut(&job_id).expect("should exist");
                        let fragment = job
                            .fragment_infos
                            .remove(fragment_id)
                            .expect("should exist");
                        shared_actor_writer.remove(&fragment);
                        if job.fragment_infos.is_empty() {
                            self.jobs.remove(&job_id).expect("should exist");
                        }
                    }
                    CommandFragmentChanges::ReplaceNodeUpstream(_)
                    | CommandFragmentChanges::AddNodeUpstream(_)
                    | CommandFragmentChanges::DropNodeUpstream(_)
                    | CommandFragmentChanges::SplitAssignment { .. } => {}
                }
            }
        }
        shared_actor_writer.finish();
    }
}

impl InflightFragmentInfo {
    /// Returns, for each worker node, the set of actor ids to collect.
    pub(crate) fn actor_ids_to_collect(
        infos: impl IntoIterator<Item = &Self>,
    ) -> HashMap<WorkerId, HashSet<ActorId>> {
        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
            assert!(
                ret.entry(actor.worker_id)
                    .or_default()
                    .insert(*actor_id as _)
            )
        }
        ret
    }

    pub fn existing_table_ids<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = TableId> + 'a {
        infos
            .into_iter()
            .flat_map(|info| info.state_table_ids.iter().cloned())
    }

    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
        infos.into_iter().any(|fragment| {
            fragment
                .actors
                .values()
                .any(|actor| (actor.worker_id) == worker_id)
        })
    }
}

impl InflightDatabaseInfo {
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}