risingwave_meta/barrier/
info.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_meta_model::WorkerId;
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_pb::ddl_service::PbBackfillType;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::SubscriberId;
34use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
35use risingwave_pb::meta::subscribe_response::Operation;
36use risingwave_pb::source::PbCdcTableSnapshotSplits;
37use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
38use risingwave_pb::stream_plan::stream_node::NodeBody;
39use risingwave_pb::stream_service::BarrierCompleteResponse;
40use tracing::{info, warn};
41
42use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
43use crate::barrier::command::{
44    CreateStreamingJobCommandInfo, PostCollectCommand, ReplaceStreamJobPlan,
45};
46use crate::barrier::edge_builder::{
47    EdgeBuilderFragmentInfo, FragmentEdgeBuildResult, FragmentEdgeBuilder,
48};
49use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
50use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
51use crate::barrier::{
52    BackfillProgress, BarrierKind, CreateStreamingJobType, FragmentBackfillProgress, TracedEpoch,
53};
54use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
55use crate::controller::utils::rebuild_fragment_mapping;
56use crate::manager::NotificationManagerRef;
57use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamActor, StreamJobFragments};
58use crate::stream::UpstreamSinkInfo;
59use crate::{MetaError, MetaResult};
60
/// Actor-level entry stored inside a [`SharedFragmentInfo`].
///
/// Produced from an [`InflightActorInfo`] by copying/cloning the three fields below.
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    /// Worker id copied from `InflightActorInfo::worker_id`.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor, if any (cloned from `InflightActorInfo::vnode_bitmap`).
    pub vnode_bitmap: Option<Bitmap>,
    /// Splits recorded for the actor; `SharedActorInfos::list_assignments` exposes them per actor.
    pub splits: Vec<SplitImpl>,
}
67
68impl From<&InflightActorInfo> for SharedActorInfo {
69    fn from(value: &InflightActorInfo) -> Self {
70        Self {
71            worker_id: value.worker_id,
72            vnode_bitmap: value.vnode_bitmap.clone(),
73            splits: value.splits.clone(),
74        }
75    }
76}
77
/// Fragment-level entry of [`SharedActorInfosInner`], keyed there by fragment id.
///
/// Built from an [`InflightFragmentInfo`] together with a [`JobId`] passed alongside it.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    /// Job id supplied together with the fragment at conversion time.
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    /// Actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}
87
88impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
89    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
90        let (info, job_id) = pair;
91
92        let InflightFragmentInfo {
93            fragment_id,
94            distribution_type,
95            fragment_type_mask,
96            actors,
97            vnode_count,
98            ..
99        } = info;
100
101        Self {
102            fragment_id: *fragment_id,
103            job_id,
104            distribution_type: *distribution_type,
105            fragment_type_mask: *fragment_type_mask,
106            actors: actors
107                .iter()
108                .map(|(actor_id, actor)| (*actor_id, actor.into()))
109                .collect(),
110            vnode_count: *vnode_count,
111        }
112    }
113}
114
/// Lock-protected state behind [`SharedActorInfos`].
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    /// Fragment infos grouped by database id, then keyed by fragment id.
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
119
120impl SharedActorInfosInner {
121    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
122        self.info
123            .values()
124            .find_map(|database| database.get(&fragment_id))
125    }
126
127    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
128        self.info.values().flatten()
129    }
130}
131
/// Cloneable handle to the shared fragment/actor infos.
///
/// Clones share the same inner state through the `Arc`. Changes applied through this handle
/// are reported via `notification_manager.notify_fragment_mapping`.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    /// Shared state guarded by a `parking_lot` read-write lock.
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    /// Used to publish fragment mapping changes; excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
139
140impl SharedActorInfos {
141    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
142        self.inner.read()
143    }
144
145    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
146        let core = self.inner.read();
147        core.iter_over_fragments()
148            .flat_map(|(_, fragment)| {
149                fragment
150                    .actors
151                    .iter()
152                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
153            })
154            .collect()
155    }
156
157    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
158    ///
159    /// Very occasionally split removal may happen during scaling, in which case we need to
160    /// use the old splits for reallocation instead of the latest splits (which may be missing),
161    /// so that we can resolve the split removal in the next command.
162    pub fn migrate_splits_for_source_actors(
163        &self,
164        fragment_id: FragmentId,
165        prev_actor_ids: &[ActorId],
166        curr_actor_ids: &[ActorId],
167    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
168        let guard = self.read_guard();
169
170        let prev_splits = prev_actor_ids
171            .iter()
172            .flat_map(|actor_id| {
173                // Note: File Source / Iceberg Source doesn't have splits assigned by meta.
174                guard
175                    .get_fragment(fragment_id)
176                    .and_then(|info| info.actors.get(actor_id))
177                    .map(|actor| actor.splits.clone())
178                    .unwrap_or_default()
179            })
180            .map(|split| (split.id(), split))
181            .collect();
182
183        let empty_actor_splits = curr_actor_ids
184            .iter()
185            .map(|actor_id| (*actor_id, vec![]))
186            .collect();
187
188        let diff = crate::stream::source_manager::reassign_splits(
189            fragment_id,
190            empty_actor_splits,
191            &prev_splits,
192            // pre-allocate splits is the first time getting splits, and it does not have scale-in scene
193            std::default::Default::default(),
194        )
195        .unwrap_or_default();
196
197        Ok(diff)
198    }
199}
200
201impl SharedActorInfos {
202    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
203        Self {
204            inner: Arc::new(Default::default()),
205            notification_manager,
206        }
207    }
208
209    pub(super) fn remove_database(&self, database_id: DatabaseId) {
210        if let Some(database) = self.inner.write().info.remove(&database_id) {
211            let mapping = database
212                .into_values()
213                .map(|fragment| rebuild_fragment_mapping(&fragment))
214                .collect_vec();
215            if !mapping.is_empty() {
216                self.notification_manager
217                    .notify_fragment_mapping(Operation::Delete, mapping);
218            }
219        }
220    }
221
222    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
223        let database_ids: HashSet<_> = database_ids.into_iter().collect();
224
225        let mut mapping = Vec::new();
226        for fragment in self
227            .inner
228            .write()
229            .info
230            .extract_if(|database_id, _| !database_ids.contains(database_id))
231            .flat_map(|(_, fragments)| fragments.into_values())
232        {
233            mapping.push(rebuild_fragment_mapping(&fragment));
234        }
235        if !mapping.is_empty() {
236            self.notification_manager
237                .notify_fragment_mapping(Operation::Delete, mapping);
238        }
239    }
240
241    pub(super) fn recover_database(
242        &self,
243        database_id: DatabaseId,
244        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
245    ) {
246        let mut remaining_fragments: HashMap<_, _> = fragments
247            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
248            .collect();
249        // delete the fragments that exist previously, but not included in the recovered fragments
250        let mut writer = self.start_writer(database_id);
251        let database = writer.write_guard.info.entry(database_id).or_default();
252        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
253            if let Some(info) = remaining_fragments.remove(fragment_id) {
254                let info = info.into();
255                writer
256                    .updated_fragment_mapping
257                    .get_or_insert_default()
258                    .push(rebuild_fragment_mapping(&info));
259                *fragment_infos = info;
260                false
261            } else {
262                true
263            }
264        }) {
265            writer
266                .deleted_fragment_mapping
267                .get_or_insert_default()
268                .push(rebuild_fragment_mapping(&fragment));
269        }
270        for (fragment_id, info) in remaining_fragments {
271            let info = info.into();
272            writer
273                .added_fragment_mapping
274                .get_or_insert_default()
275                .push(rebuild_fragment_mapping(&info));
276            database.insert(fragment_id, info);
277        }
278        writer.finish();
279    }
280
281    pub(super) fn upsert(
282        &self,
283        database_id: DatabaseId,
284        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
285    ) {
286        let mut writer = self.start_writer(database_id);
287        writer.upsert(infos);
288        writer.finish();
289    }
290
291    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
292        SharedActorInfoWriter {
293            database_id,
294            write_guard: self.inner.write(),
295            notification_manager: &self.notification_manager,
296            added_fragment_mapping: None,
297            updated_fragment_mapping: None,
298            deleted_fragment_mapping: None,
299        }
300    }
301}
302
/// Write session over [`SharedActorInfos`] for a single database.
///
/// Holds the write lock for as long as the writer is alive and accumulates the fragment
/// mappings that were added, updated or deleted; `finish` sends them out as notifications.
pub(super) struct SharedActorInfoWriter<'a> {
    /// Database whose fragments `upsert` and `remove` operate on.
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    /// Mappings of newly inserted fragments; `None` until the first one is recorded.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of overwritten fragments; `None` until the first one is recorded.
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of removed fragments; `None` until the first one is recorded.
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
311
312impl SharedActorInfoWriter<'_> {
313    pub(super) fn upsert(
314        &mut self,
315        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
316    ) {
317        let database = self.write_guard.info.entry(self.database_id).or_default();
318        for info @ (fragment, _) in infos {
319            match database.entry(fragment.fragment_id) {
320                Entry::Occupied(mut entry) => {
321                    let info = info.into();
322                    self.updated_fragment_mapping
323                        .get_or_insert_default()
324                        .push(rebuild_fragment_mapping(&info));
325                    entry.insert(info);
326                }
327                Entry::Vacant(entry) => {
328                    let info = info.into();
329                    self.added_fragment_mapping
330                        .get_or_insert_default()
331                        .push(rebuild_fragment_mapping(&info));
332                    entry.insert(info);
333                }
334            }
335        }
336    }
337
338    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
339        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
340            && let Some(fragment) = database.remove(&info.fragment_id)
341        {
342            self.deleted_fragment_mapping
343                .get_or_insert_default()
344                .push(rebuild_fragment_mapping(&fragment));
345        }
346    }
347
348    pub(super) fn finish(self) {
349        if let Some(mapping) = self.added_fragment_mapping {
350            self.notification_manager
351                .notify_fragment_mapping(Operation::Add, mapping);
352        }
353        if let Some(mapping) = self.updated_fragment_mapping {
354            self.notification_manager
355                .notify_fragment_mapping(Operation::Update, mapping);
356        }
357        if let Some(mapping) = self.deleted_fragment_mapping {
358            self.notification_manager
359                .notify_fragment_mapping(Operation::Delete, mapping);
360        }
361    }
362}
363
/// The previous/current epoch pair of a barrier together with its kind.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}
370
371impl BarrierInfo {
372    pub(super) fn prev_epoch(&self) -> u64 {
373        self.prev_epoch.value().0
374    }
375
376    pub(super) fn curr_epoch(&self) -> u64 {
377        self.curr_epoch.value().0
378    }
379
380    pub(super) fn epoch(&self) -> EpochPair {
381        EpochPair {
382            curr: self.curr_epoch(),
383            prev: self.prev_epoch(),
384        }
385    }
386}
387
/// Kind of a subscriber registered on a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A subscription carrying its retention value; `update_subscription_retention`
    /// overwrites it with a value it names `retention_second`.
    Subscription(u64),
    /// A snapshot-backfill subscriber; it carries no retention.
    SnapshotBackfill,
}
393
/// Creation state of a streaming job tracked in [`InflightDatabaseInfo`].
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    /// State expected before the job's first barrier is collected
    /// (see `apply_collected_command`).
    Init,
    /// Creation in progress; `tracker` accumulates the reported progress.
    Creating { tracker: CreateMviewProgressTracker },
    /// Set by `take_staging_commit_info` once the tracker reports the job as finished.
    Created,
}
400
/// Per-job state kept in [`InflightDatabaseInfo`].
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    /// Fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Subscribers registered on the job, keyed by subscriber id.
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    /// Creation state of the job.
    pub status: CreateStreamingJobStatus,
    /// Tracker for CDC table backfill progress, when the job has one.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
