risingwave_meta/stream/stream_graph/fragment.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::ops::{Deref, DerefMut};
use std::sync::LazyLock;

use anyhow::{Context, anyhow};
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{
    CDC_SOURCE_COLUMN_NUM, FragmentTypeFlag, FragmentTypeMask, TableId,
    generate_internal_table_name_with_type,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor::{
    self, visit_stream_node_cont, visit_stream_node_cont_mut,
};
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::plan_common::PbColumnDesc;
use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
use risingwave_pb::stream_plan::stream_fragment_graph::{
    Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, PbStreamNode,
    StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode, StreamScanType,
};

use crate::barrier::SnapshotBackfillInfo;
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
use crate::stream::stream_graph::id::{GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen};
use crate::stream::stream_graph::schedule::Distribution;
use crate::{MetaError, MetaResult};

/// The fragment in the building phase, including the [`StreamFragment`] from the frontend and
/// several additional helper fields.
#[derive(Debug, Clone)]
pub(super) struct BuildingFragment {
    /// The fragment structure from the frontend, with the global fragment ID.
    inner: StreamFragment,

    /// The ID of the job if it contains the streaming job node.
    job_id: Option<u32>,

    /// The required column IDs of each upstream table.
    /// Will be converted to indices when building the edge connected to the upstream.
    ///
    /// For a shared CDC table on source, this is `vec![]`, since the upstream source's output schema is fixed.
    upstream_table_columns: HashMap<TableId, Vec<PbColumnDesc>>,
}

impl BuildingFragment {
    /// Create a new [`BuildingFragment`] from a [`StreamFragment`]. The global fragment ID and
    /// global table IDs will be correctly filled with the given `id` and `table_id_gen`.
    fn new(
        id: GlobalFragmentId,
        fragment: StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) -> Self {
        let mut fragment = StreamFragment {
            fragment_id: id.as_global_id(),
            ..fragment
        };

        // Fill the information of the internal tables in the fragment.
        Self::fill_internal_tables(&mut fragment, job, table_id_gen);

        let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
        let upstream_table_columns =
            Self::extract_upstream_table_columns_except_cross_db_backfill(&fragment);

        Self {
            inner: fragment,
            job_id,
            upstream_table_columns,
        }
    }

    /// Extract the internal tables from the fragment.
    fn extract_internal_tables(&self) -> Vec<Table> {
        let mut fragment = self.inner.to_owned();
        let mut tables = Vec::new();
        stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
            tables.push(table.clone());
        });
        tables
    }

    /// Fill the information of the internal tables in the fragment.
    fn fill_internal_tables(
        fragment: &mut StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) {
        let fragment_id = fragment.fragment_id;
        stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
            table.id = table_id_gen.to_global_id(table.id).as_global_id();
            table.schema_id = job.schema_id();
            table.database_id = job.database_id();
            table.name = generate_internal_table_name_with_type(
                &job.name(),
                fragment_id,
                table.id,
                table_type_name,
            );
            table.fragment_id = fragment_id;
            table.owner = job.owner();
        });
    }

    /// Fill the information of the job in the fragment.
    fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
        let job_id = job.id();
        let fragment_id = fragment.fragment_id;
        let mut has_job = false;

        stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
            NodeBody::Materialize(materialize_node) => {
                materialize_node.table_id = job_id;

                // Fill the table field of `MaterializeNode` from the job.
                let table = materialize_node.table.insert(job.table().unwrap().clone());
                table.fragment_id = fragment_id; // this will later be synced back to `job.table` with `set_info_from_graph`
                // In production, do not include the full definition in the table in the plan node.
                if cfg!(not(debug_assertions)) {
                    table.definition = job.name();
                }

                has_job = true;
            }
            NodeBody::Sink(sink_node) => {
                sink_node.sink_desc.as_mut().unwrap().id = job_id;

                has_job = true;
            }
            NodeBody::Dml(dml_node) => {
                dml_node.table_id = job_id;
                dml_node.table_version_id = job.table_version_id().unwrap();
            }
            NodeBody::StreamFsFetch(fs_fetch_node) => {
                if let StreamingJob::Table(table_source, _, _) = job {
                    if let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
                        && let Some(source) = table_source
                    {
                        node_inner.source_id = source.id;
                    }
                }
            }
            NodeBody::Source(source_node) => {
                match job {
                    // Note: A table without a connector has a dummy `Source` node.
                    // Note: For a table with a connector, its source node has a source ID different from the table ID (job ID), assigned in `create_job_catalog`.
                    StreamingJob::Table(source, _table, _table_job_type) => {
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            if let Some(source) = source {
                                debug_assert_ne!(source.id, job_id);
                                source_inner.source_id = source.id;
                            }
                        }
                    }
                    StreamingJob::Source(source) => {
                        has_job = true;
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            debug_assert_eq!(source.id, job_id);
                            source_inner.source_id = source.id;
                        }
                    }
                    // For other job types, no need to fill the source id, since it refers to an existing source.
                    _ => {}
                }
            }
            NodeBody::StreamCdcScan(node) => {
                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
                    table_desc.table_id = job_id;
                }
            }
            _ => {}
        });

        has_job
    }

    /// Extract the required columns of each upstream table except for cross-db backfill.
    fn extract_upstream_table_columns_except_cross_db_backfill(
        fragment: &StreamFragment,
    ) -> HashMap<TableId, Vec<PbColumnDesc>> {
        let mut table_columns = HashMap::new();

        stream_graph_visitor::visit_fragment(fragment, |node_body| {
            let (table_id, column_ids) = match node_body {
                NodeBody::StreamScan(stream_scan) => {
                    if stream_scan.get_stream_scan_type().unwrap()
                        == StreamScanType::CrossDbSnapshotBackfill
                    {
                        return;
                    }
                    (stream_scan.table_id.into(), stream_scan.upstream_columns())
                }
                NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]),
                NodeBody::SourceBackfill(backfill) => (
                    backfill.upstream_source_id.into(),
                    // FIXME: only pass required columns instead of all columns here
                    backfill.column_descs(),
                ),
                _ => return,
            };
            table_columns
                .try_insert(table_id, column_ids)
                .expect("currently there should be no two same upstream tables in a fragment");
        });

        table_columns
    }

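    /// Returns `true` if this fragment contains a `StreamScan` with a shuffled backfill
    /// type (`ArrangementBackfill` or `SnapshotBackfill`). Such fragments take a `Hash` or
    /// `Simple` exchange from the upstream instead of `NoShuffle`; see
    /// `mv_on_mv_dispatch_strategy` below.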
    pub fn has_shuffled_backfill(&self) -> bool {
        let stream_node = match self.inner.node.as_ref() {
            Some(node) => node,
            _ => return false,
        };
        let mut has_shuffled_backfill = false;
        let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
        visit_stream_node_cont(stream_node, |node| {
            let is_shuffled_backfill = if let Some(node) = &node.node_body
                && let Some(node) = node.as_stream_scan()
            {
                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
                    || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
            } else {
                false
            };
            if is_shuffled_backfill {
                *has_shuffled_backfill_mut_ref = true;
                false
            } else {
                true
            }
        });
        has_shuffled_backfill
    }
}

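// `BuildingFragment` transparently dereferences to the inner `StreamFragment`, so the
// builder code below can read and mutate the proto fields directly.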
impl Deref for BuildingFragment {
    type Target = StreamFragment;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for BuildingFragment {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

/// The ID of an edge in the fragment graph. For different types of edges, the ID will be in
/// different variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub(super) enum EdgeId {
    /// The edge between two building (internal) fragments.
    Internal {
        /// The ID generated by the frontend, generally the operator ID of `Exchange`.
        /// See [`StreamFragmentEdgeProto`].
        link_id: u64,
    },

    /// The edge between an upstream external fragment and a downstream building fragment. Used for
    /// MV on MV.
    UpstreamExternal {
        /// The ID of the upstream table or materialized view.
        upstream_table_id: TableId,
        /// The ID of the downstream fragment.
        downstream_fragment_id: GlobalFragmentId,
    },

    /// The edge between an upstream building fragment and a downstream external fragment. Used for
    /// schema change (replace table plan).
    DownstreamExternal(DownstreamExternalEdgeId),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct DownstreamExternalEdgeId {
    /// The ID of the original upstream fragment (`Materialize`).
    pub(super) original_upstream_fragment_id: GlobalFragmentId,
    /// The ID of the downstream fragment.
    pub(super) downstream_fragment_id: GlobalFragmentId,
}

/// The edge in the fragment graph.
///
/// The edge can be either internal or external. This is distinguished by the [`EdgeId`].
#[derive(Debug, Clone)]
pub(super) struct StreamFragmentEdge {
    /// The ID of the edge.
    pub id: EdgeId,

    /// The strategy used for dispatching the data.
    pub dispatch_strategy: DispatchStrategy,
}

impl StreamFragmentEdge {
    fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
        Self {
            // By creating an edge from the protobuf, we know that the edge is from the frontend and
            // is internal.
            id: EdgeId::Internal {
                link_id: edge.link_id,
            },
            dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
        }
    }
}
/// Adjacency list (`G`) of backfill orders.
/// `G[10] -> [1, 2, 11]`
/// means the backfill node in fragment 10
/// should be backfilled before the backfill nodes in fragments 1, 2 and 11.
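///
/// An illustrative sketch of constructing such an order by hand:
/// ```ignore
/// let mut order = FragmentBackfillOrder::new();
/// order.insert(10, vec![1, 2, 11]); // fragment 10 finishes backfill before these
/// ```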
pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;

/// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`]
/// from the frontend.
///
/// This only includes nodes and edges of the current job itself. It will be converted to a
/// [`CompleteStreamFragmentGraph`] later, which contains the additional information of
/// pre-existing fragments that are connected to the graph's top-most or bottom-most fragments.
#[derive(Default, Debug)]
pub struct StreamFragmentGraph {
    /// Stores all the fragments in the graph.
    fragments: HashMap<GlobalFragmentId, BuildingFragment>,

    /// Stores edges between fragments: upstream => downstream.
    downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Stores edges between fragments: downstream => upstream.
    upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Dependent relations of this job.
    dependent_table_ids: HashSet<TableId>,

    /// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session
    /// variable. If not specified, all active worker slots will be used.
    specified_parallelism: Option<NonZeroUsize>,

    /// Specified max parallelism, i.e., expected vnode count for the graph.
    ///
    /// The scheduler on the meta service will use this as a hint to decide the vnode count
    /// for each fragment.
    ///
    /// Note that the actual vnode count may be different from this value.
    /// For example, a no-shuffle exchange between the current fragment graph and an existing
    /// upstream fragment graph requires the two fragments to be in the same distribution,
    /// thus the same vnode count.
    max_parallelism: usize,

    /// The backfill ordering strategy of the graph.
    backfill_order: BackfillOrder,
}

impl StreamFragmentGraph {
    /// Create a new [`StreamFragmentGraph`] from the given [`StreamFragmentGraphProto`], with all
    /// global IDs correctly filled.
    pub fn new(
        env: &MetaSrvEnv,
        proto: StreamFragmentGraphProto,
        job: &StreamingJob,
    ) -> MetaResult<Self> {
        let fragment_id_gen =
            GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
        // Note: in the SQL backend, the ids generated here are fake and will be overwritten again
        // with `refill_internal_table_ids` later.
        // TODO: refactor the code to remove this step.
        let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);

        // Create nodes.
        let fragments: HashMap<_, _> = proto
            .fragments
            .into_iter()
            .map(|(id, fragment)| {
                let id = fragment_id_gen.to_global_id(id);
                let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
                (id, fragment)
            })
            .collect();

        assert_eq!(
            fragments
                .values()
                .map(|f| f.extract_internal_tables().len() as u32)
                .sum::<u32>(),
            proto.table_ids_cnt
        );

        // Create edges.
        let mut downstreams = HashMap::new();
        let mut upstreams = HashMap::new();

        for edge in proto.edges {
            let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
            let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
            let edge = StreamFragmentEdge::from_protobuf(&edge);

            upstreams
                .entry(downstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(upstream_id, edge.clone())
                .unwrap();
            downstreams
                .entry(upstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(downstream_id, edge)
                .unwrap();
        }

        // Note: Here we directly use the field `dependent_table_ids` in the proto (resolved in
        // the frontend), instead of visiting the graph ourselves.
        let dependent_table_ids = proto
            .dependent_table_ids
            .iter()
            .map(TableId::from)
            .collect();

        let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
            Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
        } else {
            None
        };

        let max_parallelism = proto.max_parallelism as usize;
        let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
            order: Default::default(),
        });

        Ok(Self {
            fragments,
            downstreams,
            upstreams,
            dependent_table_ids,
            specified_parallelism,
            max_parallelism,
            backfill_order,
        })
    }

    /// Retrieve the **incomplete** internal tables map of the whole graph.
    ///
    /// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
    /// `fragment_id`, `vnode_count`. They will all be filled after a `TableFragments` is built.
    /// Be careful when using the returned values.
    ///
    /// See also [`crate::model::StreamJobFragments::internal_tables`].
    pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
        let mut tables = BTreeMap::new();
        for fragment in self.fragments.values() {
            for table in fragment.extract_internal_tables() {
                let table_id = table.id;
                tables
                    .try_insert(table_id, table)
                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
            }
        }
        tables
    }

    /// Refill the internal tables' `table_id`s according to the given map, typically obtained from
    /// `create_internal_table_catalog`.
    pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = table_id_map.get(&table.id).cloned().unwrap();
                    table.id = target;
                },
            );
        }
    }

    /// Set the internal tables' `table_id`s according to a list of internal tables.
    pub fn fit_internal_table_ids(
        &mut self,
        mut old_internal_tables: Vec<Table>,
    ) -> MetaResult<()> {
        let mut new_internal_table_ids = Vec::new();
        for fragment in self.fragments.values() {
            for table in &fragment.extract_internal_tables() {
                new_internal_table_ids.push(table.id);
            }
        }

        if new_internal_table_ids.len() != old_internal_tables.len() {
            bail!(
                "Different number of internal tables. New: {}, Old: {}",
                new_internal_table_ids.len(),
                old_internal_tables.len()
            );
        }
        old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
        new_internal_table_ids.sort();

        let internal_table_id_map = new_internal_table_ids
            .into_iter()
            .zip_eq_fast(old_internal_tables.into_iter())
            .collect::<HashMap<_, _>>();

        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = internal_table_id_map.get(&table.id).cloned().unwrap();
                    *table = target;
                },
            );
        }

        Ok(())
    }

    /// Returns the fragment ID where the streaming job node is located.
    pub fn table_fragment_id(&self) -> FragmentId {
        self.fragments
            .values()
            .filter(|b| b.job_id.is_some())
            .map(|b| b.fragment_id)
            .exactly_one()
            .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
    }

    /// Returns the fragment ID where the table DML is received.
    pub fn dml_fragment_id(&self) -> Option<FragmentId> {
        self.fragments
            .values()
            .filter(|b| {
                FragmentTypeMask::from(b.fragment_type_mask).contains(FragmentTypeFlag::Dml)
            })
            .map(|b| b.fragment_id)
            .at_most_one()
            .expect("require at most 1 dml node when creating the streaming job")
    }

    /// Get the dependent streaming job ids of this job.
    pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
        &self.dependent_table_ids
    }

    /// Get the parallelism of the job, if specified by the user.
    pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_parallelism
    }

    /// Get the expected vnode count of the graph. See documentation of the field for more details.
    pub fn max_parallelism(&self) -> usize {
        self.max_parallelism
    }

    /// Get downstreams of a fragment.
    fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    /// Get upstreams of a fragment.
    fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    pub fn collect_snapshot_backfill_info(
        &self,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        Self::collect_snapshot_backfill_info_impl(self.fragments.values().map(|fragment| {
            (
                fragment.node.as_ref().unwrap(),
                fragment.fragment_type_mask.into(),
            )
        }))
    }

    /// Returns `Ok((Some(snapshot_backfill_info), cross_db_snapshot_backfill_info))`.
    pub fn collect_snapshot_backfill_info_impl(
        fragments: impl IntoIterator<Item = (&PbStreamNode, FragmentTypeMask)>,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
        let mut cross_db_info = SnapshotBackfillInfo {
            upstream_mv_table_id_to_backfill_epoch: Default::default(),
        };
        let mut result = Ok(());
        for (node, fragment_type_mask) in fragments {
            visit_stream_node_cont(node, |node| {
                if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
                    let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
                        .expect("invalid stream_scan_type");
                    let is_snapshot_backfill = match stream_scan_type {
                        StreamScanType::SnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                            );
                            true
                        }
                        StreamScanType::CrossDbSnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan)
                            );
                            cross_db_info.upstream_mv_table_id_to_backfill_epoch.insert(
                                TableId::new(stream_scan.table_id),
                                stream_scan.snapshot_backfill_epoch,
                            );

                            return true;
                        }
                        _ => false,
                    };

                    match &mut prev_stream_scan {
                        Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
                            match (prev_snapshot_backfill_info, is_snapshot_backfill) {
                                (Some(prev_snapshot_backfill_info), true) => {
                                    prev_snapshot_backfill_info
                                        .upstream_mv_table_id_to_backfill_epoch
                                        .insert(
                                            TableId::new(stream_scan.table_id),
                                            stream_scan.snapshot_backfill_epoch,
                                        );
                                    true
                                }
                                (None, false) => true,
                                (_, _) => {
                                    result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
                                    false
                                }
                            }
                        }
                        None => {
                            prev_stream_scan = Some((
                                if is_snapshot_backfill {
                                    Some(SnapshotBackfillInfo {
                                        upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
                                            [(
                                                TableId::new(stream_scan.table_id),
                                                stream_scan.snapshot_backfill_epoch,
                                            )],
                                        ),
                                    })
                                } else {
                                    None
                                },
                                *stream_scan.clone(),
                            ));
                            true
                        }
                    }
                } else {
                    true
                }
            })
        }
        result.map(|_| {
            (
                prev_stream_scan
                    .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
                    .unwrap_or(None),
                cross_db_info,
            )
        })
    }

    /// Collect the mapping from table / source ID to `fragment_id`s.
    pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
        let mut mapping = HashMap::new();
        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let fragment_mask = fragment.fragment_type_mask;
            let candidates = [FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan];
            let has_some_scan = candidates
                .into_iter()
                .any(|flag| (fragment_mask & flag as u32) > 0);
            if has_some_scan {
                visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
                    match node.node_body.as_ref() {
                        Some(NodeBody::StreamScan(stream_scan)) => {
                            let table_id = stream_scan.table_id;
                            let fragments: &mut Vec<_> = mapping.entry(table_id).or_default();
                            fragments.push(fragment_id);
                            // each fragment should have only 1 scan node.
                            false
                        }
                        Some(NodeBody::SourceBackfill(source_backfill)) => {
                            let source_id = source_backfill.upstream_source_id;
                            let fragments: &mut Vec<_> = mapping.entry(source_id).or_default();
                            fragments.push(fragment_id);
                            // each fragment should have only 1 scan node.
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        mapping
    }

    /// Initially, the ordering that comes from the frontend is between `table_id`s.
    /// We remap it to the fragment level, since we track backfill progress by actor,
    /// and we can get a fragment <-> actor mapping.
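    ///
    /// An illustrative sketch (IDs are made up): if the frontend order is `{1: [2]}` and the
    /// backfill mapping is `{1: [10], 2: [20, 21]}`, the result is `{10: [20, 21]}`.
    /// ```ignore
    /// let order = graph.create_fragment_backfill_ordering();
    /// assert_eq!(order.get(&10), Some(&vec![20, 21]));
    /// ```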
    pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
        let mapping = self.collect_backfill_mapping();
        let mut fragment_ordering: HashMap<u32, Vec<u32>> = HashMap::new();
        for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
            let fragment_ids = mapping.get(rel_id).unwrap();
            for fragment_id in fragment_ids {
                let downstream_fragment_ids = downstream_rel_ids
                    .data
                    .iter()
                    .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
                    .copied()
                    .collect();
                fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
            }
        }
        fragment_ordering
    }
}

/// Fill the snapshot epoch for the `StreamScanNode`s of `SnapshotBackfill`.
/// Returns `true` if any change was applied.
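///
/// A usage sketch (illustrative), assuming the infos were collected beforehand via
/// [`StreamFragmentGraph::collect_snapshot_backfill_info`]:
/// ```ignore
/// let (info, cross_db_info) = graph.collect_snapshot_backfill_info()?;
/// let applied = fill_snapshot_backfill_epoch(&mut node, info.as_ref(), &cross_db_info)?;
/// ```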
pub fn fill_snapshot_backfill_epoch(
    node: &mut StreamNode,
    snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
    cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
) -> MetaResult<bool> {
    let mut result = Ok(());
    let mut applied = false;
    visit_stream_node_cont_mut(node, |node| {
        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
            && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
                || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
        {
            result = try {
                let table_id = TableId::new(stream_scan.table_id);
                let snapshot_epoch = cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .get(&table_id)
                    .or_else(|| {
                        snapshot_backfill_info.and_then(|snapshot_backfill_info| {
                            snapshot_backfill_info
                                .upstream_mv_table_id_to_backfill_epoch
                                .get(&table_id)
                        })
                    })
                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
                if let Some(prev_snapshot_epoch) =
                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
                {
                    Err(anyhow!(
                        "snapshot backfill epoch set again: {} {} {}",
                        table_id,
                        prev_snapshot_epoch,
                        snapshot_epoch
                    ))?;
                }
                applied = true;
            };
            result.is_ok()
        } else {
            true
        }
    });
    result.map(|_| applied)
}

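/// Shared empty edge map, returned by `get_downstreams` / `get_upstreams` when a fragment
/// has no recorded edges, so callers always get a `&HashMap` without allocating.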
static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
    LazyLock::new(HashMap::new);

/// A fragment that is either being built or already exists. Used to generalize the logic of
/// [`crate::stream::ActorGraphBuilder`].
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EitherFragment {
    /// An internal fragment that is being built for the current streaming job.
    Building(BuildingFragment),

    /// An existing fragment that is external but connected to the fragments being built.
    Existing(Fragment),
}

/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing
/// fragments, which are connected to the graph's top-most or bottom-most fragments.
///
/// For example,
/// - if we're going to build a mview on an existing mview, the upstream fragment containing the
///   `Materialize` node will be included in this structure.
/// - if we're going to replace the plan of a table with downstream mviews, the downstream fragments
///   containing the `StreamScan` nodes will be included in this structure.
#[derive(Debug)]
pub struct CompleteStreamFragmentGraph {
    /// The fragment graph of the streaming job being built.
    building_graph: StreamFragmentGraph,

    /// The required information of existing fragments.
    existing_fragments: HashMap<GlobalFragmentId, Fragment>,

    /// The location of the actors in the existing fragments.
    existing_actor_location: HashMap<ActorId, WorkerId>,

    /// Extra edges between existing fragments and the building fragments.
    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Extra edges between existing fragments and the building fragments.
    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
}

pub struct FragmentGraphUpstreamContext {
    /// The root fragment of the upstream stream graph, which can be a mview fragment or,
    /// for a CDC source job, a source fragment.
    upstream_root_fragments: HashMap<TableId, Fragment>,
    upstream_actor_location: HashMap<ActorId, WorkerId>,
}

pub struct FragmentGraphDownstreamContext {
    original_root_fragment_id: FragmentId,
    downstream_fragments: Vec<(DispatcherType, Fragment)>,
    downstream_actor_location: HashMap<ActorId, WorkerId>,
}

impl CompleteStreamFragmentGraph {
    /// Create a new [`CompleteStreamFragmentGraph`] with empty existing fragments, i.e., there are
    /// no upstream mviews.
    #[cfg(test)]
    pub fn for_test(graph: StreamFragmentGraph) -> Self {
        Self {
            building_graph: graph,
            existing_fragments: Default::default(),
            existing_actor_location: Default::default(),
            extra_downstreams: Default::default(),
            extra_upstreams: Default::default(),
        }
    }

    /// Create a new [`CompleteStreamFragmentGraph`] for a newly created job (which has no
    /// downstreams), e.g., MV on MV, or a CDC/Source table with existing upstream
    /// `Materialize` or `Source` fragments.
    pub fn with_upstreams(
        graph: StreamFragmentGraph,
        upstream_root_fragments: HashMap<TableId, Fragment>,
        existing_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location: existing_actor_location,
            }),
            None,
            job_type,
        )
    }

    /// Create a new [`CompleteStreamFragmentGraph`] for replacing an existing table/source,
    /// with the downstream existing `StreamScan`/`StreamSourceScan` fragments.
    pub fn with_downstreams(
        graph: StreamFragmentGraph,
        original_root_fragment_id: FragmentId,
        downstream_fragments: Vec<(DispatcherType, Fragment)>,
        existing_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            None,
            Some(FragmentGraphDownstreamContext {
                original_root_fragment_id,
                downstream_fragments,
                downstream_actor_location: existing_actor_location,
            }),
            job_type,
        )
    }

    /// For replacing an existing table based on a shared cdc source, which has both upstreams and downstreams.
    pub fn with_upstreams_and_downstreams(
        graph: StreamFragmentGraph,
        upstream_root_fragments: HashMap<TableId, Fragment>,
        upstream_actor_location: HashMap<ActorId, WorkerId>,
        original_root_fragment_id: FragmentId,
        downstream_fragments: Vec<(DispatcherType, Fragment)>,
        downstream_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location,
            }),
            Some(FragmentGraphDownstreamContext {
                original_root_fragment_id,
                downstream_fragments,
                downstream_actor_location,
            }),
            job_type,
        )
    }

    /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments.
    fn build_helper(
        mut graph: StreamFragmentGraph,
        upstream_ctx: Option<FragmentGraphUpstreamContext>,
        downstream_ctx: Option<FragmentGraphDownstreamContext>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        let mut extra_downstreams = HashMap::new();
        let mut extra_upstreams = HashMap::new();
        let mut existing_fragments = HashMap::new();

        let mut existing_actor_location = HashMap::new();

        if let Some(FragmentGraphUpstreamContext {
            upstream_root_fragments,
            upstream_actor_location,
        }) = upstream_ctx
        {
            for (&id, fragment) in &mut graph.fragments {
                let uses_shuffled_backfill = fragment.has_shuffled_backfill();

                for (&upstream_table_id, required_columns) in &fragment.upstream_table_columns {
                    let upstream_fragment = upstream_root_fragments
                        .get(&upstream_table_id)
                        .context("upstream fragment not found")?;
                    let upstream_root_fragment_id =
                        GlobalFragmentId::new(upstream_fragment.fragment_id);

                    let edge = match job_type {
                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                            // We traverse all fragments in the graph to find the `CdcFilter`
                            // fragment and add an edge between the upstream source fragment and it.
                            assert_ne!(
                                (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
                                0
                            );

                            tracing::debug!(
                                ?upstream_root_fragment_id,
                                ?required_columns,
                                identity = ?fragment.inner.get_node().unwrap().get_identity(),
                                current_frag_id=?id,
                                "CdcFilter with upstream source fragment"
                            );

                            StreamFragmentEdge {
                                id: EdgeId::UpstreamExternal {
                                    upstream_table_id,
                                    downstream_fragment_id: id,
                                },
                                // We always use `NoShuffle` for the exchange between the upstream
                                // `Source` and the downstream `StreamScan` of the new cdc table.
                                dispatch_strategy: DispatchStrategy {
                                    r#type: DispatcherType::NoShuffle as _,
                                    dist_key_indices: vec![], // not used for `NoShuffle`
                                    output_mapping: DispatchOutputMapping::identical(
                                        CDC_SOURCE_COLUMN_NUM as _,
                                    )
                                    .into(),
                                },
                            }
                        }

                        // handle MV on MV/Source
                        StreamingJobType::MaterializedView
                        | StreamingJobType::Sink
                        | StreamingJobType::Index => {
                            // Build the extra edges between the upstream `Materialize` and
                            // the downstream `StreamScan` of the new job.
                            if upstream_fragment
                                .fragment_type_mask
                                .contains(FragmentTypeFlag::Mview)
                            {
                                // Resolve the required output columns from the upstream materialized view.
                                let (dist_key_indices, output_mapping) = {
                                    let nodes = &upstream_fragment.nodes;
                                    let mview_node =
                                        nodes.get_node_body().unwrap().as_materialize().unwrap();
                                    let all_columns = mview_node.column_descs();
                                    let dist_key_indices = mview_node.dist_key_indices();
                                    let output_mapping = gen_output_mapping(
                                        required_columns,
                                        &all_columns,
                                    )
                                    .context(
                                        "BUG: column not found in the upstream materialized view",
                                    )?;
                                    (dist_key_indices, output_mapping)
                                };
                                let dispatch_strategy = mv_on_mv_dispatch_strategy(
                                    uses_shuffled_backfill,
                                    dist_key_indices,
                                    output_mapping,
                                );

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy,
                                }
                            }
                            // Build the extra edges between the upstream `Source` and
                            // the downstream `SourceBackfill` of the new job.
                            else if upstream_fragment
                                .fragment_type_mask
                                .contains(FragmentTypeFlag::Source)
                            {
                                let output_mapping = {
                                    let nodes = &upstream_fragment.nodes;
                                    let source_node =
                                        nodes.get_node_body().unwrap().as_source().unwrap();

                                    let all_columns = source_node.column_descs().unwrap();
                                    gen_output_mapping(required_columns, &all_columns).context(
                                        "BUG: column not found in the upstream source node",
                                    )?
                                };

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    // We always use `NoShuffle` for the exchange between the upstream
                                    // `Source` and the downstream `StreamScan` of the new MV.
                                    dispatch_strategy: DispatchStrategy {
                                        r#type: DispatcherType::NoShuffle as _,
                                        dist_key_indices: vec![], // not used for `NoShuffle`
                                        output_mapping: Some(output_mapping),
                                    },
                                }
                            } else {
                                bail!(
                                    "the upstream fragment should be a MView or Source, got fragment type: {:b}",
                                    upstream_fragment.fragment_type_mask
                                )
                            }
                        }
                        StreamingJobType::Source | StreamingJobType::Table(_) => {
                            bail!(
                                "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
                                job_type
                            )
                        }
                    };

                    // put the edge into the extra edges
                    extra_downstreams
                        .entry(upstream_root_fragment_id)
                        .or_insert_with(HashMap::new)
                        .try_insert(id, edge.clone())
                        .unwrap();
                    extra_upstreams
                        .entry(id)
                        .or_insert_with(HashMap::new)
                        .try_insert(upstream_root_fragment_id, edge)
                        .unwrap();
                }
            }

            existing_fragments.extend(
                upstream_root_fragments
                    .into_values()
                    .map(|f| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(upstream_actor_location);
        }

        if let Some(FragmentGraphDownstreamContext {
            original_root_fragment_id,
            downstream_fragments,
            downstream_actor_location,
        }) = downstream_ctx
        {
            let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
            let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());

            // Build the extra edges between the `Materialize` and the downstream `StreamScan` of the
            // existing materialized views.
            for (dispatcher_type, fragment) in &downstream_fragments {
                let id = GlobalFragmentId::new(fragment.fragment_id);

                // Similar to `extract_upstream_table_columns_except_cross_db_backfill`.
                let output_columns = {
                    let mut res = None;

                    stream_graph_visitor::visit_stream_node_body(&fragment.nodes, |node_body| {
                        let columns = match node_body {
                            NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
                            NodeBody::SourceBackfill(source_backfill) => {
                                // FIXME: only pass required columns instead of all columns here
                                source_backfill.column_descs()
                            }
                            _ => return,
                        };
                        res = Some(columns);
                    });

                    res.context("failed to locate downstream scan")?
                };

                let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
                let nodes = table_fragment.node.as_ref().unwrap();

                let (dist_key_indices, output_mapping) = match job_type {
                    StreamingJobType::Table(_) => {
                        let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
                        let all_columns = mview_node.column_descs();
                        let dist_key_indices = mview_node.dist_key_indices();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                     being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        (dist_key_indices, output_mapping)
                    }

                    StreamingJobType::Source => {
                        let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
                        let all_columns = source_node.column_descs().unwrap();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                     being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
                        (
                            vec![], // not used for `NoShuffle`
                            output_mapping,
                        )
                    }

                    _ => bail!("unsupported job type for replacement: {job_type:?}"),
                };

                let edge = StreamFragmentEdge {
                    id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
                        original_upstream_fragment_id: original_table_fragment_id,
                        downstream_fragment_id: id,
                    }),
                    dispatch_strategy: DispatchStrategy {
                        r#type: *dispatcher_type as i32,
                        output_mapping: Some(output_mapping),
                        dist_key_indices,
                    },
                };

                extra_downstreams
                    .entry(table_fragment_id)
                    .or_insert_with(HashMap::new)
                    .try_insert(id, edge.clone())
                    .unwrap();
                extra_upstreams
                    .entry(id)
                    .or_insert_with(HashMap::new)
                    .try_insert(table_fragment_id, edge)
                    .unwrap();
            }

            existing_fragments.extend(
                downstream_fragments
                    .into_iter()
                    .map(|(_, f)| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(downstream_actor_location);
        }

        Ok(Self {
            building_graph: graph,
            existing_fragments,
            existing_actor_location,
            extra_downstreams,
            extra_upstreams,
        })
    }
}

/// Generate the `output_mapping` for [`DispatchStrategy`] from given columns.
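///
/// An illustrative sketch of the expected behavior (`col` is a hypothetical helper that
/// builds a `PbColumnDesc` with the given name and column ID):
/// ```ignore
/// // Upstream columns: [a (id 1), b (id 2), c (id 3)]; downstream requires [c, a].
/// let upstream = [col("a", 1), col("b", 2), col("c", 3)];
/// let required = [col("c", 3), col("a", 1)];
/// let mapping = gen_output_mapping(&required, &upstream).unwrap();
/// assert_eq!(mapping.indices, vec![2, 0]); // positions of `c` and `a` upstream
/// assert!(mapping.types.is_empty()); // empty since no column type changed
/// ```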
fn gen_output_mapping(
    required_columns: &[PbColumnDesc],
    upstream_columns: &[PbColumnDesc],
) -> Option<DispatchOutputMapping> {
    let len = required_columns.len();
    let mut indices = vec![0; len];
    let mut types = None;

    for (i, r) in required_columns.iter().enumerate() {
        let (ui, u) = upstream_columns
            .iter()
            .find_position(|&u| u.column_id == r.column_id)?;
        indices[i] = ui as u32;

        // Only if we encounter a type change (`ALTER TABLE ALTER COLUMN TYPE`) will we
        // generate a non-empty `types`.
        if u.column_type != r.column_type {
            types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
                upstream: u.column_type.clone(),
                downstream: r.column_type.clone(),
            };
        }
    }

    // If there's no type change, indicate it with an empty `types`.
    let types = types.unwrap_or_default();

    Some(DispatchOutputMapping { indices, types })
}

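/// Choose the [`DispatchStrategy`] for the edge from an upstream `Materialize` fragment
/// to the backfilling `StreamScan` fragment of a new materialized view: `Hash` for
/// shuffled backfill on a hash-distributed upstream, `Simple` for shuffled backfill on
/// a singleton upstream, and `NoShuffle` (1:1 pairing with upstream actors) otherwise.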
fn mv_on_mv_dispatch_strategy(
    uses_shuffled_backfill: bool,
    dist_key_indices: Vec<u32>,
    output_mapping: DispatchOutputMapping,
) -> DispatchStrategy {
    if uses_shuffled_backfill {
        if !dist_key_indices.is_empty() {
            DispatchStrategy {
                r#type: DispatcherType::Hash as _,
                dist_key_indices,
                output_mapping: Some(output_mapping),
            }
        } else {
            DispatchStrategy {
                r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![], // empty for Simple
                output_mapping: Some(output_mapping),
            }
        }
    } else {
        DispatchStrategy {
            r#type: DispatcherType::NoShuffle as _,
            dist_key_indices: vec![], // not used for `NoShuffle`
            output_mapping: Some(output_mapping),
        }
    }
}

impl CompleteStreamFragmentGraph {
    /// Returns **all** fragment IDs in the complete graph, including the ones that are not in the
    /// building graph.
    pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
        self.building_graph
            .fragments
            .keys()
            .chain(self.existing_fragments.keys())
            .copied()
    }

    /// Returns an iterator over **all** edges in the complete graph, including the external edges.
    pub(super) fn all_edges(
        &self,
    ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
        self.building_graph
            .downstreams
            .iter()
            .chain(self.extra_downstreams.iter())
            .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
    }

    /// Returns the distribution of the existing fragments.
    pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
        self.existing_fragments
            .iter()
            .map(|(&id, f)| {
                (
                    id,
                    Distribution::from_fragment(f, &self.existing_actor_location),
                )
            })
            .collect()
    }

    /// Generate the topological order of **all** fragments in this graph, including the ones that
    /// are not in the building graph. Returns an error if the graph is not a DAG and a topological
    /// sort cannot be done.
    ///
    /// For MV on MV, the first fragment popped from the queue will be the top-most node, i.e., the
    /// `Sink` / `Materialize` of the stream graph.
    pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
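        // This is Kahn's algorithm on reversed edges: we track each fragment's *downstream*
        // count rather than its in-degree, so fragments with no downstream (`Sink` /
        // `Materialize`) enter the queue first and the order proceeds towards the sources.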
        let mut topo = Vec::new();
        let mut downstream_cnts = HashMap::new();

        // Iterate over all fragments.
        for fragment_id in self.all_fragment_ids() {
            // Count how many downstreams a given fragment has.
            let downstream_cnt = self.get_downstreams(fragment_id).count();
            if downstream_cnt == 0 {
                topo.push(fragment_id);
            } else {
                downstream_cnts.insert(fragment_id, downstream_cnt);
            }
        }

        let mut i = 0;
        while let Some(&fragment_id) = topo.get(i) {
            i += 1;
            // Check whether any upstream fragment is now free to be processed.
            for (upstream_id, _) in self.get_upstreams(fragment_id) {
                let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
                *downstream_cnt -= 1;
                if *downstream_cnt == 0 {
                    downstream_cnts.remove(&upstream_id);
                    topo.push(upstream_id);
                }
            }
        }

        if !downstream_cnts.is_empty() {
            // There are fragments that have not been processed, i.e., a cycle exists.
            bail!("graph is not a DAG");
        }

        Ok(topo)
    }

    /// Seal a [`BuildingFragment`] from the graph into a [`Fragment`], which will be further used
    /// to build actors on the compute nodes and persisted into the meta store.
    pub(super) fn seal_fragment(
        &self,
        id: GlobalFragmentId,
        actors: Vec<StreamActor>,
        distribution: Distribution,
        stream_node: StreamNode,
    ) -> Fragment {
        let building_fragment = self.get_fragment(id).into_building().unwrap();
        let internal_tables = building_fragment.extract_internal_tables();
        let BuildingFragment {
            inner,
            job_id,
            upstream_table_columns: _,
        } = building_fragment;

        let distribution_type = distribution.to_distribution_type();
        let vnode_count = distribution.vnode_count();

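        // For `Mview` fragments, the job ID doubles as the catalog ID of the materialized
        // table, so it is tracked as a state table alongside the internal tables below.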
        let materialized_table_id =
            if FragmentTypeMask::from(inner.fragment_type_mask).contains(FragmentTypeFlag::Mview) {
                job_id
            } else {
                None
            };

        let state_table_ids = internal_tables
            .iter()
            .map(|t| t.id)
            .chain(materialized_table_id)
            .collect();

        Fragment {
            fragment_id: inner.fragment_id,
            fragment_type_mask: inner.fragment_type_mask.into(),
            distribution_type,
            actors,
            state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        }
    }

    /// Get a fragment from the complete graph, which can be either a building fragment or an
    /// existing fragment.
    pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
        if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
            EitherFragment::Existing(fragment.clone())
        } else {
            EitherFragment::Building(
                self.building_graph
                    .fragments
                    .get(&fragment_id)
                    .unwrap()
                    .clone(),
            )
        }
    }

    /// Get **all** downstreams of a fragment, including the ones that are not in the building
    /// graph.
    pub(super) fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_downstreams(fragment_id)
            .iter()
            .chain(
                self.extra_downstreams
                    .get(&fragment_id)
                    .into_iter()
                    .flatten(),
            )
            .map(|(&id, edge)| (id, edge))
    }

    /// Get **all** upstreams of a fragment, including the ones that are not in the building
    /// graph.
    pub(super) fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_upstreams(fragment_id)
            .iter()
            .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
            .map(|(&id, edge)| (id, edge))
    }

    /// Returns all building fragments in the graph.
    pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
        &self.building_graph.fragments
    }

    /// Returns all building fragments in the graph, mutable.
    pub(super) fn building_fragments_mut(
        &mut self,
    ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
        &mut self.building_graph.fragments
    }

    /// Get the expected vnode count of the building graph. See the documentation of the field for
    /// more details.
    pub(super) fn max_parallelism(&self) -> usize {
        self.building_graph.max_parallelism()
    }
}