risingwave_meta/barrier/
info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RawRwLock;
21use parking_lot::lock_api::RwLockReadGuard;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
25use risingwave_connector::source::{SplitImpl, SplitMetaData};
26use risingwave_meta_model::WorkerId;
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
29use risingwave_pb::meta::subscribe_response::Operation;
30use risingwave_pb::stream_plan::stream_node::NodeBody;
31use risingwave_pb::stream_plan::{PbSubscriptionUpstreamInfo, PbUpstreamSinkInfo};
32use tracing::warn;
33
34use crate::MetaResult;
35use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
36use crate::barrier::rpc::ControlStreamManager;
37use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
38use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
39use crate::controller::utils::rebuild_fragment_mapping;
40use crate::manager::NotificationManagerRef;
41use crate::model::{ActorId, FragmentId, SubscriptionId};
42
/// Snapshot of the parts of an [`InflightFragmentInfo`] that are shared outside the barrier
/// worker: the fragment id, its distribution type and its actors.
///
/// Built through the `From<&InflightFragmentInfo>` impl, which clones these three fields and
/// leaves every other field of the in-flight fragment behind.
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    pub distribution_type: DistributionType,
    // Actors of the fragment keyed by actor id; each entry carries (among others) the actor's
    // assigned `splits` and its `vnode_bitmap`.
    pub actors: HashMap<ActorId, InflightActorInfo>,
}
49
50impl From<&InflightFragmentInfo> for SharedFragmentInfo {
51    fn from(info: &InflightFragmentInfo) -> Self {
52        let InflightFragmentInfo {
53            fragment_id,
54            distribution_type,
55            actors,
56            ..
57        } = info;
58
59        Self {
60            fragment_id: *fragment_id,
61            distribution_type: *distribution_type,
62            actors: actors.clone(),
63        }
64    }
65}
66
/// State guarded by the lock inside [`SharedActorInfos`].
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    // database id -> (fragment id -> shared fragment info) for every tracked database.
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
71
72impl SharedActorInfosInner {
73    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
74        self.info
75            .values()
76            .find_map(|database| database.get(&fragment_id))
77    }
78
79    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
80        self.info.values().flatten()
81    }
82}
83
/// Handle to the per-database fragment/actor info that is shared behind an
/// `Arc<parking_lot::RwLock<..>>`; clones of the handle see the same underlying info.
///
/// The mutating methods of this handle broadcast the affected fragment mappings through
/// `notification_manager`.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    // Omitted from the `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
91
92impl SharedActorInfos {
93    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
94        self.inner.read()
95    }
96
97    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
98        let core = self.inner.read();
99        core.iter_over_fragments()
100            .flat_map(|(_, fragment)| {
101                fragment
102                    .actors
103                    .iter()
104                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
105            })
106            .collect()
107    }
108
109    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
110    ///
111    /// Very occasionally split removal may happen during scaling, in which case we need to
112    /// use the old splits for reallocation instead of the latest splits (which may be missing),
113    /// so that we can resolve the split removal in the next command.
114    pub fn migrate_splits_for_source_actors(
115        &self,
116        fragment_id: FragmentId,
117        prev_actor_ids: &[ActorId],
118        curr_actor_ids: &[ActorId],
119    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
120        let guard = self.read_guard();
121
122        let prev_splits = prev_actor_ids
123            .iter()
124            .flat_map(|actor_id| {
125                // Note: File Source / Iceberg Source doesn't have splits assigned by meta.
126                guard
127                    .get_fragment(fragment_id)
128                    .and_then(|info| info.actors.get(actor_id))
129                    .map(|actor| actor.splits.clone())
130                    .unwrap_or_default()
131            })
132            .map(|split| (split.id(), split))
133            .collect();
134
135        let empty_actor_splits = curr_actor_ids
136            .iter()
137            .map(|actor_id| (*actor_id, vec![]))
138            .collect();
139
140        let diff = crate::stream::source_manager::reassign_splits(
141            fragment_id,
142            empty_actor_splits,
143            &prev_splits,
144            // pre-allocate splits is the first time getting splits, and it does not have scale-in scene
145            std::default::Default::default(),
146        )
147        .unwrap_or_default();
148
149        Ok(diff)
150    }
151}
152
impl SharedActorInfos {
    /// Creates an empty shared info that broadcasts fragment mapping changes through
    /// `notification_manager`.
    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
        Self {
            inner: Arc::new(Default::default()),
            notification_manager,
        }
    }

    /// Removes `database_id` and all of its fragments. If the database had any fragments, a
    /// single `Operation::Delete` notification with their mappings is sent.
    pub(super) fn remove_database(&self, database_id: DatabaseId) {
        if let Some(database) = self.inner.write().info.remove(&database_id) {
            let mapping = database
                .into_values()
                .map(|fragment| rebuild_fragment_mapping(&fragment))
                .collect_vec();
            if !mapping.is_empty() {
                self.notification_manager
                    .notify_fragment_mapping(Operation::Delete, mapping);
            }
        }
    }

    /// Keeps only the databases listed in `database_ids`. The mappings of every fragment of
    /// the dropped databases are sent in a single `Operation::Delete` notification (if any).
    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
        let database_ids: HashSet<_> = database_ids.into_iter().collect();

        let mut mapping = Vec::new();
        // The write guard is a temporary of the loop's iterator expression, so it is released
        // when the loop ends, before the notification below.
        for fragment in self
            .inner
            .write()
            .info
            .extract_if(|database_id, _| !database_ids.contains(database_id))
            .flat_map(|(_, fragments)| fragments.into_values())
        {
            mapping.push(rebuild_fragment_mapping(&fragment));
        }
        if !mapping.is_empty() {
            self.notification_manager
                .notify_fragment_mapping(Operation::Delete, mapping);
        }
    }

    /// Replaces the shared info of `database_id` with the recovered `fragments`.
    ///
    /// Fragments that are already present and recovered are overwritten and notified as
    /// `Update`. Fragments that are present but not recovered are removed and notified as
    /// `Delete`. Recovered fragments that were not present are inserted and notified as `Add`.
    /// All notifications are sent by `writer.finish()`.
    pub(super) fn recover_database(
        &self,
        database_id: DatabaseId,
        fragments: impl Iterator<Item = &InflightFragmentInfo>,
    ) {
        let mut remaining_fragments: HashMap<_, _> = fragments
            .map(|fragment| (fragment.fragment_id, fragment))
            .collect();
        // Delete the fragments that existed previously but are not among the recovered
        // fragments; overwrite the ones that are.
        let mut writer = self.start_writer(database_id);
        let database = writer.write_guard.info.entry(database_id).or_default();
        // The predicate has side effects. For a fragment that is still recovered, it consumes
        // the entry from `remaining_fragments`, overwrites the stored info in place, and
        // returns `false` to keep it. Otherwise it returns `true` so the fragment is extracted
        // and reported as deleted.
        for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
            if let Some(info) = remaining_fragments.remove(fragment_id) {
                let info = info.into();
                writer
                    .updated_fragment_mapping
                    .get_or_insert_default()
                    .push(rebuild_fragment_mapping(&info));
                *fragment_info = info;
                false
            } else {
                true
            }
        }) {
            writer
                .deleted_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&fragment));
        }
        // Whatever is left was not present before: insert it as newly added.
        for (fragment_id, fragment) in remaining_fragments {
            let info = fragment.into();
            writer
                .added_fragment_mapping
                .get_or_insert_default()
                .push(rebuild_fragment_mapping(&info));
            database.insert(fragment_id, info);
        }
        writer.finish();
    }

    /// Inserts or overwrites `infos` in `database_id`. New fragments are notified as `Add`
    /// and overwritten ones as `Update`.
    pub(super) fn upsert(
        &self,
        database_id: DatabaseId,
        infos: impl IntoIterator<Item = &InflightFragmentInfo>,
    ) {
        let mut writer = self.start_writer(database_id);
        writer.upsert(infos);
        writer.finish();
    }

    /// Takes the write lock and returns a writer for `database_id`.
    ///
    /// The lock is held for as long as the writer lives. The mapping changes the writer
    /// records are only broadcast when `finish` is called on it.
    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
        SharedActorInfoWriter {
            database_id,
            write_guard: self.inner.write(),
            notification_manager: &self.notification_manager,
            added_fragment_mapping: None,
            updated_fragment_mapping: None,
            deleted_fragment_mapping: None,
        }
    }
}
254
/// Write access to the shared actor infos of a single database.
///
/// It holds the write lock and accumulates fragment mapping changes. The changes are only
/// broadcast by its `finish` method.
pub(super) struct SharedActorInfoWriter<'a> {
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    // Mappings to be notified with `Operation::Add`, `Operation::Update` and
    // `Operation::Delete` respectively; `None` means nothing of that kind was recorded.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
