risingwave_meta/barrier/
info.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_meta_model::WorkerId;
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_pb::ddl_service::PbBackfillType;
32use risingwave_pb::hummock::HummockVersionStats;
33use risingwave_pb::id::SubscriberId;
34use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
35use risingwave_pb::meta::subscribe_response::Operation;
36use risingwave_pb::source::PbCdcTableSnapshotSplits;
37use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
38use risingwave_pb::stream_plan::stream_node::NodeBody;
39use risingwave_pb::stream_service::BarrierCompleteResponse;
40use tracing::{info, warn};
41
42use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
43use crate::barrier::command::{
44    CreateStreamingJobCommandInfo, PostCollectCommand, ReplaceStreamJobPlan,
45};
46use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
47use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
48use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
49use crate::barrier::{
50    BackfillProgress, BarrierKind, CreateStreamingJobType, FragmentBackfillProgress, TracedEpoch,
51};
52use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
53use crate::controller::utils::rebuild_fragment_mapping;
54use crate::manager::NotificationManagerRef;
55use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
56use crate::stream::UpstreamSinkInfo;
57use crate::{MetaError, MetaResult};
58
/// Read-only snapshot of a single actor's placement and source-split
/// assignment, derived from an [`InflightActorInfo`] for sharing outside the
/// barrier worker loop.
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    // Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    // Vnode ownership bitmap; `None` when no bitmap is assigned
    // (presumably singleton distribution — confirm against the builder).
    pub vnode_bitmap: Option<Bitmap>,
    // Source splits currently assigned to this actor; empty for actors that
    // receive no split assignment from meta.
    pub splits: Vec<SplitImpl>,
}
65
66impl From<&InflightActorInfo> for SharedActorInfo {
67    fn from(value: &InflightActorInfo) -> Self {
68        Self {
69            worker_id: value.worker_id,
70            vnode_bitmap: value.vnode_bitmap.clone(),
71            splits: value.splits.clone(),
72        }
73    }
74}
75
/// Read-only snapshot of a fragment and its actors, stored per database in
/// [`SharedActorInfosInner`] and used to rebuild worker-slot mappings.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    // The streaming job this fragment belongs to.
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    // Per-actor placement/split snapshot.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    // Total vnode count of the fragment.
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}
85
86impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
87    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
88        let (info, job_id) = pair;
89
90        let InflightFragmentInfo {
91            fragment_id,
92            distribution_type,
93            fragment_type_mask,
94            actors,
95            vnode_count,
96            ..
97        } = info;
98
99        Self {
100            fragment_id: *fragment_id,
101            job_id,
102            distribution_type: *distribution_type,
103            fragment_type_mask: *fragment_type_mask,
104            actors: actors
105                .iter()
106                .map(|(actor_id, actor)| (*actor_id, actor.into()))
107                .collect(),
108            vnode_count: *vnode_count,
109        }
110    }
111}
112
/// Inner state of [`SharedActorInfos`]: fragment snapshots grouped by
/// database.
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    // database id -> (fragment id -> snapshot)
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
117
118impl SharedActorInfosInner {
119    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
120        self.info
121            .values()
122            .find_map(|database| database.get(&fragment_id))
123    }
124
125    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
126        self.info.values().flatten()
127    }
128}
129
/// Shared, lock-protected registry of fragment/actor snapshots. The barrier
/// manager publishes changes here and pushes fragment worker-slot mapping
/// notifications through the notification manager.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    // Excluded from `Debug` output via educe.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
137
138impl SharedActorInfos {
139    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
140        self.inner.read()
141    }
142
143    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
144        let core = self.inner.read();
145        core.iter_over_fragments()
146            .flat_map(|(_, fragment)| {
147                fragment
148                    .actors
149                    .iter()
150                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
151            })
152            .collect()
153    }
154
155    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
156    ///
157    /// Very occasionally split removal may happen during scaling, in which case we need to
158    /// use the old splits for reallocation instead of the latest splits (which may be missing),
159    /// so that we can resolve the split removal in the next command.
160    pub fn migrate_splits_for_source_actors(
161        &self,
162        fragment_id: FragmentId,
163        prev_actor_ids: &[ActorId],
164        curr_actor_ids: &[ActorId],
165    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
166        let guard = self.read_guard();
167
168        let prev_splits = prev_actor_ids
169            .iter()
170            .flat_map(|actor_id| {
171                // Note: File Source / Iceberg Source doesn't have splits assigned by meta.
172                guard
173                    .get_fragment(fragment_id)
174                    .and_then(|info| info.actors.get(actor_id))
175                    .map(|actor| actor.splits.clone())
176                    .unwrap_or_default()
177            })
178            .map(|split| (split.id(), split))
179            .collect();
180
181        let empty_actor_splits = curr_actor_ids
182            .iter()
183            .map(|actor_id| (*actor_id, vec![]))
184            .collect();
185
186        let diff = crate::stream::source_manager::reassign_splits(
187            fragment_id,
188            empty_actor_splits,
189            &prev_splits,
190            // pre-allocate splits is the first time getting splits, and it does not have scale-in scene
191            std::default::Default::default(),
192        )
193        .unwrap_or_default();
194
195        Ok(diff)
196    }
197}
198
impl SharedActorInfos {
    /// Creates an empty registry that will publish mapping changes through
    /// the given notification manager.
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    /// Drops all fragments of `database_id` and notifies subscribers that
    /// their worker-slot mappings were deleted.
    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    /// Keeps only the given databases; every other database's fragments are
    /// removed and reported in a single `Delete` mapping notification.
    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    /// Replaces the database's fragment set with the recovered one, emitting
    /// `Update` notifications for fragments that survive recovery, `Delete`
    /// for those that disappeared, and `Add` for newly recovered fragments.
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        // delete the fragments that exist previously, but not included in the recovered fragments
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        // NOTE: the `extract_if` closure has side effects — fragments also
        // present in `remaining_fragments` are refreshed in place (and
        // recorded as updated), while the ones returning `true` are extracted
        // and recorded as deleted in the loop body below.
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        // Whatever is left in `remaining_fragments` did not exist before: add it.
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    /// Convenience wrapper: upsert `infos` and immediately send the
    /// accumulated notifications.
    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    /// Begins a batched write session; the write lock is held for the
    /// writer's lifetime and notifications are deferred until
    /// [`SharedActorInfoWriter::finish`].
    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}
