risingwave_meta/barrier/
info.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_meta_model::WorkerId;
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_pb::common::ThrottleType;
32use risingwave_pb::ddl_service::PbBackfillType;
33use risingwave_pb::hummock::HummockVersionStats;
34use risingwave_pb::id::SubscriberId;
35use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
36use risingwave_pb::meta::subscribe_response::Operation;
37use risingwave_pb::source::PbCdcTableSnapshotSplits;
38use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
39use risingwave_pb::stream_plan::stream_node::NodeBody;
40use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
41use risingwave_pb::stream_service::BarrierCompleteResponse;
42use tracing::{info, warn};
43
44use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
45use crate::barrier::command::{
46    CreateStreamingJobCommandInfo, PostCollectCommand, ReplaceStreamJobPlan,
47};
48use crate::barrier::edge_builder::{
49    EdgeBuilderFragmentInfo, FragmentEdgeBuildResult, FragmentEdgeBuilder,
50};
51use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
52use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
53use crate::barrier::{
54    BackfillProgress, BarrierKind, CreateStreamingJobType, FragmentBackfillProgress, TracedEpoch,
55};
56use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
57use crate::controller::utils::rebuild_fragment_mapping;
58use crate::manager::NotificationManagerRef;
59use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamActor, StreamJobFragments};
60use crate::stream::UpstreamSinkInfo;
61use crate::{MetaError, MetaResult};
62
/// Per-actor entry stored inside a [`SharedFragmentInfo`].
///
/// Built from an `InflightActorInfo` by copying the worker id and cloning the
/// vnode bitmap and splits.
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    /// Id of the worker recorded for this actor.
    pub worker_id: WorkerId,
    /// Vnode bitmap of the actor; `None` when the source info carries none.
    pub vnode_bitmap: Option<Bitmap>,
    /// Splits recorded for this actor (may be empty).
    pub splits: Vec<SplitImpl>,
}
69
70impl From<&InflightActorInfo> for SharedActorInfo {
71    fn from(value: &InflightActorInfo) -> Self {
72        Self {
73            worker_id: value.worker_id,
74            vnode_bitmap: value.vnode_bitmap.clone(),
75            splits: value.splits.clone(),
76        }
77    }
78}
79
/// Per-fragment entry stored in [`SharedActorInfosInner`].
///
/// Built from an `InflightFragmentInfo` together with the id of the job that
/// owns the fragment.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Id of the job supplied alongside the fragment when this entry was built.
    pub job_id: JobId,
    /// Distribution type copied from the inflight fragment info.
    pub distribution_type: DistributionType,
    /// Actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    /// Vnode count copied from the inflight fragment info.
    pub vnode_count: usize,
    /// Fragment type flags copied from the inflight fragment info.
    pub fragment_type_mask: FragmentTypeMask,
}
89
90impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
91    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
92        let (info, job_id) = pair;
93
94        let InflightFragmentInfo {
95            fragment_id,
96            distribution_type,
97            fragment_type_mask,
98            actors,
99            vnode_count,
100            ..
101        } = info;
102
103        Self {
104            fragment_id: *fragment_id,
105            job_id,
106            distribution_type: *distribution_type,
107            fragment_type_mask: *fragment_type_mask,
108            actors: actors
109                .iter()
110                .map(|(actor_id, actor)| (*actor_id, actor.into()))
111                .collect(),
112            vnode_count: *vnode_count,
113        }
114    }
115}
116
/// State guarded by the lock inside `SharedActorInfos`.
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    /// Fragment infos grouped by database, then keyed by fragment id.
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
121
122impl SharedActorInfosInner {
123    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
124        self.info
125            .values()
126            .find_map(|database| database.get(&fragment_id))
127    }
128
129    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
130        self.info.values().flatten()
131    }
132}
133
/// Cloneable handle to the fragment/actor infos shared behind a read-write lock.
///
/// Changes made through the write-side methods are also pushed to the
/// notification manager as streaming fragment mapping notifications.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    /// The lock-protected state; clones of this handle share the same `Arc`.
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    /// Used to publish fragment mapping changes; skipped in `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
141
142impl SharedActorInfos {
143    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
144        self.inner.read()
145    }
146
147    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
148        let core = self.inner.read();
149        core.iter_over_fragments()
150            .flat_map(|(_, fragment)| {
151                fragment
152                    .actors
153                    .iter()
154                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
155            })
156            .collect()
157    }
158
159    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
160    ///
161    /// Very occasionally split removal may happen during scaling, in which case we need to
162    /// use the old splits for reallocation instead of the latest splits (which may be missing),
163    /// so that we can resolve the split removal in the next command.
164    pub fn migrate_splits_for_source_actors(
165        &self,
166        fragment_id: FragmentId,
167        prev_actor_ids: &[ActorId],
168        curr_actor_ids: &[ActorId],
169    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
170        let guard = self.read_guard();
171
172        let prev_splits = prev_actor_ids
173            .iter()
174            .flat_map(|actor_id| {
175                // Note: File Source / Iceberg Source doesn't have splits assigned by meta.
176                guard
177                    .get_fragment(fragment_id)
178                    .and_then(|info| info.actors.get(actor_id))
179                    .map(|actor| actor.splits.clone())
180                    .unwrap_or_default()
181            })
182            .map(|split| (split.id(), split))
183            .collect();
184
185        let empty_actor_splits = curr_actor_ids
186            .iter()
187            .map(|actor_id| (*actor_id, vec![]))
188            .collect();
189
190        let diff = crate::stream::source_manager::reassign_splits(
191            fragment_id,
192            empty_actor_splits,
193            &prev_splits,
194            // pre-allocate splits is the first time getting splits, and it does not have scale-in scene
195            std::default::Default::default(),
196        )
197        .unwrap_or_default();
198
199        Ok(diff)
200    }
201}
202
203impl SharedActorInfos {
204    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
205        Self {
206            inner: Arc::new(Default::default()),
207            notification_manager,
208        }
209    }
210
211    pub(super) fn remove_database(&self, database_id: DatabaseId) {
212        if let Some(database) = self.inner.write().info.remove(&database_id) {
213            let mapping = database
214                .into_values()
215                .map(|fragment| rebuild_fragment_mapping(&fragment))
216                .collect_vec();
217            if !mapping.is_empty() {
218                self.notification_manager
219                    .notify_streaming_fragment_mapping(Operation::Delete, mapping);
220            }
221        }
222    }
223
224    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
225        let database_ids: HashSet<_> = database_ids.into_iter().collect();
226
227        let mut mapping = Vec::new();
228        for fragment in self
229            .inner
230            .write()
231            .info
232            .extract_if(|database_id, _| !database_ids.contains(database_id))
233            .flat_map(|(_, fragments)| fragments.into_values())
234        {
235            mapping.push(rebuild_fragment_mapping(&fragment));
236        }
237        if !mapping.is_empty() {
238            self.notification_manager
239                .notify_streaming_fragment_mapping(Operation::Delete, mapping);
240        }
241    }
242
243    pub(super) fn recover_database(
244        &self,
245        database_id: DatabaseId,
246        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
247    ) {
248        let mut remaining_fragments: HashMap<_, _> = fragments
249            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
250            .collect();
251        // delete the fragments that exist previously, but not included in the recovered fragments
252        let mut writer = self.start_writer(database_id);
253        let database = writer.write_guard.info.entry(database_id).or_default();
254        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
255            if let Some(info) = remaining_fragments.remove(fragment_id) {
256                let info = info.into();
257                writer
258                    .updated_fragment_mapping
259                    .get_or_insert_default()
260                    .push(rebuild_fragment_mapping(&info));
261                *fragment_infos = info;
262                false
263            } else {
264                true
265            }
266        }) {
267            writer
268                .deleted_fragment_mapping
269                .get_or_insert_default()
270                .push(rebuild_fragment_mapping(&fragment));
271        }
272        for (fragment_id, info) in remaining_fragments {
273            let info = info.into();
274            writer
275                .added_fragment_mapping
276                .get_or_insert_default()
277                .push(rebuild_fragment_mapping(&info));
278            database.insert(fragment_id, info);
279        }
280        writer.finish();
281    }
282
283    pub(super) fn upsert(
284        &self,
285        database_id: DatabaseId,
286        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
287    ) {
288        let mut writer = self.start_writer(database_id);
289        writer.upsert(infos);
290        writer.finish();
291    }
292
293    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
294        SharedActorInfoWriter {
295            database_id,
296            write_guard: self.inner.write(),
297            notification_manager: &self.notification_manager,
298            added_fragment_mapping: None,
299            updated_fragment_mapping: None,
300            deleted_fragment_mapping: None,
301        }
302    }
303}
304
/// Write handle over the shared infos of one database.
///
/// Holds the write lock for its whole lifetime and accumulates the fragment
/// mappings to be announced; nothing is sent until `finish` is called.
pub(super) struct SharedActorInfoWriter<'a> {
    /// Database whose fragments this writer modifies.
    database_id: DatabaseId,
    /// Write lock over the shared state.
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    /// Where the accumulated mappings are sent on `finish`.
    notification_manager: &'a NotificationManagerRef,
    /// Mappings of newly inserted fragments; `None` until the first one is recorded.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of overwritten fragments; `None` until the first one is recorded.
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of removed fragments; `None` until the first one is recorded.
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
313
314impl SharedActorInfoWriter<'_> {
315    pub(super) fn upsert(
316        &mut self,
317        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
318    ) {
319        let database = self.write_guard.info.entry(self.database_id).or_default();
320        for info @ (fragment, _) in infos {
321            match database.entry(fragment.fragment_id) {
322                Entry::Occupied(mut entry) => {
323                    let info = info.into();
324                    self.updated_fragment_mapping
325                        .get_or_insert_default()
326                        .push(rebuild_fragment_mapping(&info));
327                    entry.insert(info);
328                }
329                Entry::Vacant(entry) => {
330                    let info = info.into();
331                    self.added_fragment_mapping
332                        .get_or_insert_default()
333                        .push(rebuild_fragment_mapping(&info));
334                    entry.insert(info);
335                }
336            }
337        }
338    }
339
340    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
341        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
342            && let Some(fragment) = database.remove(&info.fragment_id)
343        {
344            self.deleted_fragment_mapping
345                .get_or_insert_default()
346                .push(rebuild_fragment_mapping(&fragment));
347        }
348    }
349
350    pub(super) fn finish(self) {
351        if let Some(mapping) = self.added_fragment_mapping {
352            self.notification_manager
353                .notify_streaming_fragment_mapping(Operation::Add, mapping);
354        }
355        if let Some(mapping) = self.updated_fragment_mapping {
356            self.notification_manager
357                .notify_streaming_fragment_mapping(Operation::Update, mapping);
358        }
359        if let Some(mapping) = self.deleted_fragment_mapping {
360            self.notification_manager
361                .notify_streaming_fragment_mapping(Operation::Delete, mapping);
362        }
363    }
364}
365
/// Epochs and kind describing one barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// Previous epoch of the barrier.
    pub prev_epoch: TracedEpoch,
    /// Current epoch of the barrier.
    pub curr_epoch: TracedEpoch,
    /// Kind of the barrier.
    pub kind: BarrierKind,
}
372
373impl BarrierInfo {
374    pub(super) fn prev_epoch(&self) -> u64 {
375        self.prev_epoch.value().0
376    }
377
378    pub(super) fn curr_epoch(&self) -> u64 {
379        self.curr_epoch.value().0
380    }
381
382    pub(super) fn epoch(&self) -> EpochPair {
383        EpochPair {
384            curr: self.curr_epoch(),
385            prev: self.prev_epoch(),
386        }
387    }
388}
389
/// Kind of subscriber registered on a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A subscription; the payload is its retention value.
    Subscription(u64),
    /// A snapshot-backfill subscriber; it carries no retention.
    SnapshotBackfill,
}
395
/// Creation status of a streaming job tracked by `InflightDatabaseInfo`.
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    /// Progress tracking has not started yet.
    Init,
    /// Progress is being tracked by `tracker`.
    Creating { tracker: CreateMviewProgressTracker },
    /// Tracking is over; `take_staging_commit_info` switches a job to this state
    /// once its tracker reports it finished.
    Created,
}
402
/// Inflight state of one streaming job.
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    /// Id of the job.
    pub job_id: JobId,
    /// Fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Subscribers registered on the job, keyed by subscriber id.
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    /// Creation status, including the progress tracker while creating.
    pub status: CreateStreamingJobStatus,
    /// CDC table backfill tracker, when the job has one.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
