risingwave_meta/stream/stream_graph/fragment.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::ops::{Deref, DerefMut};
use std::sync::LazyLock;

use anyhow::{Context, anyhow};
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{
    CDC_SOURCE_COLUMN_NUM, TableId, generate_internal_table_name_with_type,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor::{
    self, visit_stream_node_cont, visit_stream_node_cont_mut,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::plan_common::PbColumnDesc;
use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
use risingwave_pb::stream_plan::stream_fragment_graph::{
    Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, FragmentTypeFlag,
    PbStreamNode, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode,
    StreamScanType,
};

use crate::barrier::SnapshotBackfillInfo;
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen};
use crate::stream::stream_graph::schedule::Distribution;
use crate::{MetaError, MetaResult};

/// The fragment in the building phase, including the [`StreamFragment`] from the frontend and
/// several additional helper fields.
#[derive(Debug, Clone)]
pub(super) struct BuildingFragment {
    /// The fragment structure from the frontend, with the global fragment ID.
    inner: StreamFragment,

    /// The ID of the job if it contains the streaming job node.
    job_id: Option<u32>,

    /// The required columns of each upstream table.
    /// Will be converted to indices when building the edge connected to the upstream.
    ///
    /// For a shared CDC table on a source, this is `vec![]`, since the upstream source's output schema is fixed.
    upstream_table_columns: HashMap<TableId, Vec<PbColumnDesc>>,
}

impl BuildingFragment {
    /// Create a new [`BuildingFragment`] from a [`StreamFragment`]. The global fragment ID and
    /// global table IDs will be correctly filled with the given `id` and `table_id_gen`.
    fn new(
        id: GlobalFragmentId,
        fragment: StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) -> Self {
        let mut fragment = StreamFragment {
            fragment_id: id.as_global_id(),
            ..fragment
        };

        // Fill the information of the internal tables in the fragment.
        Self::fill_internal_tables(&mut fragment, job, table_id_gen);

        let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
        let upstream_table_columns =
            Self::extract_upstream_table_columns_except_cross_db_backfill(&fragment);

        Self {
            inner: fragment,
            job_id,
            upstream_table_columns,
        }
    }

    /// Extract the internal tables from the fragment.
    fn extract_internal_tables(&self) -> Vec<Table> {
        let mut fragment = self.inner.to_owned();
        let mut tables = Vec::new();
        stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
            tables.push(table.clone());
        });
        tables
    }

    /// Fill the information of the internal tables in the fragment.
    fn fill_internal_tables(
        fragment: &mut StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) {
        let fragment_id = fragment.fragment_id;
        stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
            table.id = table_id_gen.to_global_id(table.id).as_global_id();
            table.schema_id = job.schema_id();
            table.database_id = job.database_id();
            table.name = generate_internal_table_name_with_type(
                &job.name(),
                fragment_id,
                table.id,
                table_type_name,
            );
            table.fragment_id = fragment_id;
            table.owner = job.owner();
        });
    }

    /// Fill the job-related information into the fragment.
    fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
        let job_id = job.id();
        let fragment_id = fragment.fragment_id;
        let mut has_job = false;

        stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
            NodeBody::Materialize(materialize_node) => {
                materialize_node.table_id = job_id;

                // Fill the ID of the `Table`.
                let table = materialize_node.table.as_mut().unwrap();
                table.id = job_id;
                table.database_id = job.database_id();
                table.schema_id = job.schema_id();
                table.fragment_id = fragment_id;
                #[cfg(not(debug_assertions))]
                {
                    table.definition = job.name();
                }

                has_job = true;
            }
            NodeBody::Sink(sink_node) => {
                sink_node.sink_desc.as_mut().unwrap().id = job_id;

                has_job = true;
            }
            NodeBody::Dml(dml_node) => {
                dml_node.table_id = job_id;
                dml_node.table_version_id = job.table_version_id().unwrap();
            }
            NodeBody::StreamFsFetch(fs_fetch_node) => {
                if let StreamingJob::Table(table_source, _, _) = job {
                    if let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
                        && let Some(source) = table_source
                    {
                        node_inner.source_id = source.id;
                    }
                }
            }
            NodeBody::Source(source_node) => {
                match job {
                    // Note: a table without a connector has a dummy `Source` node.
                    // Note: for a table with a connector, its source node has a source ID different from the table ID (job ID), assigned in `create_job_catalog`.
                    StreamingJob::Table(source, _table, _table_job_type) => {
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            if let Some(source) = source {
                                debug_assert_ne!(source.id, job_id);
                                source_inner.source_id = source.id;
                            }
                        }
                    }
                    StreamingJob::Source(source) => {
                        has_job = true;
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            debug_assert_eq!(source.id, job_id);
                            source_inner.source_id = source.id;
                        }
                    }
                    // For other job types, no need to fill the source id, since it refers to an existing source.
                    _ => {}
                }
            }
            NodeBody::StreamCdcScan(node) => {
                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
                    table_desc.table_id = job_id;
                }
            }
            _ => {}
        });

        has_job
    }

    /// Extract the required columns of each upstream table except for cross-db backfill.
    fn extract_upstream_table_columns_except_cross_db_backfill(
        fragment: &StreamFragment,
    ) -> HashMap<TableId, Vec<PbColumnDesc>> {
        let mut table_columns = HashMap::new();

        stream_graph_visitor::visit_fragment(fragment, |node_body| {
            let (table_id, column_ids) = match node_body {
                NodeBody::StreamScan(stream_scan) => {
                    if stream_scan.get_stream_scan_type().unwrap()
                        == StreamScanType::CrossDbSnapshotBackfill
                    {
                        return;
                    }
                    (stream_scan.table_id.into(), stream_scan.upstream_columns())
                }
                NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]),
                NodeBody::SourceBackfill(backfill) => (
                    backfill.upstream_source_id.into(),
                    // FIXME: only pass required columns instead of all columns here
                    backfill.column_descs(),
                ),
                _ => return,
            };
            table_columns
                .try_insert(table_id, column_ids)
                .expect("currently there should be no two same upstream tables in a fragment");
        });

        table_columns
    }

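    /// Returns `true` if this fragment contains a `StreamScan` that performs arrangement
    /// backfill or snapshot backfill, i.e., a backfill variant whose exchange with the
    /// upstream is shuffled rather than `NoShuffle`.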
    pub fn has_shuffled_backfill(&self) -> bool {
        let stream_node = match self.inner.node.as_ref() {
            Some(node) => node,
            _ => return false,
        };
        let mut has_shuffled_backfill = false;
        let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
        visit_stream_node_cont(stream_node, |node| {
            let is_shuffled_backfill = if let Some(node) = &node.node_body
                && let Some(node) = node.as_stream_scan()
            {
                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
                    || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
            } else {
                false
            };
            if is_shuffled_backfill {
                *has_shuffled_backfill_mut_ref = true;
                false
            } else {
                true
            }
        });
        has_shuffled_backfill
    }
}

impl Deref for BuildingFragment {
    type Target = StreamFragment;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for BuildingFragment {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

/// The ID of an edge in the fragment graph. For different types of edges, the ID will be in
/// different variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub(super) enum EdgeId {
    /// The edge between two building (internal) fragments.
    Internal {
        /// The ID generated by the frontend, generally the operator ID of `Exchange`.
        /// See [`StreamFragmentEdgeProto`].
        link_id: u64,
    },

    /// The edge between an upstream external fragment and a downstream building fragment. Used for
    /// MV on MV.
    UpstreamExternal {
        /// The ID of the upstream table or materialized view.
        upstream_table_id: TableId,
        /// The ID of the downstream fragment.
        downstream_fragment_id: GlobalFragmentId,
    },

    /// The edge between an upstream building fragment and a downstream external fragment. Used for
    /// schema change (replace table plan).
    DownstreamExternal(DownstreamExternalEdgeId),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct DownstreamExternalEdgeId {
    /// The ID of the original upstream fragment (`Materialize`).
    pub(super) original_upstream_fragment_id: GlobalFragmentId,
    /// The ID of the downstream fragment.
    pub(super) downstream_fragment_id: GlobalFragmentId,
}

/// The edge in the fragment graph.
///
/// The edge can be either internal or external. This is distinguished by the [`EdgeId`].
#[derive(Debug, Clone)]
pub(super) struct StreamFragmentEdge {
    /// The ID of the edge.
    pub id: EdgeId,

    /// The strategy used for dispatching the data.
    pub dispatch_strategy: DispatchStrategy,
}

impl StreamFragmentEdge {
    fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
        Self {
            // By creating an edge from the protobuf, we know that the edge is from the frontend and
            // is internal.
            id: EdgeId::Internal {
                link_id: edge.link_id,
            },
            dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
        }
    }
}

/// Adjacency list (`G`) of backfill orders.
/// `G[10] -> [1, 2, 11]`
/// means the backfill node in fragment 10
/// should be backfilled before the backfill nodes in fragments 1, 2 and 11.
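///
/// As an illustrative sketch (the fragment IDs below are made up), such an order could be
/// built like:
/// ```rust,ignore
/// let mut order = FragmentBackfillOrder::new();
/// // Fragment 10 must finish backfilling before fragments 1, 2 and 11 start.
/// order.insert(10, vec![1, 2, 11]);
/// ```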
pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;

/// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`]
/// from the frontend.
///
/// This only includes nodes and edges of the current job itself. It will be converted to a
/// [`CompleteStreamFragmentGraph`] later, which contains the additional information of
/// pre-existing fragments that are connected to the graph's top-most or bottom-most fragments.
#[derive(Default, Debug)]
pub struct StreamFragmentGraph {
    /// Stores all the fragments in the graph.
    fragments: HashMap<GlobalFragmentId, BuildingFragment>,

    /// Stores edges between fragments: upstream -> downstream.
    downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Stores edges between fragments: downstream -> upstream.
    upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Dependent relations of this job.
    dependent_table_ids: HashSet<TableId>,

    /// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session
    /// variable. If not specified, all active worker slots will be used.
    specified_parallelism: Option<NonZeroUsize>,

    /// Specified max parallelism, i.e., expected vnode count for the graph.
    ///
    /// The scheduler on the meta service will use this as a hint to decide the vnode count
    /// for each fragment.
    ///
    /// Note that the actual vnode count may be different from this value.
    /// For example, a no-shuffle exchange between the current fragment graph and an existing
    /// upstream fragment graph requires the two fragments to be in the same distribution,
    /// thus the same vnode count.
    max_parallelism: usize,

    /// The backfill ordering strategy of the graph.
    backfill_order: BackfillOrder,
}

impl StreamFragmentGraph {
    /// Create a new [`StreamFragmentGraph`] from the given [`StreamFragmentGraphProto`], with all
    /// global IDs correctly filled.
    pub fn new(
        env: &MetaSrvEnv,
        proto: StreamFragmentGraphProto,
        job: &StreamingJob,
    ) -> MetaResult<Self> {
        let fragment_id_gen =
            GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
        // Note: in the SQL backend, the IDs generated here are fake and will be overwritten again
        // with `refill_internal_table_ids` later.
        // TODO: refactor the code to remove this step.
        let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);

        // Create nodes.
        let fragments: HashMap<_, _> = proto
            .fragments
            .into_iter()
            .map(|(id, fragment)| {
                let id = fragment_id_gen.to_global_id(id);
                let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
                (id, fragment)
            })
            .collect();

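        // Sanity check: the total number of internal tables extracted from the fragments
        // must match the count reported by the frontend.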
        assert_eq!(
            fragments
                .values()
                .map(|f| f.extract_internal_tables().len() as u32)
                .sum::<u32>(),
            proto.table_ids_cnt
        );

        // Create edges.
        let mut downstreams = HashMap::new();
        let mut upstreams = HashMap::new();

        for edge in proto.edges {
            let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
            let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
            let edge = StreamFragmentEdge::from_protobuf(&edge);

            upstreams
                .entry(downstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(upstream_id, edge.clone())
                .unwrap();
            downstreams
                .entry(upstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(downstream_id, edge)
                .unwrap();
        }

        // Note: Here we directly use the field `dependent_table_ids` in the proto (resolved in
        // the frontend), instead of visiting the graph ourselves.
        let dependent_table_ids = proto
            .dependent_table_ids
            .iter()
            .map(TableId::from)
            .collect();

        let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
            Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
        } else {
            None
        };

        let max_parallelism = proto.max_parallelism as usize;
        let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
            order: Default::default(),
        });

        Ok(Self {
            fragments,
            downstreams,
            upstreams,
            dependent_table_ids,
            specified_parallelism,
            max_parallelism,
            backfill_order,
        })
    }

    /// Retrieve the **incomplete** internal tables map of the whole graph.
    ///
    /// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
    /// `fragment_id`, `vnode_count`. They will all be filled after a `TableFragments` is built.
    /// Be careful when using the returned values.
    ///
    /// See also [`crate::model::StreamJobFragments::internal_tables`].
    pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
        let mut tables = BTreeMap::new();
        for fragment in self.fragments.values() {
            for table in fragment.extract_internal_tables() {
                let table_id = table.id;
                tables
                    .try_insert(table_id, table)
                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
            }
        }
        tables
    }

    /// Refill the internal tables' `table_id`s according to the given map, typically obtained from
    /// `create_internal_table_catalog`.
    pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = table_id_map.get(&table.id).cloned().unwrap();
                    table.id = target;
                },
            );
        }
    }

    /// Set the internal tables' `table_id`s according to a list of existing internal tables.
    pub fn fit_internal_table_ids(
        &mut self,
        mut old_internal_tables: Vec<Table>,
    ) -> MetaResult<()> {
        let mut new_internal_table_ids = Vec::new();
        for fragment in self.fragments.values() {
            for table in &fragment.extract_internal_tables() {
                new_internal_table_ids.push(table.id);
            }
        }

        if new_internal_table_ids.len() != old_internal_tables.len() {
            bail!(
                "Different number of internal tables. New: {}, Old: {}",
                new_internal_table_ids.len(),
                old_internal_tables.len()
            );
        }
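        // Pair the new and old internal tables by their sorted IDs; this assumes the
        // relative ID order of internal tables is consistent between the two plans.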
        old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
        new_internal_table_ids.sort();

        let internal_table_id_map = new_internal_table_ids
            .into_iter()
            .zip_eq_fast(old_internal_tables.into_iter())
            .collect::<HashMap<_, _>>();

        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = internal_table_id_map.get(&table.id).cloned().unwrap();
                    *table = target;
                },
            );
        }

        Ok(())
    }

    /// Returns the fragment ID where the streaming job node is located.
    pub fn table_fragment_id(&self) -> FragmentId {
        self.fragments
            .values()
            .filter(|b| b.job_id.is_some())
            .map(|b| b.fragment_id)
            .exactly_one()
            .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
    }

    /// Returns the fragment ID where the table DML is received.
    pub fn dml_fragment_id(&self) -> Option<FragmentId> {
        self.fragments
            .values()
            .filter(|b| b.fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0)
            .map(|b| b.fragment_id)
            .at_most_one()
            .expect("require at most 1 dml node when creating the streaming job")
    }

    /// Get the dependent streaming job IDs of this job.
    pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
        &self.dependent_table_ids
    }

    /// Get the parallelism of the job, if specified by the user.
    pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_parallelism
    }

    /// Get the expected vnode count of the graph. See the documentation of the field for more details.
    pub fn max_parallelism(&self) -> usize {
        self.max_parallelism
    }

    /// Get the downstreams of a fragment.
    fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    /// Get the upstreams of a fragment.
    fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    pub fn collect_snapshot_backfill_info(
        &self,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        Self::collect_snapshot_backfill_info_impl(
            self.fragments
                .values()
                .map(|fragment| (fragment.node.as_ref().unwrap(), fragment.fragment_type_mask)),
        )
    }

    /// Returns `Ok((Some(snapshot_backfill_info), cross_db_snapshot_backfill_info))`.
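    ///
    /// Apart from cross-db scans, all `StreamScan`s in the job must either all use snapshot
    /// backfill or none of them; otherwise an error is returned.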
    pub fn collect_snapshot_backfill_info_impl(
        fragments: impl IntoIterator<Item = (&PbStreamNode, u32)>,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
        let mut cross_db_info = SnapshotBackfillInfo {
            upstream_mv_table_id_to_backfill_epoch: Default::default(),
        };
        let mut result = Ok(());
        for (node, fragment_type_mask) in fragments {
            visit_stream_node_cont(node, |node| {
                if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
                    let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
                        .expect("invalid stream_scan_type");
                    let is_snapshot_backfill = match stream_scan_type {
                        StreamScanType::SnapshotBackfill => {
                            assert!(
                                (fragment_type_mask
                                    & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32))
                                    > 0
                            );
                            true
                        }
                        StreamScanType::CrossDbSnapshotBackfill => {
                            assert!(
                                (fragment_type_mask
                                    & (FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32))
                                    > 0
                            );
                            cross_db_info.upstream_mv_table_id_to_backfill_epoch.insert(
                                TableId::new(stream_scan.table_id),
                                stream_scan.snapshot_backfill_epoch,
                            );

                            return true;
                        }
                        _ => false,
                    };

                    match &mut prev_stream_scan {
                        Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
                            match (prev_snapshot_backfill_info, is_snapshot_backfill) {
                                (Some(prev_snapshot_backfill_info), true) => {
                                    prev_snapshot_backfill_info
                                        .upstream_mv_table_id_to_backfill_epoch
                                        .insert(
                                            TableId::new(stream_scan.table_id),
                                            stream_scan.snapshot_backfill_epoch,
                                        );
                                    true
                                }
                                (None, false) => true,
                                (_, _) => {
                                    result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
                                    false
                                }
                            }
                        }
                        None => {
                            prev_stream_scan = Some((
                                if is_snapshot_backfill {
                                    Some(SnapshotBackfillInfo {
                                        upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
                                            [(
                                                TableId::new(stream_scan.table_id),
                                                stream_scan.snapshot_backfill_epoch,
                                            )],
                                        ),
                                    })
                                } else {
                                    None
                                },
                                *stream_scan.clone(),
                            ));
                            true
                        }
                    }
                } else {
                    true
                }
            })
        }
        result.map(|_| {
            (
                prev_stream_scan
                    .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
                    .unwrap_or(None),
                cross_db_info,
            )
        })
    }

    /// Collect the mapping from table / source ID -> fragment IDs.
    pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
        let mut mapping = HashMap::new();
        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let fragment_mask = fragment.fragment_type_mask;
            // TODO(kwannoel): Support source scan
            let candidates = [FragmentTypeFlag::StreamScan];
            let has_some_scan = candidates
                .into_iter()
                .any(|flag| (fragment_mask & flag as u32) > 0);
            if has_some_scan {
                visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
                    match node.node_body.as_ref() {
                        Some(NodeBody::StreamScan(stream_scan)) => {
                            let table_id = stream_scan.table_id;
                            let fragments: &mut Vec<_> = mapping.entry(table_id).or_default();
                            fragments.push(fragment_id);
                            // Each fragment should have only 1 scan node.
                            false
                        }
                        // TODO(kwannoel): Support source scan
                        _ => true,
                    }
                })
            }
        }
        mapping
    }

    /// The mapping that comes from the frontend is initially between `table_id`s.
    /// We remap it to the fragment level, since we track progress by actor, and a
    /// fragment <-> actor mapping is available.
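    ///
    /// For example, if the frontend order is `{table 1 -> [table 2]}`, table 1 maps to
    /// fragments `[10, 11]`, and table 2 maps to fragment `[20]`, the result is
    /// `{10 -> [20], 11 -> [20]}`.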
    pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
        let mapping = self.collect_backfill_mapping();
        let mut fragment_ordering: HashMap<u32, Vec<u32>> = HashMap::new();
        for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
            let fragment_ids = mapping.get(rel_id).unwrap();
            for fragment_id in fragment_ids {
                let downstream_fragment_ids = downstream_rel_ids
                    .data
                    .iter()
                    .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
                    .copied()
                    .collect();
                fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
            }
        }
        fragment_ordering
    }
}

/// Fill the snapshot epoch for the `StreamScanNode`s of `SnapshotBackfill`.
/// Returns `true` if any change was applied.
pub fn fill_snapshot_backfill_epoch(
    node: &mut StreamNode,
    snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
    cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
) -> MetaResult<bool> {
    let mut result = Ok(());
    let mut applied = false;
    visit_stream_node_cont_mut(node, |node| {
        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
            && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
                || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
        {
            result = try {
                let table_id = TableId::new(stream_scan.table_id);
                let snapshot_epoch = cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .get(&table_id)
                    .or_else(|| {
                        snapshot_backfill_info.and_then(|snapshot_backfill_info| {
                            snapshot_backfill_info
                                .upstream_mv_table_id_to_backfill_epoch
                                .get(&table_id)
                        })
                    })
                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
                if let Some(prev_snapshot_epoch) =
                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
                {
                    Err(anyhow!(
                        "snapshot backfill epoch set again: {} {} {}",
                        table_id,
                        prev_snapshot_epoch,
                        snapshot_epoch
                    ))?;
                }
                applied = true;
            };
            result.is_ok()
        } else {
            true
        }
    });
    result.map(|_| applied)
}

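/// A shared empty map returned by `get_downstreams` / `get_upstreams` when a fragment has
/// no edges, avoiding an allocation on every lookup.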
static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
    LazyLock::new(HashMap::new);

/// A fragment that is either being built or already exists. Used to generalize the logic of
/// [`crate::stream::ActorGraphBuilder`].
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EitherFragment {
    /// An internal fragment that is being built for the current streaming job.
    Building(BuildingFragment),

    /// An existing fragment that is external but connected to the fragments being built.
    Existing(Fragment),
}

/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing
/// fragments, which are connected to the graph's top-most or bottom-most fragments.
///
/// For example,
/// - if we're going to build an mview on an existing mview, the upstream fragment containing the
///   `Materialize` node will be included in this structure.
/// - if we're going to replace the plan of a table with downstream mviews, the downstream fragments
///   containing the `StreamScan` nodes will be included in this structure.
#[derive(Debug)]
pub struct CompleteStreamFragmentGraph {
    /// The fragment graph of the streaming job being built.
    building_graph: StreamFragmentGraph,

    /// The required information of the existing fragments.
    existing_fragments: HashMap<GlobalFragmentId, Fragment>,

    /// The location of the actors in the existing fragments.
    existing_actor_location: HashMap<ActorId, WorkerId>,

    /// Extra downstream edges (upstream -> downstream) between existing fragments and the building fragments.
    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Extra upstream edges (downstream -> upstream) between existing fragments and the building fragments.
    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
}

pub struct FragmentGraphUpstreamContext {
    /// The root fragment of each upstream stream graph, which can be an mview fragment,
    /// or a source fragment for a CDC source job.
    upstream_root_fragments: HashMap<TableId, Fragment>,
    upstream_actor_location: HashMap<ActorId, WorkerId>,
}

pub struct FragmentGraphDownstreamContext {
    original_root_fragment_id: FragmentId,
    downstream_fragments: Vec<(DispatcherType, Fragment)>,
    downstream_actor_location: HashMap<ActorId, WorkerId>,
}

impl CompleteStreamFragmentGraph {
    /// Create a new [`CompleteStreamFragmentGraph`] with empty existing fragments, i.e., there are
    /// no upstream mviews.
    #[cfg(test)]
    pub fn for_test(graph: StreamFragmentGraph) -> Self {
        Self {
            building_graph: graph,
            existing_fragments: Default::default(),
            existing_actor_location: Default::default(),
            extra_downstreams: Default::default(),
            extra_upstreams: Default::default(),
        }
    }

    /// Create a new [`CompleteStreamFragmentGraph`] for a newly created job (which has no
    /// downstreams), e.g., an MV on MV or a CDC/Source table, with the existing upstream
    /// `Materialize` or `Source` fragments.
    pub fn with_upstreams(
        graph: StreamFragmentGraph,
        upstream_root_fragments: HashMap<TableId, Fragment>,
        existing_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location: existing_actor_location,
            }),
            None,
            job_type,
        )
    }

    /// Create a new [`CompleteStreamFragmentGraph`] for replacing an existing table/source,
    /// with the existing downstream `StreamScan`/`StreamSourceScan` fragments.
    pub fn with_downstreams(
        graph: StreamFragmentGraph,
        original_root_fragment_id: FragmentId,
        downstream_fragments: Vec<(DispatcherType, Fragment)>,
        existing_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            None,
            Some(FragmentGraphDownstreamContext {
                original_root_fragment_id,
                downstream_fragments,
                downstream_actor_location: existing_actor_location,
            }),
            job_type,
        )
    }

    /// For replacing an existing table based on a shared CDC source, which has both upstreams and
    /// downstreams.
    pub fn with_upstreams_and_downstreams(
        graph: StreamFragmentGraph,
        upstream_root_fragments: HashMap<TableId, Fragment>,
        upstream_actor_location: HashMap<ActorId, WorkerId>,
        original_root_fragment_id: FragmentId,
        downstream_fragments: Vec<(DispatcherType, Fragment)>,
        downstream_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location,
            }),
            Some(FragmentGraphDownstreamContext {
                original_root_fragment_id,
                downstream_fragments,
                downstream_actor_location,
            }),
            job_type,
        )
    }

    /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments.
    fn build_helper(
        mut graph: StreamFragmentGraph,
        upstream_ctx: Option<FragmentGraphUpstreamContext>,
        downstream_ctx: Option<FragmentGraphDownstreamContext>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        let mut extra_downstreams = HashMap::new();
        let mut extra_upstreams = HashMap::new();
        let mut existing_fragments = HashMap::new();

        let mut existing_actor_location = HashMap::new();

        if let Some(FragmentGraphUpstreamContext {
            upstream_root_fragments,
            upstream_actor_location,
        }) = upstream_ctx
        {
            for (&id, fragment) in &mut graph.fragments {
                let uses_shuffled_backfill = fragment.has_shuffled_backfill();

                for (&upstream_table_id, required_columns) in &fragment.upstream_table_columns {
                    let upstream_fragment = upstream_root_fragments
                        .get(&upstream_table_id)
                        .context("upstream fragment not found")?;
                    let upstream_root_fragment_id =
                        GlobalFragmentId::new(upstream_fragment.fragment_id);

                    let edge = match job_type {
                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                            // We traverse all fragments in the graph, so we should find the
                            // `CdcFilter` fragment and add an edge between the upstream source
                            // fragment and it.
                            assert_ne!(
                                (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
                                0
                            );

                            tracing::debug!(
                                ?upstream_root_fragment_id,
                                ?required_columns,
                                identity = ?fragment.inner.get_node().unwrap().get_identity(),
                                current_frag_id=?id,
                                "CdcFilter with upstream source fragment"
                            );

                            StreamFragmentEdge {
                                id: EdgeId::UpstreamExternal {
                                    upstream_table_id,
                                    downstream_fragment_id: id,
                                },
                                // We always use `NoShuffle` for the exchange between the upstream
                                // `Source` and the downstream `StreamScan` of the new cdc table.
                                dispatch_strategy: DispatchStrategy {
                                    r#type: DispatcherType::NoShuffle as _,
                                    dist_key_indices: vec![], // not used for `NoShuffle`
                                    output_mapping: DispatchOutputMapping::identical(
                                        CDC_SOURCE_COLUMN_NUM as _,
                                    )
                                    .into(),
                                },
                            }
                        }

                        // Handle MV on MV/Source.
                        StreamingJobType::MaterializedView
                        | StreamingJobType::Sink
                        | StreamingJobType::Index => {
                            // Build the extra edges between the upstream `Materialize` and
                            // the downstream `StreamScan` of the new job.
                            if upstream_fragment.fragment_type_mask & FragmentTypeFlag::Mview as u32
                                != 0
                            {
                                // Resolve the required output columns from the upstream materialized view.
                                let (dist_key_indices, output_mapping) = {
                                    let nodes = &upstream_fragment.nodes;
                                    let mview_node =
                                        nodes.get_node_body().unwrap().as_materialize().unwrap();
                                    let all_columns = mview_node.column_descs();
                                    let dist_key_indices = mview_node.dist_key_indices();
                                    let output_mapping = gen_output_mapping(
                                        required_columns,
                                        &all_columns,
                                    )
                                    .context(
                                        "BUG: column not found in the upstream materialized view",
                                    )?;
                                    (dist_key_indices, output_mapping)
                                };
                                let dispatch_strategy = mv_on_mv_dispatch_strategy(
                                    uses_shuffled_backfill,
                                    dist_key_indices,
                                    output_mapping,
                                );

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy,
                                }
                            }
                            // Build the extra edges between the upstream `Source` and
                            // the downstream `SourceBackfill` of the new job.
                            else if upstream_fragment.fragment_type_mask
                                & FragmentTypeFlag::Source as u32
                                != 0
                            {
                                let output_mapping = {
                                    let nodes = &upstream_fragment.nodes;
                                    let source_node =
                                        nodes.get_node_body().unwrap().as_source().unwrap();

                                    let all_columns = source_node.column_descs().unwrap();
                                    gen_output_mapping(required_columns, &all_columns).context(
                                        "BUG: column not found in the upstream source node",
                                    )?
                                };

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    // We always use `NoShuffle` for the exchange between the upstream
                                    // `Source` and the downstream `StreamScan` of the new MV.
                                    dispatch_strategy: DispatchStrategy {
                                        r#type: DispatcherType::NoShuffle as _,
                                        dist_key_indices: vec![], // not used for `NoShuffle`
                                        output_mapping: Some(output_mapping),
                                    },
                                }
                            } else {
                                bail!(
                                    "the upstream fragment should be an MView or Source, got fragment type: {:b}",
                                    upstream_fragment.fragment_type_mask
                                )
                            }
                        }
                        StreamingJobType::Source | StreamingJobType::Table(_) => {
                            bail!(
                                "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
                                job_type
                            )
                        }
                    };

                    // Put the edge into the extra edges.
                    extra_downstreams
                        .entry(upstream_root_fragment_id)
                        .or_insert_with(HashMap::new)
                        .try_insert(id, edge.clone())
                        .unwrap();
                    extra_upstreams
                        .entry(id)
                        .or_insert_with(HashMap::new)
                        .try_insert(upstream_root_fragment_id, edge)
                        .unwrap();
                }
            }

            existing_fragments.extend(
                upstream_root_fragments
                    .into_values()
                    .map(|f| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(upstream_actor_location);
        }

        if let Some(FragmentGraphDownstreamContext {
            original_root_fragment_id,
            downstream_fragments,
            downstream_actor_location,
        }) = downstream_ctx
        {
            let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
            let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());

            // Build the extra edges between the `Materialize` and the downstream `StreamScan` of the
            // existing materialized views.
            for (dispatcher_type, fragment) in &downstream_fragments {
                let id = GlobalFragmentId::new(fragment.fragment_id);

                // Similar to `extract_upstream_table_columns_except_cross_db_backfill`.
                let output_columns = {
                    let mut res = None;

                    stream_graph_visitor::visit_stream_node(&fragment.nodes, |node_body| {
                        let columns = match node_body {
                            NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
                            NodeBody::SourceBackfill(source_backfill) => {
                                // FIXME: only pass required columns instead of all columns here
                                source_backfill.column_descs()
                            }
                            _ => return,
                        };
                        res = Some(columns);
                    });

                    res.context("failed to locate downstream scan")?
                };

                let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
                let nodes = table_fragment.node.as_ref().unwrap();

                let (dist_key_indices, output_mapping) = match job_type {
                    StreamingJobType::Table(_) => {
                        let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
                        let all_columns = mview_node.column_descs();
                        let dist_key_indices = mview_node.dist_key_indices();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                     being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        (dist_key_indices, output_mapping)
                    }

                    StreamingJobType::Source => {
                        let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
                        let all_columns = source_node.column_descs().unwrap();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                     being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
                        (
                            vec![], // not used for `NoShuffle`
                            output_mapping,
                        )
                    }

                    _ => bail!("unsupported job type for replacement: {job_type:?}"),
                };

                let edge = StreamFragmentEdge {
                    id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
                        original_upstream_fragment_id: original_table_fragment_id,
                        downstream_fragment_id: id,
                    }),
                    dispatch_strategy: DispatchStrategy {
                        r#type: *dispatcher_type as i32,
                        output_mapping: Some(output_mapping),
                        dist_key_indices,
                    },
                };

                extra_downstreams
                    .entry(table_fragment_id)
                    .or_insert_with(HashMap::new)
                    .try_insert(id, edge.clone())
                    .unwrap();
                extra_upstreams
                    .entry(id)
                    .or_insert_with(HashMap::new)
                    .try_insert(table_fragment_id, edge)
                    .unwrap();
            }

            existing_fragments.extend(
                downstream_fragments
                    .into_iter()
                    .map(|(_, f)| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(downstream_actor_location);
        }

        Ok(Self {
            building_graph: graph,
            existing_fragments,
            existing_actor_location,
            extra_downstreams,
            extra_upstreams,
        })
    }
}

/// Generate the `output_mapping` for [`DispatchStrategy`] from the given columns.
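///
/// For example, if the upstream columns are `[a (id 1), b (id 2), c (id 3)]` and the
/// required columns are `[c (id 3), a (id 1)]` with unchanged types, the resulting mapping
/// has `indices == [2, 0]` and an empty `types`. Returns `None` if some required column
/// cannot be found in the upstream.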
fn gen_output_mapping(
    required_columns: &[PbColumnDesc],
    upstream_columns: &[PbColumnDesc],
) -> Option<DispatchOutputMapping> {
    let len = required_columns.len();
    let mut indices = vec![0; len];
    let mut types = None;

    for (i, r) in required_columns.iter().enumerate() {
        let (ui, u) = upstream_columns
            .iter()
            .find_position(|&u| u.column_id == r.column_id)?;
        indices[i] = ui as u32;

        // Only if we encounter a type change (`ALTER TABLE ALTER COLUMN TYPE`) will we generate a
        // non-empty `types`.
        if u.column_type != r.column_type {
            types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
                upstream: u.column_type.clone(),
                downstream: r.column_type.clone(),
            };
        }
    }

    // If there's no type change, indicate it by an empty `types`.
    let types = types.unwrap_or(Vec::new());

    Some(DispatchOutputMapping { indices, types })
}

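/// Decide the dispatch strategy for an MV-on-MV edge: a shuffled backfill is dispatched by
/// `Hash` when the upstream has a distribution key (or `Simple` for a singleton upstream),
/// while a non-shuffled backfill always pairs with its upstream via `NoShuffle`.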
fn mv_on_mv_dispatch_strategy(
    uses_shuffled_backfill: bool,
    dist_key_indices: Vec<u32>,
    output_mapping: DispatchOutputMapping,
) -> DispatchStrategy {
    if uses_shuffled_backfill {
        if !dist_key_indices.is_empty() {
            DispatchStrategy {
                r#type: DispatcherType::Hash as _,
                dist_key_indices,
                output_mapping: Some(output_mapping),
            }
        } else {
            DispatchStrategy {
                r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![], // empty for `Simple`
                output_mapping: Some(output_mapping),
            }
        }
    } else {
        DispatchStrategy {
            r#type: DispatcherType::NoShuffle as _,
            dist_key_indices: vec![], // not used for `NoShuffle`
            output_mapping: Some(output_mapping),
        }
    }
}

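// A quick sketch of how the dispatcher is picked, exercising all three branches
// above with hypothetical inputs.
#[cfg(test)]
mod dispatch_strategy_tests {
    use super::*;

    #[test]
    fn picks_dispatcher_by_backfill_flavor() {
        let mapping = DispatchOutputMapping::default();

        // Shuffled backfill with a non-empty distribution key hashes on that key.
        let hash = mv_on_mv_dispatch_strategy(true, vec![0], mapping.clone());
        assert_eq!(hash.r#type, DispatcherType::Hash as i32);

        // Shuffled backfill with an empty distribution key falls back to `Simple`.
        let simple = mv_on_mv_dispatch_strategy(true, vec![], mapping.clone());
        assert_eq!(simple.r#type, DispatcherType::Simple as i32);

        // Non-shuffled backfill always uses `NoShuffle`, regardless of the key.
        let no_shuffle = mv_on_mv_dispatch_strategy(false, vec![0], mapping);
        assert_eq!(no_shuffle.r#type, DispatcherType::NoShuffle as i32);
    }
}
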
impl CompleteStreamFragmentGraph {
    /// Returns **all** fragment IDs in the complete graph, including the ones that are not in the
    /// building graph.
    pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
        self.building_graph
            .fragments
            .keys()
            .chain(self.existing_fragments.keys())
            .copied()
    }

    /// Returns an iterator of **all** edges in the complete graph, including the external edges.
    pub(super) fn all_edges(
        &self,
    ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
        self.building_graph
            .downstreams
            .iter()
            .chain(self.extra_downstreams.iter())
            .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
    }

    /// Returns the distribution of the existing fragments.
    pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
        self.existing_fragments
            .iter()
            .map(|(&id, f)| {
                (
                    id,
                    Distribution::from_fragment(f, &self.existing_actor_location),
                )
            })
            .collect()
    }

    /// Generate the topological order of **all** fragments in this graph, including the ones that
    /// are not in the building graph. Returns an error if the graph is not a DAG, in which case a
    /// topological sort cannot be done.
    ///
    /// For MV on MV, the first fragment popped out of the queue will be the top-most node, i.e.,
    /// the `Sink` / `Materialize` in the stream graph. See the illustrative `topo_order_tests`
    /// module below for a minimal sketch of the counting scheme.
    pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
        // This is Kahn's algorithm, run against the edge direction: fragments with no
        // downstreams enter the queue first.
        let mut topo = Vec::new();
        let mut downstream_cnts = HashMap::new();

        // Iterate all fragments.
        for fragment_id in self.all_fragment_ids() {
            // Count how many downstreams we have for a given fragment.
            let downstream_cnt = self.get_downstreams(fragment_id).count();
            if downstream_cnt == 0 {
                topo.push(fragment_id);
            } else {
                downstream_cnts.insert(fragment_id, downstream_cnt);
            }
        }

        let mut i = 0;
        while let Some(&fragment_id) = topo.get(i) {
            i += 1;
            // Find if we can process more fragments.
            for (upstream_id, _) in self.get_upstreams(fragment_id) {
                let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
                *downstream_cnt -= 1;
                if *downstream_cnt == 0 {
                    downstream_cnts.remove(&upstream_id);
                    topo.push(upstream_id);
                }
            }
        }

        if !downstream_cnts.is_empty() {
            // There are fragments that are not processed yet, so the graph contains a cycle.
            bail!("graph is not a DAG");
        }

        Ok(topo)
    }

    /// Seal a [`BuildingFragment`] from the graph into a [`Fragment`], which will be further used
    /// to build actors on the compute nodes and persist into the meta store.
    pub(super) fn seal_fragment(
        &self,
        id: GlobalFragmentId,
        actors: Vec<StreamActor>,
        distribution: Distribution,
        stream_node: StreamNode,
    ) -> Fragment {
        let building_fragment = self.get_fragment(id).into_building().unwrap();
        let internal_tables = building_fragment.extract_internal_tables();
        let BuildingFragment {
            inner,
            job_id,
            upstream_table_columns: _,
        } = building_fragment;

        let distribution_type = distribution.to_distribution_type();
        let vnode_count = distribution.vnode_count();

        // For the fragment containing the `Materialize` node, the table of the job
        // itself is also one of its state tables.
        let materialized_fragment_id =
            if inner.fragment_type_mask & FragmentTypeFlag::Mview as u32 != 0 {
                job_id
            } else {
                None
            };

        let state_table_ids = internal_tables
            .iter()
            .map(|t| t.id)
            .chain(materialized_fragment_id)
            .collect();

        Fragment {
            fragment_id: inner.fragment_id,
            fragment_type_mask: inner.fragment_type_mask,
            distribution_type,
            actors,
            state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        }
    }

    /// Get a fragment from the complete graph, which can be either a building fragment or an
    /// existing fragment.
    pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
        if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
            EitherFragment::Existing(fragment.clone())
        } else {
            EitherFragment::Building(
                self.building_graph
                    .fragments
                    .get(&fragment_id)
                    .unwrap()
                    .clone(),
            )
        }
    }

    /// Get **all** downstreams of a fragment, including the ones that are not in the building
    /// graph.
    pub(super) fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_downstreams(fragment_id)
            .iter()
            .chain(
                self.extra_downstreams
                    .get(&fragment_id)
                    .into_iter()
                    .flatten(),
            )
            .map(|(&id, edge)| (id, edge))
    }

    /// Get **all** upstreams of a fragment, including the ones that are not in the building
    /// graph.
    pub(super) fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_upstreams(fragment_id)
            .iter()
            .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
            .map(|(&id, edge)| (id, edge))
    }

    /// Returns all building fragments in the graph.
    pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
        &self.building_graph.fragments
    }

    /// Returns all building fragments in the graph, mutable.
    pub(super) fn building_fragments_mut(
        &mut self,
    ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
        &mut self.building_graph.fragments
    }

    /// Get the expected vnode count of the building graph. See the documentation of the field for
    /// more details.
    pub(super) fn max_parallelism(&self) -> usize {
        self.building_graph.max_parallelism()
    }
}
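
// A self-contained sketch of the counting scheme used by `topo_order` above,
// replayed on a plain adjacency map. The graph and identifiers are hypothetical;
// only the algorithm mirrors the real implementation.
#[cfg(test)]
mod topo_order_tests {
    use std::collections::HashMap;

    #[test]
    fn sinks_come_first() {
        // A chain 1 -> 2 -> 3, where 3 is the sink-side (`Materialize`) fragment.
        let downstreams: HashMap<u32, Vec<u32>> =
            HashMap::from([(1, vec![2]), (2, vec![3]), (3, vec![])]);
        let upstreams: HashMap<u32, Vec<u32>> =
            HashMap::from([(1, vec![]), (2, vec![1]), (3, vec![2])]);

        // Seed the queue with fragments that have no downstreams.
        let mut topo = Vec::new();
        let mut downstream_cnts = HashMap::new();
        for (&id, tos) in &downstreams {
            if tos.is_empty() {
                topo.push(id);
            } else {
                downstream_cnts.insert(id, tos.len());
            }
        }

        // Pop from the queue and release upstreams whose counters reach zero.
        let mut i = 0;
        while let Some(&id) = topo.get(i) {
            i += 1;
            for &up in &upstreams[&id] {
                let cnt = downstream_cnts.get_mut(&up).unwrap();
                *cnt -= 1;
                if *cnt == 0 {
                    downstream_cnts.remove(&up);
                    topo.push(up);
                }
            }
        }

        // The order runs from the sink back towards the sources.
        assert_eq!(topo, vec![3, 2, 1]);
    }
}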