Skip to main content

risingwave_meta/barrier/
info.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::mem::replace;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use parking_lot::RawRwLock;
22use parking_lot::lock_api::RwLockReadGuard;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_meta_model::WorkerId;
30use risingwave_meta_model::fragment::DistributionType;
31use risingwave_pb::common::ThrottleType;
32use risingwave_pb::ddl_service::PbBackfillType;
33use risingwave_pb::hummock::HummockVersionStats;
34use risingwave_pb::id::SubscriberId;
35use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
36use risingwave_pb::meta::subscribe_response::Operation;
37use risingwave_pb::source::PbCdcTableSnapshotSplits;
38use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
39use risingwave_pb::stream_plan::stream_node::NodeBody;
40use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
41use risingwave_pb::stream_service::BarrierCompleteResponse;
42use tracing::{info, warn};
43
44use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
45use crate::barrier::command::{
46    CreateStreamingJobCommandInfo, PostCollectCommand, ReplaceStreamJobPlan,
47};
48use crate::barrier::edge_builder::{
49    EdgeBuilderFragmentInfo, FragmentEdgeBuildResult, FragmentEdgeBuilder,
50};
51use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
52use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
53use crate::barrier::{
54    BackfillProgress, BarrierKind, CreateStreamingJobType, FragmentBackfillProgress, TracedEpoch,
55};
56use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
57use crate::controller::utils::rebuild_fragment_mapping;
58use crate::manager::NotificationManagerRef;
59use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamActor, StreamJobFragments};
60use crate::stream::UpstreamSinkInfo;
61use crate::{MetaError, MetaResult};
62
/// Per-actor information kept in the shared, lock-protected actor registry.
///
/// Values are produced by cloning the corresponding fields of an
/// [`InflightActorInfo`] (see the `From<&InflightActorInfo>` impl).
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    /// Id of the worker recorded for this actor.
    pub worker_id: WorkerId,
    /// Vnode bitmap recorded for this actor, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Splits currently recorded for this actor.
    pub splits: Vec<SplitImpl>,
}
69
70impl From<&InflightActorInfo> for SharedActorInfo {
71    fn from(value: &InflightActorInfo) -> Self {
72        Self {
73            worker_id: value.worker_id,
74            vnode_bitmap: value.vnode_bitmap.clone(),
75            splits: value.splits.clone(),
76        }
77    }
78}
79
/// Per-fragment information kept in the shared, lock-protected actor registry.
///
/// Built from an [`InflightFragmentInfo`] together with the [`JobId`] supplied by
/// the caller (see the `From<(&InflightFragmentInfo, JobId)>` impl).
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    /// Id of the fragment.
    pub fragment_id: FragmentId,
    /// Job id paired with the fragment when this value was built.
    pub job_id: JobId,
    /// Distribution type copied from the in-flight fragment.
    pub distribution_type: DistributionType,
    /// Actors of the fragment, keyed by actor id.
    pub actors: HashMap<ActorId, SharedActorInfo>,
    /// Vnode count copied from the in-flight fragment.
    pub vnode_count: usize,
    /// Fragment type flags copied from the in-flight fragment.
    pub fragment_type_mask: FragmentTypeMask,
}
89
90impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
91    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
92        let (info, job_id) = pair;
93
94        let InflightFragmentInfo {
95            fragment_id,
96            distribution_type,
97            fragment_type_mask,
98            actors,
99            vnode_count,
100            ..
101        } = info;
102
103        Self {
104            fragment_id: *fragment_id,
105            job_id,
106            distribution_type: *distribution_type,
107            fragment_type_mask: *fragment_type_mask,
108            actors: actors
109                .iter()
110                .map(|(actor_id, actor)| (*actor_id, actor.into()))
111                .collect(),
112            vnode_count: *vnode_count,
113        }
114    }
115}
116
/// Lock-protected state behind [`SharedActorInfos`].
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    /// Shared fragment infos, grouped by database and keyed by fragment id.
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
121
122impl SharedActorInfosInner {
123    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
124        self.info
125            .values()
126            .find_map(|database| database.get(&fragment_id))
127    }
128
129    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
130        self.info.values().flatten()
131    }
132}
133
/// Handle to the shared actor/fragment registry.
///
/// The state lives behind an `Arc<RwLock<..>>`, so clones of this handle refer to
/// the same underlying [`SharedActorInfosInner`].
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    /// Shared, lock-protected registry state.
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    /// Used to publish streaming fragment mapping changes; excluded from `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
141
142impl SharedActorInfos {
143    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
144        self.inner.read()
145    }
146
147    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
148        let core = self.inner.read();
149        core.iter_over_fragments()
150            .flat_map(|(_, fragment)| {
151                fragment
152                    .actors
153                    .iter()
154                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
155            })
156            .collect()
157    }
158
159    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
160    ///
161    /// Very occasionally split removal may happen during scaling, in which case we need to
162    /// use the old splits for reallocation instead of the latest splits (which may be missing),
163    /// so that we can resolve the split removal in the next command.
164    pub fn migrate_splits_for_source_actors(
165        &self,
166        fragment_id: FragmentId,
167        prev_actor_ids: &[ActorId],
168        curr_actor_ids: &[ActorId],
169    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
170        let guard = self.read_guard();
171
172        let prev_splits = prev_actor_ids
173            .iter()
174            .flat_map(|actor_id| {
175                // Note: File Source / Iceberg Source doesn't have splits assigned by meta.
176                guard
177                    .get_fragment(fragment_id)
178                    .and_then(|info| info.actors.get(actor_id))
179                    .map(|actor| actor.splits.clone())
180                    .unwrap_or_default()
181            })
182            .map(|split| (split.id(), split))
183            .collect();
184
185        let empty_actor_splits = curr_actor_ids
186            .iter()
187            .map(|actor_id| (*actor_id, vec![]))
188            .collect();
189
190        let diff = crate::stream::source_manager::reassign_splits(
191            fragment_id,
192            empty_actor_splits,
193            &prev_splits,
194            // pre-allocate splits is the first time getting splits, and it does not have scale-in scene
195            std::default::Default::default(),
196        )
197        .unwrap_or_default();
198
199        Ok(diff)
200    }
201}
202
203impl SharedActorInfos {
204    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
205        Self {
206            inner: Arc::new(Default::default()),
207            notification_manager,
208        }
209    }
210
211    pub(super) fn remove_database(&self, database_id: DatabaseId) {
212        if let Some(database) = self.inner.write().info.remove(&database_id) {
213            let mapping = database
214                .into_values()
215                .map(|fragment| rebuild_fragment_mapping(&fragment))
216                .collect_vec();
217            if !mapping.is_empty() {
218                self.notification_manager
219                    .notify_streaming_fragment_mapping(Operation::Delete, mapping);
220            }
221        }
222    }
223
224    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
225        let database_ids: HashSet<_> = database_ids.into_iter().collect();
226
227        let mut mapping = Vec::new();
228        for fragment in self
229            .inner
230            .write()
231            .info
232            .extract_if(|database_id, _| !database_ids.contains(database_id))
233            .flat_map(|(_, fragments)| fragments.into_values())
234        {
235            mapping.push(rebuild_fragment_mapping(&fragment));
236        }
237        if !mapping.is_empty() {
238            self.notification_manager
239                .notify_streaming_fragment_mapping(Operation::Delete, mapping);
240        }
241    }
242
243    pub(super) fn recover_database(
244        &self,
245        database_id: DatabaseId,
246        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
247    ) {
248        let mut remaining_fragments: HashMap<_, _> = fragments
249            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
250            .collect();
251        // delete the fragments that exist previously, but not included in the recovered fragments
252        let mut writer = self.start_writer(database_id);
253        let database = writer.write_guard.info.entry(database_id).or_default();
254        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
255            if let Some(info) = remaining_fragments.remove(fragment_id) {
256                let info = info.into();
257                writer
258                    .updated_fragment_mapping
259                    .get_or_insert_default()
260                    .push(rebuild_fragment_mapping(&info));
261                *fragment_infos = info;
262                false
263            } else {
264                true
265            }
266        }) {
267            writer
268                .deleted_fragment_mapping
269                .get_or_insert_default()
270                .push(rebuild_fragment_mapping(&fragment));
271        }
272        for (fragment_id, info) in remaining_fragments {
273            let info = info.into();
274            writer
275                .added_fragment_mapping
276                .get_or_insert_default()
277                .push(rebuild_fragment_mapping(&info));
278            database.insert(fragment_id, info);
279        }
280        writer.finish();
281    }
282
283    pub(super) fn upsert(
284        &self,
285        database_id: DatabaseId,
286        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
287    ) {
288        let mut writer = self.start_writer(database_id);
289        writer.upsert(infos);
290        writer.finish();
291    }
292
293    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
294        SharedActorInfoWriter {
295            database_id,
296            write_guard: self.inner.write(),
297            notification_manager: &self.notification_manager,
298            added_fragment_mapping: None,
299            updated_fragment_mapping: None,
300            deleted_fragment_mapping: None,
301        }
302    }
303}
304
/// Write session over the shared registry for a single database.
///
/// Holds the write lock for its whole lifetime and accumulates the fragment
/// mappings that were added, updated or deleted; `finish` publishes them.
pub(super) struct SharedActorInfoWriter<'a> {
    /// Database whose fragments this writer modifies.
    database_id: DatabaseId,
    /// Write guard over the shared registry state.
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    /// Used by `finish` to publish the accumulated mapping changes.
    notification_manager: &'a NotificationManagerRef,
    /// Mappings of fragments inserted during this session, if any.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of fragments overwritten during this session, if any.
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    /// Mappings of fragments removed during this session, if any.
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
313
314impl SharedActorInfoWriter<'_> {
315    pub(super) fn upsert(
316        &mut self,
317        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
318    ) {
319        let database = self.write_guard.info.entry(self.database_id).or_default();
320        for info @ (fragment, _) in infos {
321            match database.entry(fragment.fragment_id) {
322                Entry::Occupied(mut entry) => {
323                    let info = info.into();
324                    self.updated_fragment_mapping
325                        .get_or_insert_default()
326                        .push(rebuild_fragment_mapping(&info));
327                    entry.insert(info);
328                }
329                Entry::Vacant(entry) => {
330                    let info = info.into();
331                    self.added_fragment_mapping
332                        .get_or_insert_default()
333                        .push(rebuild_fragment_mapping(&info));
334                    entry.insert(info);
335                }
336            }
337        }
338    }
339
340    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
341        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
342            && let Some(fragment) = database.remove(&info.fragment_id)
343        {
344            self.deleted_fragment_mapping
345                .get_or_insert_default()
346                .push(rebuild_fragment_mapping(&fragment));
347        }
348    }
349
350    pub(super) fn finish(self) {
351        if let Some(mapping) = self.added_fragment_mapping {
352            self.notification_manager
353                .notify_streaming_fragment_mapping(Operation::Add, mapping);
354        }
355        if let Some(mapping) = self.updated_fragment_mapping {
356            self.notification_manager
357                .notify_streaming_fragment_mapping(Operation::Update, mapping);
358        }
359        if let Some(mapping) = self.deleted_fragment_mapping {
360            self.notification_manager
361                .notify_streaming_fragment_mapping(Operation::Delete, mapping);
362        }
363    }
364}
365
/// Epochs and kind describing a single barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    /// Traced previous epoch of the barrier.
    pub prev_epoch: TracedEpoch,
    /// Traced current epoch of the barrier.
    pub curr_epoch: TracedEpoch,
    /// Kind of the barrier.
    pub kind: BarrierKind,
}
372
373impl BarrierInfo {
374    pub(super) fn prev_epoch(&self) -> u64 {
375        self.prev_epoch.value().0
376    }
377
378    pub(super) fn curr_epoch(&self) -> u64 {
379        self.curr_epoch.value().0
380    }
381
382    pub(super) fn epoch(&self) -> EpochPair {
383        EpochPair {
384            curr: self.curr_epoch(),
385            prev: self.prev_epoch(),
386        }
387    }
388}
389
/// Kind of a subscriber registered on a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A subscription; the payload is its retention value
    /// (see `update_subscription_retention` / `max_subscription_retention`).
    Subscription(u64),
    /// A snapshot backfill subscriber; it carries no retention.
    SnapshotBackfill,
}
395
/// Creation status of an in-flight streaming job.
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    /// Initial status, before any progress tracker has been attached.
    Init,
    /// The job is being created; `tracker` tracks its creation progress.
    Creating { tracker: CreateMviewProgressTracker },
    /// The job has finished creation; no tracker is kept.
    Created,
}
402
/// In-flight state of a single streaming job.
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    /// Id of the job.
    pub job_id: JobId,
    /// Fragments of the job, keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Subscribers registered on the job, keyed by subscriber id.
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    /// Creation status of the job.
    pub status: CreateStreamingJobStatus,
    /// CDC table backfill tracker of the job, if it has one.
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}
411
412impl InflightStreamingJobInfo {
413    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
414        self.fragment_infos.values()
415    }
416
417    pub fn snapshot_backfill_actor_ids(
418        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
419    ) -> impl Iterator<Item = ActorId> + '_ {
420        fragment_infos
421            .values()
422            .filter(|fragment| {
423                fragment
424                    .fragment_type_mask
425                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
426            })
427            .flat_map(|fragment| fragment.actors.keys().copied())
428    }
429
430    pub fn tracking_progress_actor_ids(
431        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
432    ) -> Vec<(ActorId, BackfillUpstreamType)> {
433        StreamJobFragments::tracking_progress_actor_ids_impl(
434            fragment_infos
435                .values()
436                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
437        )
438    }
439}
440
441impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
442    type Item = &'a InflightFragmentInfo;
443
444    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
445
446    fn into_iter(self) -> Self::IntoIter {
447        self.fragment_infos()
448    }
449}
450
/// In-flight state of all streaming jobs of a single database.
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    /// Id of the database.
    pub(super) database_id: DatabaseId,
    /// In-flight jobs of the database, keyed by job id.
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    /// Maps each fragment id to the id of the job that holds it in `jobs`.
    fragment_location: HashMap<FragmentId, JobId>,
    /// Handle to the shared actor/fragment registry.
    pub(super) shared_actor_infos: SharedActorInfos,
}
458
459impl InflightDatabaseInfo {
460    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
461        self.jobs.values().flat_map(|job| job.fragment_infos())
462    }
463
    /// Returns `true` when a job with `job_id` is tracked in this database.
    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }
