risingwave_meta/barrier/
info.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_meta_model::WorkerId;
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_pb::ddl_service::PbBackfillType;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::SubscriberId;
34use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
35use risingwave_pb::meta::subscribe_response::Operation;
36use risingwave_pb::source::PbCdcTableSnapshotSplits;
37use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
38use risingwave_pb::stream_plan::stream_node::NodeBody;
39use risingwave_pb::stream_service::BarrierCompleteResponse;
40use tracing::{info, warn};
41
42use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
43use crate::barrier::command::{
44    CreateStreamingJobCommandInfo, PostCollectCommand, ReplaceStreamJobPlan,
45};
46use crate::barrier::edge_builder::{
47    EdgeBuilderFragmentInfo, FragmentEdgeBuildResult, FragmentEdgeBuilder,
48};
49use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
50use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
51use crate::barrier::{
52    BackfillProgress, BarrierKind, CreateStreamingJobType, FragmentBackfillProgress, TracedEpoch,
53};
54use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
55use crate::controller::utils::rebuild_fragment_mapping;
56use crate::manager::NotificationManagerRef;
57use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamActor, StreamJobFragments};
58use crate::stream::UpstreamSinkInfo;
59use crate::{MetaError, MetaResult};
60
/// Read-only snapshot of an actor's placement and split assignment, shared
/// with components outside the barrier manager (built from [`InflightActorInfo`]).
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    // Worker node this actor is scheduled on.
    pub worker_id: WorkerId,
    // Vnode ownership bitmap of the actor, if any. Presumably absent for
    // singleton-distributed fragments — TODO confirm.
    pub vnode_bitmap: Option<Bitmap>,
    // Source splits currently assigned to this actor.
    pub splits: Vec<SplitImpl>,
}
67
68impl From<&InflightActorInfo> for SharedActorInfo {
69    fn from(value: &InflightActorInfo) -> Self {
70        Self {
71            worker_id: value.worker_id,
72            vnode_bitmap: value.vnode_bitmap.clone(),
73            splits: value.splits.clone(),
74        }
75    }
76}
77
/// Read-only snapshot of a fragment and its actors, shared with components
/// outside the barrier manager (built from [`InflightFragmentInfo`] plus the
/// id of the streaming job that owns the fragment).
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    // Streaming job that owns this fragment.
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    // Per-actor placement/split snapshot, keyed by actor id.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}
87
88impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
89    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
90        let (info, job_id) = pair;
91
92        let InflightFragmentInfo {
93            fragment_id,
94            distribution_type,
95            fragment_type_mask,
96            actors,
97            vnode_count,
98            ..
99        } = info;
100
101        Self {
102            fragment_id: *fragment_id,
103            job_id,
104            distribution_type: *distribution_type,
105            fragment_type_mask: *fragment_type_mask,
106            actors: actors
107                .iter()
108                .map(|(actor_id, actor)| (*actor_id, actor.into()))
109                .collect(),
110            vnode_count: *vnode_count,
111        }
112    }
113}
114
/// Lock-protected payload of [`SharedActorInfos`]: fragment snapshots grouped
/// by the database they belong to.
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    // database id -> (fragment id -> snapshot)
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
119
120impl SharedActorInfosInner {
121    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
122        self.info
123            .values()
124            .find_map(|database| database.get(&fragment_id))
125    }
126
127    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
128        self.info.values().flatten()
129    }
130}
131
/// Shared, lock-protected registry of fragment/actor snapshots, plus the
/// notification manager used to broadcast fragment-mapping changes to
/// subscribers. Cheap to clone (both fields are reference-counted handles).
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    // Not `Debug`: the notification manager has no useful debug output here.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
139
140impl SharedActorInfos {
141    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
142        self.inner.read()
143    }
144
145    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
146        let core = self.inner.read();
147        core.iter_over_fragments()
148            .flat_map(|(_, fragment)| {
149                fragment
150                    .actors
151                    .iter()
152                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
153            })
154            .collect()
155    }
156
157    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
158    ///
159    /// Very occasionally split removal may happen during scaling, in which case we need to
160    /// use the old splits for reallocation instead of the latest splits (which may be missing),
161    /// so that we can resolve the split removal in the next command.
162    pub fn migrate_splits_for_source_actors(
163        &self,
164        fragment_id: FragmentId,
165        prev_actor_ids: &[ActorId],
166        curr_actor_ids: &[ActorId],
167    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
168        let guard = self.read_guard();
169
170        let prev_splits = prev_actor_ids
171            .iter()
172            .flat_map(|actor_id| {
173                // Note: File Source / Iceberg Source doesn't have splits assigned by meta.
174                guard
175                    .get_fragment(fragment_id)
176                    .and_then(|info| info.actors.get(actor_id))
177                    .map(|actor| actor.splits.clone())
178                    .unwrap_or_default()
179            })
180            .map(|split| (split.id(), split))
181            .collect();
182
183        let empty_actor_splits = curr_actor_ids
184            .iter()
185            .map(|actor_id| (*actor_id, vec![]))
186            .collect();
187
188        let diff = crate::stream::source_manager::reassign_splits(
189            fragment_id,
190            empty_actor_splits,
191            &prev_splits,
192            // pre-allocate splits is the first time getting splits, and it does not have scale-in scene
193            std::default::Default::default(),
194        )
195        .unwrap_or_default();
196
197        Ok(diff)
198    }
199}
200
impl SharedActorInfos {
    /// Creates an empty registry backed by the given notification manager.
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    /// Removes all fragments of `database_id` and broadcasts a `Delete`
    /// notification for their worker-slot mappings (if the database had any).
    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    /// Drops every database NOT in `database_ids` and broadcasts a `Delete`
    /// notification for the mappings of all removed fragments.
    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        // `extract_if` removes (and yields) the entries whose database id is
        // not in the retain set; the write lock is held for the whole loop.
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    /// Reconciles the stored snapshots of `database_id` with the recovered
    /// fragments: stale entries are deleted, common entries updated, and new
    /// entries inserted. A single batch of Add/Update/Delete mapping
    /// notifications is sent via the writer on `finish()`.
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        // delete the fragments that exist previously, but not included in the recovered fragments
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        // The closure both updates entries present on both sides (returning
        // `false` to keep them) and lets `extract_if` yield the stale ones.
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        // Whatever is left in `remaining_fragments` did not exist before: insert as new.
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    /// Convenience wrapper: insert-or-update the given fragments and flush the
    /// resulting mapping notifications in one go.
    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    /// Takes the write lock and returns a writer that batches mapping-change
    /// notifications until [`SharedActorInfoWriter::finish`] is called.
    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}