300
/// Batched write session over [`SharedActorInfos`]. Holds the write lock for
/// its lifetime and accumulates mapping notifications that are only sent when
/// `finish` is called.
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    // Lazily initialized so no notification is sent for an empty category.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
309
310impl SharedActorInfoWriter<'_> {
311    pub(super) fn upsert(
312        &mut self,
313        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
314    ) {
315        let database = self.write_guard.info.entry(self.database_id).or_default();
316        for info @ (fragment, _) in infos {
317            match database.entry(fragment.fragment_id) {
318                Entry::Occupied(mut entry) => {
319                    let info = info.into();
320                    self.updated_fragment_mapping
321                        .get_or_insert_default()
322                        .push(rebuild_fragment_mapping(&info));
323                    entry.insert(info);
324                }
325                Entry::Vacant(entry) => {
326                    let info = info.into();
327                    self.added_fragment_mapping
328                        .get_or_insert_default()
329                        .push(rebuild_fragment_mapping(&info));
330                    entry.insert(info);
331                }
332            }
333        }
334    }
335
336    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
337        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
338            && let Some(fragment) = database.remove(&info.fragment_id)
339        {
340            self.deleted_fragment_mapping
341                .get_or_insert_default()
342                .push(rebuild_fragment_mapping(&fragment));
343        }
344    }
345
346    pub(super) fn finish(self) {
347        if let Some(mapping) = self.added_fragment_mapping {
348            self.notification_manager
349                .notify_fragment_mapping(Operation::Add, mapping);
350        }
351        if let Some(mapping) = self.updated_fragment_mapping {
352            self.notification_manager
353                .notify_fragment_mapping(Operation::Update, mapping);
354        }
355        if let Some(mapping) = self.deleted_fragment_mapping {
356            self.notification_manager
357                .notify_fragment_mapping(Operation::Delete, mapping);
358        }
359    }
360}
361
/// Epochs and kind of a single barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    // Epoch preceding this barrier.
    pub prev_epoch: TracedEpoch,
    // Epoch carried by this barrier.
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}
368
369impl BarrierInfo {
370    pub(super) fn prev_epoch(&self) -> u64 {
371        self.prev_epoch.value().0
372    }
373
374    pub(super) fn curr_epoch(&self) -> u64 {
375        self.curr_epoch.value().0
376    }
377
378    pub(super) fn epoch(&self) -> EpochPair {
379        EpochPair {
380            curr: self.curr_epoch(),
381            prev: self.prev_epoch(),
382        }
383    }
384}
385
/// Why a subscriber is registered on a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A subscription with its retention in seconds (updatable).
    Subscription(u64),
    /// A snapshot-backfill consumer; has no retention setting.
    SnapshotBackfill,
}
391
/// Lifecycle of a streaming job's creation as observed by the barrier manager.
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    /// The first barrier has not been collected yet; no tracker exists.
    Init,
    /// Backfill in progress, tracked by `tracker`.
    Creating { tracker: CreateMviewProgressTracker },
    /// Backfill finished; further progress updates are unexpected.
    Created,
}
398
/// In-flight state of a single streaming job within a database.
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    // Registered subscribers: subscriptions and snapshot-backfill consumers.
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    pub status: CreateStreamingJobStatus,
    // Present only for jobs with a CDC table backfill.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
