risingwave_meta/barrier/
info.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
27use risingwave_connector::source::{SplitImpl, SplitMetaData};
28use risingwave_meta_model::WorkerId;
29use risingwave_meta_model::fragment::DistributionType;
30use risingwave_pb::ddl_service::PbBackfillType;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::id::SubscriberId;
33use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
34use risingwave_pb::meta::subscribe_response::Operation;
35use risingwave_pb::source::PbCdcTableSnapshotSplits;
36use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
37use risingwave_pb::stream_plan::stream_node::NodeBody;
38use risingwave_pb::stream_service::BarrierCompleteResponse;
39use tracing::{info, warn};
40
41use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
42use crate::barrier::command::PostCollectCommand;
43use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
44use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
45use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
46use crate::barrier::{
47    BackfillProgress, BarrierKind, Command, CreateStreamingJobType, FragmentBackfillProgress,
48    TracedEpoch,
49};
50use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
51use crate::controller::utils::rebuild_fragment_mapping;
52use crate::manager::NotificationManagerRef;
53use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
54use crate::{MetaError, MetaResult};
55
/// Per-actor information stored inside a [`SharedFragmentInfo`].
///
/// Instances are built from an [`InflightActorInfo`] by copying/cloning the
/// fields of the same name.
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    /// Id of the worker recorded for this actor.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor, if it has one.
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently recorded for this actor.
    pub splits: Vec<SplitImpl>,
}
62
63impl From<&InflightActorInfo> for SharedActorInfo {
64    fn from(value: &InflightActorInfo) -> Self {
65        Self {
66            worker_id: value.worker_id,
67            vnode_bitmap: value.vnode_bitmap.clone(),
68            splits: value.splits.clone(),
69        }
70    }
71}
72
/// Per-fragment information kept in [`SharedActorInfosInner`].
///
/// Built from an [`InflightFragmentInfo`] together with the id of the job
/// that owns the fragment.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Id of the job the fragment was registered with.
    pub job_id: JobId,
    /// Distribution type copied from the in-flight fragment info.
    pub distribution_type: DistributionType,
    /// Actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    /// Vnode count copied from the in-flight fragment info.
    pub vnode_count: usize,
    /// Fragment type flags copied from the in-flight fragment info.
    pub fragment_type_mask: FragmentTypeMask,
}
82
83impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
84    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
85        let (info, job_id) = pair;
86
87        let InflightFragmentInfo {
88            fragment_id,
89            distribution_type,
90            fragment_type_mask,
91            actors,
92            vnode_count,
93            ..
94        } = info;
95
96        Self {
97            fragment_id: *fragment_id,
98            job_id,
99            distribution_type: *distribution_type,
100            fragment_type_mask: *fragment_type_mask,
101            actors: actors
102                .iter()
103                .map(|(actor_id, actor)| (*actor_id, actor.into()))
104                .collect(),
105            vnode_count: *vnode_count,
106        }
107    }
108}
109
/// State guarded by the lock inside [`SharedActorInfos`].
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    /// Fragment infos grouped by database id, then keyed by fragment id.
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
114
115impl SharedActorInfosInner {
116    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
117        self.info
118            .values()
119            .find_map(|database| database.get(&fragment_id))
120    }
121
122    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
123        self.info.values().flatten()
124    }
125}
126
/// Cloneable handle to the shared fragment/actor infos.
///
/// Clones share the same inner state (it lives behind an `Arc`). Mutations go
/// through a `parking_lot::RwLock` and are reported via the notification
/// manager as fragment-mapping notifications.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    /// Lock-protected fragment infos of all databases.
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    /// Used to send fragment-mapping notifications; excluded from `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
134
135impl SharedActorInfos {
136    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
137        self.inner.read()
138    }
139
140    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
141        let core = self.inner.read();
142        core.iter_over_fragments()
143            .flat_map(|(_, fragment)| {
144                fragment
145                    .actors
146                    .iter()
147                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
148            })
149            .collect()
150    }
151
152    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
153    ///
154    /// Very occasionally split removal may happen during scaling, in which case we need to
155    /// use the old splits for reallocation instead of the latest splits (which may be missing),
156    /// so that we can resolve the split removal in the next command.
157    pub fn migrate_splits_for_source_actors(
158        &self,
159        fragment_id: FragmentId,
160        prev_actor_ids: &[ActorId],
161        curr_actor_ids: &[ActorId],
162    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
163        let guard = self.read_guard();
164
165        let prev_splits = prev_actor_ids
166            .iter()
167            .flat_map(|actor_id| {
168                // Note: File Source / Iceberg Source doesn't have splits assigned by meta.
169                guard
170                    .get_fragment(fragment_id)
171                    .and_then(|info| info.actors.get(actor_id))
172                    .map(|actor| actor.splits.clone())
173                    .unwrap_or_default()
174            })
175            .map(|split| (split.id(), split))
176            .collect();
177
178        let empty_actor_splits = curr_actor_ids
179            .iter()
180            .map(|actor_id| (*actor_id, vec![]))
181            .collect();
182
183        let diff = crate::stream::source_manager::reassign_splits(
184            fragment_id,
185            empty_actor_splits,
186            &prev_splits,
187            // pre-allocate splits is the first time getting splits, and it does not have scale-in scene
188            std::default::Default::default(),
189        )
190        .unwrap_or_default();
191
192        Ok(diff)
193    }
194}
195
196impl SharedActorInfos {
197    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
198        Self {
199            inner: Arc::new(Default::default()),
200            notification_manager,
201        }
202    }
203
204    pub(super) fn remove_database(&self, database_id: DatabaseId) {
205        if let Some(database) = self.inner.write().info.remove(&database_id) {
206            let mapping = database
207                .into_values()
208                .map(|fragment| rebuild_fragment_mapping(&fragment))
209                .collect_vec();
210            if !mapping.is_empty() {
211                self.notification_manager
212                    .notify_fragment_mapping(Operation::Delete, mapping);
213            }
214        }
215    }
216
217    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
218        let database_ids: HashSet<_> = database_ids.into_iter().collect();
219
220        let mut mapping = Vec::new();
221        for fragment in self
222            .inner
223            .write()
224            .info
225            .extract_if(|database_id, _| !database_ids.contains(database_id))
226            .flat_map(|(_, fragments)| fragments.into_values())
227        {
228            mapping.push(rebuild_fragment_mapping(&fragment));
229        }
230        if !mapping.is_empty() {
231            self.notification_manager
232                .notify_fragment_mapping(Operation::Delete, mapping);
233        }
234    }
235
    /// Replaces the stored fragments of `database_id` with the recovered `fragments`.
    ///
    /// Fragments present both before and after are overwritten and reported as updated,
    /// fragments that only existed before are removed and reported as deleted, and
    /// fragments that are new are inserted and reported as added. The notifications are
    /// sent by `writer.finish()` at the end.
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        // Index the recovered fragments by id; entries are removed as they are matched
        // against existing ones, so what is left afterwards is the set of new fragments.
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        // Delete the fragments that existed previously but are not included in the
        // recovered fragments.
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                // Still present after recovery: overwrite in place and keep the entry.
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                // Not recovered: extract it so that it is reported as deleted below.
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        // Whatever was not matched above did not exist before and is newly added.
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }
275
276    pub(super) fn upsert(
277        &self,
278        database_id: DatabaseId,
279        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
280    ) {
281        let mut writer = self.start_writer(database_id);
282        writer.upsert(infos);
283        writer.finish();
284    }
285
286    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
287        SharedActorInfoWriter {
288            database_id,
289            write_guard: self.inner.write(),
290            notification_manager: &self.notification_manager,
291            added_fragment_mapping: None,
292            updated_fragment_mapping: None,
293            deleted_fragment_mapping: None,
294        }
295    }
296}
297
/// Write handle over [`SharedActorInfos`] scoped to a single database.
///
/// It owns the write guard and collects the fragment mappings that were added, updated
/// or deleted; `finish` turns the collected mappings into notifications.
pub(super) struct SharedActorInfoWriter<'a> {
    /// Database whose fragments `upsert` and `remove` operate on.
    database_id: DatabaseId,
    /// Write guard over the shared state.
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    /// Used by `finish` to send the notifications.
    notification_manager: &'a NotificationManagerRef,
    /// Mappings of newly inserted fragments; `None` until the first one is recorded.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of overwritten fragments; `None` until the first one is recorded.
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of removed fragments; `None` until the first one is recorded.
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
306
307impl SharedActorInfoWriter<'_> {
308    pub(super) fn upsert(
309        &mut self,
310        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
311    ) {
312        let database = self.write_guard.info.entry(self.database_id).or_default();
313        for info @ (fragment, _) in infos {
314            match database.entry(fragment.fragment_id) {
315                Entry::Occupied(mut entry) => {
316                    let info = info.into();
317                    self.updated_fragment_mapping
318                        .get_or_insert_default()
319                        .push(rebuild_fragment_mapping(&info));
320                    entry.insert(info);
321                }
322                Entry::Vacant(entry) => {
323                    let info = info.into();
324                    self.added_fragment_mapping
325                        .get_or_insert_default()
326                        .push(rebuild_fragment_mapping(&info));
327                    entry.insert(info);
328                }
329            }
330        }
331    }
332
333    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
334        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
335            && let Some(fragment) = database.remove(&info.fragment_id)
336        {
337            self.deleted_fragment_mapping
338                .get_or_insert_default()
339                .push(rebuild_fragment_mapping(&fragment));
340        }
341    }
342
343    pub(super) fn finish(self) {
344        if let Some(mapping) = self.added_fragment_mapping {
345            self.notification_manager
346                .notify_fragment_mapping(Operation::Add, mapping);
347        }
348        if let Some(mapping) = self.updated_fragment_mapping {
349            self.notification_manager
350                .notify_fragment_mapping(Operation::Update, mapping);
351        }
352        if let Some(mapping) = self.deleted_fragment_mapping {
353            self.notification_manager
354                .notify_fragment_mapping(Operation::Delete, mapping);
355        }
356    }
357}
358
/// Epoch pair and kind describing a single barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// The previous epoch of the barrier.
    pub prev_epoch: TracedEpoch,
    /// The current epoch of the barrier.
    pub curr_epoch: TracedEpoch,
    /// The kind of the barrier.
    pub kind: BarrierKind,
}
365
366impl BarrierInfo {
367    pub(super) fn prev_epoch(&self) -> u64 {
368        self.prev_epoch.value().0
369    }
370
371    pub(super) fn curr_epoch(&self) -> u64 {
372        self.curr_epoch.value().0
373    }
374}
375
/// A change that a command makes to a single fragment.
///
/// NOTE(review): the code applying these variants is not part of this excerpt; the
/// per-variant notes below are derived from the variant names and payload types only.
#[derive(Debug)]
pub(super) enum CommandFragmentChanges {
    /// A new fragment `info` belonging to job `job_id`.
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
    },
    /// Carries the upstream sink info to add (presumably to the fragment's node; confirm).
    AddNodeUpstream(PbUpstreamSinkInfo),
    /// Carries the ids of upstream fragments to drop.
    DropNodeUpstream(Vec<FragmentId>),
    /// Carries a replacement mapping between upstream fragment ids.
    ReplaceNodeUpstream(
        /// old `fragment_id` -> new `fragment_id`
        HashMap<FragmentId, FragmentId>,
    ),
    /// Reschedule payload: actors to add, vnode bitmap updates, actors to remove and
    /// split assignments, all keyed by actor id.
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    /// The fragment is removed.
    RemoveFragment,
    /// New split assignment per actor.
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}
399
/// Fragment change carrying only a reschedule's actors to remove, or a fragment removal.
///
/// NOTE(review): judging by the name these are applied in a later ("post apply") step
/// than [`CommandFragmentChanges`]; the consuming code is outside this excerpt, confirm.
pub(super) enum PostApplyFragmentChanges {
    /// Actors to remove as part of a reschedule.
    Reschedule { to_remove: HashSet<ActorId> },
    /// The fragment is removed.
    RemoveFragment,
}
404
/// Kind of a subscriber registered on a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A subscription; the payload is its retention value
    /// (see `max_subscription_retention`).
    Subscription(u64),
    /// A snapshot-backfill subscriber; it carries no retention.
    SnapshotBackfill,
}
410
/// Creation status of an in-flight streaming job.
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    /// The state `apply_collected_command` expects before the job's create command is
    /// collected.
    Init,
    /// The job is being created; `tracker` tracks its progress.
    Creating { tracker: CreateMviewProgressTracker },
    /// Set by `take_staging_commit_info` once the tracker reports the job as finished.
    Created,
}
417
/// In-flight information of a single streaming job.
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    /// Id of the job.
    pub job_id: JobId,
    /// Fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Subscribers registered on the job, keyed by subscriber id.
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    /// Creation status of the job.
    pub status: CreateStreamingJobStatus,
    /// Tracker of CDC table backfill progress, if the job has one.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
