risingwave_meta/barrier/
info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RawRwLock;
21use parking_lot::lock_api::RwLockReadGuard;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
24use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
25use risingwave_connector::source::{SplitImpl, SplitMetaData};
26use risingwave_meta_model::fragment::DistributionType;
27use risingwave_meta_model::{ObjectId, WorkerId};
28use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
29use risingwave_pb::meta::subscribe_response::Operation;
30use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
31use risingwave_pb::stream_plan::stream_node::NodeBody;
32use tracing::warn;
33
34use crate::MetaResult;
35use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
36use crate::barrier::rpc::ControlStreamManager;
37use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
38use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
39use crate::controller::utils::rebuild_fragment_mapping;
40use crate::manager::NotificationManagerRef;
41use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
42
/// Per-fragment information stored in [`SharedActorInfos`].
///
/// Built from an [`InflightFragmentInfo`] together with the id of the job that owns the
/// fragment (see the `From<(&InflightFragmentInfo, TableId)>` impl).
#[derive(Debug, Clone)]
pub struct SharedFragmentInfo {
    pub fragment_id: FragmentId,
    /// Id of the owning streaming job, converted from the job's `TableId`.
    pub job_id: ObjectId,
    pub distribution_type: DistributionType,
    /// Actors of the fragment keyed by actor id; each entry carries the actor's splits.
    pub actors: HashMap<ActorId, InflightActorInfo>,
    pub vnode_count: usize,
    pub fragment_type_mask: FragmentTypeMask,
}
52
53impl From<(&InflightFragmentInfo, TableId)> for SharedFragmentInfo {
54    fn from(pair: (&InflightFragmentInfo, TableId)) -> Self {
55        let (info, job_id) = pair;
56
57        let InflightFragmentInfo {
58            fragment_id,
59            distribution_type,
60            fragment_type_mask,
61            actors,
62            vnode_count,
63            ..
64        } = info;
65
66        Self {
67            fragment_id: *fragment_id,
68            job_id: job_id.table_id() as _,
69            distribution_type: *distribution_type,
70            fragment_type_mask: *fragment_type_mask,
71            actors: actors.clone(),
72            vnode_count: *vnode_count,
73        }
74    }
75}
76
/// State behind the lock of [`SharedActorInfos`]: fragment infos grouped by database.
#[derive(Default, Debug)]
pub struct SharedActorInfosInner {
    /// `database_id` -> (`fragment_id` -> fragment info).
    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
}
81
82impl SharedActorInfosInner {
83    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
84        self.info
85            .values()
86            .find_map(|database| database.get(&fragment_id))
87    }
88
89    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
90        self.info.values().flatten()
91    }
92}
93
/// Handle to the fragment/actor infos shared behind a read-write lock.
///
/// Clones share the same underlying state through the `Arc`. Changes to the stored
/// fragments are reported to the notification manager as fragment mapping
/// add/update/delete operations.
#[derive(Clone, educe::Educe)]
#[educe(Debug)]
pub struct SharedActorInfos {
    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
    // Excluded from the `Debug` output.
    #[educe(Debug(ignore))]
    notification_manager: NotificationManagerRef,
}
101
102impl SharedActorInfos {
103    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
104        self.inner.read()
105    }
106
107    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
108        let core = self.inner.read();
109        core.iter_over_fragments()
110            .flat_map(|(_, fragment)| {
111                fragment
112                    .actors
113                    .iter()
114                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
115            })
116            .collect()
117    }
118
119    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
120    ///
121    /// Very occasionally split removal may happen during scaling, in which case we need to
122    /// use the old splits for reallocation instead of the latest splits (which may be missing),
123    /// so that we can resolve the split removal in the next command.
124    pub fn migrate_splits_for_source_actors(
125        &self,
126        fragment_id: FragmentId,
127        prev_actor_ids: &[ActorId],
128        curr_actor_ids: &[ActorId],
129    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
130        let guard = self.read_guard();
131
132        let prev_splits = prev_actor_ids
133            .iter()
134            .flat_map(|actor_id| {
135                // Note: File Source / Iceberg Source doesn't have splits assigned by meta.
136                guard
137                    .get_fragment(fragment_id)
138                    .and_then(|info| info.actors.get(actor_id))
139                    .map(|actor| actor.splits.clone())
140                    .unwrap_or_default()
141            })
142            .map(|split| (split.id(), split))
143            .collect();
144
145        let empty_actor_splits = curr_actor_ids
146            .iter()
147            .map(|actor_id| (*actor_id, vec![]))
148            .collect();
149
150        let diff = crate::stream::source_manager::reassign_splits(
151            fragment_id,
152            empty_actor_splits,
153            &prev_splits,
154            // pre-allocate splits is the first time getting splits, and it does not have scale-in scene
155            std::default::Default::default(),
156        )
157        .unwrap_or_default();
158
159        Ok(diff)
160    }
161}
162
163impl SharedActorInfos {
164    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
165        Self {
166            inner: Arc::new(Default::default()),
167            notification_manager,
168        }
169    }
170
171    pub(super) fn remove_database(&self, database_id: DatabaseId) {
172        if let Some(database) = self.inner.write().info.remove(&database_id) {
173            let mapping = database
174                .into_values()
175                .map(|fragment| rebuild_fragment_mapping(&fragment))
176                .collect_vec();
177            if !mapping.is_empty() {
178                self.notification_manager
179                    .notify_fragment_mapping(Operation::Delete, mapping);
180            }
181        }
182    }
183
184    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
185        let database_ids: HashSet<_> = database_ids.into_iter().collect();
186
187        let mut mapping = Vec::new();
188        for fragment in self
189            .inner
190            .write()
191            .info
192            .extract_if(|database_id, _| !database_ids.contains(database_id))
193            .flat_map(|(_, fragments)| fragments.into_values())
194        {
195            mapping.push(rebuild_fragment_mapping(&fragment));
196        }
197        if !mapping.is_empty() {
198            self.notification_manager
199                .notify_fragment_mapping(Operation::Delete, mapping);
200        }
201    }
202
203    pub(super) fn recover_database(
204        &self,
205        database_id: DatabaseId,
206        fragments: impl Iterator<Item = (&InflightFragmentInfo, TableId)>,
207    ) {
208        let mut remaining_fragments: HashMap<_, _> = fragments
209            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
210            .collect();
211        // delete the fragments that exist previously, but not included in the recovered fragments
212        let mut writer = self.start_writer(database_id);
213        let database = writer.write_guard.info.entry(database_id).or_default();
214        for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
215            if let Some(info) = remaining_fragments.remove(fragment_id) {
216                let info = info.into();
217                writer
218                    .updated_fragment_mapping
219                    .get_or_insert_default()
220                    .push(rebuild_fragment_mapping(&info));
221                *fragment_info = info;
222                false
223            } else {
224                true
225            }
226        }) {
227            writer
228                .deleted_fragment_mapping
229                .get_or_insert_default()
230                .push(rebuild_fragment_mapping(&fragment));
231        }
232        for (fragment_id, info) in remaining_fragments {
233            let info = info.into();
234            writer
235                .added_fragment_mapping
236                .get_or_insert_default()
237                .push(rebuild_fragment_mapping(&info));
238            database.insert(fragment_id, info);
239        }
240        writer.finish();
241    }
242
243    pub(super) fn upsert(
244        &self,
245        database_id: DatabaseId,
246        infos: impl IntoIterator<Item = (&InflightFragmentInfo, TableId)>,
247    ) {
248        let mut writer = self.start_writer(database_id);
249        writer.upsert(infos);
250        writer.finish();
251    }
252
253    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
254        SharedActorInfoWriter {
255            database_id,
256            write_guard: self.inner.write(),
257            notification_manager: &self.notification_manager,
258            added_fragment_mapping: None,
259            updated_fragment_mapping: None,
260            deleted_fragment_mapping: None,
261        }
262    }
263}
264
/// Write handle over [`SharedActorInfos`] scoped to a single database.
///
/// Holds the write lock and collects the fragment mappings that were added, updated or
/// deleted through it; `finish` passes the collected mappings to the notification manager.
pub(super) struct SharedActorInfoWriter<'a> {
    /// Database whose fragments `upsert` and `remove` operate on.
    database_id: DatabaseId,
    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
    notification_manager: &'a NotificationManagerRef,
    // Each list stays `None` until the first mapping of that kind is recorded.
    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
}
273
274impl SharedActorInfoWriter<'_> {
275    pub(super) fn upsert(
276        &mut self,
277        infos: impl IntoIterator<Item = (&InflightFragmentInfo, TableId)>,
278    ) {
279        let database = self.write_guard.info.entry(self.database_id).or_default();
280        for info @ (fragment, _) in infos {
281            match database.entry(fragment.fragment_id) {
282                Entry::Occupied(mut entry) => {
283                    let info = info.into();
284                    self.updated_fragment_mapping
285                        .get_or_insert_default()
286                        .push(rebuild_fragment_mapping(&info));
287                    entry.insert(info);
288                }
289                Entry::Vacant(entry) => {
290                    let info = info.into();
291                    self.added_fragment_mapping
292                        .get_or_insert_default()
293                        .push(rebuild_fragment_mapping(&info));
294                    entry.insert(info);
295                }
296            }
297        }
298    }
299
300    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
301        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
302            && let Some(fragment) = database.remove(&info.fragment_id)
303        {
304            self.deleted_fragment_mapping
305                .get_or_insert_default()
306                .push(rebuild_fragment_mapping(&fragment));
307        }
308    }
309
310    pub(super) fn finish(self) {
311        if let Some(mapping) = self.added_fragment_mapping {
312            self.notification_manager
313                .notify_fragment_mapping(Operation::Add, mapping);
314        }
315        if let Some(mapping) = self.updated_fragment_mapping {
316            self.notification_manager
317                .notify_fragment_mapping(Operation::Update, mapping);
318        }
319        if let Some(mapping) = self.deleted_fragment_mapping {
320            self.notification_manager
321                .notify_fragment_mapping(Operation::Delete, mapping);
322        }
323    }
324}
325
/// The previous epoch, current epoch and kind associated with a barrier.
#[derive(Debug, Clone)]
pub(super) struct BarrierInfo {
    pub prev_epoch: TracedEpoch,
    pub curr_epoch: TracedEpoch,
    pub kind: BarrierKind,
}
332
333impl BarrierInfo {
334    pub(super) fn prev_epoch(&self) -> u64 {
335        self.prev_epoch.value().0
336    }
337}
338
/// A change to apply to a single fragment of an `InflightDatabaseInfo`.
///
/// The per-variant notes below describe how `InflightDatabaseInfo::apply_add` handles
/// each change.
#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
    /// Registers the fragment under the job `job_id`.
    NewFragment {
        job_id: TableId,
        info: InflightFragmentInfo,
        /// Whether the fragment already exists before added. This is used
        /// when snapshot backfill is finished and add its fragment info
        /// back to the database.
        is_existing: bool,
    },
    /// Appends the upstream to the `UpstreamSinkUnion` node(s) of the fragment.
    AddNodeUpstream(PbUpstreamSinkInfo),
    /// Removes the listed upstream fragments from the `UpstreamSinkUnion` node(s) of the
    /// fragment.
    DropNodeUpstream(Vec<FragmentId>),
    /// Rewrites the upstream fragment id of the fragment's `Merge` nodes.
    ReplaceNodeUpstream(
        /// old `fragment_id` -> new `fragment_id`
        HashMap<FragmentId, FragmentId>,
    ),
    /// Updates the actors of the fragment: new vnode bitmaps for existing actors, newly
    /// added actors and new split assignments. `to_remove` is not consumed by `apply_add`.
    Reschedule {
        new_actors: HashMap<ActorId, InflightActorInfo>,
        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
        to_remove: HashSet<ActorId>,
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
    /// Ignored by `apply_add`.
    RemoveFragment,
    /// Overwrites the splits of the listed actors.
    SplitAssignment {
        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    },
}
366
/// Kind of subscriber registered on a streaming job.
#[derive(Clone, Debug)]
pub enum SubscriberType {
    /// A subscription; the payload is read as its retention by
    /// `InflightDatabaseInfo::max_subscription_retention`.
    Subscription(u64),
    /// A snapshot backfill subscriber; it carries no retention.
    SnapshotBackfill,
}
372
/// Inflight information of a single streaming job: its fragments and its subscribers.
#[derive(Clone, Debug)]
pub struct InflightStreamingJobInfo {
    pub job_id: TableId,
    /// Fragments of the job keyed by fragment id.
    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
    /// Subscribers of the job keyed by subscriber id.
    pub subscribers: HashMap<u32, SubscriberType>,
}
379
380impl InflightStreamingJobInfo {
381    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
382        self.fragment_infos.values()
383    }
384
385    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
386        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
387    }
388
389    pub fn snapshot_backfill_actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
390        self.fragment_infos
391            .values()
392            .filter(|fragment| {
393                fragment
394                    .fragment_type_mask
395                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
396            })
397            .flat_map(|fragment| fragment.actors.keys().copied())
398    }
399
400    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
401        StreamJobFragments::tracking_progress_actor_ids_impl(
402            self.fragment_infos
403                .values()
404                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
405        )
406    }
407}
408
409impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
410    type Item = &'a InflightFragmentInfo;
411
412    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
413
414    fn into_iter(self) -> Self::IntoIter {
415        self.fragment_infos()
416    }
417}
418
/// Inflight information of all streaming jobs of one database.
#[derive(Clone, Debug)]
pub struct InflightDatabaseInfo {
    database_id: DatabaseId,
    /// Jobs of the database keyed by job id.
    jobs: HashMap<TableId, InflightStreamingJobInfo>,
    /// `fragment_id` -> id of the job that owns the fragment.
    fragment_location: HashMap<FragmentId, TableId>,
    /// Handle to the shared fragment/actor infos that fragment changes are written to.
    pub(super) shared_actor_infos: SharedActorInfos,
}
426
427impl InflightDatabaseInfo {
428    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
429        self.jobs.values().flat_map(|job| job.fragment_infos())
430    }
431
    /// Returns whether a job with id `job_id` is registered in this database.
    pub fn contains_job(&self, job_id: TableId) -> bool {
        self.jobs.contains_key(&job_id)
    }