407
408impl InflightStreamingJobInfo {
409    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
410        self.fragment_infos.values()
411    }
412
413    pub fn snapshot_backfill_actor_ids(
414        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
415    ) -> impl Iterator<Item = ActorId> + '_ {
416        fragment_infos
417            .values()
418            .filter(|fragment| {
419                fragment
420                    .fragment_type_mask
421                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
422            })
423            .flat_map(|fragment| fragment.actors.keys().copied())
424    }
425
426    pub fn tracking_progress_actor_ids(
427        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
428    ) -> Vec<(ActorId, BackfillUpstreamType)> {
429        StreamJobFragments::tracking_progress_actor_ids_impl(
430            fragment_infos
431                .values()
432                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
433        )
434    }
435}
436
/// Lets callers iterate a job's fragments directly: `for fragment in &job`.
impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    // `impl Trait` in an associated type avoids naming the concrete
    // `Values` iterator returned by `fragment_infos`.
    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}
446
/// In-flight state of all streaming jobs in one database, plus a reverse
/// index from fragment id to owning job.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    pub(super) database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    // Reverse index: fragment id -> owning job id. Must stay consistent with
    // the fragments stored under `jobs`.
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}
454
455impl InflightDatabaseInfo {
456    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
457        self.jobs.values().flat_map(|job| job.fragment_infos())
458    }
459
    /// Whether the database currently tracks the given job.
    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }
463
    /// Owning job of `fragment_id`, or `None` if the fragment is unknown.
    pub(super) fn job_id_by_fragment(&self, fragment_id: FragmentId) -> Option<JobId> {
        self.fragment_location.get(&fragment_id).copied()
    }
467
468    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
469        let job_id = self.fragment_location[&fragment_id];
470        self.jobs
471            .get(&job_id)
472            .expect("should exist")
473            .fragment_infos
474            .get(&fragment_id)
475            .expect("should exist")
476    }
477
478    pub(super) fn backfill_fragment_ids_for_job(
479        &self,
480        job_id: JobId,
481    ) -> MetaResult<HashSet<FragmentId>> {
482        let job = self
483            .jobs
484            .get(&job_id)
485            .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
486        Ok(job
487            .fragment_infos
488            .iter()
489            .filter_map(|(fragment_id, fragment)| {
490                fragment
491                    .fragment_type_mask
492                    .contains_any([
493                        FragmentTypeFlag::StreamScan,
494                        FragmentTypeFlag::SourceScan,
495                        FragmentTypeFlag::LocalityProvider,
496                    ])
497                    .then_some(*fragment_id)
498            })
499            .collect())
500    }
501
502    pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
503        let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
504            MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
505        })?;
506        let fragment = self
507            .jobs
508            .get(job_id)
509            .expect("should exist")
510            .fragment_infos
511            .get(&fragment_id)
512            .expect("should exist");
513        Ok(fragment.fragment_type_mask.contains_any([
514            FragmentTypeFlag::StreamScan,
515            FragmentTypeFlag::SourceScan,
516            FragmentTypeFlag::LocalityProvider,
517        ]))
518    }
519
520    pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
521        self.jobs
522            .iter()
523            .filter_map(|(job_id, job)| match &job.status {
524                CreateStreamingJobStatus::Init => None,
525                CreateStreamingJobStatus::Creating { tracker } => {
526                    let progress = tracker.gen_backfill_progress();
527                    Some((
528                        *job_id,
529                        BackfillProgress {
530                            progress,
531                            backfill_type: PbBackfillType::NormalBackfill,
532                        },
533                    ))
534                }
535                CreateStreamingJobStatus::Created => None,
536            })
537    }
538
    /// CDC backfill progress for every job that has a CDC tracker.
    pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
        self.jobs.iter().filter_map(|(job_id, job)| {
            job.cdc_table_backfill_tracker
                .as_ref()
                .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
        })
    }