409
410impl InflightStreamingJobInfo {
411    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
412        self.fragment_infos.values()
413    }
414
415    pub fn snapshot_backfill_actor_ids(
416        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
417    ) -> impl Iterator<Item = ActorId> + '_ {
418        fragment_infos
419            .values()
420            .filter(|fragment| {
421                fragment
422                    .fragment_type_mask
423                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
424            })
425            .flat_map(|fragment| fragment.actors.keys().copied())
426    }
427
428    pub fn tracking_progress_actor_ids(
429        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
430    ) -> Vec<(ActorId, BackfillUpstreamType)> {
431        StreamJobFragments::tracking_progress_actor_ids_impl(
432            fragment_infos
433                .values()
434                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
435        )
436    }
437}
438
439impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
440    type Item = &'a InflightFragmentInfo;
441
442    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
443
444    fn into_iter(self) -> Self::IntoIter {
445        self.fragment_infos()
446    }
447}
448
/// Inflight streaming jobs of one database.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    pub(super) database_id: DatabaseId,
    /// Jobs of the database, keyed by job id.
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    /// Maps a fragment id to the id of the job whose `fragment_infos` contains it.
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}
456
457impl InflightDatabaseInfo {
458    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
459        self.jobs.values().flat_map(|job| job.fragment_infos())
460    }
461
462    pub fn contains_job(&self, job_id: JobId) -> bool {
463        self.jobs.contains_key(&job_id)
464    }
465
466    pub(super) fn job_id_by_fragment(&self, fragment_id: FragmentId) -> Option<JobId> {
467        self.fragment_location.get(&fragment_id).copied()
468    }
469
470    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
471        let job_id = self.fragment_location[&fragment_id];
472        self.jobs
473            .get(&job_id)
474            .expect("should exist")
475            .fragment_infos
476            .get(&fragment_id)
477            .expect("should exist")
478    }
479
480    pub(super) fn backfill_fragment_ids_for_job(
481        &self,
482        job_id: JobId,
483    ) -> MetaResult<HashSet<FragmentId>> {
484        let job = self
485            .jobs
486            .get(&job_id)
487            .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
488        Ok(job
489            .fragment_infos
490            .iter()
491            .filter_map(|(fragment_id, fragment)| {
492                fragment
493                    .fragment_type_mask
494                    .contains_any([
495                        FragmentTypeFlag::StreamScan,
496                        FragmentTypeFlag::SourceScan,
497                        FragmentTypeFlag::LocalityProvider,
498                    ])
499                    .then_some(*fragment_id)
500            })
501            .collect())
502    }
503
504    pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
505        let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
506            MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
507        })?;
508        let fragment = self
509            .jobs
510            .get(job_id)
511            .expect("should exist")
512            .fragment_infos
513            .get(&fragment_id)
514            .expect("should exist");
515        Ok(fragment.fragment_type_mask.contains_any([
516            FragmentTypeFlag::StreamScan,
517            FragmentTypeFlag::SourceScan,
518            FragmentTypeFlag::LocalityProvider,
519        ]))
520    }
521
522    pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
523        self.jobs
524            .iter()
525            .filter_map(|(job_id, job)| match &job.status {
526                CreateStreamingJobStatus::Init => None,
527                CreateStreamingJobStatus::Creating { tracker } => {
528                    let progress = tracker.gen_backfill_progress();
529                    Some((
530                        *job_id,
531                        BackfillProgress {
532                            progress,
533                            backfill_type: PbBackfillType::NormalBackfill,
534                        },
535                    ))
536                }
537                CreateStreamingJobStatus::Created => None,
538            })
539    }
540
541    pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
542        self.jobs.iter().filter_map(|(job_id, job)| {
543            job.cdc_table_backfill_tracker
544                .as_ref()
545                .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
546        })
547    }
548
549    pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
550        let mut result = Vec::new();
551        for job in self.jobs.values() {
552            let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
553                continue;
554            };
555            let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
556            result.extend(fragment_progress);
557        }
558        result
559    }
560
561    pub(super) fn may_assign_fragment_cdc_backfill_splits(
562        &mut self,
563        fragment_id: FragmentId,
564    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
565        let job_id = self.fragment_location[&fragment_id];
566        let job = self.jobs.get_mut(&job_id).expect("should exist");
567        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
568            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
569            if cdc_scan_fragment_id != fragment_id {
570                return Ok(None);
571            }
572            let actors = job.fragment_infos[&cdc_scan_fragment_id]
573                .actors
574                .keys()
575                .copied()
576                .collect();
577            tracker.reassign_splits(actors).map(Some)
578        } else {
579            Ok(None)
580        }
581    }
582
583    pub(super) fn assign_cdc_backfill_splits(
584        &mut self,
585        job_id: JobId,
586    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
587        let job = self.jobs.get_mut(&job_id).expect("should exist");
588        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
589            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
590            let actors = job.fragment_infos[&cdc_scan_fragment_id]
591                .actors
592                .keys()
593                .copied()
594                .collect();
595            tracker.reassign_splits(actors).map(Some)
596        } else {
597            Ok(None)
598        }
599    }
600
    /// Applies the effects of a collected barrier to the per-job creation/backfill state.
    ///
    /// In order:
    /// 1. For a `CreateStreamingJob` command of type `Normal` or `SinkIntoTable`, moves the
    ///    job from `Init` to `Creating` with a fresh progress tracker. If the job is no
    ///    longer tracked, this is only logged.
    /// 2. For a `Reschedule` command, refreshes the trackers of the `Creating` jobs that own
    ///    the rescheduled fragments.
    /// 3. Applies every `create_mview_progress` entry of `resps` to the tracker of the job
    ///    owning the reporting fragment, if that job is in the `Creating` state.
    /// 4. Applies every `cdc_table_backfill_progress` entry of `resps` to the CDC table
    ///    backfill tracker of the owning job, if it has one.
    /// 5. For every `cdc_source_offset_updated` entry of `resps`, marks the CDC source as
    ///    finished on the tracker of the corresponding job, if it is in the `Creating` state.
    ///
    /// # Panics
    ///
    /// Panics if a newly created job is tracked but not in the `Init` state, or if
    /// `fragment_location` refers to a job that is missing from `jobs`.
    pub(super) fn apply_collected_command(
        &mut self,
        command: &PostCollectCommand,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // Swap in the `Creating` state and check that the previous state was `Init`.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(
                                    info,
                                    version_stats,
                                    &job_info.fragment_infos,
                                ),
                            },
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // The progress of SnapshotBackfill won't be tracked here
                }
            }
        }
        if let PostCollectCommand::Reschedule { reschedules, .. } = command {
            // During reschedule we expect fragments to be rebuilt with new actors and no vnode bitmap update.
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "Reschedule should not carry vnode bitmap updates when actors are rebuilt"
            );

            // Collect jobs that own the rescheduled fragments; de-duplicate via HashSet.
            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        // Apply the create-mview progress reported in the responses.
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
                // No tracker exists yet in the `Init` state, so the progress is dropped.
                CreateStreamingJobStatus::Init => {
                    continue;
                }
                CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
                CreateStreamingJobStatus::Created => {
                    // Only progress that is not marked as done is worth a warning here.
                    if !progress.done {
                        warn!("update the progress of an created streaming job: {progress:?}");
                    }
                    continue;
                }
            };
            tracker.apply_progress(progress, version_stats);
        }
        // Apply the CDC table backfill progress reported in the responses.
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        // Handle CDC source offset updated events
        for cdc_offset_updated in resps
            .iter()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            // The job to update is derived from the source id.
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }
718
719    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
720        self.jobs.values().filter_map(|job| match &job.status {
721            CreateStreamingJobStatus::Init => None,
722            CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
723            CreateStreamingJobStatus::Created => None,
724        })
725    }
726
727    fn iter_mut_creating_job_tracker(
728        &mut self,
729    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
730        self.jobs
731            .values_mut()
732            .filter_map(|job| match &mut job.status {
733                CreateStreamingJobStatus::Init => None,
734                CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
735                CreateStreamingJobStatus::Created => None,
736            })
737    }
738
739    pub(super) fn has_pending_finished_jobs(&self) -> bool {
740        self.iter_creating_job_tracker()
741            .any(|tracker| tracker.is_finished())
742    }
743
744    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
745        self.iter_mut_creating_job_tracker()
746            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
747            .collect()
748    }
749
750    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
751        let mut finished_jobs = vec![];
752        let mut table_ids_to_truncate = vec![];
753        let mut finished_cdc_table_backfill = vec![];
754        for (job_id, job) in &mut self.jobs {
755            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
756                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
757                table_ids_to_truncate.extend(truncate_table_ids);
758                if is_finished {
759                    let CreateStreamingJobStatus::Creating { tracker, .. } =
760                        replace(&mut job.status, CreateStreamingJobStatus::Created)
761                    else {
762                        unreachable!()
763                    };
764                    finished_jobs.push(tracker.into_tracking_job());
765                }
766            }
767            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
768                && tracker.take_pre_completed()
769            {
770                finished_cdc_table_backfill.push(*job_id);
771            }
772        }
773        StagingCommitInfo {
774            finished_jobs,
775            table_ids_to_truncate,
776            finished_cdc_table_backfill,
777        }
778    }
779
780    pub fn fragment_subscribers(
781        &self,
782        fragment_id: FragmentId,
783    ) -> impl Iterator<Item = SubscriberId> + '_ {
784        let job_id = self.fragment_location[&fragment_id];
785        self.jobs[&job_id].subscribers.keys().copied()
786    }
787
788    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
789        self.jobs[&job_id].subscribers.keys().copied()
790    }
791
792    pub fn max_subscription_retention(&self) -> impl Iterator<Item = (TableId, u64)> + '_ {
793        self.jobs.iter().filter_map(|(job_id, info)| {
794            info.subscribers
795                .values()
796                .filter_map(|subscriber| match subscriber {
797                    SubscriberType::Subscription(retention) => Some(*retention),
798                    SubscriberType::SnapshotBackfill => None,
799                })
800                .max()
801                .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
802        })
803    }
804
805    pub fn register_subscriber(
806        &mut self,
807        job_id: JobId,
808        subscriber_id: SubscriberId,
809        subscriber: SubscriberType,
810    ) {
811        self.jobs
812            .get_mut(&job_id)
813            .expect("should exist")
814            .subscribers
815            .try_insert(subscriber_id, subscriber)
816            .expect("non duplicate");
817    }
818
819    pub fn unregister_subscriber(
820        &mut self,
821        job_id: JobId,
822        subscriber_id: SubscriberId,
823    ) -> Option<SubscriberType> {
824        self.jobs
825            .get_mut(&job_id)
826            .expect("should exist")
827            .subscribers
828            .remove(&subscriber_id)
829    }
830
831    pub fn update_subscription_retention(
832        &mut self,
833        job_id: JobId,
834        subscriber_id: SubscriberId,
835        retention_second: u64,
836    ) {
837        let job = self.jobs.get_mut(&job_id).expect("should exist");
838        match job.subscribers.get_mut(&subscriber_id) {
839            Some(SubscriberType::Subscription(current_retention)) => {
840                *current_retention = retention_second;
841            }
842            Some(SubscriberType::SnapshotBackfill) => {
843                warn!(
844                    %job_id,
845                    %subscriber_id,
846                    "cannot update retention for snapshot backfill subscriber"
847                );
848            }
849            None => {
850                warn!(%job_id, %subscriber_id, "subscription subscriber not found");
851            }
852        }
853    }
854
855    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
856        let job_id = self.fragment_location[&fragment_id];
857        let fragment = self
858            .jobs
859            .get_mut(&job_id)
860            .expect("should exist")
861            .fragment_infos
862            .get_mut(&fragment_id)
863            .expect("should exist");
864        (fragment, job_id)
865    }
866
867    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
868        Self {
869            database_id,
870            jobs: Default::default(),
871            fragment_location: Default::default(),
872            shared_actor_infos,
873        }
874    }
875
876    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
877        // remove the database because it's empty.
878        shared_actor_infos.remove_database(database_id);
879        Self::empty_inner(database_id, shared_actor_infos)
880    }
881
882    pub fn recover(
883        database_id: DatabaseId,
884        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
885        shared_actor_infos: SharedActorInfos,
886    ) -> Self {
887        let mut info = Self::empty_inner(database_id, shared_actor_infos);
888        for job in jobs {
889            info.add_existing(job);
890        }
891        info
892    }
893
894    pub fn is_empty(&self) -> bool {
895        self.jobs.is_empty()
896    }
897
898    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
899        let InflightStreamingJobInfo {
900            job_id,
901            fragment_infos,
902            subscribers,
903            status,
904            cdc_table_backfill_tracker,
905        } = job;
906        self.jobs
907            .try_insert(
908                job_id,
909                InflightStreamingJobInfo {
910                    job_id,
911                    subscribers,
912                    fragment_infos: Default::default(), // fill in later in pre_apply_new_fragments
913                    status,
914                    cdc_table_backfill_tracker,
915                },
916            )
917            .expect("non-duplicate");
918        self.pre_apply_new_fragments(
919            fragment_infos
920                .into_iter()
921                .map(|(fragment_id, info)| (fragment_id, job_id, info)),
922        );
923    }
924
925    /// Register a new streaming job entry (with empty `fragment_infos`).
926    pub(crate) fn pre_apply_new_job(
927        &mut self,
928        job_id: JobId,
929        cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
930    ) {
931        {
932            self.jobs
933                .try_insert(
934                    job_id,
935                    InflightStreamingJobInfo {
936                        job_id,
937                        fragment_infos: Default::default(),
938                        subscribers: Default::default(), // no subscriber for newly create job
939                        status: CreateStreamingJobStatus::Init,
940                        cdc_table_backfill_tracker,
941                    },
942                )
943                .expect("non-duplicate");
944        }
945    }
946
947    /// Add new fragment infos and update shared actor infos.
948    pub(crate) fn pre_apply_new_fragments(
949        &mut self,
950        fragments: impl IntoIterator<Item = (FragmentId, JobId, InflightFragmentInfo)>,
951    ) {
952        {
953            let shared_infos = self.shared_actor_infos.clone();
954            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
955            for (fragment_id, job_id, info) in fragments {
956                {
957                    {
958                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
959                        shared_actor_writer.upsert([(&info, job_id)]);
960                        fragment_infos
961                            .fragment_infos
962                            .try_insert(fragment_id, info)
963                            .expect("non duplicate");
964                        self.fragment_location
965                            .try_insert(fragment_id, job_id)
966                            .expect("non duplicate");
967                    }
968                }
969            }
970            shared_actor_writer.finish();
971        }
972    }
973
974    /// Pre-apply reschedule: update actors, vnode bitmaps, and splits.
975    /// The actual removal of old actors happens in `post_apply_reschedules`.
976    pub(crate) fn pre_apply_reschedule(
977        &mut self,
978        fragment_id: FragmentId,
979        new_actors: HashMap<ActorId, InflightActorInfo>,
980        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
981        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
982    ) {
983        {
984            {
985                {
986                    {
987                        let (info, _) = self.fragment_mut(fragment_id);
988                        let actors = &mut info.actors;
989                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
990                            actors
991                                .get_mut(&actor_id)
992                                .expect("should exist")
993                                .vnode_bitmap = Some(new_vnodes);
994                        }
995                        for (actor_id, actor) in new_actors {
996                            actors
997                                .try_insert(actor_id as _, actor)
998                                .expect("non-duplicate");
999                        }
1000                        for (actor_id, splits) in actor_splits {
1001                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
1002                        }
1003                        // info will be upserted into shared_actor_infos in post_apply stage
1004                    }
1005                }
1006            }
1007        }
1008    }
1009
1010    /// Replace upstream fragment IDs in merge nodes of a fragment's stream graph.
1011    pub(crate) fn pre_apply_replace_node_upstream(
1012        &mut self,
1013        fragment_id: FragmentId,
1014        replace_map: &HashMap<FragmentId, FragmentId>,
1015    ) {
1016        {
1017            {
1018                {
1019                    {
1020                        let mut remaining_fragment_ids: HashSet<_> =
1021                            replace_map.keys().cloned().collect();
1022                        let (info, _) = self.fragment_mut(fragment_id);
1023                        visit_stream_node_mut(&mut info.nodes, |node| {
1024                            if let NodeBody::Merge(m) = node
1025                                && let Some(new_upstream_fragment_id) =
1026                                    replace_map.get(&m.upstream_fragment_id)
1027                            {
1028                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1029                                    if cfg!(debug_assertions) {
1030                                        panic!(
1031                                            "duplicate upstream fragment: {:?} {:?}",
1032                                            m, replace_map
1033                                        );
1034                                    } else {
1035                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
1036                                    }
1037                                }
1038                                m.upstream_fragment_id = *new_upstream_fragment_id;
1039                            }
1040                        });
1041                        if cfg!(debug_assertions) {
1042                            assert!(
1043                                remaining_fragment_ids.is_empty(),
1044                                "non-existing fragment to replace: {:?} {:?} {:?}",
1045                                remaining_fragment_ids,
1046                                info.nodes,
1047                                replace_map
1048                            );
1049                        } else {
1050                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1051                        }
1052                    }
1053                }
1054            }
1055        }
1056    }
1057
1058    /// Add a new upstream sink node to a fragment's `UpstreamSinkUnion`.
1059    pub(crate) fn pre_apply_add_node_upstream(
1060        &mut self,
1061        fragment_id: FragmentId,
1062        new_upstream_info: &PbUpstreamSinkInfo,
1063    ) {
1064        {
1065            {
1066                {
1067                    {
1068                        let (info, _) = self.fragment_mut(fragment_id);
1069                        let mut injected = false;
1070                        visit_stream_node_mut(&mut info.nodes, |node| {
1071                            if let NodeBody::UpstreamSinkUnion(u) = node {
1072                                if cfg!(debug_assertions) {
1073                                    let current_upstream_fragment_ids = u
1074                                        .init_upstreams
1075                                        .iter()
1076                                        .map(|upstream| upstream.upstream_fragment_id)
1077                                        .collect::<HashSet<_>>();
1078                                    if current_upstream_fragment_ids
1079                                        .contains(&new_upstream_info.upstream_fragment_id)
1080                                    {
1081                                        panic!(
1082                                            "duplicate upstream fragment: {:?} {:?}",
1083                                            u, new_upstream_info
1084                                        );
1085                                    }
1086                                }
1087                                u.init_upstreams.push(new_upstream_info.clone());
1088                                injected = true;
1089                            }
1090                        });
1091                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
1092                    }
1093                }
1094            }
1095        }
1096    }
1097
1098    /// Remove upstream sink nodes from a fragment's `UpstreamSinkUnion`.
1099    pub(crate) fn pre_apply_drop_node_upstream(
1100        &mut self,
1101        fragment_id: FragmentId,
1102        drop_upstream_fragment_ids: &[FragmentId],
1103    ) {
1104        {
1105            {
1106                {
1107                    {
1108                        let (info, _) = self.fragment_mut(fragment_id);
1109                        let mut removed = false;
1110                        visit_stream_node_mut(&mut info.nodes, |node| {
1111                            if let NodeBody::UpstreamSinkUnion(u) = node {
1112                                if cfg!(debug_assertions) {
1113                                    let current_upstream_fragment_ids = u
1114                                        .init_upstreams
1115                                        .iter()
1116                                        .map(|upstream| upstream.upstream_fragment_id)
1117                                        .collect::<HashSet<FragmentId>>();
1118                                    for drop_fragment_id in drop_upstream_fragment_ids {
1119                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
1120                                        {
1121                                            panic!(
1122                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1123                                                u, drop_upstream_fragment_ids, drop_fragment_id
1124                                            );
1125                                        }
1126                                    }
1127                                }
1128                                u.init_upstreams.retain(|upstream| {
1129                                    !drop_upstream_fragment_ids
1130                                        .contains(&upstream.upstream_fragment_id)
1131                                });
1132                                removed = true;
1133                            }
1134                        });
1135                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
1136                    }
1137                }
1138            }
1139        }
1140    }
1141
1142    /// Update split assignments for actors in fragments.
1143    pub(crate) fn pre_apply_split_assignments(
1144        &mut self,
1145        assignments: impl IntoIterator<Item = (FragmentId, HashMap<ActorId, Vec<SplitImpl>>)>,
1146    ) {
1147        {
1148            let shared_infos = self.shared_actor_infos.clone();
1149            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
1150            {
1151                {
1152                    for (fragment_id, actor_splits) in assignments {
1153                        let (info, job_id) = self.fragment_mut(fragment_id);
1154                        let actors = &mut info.actors;
1155                        for (actor_id, splits) in actor_splits {
1156                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
1157                        }
1158                        shared_actor_writer.upsert([(&*info, job_id)]);
1159                    }
1160                }
1161            }
1162            shared_actor_writer.finish();
1163        }
1164    }
1165
    /// Builds the fragment edges for a command that creates a streaming job, replaces a
    /// streaming job and/or attaches a new upstream sink.
    ///
    /// The function feeds a `FragmentEdgeBuilder` with two groups of fragments and then
    /// registers the relations between them:
    /// - *existing* fragments, looked up in `self` via `self.fragment(..)` and always placed
    ///   in the database partial graph (`to_partial_graph_id(self.database_id, None)`);
    /// - *new* fragments, taken from the create-job info and from the replace plan
    ///   (including the new fragments of `auto_refresh_schema_sinks`, if any).
    ///
    /// Parameters:
    /// - `info`: the create-job command info paired with a flag telling whether the job is
    ///   a snapshot backfill; when the flag is set, the new fragments get a partial graph id
    ///   derived from the job id, otherwise the database partial graph id.
    /// - `replace_job`: the replace plan, if the command replaces a job.
    /// - `new_upstream_sink`: the new sink to connect to a downstream fragment, if any.
    /// - `control_stream_manager`, `stream_actors`, `actor_location`: forwarded to the
    ///   `EdgeBuilderFragmentInfo` constructors.
    ///
    /// NOTE(review): `self.fragment(..)` is defined elsewhere; presumably it panics for a
    /// fragment id that is not tracked -- confirm.
    pub(super) fn build_edge(
        &self,
        info: Option<(&CreateStreamingJobCommandInfo, bool)>,
        replace_job: Option<&ReplaceStreamJobPlan>,
        new_upstream_sink: Option<&UpstreamSinkInfo>,
        control_stream_manager: &ControlStreamManager,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> FragmentEdgeBuildResult {
        // `existing_fragment_ids` consists of
        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id`s of the upstream fragments of the newly created job
        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id`s of the upstream fragments of replace_job,
        // if the upstream fragment previously exists (i.e. it is not one of the fragments created by `info`)
        //  - keys of `replace_upstream`, which are the `fragment_id`s of downstream fragments that will update their upstream fragments,
        // if creating a new sink-into-table
        //  - should contain the `fragment_id` of the downstream table.
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    // Skip upstream fragments that are created by `info` in this very
                    // command: they are registered as new fragments below.
                    .filter(|fragment_id| {
                        info.map(|(info, _)| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(*fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        // Collect new fragments with their partial graph IDs
        let new_fragments = info
            .into_iter()
            .flat_map(|(info, is_snapshot_backfill)| {
                // A snapshot backfill job gets a partial graph id derived from its own job id.
                let partial_graph_id = to_partial_graph_id(
                    self.database_id,
                    is_snapshot_backfill.then_some(info.streaming_job.id()),
                );
                info.stream_job_fragments
                    .fragments
                    .values()
                    .map(move |fragment| (partial_graph_id, fragment))
            })
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .new_fragments
                    .fragments
                    .values()
                    .chain(
                        replace_job
                            .auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(move |sinks| sinks.iter().map(|sink| &sink.new_fragment)),
                    )
                    .map(|fragment| {
                        (
                            // we assume that replace job only happens in database partial graph
                            to_partial_graph_id(self.database_id, None),
                            fragment,
                        )
                    })
            }));

        let mut builder = FragmentEdgeBuilder::new(
            // Existing fragments
            existing_fragment_ids
                .map(|fragment_id| {
                    (
                        fragment_id,
                        EdgeBuilderFragmentInfo::from_inflight(
                            self.fragment(fragment_id),
                            to_partial_graph_id(self.database_id, None),
                            control_stream_manager,
                        ),
                    )
                })
                // New fragments from create/replace jobs
                .chain(new_fragments.map(|(partial_graph_id, fragment)| {
                    (
                        fragment.fragment_id,
                        EdgeBuilderFragmentInfo::from_fragment(
                            fragment,
                            stream_actors,
                            actor_location,
                            partial_graph_id,
                            control_stream_manager,
                        ),
                    )
                })),
        );
        // Relations introduced by the newly created job.
        if let Some((info, _)) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        // Relations introduced by the replace plan.
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        // Edge from the new sink fragment to its downstream.
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        // Upstream replacements requested by the replace plan, applied after all relations
        // and edges above have been registered.
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        builder.build()
    }
1295
1296    /// Post-apply reschedule: remove actors that were marked for removal.
1297    pub(crate) fn post_apply_reschedules(
1298        &mut self,
1299        reschedules: impl IntoIterator<Item = (FragmentId, HashSet<ActorId>)>,
1300    ) {
1301        let inner = self.shared_actor_infos.clone();
1302        let mut shared_actor_writer = inner.start_writer(self.database_id);
1303        {
1304            {
1305                {
1306                    for (fragment_id, to_remove) in reschedules {
1307                        let job_id = self.fragment_location[&fragment_id];
1308                        let info = self
1309                            .jobs
1310                            .get_mut(&job_id)
1311                            .expect("should exist")
1312                            .fragment_infos
1313                            .get_mut(&fragment_id)
1314                            .expect("should exist");
1315                        for actor_id in to_remove {
1316                            assert!(info.actors.remove(&actor_id).is_some());
1317                        }
1318                        shared_actor_writer.upsert([(&*info, job_id)]);
1319                    }
1320                }
1321            }
1322        }
1323        shared_actor_writer.finish();
1324    }
1325
1326    /// Post-apply fragment removal: remove fragments and their jobs if empty.
1327    pub(crate) fn post_apply_remove_fragments(
1328        &mut self,
1329        fragment_ids: impl IntoIterator<Item = FragmentId>,
1330    ) {
1331        let inner = self.shared_actor_infos.clone();
1332        let mut shared_actor_writer = inner.start_writer(self.database_id);
1333        {
1334            {
1335                {
1336                    for fragment_id in fragment_ids {
1337                        let job_id = self
1338                            .fragment_location
1339                            .remove(&fragment_id)
1340                            .expect("should exist");
1341                        let job = self.jobs.get_mut(&job_id).expect("should exist");
1342                        let fragment = job
1343                            .fragment_infos
1344                            .remove(&fragment_id)
1345                            .expect("should exist");
1346                        shared_actor_writer.remove(&fragment);
1347                        if job.fragment_infos.is_empty() {
1348                            self.jobs.remove(&job_id).expect("should exist");
1349                        }
1350                    }
1351                }
1352            }
1353        }
1354        shared_actor_writer.finish();
1355    }
1356}
1357
1358impl InflightFragmentInfo {
1359    /// Returns actor list to collect in the target worker node.
1360    pub(crate) fn actor_ids_to_collect(
1361        infos: impl IntoIterator<Item = &Self>,
1362    ) -> HashMap<WorkerId, HashSet<ActorId>> {
1363        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1364        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1365            assert!(
1366                ret.entry(actor.worker_id)
1367                    .or_default()
1368                    .insert(*actor_id as _)
1369            )
1370        }
1371        ret
1372    }
1373
1374    pub fn existing_table_ids<'a>(
1375        infos: impl IntoIterator<Item = &'a Self> + 'a,
1376    ) -> impl Iterator<Item = TableId> + 'a {
1377        infos
1378            .into_iter()
1379            .flat_map(|info| info.state_table_ids.iter().cloned())
1380    }
1381
1382    pub fn workers<'a>(
1383        infos: impl IntoIterator<Item = &'a Self> + 'a,
1384    ) -> impl Iterator<Item = WorkerId> + 'a {
1385        infos
1386            .into_iter()
1387            .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1388    }
1389
1390    pub fn contains_worker<'a>(
1391        infos: impl IntoIterator<Item = &'a Self> + 'a,
1392        worker_id: WorkerId,
1393    ) -> bool {
1394        Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1395    }
1396}
1397
1398impl InflightDatabaseInfo {
1399    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
1400        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
1401    }
1402
1403    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
1404        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
1405    }
1406}