467
    /// Returns the id of the job recorded for `fragment_id` in `fragment_location`,
    /// or `None` when the fragment is unknown.
    pub(super) fn job_id_by_fragment(&self, fragment_id: FragmentId) -> Option<JobId> {
        self.fragment_location.get(&fragment_id).copied()
    }
471
472    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
473        let job_id = self.fragment_location[&fragment_id];
474        self.jobs
475            .get(&job_id)
476            .expect("should exist")
477            .fragment_infos
478            .get(&fragment_id)
479            .expect("should exist")
480    }
481
482    pub(super) fn backfill_fragment_ids_for_job(
483        &self,
484        job_id: JobId,
485    ) -> MetaResult<HashSet<FragmentId>> {
486        let job = self
487            .jobs
488            .get(&job_id)
489            .ok_or_else(|| MetaError::invalid_parameter(format!("job {} not found", job_id)))?;
490        Ok(job
491            .fragment_infos
492            .iter()
493            .filter_map(|(fragment_id, fragment)| {
494                fragment
495                    .fragment_type_mask
496                    .contains_any([
497                        FragmentTypeFlag::StreamScan,
498                        FragmentTypeFlag::SourceScan,
499                        FragmentTypeFlag::LocalityProvider,
500                    ])
501                    .then_some(*fragment_id)
502            })
503            .collect())
504    }
505
506    pub(super) fn is_backfill_fragment(&self, fragment_id: FragmentId) -> MetaResult<bool> {
507        let job_id = self.fragment_location.get(&fragment_id).ok_or_else(|| {
508            MetaError::invalid_parameter(format!("fragment {} not found", fragment_id))
509        })?;
510        let fragment = self
511            .jobs
512            .get(job_id)
513            .expect("should exist")
514            .fragment_infos
515            .get(&fragment_id)
516            .expect("should exist");
517        Ok(fragment.fragment_type_mask.contains_any([
518            FragmentTypeFlag::StreamScan,
519            FragmentTypeFlag::SourceScan,
520            FragmentTypeFlag::LocalityProvider,
521        ]))
522    }
523
524    pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
525        self.jobs
526            .iter()
527            .filter_map(|(job_id, job)| match &job.status {
528                CreateStreamingJobStatus::Init => None,
529                CreateStreamingJobStatus::Creating { tracker } => {
530                    let progress = tracker.gen_backfill_progress();
531                    Some((
532                        *job_id,
533                        BackfillProgress {
534                            progress,
535                            backfill_type: PbBackfillType::NormalBackfill,
536                        },
537                    ))
538                }
539                CreateStreamingJobStatus::Created => None,
540            })
541    }
542
543    pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
544        self.jobs.iter().filter_map(|(job_id, job)| {
545            job.cdc_table_backfill_tracker
546                .as_ref()
547                .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
548        })
549    }
550
551    pub fn gen_fragment_backfill_progress(&self) -> Vec<FragmentBackfillProgress> {
552        let mut result = Vec::new();
553        for job in self.jobs.values() {
554            let CreateStreamingJobStatus::Creating { tracker } = &job.status else {
555                continue;
556            };
557            let fragment_progress = tracker.collect_fragment_progress(&job.fragment_infos, true);
558            result.extend(fragment_progress);
559        }
560        result
561    }
562
563    pub(super) fn may_assign_fragment_cdc_backfill_splits(
564        &mut self,
565        fragment_id: FragmentId,
566    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
567        let job_id = self.fragment_location[&fragment_id];
568        let job = self.jobs.get_mut(&job_id).expect("should exist");
569        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
570            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
571            if cdc_scan_fragment_id != fragment_id {
572                return Ok(None);
573            }
574            let actors = job.fragment_infos[&cdc_scan_fragment_id]
575                .actors
576                .keys()
577                .copied()
578                .collect();
579            tracker.reassign_splits(actors).map(Some)
580        } else {
581            Ok(None)
582        }
583    }
584
585    pub(super) fn assign_cdc_backfill_splits(
586        &mut self,
587        job_id: JobId,
588    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
589        let job = self.jobs.get_mut(&job_id).expect("should exist");
590        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
591            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
592            let actors = job.fragment_infos[&cdc_scan_fragment_id]
593                .actors
594                .keys()
595                .copied()
596                .collect();
597            tracker.reassign_splits(actors).map(Some)
598        } else {
599            Ok(None)
600        }
601    }
602
    /// Applies the effects of a collected barrier to the in-flight job state.
    ///
    /// In order, this:
    /// 1. for a `CreateStreamingJob` command of type `Normal` or `SinkIntoTable`,
    ///    moves the job from `Init` to `Creating` with a fresh progress tracker;
    /// 2. for a `Reschedule` command, refreshes the trackers of the creating jobs
    ///    that own a rescheduled fragment;
    /// 3. feeds every `create_mview_progress` entry of `resps` to the tracker of
    ///    the owning job;
    /// 4. feeds every `cdc_table_backfill_progress` entry of `resps` to the CDC
    ///    table backfill tracker of the owning job;
    /// 5. for every `cdc_source_offset_updated` entry of `resps`, marks the CDC
    ///    source as finished on the tracker of the corresponding creating job.
    ///
    /// Progress that refers to an unknown fragment or job is logged and skipped.
    ///
    /// # Panics
    ///
    /// Panics if a newly created job is not in the `Init` status, or if
    /// `fragment_location` points at a job that is missing from `jobs`.
    pub(super) fn apply_collected_command(
        &mut self,
        command: &PostCollectCommand,
        resps: &HashMap<WorkerId, BarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) {
        if let PostCollectCommand::CreateStreamingJob { info, job_type, .. } = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        // Swap the status to `Creating`; the previous status must be `Init`.
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(
                                    info,
                                    version_stats,
                                    &job_info.fragment_infos,
                                ),
                            },
                        ) else {
                            unreachable!("should be init before collect the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly create job get cancelled before first barrier is collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_)
                | CreateStreamingJobType::BatchRefresh(_) => {
                    // The progress of SnapshotBackfill/BatchRefresh won't be tracked here
                }
            }
        }
        if let PostCollectCommand::Reschedule { reschedules, .. } = command {
            // During reschedule we expect fragments to be rebuilt with new actors and no vnode bitmap update.
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "Reschedule should not carry vnode bitmap updates when actors are rebuilt"
            );

            // Collect jobs that own the rescheduled fragments; de-duplicate via HashSet.
            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                // Only jobs that are still being created have a tracker to refresh.
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        // Apply the create-mview progress reported by each worker.
        for progress in resps.values().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            let tracker = match &mut self.jobs.get_mut(job_id).expect("should exist").status {
                CreateStreamingJobStatus::Init => {
                    // No tracker has been attached yet, so the progress is dropped.
                    continue;
                }
                CreateStreamingJobStatus::Creating { tracker, .. } => tracker,
                CreateStreamingJobStatus::Created => {
                    // Only progress that is not marked as done is worth a warning here.
                    if !progress.done {
                        warn!("update the progress of an created streaming job: {progress:?}");
                    }
                    continue;
                }
            };
            tracker.apply_progress(progress, version_stats);
        }
        // Apply the CDC table backfill progress reported by each worker.
        for progress in resps
            .values()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of an non-existent creating streaming job: {progress:?}, which could be cancelled"
                );
                continue;
            };
            // Jobs without a CDC table backfill tracker cannot record the progress.
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of an created streaming job: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        // Handle CDC source offset updated events
        for cdc_offset_updated in resps
            .values()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            let source_id = cdc_offset_updated.source_id;
            // The job id is derived from the source id rather than looked up by fragment.
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }
720
721    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
722        self.jobs.values().filter_map(|job| match &job.status {
723            CreateStreamingJobStatus::Init => None,
724            CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
725            CreateStreamingJobStatus::Created => None,
726        })
727    }
728
729    fn iter_mut_creating_job_tracker(
730        &mut self,
731    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
732        self.jobs
733            .values_mut()
734            .filter_map(|job| match &mut job.status {
735                CreateStreamingJobStatus::Init => None,
736                CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
737                CreateStreamingJobStatus::Created => None,
738            })
739    }
740
741    pub(super) fn has_pending_finished_jobs(&self) -> bool {
742        self.iter_creating_job_tracker()
743            .any(|tracker| tracker.is_finished())
744    }
745
746    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
747        self.iter_mut_creating_job_tracker()
748            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
749            .collect()
750    }
751
752    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
753        let mut finished_jobs = vec![];
754        let mut table_ids_to_truncate = vec![];
755        let mut finished_cdc_table_backfill = vec![];
756        for (job_id, job) in &mut self.jobs {
757            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
758                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
759                table_ids_to_truncate.extend(truncate_table_ids);
760                if is_finished {
761                    let CreateStreamingJobStatus::Creating { tracker, .. } =
762                        replace(&mut job.status, CreateStreamingJobStatus::Created)
763                    else {
764                        unreachable!()
765                    };
766                    finished_jobs.push(tracker.into_tracking_job());
767                }
768            }
769            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
770                && tracker.take_pre_completed()
771            {
772                finished_cdc_table_backfill.push(*job_id);
773            }
774        }
775        StagingCommitInfo {
776            finished_jobs,
777            table_ids_to_truncate,
778            finished_cdc_table_backfill,
779        }
780    }
781
782    pub fn fragment_subscribers(
783        &self,
784        fragment_id: FragmentId,
785    ) -> impl Iterator<Item = SubscriberId> + '_ {
786        let job_id = self.fragment_location[&fragment_id];
787        self.jobs[&job_id].subscribers.keys().copied()
788    }
789
790    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
791        self.jobs[&job_id].subscribers.keys().copied()
792    }
793
794    pub fn max_subscription_retention(&self) -> impl Iterator<Item = (TableId, u64)> + '_ {
795        self.jobs.iter().filter_map(|(job_id, info)| {
796            info.subscribers
797                .values()
798                .filter_map(|subscriber| match subscriber {
799                    SubscriberType::Subscription(retention) => Some(*retention),
800                    SubscriberType::SnapshotBackfill => None,
801                })
802                .max()
803                .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
804        })
805    }
806
807    pub fn register_subscriber(
808        &mut self,
809        job_id: JobId,
810        subscriber_id: SubscriberId,
811        subscriber: SubscriberType,
812    ) {
813        self.jobs
814            .get_mut(&job_id)
815            .expect("should exist")
816            .subscribers
817            .try_insert(subscriber_id, subscriber)
818            .expect("non duplicate");
819    }
820
821    pub fn unregister_subscriber(
822        &mut self,
823        job_id: JobId,
824        subscriber_id: SubscriberId,
825    ) -> Option<SubscriberType> {
826        self.jobs
827            .get_mut(&job_id)
828            .expect("should exist")
829            .subscribers
830            .remove(&subscriber_id)
831    }
832
833    pub fn update_subscription_retention(
834        &mut self,
835        job_id: JobId,
836        subscriber_id: SubscriberId,
837        retention_second: u64,
838    ) {
839        let job = self.jobs.get_mut(&job_id).expect("should exist");
840        match job.subscribers.get_mut(&subscriber_id) {
841            Some(SubscriberType::Subscription(current_retention)) => {
842                *current_retention = retention_second;
843            }
844            Some(SubscriberType::SnapshotBackfill) => {
845                warn!(
846                    %job_id,
847                    %subscriber_id,
848                    "cannot update retention for snapshot backfill subscriber"
849                );
850            }
851            None => {
852                warn!(%job_id, %subscriber_id, "subscription subscriber not found");
853            }
854        }
855    }
856
857    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
858        let job_id = self.fragment_location[&fragment_id];
859        let fragment = self
860            .jobs
861            .get_mut(&job_id)
862            .expect("should exist")
863            .fragment_infos
864            .get_mut(&fragment_id)
865            .expect("should exist");
866        (fragment, job_id)
867    }
868
869    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
870        Self {
871            database_id,
872            jobs: Default::default(),
873            fragment_location: Default::default(),
874            shared_actor_infos,
875        }
876    }
877
878    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
879        // remove the database because it's empty.
880        shared_actor_infos.remove_database(database_id);
881        Self::empty_inner(database_id, shared_actor_infos)
882    }
883
884    pub fn recover(
885        database_id: DatabaseId,
886        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
887        shared_actor_infos: SharedActorInfos,
888    ) -> Self {
889        let mut info = Self::empty_inner(database_id, shared_actor_infos);
890        for job in jobs {
891            info.add_existing(job);
892        }
893        info
894    }
895
896    pub fn is_empty(&self) -> bool {
897        self.jobs.is_empty()
898    }
899
900    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
901        let InflightStreamingJobInfo {
902            job_id,
903            fragment_infos,
904            subscribers,
905            status,
906            cdc_table_backfill_tracker,
907        } = job;
908        self.jobs
909            .try_insert(
910                job_id,
911                InflightStreamingJobInfo {
912                    job_id,
913                    subscribers,
914                    fragment_infos: Default::default(), // fill in later in pre_apply_new_fragments
915                    status,
916                    cdc_table_backfill_tracker,
917                },
918            )
919            .expect("non-duplicate");
920        self.pre_apply_new_fragments(
921            fragment_infos
922                .into_iter()
923                .map(|(fragment_id, info)| (fragment_id, job_id, info)),
924        );
925    }
926
927    /// Register a new streaming job entry (with empty `fragment_infos`).
928    pub(crate) fn pre_apply_new_job(
929        &mut self,
930        job_id: JobId,
931        cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
932    ) {
933        {
934            self.jobs
935                .try_insert(
936                    job_id,
937                    InflightStreamingJobInfo {
938                        job_id,
939                        fragment_infos: Default::default(),
940                        subscribers: Default::default(), // no subscriber for newly create job
941                        status: CreateStreamingJobStatus::Init,
942                        cdc_table_backfill_tracker,
943                    },
944                )
945                .expect("non-duplicate");
946        }
947    }
948
949    /// Add new fragment infos and update shared actor infos.
950    pub(crate) fn pre_apply_new_fragments(
951        &mut self,
952        fragments: impl IntoIterator<Item = (FragmentId, JobId, InflightFragmentInfo)>,
953    ) {
954        {
955            let shared_infos = self.shared_actor_infos.clone();
956            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
957            for (fragment_id, job_id, info) in fragments {
958                {
959                    {
960                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
961                        shared_actor_writer.upsert([(&info, job_id)]);
962                        fragment_infos
963                            .fragment_infos
964                            .try_insert(fragment_id, info)
965                            .expect("non duplicate");
966                        self.fragment_location
967                            .try_insert(fragment_id, job_id)
968                            .expect("non duplicate");
969                    }
970                }
971            }
972            shared_actor_writer.finish();
973        }
974    }
975
976    /// Pre-apply reschedule: update actors, vnode bitmaps, and splits.
977    /// The actual removal of old actors happens in `post_apply_reschedules`.
978    pub(crate) fn pre_apply_reschedule(
979        &mut self,
980        fragment_id: FragmentId,
981        new_actors: HashMap<ActorId, InflightActorInfo>,
982        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
983        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
984    ) {
985        {
986            {
987                {
988                    {
989                        let (info, _) = self.fragment_mut(fragment_id);
990                        let actors = &mut info.actors;
991                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
992                            actors
993                                .get_mut(&actor_id)
994                                .expect("should exist")
995                                .vnode_bitmap = Some(new_vnodes);
996                        }
997                        for (actor_id, actor) in new_actors {
998                            actors
999                                .try_insert(actor_id as _, actor)
1000                                .expect("non-duplicate");
1001                        }
1002                        for (actor_id, splits) in actor_splits {
1003                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
1004                        }
1005                        // info will be upserted into shared_actor_infos in post_apply stage
1006                    }
1007                }
1008            }
1009        }
1010    }
1011
1012    /// Replace upstream fragment IDs in merge nodes of a fragment's stream graph.
1013    pub(crate) fn pre_apply_replace_node_upstream(
1014        &mut self,
1015        fragment_id: FragmentId,
1016        replace_map: &HashMap<FragmentId, FragmentId>,
1017    ) {
1018        {
1019            {
1020                {
1021                    {
1022                        let mut remaining_fragment_ids: HashSet<_> =
1023                            replace_map.keys().cloned().collect();
1024                        let (info, _) = self.fragment_mut(fragment_id);
1025                        visit_stream_node_mut(&mut info.nodes, |node| {
1026                            if let NodeBody::Merge(m) = node
1027                                && let Some(new_upstream_fragment_id) =
1028                                    replace_map.get(&m.upstream_fragment_id)
1029                            {
1030                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
1031                                    if cfg!(debug_assertions) {
1032                                        panic!(
1033                                            "duplicate upstream fragment: {:?} {:?}",
1034                                            m, replace_map
1035                                        );
1036                                    } else {
1037                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
1038                                    }
1039                                }
1040                                m.upstream_fragment_id = *new_upstream_fragment_id;
1041                            }
1042                        });
1043                        if cfg!(debug_assertions) {
1044                            assert!(
1045                                remaining_fragment_ids.is_empty(),
1046                                "non-existing fragment to replace: {:?} {:?} {:?}",
1047                                remaining_fragment_ids,
1048                                info.nodes,
1049                                replace_map
1050                            );
1051                        } else {
1052                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
1053                        }
1054                    }
1055                }
1056            }
1057        }
1058    }
1059
1060    /// Add a new upstream sink node to a fragment's `UpstreamSinkUnion`.
1061    pub(crate) fn pre_apply_add_node_upstream(
1062        &mut self,
1063        fragment_id: FragmentId,
1064        new_upstream_info: &PbUpstreamSinkInfo,
1065    ) {
1066        {
1067            {
1068                {
1069                    {
1070                        let (info, _) = self.fragment_mut(fragment_id);
1071                        let mut injected = false;
1072                        visit_stream_node_mut(&mut info.nodes, |node| {
1073                            if let NodeBody::UpstreamSinkUnion(u) = node {
1074                                if cfg!(debug_assertions) {
1075                                    let current_upstream_fragment_ids = u
1076                                        .init_upstreams
1077                                        .iter()
1078                                        .map(|upstream| upstream.upstream_fragment_id)
1079                                        .collect::<HashSet<_>>();
1080                                    if current_upstream_fragment_ids
1081                                        .contains(&new_upstream_info.upstream_fragment_id)
1082                                    {
1083                                        panic!(
1084                                            "duplicate upstream fragment: {:?} {:?}",
1085                                            u, new_upstream_info
1086                                        );
1087                                    }
1088                                }
1089                                u.init_upstreams.push(new_upstream_info.clone());
1090                                injected = true;
1091                            }
1092                        });
1093                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
1094                    }
1095                }
1096            }
1097        }
1098    }
1099
1100    /// Remove upstream sink nodes from a fragment's `UpstreamSinkUnion`.
1101    pub(crate) fn pre_apply_drop_node_upstream(
1102        &mut self,
1103        fragment_id: FragmentId,
1104        drop_upstream_fragment_ids: &[FragmentId],
1105    ) {
1106        if !self.fragment_location.contains_key(&fragment_id) {
1107            warn!(
1108                target_fragment_id = %fragment_id,
1109                drop_upstream_fragment_ids = ?drop_upstream_fragment_ids,
1110                "skip dropping upstream sink fragments for non-existing target fragment"
1111            );
1112            return;
1113        }
1114        {
1115            {
1116                {
1117                    {
1118                        let (info, _) = self.fragment_mut(fragment_id);
1119                        let mut removed = false;
1120                        visit_stream_node_mut(&mut info.nodes, |node| {
1121                            if let NodeBody::UpstreamSinkUnion(u) = node {
1122                                if cfg!(debug_assertions) {
1123                                    let current_upstream_fragment_ids = u
1124                                        .init_upstreams
1125                                        .iter()
1126                                        .map(|upstream| upstream.upstream_fragment_id)
1127                                        .collect::<HashSet<FragmentId>>();
1128                                    for drop_fragment_id in drop_upstream_fragment_ids {
1129                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
1130                                        {
1131                                            panic!(
1132                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
1133                                                u, drop_upstream_fragment_ids, drop_fragment_id
1134                                            );
1135                                        }
1136                                    }
1137                                }
1138                                u.init_upstreams.retain(|upstream| {
1139                                    !drop_upstream_fragment_ids
1140                                        .contains(&upstream.upstream_fragment_id)
1141                                });
1142                                removed = true;
1143                            }
1144                        });
1145                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
1146                    }
1147                }
1148            }
1149        }
1150    }
1151
1152    /// Sync inflight `nodes.rate_limit` so a later reschedule won't materialize new actors
1153    /// from stale data. Mirrors `controller/streaming_job.rs::update_*_rate_limit_by_*`.
1154    pub(crate) fn pre_apply_throttle(&mut self, fragment_id: FragmentId, config: &ThrottleConfig) {
1155        // Snapshot-backfill creating jobs live outside main inflight; their throttles
1156        // flow through `on_new_upstream_barrier`.
1157        if !self.fragment_location.contains_key(&fragment_id) {
1158            return;
1159        }
1160        let throttle_type = config.throttle_type();
1161        let rate_limit = config.rate_limit;
1162        let (info, _) = self.fragment_mut(fragment_id);
1163
1164        visit_stream_node_mut(&mut info.nodes, |node| match throttle_type {
1165            ThrottleType::Source => {
1166                if let NodeBody::Source(node) = node
1167                    && let Some(node_inner) = &mut node.source_inner
1168                {
1169                    node_inner.rate_limit = rate_limit;
1170                }
1171                if let NodeBody::StreamFsFetch(node) = node
1172                    && let Some(node_inner) = &mut node.node_inner
1173                {
1174                    node_inner.rate_limit = rate_limit;
1175                }
1176            }
1177            ThrottleType::Backfill => match node {
1178                NodeBody::StreamCdcScan(node) => node.rate_limit = rate_limit,
1179                NodeBody::StreamScan(node) => node.rate_limit = rate_limit,
1180                NodeBody::SourceBackfill(node) => node.rate_limit = rate_limit,
1181                _ => {}
1182            },
1183            ThrottleType::Sink => {
1184                if let NodeBody::Sink(node) = node {
1185                    node.rate_limit = rate_limit;
1186                }
1187            }
1188            ThrottleType::Dml => {
1189                if let NodeBody::Dml(node) = node {
1190                    node.rate_limit = rate_limit;
1191                }
1192            }
1193            ThrottleType::Unspecified => {}
1194        });
1195    }
1196
1197    /// Update split assignments for actors in fragments.
1198    pub(crate) fn pre_apply_split_assignments(
1199        &mut self,
1200        assignments: impl IntoIterator<Item = (FragmentId, HashMap<ActorId, Vec<SplitImpl>>)>,
1201    ) {
1202        {
1203            let shared_infos = self.shared_actor_infos.clone();
1204            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
1205            {
1206                {
1207                    for (fragment_id, actor_splits) in assignments {
1208                        let (info, job_id) = self.fragment_mut(fragment_id);
1209                        let actors = &mut info.actors;
1210                        for (actor_id, splits) in actor_splits {
1211                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
1212                        }
1213                        shared_actor_writer.upsert([(&*info, job_id)]);
1214                    }
1215                }
1216            }
1217            shared_actor_writer.finish();
1218        }
1219    }
1220
1221    pub(super) fn build_edge(
1222        &self,
1223        info: Option<(&CreateStreamingJobCommandInfo, bool)>,
1224        replace_job: Option<&ReplaceStreamJobPlan>,
1225        new_upstream_sink: Option<&UpstreamSinkInfo>,
1226        control_stream_manager: &ControlStreamManager,
1227        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
1228        actor_location: &HashMap<ActorId, WorkerId>,
1229    ) -> FragmentEdgeBuildResult {
1230        // `existing_fragment_ids` consists of
1231        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
1232        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
1233        // if the upstream fragment previously exists
1234        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments,
1235        // if creating a new sink-into-table
1236        //  - should contain the `fragment_id` of the downstream table.
1237        let existing_fragment_ids = info
1238            .into_iter()
1239            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
1240            .chain(replace_job.into_iter().flat_map(|replace_job| {
1241                replace_job
1242                    .upstream_fragment_downstreams
1243                    .keys()
1244                    .filter(|fragment_id| {
1245                        info.map(|(info, _)| {
1246                            !info
1247                                .stream_job_fragments
1248                                .fragments
1249                                .contains_key(*fragment_id)
1250                        })
1251                        .unwrap_or(true)
1252                    })
1253                    .chain(replace_job.replace_upstream.keys())
1254            }))
1255            .chain(
1256                new_upstream_sink
1257                    .into_iter()
1258                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
1259            )
1260            .cloned();
1261        // Collect new fragments with their partial graph IDs
1262        let new_fragments = info
1263            .into_iter()
1264            .flat_map(|(info, is_snapshot_backfill)| {
1265                let partial_graph_id = to_partial_graph_id(
1266                    self.database_id,
1267                    is_snapshot_backfill.then_some(info.streaming_job.id()),
1268                );
1269                info.stream_job_fragments
1270                    .fragments
1271                    .values()
1272                    .map(move |fragment| (partial_graph_id, fragment))
1273            })
1274            .chain(replace_job.into_iter().flat_map(|replace_job| {
1275                replace_job
1276                    .new_fragments
1277                    .fragments
1278                    .values()
1279                    .chain(
1280                        replace_job
1281                            .auto_refresh_schema_sinks
1282                            .as_ref()
1283                            .into_iter()
1284                            .flat_map(move |sinks| sinks.iter().map(|sink| &sink.new_fragment)),
1285                    )
1286                    .map(|fragment| {
1287                        (
1288                            // we assume that replace job only happens in database partial graph
1289                            to_partial_graph_id(self.database_id, None),
1290                            fragment,
1291                        )
1292                    })
1293            }));
1294
1295        let mut builder = FragmentEdgeBuilder::new(
1296            // Existing fragments
1297            existing_fragment_ids
1298                .map(|fragment_id| {
1299                    (
1300                        fragment_id,
1301                        EdgeBuilderFragmentInfo::from_inflight(
1302                            self.fragment(fragment_id),
1303                            to_partial_graph_id(self.database_id, None),
1304                            control_stream_manager,
1305                        ),
1306                    )
1307                })
1308                // New fragments from create/replace jobs
1309                .chain(new_fragments.map(|(partial_graph_id, fragment)| {
1310                    (
1311                        fragment.fragment_id,
1312                        EdgeBuilderFragmentInfo::from_fragment(
1313                            fragment,
1314                            stream_actors,
1315                            actor_location,
1316                            partial_graph_id,
1317                            control_stream_manager,
1318                        ),
1319                    )
1320                })),
1321        );
1322        if let Some((info, _)) = info {
1323            builder.add_relations(&info.upstream_fragment_downstreams);
1324            builder.add_relations(&info.stream_job_fragments.downstreams);
1325        }
1326        if let Some(replace_job) = replace_job {
1327            builder.add_relations(&replace_job.upstream_fragment_downstreams);
1328            builder.add_relations(&replace_job.new_fragments.downstreams);
1329        }
1330        if let Some(new_upstream_sink) = new_upstream_sink {
1331            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
1332            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
1333            builder.add_edge(sink_fragment_id, new_sink_downstream);
1334        }
1335        if let Some(replace_job) = replace_job {
1336            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
1337                for (original_upstream_fragment_id, new_upstream_fragment_id) in
1338                    fragment_replacement
1339                {
1340                    builder.replace_upstream(
1341                        *fragment_id,
1342                        *original_upstream_fragment_id,
1343                        *new_upstream_fragment_id,
1344                    );
1345                }
1346            }
1347        }
1348        builder.build()
1349    }
1350
1351    /// Post-apply reschedule: remove actors that were marked for removal.
1352    pub(crate) fn post_apply_reschedules(
1353        &mut self,
1354        reschedules: impl IntoIterator<Item = (FragmentId, HashSet<ActorId>)>,
1355    ) {
1356        let inner = self.shared_actor_infos.clone();
1357        let mut shared_actor_writer = inner.start_writer(self.database_id);
1358        {
1359            {
1360                {
1361                    for (fragment_id, to_remove) in reschedules {
1362                        let job_id = self.fragment_location[&fragment_id];
1363                        let info = self
1364                            .jobs
1365                            .get_mut(&job_id)
1366                            .expect("should exist")
1367                            .fragment_infos
1368                            .get_mut(&fragment_id)
1369                            .expect("should exist");
1370                        for actor_id in to_remove {
1371                            assert!(info.actors.remove(&actor_id).is_some());
1372                        }
1373                        shared_actor_writer.upsert([(&*info, job_id)]);
1374                    }
1375                }
1376            }
1377        }
1378        shared_actor_writer.finish();
1379    }
1380
1381    /// Post-apply fragment removal: remove fragments and their jobs if empty.
1382    pub(crate) fn post_apply_remove_fragments(
1383        &mut self,
1384        fragment_ids: impl IntoIterator<Item = FragmentId>,
1385    ) {
1386        let inner = self.shared_actor_infos.clone();
1387        let mut shared_actor_writer = inner.start_writer(self.database_id);
1388        {
1389            {
1390                {
1391                    for fragment_id in fragment_ids {
1392                        let job_id = self
1393                            .fragment_location
1394                            .remove(&fragment_id)
1395                            .expect("should exist");
1396                        let job = self.jobs.get_mut(&job_id).expect("should exist");
1397                        let fragment = job
1398                            .fragment_infos
1399                            .remove(&fragment_id)
1400                            .expect("should exist");
1401                        shared_actor_writer.remove(&fragment);
1402                        if job.fragment_infos.is_empty() {
1403                            self.jobs.remove(&job_id).expect("should exist");
1404                        }
1405                    }
1406                }
1407            }
1408        }
1409        shared_actor_writer.finish();
1410    }
1411
1412    pub(crate) fn post_apply_remove_job(
1413        &mut self,
1414        job_id: JobId,
1415    ) -> Option<InflightStreamingJobInfo> {
1416        let job = self.jobs.remove(&job_id)?;
1417        let inner = self.shared_actor_infos.clone();
1418        let mut shared_actor_writer = inner.start_writer(self.database_id);
1419        for (fragment_id, fragment) in &job.fragment_infos {
1420            self.fragment_location
1421                .remove(fragment_id)
1422                .expect("should exist");
1423            shared_actor_writer.remove(fragment);
1424        }
1425        shared_actor_writer.finish();
1426        Some(job)
1427    }
1428}
1429
1430impl InflightFragmentInfo {
1431    /// Returns actor list to collect in the target worker node.
1432    pub(crate) fn actor_ids_to_collect(
1433        infos: impl IntoIterator<Item = &Self>,
1434    ) -> HashMap<WorkerId, HashSet<ActorId>> {
1435        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
1436        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
1437            assert!(
1438                ret.entry(actor.worker_id)
1439                    .or_default()
1440                    .insert(*actor_id as _)
1441            )
1442        }
1443        ret
1444    }
1445
1446    pub fn existing_table_ids<'a>(
1447        infos: impl IntoIterator<Item = &'a Self> + 'a,
1448    ) -> impl Iterator<Item = TableId> + 'a {
1449        infos
1450            .into_iter()
1451            .flat_map(|info| info.state_table_ids.iter().cloned())
1452    }
1453
1454    pub fn workers<'a>(
1455        infos: impl IntoIterator<Item = &'a Self> + 'a,
1456    ) -> impl Iterator<Item = WorkerId> + 'a {
1457        infos
1458            .into_iter()
1459            .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
1460    }
1461
1462    pub fn contains_worker<'a>(
1463        infos: impl IntoIterator<Item = &'a Self> + 'a,
1464        worker_id: WorkerId,
1465    ) -> bool {
1466        Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
1467    }
1468}
1469
1470impl InflightDatabaseInfo {
1471    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
1472        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
1473    }
1474
1475    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
1476        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
1477    }
1478}