risingwave_meta/barrier/info.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RawRwLock;
use parking_lot::lock_api::RwLockReadGuard;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::WorkerId;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_pb::ddl_service::PbBackfillType;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SubscriberId;
use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
use risingwave_pb::meta::subscribe_response::Operation;
use risingwave_pb::source::PbCdcTableSnapshotSplits;
use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{info, warn};

use crate::MetaResult;
use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
use crate::barrier::rpc::{ControlStreamManager, to_partial_graph_id};
use crate::barrier::{BackfillProgress, BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::rebuild_fragment_mapping;
use crate::manager::NotificationManagerRef;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};

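/// Per-actor runtime info shared outside the barrier worker: the worker the actor is scheduled
/// on, its vnode bitmap (if the fragment is hash-distributed), and its currently assigned source
/// splits.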
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

impl From<&InflightActorInfo> for SharedActorInfo {
    fn from(value: &InflightActorInfo) -> Self {
        Self {
            worker_id: value.worker_id,
            vnode_bitmap: value.vnode_bitmap.clone(),
            splits: value.splits.clone(),
        }
    }
}

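/// Snapshot of a fragment's runtime state (actors, distribution and vnode count) that is shared
/// with components outside the barrier manager, e.g. for rebuilding fragment worker-slot
/// mappings.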
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}

impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
        let (info, job_id) = pair;

        let InflightFragmentInfo {
            fragment_id,
            distribution_type,
            fragment_type_mask,
            actors,
            vnode_count,
            ..
        } = info;

        Self {
            fragment_id: *fragment_id,
            job_id,
            distribution_type: *distribution_type,
            fragment_type_mask: *fragment_type_mask,
            actors: actors
                .iter()
                .map(|(actor_id, actor)| (*actor_id, actor.into()))
                .collect(),
            vnode_count: *vnode_count,
        }
    }
}

#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}

impl SharedActorInfosInner {
    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
        self.info
            .values()
            .find_map(|database| database.get(&fragment_id))
    }

    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
        self.info.values().flatten()
    }
}

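/// Lock-protected, cloneable handle to the fragment info shared across all databases. Mutations
/// go through [`SharedActorInfoWriter`] (see [`Self::start_writer`]) so that fragment worker-slot
/// mapping notifications are sent once the write is finished.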
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}

impl SharedActorInfos {
    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
        self.inner.read()
    }

    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
        let core = self.inner.read();
        core.iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
            })
            .collect()
    }

    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
    ///
    /// Very occasionally split removal may happen during scaling, in which case we need to
    /// use the old splits for reallocation instead of the latest splits (which may be missing),
    /// so that we can resolve the split removal in the next command.
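    ///
    /// A minimal sketch of the intended call pattern (illustrative only; `fragment_id`,
    /// `prev_actor_ids` and `curr_actor_ids` are assumed to come from the reschedule plan):
    ///
    /// ```ignore
    /// let assignment: HashMap<ActorId, Vec<SplitImpl>> = shared_actor_infos
    ///     .migrate_splits_for_source_actors(fragment_id, &prev_actor_ids, &curr_actor_ids)?;
    /// for (actor_id, splits) in assignment {
    ///     // hand the migrated splits to the newly scheduled actor
    /// }
    /// ```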
    pub fn migrate_splits_for_source_actors(
        &self,
        fragment_id: FragmentId,
        prev_actor_ids: &[ActorId],
        curr_actor_ids: &[ActorId],
    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
        let guard = self.read_guard();

        let prev_splits = prev_actor_ids
            .iter()
            .flat_map(|actor_id| {
                // Note: File Source / Iceberg Source don't have splits assigned by meta.
                guard
                    .get_fragment(fragment_id)
                    .and_then(|info| info.actors.get(actor_id))
                    .map(|actor| actor.splits.clone())
                    .unwrap_or_default()
            })
            .map(|split| (split.id(), split))
            .collect();

        let empty_actor_splits = curr_actor_ids
            .iter()
            .map(|actor_id| (*actor_id, vec![]))
            .collect();

        let diff = crate::stream::source_manager::reassign_splits(
            fragment_id,
            empty_actor_splits,
            &prev_splits,
            // Pre-allocating splits here is the first time splits are assigned, so there is no
            // scale-in scenario to handle.
            std::default::Default::default(),
        )
        .unwrap_or_default();

        Ok(diff)
    }
}

impl SharedActorInfos {
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        // Delete the fragments that existed previously but are not included in the recovered
        // fragments.
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}

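/// Write handle over [`SharedActorInfos`] that batches fragment mapping changes and sends the
/// corresponding `Add`/`Update`/`Delete` notifications when [`Self::finish`] is called.
///
/// A minimal usage sketch (illustrative only; `database_id`, `fragment_info` and `job_id` are
/// assumed to be at hand):
///
/// ```ignore
/// let mut writer = shared_actor_infos.start_writer(database_id);
/// writer.upsert([(&fragment_info, job_id)]);
/// writer.finish(); // notifies subscribers about the mapping changes
/// ```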
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}

impl SharedActorInfoWriter<'_> {
    pub(super) fn upsert(
        &mut self,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let database = self.write_guard.info.entry(self.database_id).or_default();
        for info @ (fragment, _) in infos {
            match database.entry(fragment.fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info = info.into();
                    self.updated_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
                Entry::Vacant(entry) => {
                    let info = info.into();
                    self.added_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
            }
        }
    }

    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
            && let Some(fragment) = database.remove(&info.fragment_id)
        {
            self.deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
    }

    pub(super) fn finish(self) {
        if let Some(mapping) = self.added_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Add, mapping);
        }
        if let Some(mapping) = self.updated_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Update, mapping);
        }
        if let Some(mapping) = self.deleted_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }
}

#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}

impl BarrierInfo {
    pub(super) fn prev_epoch(&self) -> u64 {
        self.prev_epoch.value().0
    }

    pub(super) fn curr_epoch(&self) -> u64 {
        self.curr_epoch.value().0
    }
}

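/// Per-fragment changes derived from a barrier command. Additions (new fragments, new actors,
/// split assignments) take effect in `pre_apply`, while removals are deferred via
/// [`PostApplyFragmentChanges`] until the barrier has been collected in `post_apply`.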
#[derive(Debug)]
pub(super) enum CommandFragmentChanges {
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
    },
    AddNodeUpstream(PbUpstreamSinkInfo),
    DropNodeUpstream(Vec<FragmentId>),
    ReplaceNodeUpstream(
        /// old `fragment_id` -> new `fragment_id`
        HashMap<FragmentId, FragmentId>,
    ),
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    RemoveFragment,
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}

pub(super) enum PostApplyFragmentChanges {
    Reschedule { to_remove: HashSet<ActorId> },
    RemoveFragment,
}

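/// A subscriber of a streaming job's changes: either a user-created subscription carrying its
/// retention value, or a snapshot-backfill job reading the upstream's changes.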
#[derive(Clone, Debug)]
pub enum SubscriberType {
    Subscription(u64),
    SnapshotBackfill,
}

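/// Lifecycle of a creating streaming job as tracked by the barrier manager: `Init` until the
/// job's first barrier is collected, `Creating` while backfill progress is tracked, and
/// `Created` once the job has finished.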
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    Init,
    Creating { tracker: CreateMviewProgressTracker },
    Created,
}

#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    pub status: CreateStreamingJobStatus,
    pub cdc_table_backfill_tracker: Option<CdcTableBackfillTracker>,
}

impl InflightStreamingJobInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.fragment_infos.values()
    }

    pub fn snapshot_backfill_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> impl Iterator<Item = ActorId> + '_ {
        fragment_infos
            .values()
            .filter(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
            })
            .flat_map(|fragment| fragment.actors.keys().copied())
    }

    pub fn tracking_progress_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        )
    }
}

impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}

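/// In-flight info of all streaming jobs and fragments of a single database, maintained by the
/// barrier manager and mirrored into [`SharedActorInfos`] for readers outside the barrier worker
/// loop.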
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    pub(super) database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}

impl InflightDatabaseInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.jobs.values().flat_map(|job| job.fragment_infos())
    }

    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }

    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs
            .get(&job_id)
            .expect("should exist")
            .fragment_infos
            .get(&fragment_id)
            .expect("should exist")
    }

    pub fn gen_backfill_progress(&self) -> impl Iterator<Item = (JobId, BackfillProgress)> + '_ {
        self.jobs
            .iter()
            .filter_map(|(job_id, job)| match &job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating { tracker } => {
                    let progress = tracker.gen_backfill_progress();
                    Some((
                        *job_id,
                        BackfillProgress {
                            progress,
                            backfill_type: PbBackfillType::NormalBackfill,
                        },
                    ))
                }
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub fn gen_cdc_progress(&self) -> impl Iterator<Item = (JobId, CdcProgress)> + '_ {
        self.jobs.iter().filter_map(|(job_id, job)| {
            job.cdc_table_backfill_tracker
                .as_ref()
                .map(|tracker| (*job_id, tracker.gen_cdc_progress()))
        })
    }

    pub(super) fn may_assign_fragment_cdc_backfill_splits(
        &mut self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
        let job_id = self.fragment_location[&fragment_id];
        let job = self.jobs.get_mut(&job_id).expect("should exist");
        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
            if cdc_scan_fragment_id != fragment_id {
                return Ok(None);
            }
            let actors = job.fragment_infos[&cdc_scan_fragment_id]
                .actors
                .keys()
                .copied()
                .collect();
            tracker.reassign_splits(actors).map(Some)
        } else {
            Ok(None)
        }
    }

    pub(super) fn assign_cdc_backfill_splits(
        &mut self,
        job_id: JobId,
    ) -> MetaResult<Option<HashMap<ActorId, PbCdcTableSnapshotSplits>>> {
        let job = self.jobs.get_mut(&job_id).expect("should exist");
        if let Some(tracker) = &mut job.cdc_table_backfill_tracker {
            let cdc_scan_fragment_id = tracker.cdc_scan_fragment_id();
            let actors = job.fragment_infos[&cdc_scan_fragment_id]
                .actors
                .keys()
                .copied()
                .collect();
            tracker.reassign_splits(actors).map(Some)
        } else {
            Ok(None)
        }
    }

    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        resps: &[BarrierCompleteResponse],
        version_stats: &HummockVersionStats,
    ) {
        if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating {
                                tracker: CreateMviewProgressTracker::new(info, version_stats),
                            },
                        ) else {
                            unreachable!("should be init before collecting the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly created job was cancelled before the first barrier was collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // The progress of SnapshotBackfill won't be tracked here
                }
            }
        }
        if let Some(Command::RescheduleFragment { reschedules, .. }) = command {
            // During reschedule we expect fragments to be rebuilt with new actors and no vnode bitmap update.
            debug_assert!(
                reschedules
                    .values()
                    .all(|reschedule| reschedule.vnode_bitmap_updates.is_empty()),
                "RescheduleFragment should not carry vnode bitmap updates when actors are rebuilt"
            );

            // Collect jobs that own the rescheduled fragments; de-duplicate via HashSet.
            let related_job_ids = reschedules
                .keys()
                .filter_map(|fragment_id| self.fragment_location.get(fragment_id))
                .cloned()
                .collect::<HashSet<_>>();
            for job_id in related_job_ids {
                if let Some(job) = self.jobs.get_mut(&job_id)
                    && let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status
                {
                    tracker.refresh_after_reschedule(&job.fragment_infos, version_stats);
                }
            }
        }
        for progress in resps.iter().flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which may have been cancelled"
                );
                continue;
            };
            let CreateStreamingJobStatus::Creating { tracker, .. } =
                &mut self.jobs.get_mut(job_id).expect("should exist").status
            else {
                warn!("update the progress of an already created streaming job: {progress:?}");
                continue;
            };
            tracker.apply_progress(progress, version_stats);
        }
        for progress in resps
            .iter()
            .flat_map(|resp| &resp.cdc_table_backfill_progress)
        {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the cdc progress of a non-existent creating streaming job: {progress:?}, which may have been cancelled"
                );
                continue;
            };
            let Some(tracker) = &mut self
                .jobs
                .get_mut(job_id)
                .expect("should exist")
                .cdc_table_backfill_tracker
            else {
                warn!("update the cdc progress of a streaming job without a cdc backfill tracker: {progress:?}");
                continue;
            };
            tracker.update_split_progress(progress);
        }
        // Handle CDC source offset updated events
        for cdc_offset_updated in resps
            .iter()
            .flat_map(|resp| &resp.cdc_source_offset_updated)
        {
            use risingwave_common::id::SourceId;
            let source_id = SourceId::new(cdc_offset_updated.source_id);
            let job_id = source_id.as_share_source_job_id();
            if let Some(job) = self.jobs.get_mut(&job_id) {
                if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                    tracker.mark_cdc_source_finished();
                }
            } else {
                warn!(
                    "update cdc source offset for non-existent creating streaming job: source_id={}, job_id={}",
                    cdc_offset_updated.source_id, job_id
                );
            }
        }
    }

    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
        self.jobs.values().filter_map(|job| match &job.status {
            CreateStreamingJobStatus::Init => None,
            CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
            CreateStreamingJobStatus::Created => None,
        })
    }

    fn iter_mut_creating_job_tracker(
        &mut self,
    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
        self.jobs
            .values_mut()
            .filter_map(|job| match &mut job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating { tracker, .. } => Some(tracker),
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        self.iter_creating_job_tracker()
            .any(|tracker| tracker.is_finished())
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        self.iter_mut_creating_job_tracker()
            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
            .collect()
    }

    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        let mut finished_cdc_table_backfill = vec![];
        for (job_id, job) in &mut self.jobs {
            if let CreateStreamingJobStatus::Creating { tracker, .. } = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    let CreateStreamingJobStatus::Creating { tracker, .. } =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
            if let Some(tracker) = &mut job.cdc_table_backfill_tracker
                && tracker.take_pre_completed()
            {
                finished_cdc_table_backfill.push(*job_id);
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
            finished_cdc_table_backfill,
        }
    }

    pub fn fragment_subscribers(
        &self,
        fragment_id: FragmentId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
        self.jobs
            .iter()
            .filter_map(|(job_id, info)| {
                info.subscribers
                    .values()
                    .filter_map(|subscriber| match subscriber {
                        SubscriberType::Subscription(retention) => Some(*retention),
                        SubscriberType::SnapshotBackfill => None,
                    })
                    .max()
                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
            })
            .collect()
    }

    pub fn register_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
        subscriber: SubscriberType,
    ) {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .try_insert(subscriber_id, subscriber)
            .expect("non duplicate");
    }

    pub fn unregister_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
    ) -> Option<SubscriberType> {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .remove(&subscriber_id)
    }

    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
        let job_id = self.fragment_location[&fragment_id];
        let fragment = self
            .jobs
            .get_mut(&job_id)
            .expect("should exist")
            .fragment_infos
            .get_mut(&fragment_id)
            .expect("should exist");
        (fragment, job_id)
    }

    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            jobs: Default::default(),
            fragment_location: Default::default(),
            shared_actor_infos,
        }
    }

    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        // Remove any previously shared info for this database, since it is now empty.
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }

    pub fn recover(
        database_id: DatabaseId,
        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
        shared_actor_infos: SharedActorInfos,
    ) -> Self {
        let mut info = Self::empty_inner(database_id, shared_actor_infos);
        for job in jobs {
            info.add_existing(job);
        }
        info
    }

    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }

    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
        let InflightStreamingJobInfo {
            job_id,
            fragment_infos,
            subscribers,
            status,
            cdc_table_backfill_tracker,
        } = job;
        self.jobs
            .try_insert(
                job_id,
                InflightStreamingJobInfo {
                    job_id,
                    subscribers,
                    fragment_infos: Default::default(), // filled in later by `apply_add`
                    status,
                    cdc_table_backfill_tracker,
                },
            )
            .expect("non-duplicate");
        let post_apply_changes =
            self.apply_add(fragment_infos.into_iter().map(|(fragment_id, info)| {
                (
                    fragment_id,
                    CommandFragmentChanges::NewFragment { job_id, info },
                )
            }));
        self.post_apply(post_apply_changes);
    }

    /// Apply actor changes before issuing a barrier command. If the command contains any newly
    /// added actors, the in-flight info is updated accordingly.
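    ///
    /// Together with [`Self::post_apply`], the intended flow is roughly (illustrative only):
    ///
    /// ```ignore
    /// let post_changes = database_info.pre_apply(new_job, fragment_changes);
    /// // ... inject the barrier and wait for it to be collected ...
    /// database_info.post_apply(post_changes);
    /// ```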
    pub(crate) fn pre_apply(
        &mut self,
        new_job: Option<(JobId, Option<CdcTableBackfillTracker>)>,
        fragment_changes: HashMap<FragmentId, CommandFragmentChanges>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        if let Some((job_id, cdc_table_backfill_tracker)) = new_job {
            self.jobs
                .try_insert(
                    job_id,
                    InflightStreamingJobInfo {
                        job_id,
                        fragment_infos: Default::default(),
                        subscribers: Default::default(), // no subscribers for a newly created job
                        status: CreateStreamingJobStatus::Init,
                        cdc_table_backfill_tracker,
                    },
                )
                .expect("non-duplicate");
        }
        self.apply_add(fragment_changes.into_iter())
    }

    fn apply_add(
        &mut self,
        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        let mut post_apply = HashMap::new();
        {
            let shared_infos = self.shared_actor_infos.clone();
            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
            for (fragment_id, change) in fragment_changes {
                match change {
                    CommandFragmentChanges::NewFragment { job_id, info } => {
                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
                        shared_actor_writer.upsert([(&info, job_id)]);
                        fragment_infos
                            .fragment_infos
                            .try_insert(fragment_id, info)
                            .expect("non duplicate");
                        self.fragment_location
                            .try_insert(fragment_id, job_id)
                            .expect("non duplicate");
                    }
                    CommandFragmentChanges::Reschedule {
                        new_actors,
                        actor_update_vnode_bitmap,
                        to_remove,
                        actor_splits,
                    } => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
                            actors
                                .get_mut(&actor_id)
                                .expect("should exist")
                                .vnode_bitmap = Some(new_vnodes);
                        }
                        for (actor_id, actor) in new_actors {
                            actors
                                .try_insert(actor_id as _, actor)
                                .expect("non-duplicate");
                        }
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }

                        post_apply.insert(
                            fragment_id,
                            PostApplyFragmentChanges::Reschedule { to_remove },
                        );

                        // The info will be upserted into `shared_actor_infos` in the post-apply stage.
                    }
                    CommandFragmentChanges::RemoveFragment => {
                        post_apply.insert(fragment_id, PostApplyFragmentChanges::RemoveFragment);
                    }
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
                        let mut remaining_fragment_ids: HashSet<_> =
                            replace_map.keys().cloned().collect();
                        let (info, _) = self.fragment_mut(fragment_id);
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::Merge(m) = node
                                && let Some(new_upstream_fragment_id) =
                                    replace_map.get(&m.upstream_fragment_id)
                            {
                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
                                    if cfg!(debug_assertions) {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            m, replace_map
                                        );
                                    } else {
                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
                                    }
                                }
                                m.upstream_fragment_id = *new_upstream_fragment_id;
                            }
                        });
                        if cfg!(debug_assertions) {
                            assert!(
                                remaining_fragment_ids.is_empty(),
                                "non-existing fragment to replace: {:?} {:?} {:?}",
                                remaining_fragment_ids,
                                info.nodes,
                                replace_map
                            );
                        } else if !remaining_fragment_ids.is_empty() {
                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
                        }
                    }
                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut injected = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<_>>();
                                    if current_upstream_fragment_ids
                                        .contains(&new_upstream_info.upstream_fragment_id)
                                    {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            u, new_upstream_info
                                        );
                                    }
                                }
                                u.init_upstreams.push(new_upstream_info.clone());
                                injected = true;
                            }
                        });
                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut removed = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<FragmentId>>();
                                    for drop_fragment_id in &drop_upstream_fragment_ids {
                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
                                        {
                                            panic!(
                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
                                                u, drop_upstream_fragment_ids, drop_fragment_id
                                            );
                                        }
                                    }
                                }
                                u.init_upstreams.retain(|upstream| {
                                    !drop_upstream_fragment_ids
                                        .contains(&upstream.upstream_fragment_id)
                                });
                                removed = true;
                            }
                        });
                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
                        let (info, job_id) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                }
            }
            shared_actor_writer.finish();
        }
        post_apply
    }

    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        let (info, replace_job, new_upstream_sink) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit { .. }
                | Command::Throttle { .. }
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::Refresh { .. }
                | Command::ListFinish { .. }
                | Command::LoadFinish { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
                        new_upstream_sink,
                    ) = job_type
                    {
                        Some(new_upstream_sink)
                    } else {
                        None
                    };
                    let is_snapshot_backfill =
                        matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_));
                    (Some((info, is_snapshot_backfill)), None, new_upstream_sink)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
                Command::ResetSource { .. } => (None, None, None),
            },
        };
        // `existing_fragment_ids` consists of:
        //  - keys of `info.upstream_fragment_downstreams`, i.e. the `fragment_id`s of the
        //    upstream fragments of the newly created job;
        //  - keys of `replace_job.upstream_fragment_downstreams`, i.e. the `fragment_id`s of the
        //    upstream fragments of the replace job, if those upstream fragments already existed;
        //  - keys of `replace_upstream`, i.e. the `fragment_id`s of downstream fragments that
        //    will update their upstream fragments;
        //  - when creating a new sink-into-table, the `fragment_id` of the downstream table.
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|(info, _)| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|(info, _)| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|(info, is_snapshot_backfill)| {
                let partial_graph_id = to_partial_graph_id(
                    self.database_id,
                    is_snapshot_backfill.then_some(info.streaming_job.id()),
                );
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
                    .map(move |(fragment_id, info)| (fragment_id, info, partial_graph_id))
            })
            .chain(
                replace_job
                    .into_iter()
                    .flat_map(|replace_job| {
                        replace_job
                            .new_fragments
                            .new_fragment_info(&replace_job.init_split_assignment)
                            .chain(
                                replace_job
                                    .auto_refresh_schema_sinks
                                    .as_ref()
                                    .into_iter()
                                    .flat_map(|sinks| {
                                        sinks.iter().map(|sink| {
                                            (
                                                sink.new_fragment.fragment_id,
                                                sink.new_fragment_info(),
                                            )
                                        })
                                    }),
                            )
                    })
                    .map(|(fragment_id, fragment)| {
                        (
                            fragment_id,
                            fragment,
                            // We assume that a replace job only happens in the database-level partial graph.
                            to_partial_graph_id(self.database_id, None),
                        )
                    }),
            )
            .collect_vec();
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| {
                    (
                        self.fragment(fragment_id),
                        to_partial_graph_id(self.database_id, None),
                    )
                })
                .chain(
                    new_fragment_infos
                        .iter()
                        .map(|(_, info, partial_graph_id)| (info, *partial_graph_id)),
                ),
            control_stream_manager,
        );
        if let Some((info, _)) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }

    /// Apply actor changes after the barrier command has been collected. If the command drops
    /// any actors, they are removed from the snapshot accordingly.
    pub(crate) fn post_apply(
        &mut self,
        fragment_changes: HashMap<FragmentId, PostApplyFragmentChanges>,
    ) {
        let inner = self.shared_actor_infos.clone();
        let mut shared_actor_writer = inner.start_writer(self.database_id);
        {
            for (fragment_id, changes) in fragment_changes {
                match changes {
                    PostApplyFragmentChanges::Reschedule { to_remove } => {
                        let job_id = self.fragment_location[&fragment_id];
                        let info = self
                            .jobs
                            .get_mut(&job_id)
                            .expect("should exist")
                            .fragment_infos
                            .get_mut(&fragment_id)
                            .expect("should exist");
                        for actor_id in to_remove {
                            assert!(info.actors.remove(&actor_id).is_some());
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                    PostApplyFragmentChanges::RemoveFragment => {
                        let job_id = self
                            .fragment_location
                            .remove(&fragment_id)
                            .expect("should exist");
                        let job = self.jobs.get_mut(&job_id).expect("should exist");
                        let fragment = job
                            .fragment_infos
                            .remove(&fragment_id)
                            .expect("should exist");
                        shared_actor_writer.remove(&fragment);
                        if job.fragment_infos.is_empty() {
                            self.jobs.remove(&job_id).expect("should exist");
                        }
                    }
                }
            }
        }
        shared_actor_writer.finish();
    }
}

impl InflightFragmentInfo {
    /// Returns the actors to collect on each worker node.
    pub(crate) fn actor_ids_to_collect(
        infos: impl IntoIterator<Item = &Self>,
    ) -> HashMap<WorkerId, HashSet<ActorId>> {
        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
            assert!(
                ret.entry(actor.worker_id)
                    .or_default()
                    .insert(*actor_id as _)
            )
        }
        ret
    }

    pub fn existing_table_ids<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = TableId> + 'a {
        infos
            .into_iter()
            .flat_map(|info| info.state_table_ids.iter().cloned())
    }

    pub fn workers<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = WorkerId> + 'a {
        infos
            .into_iter()
            .flat_map(|fragment| fragment.actors.values().map(|actor| actor.worker_id))
    }

    pub fn contains_worker<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
        worker_id: WorkerId,
    ) -> bool {
        Self::workers(infos).any(|existing_worker_id| existing_worker_id == worker_id)
    }
}

impl InflightDatabaseInfo {
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}