546
547    pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
548        let mut result = Vec::new();
549        for job in self.jobs.values() {
550            let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
551                continue;
552            };
553            let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
554            result.extend(fragment_progress);
555        }
556        result
557    }
558
    /// If `fragment_id` is the CDC scan fragment of its job, reassigns the
    /// CDC backfill splits across that fragment's current actors.
    ///
    /// Returns `Ok(None)` when the job has no CDC tracker or the fragment is
    /// not the job's CDC scan fragment. Panics if the fragment is not
    /// registered in `fragment_location`.
    pub(super) fn may_assign_fragment_cdc_backfill_splits(
        &mut self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
        let job_id = self.fragment_location[&fragment_id];
        let job = self.jobs.get_mut(&job_id).expect("should exist");
        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
            if cdc_scan_fragment_id != fragment_id {
                return Ok(None);
            }
            let actors = job.fragment_infos[&cdc_scan_fragment_id]
                .actors
                .keys()
                .copied()
                .collect();
            tracker.reassign_splits(actors).map(Some)
        } else {
            Ok(None)
        }
    }
580
    /// Reassigns the CDC backfill splits of `job_id` across the current
    /// actors of its CDC scan fragment.
    ///
    /// Returns `Ok(None)` when the job has no CDC tracker. Panics if the job
    /// is unknown.
    pub(super) fn assign_cdc_backfill_splits(
        &mut self,
        job_id: JobId,
    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
        let job = self.jobs.get_mut(&job_id).expect("should exist");
        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
            let actors = job.fragment_infos[&cdc_scan_fragment_id]
                .actors
                .keys()
                .copied()
                .collect();
            tracker.reassign_splits(actors).map(Some)
        } else {
            Ok(None)
        }
    }
598
    /// Applies the side effects of a collected barrier to the per-job
    /// progress trackers.
    ///
    /// - On the first collected barrier of a newly created job, transitions
    ///   its status from `Init` to `Creating` with a fresh tracker.
    /// - On reschedule, refreshes the trackers of creating jobs that own a
    ///   rescheduled fragment.
    /// - Folds mview / CDC backfill progress reported in `resps` into the
    ///   corresponding trackers.
    pub(super) fn apply_collected_command(
        &mut self,
        command: &PostCollectCommand,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // Swap in the `Creating` status; the previous status
                        // must be `Init` since this is the first barrier.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(info, version_stats),
                            },
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // The progress of SnapshotBackfill won't be tracked here
                }
            }
        }
        if let PostCollectCommand::Reschedule { reschedules, .. } = command {
            // During reschedule we expect fragments to be rebuilt with new actors and no vnode bitmap update.
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "Reschedule should not carry vnode bitmap updates when actors are rebuilt"
            );

            // Collect jobs that own the rescheduled fragments; de-duplicate via HashSet.
            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        // Fold mview creation progress reported by compute nodes.
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
                CreateStreamingJobStatus::Init => {
                    continue;
                }
                CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
                CreateStreamingJobStatus::Created => {
                    if !progress.done {
                        warn!("update the progress of an created streaming job: {progress:?}");
                    }
                    continue;
                }
            };
            tracker.apply_progress(progress, version_stats);
        }
        // Fold CDC table backfill progress.
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        // Handle CDC source offset updated events
        for cdc_offset_updated in resps
            .iter()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }
712
713    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
714        self.jobs.values().filter_map(|job| match &job.status {
715            CreateStreamingJobStatus::Init => None,
716            CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
717            CreateStreamingJobStatus::Created => None,
718        })
719    }
720
721    fn iter_mut_creating_job_tracker(
722        &mut self,
723    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
724        self.jobs
725            .values_mut()
726            .filter_map(|job| match &mut job.status {
727                CreateStreamingJobStatus::Init => None,
728                CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
729                CreateStreamingJobStatus::Created => None,
730            })
731    }
732
    /// Whether any creating job has finished backfill and is waiting to be
    /// committed.
    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        self.iter_creating_job_tracker()
            .any(|tracker| tracker.is_finished())
    }
737
    /// Drains the pending backfill fragment ids from all creating jobs.
    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        self.iter_mut_creating_job_tracker()
            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
            .collect()
    }
743
    /// Drains per-job progress into a [`StagingCommitInfo`]: jobs whose
    /// backfill just finished (their status transitions `Creating` ->
    /// `Created` here), table ids whose staging data should be truncated, and
    /// jobs whose CDC table backfill pre-completed.
    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        let mut finished_cdc_table_backfill = vec![];
        for (job_id, job) in &mut self.jobs {
            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    // Swap the status to `Created` to take ownership of the
                    // tracker; we just matched `Creating`, so the rebind
                    // cannot fail.
                    let CreateStreamingJobStatus::Creating { tracker, .. } =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
                && tracker.take_pre_completed()
            {
                finished_cdc_table_backfill.push(*job_id);
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
            finished_cdc_table_backfill,
        }
    }