302
/// Write transaction over [`SharedActorInfos`] for a single database: holds
/// the write lock and accumulates Add/Update/Delete mapping notifications,
/// which are broadcast in one batch by [`Self::finish`].
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    // `None` means "nothing to notify" — avoids sending empty batches.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
311
312impl SharedActorInfoWriter<'_> {
313    pub(super) fn upsert(
314        &mut self,
315        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
316    ) {
317        let database = self.write_guard.info.entry(self.database_id).or_default();
318        for info @ (fragment, _) in infos {
319            match database.entry(fragment.fragment_id) {
320                Entry::Occupied(mut entry) => {
321                    let info = info.into();
322                    self.updated_fragment_mapping
323                        .get_or_insert_default()
324                        .push(rebuild_fragment_mapping(&info));
325                    entry.insert(info);
326                }
327                Entry::Vacant(entry) => {
328                    let info = info.into();
329                    self.added_fragment_mapping
330                        .get_or_insert_default()
331                        .push(rebuild_fragment_mapping(&info));
332                    entry.insert(info);
333                }
334            }
335        }
336    }
337
338    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
339        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
340            && let Some(fragment) = database.remove(&info.fragment_id)
341        {
342            self.deleted_fragment_mapping
343                .get_or_insert_default()
344                .push(rebuild_fragment_mapping(&fragment));
345        }
346    }
347
348    pub(super) fn finish(self) {
349        if let Some(mapping) = self.added_fragment_mapping {
350            self.notification_manager
351                .notify_fragment_mapping(Operation::Add, mapping);
352        }
353        if let Some(mapping) = self.updated_fragment_mapping {
354            self.notification_manager
355                .notify_fragment_mapping(Operation::Update, mapping);
356        }
357        if let Some(mapping) = self.deleted_fragment_mapping {
358            self.notification_manager
359                .notify_fragment_mapping(Operation::Delete, mapping);
360        }
361    }
362}
363
/// Epoch pair and kind of a single barrier: it advances the graph from
/// `prev_epoch` to `curr_epoch`.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}
370
371impl BarrierInfo {
372    pub(super) fn prev_epoch(&self) -> u64 {
373        self.prev_epoch.value().0
374    }
375
376    pub(super) fn curr_epoch(&self) -> u64 {
377        self.curr_epoch.value().0
378    }
379
380    pub(super) fn epoch(&self) -> EpochPair {
381        EpochPair {
382            curr: self.curr_epoch(),
383            prev: self.prev_epoch(),
384        }
385    }
386}
387
/// Kind of a subscriber registered on a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    // A user subscription; the payload is its retention in seconds.
    Subscription(u64),
    // A snapshot-backfill job temporarily subscribed to its upstream.
    SnapshotBackfill,
}
393
/// Lifecycle of a streaming job from the barrier manager's point of view:
/// `Init` until its first barrier is collected, then `Creating` with a
/// progress tracker while backfill runs, then `Created`.
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    Init,
    Creating { tracker: CreateMviewProgressTracker },
    Created,
}
400
/// In-flight state of one streaming job: its fragments, its subscribers, its
/// creation status, and (for CDC table jobs) the backfill-split tracker.
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    pub status: CreateStreamingJobStatus,
    // `Some` only for jobs doing CDC table snapshot backfill.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
