risingwave_meta/barrier/info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
27use risingwave_connector::source::{SplitImpl, SplitMetaData};
28use risingwave_meta_model::WorkerId;
29use risingwave_meta_model::fragment::DistributionType;
30use risingwave_pb::ddl_service::DdlProgress;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::SubscriberId;
33use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
34use risingwave_pb::meta::subscribe_response::Operation;
35use risingwave_pb::source::PbCdcTableSnapshotSplits;
36use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
37use risingwave_pb::stream_plan::stream_node::NodeBody;
38use risingwave_pb::stream_service::BarrierCompleteResponse;
39use tracing::{info, warn};
40
41use crate::MetaResult;
42use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
43use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
44use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
45use crate::barrier::rpc::ControlStreamManager;
46use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
47use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
48use crate::controller::utils::rebuild_fragment_mapping;
49use crate::manager::NotificationManagerRef;
50use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
51
52#[derive(Debug, Clone)]
53pub struct SharedActorInfo {
54    pub worker_id: WorkerId,
55    pub vnode_bitmap: Option<Bitmap>,
56    pub splits: Vec<SplitImpl>,
57}
58
59impl From<&InflightActorInfo> for SharedActorInfo {
60    fn from(value: &InflightActorInfo) -> Self {
61        Self {
62            worker_id: value.worker_id,
63            vnode_bitmap: value.vnode_bitmap.clone(),
64            splits: value.splits.clone(),
65        }
66    }
67}
68
69#[derive(Debug, Clone)]
70pub struct SharedFragmentInfo {
71    pub fragment_id: FragmentId,
72    pub job_id: JobId,
73    pub distribution_type: DistributionType,
74    pub actors: HashMap<ActorId, SharedActorInfo>,
75    pub vnode_count: usize,
76    pub fragment_type_mask: FragmentTypeMask,
77}
78
79impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
80    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
81        let (info, job_id) = pair;
82
83        let InflightFragmentInfo {
84            fragment_id,
85            distribution_type,
86            fragment_type_mask,
87            actors,
88            vnode_count,
89            ..
90        } = info;
91
92        Self {
93            fragment_id: *fragment_id,
94            job_id,
95            distribution_type: *distribution_type,
96            fragment_type_mask: *fragment_type_mask,
97            actors: actors
98                .iter()
99                .map(|(actor_id, actor)| (*actor_id, actor.into()))
100                .collect(),
101            vnode_count: *vnode_count,
102        }
103    }
104}
105
106#[derive(Default, Debug)]
107pub struct SharedActorInfosInner {
108    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
109}
110
111impl SharedActorInfosInner {
112    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
113        self.info
114            .values()
115            .find_map(|database| database.get(&fragment_id))
116    }
117
118    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
119        self.info.values().flatten()
120    }
121}
122
123#[derive(Clone, educe::Educe)]
124#[educe(Debug)]
125pub struct SharedActorInfos {
126    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
127    #[educe(Debug(ignore))]
128    notification_manager: NotificationManagerRef,
129}
130
131impl SharedActorInfos {
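    /// # Example (illustrative sketch)
    ///
    /// A minimal read-only lookup through the shared info. `shared_actor_infos`
    /// and `fragment_id` are assumed to be supplied by the caller; the guard
    /// should be held only for the duration of the lookup.
    ///
    /// ```ignore
    /// let guard = shared_actor_infos.read_guard();
    /// if let Some(fragment) = guard.get_fragment(fragment_id) {
    ///     tracing::debug!(actor_count = fragment.actors.len(), "fragment is in flight");
    /// }
    /// ```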
132    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
133        self.inner.read()
134    }
135
136    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
137        let core = self.inner.read();
138        core.iter_over_fragments()
139            .flat_map(|(_, fragment)| {
140                fragment
141                    .actors
142                    .iter()
143                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
144            })
145            .collect()
146    }
147
148    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
149    ///
150    /// Very occasionally split removal may happen during scaling, in which case we need to
151    /// use the old splits for reallocation instead of the latest splits (which may be missing),
152    /// so that we can resolve the split removal in the next command.
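    ///
    /// # Example (illustrative sketch)
    ///
    /// How a rescheduling path might reuse the previous actors' splits; the
    /// actor id slices are placeholders supplied by the caller.
    ///
    /// ```ignore
    /// let migrated = shared_actor_infos.migrate_splits_for_source_actors(
    ///     fragment_id,
    ///     &prev_actor_ids,
    ///     &curr_actor_ids,
    /// )?;
    /// for (actor_id, splits) in &migrated {
    ///     tracing::debug!(%actor_id, num_splits = splits.len(), "migrated splits");
    /// }
    /// ```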
153    pub fn migrate_splits_for_source_actors(
154        &self,
155        fragment_id: FragmentId,
156        prev_actor_ids: &[ActorId],
157        curr_actor_ids: &[ActorId],
158    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
159        let guard = self.read_guard();
160
161        let prev_splits = prev_actor_ids
162            .iter()
163            .flat_map(|actor_id| {
                // Note: File Source / Iceberg Source don't have splits assigned by meta.
165                guard
166                    .get_fragment(fragment_id)
167                    .and_then(|info| info.actors.get(actor_id))
168                    .map(|actor| actor.splits.clone())
169                    .unwrap_or_default()
170            })
171            .map(|split| (split.id(), split))
172            .collect();
173
174        let empty_actor_splits = curr_actor_ids
175            .iter()
176            .map(|actor_id| (*actor_id, vec![]))
177            .collect();
178
179        let diff = crate::stream::source_manager::reassign_splits(
180            fragment_id,
181            empty_actor_splits,
182            &prev_splits,
            // Pre-allocating splits happens the first time splits are assigned, so there is no scale-in scenario here.
184            std::default::Default::default(),
185        )
186        .unwrap_or_default();
187
188        Ok(diff)
189    }
190}
191
192impl SharedActorInfos {
193    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
194        Self {
195            inner: Arc::new(Default::default()),
196            notification_manager,
197        }
198    }
199
200    pub(super) fn remove_database(&self, database_id: DatabaseId) {
201        if let Some(database) = self.inner.write().info.remove(&database_id) {
202            let mapping = database
203                .into_values()
204                .map(|fragment| rebuild_fragment_mapping(&fragment))
205                .collect_vec();
206            if !mapping.is_empty() {
207                self.notification_manager
208                    .notify_fragment_mapping(Operation::Delete, mapping);
209            }
210        }
211    }
212
213    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
214        let database_ids: HashSet<_> = database_ids.into_iter().collect();
215
216        let mut mapping = Vec::new();
217        for fragment in self
218            .inner
219            .write()
220            .info
221            .extract_if(|database_id, _| !database_ids.contains(database_id))
222            .flat_map(|(_, fragments)| fragments.into_values())
223        {
224            mapping.push(rebuild_fragment_mapping(&fragment));
225        }
226        if !mapping.is_empty() {
227            self.notification_manager
228                .notify_fragment_mapping(Operation::Delete, mapping);
229        }
230    }
231
232    pub(super) fn recover_database(
233        &self,
234        database_id: DatabaseId,
235        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
236    ) {
237        let mut remaining_fragments: HashMap<_, _> = fragments
238            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
239            .collect();
        // delete the fragments that existed previously but are not included in the recovered fragments
241        let mut writer = self.start_writer(database_id);
242        let database = writer.write_guard.info.entry(database_id).or_default();
243        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
244            if let Some(info) = remaining_fragments.remove(fragment_id) {
245                let info = info.into();
246                writer
247                    .updated_fragment_mapping
248                    .get_or_insert_default()
249                    .push(rebuild_fragment_mapping(&info));
250                *fragment_infos = info;
251                false
252            } else {
253                true
254            }
255        }) {
256            writer
257                .deleted_fragment_mapping
258                .get_or_insert_default()
259                .push(rebuild_fragment_mapping(&fragment));
260        }
261        for (fragment_id, info) in remaining_fragments {
262            let info = info.into();
263            writer
264                .added_fragment_mapping
265                .get_or_insert_default()
266                .push(rebuild_fragment_mapping(&info));
267            database.insert(fragment_id, info);
268        }
269        writer.finish();
270    }
271
272    pub(super) fn upsert(
273        &self,
274        database_id: DatabaseId,
275        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
276    ) {
277        let mut writer = self.start_writer(database_id);
278        writer.upsert(infos);
279        writer.finish();
280    }
281
282    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
283        SharedActorInfoWriter {
284            database_id,
285            write_guard: self.inner.write(),
286            notification_manager: &self.notification_manager,
287            added_fragment_mapping: None,
288            updated_fragment_mapping: None,
289            deleted_fragment_mapping: None,
290        }
291    }
292}
293
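/// Buffers fragment mapping changes made under the write lock and sends the
/// corresponding notifications when [`SharedActorInfoWriter::finish`] is called.
///
/// # Example (illustrative sketch)
///
/// A sketch of the writer lifecycle, assuming `infos` is an iterator of
/// `(&InflightFragmentInfo, JobId)` pairs prepared by the caller:
///
/// ```ignore
/// let mut writer = shared_actor_infos.start_writer(database_id);
/// writer.upsert(infos); // stages Add/Update mapping notifications
/// writer.finish(); // sends the staged notifications and drops the write lock
/// ```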
294pub(super) struct SharedActorInfoWriter<'a> {
295    database_id: DatabaseId,
296    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
297    notification_manager: &'a NotificationManagerRef,
298    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
299    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
300    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
301}
302
303impl SharedActorInfoWriter<'_> {
304    pub(super) fn upsert(
305        &mut self,
306        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
307    ) {
308        let database = self.write_guard.info.entry(self.database_id).or_default();
309        for info @ (fragment, _) in infos {
310            match database.entry(fragment.fragment_id) {
311                Entry::Occupied(mut entry) => {
312                    let info = info.into();
313                    self.updated_fragment_mapping
314                        .get_or_insert_default()
315                        .push(rebuild_fragment_mapping(&info));
316                    entry.insert(info);
317                }
318                Entry::Vacant(entry) => {
319                    let info = info.into();
320                    self.added_fragment_mapping
321                        .get_or_insert_default()
322                        .push(rebuild_fragment_mapping(&info));
323                    entry.insert(info);
324                }
325            }
326        }
327    }
328
329    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
330        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
331            && let Some(fragment) = database.remove(&info.fragment_id)
332        {
333            self.deleted_fragment_mapping
334                .get_or_insert_default()
335                .push(rebuild_fragment_mapping(&fragment));
336        }
337    }
338
339    pub(super) fn finish(self) {
340        if let Some(mapping) = self.added_fragment_mapping {
341            self.notification_manager
342                .notify_fragment_mapping(Operation::Add, mapping);
343        }
344        if let Some(mapping) = self.updated_fragment_mapping {
345            self.notification_manager
346                .notify_fragment_mapping(Operation::Update, mapping);
347        }
348        if let Some(mapping) = self.deleted_fragment_mapping {
349            self.notification_manager
350                .notify_fragment_mapping(Operation::Delete, mapping);
351        }
352    }
353}
354
355#[derive(Debug, Clone)]
356pub(super) struct BarrierInfo {
357    pub prev_epoch: TracedEpoch,
358    pub curr_epoch: TracedEpoch,
359    pub kind: BarrierKind,
360}
361
362impl BarrierInfo {
363    pub(super) fn prev_epoch(&self) -> u64 {
364        self.prev_epoch.value().0
365    }
366}
367
368#[derive(Debug)]
369pub(super) enum CommandFragmentChanges {
370    NewFragment {
371        job_id: JobId,
372        info: InflightFragmentInfo,
        /// Whether the fragment already existed before being added. This is used
        /// when snapshot backfill finishes and its fragment info is added
        /// back to the database.
376        is_existing: bool,
377    },
378    AddNodeUpstream(PbUpstreamSinkInfo),
379    DropNodeUpstream(Vec<FragmentId>),
380    ReplaceNodeUpstream(
381        /// old `fragment_id` -> new `fragment_id`
382        HashMap<FragmentId, FragmentId>,
383    ),
384    Reschedule {
385        new_actors: HashMap<ActorId, InflightActorInfo>,
386        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
387        to_remove: HashSet<ActorId>,
388        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
389    },
390    RemoveFragment,
391    SplitAssignment {
392        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
393    },
394}
395
396pub(super) enum PostApplyFragmentChanges {
397    Reschedule { to_remove: HashSet<ActorId> },
398    RemoveFragment,
399}
400
401#[derive(Clone, Debug)]
402pub enum SubscriberType {
403    Subscription(u64),
404    SnapshotBackfill,
405}
406
407#[derive(Debug)]
408pub(super) enum CreateStreamingJobStatus {
409    Init,
410    Creating(CreateMviewProgressTracker),
411    Created,
412}
413
414#[derive(Debug)]
415pub(super) struct InflightStreamingJobInfo {
416    pub job_id: JobId,
417    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
418    pub subscribers: HashMap<SubscriberId, SubscriberType>,
419    pub status: CreateStreamingJobStatus,
420    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
421}
422
423impl InflightStreamingJobInfo {
424    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
425        self.fragment_infos.values()
426    }
427
428    pub fn snapshot_backfill_actor_ids(
429        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
430    ) -> impl Iterator<Item = ActorId> + '_ {
431        fragment_infos
432            .values()
433            .filter(|fragment| {
434                fragment
435                    .fragment_type_mask
436                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
437            })
438            .flat_map(|fragment| fragment.actors.keys().copied())
439    }
440
441    pub fn tracking_progress_actor_ids(
442        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
443    ) -> Vec<(ActorId, BackfillUpstreamType)> {
444        StreamJobFragments::tracking_progress_actor_ids_impl(
445            fragment_infos
446                .values()
447                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
448        )
449    }
450}
451
452impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
453    type Item = &'a InflightFragmentInfo;
454
455    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
456
457    fn into_iter(self) -> Self::IntoIter {
458        self.fragment_infos()
459    }
460}
461
462#[derive(Debug)]
463pub struct InflightDatabaseInfo {
464    database_id: DatabaseId,
465    jobs: HashMap<JobId, InflightStreamingJobInfo>,
466    fragment_location: HashMap<FragmentId, JobId>,
467    pub(super) shared_actor_infos: SharedActorInfos,
468}
469
470impl InflightDatabaseInfo {
471    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
472        self.jobs.values().flat_map(|job| job.fragment_infos())
473    }
474
475    pub fn contains_job(&self, job_id: JobId) -> bool {
476        self.jobs.contains_key(&job_id)
477    }
478
479    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
480        let job_id = self.fragment_location[&fragment_id];
481        self.jobs
482            .get(&job_id)
483            .expect("should exist")
484            .fragment_infos
485            .get(&fragment_id)
486            .expect("should exist")
487    }
488
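    /// # Example (illustrative sketch)
    ///
    /// Collecting the per-job DDL progress into a map, assuming `database_info`
    /// is an `InflightDatabaseInfo`; only jobs still in the `Creating` state
    /// contribute an entry.
    ///
    /// ```ignore
    /// let progress: HashMap<JobId, DdlProgress> = database_info.gen_ddl_progress().collect();
    /// ```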
489    pub fn gen_ddl_progress(&self) -> impl Iterator<Item = (JobId, DdlProgress)> + '_ {
490        self.jobs
491            .iter()
492            .filter_map(|(job_id, job)| match &job.status {
493                CreateStreamingJobStatus::Init => None,
494                CreateStreamingJobStatus::Creating(tracker) => {
495                    Some((*job_id, tracker.gen_ddl_progress()))
496                }
497                CreateStreamingJobStatus::Created => None,
498            })
499    }
500
501    pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
502        self.jobs.iter().filter_map(|(job_id, job)| {
503            job.cdc_table_backfill_tracker
504                .as_ref()
505                .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
506        })
507    }
508
509    pub(super) fn may_assign_fragment_cdc_backfill_splits(
510        &mut self,
511        fragment_id: FragmentId,
512    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
513        let job_id = self.fragment_location[&fragment_id];
514        let job = self.jobs.get_mut(&job_id).expect("should exist");
515        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
516            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
517            if cdc_scan_fragment_id != fragment_id {
518                return Ok(None);
519            }
520            let actors = job.fragment_infos[&cdc_scan_fragment_id]
521                .actors
522                .keys()
523                .copied()
524                .collect();
525            tracker.reassign_splits(actors).map(Some)
526        } else {
527            Ok(None)
528        }
529    }
530
531    pub(super) fn assign_cdc_backfill_splits(
532        &mut self,
533        job_id: JobId,
534    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
535        let job = self.jobs.get_mut(&job_id).expect("should exist");
536        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
537            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
538            let actors = job.fragment_infos[&cdc_scan_fragment_id]
539                .actors
540                .keys()
541                .copied()
542                .collect();
543            tracker.reassign_splits(actors).map(Some)
544        } else {
545            Ok(None)
546        }
547    }
548
549    pub(super) fn apply_collected_command(
550        &mut self,
551        command: Option<&Command>,
552        resps: &[BarrierCompleteResponse],
553        version_stats: &HummockVersionStats,
554    ) {
555        if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
556            match job_type {
557                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
558                    let job_id = info.streaming_job.id();
559                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
560                        let CreateStreamingJobStatus::Init = replace(
561                            &mut job_info.status,
562                            CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::new(
563                                info,
564                                version_stats,
565                            )),
566                        ) else {
567                            unreachable!("should be init before collect the first barrier")
568                        };
569                    } else {
                        info!(%job_id, "newly created job was cancelled before the first barrier was collected")
571                    }
572                }
573                CreateStreamingJobType::SnapshotBackfill(_) => {
574                    // The progress of SnapshotBackfill won't be tracked here
575                }
576            }
577        }
578        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
579            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
580                warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which may have been cancelled"
582                );
583                continue;
584            };
585            let CreateStreamingJobStatus::Creating(tracker) =
586                &mut self.jobs.get_mut(job_id).expect("should exist").status
587            else {
                warn!("update the progress of an already-created streaming job: {progress:?}");
589                continue;
590            };
591            tracker.apply_progress(progress, version_stats);
592        }
593        for progress in resps
594            .iter()
595            .flat_map(|resp| &resp.cdc_table_backfill_progress)
596        {
597            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
598                warn!(
                    "update the cdc progress of a non-existent creating streaming job: {progress:?}, which may have been cancelled"
600                );
601                continue;
602            };
603            let Some(tracker) = &mut self
604                .jobs
605                .get_mut(job_id)
606                .expect("should exist")
607                .cdc_table_backfill_tracker
608            else {
                warn!("update the cdc progress of a streaming job without a cdc backfill tracker: {progress:?}");
610                continue;
611            };
612            tracker.update_split_progress(progress);
613        }
614    }
615
616    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
617        self.jobs.values().filter_map(|job| match &job.status {
618            CreateStreamingJobStatus::Init => None,
619            CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
620            CreateStreamingJobStatus::Created => None,
621        })
622    }
623
624    fn iter_mut_creating_job_tracker(
625        &mut self,
626    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
627        self.jobs
628            .values_mut()
629            .filter_map(|job| match &mut job.status {
630                CreateStreamingJobStatus::Init => None,
631                CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
632                CreateStreamingJobStatus::Created => None,
633            })
634    }
635
636    pub(super) fn has_pending_finished_jobs(&self) -> bool {
637        self.iter_creating_job_tracker()
638            .any(|tracker| tracker.is_finished())
639    }
640
641    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
642        self.iter_mut_creating_job_tracker()
643            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
644            .collect()
645    }
646
647    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
648        let mut finished_jobs = vec![];
649        let mut table_ids_to_truncate = vec![];
650        let mut finished_cdc_table_backfill = vec![];
651        for (job_id, job) in &mut self.jobs {
652            if let CreateStreamingJobStatus::Creating(tracker) = &mut job.status {
653                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
654                table_ids_to_truncate.extend(truncate_table_ids);
655                if is_finished {
656                    let CreateStreamingJobStatus::Creating(tracker) =
657                        replace(&mut job.status, CreateStreamingJobStatus::Created)
658                    else {
659                        unreachable!()
660                    };
661                    finished_jobs.push(tracker.into_tracking_job());
662                }
663            }
664            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
665                && tracker.take_pre_completed()
666            {
667                finished_cdc_table_backfill.push(*job_id);
668            }
669        }
670        StagingCommitInfo {
671            finished_jobs,
672            table_ids_to_truncate,
673            finished_cdc_table_backfill,
674        }
675    }
676
677    pub fn fragment_subscribers(
678        &self,
679        fragment_id: FragmentId,
680    ) -> impl Iterator<Item = SubscriberId> + '_ {
681        let job_id = self.fragment_location[&fragment_id];
682        self.jobs[&job_id].subscribers.keys().copied()
683    }
684
685    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
686        self.jobs[&job_id].subscribers.keys().copied()
687    }
688
689    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
690        self.jobs
691            .iter()
692            .filter_map(|(job_id, info)| {
693                info.subscribers
694                    .values()
695                    .filter_map(|subscriber| match subscriber {
696                        SubscriberType::Subscription(retention) => Some(*retention),
697                        SubscriberType::SnapshotBackfill => None,
698                    })
699                    .max()
700                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
701            })
702            .collect()
703    }
704
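    /// # Example (illustrative sketch)
    ///
    /// Registering and later unregistering a subscription-type subscriber on a
    /// job; `database_info`, `job_id`, `subscriber_id` and `retention` are
    /// assumed to be provided by the caller.
    ///
    /// ```ignore
    /// database_info.register_subscriber(job_id, subscriber_id, SubscriberType::Subscription(retention));
    /// // ... later ...
    /// let removed = database_info.unregister_subscriber(job_id, subscriber_id);
    /// assert!(removed.is_some());
    /// ```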
705    pub fn register_subscriber(
706        &mut self,
707        job_id: JobId,
708        subscriber_id: SubscriberId,
709        subscriber: SubscriberType,
710    ) {
711        self.jobs
712            .get_mut(&job_id)
713            .expect("should exist")
714            .subscribers
715            .try_insert(subscriber_id, subscriber)
716            .expect("non duplicate");
717    }
718
719    pub fn unregister_subscriber(
720        &mut self,
721        job_id: JobId,
722        subscriber_id: SubscriberId,
723    ) -> Option<SubscriberType> {
724        self.jobs
725            .get_mut(&job_id)
726            .expect("should exist")
727            .subscribers
728            .remove(&subscriber_id)
729    }
730
731    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
732        let job_id = self.fragment_location[&fragment_id];
733        let fragment = self
734            .jobs
735            .get_mut(&job_id)
736            .expect("should exist")
737            .fragment_infos
738            .get_mut(&fragment_id)
739            .expect("should exist");
740        (fragment, job_id)
741    }
742
743    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
744        Self {
745            database_id,
746            jobs: Default::default(),
747            fragment_location: Default::default(),
748            shared_actor_infos,
749        }
750    }
751
752    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
753        // remove the database because it's empty.
754        shared_actor_infos.remove_database(database_id);
755        Self::empty_inner(database_id, shared_actor_infos)
756    }
757
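    /// # Example (illustrative sketch)
    ///
    /// Rebuilding the per-database view during recovery, assuming
    /// `recovered_jobs` is an iterator of `InflightStreamingJobInfo` reloaded
    /// from the metadata store:
    ///
    /// ```ignore
    /// let database_info = InflightDatabaseInfo::recover(
    ///     database_id,
    ///     recovered_jobs,
    ///     shared_actor_infos.clone(),
    /// );
    /// ```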
758    pub fn recover(
759        database_id: DatabaseId,
760        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
761        shared_actor_infos: SharedActorInfos,
762    ) -> Self {
763        let mut info = Self::empty_inner(database_id, shared_actor_infos);
764        for job in jobs {
765            info.add_existing(job);
766        }
767        info
768    }
769
770    pub fn is_empty(&self) -> bool {
771        self.jobs.is_empty()
772    }
773
774    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
775        let InflightStreamingJobInfo {
776            job_id,
777            fragment_infos,
778            subscribers,
779            status,
780            cdc_table_backfill_tracker,
781        } = job;
782        self.jobs
783            .try_insert(
784                job.job_id,
785                InflightStreamingJobInfo {
786                    job_id,
787                    subscribers,
788                    fragment_infos: Default::default(), // fill in later in apply_add
789                    status,
790                    cdc_table_backfill_tracker,
791                },
792            )
793            .expect("non-duplicate");
794        let post_apply_changes =
795            self.apply_add(fragment_infos.into_iter().map(|(fragment_id, info)| {
796                (
797                    fragment_id,
798                    CommandFragmentChanges::NewFragment {
799                        job_id: job.job_id,
800                        info,
801                        is_existing: true,
802                    },
803                )
804            }));
805        self.post_apply(post_apply_changes);
806    }
807
    /// Apply actor changes before issuing a barrier command. If the command contains any newly added actors,
    /// update the info accordingly.
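    ///
    /// # Example (illustrative sketch)
    ///
    /// The intended pairing with [`Self::post_apply`]: changes are pre-applied when
    /// the barrier is injected, and the returned post-apply changes are applied once
    /// the barrier is collected. `fragment_changes` is assumed to be derived from
    /// the command being issued.
    ///
    /// ```ignore
    /// let post_apply_changes = database_info.pre_apply(None, fragment_changes);
    /// // ... inject the barrier and wait for it to be collected ...
    /// database_info.post_apply(post_apply_changes);
    /// ```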
810    pub(crate) fn pre_apply(
811        &mut self,
812        new_job: Option<(JobId, Option<CdcTableBackfillTracker>)>,
813        fragment_changes: HashMap<FragmentId, CommandFragmentChanges>,
814    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
815        if let Some((job_id, cdc_table_backfill_tracker)) = new_job {
816            self.jobs
817                .try_insert(
818                    job_id,
819                    InflightStreamingJobInfo {
820                        job_id,
821                        fragment_infos: Default::default(),
                        subscribers: Default::default(), // no subscribers for a newly created job
823                        status: CreateStreamingJobStatus::Init,
824                        cdc_table_backfill_tracker,
825                    },
826                )
827                .expect("non-duplicate");
828        }
829        self.apply_add(fragment_changes.into_iter())
830    }
831
832    fn apply_add(
833        &mut self,
834        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
835    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
836        let mut post_apply = HashMap::new();
837        {
838            let shared_infos = self.shared_actor_infos.clone();
839            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
840            for (fragment_id, change) in fragment_changes {
841                match change {
842                    CommandFragmentChanges::NewFragment {
843                        job_id,
844                        info,
845                        is_existing,
846                    } => {
847                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
848                        if !is_existing {
849                            shared_actor_writer.upsert([(&info, job_id)]);
850                        }
851                        fragment_infos
852                            .fragment_infos
853                            .try_insert(fragment_id, info)
854                            .expect("non duplicate");
855                        self.fragment_location
856                            .try_insert(fragment_id, job_id)
857                            .expect("non duplicate");
858                    }
859                    CommandFragmentChanges::Reschedule {
860                        new_actors,
861                        actor_update_vnode_bitmap,
862                        to_remove,
863                        actor_splits,
864                    } => {
865                        let (info, _) = self.fragment_mut(fragment_id);
866                        let actors = &mut info.actors;
867                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
868                            actors
869                                .get_mut(&actor_id)
870                                .expect("should exist")
871                                .vnode_bitmap = Some(new_vnodes);
872                        }
873                        for (actor_id, actor) in new_actors {
874                            actors
875                                .try_insert(actor_id as _, actor)
876                                .expect("non-duplicate");
877                        }
878                        for (actor_id, splits) in actor_splits {
879                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
880                        }
881
882                        post_apply.insert(
883                            fragment_id,
884                            PostApplyFragmentChanges::Reschedule { to_remove },
885                        );
886
                        // info will be upserted into shared_actor_infos in the post_apply stage
888                    }
889                    CommandFragmentChanges::RemoveFragment => {
890                        post_apply.insert(fragment_id, PostApplyFragmentChanges::RemoveFragment);
891                    }
892                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
893                        let mut remaining_fragment_ids: HashSet<_> =
894                            replace_map.keys().cloned().collect();
895                        let (info, _) = self.fragment_mut(fragment_id);
896                        visit_stream_node_mut(&mut info.nodes, |node| {
897                            if let NodeBody::Merge(m) = node
898                                && let Some(new_upstream_fragment_id) =
899                                    replace_map.get(&m.upstream_fragment_id)
900                            {
901                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
902                                    if cfg!(debug_assertions) {
903                                        panic!(
904                                            "duplicate upstream fragment: {:?} {:?}",
905                                            m, replace_map
906                                        );
907                                    } else {
908                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
909                                    }
910                                }
911                                m.upstream_fragment_id = *new_upstream_fragment_id;
912                            }
913                        });
914                        if cfg!(debug_assertions) {
915                            assert!(
916                                remaining_fragment_ids.is_empty(),
917                                "non-existing fragment to replace: {:?} {:?} {:?}",
918                                remaining_fragment_ids,
919                                info.nodes,
920                                replace_map
921                            );
                        } else if !remaining_fragment_ids.is_empty() {
                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
924                        }
925                    }
926                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
927                        let (info, _) = self.fragment_mut(fragment_id);
928                        let mut injected = false;
929                        visit_stream_node_mut(&mut info.nodes, |node| {
930                            if let NodeBody::UpstreamSinkUnion(u) = node {
931                                if cfg!(debug_assertions) {
932                                    let current_upstream_fragment_ids = u
933                                        .init_upstreams
934                                        .iter()
935                                        .map(|upstream| upstream.upstream_fragment_id)
936                                        .collect::<HashSet<_>>();
937                                    if current_upstream_fragment_ids
938                                        .contains(&new_upstream_info.upstream_fragment_id)
939                                    {
940                                        panic!(
941                                            "duplicate upstream fragment: {:?} {:?}",
942                                            u, new_upstream_info
943                                        );
944                                    }
945                                }
946                                u.init_upstreams.push(new_upstream_info.clone());
947                                injected = true;
948                            }
949                        });
950                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
951                    }
952                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
953                        let (info, _) = self.fragment_mut(fragment_id);
954                        let mut removed = false;
955                        visit_stream_node_mut(&mut info.nodes, |node| {
956                            if let NodeBody::UpstreamSinkUnion(u) = node {
957                                if cfg!(debug_assertions) {
958                                    let current_upstream_fragment_ids = u
959                                        .init_upstreams
960                                        .iter()
961                                        .map(|upstream| upstream.upstream_fragment_id)
962                                        .collect::<HashSet<FragmentId>>();
963                                    for drop_fragment_id in &drop_upstream_fragment_ids {
964                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
965                                        {
966                                            panic!(
967                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
968                                                u, drop_upstream_fragment_ids, drop_fragment_id
969                                            );
970                                        }
971                                    }
972                                }
973                                u.init_upstreams.retain(|upstream| {
974                                    !drop_upstream_fragment_ids
975                                        .contains(&upstream.upstream_fragment_id)
976                                });
977                                removed = true;
978                            }
979                        });
980                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
981                    }
982                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
983                        let (info, job_id) = self.fragment_mut(fragment_id);
984                        let actors = &mut info.actors;
985                        for (actor_id, splits) in actor_splits {
986                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
987                        }
988                        shared_actor_writer.upsert([(&*info, job_id)]);
989                    }
990                }
991            }
992            shared_actor_writer.finish();
993        }
994        post_apply
995    }
996
997    pub(super) fn build_edge(
998        &self,
999        command: Option<&Command>,
1000        control_stream_manager: &ControlStreamManager,
1001    ) -> Option<FragmentEdgeBuildResult> {
1002        let (info, replace_job, new_upstream_sink) = match command {
1003            None => {
1004                return None;
1005            }
1006            Some(command) => match command {
1007                Command::Flush
1008                | Command::Pause
1009                | Command::Resume
1010                | Command::DropStreamingJobs { .. }
1011                | Command::RescheduleFragment { .. }
1012                | Command::SourceChangeSplit { .. }
1013                | Command::Throttle(_)
1014                | Command::CreateSubscription { .. }
1015                | Command::DropSubscription { .. }
1016                | Command::ConnectorPropsChange(_)
1017                | Command::Refresh { .. }
1018                | Command::ListFinish { .. }
1019                | Command::LoadFinish { .. } => {
1020                    return None;
1021                }
1022                Command::CreateStreamingJob { info, job_type, .. } => {
1023                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
1024                        new_upstream_sink,
1025                    ) = job_type
1026                    {
1027                        Some(new_upstream_sink)
1028                    } else {
1029                        None
1030                    };
1031                    (Some(info), None, new_upstream_sink)
1032                }
1033                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
1034            },
1035        };
        // `existing_fragment_ids` consists of
        //  - keys of `info.upstream_fragment_downstreams`, i.e. the `fragment_id`s of the upstream fragments of the newly created job,
        //  - keys of `replace_job.upstream_fragment_downstreams`, i.e. the `fragment_id`s of the upstream fragments of the replace job,
        //    if those upstream fragments previously exist,
        //  - keys of `replace_upstream`, i.e. the `fragment_id`s of the downstream fragments that will update their upstream fragments,
        //  - and, if creating a new sink-into-table, the `fragment_id` of the downstream table.
1043        let existing_fragment_ids = info
1044            .into_iter()
1045            .flat_map(|info| info.upstream_fragment_downstreams.keys())
1046            .chain(replace_job.into_iter().flat_map(|replace_job| {
1047                replace_job
1048                    .upstream_fragment_downstreams
1049                    .keys()
1050                    .filter(|fragment_id| {
1051                        info.map(|info| {
1052                            !info
1053                                .stream_job_fragments
1054                                .fragments
1055                                .contains_key(fragment_id)
1056                        })
1057                        .unwrap_or(true)
1058                    })
1059                    .chain(replace_job.replace_upstream.keys())
1060            }))
1061            .chain(
1062                new_upstream_sink
1063                    .into_iter()
1064                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
1065            )
1066            .cloned();
1067        let new_fragment_infos = info
1068            .into_iter()
1069            .flat_map(|info| {
1070                info.stream_job_fragments
1071                    .new_fragment_info(&info.init_split_assignment)
1072            })
1073            .chain(replace_job.into_iter().flat_map(|replace_job| {
1074                replace_job
1075                    .new_fragments
1076                    .new_fragment_info(&replace_job.init_split_assignment)
1077                    .chain(
1078                        replace_job
1079                            .auto_refresh_schema_sinks
1080                            .as_ref()
1081                            .into_iter()
1082                            .flat_map(|sinks| {
1083                                sinks.iter().map(|sink| {
1084                                    (sink.new_fragment.fragment_id, sink.new_fragment_info())
1085                                })
1086                            }),
1087                    )
1088            }))
1089            .collect_vec();
1090        let mut builder = FragmentEdgeBuilder::new(
1091            existing_fragment_ids
1092                .map(|fragment_id| self.fragment(fragment_id))
1093                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
1094            control_stream_manager,
1095        );
1096        if let Some(info) = info {
1097            builder.add_relations(&info.upstream_fragment_downstreams);
1098            builder.add_relations(&info.stream_job_fragments.downstreams);
1099        }
1100        if let Some(replace_job) = replace_job {
1101            builder.add_relations(&replace_job.upstream_fragment_downstreams);
1102            builder.add_relations(&replace_job.new_fragments.downstreams);
1103        }
1104        if let Some(new_upstream_sink) = new_upstream_sink {
1105            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
1106            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
1107            builder.add_edge(sink_fragment_id, new_sink_downstream);
1108        }
1109        if let Some(replace_job) = replace_job {
1110            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
1111                for (original_upstream_fragment_id, new_upstream_fragment_id) in
1112                    fragment_replacement
1113                {
1114                    builder.replace_upstream(
1115                        *fragment_id,
1116                        *original_upstream_fragment_id,
1117                        *new_upstream_fragment_id,
1118                    );
1119                }
1120            }
1121        }
1122        Some(builder.build())
1123    }
1124
    /// Apply actor changes after the barrier command is collected. If the command contains any dropped actors,
    /// remove them from the snapshot accordingly.
1127    pub(crate) fn post_apply(
1128        &mut self,
1129        fragment_changes: HashMap<FragmentId, PostApplyFragmentChanges>,
1130    ) {
1131        let inner = self.shared_actor_infos.clone();
1132        let mut shared_actor_writer = inner.start_writer(self.database_id);
1133        {
1134            for (fragment_id, changes) in fragment_changes {
1135                match changes {
1136                    PostApplyFragmentChanges::Reschedule { to_remove } => {
1137                        let job_id = self.fragment_location[&fragment_id];
1138                        let info = self
1139                            .jobs
1140                            .get_mut(&job_id)
1141                            .expect("should exist")
1142                            .fragment_infos
1143                            .get_mut(&fragment_id)
1144                            .expect("should exist");
1145                        for actor_id in to_remove {
1146                            assert!(info.actors.remove(&actor_id).is_some());
1147                        }
1148                        shared_actor_writer.upsert([(&*info, job_id)]);
1149                    }
1150                    PostApplyFragmentChanges::RemoveFragment => {
1151                        let job_id = self
1152                            .fragment_location
1153                            .remove(&fragment_id)
1154                            .expect("should exist");
1155                        let job = self.jobs.get_mut(&job_id).expect("should exist");
1156                        let fragment = job
1157                            .fragment_infos
1158                            .remove(&fragment_id)
1159                            .expect("should exist");
1160                        shared_actor_writer.remove(&fragment);
1161                        if job.fragment_infos.is_empty() {
1162                            self.jobs.remove(&job_id).expect("should exist");
1163                        }
1164                    }
1165                }
1166            }
1167        }
1168        shared_actor_writer.finish();
1169    }
1170}
1171
1172impl InflightFragmentInfo {
    /// Returns the actors to collect, grouped by the worker node they run on.
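    ///
    /// A sketch of how the result might be consumed, assuming `fragment_infos`
    /// is an iterator of `&InflightFragmentInfo`:
    ///
    /// ```ignore
    /// let to_collect = InflightFragmentInfo::actor_ids_to_collect(fragment_infos);
    /// for (worker_id, actor_ids) in &to_collect {
    ///     tracing::debug!(%worker_id, actor_count = actor_ids.len(), "actors to collect");
    /// }
    /// ```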
1174    pub(crate) fn actor_ids_to_collect(
1175        infos: impl IntoIterator<Item = &Self>,
1176    ) -> HashMap<WorkerId, HashSet<ActorId>> {
1177        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1178        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1179            assert!(
1180                ret.entry(actor.worker_id)
1181                    .or_default()
1182                    .insert(*actor_id as _)
1183            )
1184        }
1185        ret
1186    }
1187
1188    pub fn existing_table_ids<'a>(
1189        infos: impl IntoIterator<Item = &'a Self> + 'a,
1190    ) -> impl Iterator<Item = TableId> + 'a {
1191        infos
1192            .into_iter()
1193            .flat_map(|info| info.state_table_ids.iter().cloned())
1194    }
1195
1196    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
1197        infos.into_iter().any(|fragment| {
1198            fragment
1199                .actors
1200                .values()
1201                .any(|actor| (actor.worker_id) == worker_id)
                .any(|actor| actor.worker_id == worker_id)
1203    }
1204}
1205
1206impl InflightDatabaseInfo {
1207    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
1208        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
1209    }
1210
1211    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
1212        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
1213    }
1214}