435
436    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
437        let job_id = self.fragment_location[&fragment_id];
438        self.jobs
439            .get(&job_id)
440            .expect("should exist")
441            .fragment_infos
442            .get(&fragment_id)
443            .expect("should exist")
444    }
445
446    pub fn fragment_subscribers(&self, fragment_id: FragmentId) -> impl Iterator<Item = u32> + '_ {
447        let job_id = self.fragment_location[&fragment_id];
448        self.jobs[&job_id].subscribers.keys().copied()
449    }
450
451    pub fn job_subscribers(&self, job_id: TableId) -> impl Iterator<Item = u32> + '_ {
452        self.jobs[&job_id].subscribers.keys().copied()
453    }
454
455    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
456        self.jobs
457            .iter()
458            .filter_map(|(job_id, info)| {
459                info.subscribers
460                    .values()
461                    .filter_map(|subscriber| match subscriber {
462                        SubscriberType::Subscription(retention) => Some(*retention),
463                        SubscriberType::SnapshotBackfill => None,
464                    })
465                    .max()
466                    .map(|max_subscription| (*job_id, max_subscription))
467            })
468            .collect()
469    }
470
471    pub fn register_subscriber(
472        &mut self,
473        job_id: TableId,
474        subscriber_id: u32,
475        subscriber: SubscriberType,
476    ) {
477        self.jobs
478            .get_mut(&job_id)
479            .expect("should exist")
480            .subscribers
481            .try_insert(subscriber_id, subscriber)
482            .expect("non duplicate");
483    }
484
485    pub fn unregister_subscriber(
486        &mut self,
487        job_id: TableId,
488        subscriber_id: u32,
489    ) -> Option<SubscriberType> {
490        self.jobs
491            .get_mut(&job_id)
492            .expect("should exist")
493            .subscribers
494            .remove(&subscriber_id)
495    }
496
497    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, TableId) {
498        let job_id = self.fragment_location[&fragment_id];
499        let fragment = self
500            .jobs
501            .get_mut(&job_id)
502            .expect("should exist")
503            .fragment_infos
504            .get_mut(&fragment_id)
505            .expect("should exist");
506        (fragment, job_id)
507    }
508
509    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
510        Self {
511            database_id,
512            jobs: Default::default(),
513            fragment_location: Default::default(),
514            shared_actor_infos,
515        }
516    }
517
518    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
519        // remove the database because it's empty.
520        shared_actor_infos.remove_database(database_id);
521        Self::empty_inner(database_id, shared_actor_infos)
522    }
523
524    pub fn recover(
525        database_id: DatabaseId,
526        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
527        shared_actor_infos: SharedActorInfos,
528    ) -> Self {
529        let mut info = Self::empty_inner(database_id, shared_actor_infos);
530        for job in jobs {
531            info.add_existing(job);
532        }
533        info
534    }
535
    /// Returns whether no job is registered in this database.
    pub fn is_empty(&self) -> bool {
        self.jobs.is_empty()
    }