409
410impl InflightStreamingJobInfo {
411    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
412        self.fragment_infos.values()
413    }
414
415    pub fn snapshot_backfill_actor_ids(
416        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
417    ) -> impl Iterator<Item = ActorId> + '_ {
418        fragment_infos
419            .values()
420            .filter(|fragment| {
421                fragment
422                    .fragment_type_mask
423                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
424            })
425            .flat_map(|fragment| fragment.actors.keys().copied())
426    }
427
428    pub fn tracking_progress_actor_ids(
429        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
430    ) -> Vec<(ActorId, BackfillUpstreamType)> {
431        StreamJobFragments::tracking_progress_actor_ids_impl(
432            fragment_infos
433                .values()
434                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
435        )
436    }
437}
438
/// Allows iterating a job's fragments directly with `for fragment in &job`.
impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    // Opaque iterator type backed by `fragment_infos()`.
    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}
448
/// In-flight state of all streaming jobs of one database, plus a reverse
/// index from fragment id to owning job and a handle to the shared
/// fragment/actor registry.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    pub(super) database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    // Reverse index: fragment id -> owning job id. Kept in sync with `jobs`.
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}
456
457impl InflightDatabaseInfo {
458    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
459        self.jobs.values().flat_map(|job| job.fragment_infos())
460    }
461
    /// Whether the database currently tracks the given streaming job.
    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }
465
    /// Id of the job owning `fragment_id`, or `None` if the fragment is unknown.
    pub(super) fn job_id_by_fragment(&self, fragment_id: FragmentId) -> Option<JobId> {
        self.fragment_location.get(&fragment_id).copied()
    }
