risingwave_meta/stream/stream_graph/fragment.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::ops::{Deref, DerefMut};
use std::sync::LazyLock;

use anyhow::{Context, anyhow};
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{
    CDC_SOURCE_COLUMN_NUM, TableId, generate_internal_table_name_with_type,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor::{
    self, visit_stream_node_cont, visit_stream_node_cont_mut,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::stream_plan::stream_fragment_graph::{
    Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchStrategy, DispatcherType, FragmentTypeFlag, PbStreamNode,
    StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode, StreamScanType,
};

use crate::barrier::SnapshotBackfillInfo;
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen};
use crate::stream::stream_graph::schedule::Distribution;
use crate::{MetaError, MetaResult};

/// The fragment in the building phase, including the [`StreamFragment`] from the frontend and
/// several additional helper fields.
#[derive(Debug, Clone)]
pub(super) struct BuildingFragment {
    /// The fragment structure from the frontend, with the global fragment ID.
    inner: StreamFragment,

    /// The ID of the job if it contains the streaming job node.
    job_id: Option<u32>,

    /// The required column IDs of each upstream table.
    /// Will be converted to indices when building the edge connected to the upstream.
    ///
    /// For a shared CDC table on source, this is `vec![]`, since the upstream source's output schema is fixed.
    upstream_table_columns: HashMap<TableId, Vec<i32>>,
}

impl BuildingFragment {
    /// Create a new [`BuildingFragment`] from a [`StreamFragment`]. The global fragment ID and
    /// global table IDs will be correctly filled with the given `id` and `table_id_gen`.
    fn new(
        id: GlobalFragmentId,
        fragment: StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) -> Self {
        let mut fragment = StreamFragment {
            fragment_id: id.as_global_id(),
            ..fragment
        };

        // Fill the information of the internal tables in the fragment.
        Self::fill_internal_tables(&mut fragment, job, table_id_gen);

        let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
        let upstream_table_columns =
            Self::extract_upstream_table_columns_except_cross_db_backfill(&fragment);

        Self {
            inner: fragment,
            job_id,
            upstream_table_columns,
        }
    }

    /// Extract the internal tables from the fragment.
    fn extract_internal_tables(&self) -> Vec<Table> {
        let mut fragment = self.inner.to_owned();
        let mut tables = Vec::new();
        stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
            tables.push(table.clone());
        });
        tables
    }

    /// Fill the information of the internal tables in the fragment.
    fn fill_internal_tables(
        fragment: &mut StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) {
        let fragment_id = fragment.fragment_id;
        stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
            table.id = table_id_gen.to_global_id(table.id).as_global_id();
            table.schema_id = job.schema_id();
            table.database_id = job.database_id();
            table.name = generate_internal_table_name_with_type(
                &job.name(),
                fragment_id,
                table.id,
                table_type_name,
            );
            table.fragment_id = fragment_id;
            table.owner = job.owner();
        });
    }

    /// Fill the information of the job in the fragment.
    fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
        let job_id = job.id();
        let fragment_id = fragment.fragment_id;
        let mut has_job = false;

        stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
            NodeBody::Materialize(materialize_node) => {
                materialize_node.table_id = job_id;

                // Fill the ID of the `Table`.
                let table = materialize_node.table.as_mut().unwrap();
                table.id = job_id;
                table.database_id = job.database_id();
                table.schema_id = job.schema_id();
                table.fragment_id = fragment_id;
                #[cfg(not(debug_assertions))]
                {
                    table.definition = job.name();
                }

                has_job = true;
            }
            NodeBody::Sink(sink_node) => {
                sink_node.sink_desc.as_mut().unwrap().id = job_id;

                has_job = true;
            }
            NodeBody::Dml(dml_node) => {
                dml_node.table_id = job_id;
                dml_node.table_version_id = job.table_version_id().unwrap();
            }
            NodeBody::StreamFsFetch(fs_fetch_node) => {
                if let StreamingJob::Table(table_source, _, _) = job {
                    if let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
                        && let Some(source) = table_source
                    {
                        node_inner.source_id = source.id;
                    }
                }
            }
            NodeBody::Source(source_node) => {
                match job {
                    // Note: A table without a connector has a dummy `Source` node.
                    // Note: For a table with a connector, its source node has a source id different from
                    // the table id (job id), assigned in `create_job_catalog`.
                    StreamingJob::Table(source, _table, _table_job_type) => {
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            if let Some(source) = source {
                                debug_assert_ne!(source.id, job_id);
                                source_inner.source_id = source.id;
                            }
                        }
                    }
                    StreamingJob::Source(source) => {
                        has_job = true;
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            debug_assert_eq!(source.id, job_id);
                            source_inner.source_id = source.id;
                        }
                    }
                    // For other job types, no need to fill the source id, since it refers to an existing source.
                    _ => {}
                }
            }
            NodeBody::StreamCdcScan(node) => {
                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
                    table_desc.table_id = job_id;
                }
            }
            _ => {}
        });

        has_job
    }

    /// Extract the required columns (in IDs) of each upstream table except for cross-db backfill.
    fn extract_upstream_table_columns_except_cross_db_backfill(
        fragment: &StreamFragment,
    ) -> HashMap<TableId, Vec<i32>> {
        let mut table_columns = HashMap::new();

        stream_graph_visitor::visit_fragment(fragment, |node_body| {
            let (table_id, column_ids) = match node_body {
                NodeBody::StreamScan(stream_scan) => {
                    if stream_scan.get_stream_scan_type().unwrap()
                        == StreamScanType::CrossDbSnapshotBackfill
                    {
                        return;
                    }
                    (
                        stream_scan.table_id.into(),
                        stream_scan.upstream_column_ids.clone(),
                    )
                }
                NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]),
                NodeBody::SourceBackfill(backfill) => (
                    backfill.upstream_source_id.into(),
                    // FIXME: only pass required columns instead of all columns here
                    backfill.column_ids(),
                ),
                _ => return,
            };
            table_columns
                .try_insert(table_id, column_ids)
                .expect("currently there should be no two same upstream tables in a fragment");
        });

        table_columns
    }

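    /// Returns whether this fragment contains a `StreamScan` node that uses shuffled backfill,
    /// i.e., `ArrangementBackfill` or `SnapshotBackfill`.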
    pub fn has_shuffled_backfill(&self) -> bool {
        let stream_node = match self.inner.node.as_ref() {
            Some(node) => node,
            _ => return false,
        };
        let mut has_shuffled_backfill = false;
        let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
        visit_stream_node_cont(stream_node, |node| {
            let is_shuffled_backfill = if let Some(node) = &node.node_body
                && let Some(node) = node.as_stream_scan()
            {
                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
                    || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
            } else {
                false
            };
            if is_shuffled_backfill {
                *has_shuffled_backfill_mut_ref = true;
                false
            } else {
                true
            }
        });
        has_shuffled_backfill
    }
}

impl Deref for BuildingFragment {
    type Target = StreamFragment;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for BuildingFragment {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

/// The ID of an edge in the fragment graph. For different types of edges, the ID will be in
/// different variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub(super) enum EdgeId {
    /// The edge between two building (internal) fragments.
    Internal {
        /// The ID generated by the frontend, generally the operator ID of `Exchange`.
        /// See [`StreamFragmentEdgeProto`].
        link_id: u64,
    },

    /// The edge between an upstream external fragment and downstream building fragment. Used for
    /// MV on MV.
    UpstreamExternal {
        /// The ID of the upstream table or materialized view.
        upstream_table_id: TableId,
        /// The ID of the downstream fragment.
        downstream_fragment_id: GlobalFragmentId,
    },

    /// The edge between an upstream building fragment and downstream external fragment. Used for
    /// schema change (replace table plan).
    DownstreamExternal(DownstreamExternalEdgeId),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct DownstreamExternalEdgeId {
    /// The ID of the original upstream fragment (`Materialize`).
    pub(super) original_upstream_fragment_id: GlobalFragmentId,
    /// The ID of the downstream fragment.
    pub(super) downstream_fragment_id: GlobalFragmentId,
}

/// The edge in the fragment graph.
///
/// The edge can be either internal or external. This is distinguished by the [`EdgeId`].
#[derive(Debug, Clone)]
pub(super) struct StreamFragmentEdge {
    /// The ID of the edge.
    pub id: EdgeId,

    /// The strategy used for dispatching the data.
    pub dispatch_strategy: DispatchStrategy,
}

impl StreamFragmentEdge {
    fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
        Self {
            // By creating an edge from the protobuf, we know that the edge is from the frontend and
            // is internal.
            id: EdgeId::Internal {
                link_id: edge.link_id,
            },
            dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
        }
    }
}

/// Adjacency list (G) of backfill orders.
///
/// `G[10] -> [1, 2, 11]` means the backfill node in fragment 10 should be backfilled
/// before the backfill nodes in fragments 1, 2 and 11.
pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;

/// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`]
/// from the frontend.
///
/// This only includes the nodes and edges of the current job itself. It will later be converted to a
/// [`CompleteStreamFragmentGraph`], which additionally contains the information of pre-existing
/// fragments connected to the graph's top-most or bottom-most fragments.
#[derive(Default, Debug)]
pub struct StreamFragmentGraph {
    /// Stores all the fragments in the graph.
    fragments: HashMap<GlobalFragmentId, BuildingFragment>,

    /// Stores edges between fragments: upstream => downstream.
    downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Stores edges between fragments: downstream => upstream.
    upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Dependent relations of this job.
    dependent_table_ids: HashSet<TableId>,

    /// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session
    /// variable. If not specified, all active worker slots will be used.
    specified_parallelism: Option<NonZeroUsize>,

    /// Specified max parallelism, i.e., expected vnode count for the graph.
    ///
    /// The scheduler on the meta service will use this as a hint to decide the vnode count
    /// for each fragment.
    ///
    /// Note that the actual vnode count may be different from this value.
    /// For example, a no-shuffle exchange between the current fragment graph and an existing
    /// upstream fragment graph requires the two fragments to be in the same distribution,
    /// thus the same vnode count.
    max_parallelism: usize,

    /// The backfill ordering strategy of the graph.
    backfill_order: BackfillOrder,
}

impl StreamFragmentGraph {
    /// Create a new [`StreamFragmentGraph`] from the given [`StreamFragmentGraphProto`], with all
    /// global IDs correctly filled.
    pub fn new(
        env: &MetaSrvEnv,
        proto: StreamFragmentGraphProto,
        job: &StreamingJob,
    ) -> MetaResult<Self> {
        let fragment_id_gen =
            GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
        // Note: in SQL backend, the ids generated here are fake and will be overwritten again
        // with `refill_internal_table_ids` later.
        // TODO: refactor the code to remove this step.
        let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);

        // Create nodes.
        let fragments: HashMap<_, _> = proto
            .fragments
            .into_iter()
            .map(|(id, fragment)| {
                let id = fragment_id_gen.to_global_id(id);
                let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
                (id, fragment)
            })
            .collect();

        assert_eq!(
            fragments
                .values()
                .map(|f| f.extract_internal_tables().len() as u32)
                .sum::<u32>(),
            proto.table_ids_cnt
        );

        // Create edges.
        let mut downstreams = HashMap::new();
        let mut upstreams = HashMap::new();

        for edge in proto.edges {
            let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
            let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
            let edge = StreamFragmentEdge::from_protobuf(&edge);

            upstreams
                .entry(downstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(upstream_id, edge.clone())
                .unwrap();
            downstreams
                .entry(upstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(downstream_id, edge)
                .unwrap();
        }

        // Note: Here we directly use the field `dependent_table_ids` in the proto (resolved in
        // frontend), instead of visiting the graph ourselves.
        let dependent_table_ids = proto
            .dependent_table_ids
            .iter()
            .map(TableId::from)
            .collect();

        let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
            Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
        } else {
            None
        };

        let max_parallelism = proto.max_parallelism as usize;
        let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
            order: Default::default(),
        });

        Ok(Self {
            fragments,
            downstreams,
            upstreams,
            dependent_table_ids,
            specified_parallelism,
            max_parallelism,
            backfill_order,
        })
    }

    /// Retrieve the **incomplete** internal tables map of the whole graph.
    ///
    /// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
    /// `fragment_id`, `vnode_count`. They will all be filled after a `TableFragments` is built.
    /// Be careful when using the returned values.
    ///
    /// See also [`crate::model::StreamJobFragments::internal_tables`].
    pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
        let mut tables = BTreeMap::new();
        for fragment in self.fragments.values() {
            for table in fragment.extract_internal_tables() {
                let table_id = table.id;
                tables
                    .try_insert(table_id, table)
                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
            }
        }
        tables
    }

    /// Refill the internal tables' `table_id`s according to the given map, typically obtained from
    /// `create_internal_table_catalog`.
    pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = table_id_map.get(&table.id).cloned().unwrap();
                    table.id = target;
                },
            );
        }
    }

    /// Set the internal tables' `table_id`s according to a list of internal tables.
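    ///
    /// The old tables are matched to the new ones by ascending table ID order, so the number of
    /// internal tables must stay unchanged; otherwise an error is returned.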
    pub fn fit_internal_table_ids(
        &mut self,
        mut old_internal_tables: Vec<Table>,
    ) -> MetaResult<()> {
        let mut new_internal_table_ids = Vec::new();
        for fragment in self.fragments.values() {
            for table in &fragment.extract_internal_tables() {
                new_internal_table_ids.push(table.id);
            }
        }

        if new_internal_table_ids.len() != old_internal_tables.len() {
            bail!(
                "Different number of internal tables. New: {}, Old: {}",
                new_internal_table_ids.len(),
                old_internal_tables.len()
            );
        }
        old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
        new_internal_table_ids.sort();

        let internal_table_id_map = new_internal_table_ids
            .into_iter()
            .zip_eq_fast(old_internal_tables.into_iter())
            .collect::<HashMap<_, _>>();

        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = internal_table_id_map.get(&table.id).cloned().unwrap();
                    *table = target;
                },
            );
        }

        Ok(())
    }

    /// Returns the fragment id where the streaming job node is located.
    pub fn table_fragment_id(&self) -> FragmentId {
        self.fragments
            .values()
            .filter(|b| b.job_id.is_some())
            .map(|b| b.fragment_id)
            .exactly_one()
            .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
    }

    /// Returns the fragment id where the table DML is received.
    pub fn dml_fragment_id(&self) -> Option<FragmentId> {
        self.fragments
            .values()
            .filter(|b| b.fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0)
            .map(|b| b.fragment_id)
            .at_most_one()
            .expect("require at most 1 dml node when creating the streaming job")
    }

    /// Get the dependent streaming job ids of this job.
    pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
        &self.dependent_table_ids
    }

    /// Get the parallelism of the job, if specified by the user.
    pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_parallelism
    }

    /// Get the expected vnode count of the graph. See documentation of the field for more details.
    pub fn max_parallelism(&self) -> usize {
        self.max_parallelism
    }

    /// Get downstreams of a fragment.
    fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    /// Get upstreams of a fragment.
    fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    pub fn collect_snapshot_backfill_info(
        &self,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        Self::collect_snapshot_backfill_info_impl(
            self.fragments
                .values()
                .map(|fragment| (fragment.node.as_ref().unwrap(), fragment.fragment_type_mask)),
        )
    }

    /// Returns `Ok((Some(snapshot_backfill_info), cross_db_snapshot_backfill_info))`.
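    ///
    /// The non-cross-db `StreamScan`s of the job must be consistent: either all of them use
    /// snapshot backfill or none does; otherwise an error is returned. Cross-db snapshot backfill
    /// scans are collected separately into the second element of the returned tuple.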
    pub fn collect_snapshot_backfill_info_impl(
        fragments: impl IntoIterator<Item = (&PbStreamNode, u32)>,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
        let mut cross_db_info = SnapshotBackfillInfo {
            upstream_mv_table_id_to_backfill_epoch: Default::default(),
        };
        let mut result = Ok(());
        for (node, fragment_type_mask) in fragments {
            visit_stream_node_cont(node, |node| {
                if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
                    let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
                        .expect("invalid stream_scan_type");
                    let is_snapshot_backfill = match stream_scan_type {
                        StreamScanType::SnapshotBackfill => {
                            assert!(
                                (fragment_type_mask
                                    & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32))
                                    > 0
                            );
                            true
                        }
                        StreamScanType::CrossDbSnapshotBackfill => {
                            assert!(
                                (fragment_type_mask
                                    & (FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32))
                                    > 0
                            );
                            cross_db_info.upstream_mv_table_id_to_backfill_epoch.insert(
                                TableId::new(stream_scan.table_id),
                                stream_scan.snapshot_backfill_epoch,
                            );

                            return true;
                        }
                        _ => false,
                    };

                    match &mut prev_stream_scan {
                        Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
                            match (prev_snapshot_backfill_info, is_snapshot_backfill) {
                                (Some(prev_snapshot_backfill_info), true) => {
                                    prev_snapshot_backfill_info
                                        .upstream_mv_table_id_to_backfill_epoch
                                        .insert(
                                            TableId::new(stream_scan.table_id),
                                            stream_scan.snapshot_backfill_epoch,
                                        );
                                    true
                                }
                                (None, false) => true,
                                (_, _) => {
                                    result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
                                    false
                                }
                            }
                        }
                        None => {
                            prev_stream_scan = Some((
                                if is_snapshot_backfill {
                                    Some(SnapshotBackfillInfo {
                                        upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
                                            [(
                                                TableId::new(stream_scan.table_id),
                                                stream_scan.snapshot_backfill_epoch,
                                            )],
                                        ),
                                    })
                                } else {
                                    None
                                },
                                *stream_scan.clone(),
                            ));
                            true
                        }
                    }
                } else {
                    true
                }
            })
        }
        result.map(|_| {
            (
                prev_stream_scan
                    .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
                    .unwrap_or(None),
                cross_db_info,
            )
        })
    }

    /// Collect the mapping from upstream table / source id to the `fragment_id`s of the fragments that backfill from it.
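    ///
    /// Currently only fragments containing a `StreamScan` (table backfill) are collected;
    /// source backfill fragments are not yet included (see the TODOs below).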
    pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
        let mut mapping = HashMap::new();
        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let fragment_mask = fragment.fragment_type_mask;
            // TODO(kwannoel): Support source scan
            let candidates = [FragmentTypeFlag::StreamScan];
            let has_some_scan = candidates
                .into_iter()
                .any(|flag| (fragment_mask & flag as u32) > 0);
            if has_some_scan {
                visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
                    match node.node_body.as_ref() {
                        Some(NodeBody::StreamScan(stream_scan)) => {
                            let table_id = stream_scan.table_id;
                            let fragments: &mut Vec<_> = mapping.entry(table_id).or_default();
                            fragments.push(fragment_id);
                            // each fragment should have only 1 scan node.
                            false
                        }
                        // TODO(kwannoel): Support source scan
                        _ => true,
                    }
                })
            }
        }
        mapping
    }

    /// The backfill ordering that comes from the frontend is initially keyed by `table_id`s.
    /// We remap it to the fragment level, since progress is tracked per actor and we can obtain
    /// a fragment <-> actor mapping.
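    ///
    /// For illustration (hypothetical IDs): with `backfill_order = {1: [2, 3]}` on relation IDs and a
    /// backfill mapping of `{1: [10], 2: [20], 3: [30]}` from relation ID to fragment IDs, the
    /// resulting ordering is `{10: [20, 30]}` on fragment IDs.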
    pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
        let mapping = self.collect_backfill_mapping();
        let mut fragment_ordering: HashMap<u32, Vec<u32>> = HashMap::new();
        for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
            let fragment_ids = mapping.get(rel_id).unwrap();
            for fragment_id in fragment_ids {
                let downstream_fragment_ids = downstream_rel_ids
                    .data
                    .iter()
                    .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
                    .copied()
                    .collect();
                fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
            }
        }
        fragment_ordering
    }
}

/// Fill the snapshot epoch for the `StreamScanNode`s of `SnapshotBackfill`.
/// Returns `true` when any change has been applied.
pub fn fill_snapshot_backfill_epoch(
    node: &mut StreamNode,
    snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
    cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
) -> MetaResult<bool> {
    let mut result = Ok(());
    let mut applied = false;
    visit_stream_node_cont_mut(node, |node| {
        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
            && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
                || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
        {
            result = try {
                let table_id = TableId::new(stream_scan.table_id);
                let snapshot_epoch = cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .get(&table_id)
                    .or_else(|| {
                        snapshot_backfill_info.and_then(|snapshot_backfill_info| {
                            snapshot_backfill_info
                                .upstream_mv_table_id_to_backfill_epoch
                                .get(&table_id)
                        })
                    })
                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
                if let Some(prev_snapshot_epoch) =
                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
                {
                    Err(anyhow!(
                        "snapshot backfill epoch set again: {} {} {}",
                        table_id,
                        prev_snapshot_epoch,
                        snapshot_epoch
                    ))?;
                }
                applied = true;
            };
            result.is_ok()
        } else {
            true
        }
    });
    result.map(|_| applied)
}

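/// A shared empty edge map, returned by `get_downstreams` / `get_upstreams` when a fragment has no
/// recorded edges, avoiding an allocation per call.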
static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
    LazyLock::new(HashMap::new);

/// A fragment that is either being built or already exists. Used to generalize the logic of
/// [`crate::stream::ActorGraphBuilder`].
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EitherFragment {
    /// An internal fragment that is being built for the current streaming job.
    Building(BuildingFragment),

    /// An existing fragment that is external but connected to the fragments being built.
    Existing(Fragment),
}

/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing
/// fragments, which are connected to the graph's top-most or bottom-most fragments.
///
/// For example,
/// - if we're going to build a mview on an existing mview, the upstream fragment containing the
///   `Materialize` node will be included in this structure.
/// - if we're going to replace the plan of a table with downstream mviews, the downstream fragments
///   containing the `StreamScan` nodes will be included in this structure.
#[derive(Debug)]
pub struct CompleteStreamFragmentGraph {
    /// The fragment graph of the streaming job being built.
    building_graph: StreamFragmentGraph,

    /// The required information of existing fragments.
    existing_fragments: HashMap<GlobalFragmentId, Fragment>,

    /// The location of the actors in the existing fragments.
    existing_actor_location: HashMap<ActorId, WorkerId>,

    /// Extra edges between existing fragments and the building fragments.
    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Extra edges between existing fragments and the building fragments.
    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
}

pub struct FragmentGraphUpstreamContext {
    /// The root fragment of the upstream stream graph, which can be a mview fragment, or a source
    /// fragment for a CDC source job.
    upstream_root_fragments: HashMap<TableId, Fragment>,
    upstream_actor_location: HashMap<ActorId, WorkerId>,
}

pub struct FragmentGraphDownstreamContext {
    original_root_fragment_id: FragmentId,
    downstream_fragments: Vec<(DispatcherType, Fragment)>,
    downstream_actor_location: HashMap<ActorId, WorkerId>,
}

impl CompleteStreamFragmentGraph {
    /// Create a new [`CompleteStreamFragmentGraph`] with empty existing fragments, i.e., there are no
    /// upstream mviews.
    #[cfg(test)]
    pub fn for_test(graph: StreamFragmentGraph) -> Self {
        Self {
            building_graph: graph,
            existing_fragments: Default::default(),
            existing_actor_location: Default::default(),
            extra_downstreams: Default::default(),
            extra_upstreams: Default::default(),
        }
    }

    /// Create a new [`CompleteStreamFragmentGraph`] for a newly created job (which has no downstreams),
    /// e.g., MV on MV, or a CDC/Source table, with the existing upstream
    /// `Materialize` or `Source` fragments.
    pub fn with_upstreams(
        graph: StreamFragmentGraph,
        upstream_root_fragments: HashMap<TableId, Fragment>,
        existing_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location: existing_actor_location,
            }),
            None,
            job_type,
        )
    }

    /// Create a new [`CompleteStreamFragmentGraph`] for replacing an existing table/source,
    /// with the existing downstream `StreamScan`/`StreamSourceScan` fragments.
    pub fn with_downstreams(
        graph: StreamFragmentGraph,
        original_root_fragment_id: FragmentId,
        downstream_fragments: Vec<(DispatcherType, Fragment)>,
        existing_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            None,
            Some(FragmentGraphDownstreamContext {
                original_root_fragment_id,
                downstream_fragments,
                downstream_actor_location: existing_actor_location,
            }),
            job_type,
        )
    }

    /// For replacing an existing table based on a shared CDC source, which has both upstreams and downstreams.
    pub fn with_upstreams_and_downstreams(
        graph: StreamFragmentGraph,
        upstream_root_fragments: HashMap<TableId, Fragment>,
        upstream_actor_location: HashMap<ActorId, WorkerId>,
        original_root_fragment_id: FragmentId,
        downstream_fragments: Vec<(DispatcherType, Fragment)>,
        downstream_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location,
            }),
            Some(FragmentGraphDownstreamContext {
                original_root_fragment_id,
                downstream_fragments,
                downstream_actor_location,
            }),
            job_type,
        )
    }

    /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments.
    fn build_helper(
        mut graph: StreamFragmentGraph,
        upstream_ctx: Option<FragmentGraphUpstreamContext>,
        downstream_ctx: Option<FragmentGraphDownstreamContext>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        let mut extra_downstreams = HashMap::new();
        let mut extra_upstreams = HashMap::new();
        let mut existing_fragments = HashMap::new();

        let mut existing_actor_location = HashMap::new();

        if let Some(FragmentGraphUpstreamContext {
            upstream_root_fragments,
            upstream_actor_location,
        }) = upstream_ctx
        {
            for (&id, fragment) in &mut graph.fragments {
                let uses_shuffled_backfill = fragment.has_shuffled_backfill();

                for (&upstream_table_id, required_columns) in &fragment.upstream_table_columns {
                    let upstream_fragment = upstream_root_fragments
                        .get(&upstream_table_id)
                        .context("upstream fragment not found")?;
                    let upstream_root_fragment_id =
                        GlobalFragmentId::new(upstream_fragment.fragment_id);

                    let edge = match job_type {
                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                            // We traverse all fragments in the graph to find the `CdcFilter` fragment,
                            // and add an edge between the upstream source fragment and it.
                            assert_ne!(
                                (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
                                0
                            );

                            tracing::debug!(
                                ?upstream_root_fragment_id,
                                ?required_columns,
                                identity = ?fragment.inner.get_node().unwrap().get_identity(),
                                current_frag_id=?id,
                                "CdcFilter with upstream source fragment"
                            );

                            StreamFragmentEdge {
                                id: EdgeId::UpstreamExternal {
                                    upstream_table_id,
                                    downstream_fragment_id: id,
                                },
                                // We always use `NoShuffle` for the exchange between the upstream
                                // `Source` and the downstream `StreamScan` of the new cdc table.
                                dispatch_strategy: DispatchStrategy {
                                    r#type: DispatcherType::NoShuffle as _,
                                    dist_key_indices: vec![], // not used for `NoShuffle`
                                    output_indices: (0..CDC_SOURCE_COLUMN_NUM as _).collect(),
                                },
                            }
                        }

                        // handle MV on MV/Source
                        StreamingJobType::MaterializedView
                        | StreamingJobType::Sink
                        | StreamingJobType::Index => {
                            // Build the extra edges between the upstream `Materialize` and
                            // the downstream `StreamScan` of the new job.
                            if upstream_fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32
                                != 0
                            {
                                // Resolve the required output columns from the upstream materialized view.
                                let (dist_key_indices, output_indices) = {
                                    let nodes = &upstream_fragment.nodes;
                                    let mview_node =
                                        nodes.get_node_body().unwrap().as_materialize().unwrap();
                                    let all_column_ids = mview_node.column_ids();
                                    let dist_key_indices = mview_node.dist_key_indices();
                                    let output_indices = gen_output_indices(
                                        required_columns,
                                        all_column_ids,
                                    )
                                    .context(
                                        "BUG: column not found in the upstream materialized view",
                                    )?;
                                    (dist_key_indices, output_indices)
                                };
                                let dispatch_strategy = mv_on_mv_dispatch_strategy(
                                    uses_shuffled_backfill,
                                    dist_key_indices,
                                    output_indices,
                                );

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy,
                                }
                            }
                            // Build the extra edges between the upstream `Source` and
                            // the downstream `SourceBackfill` of the new job.
                            else if upstream_fragment.fragment_type_mask
                                & FragmentTypeFlag::Source as u32
                                != 0
                            {
                                let output_indices = {
                                    let nodes = &upstream_fragment.nodes;
                                    let source_node =
                                        nodes.get_node_body().unwrap().as_source().unwrap();

                                    let all_column_ids = source_node.column_ids().unwrap();
                                    gen_output_indices(required_columns, all_column_ids).context(
                                        "BUG: column not found in the upstream source node",
                                    )?
                                };

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    // We always use `NoShuffle` for the exchange between the upstream
                                    // `Source` and the downstream `StreamScan` of the new MV.
                                    dispatch_strategy: DispatchStrategy {
                                        r#type: DispatcherType::NoShuffle as _,
                                        dist_key_indices: vec![], // not used for `NoShuffle`
                                        output_indices,
                                    },
                                }
                            } else {
                                bail!(
                                    "the upstream fragment should be a MView or Source, got fragment type: {:b}",
                                    upstream_fragment.fragment_type_mask
                                )
                            }
                        }
                        StreamingJobType::Source | StreamingJobType::Table(_) => {
                            bail!(
                                "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
                                job_type
                            )
                        }
                    };

                    // put the edge into the extra edges
                    extra_downstreams
                        .entry(upstream_root_fragment_id)
                        .or_insert_with(HashMap::new)
                        .try_insert(id, edge.clone())
                        .unwrap();
                    extra_upstreams
                        .entry(id)
                        .or_insert_with(HashMap::new)
                        .try_insert(upstream_root_fragment_id, edge)
                        .unwrap();
                }
            }

            existing_fragments.extend(
                upstream_root_fragments
                    .into_values()
                    .map(|f| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(upstream_actor_location);
        }

        if let Some(FragmentGraphDownstreamContext {
            original_root_fragment_id,
            downstream_fragments,
            downstream_actor_location,
        }) = downstream_ctx
        {
            let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
            let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());

            // Build the extra edges between the `Materialize` and the downstream `StreamScan` of the
            // existing materialized views.
            for (dispatcher_type, fragment) in &downstream_fragments {
                let id = GlobalFragmentId::new(fragment.fragment_id);

                // Similar to `extract_upstream_table_columns_except_cross_db_backfill`.
                let output_columns = {
                    let mut res = None;

                    stream_graph_visitor::visit_stream_node(&fragment.nodes, |node_body| {
                        let columns = match node_body {
                            NodeBody::StreamScan(stream_scan) => {
                                stream_scan.upstream_column_ids.clone()
                            }
                            NodeBody::SourceBackfill(source_backfill) => {
                                // FIXME: only pass required columns instead of all columns here
                                source_backfill.column_ids()
                            }
                            _ => return,
                        };
                        res = Some(columns);
                    });

                    res.context("failed to locate downstream scan")?
                };

                let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
                let nodes = table_fragment.node.as_ref().unwrap();

                let (dist_key_indices, output_indices) = match job_type {
                    StreamingJobType::Table(_) => {
                        let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
                        let all_column_ids = mview_node.column_ids();
                        let dist_key_indices = mview_node.dist_key_indices();
                        let output_indices = gen_output_indices(&output_columns, all_column_ids)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                     being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        (dist_key_indices, output_indices)
                    }

                    StreamingJobType::Source => {
                        let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
                        let all_column_ids = source_node.column_ids().unwrap();
                        let output_indices = gen_output_indices(&output_columns, all_column_ids)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                     being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
                        (
                            vec![], // not used for `NoShuffle`
                            output_indices,
                        )
                    }

                    _ => bail!("unsupported job type for replacement: {job_type:?}"),
                };

                let edge = StreamFragmentEdge {
                    id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
                        original_upstream_fragment_id: original_table_fragment_id,
                        downstream_fragment_id: id,
                    }),
                    dispatch_strategy: DispatchStrategy {
                        r#type: *dispatcher_type as i32,
                        output_indices,
                        dist_key_indices,
                    },
                };

                extra_downstreams
                    .entry(table_fragment_id)
                    .or_insert_with(HashMap::new)
                    .try_insert(id, edge.clone())
                    .unwrap();
                extra_upstreams
                    .entry(id)
                    .or_insert_with(HashMap::new)
                    .try_insert(table_fragment_id, edge)
                    .unwrap();
            }

            existing_fragments.extend(
                downstream_fragments
                    .into_iter()
                    .map(|(_, f)| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(downstream_actor_location);
        }

        Ok(Self {
            building_graph: graph,
            existing_fragments,
            existing_actor_location,
            extra_downstreams,
            extra_upstreams,
        })
    }
}

/// Generate the `output_indices` for [`DispatchStrategy`].
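///
/// Each required column ID is mapped to its position in the upstream columns; `None` is returned
/// if any required column is missing. For illustration: with `required_columns = [2, 0]` and
/// `upstream_columns = [0, 1, 2]`, the result is `Some(vec![2, 0])`.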
fn gen_output_indices(required_columns: &Vec<i32>, upstream_columns: Vec<i32>) -> Option<Vec<u32>> {
    required_columns
        .iter()
        .map(|c| {
            upstream_columns
                .iter()
                .position(|&id| id == *c)
                .map(|i| i as u32)
        })
        .collect()
}

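/// Decide the dispatch strategy for the exchange between the upstream `Materialize` and the
/// backfill of a new MV: shuffled backfill uses `Hash` on the upstream distribution key (or
/// `Simple` if the upstream is a singleton with no distribution key), while non-shuffled backfill
/// uses `NoShuffle`.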
fn mv_on_mv_dispatch_strategy(
    uses_shuffled_backfill: bool,
    dist_key_indices: Vec<u32>,
    output_indices: Vec<u32>,
) -> DispatchStrategy {
    if uses_shuffled_backfill {
        if !dist_key_indices.is_empty() {
            DispatchStrategy {
                r#type: DispatcherType::Hash as _,
                dist_key_indices,
                output_indices,
            }
        } else {
            DispatchStrategy {
                r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![], // empty for Simple
                output_indices,
            }
        }
    } else {
        DispatchStrategy {
            r#type: DispatcherType::NoShuffle as _,
            dist_key_indices: vec![], // not used for `NoShuffle`
            output_indices,
        }
    }
}

impl CompleteStreamFragmentGraph {
    /// Returns **all** fragment IDs in the complete graph, including the ones that are not in the
    /// building graph.
    pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
        self.building_graph
            .fragments
            .keys()
            .chain(self.existing_fragments.keys())
            .copied()
    }

    /// Returns an iterator of **all** edges in the complete graph, including the external edges.
    pub(super) fn all_edges(
        &self,
    ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
        self.building_graph
            .downstreams
            .iter()
            .chain(self.extra_downstreams.iter())
            .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
    }

    /// Returns the distribution of the existing fragments.
    pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
        self.existing_fragments
            .iter()
            .map(|(&id, f)| {
                (
                    id,
                    Distribution::from_fragment(f, &self.existing_actor_location),
                )
            })
            .collect()
    }

    /// Generate a topological order of **all** fragments in this graph, including the ones that
    /// are not in the building graph. Returns an error if the graph is not a DAG and a
    /// topological sort cannot be done.
    ///
    /// For MV on MV, the first fragment popped out of the queue will be the top-most node, i.e.,
    /// the `Sink` / `Materialize` in the stream graph.
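    ///
    /// As an illustration, for a chain `A -> B -> C` (data flows from `A` to `C`), the returned
    /// order is `[C, B, A]`.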
    pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
        let mut topo = Vec::new();
        let mut downstream_cnts = HashMap::new();

        // Iterate all fragments.
        for fragment_id in self.all_fragment_ids() {
            // Count how many downstreams we have for a given fragment.
            let downstream_cnt = self.get_downstreams(fragment_id).count();
            if downstream_cnt == 0 {
                topo.push(fragment_id);
            } else {
                downstream_cnts.insert(fragment_id, downstream_cnt);
            }
        }

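        // Kahn's algorithm over the reversed edges: `topo` doubles as the output and the FIFO
        // queue, so fragments with no downstreams are emitted first and we then walk upstream.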
        let mut i = 0;
        while let Some(&fragment_id) = topo.get(i) {
            i += 1;
            // Find if we can process more fragments.
            for (upstream_id, _) in self.get_upstreams(fragment_id) {
                let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
                *downstream_cnt -= 1;
                if *downstream_cnt == 0 {
                    downstream_cnts.remove(&upstream_id);
                    topo.push(upstream_id);
                }
            }
        }

        if !downstream_cnts.is_empty() {
            // There are fragments that are not processed yet.
            bail!("graph is not a DAG");
        }

        Ok(topo)
    }

    /// Seal a [`BuildingFragment`] from the graph into a [`Fragment`], which will then be used
    /// to build actors on the compute nodes and be persisted into the meta store.
    pub(super) fn seal_fragment(
        &self,
        id: GlobalFragmentId,
        actors: Vec<StreamActor>,
        distribution: Distribution,
        stream_node: StreamNode,
    ) -> Fragment {
        let building_fragment = self.get_fragment(id).into_building().unwrap();
        let internal_tables = building_fragment.extract_internal_tables();
        let BuildingFragment {
            inner,
            job_id,
            upstream_table_columns: _,
        } = building_fragment;

        let distribution_type = distribution.to_distribution_type();
        let vnode_count = distribution.vnode_count();

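        // If this is the `Materialize` fragment, the job's own table (the materialized view
        // itself) is also counted as one of the fragment's state tables below.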
        let materialized_fragment_id =
            if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 {
                job_id
            } else {
                None
            };

        let state_table_ids = internal_tables
            .iter()
            .map(|t| t.id)
            .chain(materialized_fragment_id)
            .collect();

        Fragment {
            fragment_id: inner.fragment_id,
            fragment_type_mask: inner.fragment_type_mask,
            distribution_type,
            actors,
            state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        }
    }

    /// Get a fragment from the complete graph, which can be either a building fragment or an
    /// existing fragment.
    pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
        if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
            EitherFragment::Existing(fragment.clone())
        } else {
            EitherFragment::Building(
                self.building_graph
                    .fragments
                    .get(&fragment_id)
                    .unwrap()
                    .clone(),
            )
        }
    }

    /// Get **all** downstreams of a fragment, including the ones that are not in the building
    /// graph.
    pub(super) fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_downstreams(fragment_id)
            .iter()
            .chain(
                self.extra_downstreams
                    .get(&fragment_id)
                    .into_iter()
                    .flatten(),
            )
            .map(|(&id, edge)| (id, edge))
    }

    /// Get **all** upstreams of a fragment, including the ones that are not in the building
    /// graph.
    pub(super) fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_upstreams(fragment_id)
            .iter()
            .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
            .map(|(&id, edge)| (id, edge))
    }

    /// Returns all building fragments in the graph.
    pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
        &self.building_graph.fragments
    }

    /// Returns all building fragments in the graph, mutable.
    pub(super) fn building_fragments_mut(
        &mut self,
    ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
        &mut self.building_graph.fragments
    }

    /// Get the expected vnode count of the building graph. See documentation of the field for more details.
    pub(super) fn max_parallelism(&self) -> usize {
        self.building_graph.max_parallelism()
    }
}