risingwave_meta/barrier/
info.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::replace;
use std::sync::Arc;

use itertools::Itertools;
use parking_lot::RawRwLock;
use parking_lot::lock_api::RwLockReadGuard;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
use risingwave_connector::source::{SplitImpl, SplitMetaData};
use risingwave_meta_model::WorkerId;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::id::SubscriberId;
use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
use risingwave_pb::meta::subscribe_response::Operation;
use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::{info, warn};

use crate::MetaResult;
use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
use crate::barrier::progress::{CreateMviewProgressTracker, StagingCommitInfo};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::rebuild_fragment_mapping;
use crate::manager::NotificationManagerRef;
use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};

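/// A read-only snapshot of a single actor: its worker placement, vnode bitmap, and the
/// source splits currently assigned to it, as shared through [`SharedActorInfos`].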
#[derive(Debug, Clone)]
pub struct SharedActorInfo {
    pub worker_id: WorkerId,
    pub vnode_bitmap: Option<Bitmap>,
    pub splits: Vec<SplitImpl>,
}

impl From<&InflightActorInfo> for SharedActorInfo {
    fn from(value: &InflightActorInfo) -> Self {
        Self {
            worker_id: value.worker_id,
            vnode_bitmap: value.vnode_bitmap.clone(),
            splits: value.splits.clone(),
        }
    }
}

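/// A read-only snapshot of a fragment and its actors, derived from
/// [`InflightFragmentInfo`] for sharing outside the barrier worker.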
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    pub job_id: JobId,
    pub distribution_type: DistributionType,
    pub actors: HashMap<ActorId, SharedActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}

impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
        let (info, job_id) = pair;

        let InflightFragmentInfo {
            fragment_id,
            distribution_type,
            fragment_type_mask,
            actors,
            vnode_count,
            ..
        } = info;

        Self {
            fragment_id: *fragment_id,
            job_id,
            distribution_type: *distribution_type,
            fragment_type_mask: *fragment_type_mask,
            actors: actors
                .iter()
                .map(|(actor_id, actor)| (*actor_id, actor.into()))
                .collect(),
            vnode_count: *vnode_count,
        }
    }
}

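/// Fragment snapshots grouped by database, guarded by the `RwLock` in
/// [`SharedActorInfos`].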
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}

impl SharedActorInfosInner {
    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
        self.info
            .values()
            .find_map(|database| database.get(&fragment_id))
    }

    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
        self.info.values().flatten()
    }
}

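/// A shared, lock-protected view of all in-flight fragment info, kept up to date by the
/// barrier manager. Mutations go through [`SharedActorInfoWriter`], which also publishes
/// the affected fragment worker-slot mappings to subscribers.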
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}

impl SharedActorInfos {
    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
        self.inner.read()
    }

    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
        let core = self.inner.read();
        core.iter_over_fragments()
            .flat_map(|(_, fragment)| {
                fragment
                    .actors
                    .iter()
                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
            })
            .collect()
    }

    /// Migrates splits from the previous actors to the new actors of a rescheduled fragment.
    ///
    /// Very occasionally, split removal may happen during scaling. In that case we reallocate
    /// based on the old splits rather than the latest ones (which may already be missing), so
    /// that the split removal can be resolved by the next command.
    pub fn migrate_splits_for_source_actors(
        &self,
        fragment_id: FragmentId,
        prev_actor_ids: &[ActorId],
        curr_actor_ids: &[ActorId],
    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
        let guard = self.read_guard();

        let prev_splits = prev_actor_ids
            .iter()
            .flat_map(|actor_id| {
                // Note: File Source / Iceberg Source don't have splits assigned by meta.
                guard
                    .get_fragment(fragment_id)
                    .and_then(|info| info.actors.get(actor_id))
                    .map(|actor| actor.splits.clone())
                    .unwrap_or_default()
            })
            .map(|split| (split.id(), split))
            .collect();

        let empty_actor_splits = curr_actor_ids
            .iter()
            .map(|actor_id| (*actor_id, vec![]))
            .collect();

        let diff = crate::stream::source_manager::reassign_splits(
            fragment_id,
            empty_actor_splits,
            &prev_splits,
            // Pre-allocation is the first time splits are assigned, so there is no
            // scale-in scenario to handle here.
            std::default::Default::default(),
        )
        .unwrap_or_default();

        Ok(diff)
    }
}

impl SharedActorInfos {
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
            .collect();
        // Delete fragments that existed previously but are not included in the recovered
        // fragments.
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        for (_, fragment) in database.extract_if(|fragment_id, fragment_infos| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_infos = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        for (fragment_id, info) in remaining_fragments {
            let info = info.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}

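/// A write handle over the shared info that records fragment-mapping changes and flushes
/// them as `Add`/`Update`/`Delete` notifications when [`finish`](Self::finish) is called.
///
/// Typical usage, a sketch mirroring [`SharedActorInfos::upsert`]:
///
/// ```ignore
/// let mut writer = shared_actor_infos.start_writer(database_id);
/// writer.upsert(infos); // stage changes; mapping updates are recorded
/// writer.finish(); // notify subscribers of all recorded changes at once
/// ```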
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}

impl SharedActorInfoWriter<'_> {
    pub(super) fn upsert(
        &mut self,
        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
    ) {
        let database = self.write_guard.info.entry(self.database_id).or_default();
        for info @ (fragment, _) in infos {
            match database.entry(fragment.fragment_id) {
                Entry::Occupied(mut entry) => {
                    let info = info.into();
                    self.updated_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
                Entry::Vacant(entry) => {
                    let info = info.into();
                    self.added_fragment_mapping
                        .get_or_insert_default()
                        .push(rebuild_fragment_mapping(&info));
                    entry.insert(info);
                }
            }
        }
    }

    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
            && let Some(fragment) = database.remove(&info.fragment_id)
        {
            self.deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
    }

    pub(super) fn finish(self) {
        if let Some(mapping) = self.added_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Add, mapping);
        }
        if let Some(mapping) = self.updated_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Update, mapping);
        }
        if let Some(mapping) = self.deleted_fragment_mapping {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }
}

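/// The epochs and kind of a single barrier. `prev_epoch` is the epoch the barrier closes,
/// and `curr_epoch` is the epoch it opens.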
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}

impl BarrierInfo {
    pub(super) fn prev_epoch(&self) -> u64 {
        self.prev_epoch.value().0
    }
}

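/// Per-fragment changes derived from a barrier command, applied in two phases:
/// [`InflightDatabaseInfo::pre_apply`] before the barrier is injected, and
/// [`InflightDatabaseInfo::post_apply`] after it is collected.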
#[derive(Debug)]
pub(super) enum CommandFragmentChanges {
    NewFragment {
        job_id: JobId,
        info: InflightFragmentInfo,
        /// Whether the fragment already existed before being added. This is used when
        /// snapshot backfill finishes and its fragment info is added back to the
        /// database.
        is_existing: bool,
    },
    AddNodeUpstream(PbUpstreamSinkInfo),
    DropNodeUpstream(Vec<FragmentId>),
    ReplaceNodeUpstream(
        /// old `fragment_id` -> new `fragment_id`
        HashMap<FragmentId, FragmentId>,
    ),
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    RemoveFragment,
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}

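/// The subset of [`CommandFragmentChanges`] that is deferred to the post-apply phase,
/// i.e. applied only after the barrier is collected.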
pub(super) enum PostApplyFragmentChanges {
    Reschedule { to_remove: HashSet<ActorId> },
    RemoveFragment,
}

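/// The type of a subscriber on a streaming job: either a user subscription carrying its
/// retention value, or an ongoing snapshot backfill.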
#[derive(Clone, Debug)]
pub enum SubscriberType {
    Subscription(u64),
    SnapshotBackfill,
}

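/// Creation status of a streaming job: `Init` until the job's first barrier is collected,
/// `Creating` while backfill progress is tracked, and `Created` once the job is finished.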
#[derive(Debug)]
pub(super) enum CreateStreamingJobStatus {
    Init,
    Creating(CreateMviewProgressTracker),
    Created,
}

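/// In-flight info of a single streaming job: its fragments, its subscribers, and its
/// creation status.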
#[derive(Debug)]
pub(super) struct InflightStreamingJobInfo {
    pub job_id: JobId,
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    pub subscribers: HashMap<SubscriberId, SubscriberType>,
    pub status: CreateStreamingJobStatus,
}

impl InflightStreamingJobInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.fragment_infos.values()
    }

    pub fn snapshot_backfill_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> impl Iterator<Item = ActorId> + '_ {
        fragment_infos
            .values()
            .filter(|fragment| {
                fragment
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
            })
            .flat_map(|fragment| fragment.actors.keys().copied())
    }

    pub fn tracking_progress_actor_ids(
        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
    ) -> Vec<(ActorId, BackfillUpstreamType)> {
        StreamJobFragments::tracking_progress_actor_ids_impl(
            fragment_infos
                .values()
                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
        )
    }
}

impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
    type Item = &'a InflightFragmentInfo;

    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;

    fn into_iter(self) -> Self::IntoIter {
        self.fragment_infos()
    }
}

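/// In-flight graph info of all streaming jobs in one database, with a reverse index from
/// fragment to owning job. Changes derived from a barrier command flow through it in two
/// phases, a sketch of which (using hypothetical variable names) is:
///
/// ```ignore
/// let post_changes = db_info.pre_apply(new_job_id, fragment_changes); // before injection
/// // ... the barrier is injected and collected ...
/// db_info.post_apply(post_changes); // after collection
/// ```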
#[derive(Debug)]
pub struct InflightDatabaseInfo {
    database_id: DatabaseId,
    jobs: HashMap<JobId, InflightStreamingJobInfo>,
    fragment_location: HashMap<FragmentId, JobId>,
    pub(super) shared_actor_infos: SharedActorInfos,
}

impl InflightDatabaseInfo {
    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
        self.jobs.values().flat_map(|job| job.fragment_infos())
    }

    pub fn contains_job(&self, job_id: JobId) -> bool {
        self.jobs.contains_key(&job_id)
    }

    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs
            .get(&job_id)
            .expect("should exist")
            .fragment_infos
            .get(&fragment_id)
            .expect("should exist")
    }

    pub fn gen_ddl_progress(&self) -> impl Iterator<Item = (JobId, DdlProgress)> + '_ {
        self.jobs
            .iter()
            .filter_map(|(job_id, job)| match &job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating(tracker) => {
                    Some((*job_id, tracker.gen_ddl_progress()))
                }
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub(super) fn apply_collected_command(
        &mut self,
        command: Option<&Command>,
        resps: impl Iterator<Item = &BarrierCompleteResponse>,
        version_stats: &HummockVersionStats,
    ) {
        if let Some(Command::CreateStreamingJob { info, job_type, .. }) = command {
            match job_type {
                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
                    let job_id = info.streaming_job.id();
                    if let Some(job_info) = self.jobs.get_mut(&job_id) {
                        let CreateStreamingJobStatus::Init = replace(
                            &mut job_info.status,
                            CreateStreamingJobStatus::Creating(CreateMviewProgressTracker::new(
                                info,
                                version_stats,
                            )),
                        ) else {
                            unreachable!("should be init before collecting the first barrier")
                        };
                    } else {
                        info!(%job_id, "newly created job was cancelled before the first barrier was collected")
                    }
                }
                CreateStreamingJobType::SnapshotBackfill(_) => {
                    // The progress of SnapshotBackfill won't be tracked here
                }
            }
        }
        for progress in resps.flat_map(|resp| &resp.create_mview_progress) {
            let Some(job_id) = self.fragment_location.get(&progress.fragment_id) else {
                warn!(
                    "update the progress of a non-existent creating streaming job: {progress:?}, which may have been cancelled"
                );
                continue;
            };
            let CreateStreamingJobStatus::Creating(tracker) =
                &mut self.jobs.get_mut(job_id).expect("should exist").status
            else {
                warn!("update the progress of an already created streaming job: {progress:?}");
                continue;
            };
            tracker.apply_progress(progress, version_stats);
        }
    }

    fn iter_creating_job_tracker(&self) -> impl Iterator<Item = &CreateMviewProgressTracker> {
        self.jobs.values().filter_map(|job| match &job.status {
            CreateStreamingJobStatus::Init => None,
            CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
            CreateStreamingJobStatus::Created => None,
        })
    }

    fn iter_mut_creating_job_tracker(
        &mut self,
    ) -> impl Iterator<Item = &mut CreateMviewProgressTracker> {
        self.jobs
            .values_mut()
            .filter_map(|job| match &mut job.status {
                CreateStreamingJobStatus::Init => None,
                CreateStreamingJobStatus::Creating(tracker) => Some(tracker),
                CreateStreamingJobStatus::Created => None,
            })
    }

    pub(super) fn has_pending_finished_jobs(&self) -> bool {
        self.iter_creating_job_tracker()
            .any(|tracker| tracker.is_finished())
    }

    pub(super) fn take_pending_backfill_nodes(&mut self) -> Vec<FragmentId> {
        self.iter_mut_creating_job_tracker()
            .flat_map(|tracker| tracker.take_pending_backfill_nodes())
            .collect()
    }

    pub(super) fn take_staging_commit_info(&mut self) -> StagingCommitInfo {
        let mut finished_jobs = vec![];
        let mut table_ids_to_truncate = vec![];
        for job in self.jobs.values_mut() {
            if let CreateStreamingJobStatus::Creating(tracker) = &mut job.status {
                let (is_finished, truncate_table_ids) = tracker.collect_staging_commit_info();
                table_ids_to_truncate.extend(truncate_table_ids);
                if is_finished {
                    let CreateStreamingJobStatus::Creating(tracker) =
                        replace(&mut job.status, CreateStreamingJobStatus::Created)
                    else {
                        unreachable!()
                    };
                    finished_jobs.push(tracker.into_tracking_job());
                }
            }
        }
        StagingCommitInfo {
            finished_jobs,
            table_ids_to_truncate,
        }
    }

    pub fn fragment_subscribers(
        &self,
        fragment_id: FragmentId,
    ) -> impl Iterator<Item = SubscriberId> + '_ {
        let job_id = self.fragment_location[&fragment_id];
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = SubscriberId> + '_ {
        self.jobs[&job_id].subscribers.keys().copied()
    }

    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
        self.jobs
            .iter()
            .filter_map(|(job_id, info)| {
                info.subscribers
                    .values()
                    .filter_map(|subscriber| match subscriber {
                        SubscriberType::Subscription(retention) => Some(*retention),
                        SubscriberType::SnapshotBackfill => None,
                    })
                    .max()
                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
            })
            .collect()
    }

    pub fn register_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
        subscriber: SubscriberType,
    ) {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .try_insert(subscriber_id, subscriber)
            .expect("non duplicate");
    }

    pub fn unregister_subscriber(
        &mut self,
        job_id: JobId,
        subscriber_id: SubscriberId,
    ) -> Option<SubscriberType> {
        self.jobs
            .get_mut(&job_id)
            .expect("should exist")
            .subscribers
            .remove(&subscriber_id)
    }

    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
        let job_id = self.fragment_location[&fragment_id];
        let fragment = self
            .jobs
            .get_mut(&job_id)
            .expect("should exist")
            .fragment_infos
            .get_mut(&fragment_id)
            .expect("should exist");
        (fragment, job_id)
    }

    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            database_id,
            jobs: Default::default(),
            fragment_location: Default::default(),
            shared_actor_infos,
        }
    }

    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        // The database has no in-flight info, so drop any stale shared info for it.
        shared_actor_infos.remove_database(database_id);
        Self::empty_inner(database_id, shared_actor_infos)
    }

    pub fn recover(
        database_id: DatabaseId,
        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
        shared_actor_infos: SharedActorInfos,
    ) -> Self {
        let mut info = Self::empty_inner(database_id, shared_actor_infos);
        for job in jobs {
            info.add_existing(job);
        }
        info
    }

    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }

    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
        let InflightStreamingJobInfo {
            job_id,
            fragment_infos,
            subscribers,
            status,
        } = job;
        self.jobs
            .try_insert(
                job_id,
                InflightStreamingJobInfo {
                    job_id,
                    subscribers,
                    fragment_infos: Default::default(), // filled in later by `apply_add`
                    status,
                },
            )
            .expect("non-duplicate");
        let post_apply_changes =
            self.apply_add(fragment_infos.into_iter().map(|(fragment_id, info)| {
                (
                    fragment_id,
                    CommandFragmentChanges::NewFragment {
                        job_id,
                        info,
                        is_existing: true,
                    },
                )
            }));
        self.post_apply(post_apply_changes);
    }

    /// Apply actor changes before issuing a barrier command. If the command contains any
    /// newly added actors, we should update the info correspondingly.
    pub(crate) fn pre_apply(
        &mut self,
        new_job_id: Option<JobId>,
        fragment_changes: HashMap<FragmentId, CommandFragmentChanges>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        if let Some(job_id) = new_job_id {
            self.jobs
                .try_insert(
                    job_id,
                    InflightStreamingJobInfo {
                        job_id,
                        fragment_infos: Default::default(),
                        subscribers: Default::default(), // no subscriber for a newly created job
                        status: CreateStreamingJobStatus::Init,
                    },
                )
                .expect("non-duplicate");
        }
        self.apply_add(fragment_changes.into_iter())
    }

    fn apply_add(
        &mut self,
        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
    ) -> HashMap<FragmentId, PostApplyFragmentChanges> {
        let mut post_apply = HashMap::new();
        {
            let shared_infos = self.shared_actor_infos.clone();
            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
            for (fragment_id, change) in fragment_changes {
                match change {
                    CommandFragmentChanges::NewFragment {
                        job_id,
                        info,
                        is_existing,
                    } => {
                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
                        if !is_existing {
                            shared_actor_writer.upsert([(&info, job_id)]);
                        }
                        fragment_infos
                            .fragment_infos
                            .try_insert(fragment_id, info)
                            .expect("non duplicate");
                        self.fragment_location
                            .try_insert(fragment_id, job_id)
                            .expect("non duplicate");
                    }
                    CommandFragmentChanges::Reschedule {
                        new_actors,
                        actor_update_vnode_bitmap,
                        to_remove,
                        actor_splits,
                    } => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
                            actors
                                .get_mut(&actor_id)
                                .expect("should exist")
                                .vnode_bitmap = Some(new_vnodes);
                        }
                        for (actor_id, actor) in new_actors {
                            actors
                                .try_insert(actor_id as _, actor)
                                .expect("non-duplicate");
                        }
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }

                        post_apply.insert(
                            fragment_id,
                            PostApplyFragmentChanges::Reschedule { to_remove },
                        );

                        // The info will be upserted into `shared_actor_infos` in the
                        // `post_apply` stage.
                    }
                    CommandFragmentChanges::RemoveFragment => {
                        post_apply.insert(fragment_id, PostApplyFragmentChanges::RemoveFragment);
                    }
                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
                        let mut remaining_fragment_ids: HashSet<_> =
                            replace_map.keys().cloned().collect();
                        let (info, _) = self.fragment_mut(fragment_id);
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::Merge(m) = node
                                && let Some(new_upstream_fragment_id) =
                                    replace_map.get(&m.upstream_fragment_id)
                            {
                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
                                    if cfg!(debug_assertions) {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            m, replace_map
                                        );
                                    } else {
                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
                                    }
                                }
                                m.upstream_fragment_id = *new_upstream_fragment_id;
                            }
                        });
                        if cfg!(debug_assertions) {
                            assert!(
                                remaining_fragment_ids.is_empty(),
                                "non-existing fragment to replace: {:?} {:?} {:?}",
                                remaining_fragment_ids,
                                info.nodes,
                                replace_map
                            );
                        } else if !remaining_fragment_ids.is_empty() {
                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
                        }
                    }
                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut injected = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<_>>();
                                    if current_upstream_fragment_ids
                                        .contains(&new_upstream_info.upstream_fragment_id)
                                    {
                                        panic!(
                                            "duplicate upstream fragment: {:?} {:?}",
                                            u, new_upstream_info
                                        );
                                    }
                                }
                                u.init_upstreams.push(new_upstream_info.clone());
                                injected = true;
                            }
                        });
                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
                        let (info, _) = self.fragment_mut(fragment_id);
                        let mut removed = false;
                        visit_stream_node_mut(&mut info.nodes, |node| {
                            if let NodeBody::UpstreamSinkUnion(u) = node {
                                if cfg!(debug_assertions) {
                                    let current_upstream_fragment_ids = u
                                        .init_upstreams
                                        .iter()
                                        .map(|upstream| upstream.upstream_fragment_id)
                                        .collect::<HashSet<FragmentId>>();
                                    for drop_fragment_id in &drop_upstream_fragment_ids {
                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
                                        {
                                            panic!(
                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
                                                u, drop_upstream_fragment_ids, drop_fragment_id
                                            );
                                        }
                                    }
                                }
                                u.init_upstreams.retain(|upstream| {
                                    !drop_upstream_fragment_ids
                                        .contains(&upstream.upstream_fragment_id)
                                });
                                removed = true;
                            }
                        });
                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
                    }
                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
                        let (info, job_id) = self.fragment_mut(fragment_id);
                        let actors = &mut info.actors;
                        for (actor_id, splits) in actor_splits {
                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                }
            }
            shared_actor_writer.finish();
        }
        post_apply
    }

    pub(super) fn build_edge(
        &self,
        command: Option<&Command>,
        control_stream_manager: &ControlStreamManager,
    ) -> Option<FragmentEdgeBuildResult> {
        let (info, replace_job, new_upstream_sink) = match command {
            None => {
                return None;
            }
            Some(command) => match command {
                Command::Flush
                | Command::Pause
                | Command::Resume
                | Command::DropStreamingJobs { .. }
                | Command::MergeSnapshotBackfillStreamingJobs(_)
                | Command::RescheduleFragment { .. }
                | Command::SourceChangeSplit { .. }
                | Command::Throttle(_)
                | Command::CreateSubscription { .. }
                | Command::DropSubscription { .. }
                | Command::ConnectorPropsChange(_)
                | Command::StartFragmentBackfill { .. }
                | Command::Refresh { .. }
                | Command::ListFinish { .. }
                | Command::LoadFinish { .. } => {
                    return None;
                }
                Command::CreateStreamingJob { info, job_type, .. } => {
                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
                        new_upstream_sink,
                    ) = job_type
                    {
                        Some(new_upstream_sink)
                    } else {
                        None
                    };
                    (Some(info), None, new_upstream_sink)
                }
                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
            },
        };
        // `existing_fragment_ids` consists of:
        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id`s of the
        //    upstream fragments of the newly created job;
        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id`s of
        //    the upstream fragments of `replace_job`, if the upstream fragment previously exists;
        //  - keys of `replace_upstream`, which are the `fragment_id`s of downstream fragments
        //    that will update their upstream fragments;
        //  - the `fragment_id` of the downstream table, if creating a new sink-into-table.
        let existing_fragment_ids = info
            .into_iter()
            .flat_map(|info| info.upstream_fragment_downstreams.keys())
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .upstream_fragment_downstreams
                    .keys()
                    .filter(|fragment_id| {
                        info.map(|info| {
                            !info
                                .stream_job_fragments
                                .fragments
                                .contains_key(fragment_id)
                        })
                        .unwrap_or(true)
                    })
                    .chain(replace_job.replace_upstream.keys())
            }))
            .chain(
                new_upstream_sink
                    .into_iter()
                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
            )
            .cloned();
        let new_fragment_infos = info
            .into_iter()
            .flat_map(|info| {
                info.stream_job_fragments
                    .new_fragment_info(&info.init_split_assignment)
            })
            .chain(replace_job.into_iter().flat_map(|replace_job| {
                replace_job
                    .new_fragments
                    .new_fragment_info(&replace_job.init_split_assignment)
                    .chain(
                        replace_job
                            .auto_refresh_schema_sinks
                            .as_ref()
                            .into_iter()
                            .flat_map(|sinks| {
                                sinks.iter().map(|sink| {
                                    (sink.new_fragment.fragment_id, sink.new_fragment_info())
                                })
                            }),
                    )
            }))
            .collect_vec();
        let mut builder = FragmentEdgeBuilder::new(
            existing_fragment_ids
                .map(|fragment_id| self.fragment(fragment_id))
                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
            control_stream_manager,
        );
        if let Some(info) = info {
            builder.add_relations(&info.upstream_fragment_downstreams);
            builder.add_relations(&info.stream_job_fragments.downstreams);
        }
        if let Some(replace_job) = replace_job {
            builder.add_relations(&replace_job.upstream_fragment_downstreams);
            builder.add_relations(&replace_job.new_fragments.downstreams);
        }
        if let Some(new_upstream_sink) = new_upstream_sink {
            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
            builder.add_edge(sink_fragment_id, new_sink_downstream);
        }
        if let Some(replace_job) = replace_job {
            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
                for (original_upstream_fragment_id, new_upstream_fragment_id) in
                    fragment_replacement
                {
                    builder.replace_upstream(
                        *fragment_id,
                        *original_upstream_fragment_id,
                        *new_upstream_fragment_id,
                    );
                }
            }
        }
        Some(builder.build())
    }

    /// Apply actor changes after the barrier command is collected. If the command drops any
    /// actors, we should remove them from the snapshot correspondingly.
    pub(crate) fn post_apply(
        &mut self,
        fragment_changes: HashMap<FragmentId, PostApplyFragmentChanges>,
    ) {
        let inner = self.shared_actor_infos.clone();
        let mut shared_actor_writer = inner.start_writer(self.database_id);
        {
            for (fragment_id, changes) in fragment_changes {
                match changes {
                    PostApplyFragmentChanges::Reschedule { to_remove } => {
                        let job_id = self.fragment_location[&fragment_id];
                        let info = self
                            .jobs
                            .get_mut(&job_id)
                            .expect("should exist")
                            .fragment_infos
                            .get_mut(&fragment_id)
                            .expect("should exist");
                        for actor_id in to_remove {
                            assert!(info.actors.remove(&actor_id).is_some());
                        }
                        shared_actor_writer.upsert([(&*info, job_id)]);
                    }
                    PostApplyFragmentChanges::RemoveFragment => {
                        let job_id = self
                            .fragment_location
                            .remove(&fragment_id)
                            .expect("should exist");
                        let job = self.jobs.get_mut(&job_id).expect("should exist");
                        let fragment = job
                            .fragment_infos
                            .remove(&fragment_id)
                            .expect("should exist");
                        shared_actor_writer.remove(&fragment);
                        if job.fragment_infos.is_empty() {
                            self.jobs.remove(&job_id).expect("should exist");
                        }
                    }
                }
            }
        }
        shared_actor_writer.finish();
    }
}

impl InflightFragmentInfo {
    /// Returns the actors to collect on each worker node.
    pub(crate) fn actor_ids_to_collect(
        infos: impl IntoIterator<Item = &Self>,
    ) -> HashMap<WorkerId, HashSet<ActorId>> {
        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
            assert!(
                ret.entry(actor.worker_id)
                    .or_default()
                    .insert(*actor_id as _)
            )
        }
        ret
    }

    pub fn existing_table_ids<'a>(
        infos: impl IntoIterator<Item = &'a Self> + 'a,
    ) -> impl Iterator<Item = TableId> + 'a {
        infos
            .into_iter()
            .flat_map(|info| info.state_table_ids.iter().cloned())
    }

    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
        infos.into_iter().any(|fragment| {
            fragment
                .actors
                .values()
                .any(|actor| (actor.worker_id) == worker_id)
        })
    }
}

impl InflightDatabaseInfo {
    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
    }

    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
    }
}