263
264impl SharedActorInfoWriter<'_> {
265    pub(super) fn upsert(&mut self, infos: impl IntoIterator<Item = &InflightFragmentInfo>) {
266        let database = self.write_guard.info.entry(self.database_id).or_default();
267        for info in infos {
268            match database.entry(info.fragment_id) {
269                Entry::Occupied(mut entry) => {
270                    let info = info.into();
271                    self.updated_fragment_mapping
272                        .get_or_insert_default()
273                        .push(rebuild_fragment_mapping(&info));
274                    entry.insert(info);
275                }
276                Entry::Vacant(entry) => {
277                    let info = info.into();
278                    self.added_fragment_mapping
279                        .get_or_insert_default()
280                        .push(rebuild_fragment_mapping(&info));
281                    entry.insert(info);
282                }
283            }
284        }
285    }
286
287    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
288        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
289            && let Some(fragment) = database.remove(&info.fragment_id)
290        {
291            self.deleted_fragment_mapping
292                .get_or_insert_default()
293                .push(rebuild_fragment_mapping(&fragment));
294        }
295    }
296
297    pub(super) fn finish(self) {
298        if let Some(mapping) = self.added_fragment_mapping {
299            self.notification_manager
300                .notify_fragment_mapping(Operation::Add, mapping);
301        }
302        if let Some(mapping) = self.updated_fragment_mapping {
303            self.notification_manager
304                .notify_fragment_mapping(Operation::Update, mapping);
305        }
306        if let Some(mapping) = self.deleted_fragment_mapping {
307            self.notification_manager
308                .notify_fragment_mapping(Operation::Delete, mapping);
309        }
310    }
311}
312
/// Describes a barrier: its previous and current epochs and its kind.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}
319
320impl BarrierInfo {
321    pub(super) fn prev_epoch(&self) -> u64 {
322        self.prev_epoch.value().0
323    }
324}
325
/// Change to a single fragment carried by a barrier command.
///
/// `InflightDatabaseInfo::pre_apply` applies the additive part of these changes to the
/// in-flight database info.
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    /// A fragment to be tracked under streaming job `job_id`.
    NewFragment {
        job_id: TableId,
        info: InflightFragmentInfo,
        /// Whether the fragment already existed before being added. This is used when
        /// snapshot backfill finishes and its fragment info is added back to the database.
        is_existing: bool,
    },
    /// Appends an upstream to the fragment's `UpstreamSinkUnion` node.
    AddNodeUpstream(PbUpstreamSinkInfo),
    /// Removes the listed upstream fragments from the fragment's `UpstreamSinkUnion` node.
    DropNodeUpstream(Vec<FragmentId>),
    /// Re-points the fragment's `Merge` nodes from old upstream fragments to new ones.
    ReplaceNodeUpstream(
        /// old `fragment_id` -> new `fragment_id`
        HashMap<FragmentId, FragmentId>,
    ),
    /// Actor-level changes of a rescheduled fragment.
    ///
    /// `pre_apply` adds `new_actors` and applies `actor_update_vnode_bitmap` and
    /// `actor_splits`. It does not use `to_remove`.
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    /// The fragment is being removed; `pre_apply` does nothing for it.
    RemoveFragment,
    /// New split assignment for actors of the fragment.
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}
353
/// Subscriptions grouped by the upstream materialized-view table they depend on.
#[derive(Default, Clone, Debug)]
pub struct InflightSubscriptionInfo {
    /// `mv_table_id` => `subscription_id` => retention seconds
    pub mv_depended_subscriptions: HashMap<TableId, HashMap<SubscriptionId, u64>>,
}
359
/// In-flight fragments of one streaming job.
#[derive(Clone, Debug)]
pub struct InflightStreamingJobInfo {
    pub job_id: TableId,
    // fragment id -> in-flight info of that fragment.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
}
365
366impl InflightStreamingJobInfo {
367    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
368        self.fragment_infos.values()
369    }
370
371    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
372        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
373    }
374}
375
376impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
377    type Item = &'a InflightFragmentInfo;
378
379    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
380
381    fn into_iter(self) -> Self::IntoIter {
382        self.fragment_infos()
383    }
384}
385
/// In-flight view of one database: its streaming jobs and their fragments.
#[derive(Clone, Debug)]
pub struct InflightDatabaseInfo {
    database_id: DatabaseId,
    // job id -> the job's in-flight fragments.
    jobs: HashMap<TableId, InflightStreamingJobInfo>,
    // fragment id -> id of the owning job; used to locate a fragment inside `jobs`.
    fragment_location: HashMap<FragmentId, TableId>,
    // Shared handle that some of the changes applied to this database are published to.
    pub(super) shared_actor_infos: SharedActorInfos,
}
393
394impl InflightDatabaseInfo {
395    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
396        self.jobs.values().flat_map(|job| job.fragment_infos())
397    }
398
399    pub fn contains_job(&self, job_id: TableId) -> bool {
400        self.jobs.contains_key(&job_id)
401    }
402
403    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
404        let job_id = self.fragment_location[&fragment_id];
405        self.jobs
406            .get(&job_id)
407            .expect("should exist")
408            .fragment_infos
409            .get(&fragment_id)
410            .expect("should exist")
411    }
412
413    fn fragment_mut(&mut self, fragment_id: FragmentId) -> &mut InflightFragmentInfo {
414        let job_id = self.fragment_location[&fragment_id];
415        self.jobs
416            .get_mut(&job_id)
417            .expect("should exist")
418            .fragment_infos
419            .get_mut(&fragment_id)
420            .expect("should exist")
421    }
422
423    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
424        Self {
425            database_id,
426            jobs: Default::default(),
427            fragment_location: Default::default(),
428            shared_actor_infos,
429        }
430    }
431
432    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
433        // remove the database because it's empty.
434        shared_actor_infos.remove_database(database_id);
435        Self::empty_inner(database_id, shared_actor_infos)
436    }
437
438    pub fn recover(
439        database_id: DatabaseId,
440        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
441        shared_actor_infos: SharedActorInfos,
442    ) -> Self {
443        let mut info = Self::empty_inner(database_id, shared_actor_infos);
444        for job in jobs {
445            info.add_existing(job);
446        }
447        info
448    }
449
450    pub fn is_empty(&self) -> bool {
451        self.jobs.is_empty()
452    }
453
454    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
455        self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
456            (
457                fragment_id,
458                CommandFragmentChanges::NewFragment {
459                    job_id: job.job_id,
460                    info,
461                    is_existing: true,
462                },
463            )
464        }))
465    }
466
467    /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
468    /// the info correspondingly.
469    pub(crate) fn pre_apply(
470        &mut self,
471        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
472    ) {
473        self.apply_add(
474            fragment_changes
475                .iter()
476                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
477        )
478    }
479
480    fn apply_add(
481        &mut self,
482        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
483    ) {
484        {
485            let shared_infos = self.shared_actor_infos.clone();
486            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
487            for (fragment_id, change) in fragment_changes {
488                match change {
489                    CommandFragmentChanges::NewFragment {
490                        job_id,
491                        info,
492                        is_existing,
493                    } => {
494                        let fragment_infos =
495                            self.jobs
496                                .entry(job_id)
497                                .or_insert_with(|| InflightStreamingJobInfo {
498                                    job_id,
499                                    fragment_infos: Default::default(),
500                                });
501                        if !is_existing {
502                            shared_actor_writer.upsert([&info]);
503                        }
504                        fragment_infos
505                            .fragment_infos
506                            .try_insert(fragment_id, info)
507                            .expect("non duplicate");
508                        self.fragment_location
509                            .try_insert(fragment_id, job_id)
510                            .expect("non duplicate");
511                    }
512                    CommandFragmentChanges::Reschedule {
513                        new_actors,
514                        actor_update_vnode_bitmap,
515                        actor_splits,
516                        ..
517                    } => {
518                        let info = self.fragment_mut(fragment_id);
519                        let actors = &mut info.actors;
520                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
521                            actors
522                                .get_mut(&actor_id)
523                                .expect("should exist")
524                                .vnode_bitmap = Some(new_vnodes);
525                        }
526                        for (actor_id, actor) in new_actors {
527                            actors
528                                .try_insert(actor_id as _, actor)
529                                .expect("non-duplicate");
530                        }
531                        for (actor_id, splits) in actor_splits {
532                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
533                        }
534
535                        // info will be upserted into shared_actor_infos in post_apply stage
536                    }
537                    CommandFragmentChanges::RemoveFragment => {}
538                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
539                        let mut remaining_fragment_ids: HashSet<_> =
540                            replace_map.keys().cloned().collect();
541                        let info = self.fragment_mut(fragment_id);
542                        visit_stream_node_mut(&mut info.nodes, |node| {
543                            if let NodeBody::Merge(m) = node
544                                && let Some(new_upstream_fragment_id) =
545                                    replace_map.get(&m.upstream_fragment_id)
546                            {
547                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
548                                    if cfg!(debug_assertions) {
549                                        panic!(
550                                            "duplicate upstream fragment: {:?} {:?}",
551                                            m, replace_map
552                                        );
553                                    } else {
554                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
555                                    }
556                                }
557                                m.upstream_fragment_id = *new_upstream_fragment_id;
558                            }
559                        });
560                        if cfg!(debug_assertions) {
561                            assert!(
562                                remaining_fragment_ids.is_empty(),
563                                "non-existing fragment to replace: {:?} {:?} {:?}",
564                                remaining_fragment_ids,
565                                info.nodes,
566                                replace_map
567                            );
568                        } else {
569                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
570                        }
571                    }
572                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
573                        let info = self.fragment_mut(fragment_id);
574                        let mut injected = false;
575                        visit_stream_node_mut(&mut info.nodes, |node| {
576                            if let NodeBody::UpstreamSinkUnion(u) = node {
577                                if cfg!(debug_assertions) {
578                                    let current_upstream_fragment_ids = u
579                                        .init_upstreams
580                                        .iter()
581                                        .map(|upstream| upstream.upstream_fragment_id)
582                                        .collect::<HashSet<_>>();
583                                    if current_upstream_fragment_ids
584                                        .contains(&new_upstream_info.upstream_fragment_id)
585                                    {
586                                        panic!(
587                                            "duplicate upstream fragment: {:?} {:?}",
588                                            u, new_upstream_info
589                                        );
590                                    }
591                                }
592                                u.init_upstreams.push(new_upstream_info.clone());
593                                injected = true;
594                            }
595                        });
596                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
597                    }
598                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
599                        let info = self.fragment_mut(fragment_id);
600                        let mut removed = false;
601                        visit_stream_node_mut(&mut info.nodes, |node| {
602                            if let NodeBody::UpstreamSinkUnion(u) = node {
603                                if cfg!(debug_assertions) {
604                                    let current_upstream_fragment_ids = u
605                                        .init_upstreams
606                                        .iter()
607                                        .map(|upstream| upstream.upstream_fragment_id)
608                                        .collect::<HashSet<_>>();
609                                    for drop_fragment_id in &drop_upstream_fragment_ids {
610                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
611                                        {
612                                            panic!(
613                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
614                                                u, drop_upstream_fragment_ids, drop_fragment_id
615                                            );
616                                        }
617                                    }
618                                }
619                                u.init_upstreams.retain(|upstream| {
620                                    !drop_upstream_fragment_ids
621                                        .contains(&upstream.upstream_fragment_id)
622                                });
623                                removed = true;
624                            }
625                        });
626                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
627                    }
628                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
629                        let info = self.fragment_mut(fragment_id);
630                        let actors = &mut info.actors;
631                        for (actor_id, splits) in actor_splits {
632                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
633                        }
634
635                        shared_actor_writer.upsert([&*info]);
636                    }
637                }
638            }
639            shared_actor_writer.finish();
640        }
641    }
642
643    pub(super) fn build_edge(
644        &self,
645        command: Option<&Command>,
646        control_stream_manager: &ControlStreamManager,
647    ) -> Option<FragmentEdgeBuildResult> {
648        let (info, replace_job, new_upstream_sink) = match command {
649            None => {
650                return None;
651            }
652            Some(command) => match command {
653                Command::Flush
654                | Command::Pause
655                | Command::Resume
656                | Command::DropStreamingJobs { .. }
657                | Command::MergeSnapshotBackfillStreamingJobs(_)
658                | Command::RescheduleFragment { .. }
659                | Command::SourceChangeSplit { .. }
660                | Command::Throttle(_)
661                | Command::CreateSubscription { .. }
662                | Command::DropSubscription { .. }
663                | Command::ConnectorPropsChange(_)
664                | Command::StartFragmentBackfill { .. }
665                | Command::Refresh { .. }
666                | Command::LoadFinish { .. } => {
667                    return None;
668                }
669                Command::CreateStreamingJob { info, job_type, .. } => {
670                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
671                        new_upstream_sink,
672                    ) = job_type
673                    {
674                        Some(new_upstream_sink)
675                    } else {
676                        None
677                    };
678                    (Some(info), None, new_upstream_sink)
679                }
680                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
681            },
682        };
683        // `existing_fragment_ids` consists of
684        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
685        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
686        // if the upstream fragment previously exists
687        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments,
688        // if creating a new sink-into-table
689        //  - should contain the `fragment_id` of the downstream table.
690        let existing_fragment_ids = info
691            .into_iter()
692            .flat_map(|info| info.upstream_fragment_downstreams.keys())
693            .chain(replace_job.into_iter().flat_map(|replace_job| {
694                replace_job
695                    .upstream_fragment_downstreams
696                    .keys()
697                    .filter(|fragment_id| {
698                        info.map(|info| {
699                            !info
700                                .stream_job_fragments
701                                .fragments
702                                .contains_key(fragment_id)
703                        })
704                        .unwrap_or(true)
705                    })
706                    .chain(replace_job.replace_upstream.keys())
707            }))
708            .chain(
709                new_upstream_sink
710                    .into_iter()
711                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
712            )
713            .cloned();
714        let new_fragment_infos = info
715            .into_iter()
716            .flat_map(|info| {
717                info.stream_job_fragments
718                    .new_fragment_info(&info.init_split_assignment)
719            })
720            .chain(replace_job.into_iter().flat_map(|replace_job| {
721                replace_job
722                    .new_fragments
723                    .new_fragment_info(&replace_job.init_split_assignment)
724                    .chain(
725                        replace_job
726                            .auto_refresh_schema_sinks
727                            .as_ref()
728                            .into_iter()
729                            .flat_map(|sinks| {
730                                sinks.iter().map(|sink| {
731                                    (sink.new_fragment.fragment_id, sink.new_fragment_info())
732                                })
733                            }),
734                    )
735            }))
736            .collect_vec();
737        let mut builder = FragmentEdgeBuilder::new(
738            existing_fragment_ids
739                .map(|fragment_id| self.fragment(fragment_id))
740                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
741            control_stream_manager,
742        );
743        if let Some(info) = info {
744            builder.add_relations(&info.upstream_fragment_downstreams);
745            builder.add_relations(&info.stream_job_fragments.downstreams);
746        }
747        if let Some(replace_job) = replace_job {
748            builder.add_relations(&replace_job.upstream_fragment_downstreams);
749            builder.add_relations(&replace_job.new_fragments.downstreams);
750        }
751        if let Some(new_upstream_sink) = new_upstream_sink {
752            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
753            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
754            builder.add_edge(sink_fragment_id, new_sink_downstream);
755        }
756        if let Some(replace_job) = replace_job {
757            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
758                for (original_upstream_fragment_id, new_upstream_fragment_id) in
759                    fragment_replacement
760                {
761                    builder.replace_upstream(
762                        *fragment_id,
763                        *original_upstream_fragment_id,
764                        *new_upstream_fragment_id,
765                    );
766                }
767            }
768        }
769        Some(builder.build())
770    }
771}
772
773impl InflightSubscriptionInfo {
774    pub fn pre_apply(&mut self, command: &Command) {
775        if let Command::CreateSubscription {
776            subscription_id,
777            upstream_mv_table_id,
778            retention_second,
779        } = command
780            && let Some(prev_retiontion) = self
781                .mv_depended_subscriptions
782                .entry(*upstream_mv_table_id)
783                .or_default()
784                .insert(*subscription_id, *retention_second)
785        {
786            warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retiontion, "add an existing subscription id");
787        }
788    }
789}
790
791impl<'a> IntoIterator for &'a InflightSubscriptionInfo {
792    type Item = PbSubscriptionUpstreamInfo;
793
794    type IntoIter = impl Iterator<Item = PbSubscriptionUpstreamInfo> + 'a;
795
796    fn into_iter(self) -> Self::IntoIter {
797        self.mv_depended_subscriptions
798            .iter()
799            .flat_map(|(table_id, subscriptions)| {
800                subscriptions
801                    .keys()
802                    .map(|subscriber_id| PbSubscriptionUpstreamInfo {
803                        subscriber_id: *subscriber_id,
804                        upstream_mv_table_id: table_id.table_id,
805                    })
806            })
807    }
808}
809
810impl InflightDatabaseInfo {
811    /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
812    /// remove that from the snapshot correspondingly.
813    pub(crate) fn post_apply(
814        &mut self,
815        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
816    ) {
817        let inner = self.shared_actor_infos.clone();
818        let mut shared_actor_writer = inner.start_writer(self.database_id);
819        {
820            for (fragment_id, changes) in fragment_changes {
821                match changes {
822                    CommandFragmentChanges::NewFragment { .. } => {}
823                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
824                        let job_id = self.fragment_location[fragment_id];
825                        let info = self
826                            .jobs
827                            .get_mut(&job_id)
828                            .expect("should exist")
829                            .fragment_infos
830                            .get_mut(fragment_id)
831                            .expect("should exist");
832                        for actor_id in to_remove {
833                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
834                        }
835                        shared_actor_writer.upsert([&*info]);
836                    }
837                    CommandFragmentChanges::RemoveFragment => {
838                        let job_id = self
839                            .fragment_location
840                            .remove(fragment_id)
841                            .expect("should exist");
842                        let job = self.jobs.get_mut(&job_id).expect("should exist");
843                        let fragment = job
844                            .fragment_infos
845                            .remove(fragment_id)
846                            .expect("should exist");
847                        shared_actor_writer.remove(&fragment);
848                        if job.fragment_infos.is_empty() {
849                            self.jobs.remove(&job_id).expect("should exist");
850                        }
851                    }
852                    CommandFragmentChanges::ReplaceNodeUpstream(_)
853                    | CommandFragmentChanges::AddNodeUpstream(_)
854                    | CommandFragmentChanges::DropNodeUpstream(_)
855                    | CommandFragmentChanges::SplitAssignment { .. } => {}
856                }
857            }
858        }
859        shared_actor_writer.finish();
860    }
861}
862
863impl InflightSubscriptionInfo {
864    pub fn post_apply(&mut self, command: &Command) {
865        if let Command::DropSubscription {
866            subscription_id,
867            upstream_mv_table_id,
868        } = command
869        {
870            let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
871                Some(subscriptions) => {
872                    let removed = subscriptions.remove(subscription_id).is_some();
873                    if removed && subscriptions.is_empty() {
874                        self.mv_depended_subscriptions.remove(upstream_mv_table_id);
875                    }
876                    removed
877                }
878                None => false,
879            };
880            if !removed {
881                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
882            }
883        }
884    }
885}
886
887impl InflightFragmentInfo {
888    /// Returns actor list to collect in the target worker node.
889    pub(crate) fn actor_ids_to_collect(
890        infos: impl IntoIterator<Item = &Self>,
891    ) -> HashMap<WorkerId, HashSet<ActorId>> {
892        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
893        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
894            assert!(
895                ret.entry(actor.worker_id)
896                    .or_default()
897                    .insert(*actor_id as _)
898            )
899        }
900        ret
901    }
902
903    pub fn existing_table_ids<'a>(
904        infos: impl IntoIterator<Item = &'a Self> + 'a,
905    ) -> impl Iterator<Item = TableId> + 'a {
906        infos
907            .into_iter()
908            .flat_map(|info| info.state_table_ids.iter().cloned())
909    }
910
911    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
912        infos.into_iter().any(|fragment| {
913            fragment
914                .actors
915                .values()
916                .any(|actor| (actor.worker_id) == worker_id)
917        })
918    }
919
920    pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
921        infos
922            .into_iter()
923            .flat_map(|info| info.actors.values())
924            .map(|actor| actor.worker_id)
925            .collect()
926    }
927}
928
929impl InflightDatabaseInfo {
930    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
931        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
932    }
933
934    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
935        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
936    }
937}