426
427impl InflightStreamingJobInfo {
428    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
429        self.fragment_infos.values()
430    }
431
432    pub fn snapshot_backfill_actor_ids(
433        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
434    ) -> impl Iterator<Item = ActorId> + '_ {
435        fragment_infos
436            .values()
437            .filter(|fragment| {
438                fragment
439                    .fragment_type_mask
440                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
441            })
442            .flat_map(|fragment| fragment.actors.keys().copied())
443    }
444
445    pub fn tracking_progress_actor_ids(
446        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
447    ) -> Vec<(ActorId, BackfillUpstreamType)> {
448        StreamJobFragments::tracking_progress_actor_ids_impl(
449            fragment_infos
450                .values()
451                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
452        )
453    }
454}
455
456impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
457    type Item = &'a InflightFragmentInfo;
458
459    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
460
461    fn into_iter(self) -> Self::IntoIter {
462        self.fragment_infos()
463    }
464}
465
/// In-flight information of all streaming jobs in one database.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    /// Id of the database.
    pub(super) database_id: DatabaseId,
    /// Jobs of the database, keyed by job id.
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    /// Maps each fragment id to the id of the job that owns it.
    fragment_location: HashMap<FragmentId, JobId>,
    /// Handle to the shared fragment/actor infos.
    pub(super) shared_actor_infos: SharedActorInfos,
}
473
474impl InflightDatabaseInfo {
475    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
476        self.jobs.values().flat_map(|job| job.fragment_infos())
477    }
478
479    pub fn contains_job(&self, job_id: JobId) -> bool {
480        self.jobs.contains_key(&job_id)
481    }
482
483    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
484        let job_id = self.fragment_location[&fragment_id];
485        self.jobs
486            .get(&job_id)
487            .expect("should exist")
488            .fragment_infos
489            .get(&fragment_id)
490            .expect("should exist")
491    }
492
493    pub(super) fn backfill_fragment_ids_for_job(
494        &self,
495        job_id: JobId,
496    ) -> MetaResult<HashSet<FragmentId>> {
497        let job = self
498            .jobs
499            .get(&job_id)
500            .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
501        Ok(job
502            .fragment_infos
503            .iter()
504            .filter_map(|(fragment_id, fragment)| {
505                fragment
506                    .fragment_type_mask
507                    .contains_any([
508                        FragmentTypeFlag::StreamScan,
509                        FragmentTypeFlag::SourceScan,
510                        FragmentTypeFlag::LocalityProvider,
511                    ])
512                    .then_some(*fragment_id)
513            })
514            .collect())
515    }
516
517    pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
518        let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
519            MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
520        })?;
521        let fragment = self
522            .jobs
523            .get(job_id)
524            .expect("should exist")
525            .fragment_infos
526            .get(&fragment_id)
527            .expect("should exist");
528        Ok(fragment.fragment_type_mask.contains_any([
529            FragmentTypeFlag::StreamScan,
530            FragmentTypeFlag::SourceScan,
531            FragmentTypeFlag::LocalityProvider,
532        ]))
533    }
534
535    pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
536        self.jobs
537            .iter()
538            .filter_map(|(job_id, job)| match &job.status {
539                CreateStreamingJobStatus::Init => None,
540                CreateStreamingJobStatus::Creating { tracker } => {
541                    let progress = tracker.gen_backfill_progress();
542                    Some((
543                        *job_id,
544                        BackfillProgress {
545                            progress,
546                            backfill_type: PbBackfillType::NormalBackfill,
547                        },
548                    ))
549                }
550                CreateStreamingJobStatus::Created => None,
551            })
552    }
553
554    pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
555        self.jobs.iter().filter_map(|(job_id, job)| {
556            job.cdc_table_backfill_tracker
557                .as_ref()
558                .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
559        })
560    }
561
562    pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
563        let mut result = Vec::new();
564        for job in self.jobs.values() {
565            let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
566                continue;
567            };
568            let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
569            result.extend(fragment_progress);
570        }
571        result
572    }
573
574    pub(super) fn may_assign_fragment_cdc_backfill_splits(
575        &mut self,
576        fragment_id: FragmentId,
577    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
578        let job_id = self.fragment_location[&fragment_id];
579        let job = self.jobs.get_mut(&job_id).expect("should exist");
580        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
581            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
582            if cdc_scan_fragment_id != fragment_id {
583                return Ok(None);
584            }
585            let actors = job.fragment_infos[&cdc_scan_fragment_id]
586                .actors
587                .keys()
588                .copied()
589                .collect();
590            tracker.reassign_splits(actors).map(Some)
591        } else {
592            Ok(None)
593        }
594    }
595
596    pub(super) fn assign_cdc_backfill_splits(
597        &mut self,
598        job_id: JobId,
599    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
600        let job = self.jobs.get_mut(&job_id).expect("should exist");
601        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
602            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
603            let actors = job.fragment_infos[&cdc_scan_fragment_id]
604                .actors
605                .keys()
606                .copied()
607                .collect();
608            tracker.reassign_splits(actors).map(Some)
609        } else {
610            Ok(None)
611        }
612    }
613
    /// Updates job status and progress trackers after a barrier has been collected.
    ///
    /// In order, this:
    /// 1. moves a newly created `Normal` / `SinkIntoTable` job from `Init` to `Creating`,
    /// 2. refreshes the trackers of creating jobs that own rescheduled fragments,
    /// 3. applies the `create_mview_progress` entries of `resps`,
    /// 4. applies the `cdc_table_backfill_progress` entries of `resps`,
    /// 5. handles the `cdc_source_offset_updated` entries of `resps`.
    ///
    /// # Panics
    ///
    /// Panics if the job of a `CreateStreamingJob` command exists but is not in `Init`.
    pub(super) fn apply_collected_command(
        &mut self,
        command: &PostCollectCommand,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // Swap in the `Creating` status and check that the old one was `Init`.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(info, version_stats),
                            },
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // The progress of SnapshotBackfill won't be tracked here
                }
            }
        }
        if let PostCollectCommand::RescheduleFragment { reschedules, .. } = command {
            // During reschedule we expect fragments to be rebuilt with new actors and no vnode bitmap update.
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "RescheduleFragment should not carry vnode bitmap updates when actors are rebuilt"
            );

            // Collect jobs that own the rescheduled fragments; de-duplicate via HashSet.
            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                // Only jobs that are still being created have a tracker to refresh.
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        // Apply the reported materialized view creation progress.
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
                // No tracker exists yet in `Init`, so the progress is skipped.
                CreateStreamingJobStatus::Init => {
                    continue;
                }
                CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
                CreateStreamingJobStatus::Created => {
                    // Only unfinished progress for an already created job is worth a warning.
                    if !progress.done {
                        warn!("update the progress of an created streaming job: {progress:?}");
                    }
                    continue;
                }
            };
            tracker.apply_progress(progress, version_stats);
        }
        // Apply the reported CDC table backfill progress.
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        // Handle CDC source offset updated events
        for cdc_offset_updated in resps
            .iter()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            // The job to update is derived from the source id.
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                // Jobs that are not in `Creating` have no tracker and are left untouched.
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }
727
728    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
729        self.jobs.values().filter_map(|job| match &job.status {
730            CreateStreamingJobStatus::Init => None,
731            CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
732            CreateStreamingJobStatus::Created => None,
733        })
734    }
735
736    fn iter_mut_creating_job_tracker(
737        &mut self,
738    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
739        self.jobs
740            .values_mut()
741            .filter_map(|job| match &mut job.status {
742                CreateStreamingJobStatus::Init => None,
743                CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
744                CreateStreamingJobStatus::Created => None,
745            })
746    }
747
748    pub(super) fn has_pending_finished_jobs(&self) -> bool {
749        self.iter_creating_job_tracker()
750            .any(|tracker| tracker.is_finished())
751    }
752
753    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
754        self.iter_mut_creating_job_tracker()
755            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
756            .collect()
757    }
758
759    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
760        let mut finished_jobs = vec![];
761        let mut table_ids_to_truncate = vec![];
762        let mut finished_cdc_table_backfill = vec![];
763        for (job_id, job) in &mut self.jobs {
764            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
765                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
766                table_ids_to_truncate.extend(truncate_table_ids);
767                if is_finished {
768                    let CreateStreamingJobStatus::Creating { tracker, .. } =
769                        replace(&mut job.status, CreateStreamingJobStatus::Created)
770                    else {
771                        unreachable!()
772                    };
773                    finished_jobs.push(tracker.into_tracking_job());
774                }
775            }
776            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
777                && tracker.take_pre_completed()
778            {
779                finished_cdc_table_backfill.push(*job_id);
780            }
781        }
782        StagingCommitInfo {
783            finished_jobs,
784            table_ids_to_truncate,
785            finished_cdc_table_backfill,
786        }
787    }
788
789    pub fn fragment_subscribers(
790        &self,
791        fragment_id: FragmentId,
792    ) -> impl Iterator<Item = SubscriberId> + '_ {
793        let job_id = self.fragment_location[&fragment_id];
794        self.jobs[&job_id].subscribers.keys().copied()
795    }
796
797    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
798        self.jobs[&job_id].subscribers.keys().copied()
799    }
800
801    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
802        self.jobs
803            .iter()
804            .filter_map(|(job_id, info)| {
805                info.subscribers
806                    .values()
807                    .filter_map(|subscriber| match subscriber {
808                        SubscriberType::Subscription(retention) => Some(*retention),
809                        SubscriberType::SnapshotBackfill => None,
810                    })
811                    .max()
812                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
813            })
814            .collect()
815    }
816
817    pub fn register_subscriber(
818        &mut self,
819        job_id: JobId,
820        subscriber_id: SubscriberId,
821        subscriber: SubscriberType,
822    ) {
823        self.jobs
824            .get_mut(&job_id)
825            .expect("should exist")
826            .subscribers
827            .try_insert(subscriber_id, subscriber)
828            .expect("non duplicate");
829    }
830
831    pub fn unregister_subscriber(
832        &mut self,
833        job_id: JobId,
834        subscriber_id: SubscriberId,
835    ) -> Option<SubscriberType> {
836        self.jobs
837            .get_mut(&job_id)
838            .expect("should exist")
839            .subscribers
840            .remove(&subscriber_id)
841    }
842
843    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
844        let job_id = self.fragment_location[&fragment_id];
845        let fragment = self
846            .jobs
847            .get_mut(&job_id)
848            .expect("should exist")
849            .fragment_infos
850            .get_mut(&fragment_id)
851            .expect("should exist");
852        (fragment, job_id)
853    }
854
855    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
856        Self {
857            database_id,
858            jobs: Default::default(),
859            fragment_location: Default::default(),
860            shared_actor_infos,
861        }
862    }
863
864    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
865        // remove the database because it's empty.
866        shared_actor_infos.remove_database(database_id);
867        Self::empty_inner(database_id, shared_actor_infos)
868    }
869
870    pub fn recover(
871        database_id: DatabaseId,
872        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
873        shared_actor_infos: SharedActorInfos,
874    ) -> Self {
875        let mut info = Self::empty_inner(database_id, shared_actor_infos);
876        for job in jobs {
877            info.add_existing(job);
878        }
879        info
880    }
881
882    pub fn is_empty(&self) -> bool {
883        self.jobs.is_empty()
884    }
885
886    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
887        let InflightStreamingJobInfo {
888            job_id,
889            fragment_infos,
890            subscribers,
891            status,
892            cdc_table_backfill_tracker,
893        } = job;
894        self.jobs
895            .try_insert(
896                job.job_id,
897                InflightStreamingJobInfo {
898                    job_id,
899                    subscribers,
900                    fragment_infos: Default::default(), // fill in later in apply_add
901                    status,
902                    cdc_table_backfill_tracker,
903                },
904            )
905            .expect("non-duplicate");
906        let post_apply_changes =
907            self.apply_add(fragment_infos.into_iter().map(|(fragment_id, info)| {
908                (
909                    fragment_id,
910                    CommandFragmentChanges::NewFragment {
911                        job_id: job.job_id,
912                        info,
913                    },
914                )
915            }));
916        self.post_apply(post_apply_changes);
917    }
918
919    /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
920    /// the info correspondingly.
921    pub(crate) fn pre_apply(
922        &mut self,
923        new_job: Option<(JobId, Option<CdcTableBackfillTracker>)>,
924        fragment_changes: HashMap<FragmentId, CommandFragmentChanges>,
925    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
926        if let Some((job_id, cdc_table_backfill_tracker)) = new_job {
927            self.jobs
928                .try_insert(
929                    job_id,
930                    InflightStreamingJobInfo {
931                        job_id,
932                        fragment_infos: Default::default(),
933                        subscribers: Default::default(), // no subscriber for newly create job
934                        status: CreateStreamingJobStatus::Init,
935                        cdc_table_backfill_tracker,
936                    },
937                )
938                .expect("non-duplicate");
939        }
940        self.apply_add(fragment_changes.into_iter())
941    }
942
943    fn apply_add(
944        &mut self,
945        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
946    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
947        let mut post_apply = HashMap::new();
948        {
949            let shared_infos = self.shared_actor_infos.clone();
950            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
951            for (fragment_id, change) in fragment_changes {
952                match change {
953                    CommandFragmentChanges::NewFragment { job_id, info } => {
954                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
955                        shared_actor_writer.upsert([(&info, job_id)]);
956                        fragment_infos
957                            .fragment_infos
958                            .try_insert(fragment_id, info)
959                            .expect("non duplicate");
960                        self.fragment_location
961                            .try_insert(fragment_id, job_id)
962                            .expect("non duplicate");
963                    }
964                    CommandFragmentChanges::Reschedule {
965                        new_actors,
966                        actor_update_vnode_bitmap,
967                        to_remove,
968                        actor_splits,
969                    } => {
970                        let (info, _) = self.fragment_mut(fragment_id);
971                        let actors = &mut info.actors;
972                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
973                            actors
974                                .get_mut(&actor_id)
975                                .expect("should exist")
976                                .vnode_bitmap = Some(new_vnodes);
977                        }
978                        for (actor_id, actor) in new_actors {
979                            actors
980                                .try_insert(actor_id as _, actor)
981                                .expect("non-duplicate");
982                        }
983                        for (actor_id, splits) in actor_splits {
984                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
985                        }
986
987                        post_apply.insert(
988                            fragment_id,
989                            PostApplyFragmentChanges::Reschedule { to_remove },
990                        );
991
992                        // info will be upserted into shared_actor_infos in post_apply stage
993                    }
994                    CommandFragmentChanges::RemoveFragment => {
995                        post_apply.insert(fragment_id, PostApplyFragmentChanges::RemoveFragment);
996                    }
997                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
998                        let mut remaining_fragment_ids: HashSet<_> =
999                            replace_map.keys().cloned().collect();
1000                        let (info, _) = self.fragment_mut(fragment_id);
1001                        visit_stream_node_mut(&mut info.nodes, |node| {
1002                            if let NodeBody::Merge(m) = node
1003                                && let Some(new_upstream_fragment_id) =
1004                                    replace_map.get(&m.upstream_fragment_id)
1005                            {
1006                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1007                                    if cfg!(debug_assertions) {
1008                                        panic!(
1009                                            "duplicate upstream fragment: {:?} {:?}",
1010                                            m, replace_map
1011                                        );
1012                                    } else {
1013                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
1014                                    }
1015                                }
1016                                m.upstream_fragment_id = *new_upstream_fragment_id;
1017                            }
1018                        });
1019                        if cfg!(debug_assertions) {
1020                            assert!(
1021                                remaining_fragment_ids.is_empty(),
1022                                "non-existing fragment to replace: {:?} {:?} {:?}",
1023                                remaining_fragment_ids,
1024                                info.nodes,
1025                                replace_map
1026                            );
1027                        } else {
1028                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1029                        }
1030                    }
1031                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
1032                        let (info, _) = self.fragment_mut(fragment_id);
1033                        let mut injected = false;
1034                        visit_stream_node_mut(&mut info.nodes, |node| {
1035                            if let NodeBody::UpstreamSinkUnion(u) = node {
1036                                if cfg!(debug_assertions) {
1037                                    let current_upstream_fragment_ids = u
1038                                        .init_upstreams
1039                                        .iter()
1040                                        .map(|upstream| upstream.upstream_fragment_id)
1041                                        .collect::<HashSet<_>>();
1042                                    if current_upstream_fragment_ids
1043                                        .contains(&new_upstream_info.upstream_fragment_id)
1044                                    {
1045                                        panic!(
1046                                            "duplicate upstream fragment: {:?} {:?}",
1047                                            u, new_upstream_info
1048                                        );
1049                                    }
1050                                }
1051                                u.init_upstreams.push(new_upstream_info.clone());
1052                                injected = true;
1053                            }
1054                        });
1055                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
1056                    }
1057                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
1058                        let (info, _) = self.fragment_mut(fragment_id);
1059                        let mut removed = false;
1060                        visit_stream_node_mut(&mut info.nodes, |node| {
1061                            if let NodeBody::UpstreamSinkUnion(u) = node {
1062                                if cfg!(debug_assertions) {
1063                                    let current_upstream_fragment_ids = u
1064                                        .init_upstreams
1065                                        .iter()
1066                                        .map(|upstream| upstream.upstream_fragment_id)
1067                                        .collect::<HashSet<FragmentId>>();
1068                                    for drop_fragment_id in &drop_upstream_fragment_ids {
1069                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
1070                                        {
1071                                            panic!(
1072                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1073                                                u, drop_upstream_fragment_ids, drop_fragment_id
1074                                            );
1075                                        }
1076                                    }
1077                                }
1078                                u.init_upstreams.retain(|upstream| {
1079                                    !drop_upstream_fragment_ids
1080                                        .contains(&upstream.upstream_fragment_id)
1081                                });
1082                                removed = true;
1083                            }
1084                        });
1085                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
1086                    }
1087                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
1088                        let (info, job_id) = self.fragment_mut(fragment_id);
1089                        let actors = &mut info.actors;
1090                        for (actor_id, splits) in actor_splits {
1091                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
1092                        }
1093                        shared_actor_writer.upsert([(&*info, job_id)]);
1094                    }
1095                }
1096            }
1097            shared_actor_writer.finish();
1098        }
1099        post_apply
1100    }
1101
1102    pub(super) fn build_edge(
1103        &self,
1104        command: Option<&Command>,
1105        control_stream_manager: &ControlStreamManager,
1106    ) -> Option<FragmentEdgeBuildResult> {
1107        let (info, replace_job, new_upstream_sink) = match command {
1108            None => {
1109                return None;
1110            }
1111            Some(command) => match command {
1112                Command::Flush
1113                | Command::Pause
1114                | Command::Resume
1115                | Command::DropStreamingJobs { .. }
1116                | Command::RescheduleFragment { .. }
1117                | Command::SourceChangeSplit { .. }
1118                | Command::Throttle { .. }
1119                | Command::CreateSubscription { .. }
1120                | Command::DropSubscription { .. }
1121                | Command::ConnectorPropsChange(_)
1122                | Command::Refresh { .. }
1123                | Command::ListFinish { .. }
1124                | Command::LoadFinish { .. }
1125                | Command::ResumeBackfill { .. } => {
1126                    return None;
1127                }
1128                Command::CreateStreamingJob { info, job_type, .. } => {
1129                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
1130                        new_upstream_sink,
1131                    ) = job_type
1132                    {
1133                        Some(new_upstream_sink)
1134                    } else {
1135                        None
1136                    };
1137                    let is_snapshot_backfill =
1138                        matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_));
1139                    (Some((info, is_snapshot_backfill)), None, new_upstream_sink)
1140                }
1141                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
1142                Command::ResetSource { .. } => (None, None, None),
1143            },
1144        };
1145        // `existing_fragment_ids` consists of
1146        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
1147        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
1148        // if the upstream fragment previously exists
1149        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments,
1150        // if creating a new sink-into-table
1151        //  - should contain the `fragment_id` of the downstream table.
1152        let existing_fragment_ids = info
1153            .into_iter()
1154            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
1155            .chain(replace_job.into_iter().flat_map(|replace_job| {
1156                replace_job
1157                    .upstream_fragment_downstreams
1158                    .keys()
1159                    .filter(|fragment_id| {
1160                        info.map(|(info, _)| {
1161                            !info
1162                                .stream_job_fragments
1163                                .fragments
1164                                .contains_key(*fragment_id)
1165                        })
1166                        .unwrap_or(true)
1167                    })
1168                    .chain(replace_job.replace_upstream.keys())
1169            }))
1170            .chain(
1171                new_upstream_sink
1172                    .into_iter()
1173                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
1174            )
1175            .cloned();
1176        let new_fragment_infos = info
1177            .into_iter()
1178            .flat_map(|(info, is_snapshot_backfill)| {
1179                let partial_graph_id = to_partial_graph_id(
1180                    self.database_id,
1181                    is_snapshot_backfill.then_some(info.streaming_job.id()),
1182                );
1183                info.stream_job_fragments
1184                    .new_fragment_info(&info.init_split_assignment)
1185                    .map(move |(fragment_id, info)| (fragment_id, info, partial_graph_id))
1186            })
1187            .chain(
1188                replace_job
1189                    .into_iter()
1190                    .flat_map(|replace_job| {
1191                        replace_job
1192                            .new_fragments
1193                            .new_fragment_info(&replace_job.init_split_assignment)
1194                            .chain(
1195                                replace_job
1196                                    .auto_refresh_schema_sinks
1197                                    .as_ref()
1198                                    .into_iter()
1199                                    .flat_map(|sinks| {
1200                                        sinks.iter().map(|sink| {
1201                                            (
1202                                                sink.new_fragment.fragment_id,
1203                                                sink.new_fragment_info(),
1204                                            )
1205                                        })
1206                                    }),
1207                            )
1208                    })
1209                    .map(|(fragment_id, fragment)| {
1210                        (
1211                            fragment_id,
1212                            fragment,
1213                            // we assume that replace job only happens in database partial graph
1214                            to_partial_graph_id(self.database_id, None),
1215                        )
1216                    }),
1217            )
1218            .collect_vec();
1219        let mut builder = FragmentEdgeBuilder::new(
1220            existing_fragment_ids
1221                .map(|fragment_id| {
1222                    (
1223                        self.fragment(fragment_id),
1224                        to_partial_graph_id(self.database_id, None),
1225                    )
1226                })
1227                .chain(
1228                    new_fragment_infos
1229                        .iter()
1230                        .map(|(_, info, partial_graph_id)| (info, *partial_graph_id)),
1231                ),
1232            control_stream_manager,
1233        );
1234        if let Some((info, _)) = info {
1235            builder.add_relations(&info.upstream_fragment_downstreams);
1236            builder.add_relations(&info.stream_job_fragments.downstreams);
1237        }
1238        if let Some(replace_job) = replace_job {
1239            builder.add_relations(&replace_job.upstream_fragment_downstreams);
1240            builder.add_relations(&replace_job.new_fragments.downstreams);
1241        }
1242        if let Some(new_upstream_sink) = new_upstream_sink {
1243            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
1244            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
1245            builder.add_edge(sink_fragment_id, new_sink_downstream);
1246        }
1247        if let Some(replace_job) = replace_job {
1248            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
1249                for (original_upstream_fragment_id, new_upstream_fragment_id) in
1250                    fragment_replacement
1251                {
1252                    builder.replace_upstream(
1253                        *fragment_id,
1254                        *original_upstream_fragment_id,
1255                        *new_upstream_fragment_id,
1256                    );
1257                }
1258            }
1259        }
1260        Some(builder.build())
1261    }
1262
1263    /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
1264    /// remove that from the snapshot correspondingly.
1265    pub(crate) fn post_apply(
1266        &mut self,
1267        fragment_changes: HashMap<FragmentId, PostApplyFragmentChanges>,
1268    ) {
1269        let inner = self.shared_actor_infos.clone();
1270        let mut shared_actor_writer = inner.start_writer(self.database_id);
1271        {
1272            for (fragment_id, changes) in fragment_changes {
1273                match changes {
1274                    PostApplyFragmentChanges::Reschedule { to_remove } => {
1275                        let job_id = self.fragment_location[&fragment_id];
1276                        let info = self
1277                            .jobs
1278                            .get_mut(&job_id)
1279                            .expect("should exist")
1280                            .fragment_infos
1281                            .get_mut(&fragment_id)
1282                            .expect("should exist");
1283                        for actor_id in to_remove {
1284                            assert!(info.actors.remove(&actor_id).is_some());
1285                        }
1286                        shared_actor_writer.upsert([(&*info, job_id)]);
1287                    }
1288                    PostApplyFragmentChanges::RemoveFragment => {
1289                        let job_id = self
1290                            .fragment_location
1291                            .remove(&fragment_id)
1292                            .expect("should exist");
1293                        let job = self.jobs.get_mut(&job_id).expect("should exist");
1294                        let fragment = job
1295                            .fragment_infos
1296                            .remove(&fragment_id)
1297                            .expect("should exist");
1298                        shared_actor_writer.remove(&fragment);
1299                        if job.fragment_infos.is_empty() {
1300                            self.jobs.remove(&job_id).expect("should exist");
1301                        }
1302                    }
1303                }
1304            }
1305        }
1306        shared_actor_writer.finish();
1307    }
1308}
1309
1310impl InflightFragmentInfo {
1311    /// Returns actor list to collect in the target worker node.
1312    pub(crate) fn actor_ids_to_collect(
1313        infos: impl IntoIterator<Item = &Self>,
1314    ) -> HashMap<WorkerId, HashSet<ActorId>> {
1315        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1316        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1317            assert!(
1318                ret.entry(actor.worker_id)
1319                    .or_default()
1320                    .insert(*actor_id as _)
1321            )
1322        }
1323        ret
1324    }
1325
1326    pub fn existing_table_ids<'a>(
1327        infos: impl IntoIterator<Item = &'a Self> + 'a,
1328    ) -> impl Iterator<Item = TableId> + 'a {
1329        infos
1330            .into_iter()
1331            .flat_map(|info| info.state_table_ids.iter().cloned())
1332    }
1333
1334    pub fn workers<'a>(
1335        infos: impl IntoIterator<Item = &'a Self> + 'a,
1336    ) -> impl Iterator<Item = WorkerId> + 'a {
1337        infos
1338            .into_iter()
1339            .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1340    }
1341
1342    pub fn contains_worker<'a>(
1343        infos: impl IntoIterator<Item = &'a Self> + 'a,
1344        worker_id: WorkerId,
1345    ) -> bool {
1346        Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1347    }
1348}
1349
1350impl InflightDatabaseInfo {
1351    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
1352        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
1353    }
1354
1355    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
1356        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
1357    }
1358}