773
    /// Subscriber ids registered on the job that owns `fragment_id`.
    ///
    /// Panics if the fragment or its job is unknown.
    pub fn fragment_subscribers(
        &self,
        fragment_id: FragmentId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs[&job_id].subscribers.keys().copied()
    }
781
    /// Subscriber ids registered on `job_id`. Panics if the job is unknown.
    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }
785
786    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
787        self.jobs
788            .iter()
789            .filter_map(|(job_id, info)| {
790                info.subscribers
791                    .values()
792                    .filter_map(|subscriber| match subscriber {
793                        SubscriberType::Subscription(retention) => Some(*retention),
794                        SubscriberType::SnapshotBackfill => None,
795                    })
796                    .max()
797                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
798            })
799            .collect()
800    }
801
    /// Registers a subscriber on a job.
    ///
    /// Panics if the job does not exist or the subscriber id is already
    /// registered on it.
    pub fn register_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
        subscriber: SubscriberType,
    ) {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .try_insert(subscriber_id, subscriber)
            .expect("non duplicate");
    }
815
816    pub fn unregister_subscriber(
817        &mut self,
818        job_id: JobId,
819        subscriber_id: SubscriberId,
820    ) -> Option<SubscriberType> {
821        self.jobs
822            .get_mut(&job_id)
823            .expect("should exist")
824            .subscribers
825            .remove(&subscriber_id)
826    }
827
828    pub fn update_subscription_retention(
829        &mut self,
830        job_id: JobId,
831        subscriber_id: SubscriberId,
832        retention_second: u64,
833    ) {
834        let job = self.jobs.get_mut(&job_id).expect("should exist");
835        match job.subscribers.get_mut(&subscriber_id) {
836            Some(SubscriberType::Subscription(current_retention)) => {
837                *current_retention = retention_second;
838            }
839            Some(SubscriberType::SnapshotBackfill) => {
840                warn!(
841                    %job_id,
842                    %subscriber_id,
843                    "cannot update retention for snapshot backfill subscriber"
844                );
845            }
846            None => {
847                warn!(%job_id, %subscriber_id, "subscription subscriber not found");
848            }
849        }
850    }
851
852    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
853        let job_id = self.fragment_location[&fragment_id];
854        let fragment = self
855            .jobs
856            .get_mut(&job_id)
857            .expect("should exist")
858            .fragment_infos
859            .get_mut(&fragment_id)
860            .expect("should exist");
861        (fragment, job_id)
862    }
863
864    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
865        Self {
866            database_id,
867            jobs: Default::default(),
868            fragment_location: Default::default(),
869            shared_actor_infos,
870        }
871    }
872
873    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
874        // remove the database because it's empty.
875        shared_actor_infos.remove_database(database_id);
876        Self::empty_inner(database_id, shared_actor_infos)
877    }
878
879    pub fn recover(
880        database_id: DatabaseId,
881        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
882        shared_actor_infos: SharedActorInfos,
883    ) -> Self {
884        let mut info = Self::empty_inner(database_id, shared_actor_infos);
885        for job in jobs {
886            info.add_existing(job);
887        }
888        info
889    }
890
891    pub fn is_empty(&self) -> bool {
892        self.jobs.is_empty()
893    }
894
895    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
896        let InflightStreamingJobInfo {
897            job_id,
898            fragment_infos,
899            subscribers,
900            status,
901            cdc_table_backfill_tracker,
902        } = job;
903        self.jobs
904            .try_insert(
905                job_id,
906                InflightStreamingJobInfo {
907                    job_id,
908                    subscribers,
909                    fragment_infos: Default::default(), // fill in later in pre_apply_new_fragments
910                    status,
911                    cdc_table_backfill_tracker,
912                },
913            )
914            .expect("non-duplicate");
915        self.pre_apply_new_fragments(
916            fragment_infos
917                .into_iter()
918                .map(|(fragment_id, info)| (fragment_id, job_id, info)),
919        );
920    }
921
922    /// Register a new streaming job entry (with empty `fragment_infos`).
923    pub(crate) fn pre_apply_new_job(
924        &mut self,
925        job_id: JobId,
926        cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
927    ) {
928        {
929            self.jobs
930                .try_insert(
931                    job_id,
932                    InflightStreamingJobInfo {
933                        job_id,
934                        fragment_infos: Default::default(),
935                        subscribers: Default::default(), // no subscriber for newly create job
936                        status: CreateStreamingJobStatus::Init,
937                        cdc_table_backfill_tracker,
938                    },
939                )
940                .expect("non-duplicate");
941        }
942    }
943
944    /// Add new fragment infos and update shared actor infos.
945    pub(crate) fn pre_apply_new_fragments(
946        &mut self,
947        fragments: impl IntoIterator<Item = (FragmentId, JobId, InflightFragmentInfo)>,
948    ) {
949        {
950            let shared_infos = self.shared_actor_infos.clone();
951            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
952            for (fragment_id, job_id, info) in fragments {
953                {
954                    {
955                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
956                        shared_actor_writer.upsert([(&info, job_id)]);
957                        fragment_infos
958                            .fragment_infos
959                            .try_insert(fragment_id, info)
960                            .expect("non duplicate");
961                        self.fragment_location
962                            .try_insert(fragment_id, job_id)
963                            .expect("non duplicate");
964                    }
965                }
966            }
967            shared_actor_writer.finish();
968        }
969    }
970
971    /// Pre-apply reschedule: update actors, vnode bitmaps, and splits.
972    /// The actual removal of old actors happens in `post_apply_reschedules`.
973    pub(crate) fn pre_apply_reschedule(
974        &mut self,
975        fragment_id: FragmentId,
976        new_actors: HashMap<ActorId, InflightActorInfo>,
977        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
978        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
979    ) {
980        {
981            {
982                {
983                    {
984                        let (info, _) = self.fragment_mut(fragment_id);
985                        let actors = &mut info.actors;
986                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
987                            actors
988                                .get_mut(&actor_id)
989                                .expect("should exist")
990                                .vnode_bitmap = Some(new_vnodes);
991                        }
992                        for (actor_id, actor) in new_actors {
993                            actors
994                                .try_insert(actor_id as _, actor)
995                                .expect("non-duplicate");
996                        }
997                        for (actor_id, splits) in actor_splits {
998                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
999                        }
1000                        // info will be upserted into shared_actor_infos in post_apply stage
1001                    }
1002                }
1003            }
1004        }
1005    }
1006
1007    /// Replace upstream fragment IDs in merge nodes of a fragment's stream graph.
1008    pub(crate) fn pre_apply_replace_node_upstream(
1009        &mut self,
1010        fragment_id: FragmentId,
1011        replace_map: &HashMap<FragmentId, FragmentId>,
1012    ) {
1013        {
1014            {
1015                {
1016                    {
1017                        let mut remaining_fragment_ids: HashSet<_> =
1018                            replace_map.keys().cloned().collect();
1019                        let (info, _) = self.fragment_mut(fragment_id);
1020                        visit_stream_node_mut(&mut info.nodes, |node| {
1021                            if let NodeBody::Merge(m) = node
1022                                && let Some(new_upstream_fragment_id) =
1023                                    replace_map.get(&m.upstream_fragment_id)
1024                            {
1025                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1026                                    if cfg!(debug_assertions) {
1027                                        panic!(
1028                                            "duplicate upstream fragment: {:?} {:?}",
1029                                            m, replace_map
1030                                        );
1031                                    } else {
1032                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
1033                                    }
1034                                }
1035                                m.upstream_fragment_id = *new_upstream_fragment_id;
1036                            }
1037                        });
1038                        if cfg!(debug_assertions) {
1039                            assert!(
1040                                remaining_fragment_ids.is_empty(),
1041                                "non-existing fragment to replace: {:?} {:?} {:?}",
1042                                remaining_fragment_ids,
1043                                info.nodes,
1044                                replace_map
1045                            );
1046                        } else {
1047                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1048                        }
1049                    }
1050                }
1051            }
1052        }
1053    }
1054
1055    /// Add a new upstream sink node to a fragment's `UpstreamSinkUnion`.
1056    pub(crate) fn pre_apply_add_node_upstream(
1057        &mut self,
1058        fragment_id: FragmentId,
1059        new_upstream_info: &PbUpstreamSinkInfo,
1060    ) {
1061        {
1062            {
1063                {
1064                    {
1065                        let (info, _) = self.fragment_mut(fragment_id);
1066                        let mut injected = false;
1067                        visit_stream_node_mut(&mut info.nodes, |node| {
1068                            if let NodeBody::UpstreamSinkUnion(u) = node {
1069                                if cfg!(debug_assertions) {
1070                                    let current_upstream_fragment_ids = u
1071                                        .init_upstreams
1072                                        .iter()
1073                                        .map(|upstream| upstream.upstream_fragment_id)
1074                                        .collect::<HashSet<_>>();
1075                                    if current_upstream_fragment_ids
1076                                        .contains(&new_upstream_info.upstream_fragment_id)
1077                                    {
1078                                        panic!(
1079                                            "duplicate upstream fragment: {:?} {:?}",
1080                                            u, new_upstream_info
1081                                        );
1082                                    }
1083                                }
1084                                u.init_upstreams.push(new_upstream_info.clone());
1085                                injected = true;
1086                            }
1087                        });
1088                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
1089                    }
1090                }
1091            }
1092        }
1093    }
1094
1095    /// Remove upstream sink nodes from a fragment's `UpstreamSinkUnion`.
1096    pub(crate) fn pre_apply_drop_node_upstream(
1097        &mut self,
1098        fragment_id: FragmentId,
1099        drop_upstream_fragment_ids: &[FragmentId],
1100    ) {
1101        {
1102            {
1103                {
1104                    {
1105                        let (info, _) = self.fragment_mut(fragment_id);
1106                        let mut removed = false;
1107                        visit_stream_node_mut(&mut info.nodes, |node| {
1108                            if let NodeBody::UpstreamSinkUnion(u) = node {
1109                                if cfg!(debug_assertions) {
1110                                    let current_upstream_fragment_ids = u
1111                                        .init_upstreams
1112                                        .iter()
1113                                        .map(|upstream| upstream.upstream_fragment_id)
1114                                        .collect::<HashSet<FragmentId>>();
1115                                    for drop_fragment_id in drop_upstream_fragment_ids {
1116                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
1117                                        {
1118                                            panic!(
1119                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1120                                                u, drop_upstream_fragment_ids, drop_fragment_id
1121                                            );
1122                                        }
1123                                    }
1124                                }
1125                                u.init_upstreams.retain(|upstream| {
1126                                    !drop_upstream_fragment_ids
1127                                        .contains(&upstream.upstream_fragment_id)
1128                                });
1129                                removed = true;
1130                            }
1131                        });
1132                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
1133                    }
1134                }
1135            }
1136        }
1137    }
1138
1139    /// Update split assignments for actors in fragments.
1140    pub(crate) fn pre_apply_split_assignments(
1141        &mut self,
1142        assignments: impl IntoIterator<Item = (FragmentId, HashMap<ActorId, Vec<SplitImpl>>)>,
1143    ) {
1144        {
1145            let shared_infos = self.shared_actor_infos.clone();
1146            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
1147            {
1148                {
1149                    for (fragment_id, actor_splits) in assignments {
1150                        let (info, job_id) = self.fragment_mut(fragment_id);
1151                        let actors = &mut info.actors;
1152                        for (actor_id, splits) in actor_splits {
1153                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
1154                        }
1155                        shared_actor_writer.upsert([(&*info, job_id)]);
1156                    }
1157                }
1158            }
1159            shared_actor_writer.finish();
1160        }
1161    }
1162
    /// Build the fragment-edge graph needed to deploy a new or replaced streaming
    /// job (and/or a new sink-into-table upstream).
    ///
    /// Collects the fragments involved — pre-existing upstream/downstream
    /// fragments plus the fragments newly created by the command — into a
    /// `FragmentEdgeBuilder`, registers all relations and edges, applies any
    /// upstream replacements, and returns the build result.
    pub(super) fn build_edge(
        &self,
        info: Option<(&CreateStreamingJobCommandInfo, bool)>,
        replace_job: Option<&ReplaceStreamJobPlan>,
        new_upstream_sink: Option<&UpstreamSinkInfo>,
        control_stream_manager: &ControlStreamManager,
    ) -> FragmentEdgeBuildResult {
        // `existing_fragment_ids` consists of
        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
        // if the upstream fragment previously exists
        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments,
        // if creating a new sink-into-table
        //  - should contain the `fragment_id` of the downstream table.
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    // Skip fragments that belong to the job being created in this
                    // same command: those are "new", not "existing".
                    .filter(|fragment_id| {
                        info.map(|(info, _)| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(*fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        // Fragments created by this command, each tagged with the partial graph
        // it joins.
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|(info, is_snapshot_backfill)| {
                // A snapshot-backfill job runs in its own job-scoped partial
                // graph; otherwise the database-level partial graph is used.
                let partial_graph_id = to_partial_graph_id(
                    self.database_id,
                    is_snapshot_backfill.then_some(info.streaming_job.id()),
                );
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(move |(fragment_id, info)| (fragment_id, info, partial_graph_id))
            })
            .chain(
                replace_job
                    .into_iter()
                    .flat_map(|replace_job| {
                        replace_job
                            .new_fragments
                            .new_fragment_info(&replace_job.init_split_assignment)
                            .chain(
                                // Sinks whose schema is auto-refreshed also get
                                // replacement fragments.
                                replace_job
                                    .auto_refresh_schema_sinks
                                    .as_ref()
                                    .into_iter()
                                    .flat_map(|sinks| {
                                        sinks.iter().map(|sink| {
                                            (
                                                sink.new_fragment.fragment_id,
                                                sink.new_fragment_info(),
                                            )
                                        })
                                    }),
                            )
                    })
                    .map(|(fragment_id, fragment)| {
                        (
                            fragment_id,
                            fragment,
                            // we assume that replace job only happens in database partial graph
                            to_partial_graph_id(self.database_id, None),
                        )
                    }),
            )
            .collect_vec();
        // Existing fragments always live in the database-level partial graph.
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| {
                    (
                        self.fragment(fragment_id),
                        to_partial_graph_id(self.database_id, None),
                    )
                })
                .chain(
                    new_fragment_infos
                        .iter()
                        .map(|(_, info, partial_graph_id)| (info, *partial_graph_id)),
                ),
            control_stream_manager,
        );
        // Register fragment relations from both the new job and the replace plan.
        if let Some((info, _)) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        // Sink-into-table: wire the sink fragment to its downstream table fragment.
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        // Apply upstream replacements requested by the replace plan.
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        builder.build()
    }
1287
1288    /// Post-apply reschedule: remove actors that were marked for removal.
1289    pub(crate) fn post_apply_reschedules(
1290        &mut self,
1291        reschedules: impl IntoIterator<Item = (FragmentId, HashSet<ActorId>)>,
1292    ) {
1293        let inner = self.shared_actor_infos.clone();
1294        let mut shared_actor_writer = inner.start_writer(self.database_id);
1295        {
1296            {
1297                {
1298                    for (fragment_id, to_remove) in reschedules {
1299                        let job_id = self.fragment_location[&fragment_id];
1300                        let info = self
1301                            .jobs
1302                            .get_mut(&job_id)
1303                            .expect("should exist")
1304                            .fragment_infos
1305                            .get_mut(&fragment_id)
1306                            .expect("should exist");
1307                        for actor_id in to_remove {
1308                            assert!(info.actors.remove(&actor_id).is_some());
1309                        }
1310                        shared_actor_writer.upsert([(&*info, job_id)]);
1311                    }
1312                }
1313            }
1314        }
1315        shared_actor_writer.finish();
1316    }
1317
1318    /// Post-apply fragment removal: remove fragments and their jobs if empty.
1319    pub(crate) fn post_apply_remove_fragments(
1320        &mut self,
1321        fragment_ids: impl IntoIterator<Item = FragmentId>,
1322    ) {
1323        let inner = self.shared_actor_infos.clone();
1324        let mut shared_actor_writer = inner.start_writer(self.database_id);
1325        {
1326            {
1327                {
1328                    for fragment_id in fragment_ids {
1329                        let job_id = self
1330                            .fragment_location
1331                            .remove(&fragment_id)
1332                            .expect("should exist");
1333                        let job = self.jobs.get_mut(&job_id).expect("should exist");
1334                        let fragment = job
1335                            .fragment_infos
1336                            .remove(&fragment_id)
1337                            .expect("should exist");
1338                        shared_actor_writer.remove(&fragment);
1339                        if job.fragment_infos.is_empty() {
1340                            self.jobs.remove(&job_id).expect("should exist");
1341                        }
1342                    }
1343                }
1344            }
1345        }
1346        shared_actor_writer.finish();
1347    }
1348}
1349
1350impl InflightFragmentInfo {
1351    /// Returns actor list to collect in the target worker node.
1352    pub(crate) fn actor_ids_to_collect(
1353        infos: impl IntoIterator<Item = &Self>,
1354    ) -> HashMap<WorkerId, HashSet<ActorId>> {
1355        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1356        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1357            assert!(
1358                ret.entry(actor.worker_id)
1359                    .or_default()
1360                    .insert(*actor_id as _)
1361            )
1362        }
1363        ret
1364    }
1365
1366    pub fn existing_table_ids<'a>(
1367        infos: impl IntoIterator<Item = &'a Self> + 'a,
1368    ) -> impl Iterator<Item = TableId> + 'a {
1369        infos
1370            .into_iter()
1371            .flat_map(|info| info.state_table_ids.iter().cloned())
1372    }
1373
1374    pub fn workers<'a>(
1375        infos: impl IntoIterator<Item = &'a Self> + 'a,
1376    ) -> impl Iterator<Item = WorkerId> + 'a {
1377        infos
1378            .into_iter()
1379            .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1380    }
1381
1382    pub fn contains_worker<'a>(
1383        infos: impl IntoIterator<Item = &'a Self> + 'a,
1384        worker_id: WorkerId,
1385    ) -> bool {
1386        Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1387    }
1388}
1389
impl InflightDatabaseInfo {
    /// Whether any actor of this database's fragments runs on the given worker.
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    /// All state table IDs of this database's in-flight fragments.
    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}