469
470    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
471        let job_id = self.fragment_location[&fragment_id];
472        self.jobs
473            .get(&job_id)
474            .expect("should exist")
475            .fragment_infos
476            .get(&fragment_id)
477            .expect("should exist")
478    }
479
480    pub(super) fn backfill_fragment_ids_for_job(
481        &self,
482        job_id: JobId,
483    ) -> MetaResult<HashSet<FragmentId>> {
484        let job = self
485            .jobs
486            .get(&job_id)
487            .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
488        Ok(job
489            .fragment_infos
490            .iter()
491            .filter_map(|(fragment_id, fragment)| {
492                fragment
493                    .fragment_type_mask
494                    .contains_any([
495                        FragmentTypeFlag::StreamScan,
496                        FragmentTypeFlag::SourceScan,
497                        FragmentTypeFlag::LocalityProvider,
498                    ])
499                    .then_some(*fragment_id)
500            })
501            .collect())
502    }
503
504    pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
505        let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
506            MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
507        })?;
508        let fragment = self
509            .jobs
510            .get(job_id)
511            .expect("should exist")
512            .fragment_infos
513            .get(&fragment_id)
514            .expect("should exist");
515        Ok(fragment.fragment_type_mask.contains_any([
516            FragmentTypeFlag::StreamScan,
517            FragmentTypeFlag::SourceScan,
518            FragmentTypeFlag::LocalityProvider,
519        ]))
520    }
521
522    pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
523        self.jobs
524            .iter()
525            .filter_map(|(job_id, job)| match &job.status {
526                CreateStreamingJobStatus::Init => None,
527                CreateStreamingJobStatus::Creating { tracker } => {
528                    let progress = tracker.gen_backfill_progress();
529                    Some((
530                        *job_id,
531                        BackfillProgress {
532                            progress,
533                            backfill_type: PbBackfillType::NormalBackfill,
534                        },
535                    ))
536                }
537                CreateStreamingJobStatus::Created => None,
538            })
539    }
540
541    pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
542        self.jobs.iter().filter_map(|(job_id, job)| {
543            job.cdc_table_backfill_tracker
544                .as_ref()
545                .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
546        })
547    }
548
549    pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
550        let mut result = Vec::new();
551        for job in self.jobs.values() {
552            let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
553                continue;
554            };
555            let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
556            result.extend(fragment_progress);
557        }
558        result
559    }
560
561    pub(super) fn may_assign_fragment_cdc_backfill_splits(
562        &mut self,
563        fragment_id: FragmentId,
564    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
565        let job_id = self.fragment_location[&fragment_id];
566        let job = self.jobs.get_mut(&job_id).expect("should exist");
567        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
568            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
569            if cdc_scan_fragment_id != fragment_id {
570                return Ok(None);
571            }
572            let actors = job.fragment_infos[&cdc_scan_fragment_id]
573                .actors
574                .keys()
575                .copied()
576                .collect();
577            tracker.reassign_splits(actors).map(Some)
578        } else {
579            Ok(None)
580        }
581    }
582
583    pub(super) fn assign_cdc_backfill_splits(
584        &mut self,
585        job_id: JobId,
586    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
587        let job = self.jobs.get_mut(&job_id).expect("should exist");
588        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
589            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
590            let actors = job.fragment_infos[&cdc_scan_fragment_id]
591                .actors
592                .keys()
593                .copied()
594                .collect();
595            tracker.reassign_splits(actors).map(Some)
596        } else {
597            Ok(None)
598        }
599    }
600
    /// Applies the side effects of a collected barrier: transitions newly
    /// created jobs from `Init` to `Creating`, refreshes trackers after a
    /// reschedule, and folds the progress reports carried in `resps`
    /// (mview creation, CDC table backfill, CDC source offsets) into the
    /// corresponding jobs' trackers.
    pub(super) fn apply_collected_command(
        &mut self,
        command: &PostCollectCommand,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        // First barrier of a newly created job collected: start progress tracking.
        if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // Transition Init -> Creating; any other prior state is a bug.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(
                                    info,
                                    version_stats,
                                    &job_info.fragment_infos,
                                ),
                            },
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // The progress of SnapshotBackfill won't be tracked here
                }
            }
        }
        if let PostCollectCommand::Reschedule { reschedules, .. } = command {
            // During reschedule we expect fragments to be rebuilt with new actors and no vnode bitmap update.
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "Reschedule should not carry vnode bitmap updates when actors are rebuilt"
            );

            // Collect jobs that own the rescheduled fragments; de-duplicate via HashSet.
            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        // Fold per-fragment mview creation progress into each job's tracker.
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
                CreateStreamingJobStatus::Init => {
                    continue;
                }
                CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
                CreateStreamingJobStatus::Created => {
                    // A trailing `done` report after completion is expected; anything else is suspicious.
                    if !progress.done {
                        warn!("update the progress of an created streaming job: {progress:?}");
                    }
                    continue;
                }
            };
            tracker.apply_progress(progress, version_stats);
        }
        // Fold CDC table backfill split progress into each job's CDC tracker.
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        // Handle CDC source offset updated events
        for cdc_offset_updated in resps
            .iter()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }
718
719    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
720        self.jobs.values().filter_map(|job| match &job.status {
721            CreateStreamingJobStatus::Init => None,
722            CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
723            CreateStreamingJobStatus::Created => None,
724        })
725    }
726
727    fn iter_mut_creating_job_tracker(
728        &mut self,
729    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
730        self.jobs
731            .values_mut()
732            .filter_map(|job| match &mut job.status {
733                CreateStreamingJobStatus::Init => None,
734                CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
735                CreateStreamingJobStatus::Created => None,
736            })
737    }
738
    /// Whether any creating job has finished backfill but is not yet committed.
    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        self.iter_creating_job_tracker()
            .any(|tracker| tracker.is_finished())
    }
743
    /// Drains the pending backfill fragment ids from all creating jobs' trackers.
    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        self.iter_mut_creating_job_tracker()
            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
            .collect()
    }
749
    /// Collects everything that must be committed with the next epoch:
    /// jobs whose backfill just finished (transitioning them `Creating` ->
    /// `Created`), table ids to truncate, and jobs whose CDC table backfill
    /// completed.
    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        let mut finished_cdc_table_backfill = vec![];
        for (job_id, job) in &mut self.jobs {
            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    // Swap the status to `Created` to take ownership of the
                    // tracker; the re-match cannot fail since we just saw `Creating`.
                    let CreateStreamingJobStatus::Creating { tracker, .. } =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
                && tracker.take_pre_completed()
            {
                finished_cdc_table_backfill.push(*job_id);
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
            finished_cdc_table_backfill,
        }
    }
779
    /// All subscriber ids of the job owning `fragment_id`.
    ///
    /// Panics if the fragment or its job is not tracked.
    pub fn fragment_subscribers(
        &self,
        fragment_id: FragmentId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs[&job_id].subscribers.keys().copied()
    }
