risingwave_meta/barrier/
info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18
19use itertools::Itertools;
20use parking_lot::RawRwLock;
21use parking_lot::lock_api::RwLockReadGuard;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, FragmentTypeMask, TableId};
24use risingwave_common::id::JobId;
25use risingwave_common::util::stream_graph_visitor::visit_stream_node_mut;
26use risingwave_connector::source::{SplitImpl, SplitMetaData};
27use risingwave_meta_model::WorkerId;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_pb::meta::PbFragmentWorkerSlotMapping;
30use risingwave_pb::meta::subscribe_response::Operation;
31use risingwave_pb::stream_plan::PbUpstreamSinkInfo;
32use risingwave_pb::stream_plan::stream_node::NodeBody;
33use tracing::warn;
34
35use crate::MetaResult;
36use crate::barrier::edge_builder::{FragmentEdgeBuildResult, FragmentEdgeBuilder};
37use crate::barrier::rpc::ControlStreamManager;
38use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
39use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
40use crate::controller::utils::rebuild_fragment_mapping;
41use crate::manager::NotificationManagerRef;
42use crate::model::{ActorId, BackfillUpstreamType, FragmentId, StreamJobFragments};
43
44#[derive(Debug, Clone)]
45pub struct SharedFragmentInfo {
46    pub fragment_id: FragmentId,
47    pub job_id: JobId,
48    pub distribution_type: DistributionType,
49    pub actors: HashMap<ActorId, InflightActorInfo>,
50    pub vnode_count: usize,
51    pub fragment_type_mask: FragmentTypeMask,
52}
53
54impl From<(&InflightFragmentInfo, JobId)> for SharedFragmentInfo {
55    fn from(pair: (&InflightFragmentInfo, JobId)) -> Self {
56        let (info, job_id) = pair;
57
58        let InflightFragmentInfo {
59            fragment_id,
60            distribution_type,
61            fragment_type_mask,
62            actors,
63            vnode_count,
64            ..
65        } = info;
66
67        Self {
68            fragment_id: *fragment_id,
69            job_id,
70            distribution_type: *distribution_type,
71            fragment_type_mask: *fragment_type_mask,
72            actors: actors.clone(),
73            vnode_count: *vnode_count,
74        }
75    }
76}
77
78#[derive(Default, Debug)]
79pub struct SharedActorInfosInner {
80    info: HashMap<DatabaseId, HashMap<FragmentId, SharedFragmentInfo>>,
81}
82
83impl SharedActorInfosInner {
84    pub fn get_fragment(&self, fragment_id: FragmentId) -> Option<&SharedFragmentInfo> {
85        self.info
86            .values()
87            .find_map(|database| database.get(&fragment_id))
88    }
89
90    pub fn iter_over_fragments(&self) -> impl Iterator<Item = (&FragmentId, &SharedFragmentInfo)> {
91        self.info.values().flatten()
92    }
93}
94
95#[derive(Clone, educe::Educe)]
96#[educe(Debug)]
97pub struct SharedActorInfos {
98    inner: Arc<parking_lot::RwLock<SharedActorInfosInner>>,
99    #[educe(Debug(ignore))]
100    notification_manager: NotificationManagerRef,
101}
102
103impl SharedActorInfos {
104    pub fn read_guard(&self) -> RwLockReadGuard<'_, RawRwLock, SharedActorInfosInner> {
105        self.inner.read()
106    }
107
108    pub fn list_assignments(&self) -> HashMap<ActorId, Vec<SplitImpl>> {
109        let core = self.inner.read();
110        core.iter_over_fragments()
111            .flat_map(|(_, fragment)| {
112                fragment
113                    .actors
114                    .iter()
115                    .map(|(actor_id, info)| (*actor_id, info.splits.clone()))
116            })
117            .collect()
118    }
119
120    /// Migrates splits from previous actors to the new actors for a rescheduled fragment.
121    ///
122    /// Very occasionally split removal may happen during scaling, in which case we need to
123    /// use the old splits for reallocation instead of the latest splits (which may be missing),
124    /// so that we can resolve the split removal in the next command.
125    pub fn migrate_splits_for_source_actors(
126        &self,
127        fragment_id: FragmentId,
128        prev_actor_ids: &[ActorId],
129        curr_actor_ids: &[ActorId],
130    ) -> MetaResult<HashMap<ActorId, Vec<SplitImpl>>> {
131        let guard = self.read_guard();
132
133        let prev_splits = prev_actor_ids
134            .iter()
135            .flat_map(|actor_id| {
136                // Note: File Source / Iceberg Source doesn't have splits assigned by meta.
137                guard
138                    .get_fragment(fragment_id)
139                    .and_then(|info| info.actors.get(actor_id))
140                    .map(|actor| actor.splits.clone())
141                    .unwrap_or_default()
142            })
143            .map(|split| (split.id(), split))
144            .collect();
145
146        let empty_actor_splits = curr_actor_ids
147            .iter()
148            .map(|actor_id| (*actor_id, vec![]))
149            .collect();
150
151        let diff = crate::stream::source_manager::reassign_splits(
152            fragment_id,
153            empty_actor_splits,
154            &prev_splits,
155            // pre-allocate splits is the first time getting splits, and it does not have scale-in scene
156            std::default::Default::default(),
157        )
158        .unwrap_or_default();
159
160        Ok(diff)
161    }
162}
163
164impl SharedActorInfos {
165    pub(crate) fn new(notification_manager: NotificationManagerRef) -> Self {
166        Self {
167            inner: Arc::new(Default::default()),
168            notification_manager,
169        }
170    }
171
172    pub(super) fn remove_database(&self, database_id: DatabaseId) {
173        if let Some(database) = self.inner.write().info.remove(&database_id) {
174            let mapping = database
175                .into_values()
176                .map(|fragment| rebuild_fragment_mapping(&fragment))
177                .collect_vec();
178            if !mapping.is_empty() {
179                self.notification_manager
180                    .notify_fragment_mapping(Operation::Delete, mapping);
181            }
182        }
183    }
184
185    pub(super) fn retain_databases(&self, database_ids: impl IntoIterator<Item = DatabaseId>) {
186        let database_ids: HashSet<_> = database_ids.into_iter().collect();
187
188        let mut mapping = Vec::new();
189        for fragment in self
190            .inner
191            .write()
192            .info
193            .extract_if(|database_id, _| !database_ids.contains(database_id))
194            .flat_map(|(_, fragments)| fragments.into_values())
195        {
196            mapping.push(rebuild_fragment_mapping(&fragment));
197        }
198        if !mapping.is_empty() {
199            self.notification_manager
200                .notify_fragment_mapping(Operation::Delete, mapping);
201        }
202    }
203
204    pub(super) fn recover_database(
205        &self,
206        database_id: DatabaseId,
207        fragments: impl Iterator<Item = (&InflightFragmentInfo, JobId)>,
208    ) {
209        let mut remaining_fragments: HashMap<_, _> = fragments
210            .map(|info @ (fragment, _)| (fragment.fragment_id, info))
211            .collect();
212        // delete the fragments that exist previously, but not included in the recovered fragments
213        let mut writer = self.start_writer(database_id);
214        let database = writer.write_guard.info.entry(database_id).or_default();
215        for (_, fragment) in database.extract_if(|fragment_id, fragment_info| {
216            if let Some(info) = remaining_fragments.remove(fragment_id) {
217                let info = info.into();
218                writer
219                    .updated_fragment_mapping
220                    .get_or_insert_default()
221                    .push(rebuild_fragment_mapping(&info));
222                *fragment_info = info;
223                false
224            } else {
225                true
226            }
227        }) {
228            writer
229                .deleted_fragment_mapping
230                .get_or_insert_default()
231                .push(rebuild_fragment_mapping(&fragment));
232        }
233        for (fragment_id, info) in remaining_fragments {
234            let info = info.into();
235            writer
236                .added_fragment_mapping
237                .get_or_insert_default()
238                .push(rebuild_fragment_mapping(&info));
239            database.insert(fragment_id, info);
240        }
241        writer.finish();
242    }
243
244    pub(super) fn upsert(
245        &self,
246        database_id: DatabaseId,
247        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
248    ) {
249        let mut writer = self.start_writer(database_id);
250        writer.upsert(infos);
251        writer.finish();
252    }
253
254    pub(super) fn start_writer(&self, database_id: DatabaseId) -> SharedActorInfoWriter<'_> {
255        SharedActorInfoWriter {
256            database_id,
257            write_guard: self.inner.write(),
258            notification_manager: &self.notification_manager,
259            added_fragment_mapping: None,
260            updated_fragment_mapping: None,
261            deleted_fragment_mapping: None,
262        }
263    }
264}
265
266pub(super) struct SharedActorInfoWriter<'a> {
267    database_id: DatabaseId,
268    write_guard: parking_lot::RwLockWriteGuard<'a, SharedActorInfosInner>,
269    notification_manager: &'a NotificationManagerRef,
270    added_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
271    updated_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
272    deleted_fragment_mapping: Option<Vec<PbFragmentWorkerSlotMapping>>,
273}
274
275impl SharedActorInfoWriter<'_> {
276    pub(super) fn upsert(
277        &mut self,
278        infos: impl IntoIterator<Item = (&InflightFragmentInfo, JobId)>,
279    ) {
280        let database = self.write_guard.info.entry(self.database_id).or_default();
281        for info @ (fragment, _) in infos {
282            match database.entry(fragment.fragment_id) {
283                Entry::Occupied(mut entry) => {
284                    let info = info.into();
285                    self.updated_fragment_mapping
286                        .get_or_insert_default()
287                        .push(rebuild_fragment_mapping(&info));
288                    entry.insert(info);
289                }
290                Entry::Vacant(entry) => {
291                    let info = info.into();
292                    self.added_fragment_mapping
293                        .get_or_insert_default()
294                        .push(rebuild_fragment_mapping(&info));
295                    entry.insert(info);
296                }
297            }
298        }
299    }
300
301    pub(super) fn remove(&mut self, info: &InflightFragmentInfo) {
302        if let Some(database) = self.write_guard.info.get_mut(&self.database_id)
303            && let Some(fragment) = database.remove(&info.fragment_id)
304        {
305            self.deleted_fragment_mapping
306                .get_or_insert_default()
307                .push(rebuild_fragment_mapping(&fragment));
308        }
309    }
310
311    pub(super) fn finish(self) {
312        if let Some(mapping) = self.added_fragment_mapping {
313            self.notification_manager
314                .notify_fragment_mapping(Operation::Add, mapping);
315        }
316        if let Some(mapping) = self.updated_fragment_mapping {
317            self.notification_manager
318                .notify_fragment_mapping(Operation::Update, mapping);
319        }
320        if let Some(mapping) = self.deleted_fragment_mapping {
321            self.notification_manager
322                .notify_fragment_mapping(Operation::Delete, mapping);
323        }
324    }
325}
326
327#[derive(Debug, Clone)]
328pub(super) struct BarrierInfo {
329    pub prev_epoch: TracedEpoch,
330    pub curr_epoch: TracedEpoch,
331    pub kind: BarrierKind,
332}
333
334impl BarrierInfo {
335    pub(super) fn prev_epoch(&self) -> u64 {
336        self.prev_epoch.value().0
337    }
338}
339
340#[derive(Debug, Clone)]
341pub(crate) enum CommandFragmentChanges {
342    NewFragment {
343        job_id: JobId,
344        info: InflightFragmentInfo,
345        /// Whether the fragment already exists before added. This is used
346        /// when snapshot backfill is finished and add its fragment info
347        /// back to the database.
348        is_existing: bool,
349    },
350    AddNodeUpstream(PbUpstreamSinkInfo),
351    DropNodeUpstream(Vec<FragmentId>),
352    ReplaceNodeUpstream(
353        /// old `fragment_id` -> new `fragment_id`
354        HashMap<FragmentId, FragmentId>,
355    ),
356    Reschedule {
357        new_actors: HashMap<ActorId, InflightActorInfo>,
358        actor_update_vnode_bitmap: HashMap<ActorId, Bitmap>,
359        to_remove: HashSet<ActorId>,
360        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
361    },
362    RemoveFragment,
363    SplitAssignment {
364        actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
365    },
366}
367
/// Kind of subscriber registered on a streaming job.
#[derive(Debug, Clone)]
pub enum SubscriberType {
    /// A subscription, carrying its retention.
    Subscription(u64),
    /// A snapshot backfill subscriber; it carries no retention.
    SnapshotBackfill,
}
373
374#[derive(Clone, Debug)]
375pub struct InflightStreamingJobInfo {
376    pub job_id: JobId,
377    pub fragment_infos: HashMap<FragmentId, InflightFragmentInfo>,
378    pub subscribers: HashMap<u32, SubscriberType>,
379}
380
381impl InflightStreamingJobInfo {
382    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
383        self.fragment_infos.values()
384    }
385
386    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
387        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
388    }
389
390    pub fn snapshot_backfill_actor_ids(&self) -> impl Iterator<Item = ActorId> + '_ {
391        self.fragment_infos
392            .values()
393            .filter(|fragment| {
394                fragment
395                    .fragment_type_mask
396                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
397            })
398            .flat_map(|fragment| fragment.actors.keys().copied())
399    }
400
401    pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
402        StreamJobFragments::tracking_progress_actor_ids_impl(
403            self.fragment_infos
404                .values()
405                .map(|fragment| (fragment.fragment_type_mask, fragment.actors.keys().copied())),
406        )
407    }
408}
409
410impl<'a> IntoIterator for &'a InflightStreamingJobInfo {
411    type Item = &'a InflightFragmentInfo;
412
413    type IntoIter = impl Iterator<Item = &'a InflightFragmentInfo> + 'a;
414
415    fn into_iter(self) -> Self::IntoIter {
416        self.fragment_infos()
417    }
418}
419
420#[derive(Clone, Debug)]
421pub struct InflightDatabaseInfo {
422    database_id: DatabaseId,
423    jobs: HashMap<JobId, InflightStreamingJobInfo>,
424    fragment_location: HashMap<FragmentId, JobId>,
425    pub(super) shared_actor_infos: SharedActorInfos,
426}
427
428impl InflightDatabaseInfo {
429    pub fn fragment_infos(&self) -> impl Iterator<Item = &InflightFragmentInfo> + '_ {
430        self.jobs.values().flat_map(|job| job.fragment_infos())
431    }
432
433    pub fn contains_job(&self, job_id: JobId) -> bool {
434        self.jobs.contains_key(&job_id)
435    }
436
437    pub fn fragment(&self, fragment_id: FragmentId) -> &InflightFragmentInfo {
438        let job_id = self.fragment_location[&fragment_id];
439        self.jobs
440            .get(&job_id)
441            .expect("should exist")
442            .fragment_infos
443            .get(&fragment_id)
444            .expect("should exist")
445    }
446
447    pub fn fragment_subscribers(&self, fragment_id: FragmentId) -> impl Iterator<Item = u32> + '_ {
448        let job_id = self.fragment_location[&fragment_id];
449        self.jobs[&job_id].subscribers.keys().copied()
450    }
451
452    pub fn job_subscribers(&self, job_id: JobId) -> impl Iterator<Item = u32> + '_ {
453        self.jobs[&job_id].subscribers.keys().copied()
454    }
455
456    pub fn max_subscription_retention(&self) -> HashMap<TableId, u64> {
457        self.jobs
458            .iter()
459            .filter_map(|(job_id, info)| {
460                info.subscribers
461                    .values()
462                    .filter_map(|subscriber| match subscriber {
463                        SubscriberType::Subscription(retention) => Some(*retention),
464                        SubscriberType::SnapshotBackfill => None,
465                    })
466                    .max()
467                    .map(|max_subscription| (job_id.as_mv_table_id(), max_subscription))
468            })
469            .collect()
470    }
471
472    pub fn register_subscriber(
473        &mut self,
474        job_id: JobId,
475        subscriber_id: u32,
476        subscriber: SubscriberType,
477    ) {
478        self.jobs
479            .get_mut(&job_id)
480            .expect("should exist")
481            .subscribers
482            .try_insert(subscriber_id, subscriber)
483            .expect("non duplicate");
484    }
485
486    pub fn unregister_subscriber(
487        &mut self,
488        job_id: JobId,
489        subscriber_id: u32,
490    ) -> Option<SubscriberType> {
491        self.jobs
492            .get_mut(&job_id)
493            .expect("should exist")
494            .subscribers
495            .remove(&subscriber_id)
496    }
497
498    fn fragment_mut(&mut self, fragment_id: FragmentId) -> (&mut InflightFragmentInfo, JobId) {
499        let job_id = self.fragment_location[&fragment_id];
500        let fragment = self
501            .jobs
502            .get_mut(&job_id)
503            .expect("should exist")
504            .fragment_infos
505            .get_mut(&fragment_id)
506            .expect("should exist");
507        (fragment, job_id)
508    }
509
510    fn empty_inner(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
511        Self {
512            database_id,
513            jobs: Default::default(),
514            fragment_location: Default::default(),
515            shared_actor_infos,
516        }
517    }
518
519    pub fn empty(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
520        // remove the database because it's empty.
521        shared_actor_infos.remove_database(database_id);
522        Self::empty_inner(database_id, shared_actor_infos)
523    }
524
525    pub fn recover(
526        database_id: DatabaseId,
527        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
528        shared_actor_infos: SharedActorInfos,
529    ) -> Self {
530        let mut info = Self::empty_inner(database_id, shared_actor_infos);
531        for job in jobs {
532            info.add_existing(job);
533        }
534        info
535    }
536
537    pub fn is_empty(&self) -> bool {
538        self.jobs.is_empty()
539    }
540
541    pub fn add_existing(&mut self, job: InflightStreamingJobInfo) {
542        self.jobs
543            .try_insert(
544                job.job_id,
545                InflightStreamingJobInfo {
546                    job_id: job.job_id,
547                    subscribers: job.subscribers,
548                    fragment_infos: Default::default(), // fill in later in apply_add
549                },
550            )
551            .expect("non-duplicate");
552        self.apply_add(job.fragment_infos.into_iter().map(|(fragment_id, info)| {
553            (
554                fragment_id,
555                CommandFragmentChanges::NewFragment {
556                    job_id: job.job_id,
557                    info,
558                    is_existing: true,
559                },
560            )
561        }))
562    }
563
564    /// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
565    /// the info correspondingly.
566    pub(crate) fn pre_apply(
567        &mut self,
568        new_job_id: Option<JobId>,
569        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
570    ) {
571        if let Some(job_id) = new_job_id {
572            self.jobs
573                .try_insert(
574                    job_id,
575                    InflightStreamingJobInfo {
576                        job_id,
577                        fragment_infos: Default::default(),
578                        subscribers: Default::default(), // no subscriber for newly create job
579                    },
580                )
581                .expect("non-duplicate");
582        }
583        self.apply_add(
584            fragment_changes
585                .iter()
586                .map(|(fragment_id, change)| (*fragment_id, change.clone())),
587        )
588    }
589
590    fn apply_add(
591        &mut self,
592        fragment_changes: impl Iterator<Item = (FragmentId, CommandFragmentChanges)>,
593    ) {
594        {
595            let shared_infos = self.shared_actor_infos.clone();
596            let mut shared_actor_writer = shared_infos.start_writer(self.database_id);
597            for (fragment_id, change) in fragment_changes {
598                match change {
599                    CommandFragmentChanges::NewFragment {
600                        job_id,
601                        info,
602                        is_existing,
603                    } => {
604                        let fragment_infos = self.jobs.get_mut(&job_id).expect("should exist");
605                        if !is_existing {
606                            shared_actor_writer.upsert([(&info, job_id)]);
607                        }
608                        fragment_infos
609                            .fragment_infos
610                            .try_insert(fragment_id, info)
611                            .expect("non duplicate");
612                        self.fragment_location
613                            .try_insert(fragment_id, job_id)
614                            .expect("non duplicate");
615                    }
616                    CommandFragmentChanges::Reschedule {
617                        new_actors,
618                        actor_update_vnode_bitmap,
619                        actor_splits,
620                        ..
621                    } => {
622                        let (info, _) = self.fragment_mut(fragment_id);
623                        let actors = &mut info.actors;
624                        for (actor_id, new_vnodes) in actor_update_vnode_bitmap {
625                            actors
626                                .get_mut(&actor_id)
627                                .expect("should exist")
628                                .vnode_bitmap = Some(new_vnodes);
629                        }
630                        for (actor_id, actor) in new_actors {
631                            actors
632                                .try_insert(actor_id as _, actor)
633                                .expect("non-duplicate");
634                        }
635                        for (actor_id, splits) in actor_splits {
636                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
637                        }
638
639                        // info will be upserted into shared_actor_infos in post_apply stage
640                    }
641                    CommandFragmentChanges::RemoveFragment => {}
642                    CommandFragmentChanges::ReplaceNodeUpstream(replace_map) => {
643                        let mut remaining_fragment_ids: HashSet<_> =
644                            replace_map.keys().cloned().collect();
645                        let (info, _) = self.fragment_mut(fragment_id);
646                        visit_stream_node_mut(&mut info.nodes, |node| {
647                            if let NodeBody::Merge(m) = node
648                                && let Some(new_upstream_fragment_id) =
649                                    replace_map.get(&m.upstream_fragment_id)
650                            {
651                                if !remaining_fragment_ids.remove(&m.upstream_fragment_id) {
652                                    if cfg!(debug_assertions) {
653                                        panic!(
654                                            "duplicate upstream fragment: {:?} {:?}",
655                                            m, replace_map
656                                        );
657                                    } else {
658                                        warn!(?m, ?replace_map, "duplicate upstream fragment");
659                                    }
660                                }
661                                m.upstream_fragment_id = *new_upstream_fragment_id;
662                            }
663                        });
664                        if cfg!(debug_assertions) {
665                            assert!(
666                                remaining_fragment_ids.is_empty(),
667                                "non-existing fragment to replace: {:?} {:?} {:?}",
668                                remaining_fragment_ids,
669                                info.nodes,
670                                replace_map
671                            );
672                        } else {
673                            warn!(?remaining_fragment_ids, node = ?info.nodes, ?replace_map, "non-existing fragment to replace");
674                        }
675                    }
676                    CommandFragmentChanges::AddNodeUpstream(new_upstream_info) => {
677                        let (info, _) = self.fragment_mut(fragment_id);
678                        let mut injected = false;
679                        visit_stream_node_mut(&mut info.nodes, |node| {
680                            if let NodeBody::UpstreamSinkUnion(u) = node {
681                                if cfg!(debug_assertions) {
682                                    let current_upstream_fragment_ids = u
683                                        .init_upstreams
684                                        .iter()
685                                        .map(|upstream| upstream.upstream_fragment_id)
686                                        .collect::<HashSet<_>>();
687                                    if current_upstream_fragment_ids
688                                        .contains(&new_upstream_info.upstream_fragment_id)
689                                    {
690                                        panic!(
691                                            "duplicate upstream fragment: {:?} {:?}",
692                                            u, new_upstream_info
693                                        );
694                                    }
695                                }
696                                u.init_upstreams.push(new_upstream_info.clone());
697                                injected = true;
698                            }
699                        });
700                        assert!(injected, "should inject upstream into UpstreamSinkUnion");
701                    }
702                    CommandFragmentChanges::DropNodeUpstream(drop_upstream_fragment_ids) => {
703                        let (info, _) = self.fragment_mut(fragment_id);
704                        let mut removed = false;
705                        visit_stream_node_mut(&mut info.nodes, |node| {
706                            if let NodeBody::UpstreamSinkUnion(u) = node {
707                                if cfg!(debug_assertions) {
708                                    let current_upstream_fragment_ids = u
709                                        .init_upstreams
710                                        .iter()
711                                        .map(|upstream| upstream.upstream_fragment_id)
712                                        .collect::<HashSet<FragmentId>>();
713                                    for drop_fragment_id in &drop_upstream_fragment_ids {
714                                        if !current_upstream_fragment_ids.contains(drop_fragment_id)
715                                        {
716                                            panic!(
717                                                "non-existing upstream fragment to drop: {:?} {:?} {:?}",
718                                                u, drop_upstream_fragment_ids, drop_fragment_id
719                                            );
720                                        }
721                                    }
722                                }
723                                u.init_upstreams.retain(|upstream| {
724                                    !drop_upstream_fragment_ids
725                                        .contains(&upstream.upstream_fragment_id)
726                                });
727                                removed = true;
728                            }
729                        });
730                        assert!(removed, "should remove upstream from UpstreamSinkUnion");
731                    }
732                    CommandFragmentChanges::SplitAssignment { actor_splits } => {
733                        let (info, job_id) = self.fragment_mut(fragment_id);
734                        let actors = &mut info.actors;
735                        for (actor_id, splits) in actor_splits {
736                            actors.get_mut(&actor_id).expect("should exist").splits = splits;
737                        }
738                        shared_actor_writer.upsert([(&*info, job_id)]);
739                    }
740                }
741            }
742            shared_actor_writer.finish();
743        }
744    }
745
746    pub(super) fn build_edge(
747        &self,
748        command: Option<&Command>,
749        control_stream_manager: &ControlStreamManager,
750    ) -> Option<FragmentEdgeBuildResult> {
751        let (info, replace_job, new_upstream_sink) = match command {
752            None => {
753                return None;
754            }
755            Some(command) => match command {
756                Command::Flush
757                | Command::Pause
758                | Command::Resume
759                | Command::DropStreamingJobs { .. }
760                | Command::MergeSnapshotBackfillStreamingJobs(_)
761                | Command::RescheduleFragment { .. }
762                | Command::SourceChangeSplit { .. }
763                | Command::Throttle(_)
764                | Command::CreateSubscription { .. }
765                | Command::DropSubscription { .. }
766                | Command::ConnectorPropsChange(_)
767                | Command::StartFragmentBackfill { .. }
768                | Command::Refresh { .. }
769                | Command::ListFinish { .. }
770                | Command::LoadFinish { .. } => {
771                    return None;
772                }
773                Command::CreateStreamingJob { info, job_type, .. } => {
774                    let new_upstream_sink = if let CreateStreamingJobType::SinkIntoTable(
775                        new_upstream_sink,
776                    ) = job_type
777                    {
778                        Some(new_upstream_sink)
779                    } else {
780                        None
781                    };
782                    (Some(info), None, new_upstream_sink)
783                }
784                Command::ReplaceStreamJob(replace_job) => (None, Some(replace_job), None),
785            },
786        };
787        // `existing_fragment_ids` consists of
788        //  - keys of `info.upstream_fragment_downstreams`, which are the `fragment_id` the upstream fragment of the newly created job
789        //  - keys of `replace_job.upstream_fragment_downstreams`, which are the `fragment_id` of upstream fragment of replace_job,
790        // if the upstream fragment previously exists
791        //  - keys of `replace_upstream`, which are the `fragment_id` of downstream fragments that will update their upstream fragments,
792        // if creating a new sink-into-table
793        //  - should contain the `fragment_id` of the downstream table.
794        let existing_fragment_ids = info
795            .into_iter()
796            .flat_map(|info| info.upstream_fragment_downstreams.keys())
797            .chain(replace_job.into_iter().flat_map(|replace_job| {
798                replace_job
799                    .upstream_fragment_downstreams
800                    .keys()
801                    .filter(|fragment_id| {
802                        info.map(|info| {
803                            !info
804                                .stream_job_fragments
805                                .fragments
806                                .contains_key(fragment_id)
807                        })
808                        .unwrap_or(true)
809                    })
810                    .chain(replace_job.replace_upstream.keys())
811            }))
812            .chain(
813                new_upstream_sink
814                    .into_iter()
815                    .map(|ctx| &ctx.new_sink_downstream.downstream_fragment_id),
816            )
817            .cloned();
818        let new_fragment_infos = info
819            .into_iter()
820            .flat_map(|info| {
821                info.stream_job_fragments
822                    .new_fragment_info(&info.init_split_assignment)
823            })
824            .chain(replace_job.into_iter().flat_map(|replace_job| {
825                replace_job
826                    .new_fragments
827                    .new_fragment_info(&replace_job.init_split_assignment)
828                    .chain(
829                        replace_job
830                            .auto_refresh_schema_sinks
831                            .as_ref()
832                            .into_iter()
833                            .flat_map(|sinks| {
834                                sinks.iter().map(|sink| {
835                                    (sink.new_fragment.fragment_id, sink.new_fragment_info())
836                                })
837                            }),
838                    )
839            }))
840            .collect_vec();
841        let mut builder = FragmentEdgeBuilder::new(
842            existing_fragment_ids
843                .map(|fragment_id| self.fragment(fragment_id))
844                .chain(new_fragment_infos.iter().map(|(_, info)| info)),
845            control_stream_manager,
846        );
847        if let Some(info) = info {
848            builder.add_relations(&info.upstream_fragment_downstreams);
849            builder.add_relations(&info.stream_job_fragments.downstreams);
850        }
851        if let Some(replace_job) = replace_job {
852            builder.add_relations(&replace_job.upstream_fragment_downstreams);
853            builder.add_relations(&replace_job.new_fragments.downstreams);
854        }
855        if let Some(new_upstream_sink) = new_upstream_sink {
856            let sink_fragment_id = new_upstream_sink.sink_fragment_id;
857            let new_sink_downstream = &new_upstream_sink.new_sink_downstream;
858            builder.add_edge(sink_fragment_id, new_sink_downstream);
859        }
860        if let Some(replace_job) = replace_job {
861            for (fragment_id, fragment_replacement) in &replace_job.replace_upstream {
862                for (original_upstream_fragment_id, new_upstream_fragment_id) in
863                    fragment_replacement
864                {
865                    builder.replace_upstream(
866                        *fragment_id,
867                        *original_upstream_fragment_id,
868                        *new_upstream_fragment_id,
869                    );
870                }
871            }
872        }
873        Some(builder.build())
874    }
875
876    /// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
877    /// remove that from the snapshot correspondingly.
878    pub(crate) fn post_apply(
879        &mut self,
880        fragment_changes: &HashMap<FragmentId, CommandFragmentChanges>,
881    ) {
882        let inner = self.shared_actor_infos.clone();
883        let mut shared_actor_writer = inner.start_writer(self.database_id);
884        {
885            for (fragment_id, changes) in fragment_changes {
886                match changes {
887                    CommandFragmentChanges::NewFragment { .. } => {}
888                    CommandFragmentChanges::Reschedule { to_remove, .. } => {
889                        let job_id = self.fragment_location[fragment_id];
890                        let info = self
891                            .jobs
892                            .get_mut(&job_id)
893                            .expect("should exist")
894                            .fragment_infos
895                            .get_mut(fragment_id)
896                            .expect("should exist");
897                        for actor_id in to_remove {
898                            assert!(info.actors.remove(&(*actor_id as _)).is_some());
899                        }
900                        shared_actor_writer.upsert([(&*info, job_id)]);
901                    }
902                    CommandFragmentChanges::RemoveFragment => {
903                        let job_id = self
904                            .fragment_location
905                            .remove(fragment_id)
906                            .expect("should exist");
907                        let job = self.jobs.get_mut(&job_id).expect("should exist");
908                        let fragment = job
909                            .fragment_infos
910                            .remove(fragment_id)
911                            .expect("should exist");
912                        shared_actor_writer.remove(&fragment);
913                        if job.fragment_infos.is_empty() {
914                            self.jobs.remove(&job_id).expect("should exist");
915                        }
916                    }
917                    CommandFragmentChanges::ReplaceNodeUpstream(_)
918                    | CommandFragmentChanges::AddNodeUpstream(_)
919                    | CommandFragmentChanges::DropNodeUpstream(_)
920                    | CommandFragmentChanges::SplitAssignment { .. } => {}
921                }
922            }
923        }
924        shared_actor_writer.finish();
925    }
926}
927
928impl InflightFragmentInfo {
929    /// Returns actor list to collect in the target worker node.
930    pub(crate) fn actor_ids_to_collect(
931        infos: impl IntoIterator<Item = &Self>,
932    ) -> HashMap<WorkerId, HashSet<ActorId>> {
933        let mut ret: HashMap<_, HashSet<_>> = HashMap::new();
934        for (actor_id, actor) in infos.into_iter().flat_map(|info| info.actors.iter()) {
935            assert!(
936                ret.entry(actor.worker_id)
937                    .or_default()
938                    .insert(*actor_id as _)
939            )
940        }
941        ret
942    }
943
944    pub fn existing_table_ids<'a>(
945        infos: impl IntoIterator<Item = &'a Self> + 'a,
946    ) -> impl Iterator<Item = TableId> + 'a {
947        infos
948            .into_iter()
949            .flat_map(|info| info.state_table_ids.iter().cloned())
950    }
951
952    pub fn contains_worker(infos: impl IntoIterator<Item = &Self>, worker_id: WorkerId) -> bool {
953        infos.into_iter().any(|fragment| {
954            fragment
955                .actors
956                .values()
957                .any(|actor| (actor.worker_id) == worker_id)
958        })
959    }
960}
961
962impl InflightDatabaseInfo {
963    pub fn contains_worker(&self, worker_id: WorkerId) -> bool {
964        InflightFragmentInfo::contains_worker(self.fragment_infos(), worker_id)
965    }
966
967    pub fn existing_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
968        InflightFragmentInfo::existing_table_ids(self.fragment_infos())
969    }
970}