539
540    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
541        self.jobs
542            .try_insert(
543                job.job_id,
544                InflightStreamingJobInfo {
545                    job_id: job.job_id,
546                    subscribers: job.subscribers,
547                    fragment_infos: Default::default(), // fill in later in apply_add
548                },
549            )
550            .expect("non-duplicate");
551        self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
552            (
553                fragment_id,
554                CommandFragmentChanges::NewFragment {
555                    job_id: job.job_id,
556                    info,
557                    is_existing: true,
558                },
559            )
560        }))
561    }
562
563    /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
564    /// the info correspondingly.
565    pub(crate) fn pre_apply(
566        &mut self,
567        new_job_id: Option<TableId>,
568        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
569    ) {
570        if let Some(job_id) = new_job_id {
571            self.jobs
572                .try_insert(
573                    job_id,
574                    InflightStreamingJobInfo {
575                        job_id,
576                        fragment_infos: Default::default(),
577                        subscribers: Default::default(), // no subscriber for newly create job
578                    },
579                )
580                .expect("non-duplicate");
581        }
582        self.apply_add(
583            fragment_changes
584                .iter()
585                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
586        )
587    }
588
589    fn apply_add(
590        &mut self,
591        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
592    ) {
593        {
594            let shared_infos = self.shared_actor_infos.clone();
595            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
596            for (fragment_id, change) in fragment_changes {
597                match change {
598                    CommandFragmentChanges::NewFragment {
599                        job_id,
600                        info,
601                        is_existing,
602                    } => {
603                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
604                        if !is_existing {
605                            shared_actor_writer.upsert([(&info, job_id)]);
606                        }
607                        fragment_infos
608                            .fragment_infos
609                            .try_insert(fragment_id, info)
610                            .expect("non duplicate");
611                        self.fragment_location
612                            .try_insert(fragment_id, job_id)
613                            .expect("non duplicate");
614                    }
615                    CommandFragmentChanges::Reschedule {
616                        new_actors,
617                        actor_update_vnode_bitmap,
618                        actor_splits,
619                        ..
620                    } => {
621                        let (info, _) = self.fragment_mut(fragment_id);
622                        let actors = &mut info.actors;
623                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
624                            actors
625                                .get_mut(&actor_id)
626                                .expect("should exist")
627                                .vnode_bitmap = Some(new_vnodes);
628                        }
629                        for (actor_id, actor) in new_actors {
630                            actors
631                                .try_insert(actor_id as _, actor)
632                                .expect("non-duplicate");
633                        }
634                        for (actor_id, splits) in actor_splits {
635                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
636                        }
637
638                        // info will be upserted into shared_actor_infos in post_apply stage
639                    }
640                    CommandFragmentChanges::RemoveFragment => {}
641                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
642                        let mut remaining_fragment_ids: HashSet<_> =
643                            replace_map.keys().cloned().collect();
644                        let (info, _) = self.fragment_mut(fragment_id);
645                        visit_stream_node_mut(&mut info.nodes, |node| {
646                            if let NodeBody::Merge(m) = node
647                                && let Some(new_upstream_fragment_id) =
648                                    replace_map.get(&m.upstream_fragment_id)
649                            {
650                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
651                                    if cfg!(debug_assertions) {
652                                        panic!(
653                                            "duplicate upstream fragment: {:?} {:?}",
654                                            m, replace_map
655                                        );
656                                    } else {
657                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
658                                    }
659                                }
660                                m.upstream_fragment_id = *new_upstream_fragment_id;
661                            }
662                        });
663                        if cfg!(debug_assertions) {
664                            assert!(
665                                remaining_fragment_ids.is_empty(),
666                                "non-existing fragment to replace: {:?} {:?} {:?}",
667                                remaining_fragment_ids,
668                                info.nodes,
669                                replace_map
670                            );
671                        } else {
672                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
673                        }
674                    }
675                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
676                        let (info, _) = self.fragment_mut(fragment_id);
677                        let mut injected = false;
678                        visit_stream_node_mut(&mut info.nodes, |node| {
679                            if let NodeBody::UpstreamSinkUnion(u) = node {
680                                if cfg!(debug_assertions) {
681                                    let current_upstream_fragment_ids = u
682                                        .init_upstreams
683                                        .iter()
684                                        .map(|upstream| upstream.upstream_fragment_id)
685                                        .collect::<HashSet<_>>();
686                                    if current_upstream_fragment_ids
687                                        .contains(&new_upstream_info.upstream_fragment_id)
688                                    {
689                                        panic!(
690                                            "duplicate upstream fragment: {:?} {:?}",
691                                            u, new_upstream_info
692                                        );
693                                    }
694                                }
695                                u.init_upstreams.push(new_upstream_info.clone());
696                                injected = true;
697                            }
698                        });
699                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
700                    }
701                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
702                        let (info, _) = self.fragment_mut(fragment_id);
703                        let mut removed = false;
704                        visit_stream_node_mut(&mut info.nodes, |node| {
705                            if let NodeBody::UpstreamSinkUnion(u) = node {
706                                if cfg!(debug_assertions) {
707                                    let current_upstream_fragment_ids = u
708                                        .init_upstreams
709                                        .iter()
710                                        .map(|upstream| upstream.upstream_fragment_id)
711                                        .collect::<HashSet<_>>();
712                                    for drop_fragment_id in &drop_upstream_fragment_ids {
713                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
714                                        {
715                                            panic!(
716                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
717                                                u, drop_upstream_fragment_ids, drop_fragment_id
718                                            );
719                                        }
720                                    }
721                                }
722                                u.init_upstreams.retain(|upstream| {
723                                    !drop_upstream_fragment_ids
724                                        .contains(&upstream.upstream_fragment_id)
725                                });
726                                removed = true;
727                            }
728                        });
729                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
730                    }
731                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
732                        let (info, job_id) = self.fragment_mut(fragment_id);
733                        let actors = &mut info.actors;
734                        for (actor_id, splits) in actor_splits {
735                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
736                        }
737                        shared_actor_writer.upsert([(&*info, job_id)]);
738                    }
739                }
740            }
741            shared_actor_writer.finish();
742        }
743    }
744
745    pub(super) fn build_edge(
746        &self,
747        command: Option<&Command>,
748        control_stream_manager: &ControlStreamManager,
749    ) -> Option<FragmentEdgeBuildResult> {
750        let (info, replace_job, new_upstream_sink) = match command {
751            None => {
752                return None;
753            }
754            Some(command) => match command {
755                Command::Flush
756                | Command::Pause
757                | Command::Resume
758                | Command::DropStreamingJobs { .. }
759                | Command::MergeSnapshotBackfillStreamingJobs(_)
760                | Command::RescheduleFragment { .. }
761                | Command::SourceChangeSplit { .. }
762                | Command::Throttle(_)
763                | Command::CreateSubscription { .. }
764                | Command::DropSubscription { .. }
765                | Command::ConnectorPropsChange(_)
766                | Command::StartFragmentBackfill { .. }
767                | Command::Refresh { .. }
768                | Command::ListFinish { .. }
769                | Command::LoadFinish { .. } => {
770                    return None;
771                }
772                Command::CreateStreamingJob { info, job_type, .. } => {
773                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
774                        new_upstream_sink,
775                    ) = job_type
776                    {
777                        Some(new_upstream_sink)
778                    } else {
779                        None
780                    };
781                    (Some(info), None, new_upstream_sink)
782                }
783                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
784            },
785        };
786        // `existing_fragment_ids` consists of
787        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
788        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
789        // if the upstream fragment previously exists
790        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments,
791        // if creating a new sink-into-table
792        //  - should contain the `fragment_id` of the downstream table.
793        let existing_fragment_ids = info
794            .into_iter()
795            .flat_map(|info| info.upstream_fragment_downstreams.keys())
796            .chain(replace_job.into_iter().flat_map(|replace_job| {
797                replace_job
798                    .upstream_fragment_downstreams
799                    .keys()
800                    .filter(|fragment_id| {
801                        info.map(|info| {
802                            !info
803                                .stream_job_fragments
804                                .fragments
805                                .contains_key(fragment_id)
806                        })
807                        .unwrap_or(true)
808                    })
809                    .chain(replace_job.replace_upstream.keys())
810            }))
811            .chain(
812                new_upstream_sink
813                    .into_iter()
814                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
815            )
816            .cloned();
817        let new_fragment_infos = info
818            .into_iter()
819            .flat_map(|info| {
820                info.stream_job_fragments
821                    .new_fragment_info(&info.init_split_assignment)
822            })
823            .chain(replace_job.into_iter().flat_map(|replace_job| {
824                replace_job
825                    .new_fragments
826                    .new_fragment_info(&replace_job.init_split_assignment)
827                    .chain(
828                        replace_job
829                            .auto_refresh_schema_sinks
830                            .as_ref()
831                            .into_iter()
832                            .flat_map(|sinks| {
833                                sinks.iter().map(|sink| {
834                                    (sink.new_fragment.fragment_id, sink.new_fragment_info())
835                                })
836                            }),
837                    )
838            }))
839            .collect_vec();
840        let mut builder = FragmentEdgeBuilder::new(
841            existing_fragment_ids
842                .map(|fragment_id| self.fragment(fragment_id))
843                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
844            control_stream_manager,
845        );
846        if let Some(info) = info {
847            builder.add_relations(&info.upstream_fragment_downstreams);
848            builder.add_relations(&info.stream_job_fragments.downstreams);
849        }
850        if let Some(replace_job) = replace_job {
851            builder.add_relations(&replace_job.upstream_fragment_downstreams);
852            builder.add_relations(&replace_job.new_fragments.downstreams);
853        }
854        if let Some(new_upstream_sink) = new_upstream_sink {
855            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
856            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
857            builder.add_edge(sink_fragment_id, new_sink_downstream);
858        }
859        if let Some(replace_job) = replace_job {
860            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
861                for (original_upstream_fragment_id, new_upstream_fragment_id) in
862                    fragment_replacement
863                {
864                    builder.replace_upstream(
865                        *fragment_id,
866                        *original_upstream_fragment_id,
867                        *new_upstream_fragment_id,
868                    );
869                }
870            }
871        }
872        Some(builder.build())
873    }
874
875    /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
876    /// remove that from the snapshot correspondingly.
877    pub(crate) fn post_apply(
878        &mut self,
879        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
880    ) {
881        let inner = self.shared_actor_infos.clone();
882        let mut shared_actor_writer = inner.start_writer(self.database_id);
883        {
884            for (fragment_id, changes) in fragment_changes {
885                match changes {
886                    CommandFragmentChanges::NewFragment { .. } => {}
887                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
888                        let job_id = self.fragment_location[fragment_id];
889                        let info = self
890                            .jobs
891                            .get_mut(&job_id)
892                            .expect("should exist")
893                            .fragment_infos
894                            .get_mut(fragment_id)
895                            .expect("should exist");
896                        for actor_id in to_remove {
897                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
898                        }
899                        shared_actor_writer.upsert([(&*info, job_id)]);
900                    }
901                    CommandFragmentChanges::RemoveFragment => {
902                        let job_id = self
903                            .fragment_location
904                            .remove(fragment_id)
905                            .expect("should exist");
906                        let job = self.jobs.get_mut(&job_id).expect("should exist");
907                        let fragment = job
908                            .fragment_infos
909                            .remove(fragment_id)
910                            .expect("should exist");
911                        shared_actor_writer.remove(&fragment);
912                        if job.fragment_infos.is_empty() {
913                            self.jobs.remove(&job_id).expect("should exist");
914                        }
915                    }
916                    CommandFragmentChanges::ReplaceNodeUpstream(_)
917                    | CommandFragmentChanges::AddNodeUpstream(_)
918                    | CommandFragmentChanges::DropNodeUpstream(_)
919                    | CommandFragmentChanges::SplitAssignment { .. } => {}
920                }
921            }
922        }
923        shared_actor_writer.finish();
924    }
925}
926
927impl InflightFragmentInfo {
928    /// Returns actor list to collect in the target worker node.
929    pub(crate) fn actor_ids_to_collect(
930        infos: impl IntoIterator<Item = &Self>,
931    ) -> HashMap<WorkerId, HashSet<ActorId>> {
932        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
933        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
934            assert!(
935                ret.entry(actor.worker_id)
936                    .or_default()
937                    .insert(*actor_id as _)
938            )
939        }
940        ret
941    }
942
943    pub fn existing_table_ids<'a>(
944        infos: impl IntoIterator<Item = &'a Self> + 'a,
945    ) -> impl Iterator<Item = TableId> + 'a {
946        infos
947            .into_iter()
948            .flat_map(|info| info.state_table_ids.iter().cloned())
949    }
950
951    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
952        infos.into_iter().any(|fragment| {
953            fragment
954                .actors
955                .values()
956                .any(|actor| (actor.worker_id) == worker_id)
957        })
958    }
959
960    pub(crate) fn workers(infos: impl IntoIterator<Item = &Self>) -> HashSet<WorkerId> {
961        infos
962            .into_iter()
963            .flat_map(|info| info.actors.values())
964            .map(|actor| actor.worker_id)
965            .collect()
966    }
967}
968
969impl InflightDatabaseInfo {
970    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
971        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
972    }
973
974    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
975        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
976    }
977}