411
412impl InflightStreamingJobInfo {
413    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
414        self.fragment_infos.values()
415    }
416
417    pub fn snapshot_backfill_actor_ids(
418        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
419    ) -> impl Iterator<Item = ActorId> + '_ {
420        fragment_infos
421            .values()
422            .filter(|fragment| {
423                fragment
424                    .fragment_type_mask
425                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
426            })
427            .flat_map(|fragment| fragment.actors.keys().copied())
428    }
429
430    pub fn tracking_progress_actor_ids(
431        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
432    ) -> Vec<(ActorId, BackfillUpstreamType)> {
433        StreamJobFragments::tracking_progress_actor_ids_impl(
434            fragment_infos
435                .values()
436                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
437        )
438    }
439}
440
441impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
442    type Item = &'a InflightFragmentInfo;
443
444    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
445
446    fn into_iter(self) -> Self::IntoIter {
447        self.fragment_infos()
448    }
449}
450
/// Inflight state of all streaming jobs in one database.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    /// Id of the database.
    pub(super) database_id: DatabaseId,
    /// Jobs of the database, keyed by job id.
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    /// Reverse index from a fragment id to the id of the job owning it.
    fragment_location: HashMap<FragmentId, JobId>,
    /// Handle to the shared fragment/actor infos.
    pub(super) shared_actor_infos: SharedActorInfos,
}
458
459impl InflightDatabaseInfo {
460    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
461        self.jobs.values().flat_map(|job| job.fragment_infos())
462    }
463
    /// Returns whether a job with `job_id` is tracked in this database.
    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }
467
    /// Returns the id of the job owning `fragment_id`, or `None` if the
    /// fragment is not recorded in `fragment_location`.
    pub(super) fn job_id_by_fragment(&self, fragment_id: FragmentId) -> Option<JobId> {
        self.fragment_location.get(&fragment_id).copied()
    }
471
472    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
473        let job_id = self.fragment_location[&fragment_id];
474        self.jobs
475            .get(&job_id)
476            .expect("should exist")
477            .fragment_infos
478            .get(&fragment_id)
479            .expect("should exist")
480    }
481
482    pub(super) fn backfill_fragment_ids_for_job(
483        &self,
484        job_id: JobId,
485    ) -> MetaResult<HashSet<FragmentId>> {
486        let job = self
487            .jobs
488            .get(&job_id)
489            .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
490        Ok(job
491            .fragment_infos
492            .iter()
493            .filter_map(|(fragment_id, fragment)| {
494                fragment
495                    .fragment_type_mask
496                    .contains_any([
497                        FragmentTypeFlag::StreamScan,
498                        FragmentTypeFlag::SourceScan,
499                        FragmentTypeFlag::LocalityProvider,
500                    ])
501                    .then_some(*fragment_id)
502            })
503            .collect())
504    }
505
506    pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
507        let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
508            MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
509        })?;
510        let fragment = self
511            .jobs
512            .get(job_id)
513            .expect("should exist")
514            .fragment_infos
515            .get(&fragment_id)
516            .expect("should exist");
517        Ok(fragment.fragment_type_mask.contains_any([
518            FragmentTypeFlag::StreamScan,
519            FragmentTypeFlag::SourceScan,
520            FragmentTypeFlag::LocalityProvider,
521        ]))
522    }
523
524    pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
525        self.jobs
526            .iter()
527            .filter_map(|(job_id, job)| match &job.status {
528                CreateStreamingJobStatus::Init => None,
529                CreateStreamingJobStatus::Creating { tracker } => {
530                    let progress = tracker.gen_backfill_progress();
531                    Some((
532                        *job_id,
533                        BackfillProgress {
534                            progress,
535                            backfill_type: PbBackfillType::NormalBackfill,
536                        },
537                    ))
538                }
539                CreateStreamingJobStatus::Created => None,
540            })
541    }
542
543    pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
544        self.jobs.iter().filter_map(|(job_id, job)| {
545            job.cdc_table_backfill_tracker
546                .as_ref()
547                .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
548        })
549    }
550
551    pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
552        let mut result = Vec::new();
553        for job in self.jobs.values() {
554            let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
555                continue;
556            };
557            let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
558            result.extend(fragment_progress);
559        }
560        result
561    }
562
563    pub(super) fn may_assign_fragment_cdc_backfill_splits(
564        &mut self,
565        fragment_id: FragmentId,
566    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
567        let job_id = self.fragment_location[&fragment_id];
568        let job = self.jobs.get_mut(&job_id).expect("should exist");
569        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
570            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
571            if cdc_scan_fragment_id != fragment_id {
572                return Ok(None);
573            }
574            let actors = job.fragment_infos[&cdc_scan_fragment_id]
575                .actors
576                .keys()
577                .copied()
578                .collect();
579            tracker.reassign_splits(actors).map(Some)
580        } else {
581            Ok(None)
582        }
583    }
584
585    pub(super) fn assign_cdc_backfill_splits(
586        &mut self,
587        job_id: JobId,
588    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
589        let job = self.jobs.get_mut(&job_id).expect("should exist");
590        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
591            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
592            let actors = job.fragment_infos[&cdc_scan_fragment_id]
593                .actors
594                .keys()
595                .copied()
596                .collect();
597            tracker.reassign_splits(actors).map(Some)
598        } else {
599            Ok(None)
600        }
601    }
602
    /// Applies the effects of a collected barrier to the per-job trackers.
    ///
    /// In order, this:
    /// 1. moves a newly created `Normal` / `SinkIntoTable` job from `Init` to
    ///    `Creating` by installing a fresh progress tracker;
    /// 2. for a reschedule command, refreshes the trackers of the creating jobs
    ///    that own rescheduled fragments;
    /// 3. feeds the `create_mview_progress` entries of `resps` into the
    ///    matching trackers;
    /// 4. feeds the `cdc_table_backfill_progress` entries into the matching
    ///    CDC trackers;
    /// 5. marks the CDC source as finished on creating jobs for which a
    ///    `cdc_source_offset_updated` event was reported.
    ///
    /// Progress that refers to an unknown fragment or job is logged and skipped.
    ///
    /// # Panics
    ///
    /// Panics if a job being created is found in a state other than `Init`, or
    /// if `fragment_location` points to a job that is missing from `jobs`.
    pub(super) fn apply_collected_command(
        &mut self,
        command: &PostCollectCommand,
        resps: &HashMap<WorkerId, BarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) {
        if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // Swap in the `Creating` status; the previous status must be `Init`.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(
                                    info,
                                    version_stats,
                                    &job_info.fragment_infos,
                                ),
                            },
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_)
                | CreateStreamingJobType::BatchRefresh(_) => {
                    // The progress of SnapshotBackfill/BatchRefresh won't be tracked here
                }
            }
        }
        if let PostCollectCommand::Reschedule { reschedules, .. } = command {
            // During reschedule we expect fragments to be rebuilt with new actors and no vnode bitmap update.
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "Reschedule should not carry vnode bitmap updates when actors are rebuilt"
            );

            // Collect jobs that own the rescheduled fragments; de-duplicate via HashSet.
            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                // Only jobs that are still being created have a tracker to refresh.
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        // Apply the create-mview progress reported in the responses.
        for progress in resps.values().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
                // No tracker has been installed yet, so there is nothing to update.
                CreateStreamingJobStatus::Init => {
                    continue;
                }
                CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
                CreateStreamingJobStatus::Created => {
                    // Only unfinished progress on an already created job is worth a warning.
                    if !progress.done {
                        warn!("update the progress of an created streaming job: {progress:?}");
                    }
                    continue;
                }
            };
            tracker.apply_progress(progress, version_stats);
        }
        // Apply the CDC table backfill progress reported in the responses.
        for progress in resps
            .values()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        // Handle CDC source offset updated events
        for cdc_offset_updated in resps
            .values()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            // The job id is derived from the source id reported in the event.
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }
721
722    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
723        self.jobs.values().filter_map(|job| match &job.status {
724            CreateStreamingJobStatus::Init => None,
725            CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
726            CreateStreamingJobStatus::Created => None,
727        })
728    }
729
730    fn iter_mut_creating_job_tracker(
731        &mut self,
732    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
733        self.jobs
734            .values_mut()
735            .filter_map(|job| match &mut job.status {
736                CreateStreamingJobStatus::Init => None,
737                CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
738                CreateStreamingJobStatus::Created => None,
739            })
740    }
741
742    pub(super) fn has_pending_finished_jobs(&self) -> bool {
743        self.iter_creating_job_tracker()
744            .any(|tracker| tracker.is_finished())
745    }
746
747    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
748        self.iter_mut_creating_job_tracker()
749            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
750            .collect()
751    }
752
753    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
754        let mut finished_jobs = vec![];
755        let mut table_ids_to_truncate = vec![];
756        let mut finished_cdc_table_backfill = vec![];
757        for (job_id, job) in &mut self.jobs {
758            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
759                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
760                table_ids_to_truncate.extend(truncate_table_ids);
761                if is_finished {
762                    let CreateStreamingJobStatus::Creating { tracker, .. } =
763                        replace(&mut job.status, CreateStreamingJobStatus::Created)
764                    else {
765                        unreachable!()
766                    };
767                    finished_jobs.push(tracker.into_tracking_job());
768                }
769            }
770            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
771                && tracker.take_pre_completed()
772            {
773                finished_cdc_table_backfill.push(*job_id);
774            }
775        }
776        StagingCommitInfo {
777            finished_jobs,
778            table_ids_to_truncate,
779            finished_cdc_table_backfill,
780        }
781    }
782
783    pub fn fragment_subscribers(
784        &self,
785        fragment_id: FragmentId,
786    ) -> impl Iterator<Item = SubscriberId> + '_ {
787        let job_id = self.fragment_location[&fragment_id];
788        self.jobs[&job_id].subscribers.keys().copied()
789    }
790
791    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
792        self.jobs[&job_id].subscribers.keys().copied()
793    }
794
795    pub fn max_subscription_retention(&self) -> impl Iterator<Item = (TableId, u64)> + '_ {
796        self.jobs.iter().filter_map(|(job_id, info)| {
797            info.subscribers
798                .values()
799                .filter_map(|subscriber| match subscriber {
800                    SubscriberType::Subscription(retention) => Some(*retention),
801                    SubscriberType::SnapshotBackfill => None,
802                })
803                .max()
804                .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
805        })
806    }
807
808    pub fn register_subscriber(
809        &mut self,
810        job_id: JobId,
811        subscriber_id: SubscriberId,
812        subscriber: SubscriberType,
813    ) {
814        self.jobs
815            .get_mut(&job_id)
816            .expect("should exist")
817            .subscribers
818            .try_insert(subscriber_id, subscriber)
819            .expect("non duplicate");
820    }
821
822    pub fn unregister_subscriber(
823        &mut self,
824        job_id: JobId,
825        subscriber_id: SubscriberId,
826    ) -> Option<SubscriberType> {
827        self.jobs
828            .get_mut(&job_id)
829            .expect("should exist")
830            .subscribers
831            .remove(&subscriber_id)
832    }
833
834    pub fn update_subscription_retention(
835        &mut self,
836        job_id: JobId,
837        subscriber_id: SubscriberId,
838        retention_second: u64,
839    ) {
840        let job = self.jobs.get_mut(&job_id).expect("should exist");
841        match job.subscribers.get_mut(&subscriber_id) {
842            Some(SubscriberType::Subscription(current_retention)) => {
843                *current_retention = retention_second;
844            }
845            Some(SubscriberType::SnapshotBackfill) => {
846                warn!(
847                    %job_id,
848                    %subscriber_id,
849                    "cannot update retention for snapshot backfill subscriber"
850                );
851            }
852            None => {
853                warn!(%job_id, %subscriber_id, "subscription subscriber not found");
854            }
855        }
856    }
857
858    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
859        let job_id = self.fragment_location[&fragment_id];
860        let fragment = self
861            .jobs
862            .get_mut(&job_id)
863            .expect("should exist")
864            .fragment_infos
865            .get_mut(&fragment_id)
866            .expect("should exist");
867        (fragment, job_id)
868    }
869
870    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
871        Self {
872            database_id,
873            jobs: Default::default(),
874            fragment_location: Default::default(),
875            shared_actor_infos,
876        }
877    }
878
879    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
880        // remove the database because it's empty.
881        shared_actor_infos.remove_database(database_id);
882        Self::empty_inner(database_id, shared_actor_infos)
883    }
884
885    pub fn recover(
886        database_id: DatabaseId,
887        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
888        shared_actor_infos: SharedActorInfos,
889    ) -> Self {
890        let mut info = Self::empty_inner(database_id, shared_actor_infos);
891        for job in jobs {
892            info.add_existing(job);
893        }
894        info
895    }
896
897    pub fn is_empty(&self) -> bool {
898        self.jobs.is_empty()
899    }
900
901    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
902        let InflightStreamingJobInfo {
903            job_id,
904            fragment_infos,
905            subscribers,
906            status,
907            cdc_table_backfill_tracker,
908        } = job;
909        self.jobs
910            .try_insert(
911                job_id,
912                InflightStreamingJobInfo {
913                    job_id,
914                    subscribers,
915                    fragment_infos: Default::default(), // fill in later in pre_apply_new_fragments
916                    status,
917                    cdc_table_backfill_tracker,
918                },
919            )
920            .expect("non-duplicate");
921        self.pre_apply_new_fragments(
922            fragment_infos
923                .into_iter()
924                .map(|(fragment_id, info)| (fragment_id, job_id, info)),
925        );
926    }
927
928    /// Register a new streaming job entry (with empty `fragment_infos`).
929    pub(crate) fn pre_apply_new_job(
930        &mut self,
931        job_id: JobId,
932        cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
933    ) {
934        {
935            self.jobs
936                .try_insert(
937                    job_id,
938                    InflightStreamingJobInfo {
939                        job_id,
940                        fragment_infos: Default::default(),
941                        subscribers: Default::default(), // no subscriber for newly create job
942                        status: CreateStreamingJobStatus::Init,
943                        cdc_table_backfill_tracker,
944                    },
945                )
946                .expect("non-duplicate");
947        }
948    }
949
950    /// Add new fragment infos and update shared actor infos.
951    pub(crate) fn pre_apply_new_fragments(
952        &mut self,
953        fragments: impl IntoIterator<Item = (FragmentId, JobId, InflightFragmentInfo)>,
954    ) {
955        {
956            let shared_infos = self.shared_actor_infos.clone();
957            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
958            for (fragment_id, job_id, info) in fragments {
959                {
960                    {
961                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
962                        shared_actor_writer.upsert([(&info, job_id)]);
963                        fragment_infos
964                            .fragment_infos
965                            .try_insert(fragment_id, info)
966                            .expect("non duplicate");
967                        self.fragment_location
968                            .try_insert(fragment_id, job_id)
969                            .expect("non duplicate");
970                    }
971                }
972            }
973            shared_actor_writer.finish();
974        }
975    }
976
977    /// Pre-apply reschedule: update actors, vnode bitmaps, and splits.
978    /// The actual removal of old actors happens in `post_apply_reschedules`.
979    pub(crate) fn pre_apply_reschedule(
980        &mut self,
981        fragment_id: FragmentId,
982        new_actors: HashMap<ActorId, InflightActorInfo>,
983        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
984        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
985    ) {
986        {
987            {
988                {
989                    {
990                        let (info, _) = self.fragment_mut(fragment_id);
991                        let actors = &mut info.actors;
992                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
993                            actors
994                                .get_mut(&actor_id)
995                                .expect("should exist")
996                                .vnode_bitmap = Some(new_vnodes);
997                        }
998                        for (actor_id, actor) in new_actors {
999                            actors
1000                                .try_insert(actor_id as _, actor)
1001                                .expect("non-duplicate");
1002                        }
1003                        for (actor_id, splits) in actor_splits {
1004                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
1005                        }
1006                        // info will be upserted into shared_actor_infos in post_apply stage
1007                    }
1008                }
1009            }
1010        }
1011    }
1012
1013    /// Replace upstream fragment IDs in merge nodes of a fragment's stream graph.
1014    pub(crate) fn pre_apply_replace_node_upstream(
1015        &mut self,
1016        fragment_id: FragmentId,
1017        replace_map: &HashMap<FragmentId, FragmentId>,
1018    ) {
1019        {
1020            {
1021                {
1022                    {
1023                        let mut remaining_fragment_ids: HashSet<_> =
1024                            replace_map.keys().cloned().collect();
1025                        let (info, _) = self.fragment_mut(fragment_id);
1026                        visit_stream_node_mut(&mut info.nodes, |node| {
1027                            if let NodeBody::Merge(m) = node
1028                                && let Some(new_upstream_fragment_id) =
1029                                    replace_map.get(&m.upstream_fragment_id)
1030                            {
1031                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1032                                    if cfg!(debug_assertions) {
1033                                        panic!(
1034                                            "duplicate upstream fragment: {:?} {:?}",
1035                                            m, replace_map
1036                                        );
1037                                    } else {
1038                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
1039                                    }
1040                                }
1041                                m.upstream_fragment_id = *new_upstream_fragment_id;
1042                            }
1043                        });
1044                        if cfg!(debug_assertions) {
1045                            assert!(
1046                                remaining_fragment_ids.is_empty(),
1047                                "non-existing fragment to replace: {:?} {:?} {:?}",
1048                                remaining_fragment_ids,
1049                                info.nodes,
1050                                replace_map
1051                            );
1052                        } else {
1053                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1054                        }
1055                    }
1056                }
1057            }
1058        }
1059    }
1060
1061    /// Add a new upstream sink node to a fragment's `UpstreamSinkUnion`.
1062    pub(crate) fn pre_apply_add_node_upstream(
1063        &mut self,
1064        fragment_id: FragmentId,
1065        new_upstream_info: &PbUpstreamSinkInfo,
1066    ) {
1067        {
1068            {
1069                {
1070                    {
1071                        let (info, _) = self.fragment_mut(fragment_id);
1072                        let mut injected = false;
1073                        visit_stream_node_mut(&mut info.nodes, |node| {
1074                            if let NodeBody::UpstreamSinkUnion(u) = node {
1075                                if cfg!(debug_assertions) {
1076                                    let current_upstream_fragment_ids = u
1077                                        .init_upstreams
1078                                        .iter()
1079                                        .map(|upstream| upstream.upstream_fragment_id)
1080                                        .collect::<HashSet<_>>();
1081                                    if current_upstream_fragment_ids
1082                                        .contains(&new_upstream_info.upstream_fragment_id)
1083                                    {
1084                                        panic!(
1085                                            "duplicate upstream fragment: {:?} {:?}",
1086                                            u, new_upstream_info
1087                                        );
1088                                    }
1089                                }
1090                                u.init_upstreams.push(new_upstream_info.clone());
1091                                injected = true;
1092                            }
1093                        });
1094                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
1095                    }
1096                }
1097            }
1098        }
1099    }
1100
1101    /// Remove upstream sink nodes from a fragment's `UpstreamSinkUnion`.
1102    pub(crate) fn pre_apply_drop_node_upstream(
1103        &mut self,
1104        fragment_id: FragmentId,
1105        drop_upstream_fragment_ids: &[FragmentId],
1106    ) {
1107        if !self.fragment_location.contains_key(&fragment_id) {
1108            warn!(
1109                target_fragment_id = %fragment_id,
1110                drop_upstream_fragment_ids = ?drop_upstream_fragment_ids,
1111                "skip dropping upstream sink fragments for non-existing target fragment"
1112            );
1113            return;
1114        }
1115        {
1116            {
1117                {
1118                    {
1119                        let (info, _) = self.fragment_mut(fragment_id);
1120                        let mut removed = false;
1121                        visit_stream_node_mut(&mut info.nodes, |node| {
1122                            if let NodeBody::UpstreamSinkUnion(u) = node {
1123                                if cfg!(debug_assertions) {
1124                                    let current_upstream_fragment_ids = u
1125                                        .init_upstreams
1126                                        .iter()
1127                                        .map(|upstream| upstream.upstream_fragment_id)
1128                                        .collect::<HashSet<FragmentId>>();
1129                                    for drop_fragment_id in drop_upstream_fragment_ids {
1130                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
1131                                        {
1132                                            panic!(
1133                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1134                                                u, drop_upstream_fragment_ids, drop_fragment_id
1135                                            );
1136                                        }
1137                                    }
1138                                }
1139                                u.init_upstreams.retain(|upstream| {
1140                                    !drop_upstream_fragment_ids
1141                                        .contains(&upstream.upstream_fragment_id)
1142                                });
1143                                removed = true;
1144                            }
1145                        });
1146                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
1147                    }
1148                }
1149            }
1150        }
1151    }
1152
1153    /// Sync inflight `nodes.rate_limit` so a later reschedule won't materialize new actors
1154    /// from stale data. Mirrors `controller/streaming_job.rs::update_*_rate_limit_by_*`.
1155    pub(crate) fn pre_apply_throttle(&mut self, fragment_id: FragmentId, config: &ThrottleConfig) {
1156        // Snapshot-backfill creating jobs live outside main inflight; their throttles
1157        // flow through `on_new_upstream_barrier`.
1158        if !self.fragment_location.contains_key(&fragment_id) {
1159            return;
1160        }
1161        let throttle_type = config.throttle_type();
1162        let rate_limit = config.rate_limit;
1163        let (info, _) = self.fragment_mut(fragment_id);
1164
1165        visit_stream_node_mut(&mut info.nodes, |node| match throttle_type {
1166            ThrottleType::Source => {
1167                if let NodeBody::Source(node) = node
1168                    && let Some(node_inner) = &mut node.source_inner
1169                {
1170                    node_inner.rate_limit = rate_limit;
1171                }
1172                if let NodeBody::StreamFsFetch(node) = node
1173                    && let Some(node_inner) = &mut node.node_inner
1174                {
1175                    node_inner.rate_limit = rate_limit;
1176                }
1177            }
1178            ThrottleType::Backfill => match node {
1179                NodeBody::StreamCdcScan(node) => node.rate_limit = rate_limit,
1180                NodeBody::StreamScan(node) => node.rate_limit = rate_limit,
1181                NodeBody::SourceBackfill(node) => node.rate_limit = rate_limit,
1182                _ => {}
1183            },
1184            ThrottleType::Sink => {
1185                if let NodeBody::Sink(node) = node {
1186                    node.rate_limit = rate_limit;
1187                }
1188            }
1189            ThrottleType::Dml => {
1190                if let NodeBody::Dml(node) = node {
1191                    node.rate_limit = rate_limit;
1192                }
1193            }
1194            ThrottleType::Unspecified => {}
1195        });
1196    }
1197
1198    /// Update split assignments for actors in fragments.
1199    pub(crate) fn pre_apply_split_assignments(
1200        &mut self,
1201        assignments: impl IntoIterator<Item = (FragmentId, HashMap<ActorId, Vec<SplitImpl>>)>,
1202    ) {
1203        {
1204            let shared_infos = self.shared_actor_infos.clone();
1205            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
1206            {
1207                {
1208                    for (fragment_id, actor_splits) in assignments {
1209                        let (info, job_id) = self.fragment_mut(fragment_id);
1210                        let actors = &mut info.actors;
1211                        for (actor_id, splits) in actor_splits {
1212                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
1213                        }
1214                        shared_actor_writer.upsert([(&*info, job_id)]);
1215                    }
1216                }
1217            }
1218            shared_actor_writer.finish();
1219        }
1220    }
1221
1222    pub(super) fn build_edge(
1223        &self,
1224        info: Option<(&CreateStreamingJobCommandInfo, bool)>,
1225        replace_job: Option<&ReplaceStreamJobPlan>,
1226        new_upstream_sink: Option<&UpstreamSinkInfo>,
1227        control_stream_manager: &ControlStreamManager,
1228        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1229        actor_location: &HashMap<ActorId, WorkerId>,
1230    ) -> FragmentEdgeBuildResult {
1231        // `existing_fragment_ids` consists of
1232        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
1233        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
1234        // if the upstream fragment previously exists
1235        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments,
1236        // if creating a new sink-into-table
1237        //  - should contain the `fragment_id` of the downstream table.
1238        let existing_fragment_ids = info
1239            .into_iter()
1240            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
1241            .chain(replace_job.into_iter().flat_map(|replace_job| {
1242                replace_job
1243                    .upstream_fragment_downstreams
1244                    .keys()
1245                    .filter(|fragment_id| {
1246                        info.map(|(info, _)| {
1247                            !info
1248                                .stream_job_fragments
1249                                .fragments
1250                                .contains_key(*fragment_id)
1251                        })
1252                        .unwrap_or(true)
1253                    })
1254                    .chain(replace_job.replace_upstream.keys())
1255            }))
1256            .chain(
1257                new_upstream_sink
1258                    .into_iter()
1259                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
1260            )
1261            .cloned();
1262        // Collect new fragments with their partial graph IDs
1263        let new_fragments = info
1264            .into_iter()
1265            .flat_map(|(info, is_snapshot_backfill)| {
1266                let partial_graph_id = to_partial_graph_id(
1267                    self.database_id,
1268                    is_snapshot_backfill.then_some(info.streaming_job.id()),
1269                );
1270                info.stream_job_fragments
1271                    .fragments
1272                    .values()
1273                    .map(move |fragment| (partial_graph_id, fragment))
1274            })
1275            .chain(replace_job.into_iter().flat_map(|replace_job| {
1276                replace_job
1277                    .new_fragments
1278                    .fragments
1279                    .values()
1280                    .chain(
1281                        replace_job
1282                            .auto_refresh_schema_sinks
1283                            .as_ref()
1284                            .into_iter()
1285                            .flat_map(move |sinks| sinks.iter().map(|sink| &sink.new_fragment)),
1286                    )
1287                    .map(|fragment| {
1288                        (
1289                            // we assume that replace job only happens in database partial graph
1290                            to_partial_graph_id(self.database_id, None),
1291                            fragment,
1292                        )
1293                    })
1294            }));
1295
1296        let mut builder = FragmentEdgeBuilder::new(
1297            // Existing fragments
1298            existing_fragment_ids
1299                .map(|fragment_id| {
1300                    (
1301                        fragment_id,
1302                        EdgeBuilderFragmentInfo::from_inflight(
1303                            self.fragment(fragment_id),
1304                            to_partial_graph_id(self.database_id, None),
1305                            control_stream_manager,
1306                        ),
1307                    )
1308                })
1309                // New fragments from create/replace jobs
1310                .chain(new_fragments.map(|(partial_graph_id, fragment)| {
1311                    (
1312                        fragment.fragment_id,
1313                        EdgeBuilderFragmentInfo::from_fragment(
1314                            fragment,
1315                            stream_actors,
1316                            actor_location,
1317                            partial_graph_id,
1318                            control_stream_manager,
1319                        ),
1320                    )
1321                })),
1322        );
1323        if let Some((info, _)) = info {
1324            builder.add_relations(&info.upstream_fragment_downstreams);
1325            builder.add_relations(&info.stream_job_fragments.downstreams);
1326        }
1327        if let Some(replace_job) = replace_job {
1328            builder.add_relations(&replace_job.upstream_fragment_downstreams);
1329            builder.add_relations(&replace_job.new_fragments.downstreams);
1330        }
1331        if let Some(new_upstream_sink) = new_upstream_sink {
1332            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
1333            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
1334            builder.add_edge(sink_fragment_id, new_sink_downstream);
1335        }
1336        if let Some(replace_job) = replace_job {
1337            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
1338                for (original_upstream_fragment_id, new_upstream_fragment_id) in
1339                    fragment_replacement
1340                {
1341                    builder.replace_upstream(
1342                        *fragment_id,
1343                        *original_upstream_fragment_id,
1344                        *new_upstream_fragment_id,
1345                    );
1346                }
1347            }
1348        }
1349        builder.build()
1350    }
1351
1352    /// Post-apply reschedule: remove actors that were marked for removal.
1353    pub(crate) fn post_apply_reschedules(
1354        &mut self,
1355        reschedules: impl IntoIterator<Item = (FragmentId, HashSet<ActorId>)>,
1356    ) {
1357        let inner = self.shared_actor_infos.clone();
1358        let mut shared_actor_writer = inner.start_writer(self.database_id);
1359        {
1360            {
1361                {
1362                    for (fragment_id, to_remove) in reschedules {
1363                        let job_id = self.fragment_location[&fragment_id];
1364                        let info = self
1365                            .jobs
1366                            .get_mut(&job_id)
1367                            .expect("should exist")
1368                            .fragment_infos
1369                            .get_mut(&fragment_id)
1370                            .expect("should exist");
1371                        for actor_id in to_remove {
1372                            assert!(info.actors.remove(&actor_id).is_some());
1373                        }
1374                        shared_actor_writer.upsert([(&*info, job_id)]);
1375                    }
1376                }
1377            }
1378        }
1379        shared_actor_writer.finish();
1380    }
1381
1382    /// Post-apply fragment removal: remove fragments and their jobs if empty.
1383    pub(crate) fn post_apply_remove_fragments(
1384        &mut self,
1385        fragment_ids: impl IntoIterator<Item = FragmentId>,
1386    ) {
1387        let inner = self.shared_actor_infos.clone();
1388        let mut shared_actor_writer = inner.start_writer(self.database_id);
1389        {
1390            {
1391                {
1392                    for fragment_id in fragment_ids {
1393                        let job_id = self
1394                            .fragment_location
1395                            .remove(&fragment_id)
1396                            .expect("should exist");
1397                        let job = self.jobs.get_mut(&job_id).expect("should exist");
1398                        let fragment = job
1399                            .fragment_infos
1400                            .remove(&fragment_id)
1401                            .expect("should exist");
1402                        shared_actor_writer.remove(&fragment);
1403                        if job.fragment_infos.is_empty() {
1404                            self.jobs.remove(&job_id).expect("should exist");
1405                        }
1406                    }
1407                }
1408            }
1409        }
1410        shared_actor_writer.finish();
1411    }
1412
1413    pub(crate) fn post_apply_remove_job(
1414        &mut self,
1415        job_id: JobId,
1416    ) -> Option<InflightStreamingJobInfo> {
1417        let job = self.jobs.remove(&job_id)?;
1418        let inner = self.shared_actor_infos.clone();
1419        let mut shared_actor_writer = inner.start_writer(self.database_id);
1420        for (fragment_id, fragment) in &job.fragment_infos {
1421            self.fragment_location
1422                .remove(fragment_id)
1423                .expect("should exist");
1424            shared_actor_writer.remove(fragment);
1425        }
1426        shared_actor_writer.finish();
1427        Some(job)
1428    }
1429}
1430
1431impl InflightFragmentInfo {
1432    /// Returns actor list to collect in the target worker node.
1433    pub(crate) fn actor_ids_to_collect(
1434        infos: impl IntoIterator<Item = &Self>,
1435    ) -> HashMap<WorkerId, HashSet<ActorId>> {
1436        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1437        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1438            assert!(
1439                ret.entry(actor.worker_id)
1440                    .or_default()
1441                    .insert(*actor_id as _)
1442            )
1443        }
1444        ret
1445    }
1446
1447    pub fn existing_table_ids<'a>(
1448        infos: impl IntoIterator<Item = &'a Self> + 'a,
1449    ) -> impl Iterator<Item = TableId> + 'a {
1450        infos
1451            .into_iter()
1452            .flat_map(|info| info.state_table_ids.iter().cloned())
1453    }
1454
1455    pub fn workers<'a>(
1456        infos: impl IntoIterator<Item = &'a Self> + 'a,
1457    ) -> impl Iterator<Item = WorkerId> + 'a {
1458        infos
1459            .into_iter()
1460            .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1461    }
1462
1463    pub fn contains_worker<'a>(
1464        infos: impl IntoIterator<Item = &'a Self> + 'a,
1465        worker_id: WorkerId,
1466    ) -> bool {
1467        Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1468    }
1469}
1470
1471impl InflightDatabaseInfo {
1472    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
1473        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
1474    }
1475
1476    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
1477        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
1478    }
1479}