787
    /// All subscriber ids of the given job. Panics if the job is not tracked.
    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }
791
792    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
793        self.jobs
794            .iter()
795            .filter_map(|(job_id, info)| {
796                info.subscribers
797                    .values()
798                    .filter_map(|subscriber| match subscriber {
799                        SubscriberType::Subscription(retention) => Some(*retention),
800                        SubscriberType::SnapshotBackfill => None,
801                    })
802                    .max()
803                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
804            })
805            .collect()
806    }
807
808    pub fn register_subscriber(
809        &mut self,
810        job_id: JobId,
811        subscriber_id: SubscriberId,
812        subscriber: SubscriberType,
813    ) {
814        self.jobs
815            .get_mut(&job_id)
816            .expect("should exist")
817            .subscribers
818            .try_insert(subscriber_id, subscriber)
819            .expect("non duplicate");
820    }
821
822    pub fn unregister_subscriber(
823        &mut self,
824        job_id: JobId,
825        subscriber_id: SubscriberId,
826    ) -> Option<SubscriberType> {
827        self.jobs
828            .get_mut(&job_id)
829            .expect("should exist")
830            .subscribers
831            .remove(&subscriber_id)
832    }
833
834    pub fn update_subscription_retention(
835        &mut self,
836        job_id: JobId,
837        subscriber_id: SubscriberId,
838        retention_second: u64,
839    ) {
840        let job = self.jobs.get_mut(&job_id).expect("should exist");
841        match job.subscribers.get_mut(&subscriber_id) {
842            Some(SubscriberType::Subscription(current_retention)) => {
843                *current_retention = retention_second;
844            }
845            Some(SubscriberType::SnapshotBackfill) => {
846                warn!(
847                    %job_id,
848                    %subscriber_id,
849                    "cannot update retention for snapshot backfill subscriber"
850                );
851            }
852            None => {
853                warn!(%job_id, %subscriber_id, "subscription subscriber not found");
854            }
855        }
856    }
857
858    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
859        let job_id = self.fragment_location[&fragment_id];
860        let fragment = self
861            .jobs
862            .get_mut(&job_id)
863            .expect("should exist")
864            .fragment_infos
865            .get_mut(&fragment_id)
866            .expect("should exist");
867        (fragment, job_id)
868    }
869
870    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
871        Self {
872            database_id,
873            jobs: Default::default(),
874            fragment_location: Default::default(),
875            shared_actor_infos,
876        }
877    }
878
879    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
880        // remove the database because it's empty.
881        shared_actor_infos.remove_database(database_id);
882        Self::empty_inner(database_id, shared_actor_infos)
883    }
884
885    pub fn recover(
886        database_id: DatabaseId,
887        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
888        shared_actor_infos: SharedActorInfos,
889    ) -> Self {
890        let mut info = Self::empty_inner(database_id, shared_actor_infos);
891        for job in jobs {
892            info.add_existing(job);
893        }
894        info
895    }
896
897    pub fn is_empty(&self) -> bool {
898        self.jobs.is_empty()
899    }
900
901    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
902        let InflightStreamingJobInfo {
903            job_id,
904            fragment_infos,
905            subscribers,
906            status,
907            cdc_table_backfill_tracker,
908        } = job;
909        self.jobs
910            .try_insert(
911                job_id,
912                InflightStreamingJobInfo {
913                    job_id,
914                    subscribers,
915                    fragment_infos: Default::default(), // fill in later in pre_apply_new_fragments
916                    status,
917                    cdc_table_backfill_tracker,
918                },
919            )
920            .expect("non-duplicate");
921        self.pre_apply_new_fragments(
922            fragment_infos
923                .into_iter()
924                .map(|(fragment_id, info)| (fragment_id, job_id, info)),
925        );
926    }
927
928    /// Register a new streaming job entry (with empty `fragment_infos`).
929    pub(crate) fn pre_apply_new_job(
930        &mut self,
931        job_id: JobId,
932        cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
933    ) {
934        {
935            self.jobs
936                .try_insert(
937                    job_id,
938                    InflightStreamingJobInfo {
939                        job_id,
940                        fragment_infos: Default::default(),
941                        subscribers: Default::default(), // no subscriber for newly create job
942                        status: CreateStreamingJobStatus::Init,
943                        cdc_table_backfill_tracker,
944                    },
945                )
946                .expect("non-duplicate");
947        }
948    }
949
950    /// Add new fragment infos and update shared actor infos.
951    pub(crate) fn pre_apply_new_fragments(
952        &mut self,
953        fragments: impl IntoIterator<Item = (FragmentId, JobId, InflightFragmentInfo)>,
954    ) {
955        {
956            let shared_infos = self.shared_actor_infos.clone();
957            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
958            for (fragment_id, job_id, info) in fragments {
959                {
960                    {
961                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
962                        shared_actor_writer.upsert([(&info, job_id)]);
963                        fragment_infos
964                            .fragment_infos
965                            .try_insert(fragment_id, info)
966                            .expect("non duplicate");
967                        self.fragment_location
968                            .try_insert(fragment_id, job_id)
969                            .expect("non duplicate");
970                    }
971                }
972            }
973            shared_actor_writer.finish();
974        }
975    }
976
977    /// Pre-apply reschedule: update actors, vnode bitmaps, and splits.
978    /// The actual removal of old actors happens in `post_apply_reschedules`.
979    pub(crate) fn pre_apply_reschedule(
980        &mut self,
981        fragment_id: FragmentId,
982        new_actors: HashMap<ActorId, InflightActorInfo>,
983        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
984        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
985    ) {
986        {
987            {
988                {
989                    {
990                        let (info, _) = self.fragment_mut(fragment_id);
991                        let actors = &mut info.actors;
992                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
993                            actors
994                                .get_mut(&actor_id)
995                                .expect("should exist")
996                                .vnode_bitmap = Some(new_vnodes);
997                        }
998                        for (actor_id, actor) in new_actors {
999                            actors
1000                                .try_insert(actor_id as _, actor)
1001                                .expect("non-duplicate");
1002                        }
1003                        for (actor_id, splits) in actor_splits {
1004                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
1005                        }
1006                        // info will be upserted into shared_actor_infos in post_apply stage
1007                    }
1008                }
1009            }
1010        }
1011    }
1012
1013    /// Replace upstream fragment IDs in merge nodes of a fragment's stream graph.
1014    pub(crate) fn pre_apply_replace_node_upstream(
1015        &mut self,
1016        fragment_id: FragmentId,
1017        replace_map: &HashMap<FragmentId, FragmentId>,
1018    ) {
1019        {
1020            {
1021                {
1022                    {
1023                        let mut remaining_fragment_ids: HashSet<_> =
1024                            replace_map.keys().cloned().collect();
1025                        let (info, _) = self.fragment_mut(fragment_id);
1026                        visit_stream_node_mut(&mut info.nodes, |node| {
1027                            if let NodeBody::Merge(m) = node
1028                                && let Some(new_upstream_fragment_id) =
1029                                    replace_map.get(&m.upstream_fragment_id)
1030                            {
1031                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1032                                    if cfg!(debug_assertions) {
1033                                        panic!(
1034                                            "duplicate upstream fragment: {:?} {:?}",
1035                                            m, replace_map
1036                                        );
1037                                    } else {
1038                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
1039                                    }
1040                                }
1041                                m.upstream_fragment_id = *new_upstream_fragment_id;
1042                            }
1043                        });
1044                        if cfg!(debug_assertions) {
1045                            assert!(
1046                                remaining_fragment_ids.is_empty(),
1047                                "non-existing fragment to replace: {:?} {:?} {:?}",
1048                                remaining_fragment_ids,
1049                                info.nodes,
1050                                replace_map
1051                            );
1052                        } else {
1053                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1054                        }
1055                    }
1056                }
1057            }
1058        }
1059    }
1060
1061    /// Add a new upstream sink node to a fragment's `UpstreamSinkUnion`.
1062    pub(crate) fn pre_apply_add_node_upstream(
1063        &mut self,
1064        fragment_id: FragmentId,
1065        new_upstream_info: &PbUpstreamSinkInfo,
1066    ) {
1067        {
1068            {
1069                {
1070                    {
1071                        let (info, _) = self.fragment_mut(fragment_id);
1072                        let mut injected = false;
1073                        visit_stream_node_mut(&mut info.nodes, |node| {
1074                            if let NodeBody::UpstreamSinkUnion(u) = node {
1075                                if cfg!(debug_assertions) {
1076                                    let current_upstream_fragment_ids = u
1077                                        .init_upstreams
1078                                        .iter()
1079                                        .map(|upstream| upstream.upstream_fragment_id)
1080                                        .collect::<HashSet<_>>();
1081                                    if current_upstream_fragment_ids
1082                                        .contains(&new_upstream_info.upstream_fragment_id)
1083                                    {
1084                                        panic!(
1085                                            "duplicate upstream fragment: {:?} {:?}",
1086                                            u, new_upstream_info
1087                                        );
1088                                    }
1089                                }
1090                                u.init_upstreams.push(new_upstream_info.clone());
1091                                injected = true;
1092                            }
1093                        });
1094                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
1095                    }
1096                }
1097            }
1098        }
1099    }
1100
1101    /// Remove upstream sink nodes from a fragment's `UpstreamSinkUnion`.
1102    pub(crate) fn pre_apply_drop_node_upstream(
1103        &mut self,
1104        fragment_id: FragmentId,
1105        drop_upstream_fragment_ids: &[FragmentId],
1106    ) {
1107        {
1108            {
1109                {
1110                    {
1111                        let (info, _) = self.fragment_mut(fragment_id);
1112                        let mut removed = false;
1113                        visit_stream_node_mut(&mut info.nodes, |node| {
1114                            if let NodeBody::UpstreamSinkUnion(u) = node {
1115                                if cfg!(debug_assertions) {
1116                                    let current_upstream_fragment_ids = u
1117                                        .init_upstreams
1118                                        .iter()
1119                                        .map(|upstream| upstream.upstream_fragment_id)
1120                                        .collect::<HashSet<FragmentId>>();
1121                                    for drop_fragment_id in drop_upstream_fragment_ids {
1122                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
1123                                        {
1124                                            panic!(
1125                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1126                                                u, drop_upstream_fragment_ids, drop_fragment_id
1127                                            );
1128                                        }
1129                                    }
1130                                }
1131                                u.init_upstreams.retain(|upstream| {
1132                                    !drop_upstream_fragment_ids
1133                                        .contains(&upstream.upstream_fragment_id)
1134                                });
1135                                removed = true;
1136                            }
1137                        });
1138                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
1139                    }
1140                }
1141            }
1142        }
1143    }
1144
1145    /// Update split assignments for actors in fragments.
1146    pub(crate) fn pre_apply_split_assignments(
1147        &mut self,
1148        assignments: impl IntoIterator<Item = (FragmentId, HashMap<ActorId, Vec<SplitImpl>>)>,
1149    ) {
1150        {
1151            let shared_infos = self.shared_actor_infos.clone();
1152            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
1153            {
1154                {
1155                    for (fragment_id, actor_splits) in assignments {
1156                        let (info, job_id) = self.fragment_mut(fragment_id);
1157                        let actors = &mut info.actors;
1158                        for (actor_id, splits) in actor_splits {
1159                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
1160                        }
1161                        shared_actor_writer.upsert([(&*info, job_id)]);
1162                    }
1163                }
1164            }
1165            shared_actor_writer.finish();
1166        }
1167    }
1168
    /// Build the [`FragmentEdgeBuildResult`] describing the edges to create for a
    /// barrier command, covering fragments introduced by a newly created job (`info`,
    /// whose `bool` flags snapshot backfill), a replace-job plan (`replace_job`), and a
    /// new sink-into-table upstream (`new_upstream_sink`), plus the pre-existing
    /// fragments they connect to.
    pub(super) fn build_edge(
        &self,
        info: Option<(&CreateStreamingJobCommandInfo, bool)>,
        replace_job: Option<&ReplaceStreamJobPlan>,
        new_upstream_sink: Option<&UpstreamSinkInfo>,
        control_stream_manager: &ControlStreamManager,
        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
        actor_location: &HashMap<ActorId, WorkerId>,
    ) -> FragmentEdgeBuildResult {
        // `existing_fragment_ids` consists of
        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
        // if the upstream fragment previously exists
        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments,
        // if creating a new sink-into-table
        //  - should contain the `fragment_id` of the downstream table.
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    // Skip fragments that belong to the newly created job itself —
                    // those are "new", not "existing".
                    .filter(|fragment_id| {
                        info.map(|(info, _)| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(*fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        // Collect new fragments with their partial graph IDs
        let new_fragments = info
            .into_iter()
            .flat_map(|(info, is_snapshot_backfill)| {
                // Snapshot-backfill jobs get their own partial graph keyed by job ID;
                // otherwise the database-level partial graph is used.
                let partial_graph_id = to_partial_graph_id(
                    self.database_id,
                    is_snapshot_backfill.then_some(info.streaming_job.id()),
                );
                info.stream_job_fragments
                    .fragments
                    .values()
                    .map(move |fragment| (partial_graph_id, fragment))
            })
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .new_fragments
                    .fragments
                    .values()
                    .chain(
                        replace_job
                            .auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(move |sinks| sinks.iter().map(|sink| &sink.new_fragment)),
                    )
                    .map(|fragment| {
                        (
                            // we assume that replace job only happens in database partial graph
                            to_partial_graph_id(self.database_id, None),
                            fragment,
                        )
                    })
            }));

        let mut builder = FragmentEdgeBuilder::new(
            // Existing fragments
            existing_fragment_ids
                .map(|fragment_id| {
                    (
                        fragment_id,
                        EdgeBuilderFragmentInfo::from_inflight(
                            self.fragment(fragment_id),
                            to_partial_graph_id(self.database_id, None),
                            control_stream_manager,
                        ),
                    )
                })
                // New fragments from create/replace jobs
                .chain(new_fragments.map(|(partial_graph_id, fragment)| {
                    (
                        fragment.fragment_id,
                        EdgeBuilderFragmentInfo::from_fragment(
                            fragment,
                            stream_actors,
                            actor_location,
                            partial_graph_id,
                            control_stream_manager,
                        ),
                    )
                })),
        );
        // Register upstream -> downstream relations contributed by the new job.
        if let Some((info, _)) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        // ... and by the replace-job plan.
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        // Edge for a newly created sink feeding an existing downstream table.
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        // Rewire downstream fragments whose upstream fragment is being replaced.
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        builder.build()
    }
1298
1299    /// Post-apply reschedule: remove actors that were marked for removal.
1300    pub(crate) fn post_apply_reschedules(
1301        &mut self,
1302        reschedules: impl IntoIterator<Item = (FragmentId, HashSet<ActorId>)>,
1303    ) {
1304        let inner = self.shared_actor_infos.clone();
1305        let mut shared_actor_writer = inner.start_writer(self.database_id);
1306        {
1307            {
1308                {
1309                    for (fragment_id, to_remove) in reschedules {
1310                        let job_id = self.fragment_location[&fragment_id];
1311                        let info = self
1312                            .jobs
1313                            .get_mut(&job_id)
1314                            .expect("should exist")
1315                            .fragment_infos
1316                            .get_mut(&fragment_id)
1317                            .expect("should exist");
1318                        for actor_id in to_remove {
1319                            assert!(info.actors.remove(&actor_id).is_some());
1320                        }
1321                        shared_actor_writer.upsert([(&*info, job_id)]);
1322                    }
1323                }
1324            }
1325        }
1326        shared_actor_writer.finish();
1327    }
1328
1329    /// Post-apply fragment removal: remove fragments and their jobs if empty.
1330    pub(crate) fn post_apply_remove_fragments(
1331        &mut self,
1332        fragment_ids: impl IntoIterator<Item = FragmentId>,
1333    ) {
1334        let inner = self.shared_actor_infos.clone();
1335        let mut shared_actor_writer = inner.start_writer(self.database_id);
1336        {
1337            {
1338                {
1339                    for fragment_id in fragment_ids {
1340                        let job_id = self
1341                            .fragment_location
1342                            .remove(&fragment_id)
1343                            .expect("should exist");
1344                        let job = self.jobs.get_mut(&job_id).expect("should exist");
1345                        let fragment = job
1346                            .fragment_infos
1347                            .remove(&fragment_id)
1348                            .expect("should exist");
1349                        shared_actor_writer.remove(&fragment);
1350                        if job.fragment_infos.is_empty() {
1351                            self.jobs.remove(&job_id).expect("should exist");
1352                        }
1353                    }
1354                }
1355            }
1356        }
1357        shared_actor_writer.finish();
1358    }
1359}
1360
1361impl InflightFragmentInfo {
1362    /// Returns actor list to collect in the target worker node.
1363    pub(crate) fn actor_ids_to_collect(
1364        infos: impl IntoIterator<Item = &Self>,
1365    ) -> HashMap<WorkerId, HashSet<ActorId>> {
1366        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1367        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1368            assert!(
1369                ret.entry(actor.worker_id)
1370                    .or_default()
1371                    .insert(*actor_id as _)
1372            )
1373        }
1374        ret
1375    }
1376
1377    pub fn existing_table_ids<'a>(
1378        infos: impl IntoIterator<Item = &'a Self> + 'a,
1379    ) -> impl Iterator<Item = TableId> + 'a {
1380        infos
1381            .into_iter()
1382            .flat_map(|info| info.state_table_ids.iter().cloned())
1383    }
1384
1385    pub fn workers<'a>(
1386        infos: impl IntoIterator<Item = &'a Self> + 'a,
1387    ) -> impl Iterator<Item = WorkerId> + 'a {
1388        infos
1389            .into_iter()
1390            .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1391    }
1392
1393    pub fn contains_worker<'a>(
1394        infos: impl IntoIterator<Item = &'a Self> + 'a,
1395        worker_id: WorkerId,
1396    ) -> bool {
1397        Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1398    }
1399}
1400
impl InflightDatabaseInfo {
    /// Whether any actor of any fragment in this database is located on `worker_id`.
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    /// All state table IDs of all fragments currently in-flight in this database.
    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}