risingwave_meta/stream/stream_graph/fragment.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::ops::{Deref, DerefMut};
use std::sync::LazyLock;
use std::sync::atomic::AtomicU32;

use anyhow::{Context, anyhow};
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{
    CDC_SOURCE_COLUMN_NUM, ColumnCatalog, Field, FragmentTypeFlag, FragmentTypeMask, TableId,
    generate_internal_table_name_with_type,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::id::JobId;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor::{
    self, visit_stream_node_cont, visit_stream_node_cont_mut,
};
use risingwave_connector::sink::catalog::SinkType;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{PbSink, PbTable, Table};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
use risingwave_pb::stream_plan::stream_fragment_graph::{
    Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, PbStreamNode,
    PbStreamScanType, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode,
    StreamScanType,
};

use crate::barrier::{SharedFragmentInfo, SnapshotBackfillInfo};
use crate::controller::id::IdGeneratorManager;
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
use crate::stream::stream_graph::id::{
    GlobalActorIdGen, GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen,
};
use crate::stream::stream_graph::schedule::Distribution;
use crate::{MetaError, MetaResult};

/// The fragment in the building phase, including the [`StreamFragment`] from the frontend and
/// several additional helper fields.
#[derive(Debug, Clone)]
pub(super) struct BuildingFragment {
    /// The fragment structure from the frontend, with the global fragment ID.
    inner: StreamFragment,

    /// The ID of the job if it contains the streaming job node.
    job_id: Option<JobId>,

    /// The required column IDs of each upstream table.
    /// Will be converted to indices when building the edge connected to the upstream.
    ///
    /// For a shared CDC table on a source, it's `vec![]`, since the upstream source's output schema is fixed.
    upstream_job_columns: HashMap<JobId, Vec<PbColumnDesc>>,
}

impl BuildingFragment {
    /// Create a new [`BuildingFragment`] from a [`StreamFragment`]. The global fragment ID and
    /// global table IDs will be correctly filled with the given `id` and `table_id_gen`.
    fn new(
        id: GlobalFragmentId,
        fragment: StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) -> Self {
        let mut fragment = StreamFragment {
            fragment_id: id.as_global_id(),
            ..fragment
        };

        // Fill the information of the internal tables in the fragment.
        Self::fill_internal_tables(&mut fragment, job, table_id_gen);

        let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
        let upstream_job_columns =
            Self::extract_upstream_columns_except_cross_db_backfill(&fragment);

        Self {
            inner: fragment,
            job_id,
            upstream_job_columns,
        }
    }

    /// Extract the internal tables from the fragment.
    fn extract_internal_tables(&self) -> Vec<Table> {
        let mut fragment = self.inner.clone();
        let mut tables = Vec::new();
        stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
            tables.push(table.clone());
        });
        tables
    }

    /// Fill the information of the internal tables in the fragment.
    fn fill_internal_tables(
        fragment: &mut StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) {
        let fragment_id = fragment.fragment_id;
        stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
            table.id = table_id_gen
                .to_global_id(table.id.as_raw_id())
                .as_global_id();
            table.schema_id = job.schema_id();
            table.database_id = job.database_id();
            table.name = generate_internal_table_name_with_type(
                &job.name(),
                fragment_id,
                table.id,
                table_type_name,
            );
            table.fragment_id = fragment_id;
            table.owner = job.owner();
            table.job_id = Some(job.id());
        });
    }

    /// Fill the job-related information in the fragment.
    fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
        let job_id = job.id();
        let fragment_id = fragment.fragment_id;
        let mut has_job = false;

        stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
            NodeBody::Materialize(materialize_node) => {
                materialize_node.table_id = job_id.as_mv_table_id();

                // Fill the table field of `MaterializeNode` from the job.
                let table = materialize_node.table.insert(job.table().unwrap().clone());
                table.fragment_id = fragment_id; // this will later be synced back to `job.table` with `set_info_from_graph`
                // In production, do not include full definition in the table in plan node.
                if cfg!(not(debug_assertions)) {
                    table.definition = job.name();
                }

                has_job = true;
            }
            NodeBody::Sink(sink_node) => {
                sink_node.sink_desc.as_mut().unwrap().id = job_id.as_sink_id();

                has_job = true;
            }
            NodeBody::Dml(dml_node) => {
                dml_node.table_id = job_id.as_mv_table_id();
                dml_node.table_version_id = job.table_version_id().unwrap();
            }
            NodeBody::StreamFsFetch(fs_fetch_node) => {
                if let StreamingJob::Table(table_source, _, _) = job
                    && let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
                    && let Some(source) = table_source
                {
                    node_inner.source_id = source.id;
                    if let Some(id) = source.optional_associated_table_id {
                        node_inner.associated_table_id = Some(id.into());
                    }
                }
            }
            NodeBody::Source(source_node) => {
                match job {
                    // Note: For a table without a connector, it has a dummy Source node.
                    // Note: For a table with a connector, its source node has a source ID different from the table ID (job ID), assigned in `create_job_catalog`.
                    StreamingJob::Table(source, _table, _table_job_type) => {
                        if let Some(source_inner) = source_node.source_inner.as_mut()
                            && let Some(source) = source
                        {
                            debug_assert_ne!(source.id, job_id.as_raw_id());
                            source_inner.source_id = source.id;
                            if let Some(id) = source.optional_associated_table_id {
                                source_inner.associated_table_id = Some(id.into());
                            }
                        }
                    }
                    StreamingJob::Source(source) => {
                        has_job = true;
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            debug_assert_eq!(source.id, job_id.as_raw_id());
                            source_inner.source_id = source.id;
                            if let Some(id) = source.optional_associated_table_id {
                                source_inner.associated_table_id = Some(id.into());
                            }
                        }
                    }
                    // For other job types, no need to fill the source id, since it refers to an existing source.
                    _ => {}
                }
            }
            NodeBody::StreamCdcScan(node) => {
                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
                    table_desc.table_id = job_id.as_mv_table_id();
                }
            }
            NodeBody::VectorIndexWrite(node) => {
                let table = node.table.as_mut().unwrap();
                table.id = job_id.as_mv_table_id();
                table.database_id = job.database_id();
                table.schema_id = job.schema_id();
                table.fragment_id = fragment_id;
                #[cfg(not(debug_assertions))]
                {
                    table.definition = job.name();
                }

                has_job = true;
            }
            _ => {}
        });

        has_job
    }

    /// Extract the required columns of each upstream table except for cross-db backfill.
    fn extract_upstream_columns_except_cross_db_backfill(
        fragment: &StreamFragment,
    ) -> HashMap<JobId, Vec<PbColumnDesc>> {
        let mut table_columns = HashMap::new();

        stream_graph_visitor::visit_fragment(fragment, |node_body| {
            let (table_id, column_ids) = match node_body {
                NodeBody::StreamScan(stream_scan) => {
                    if stream_scan.get_stream_scan_type().unwrap()
                        == StreamScanType::CrossDbSnapshotBackfill
                    {
                        return;
                    }
                    (
                        stream_scan.table_id.as_job_id(),
                        stream_scan.upstream_columns(),
                    )
                }
                NodeBody::CdcFilter(cdc_filter) => (
                    cdc_filter.upstream_source_id.as_share_source_job_id(),
                    vec![],
                ),
                NodeBody::SourceBackfill(backfill) => (
                    backfill.upstream_source_id.as_share_source_job_id(),
                    // FIXME: only pass required columns instead of all columns here
                    backfill.column_descs(),
                ),
                _ => return,
            };
            table_columns
                .try_insert(table_id, column_ids)
                .expect("currently there should be no two same upstream tables in a fragment");
        });

        table_columns
    }

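    /// Returns `true` if the fragment contains a `StreamScan` node of type
    /// `ArrangementBackfill` or `SnapshotBackfill`.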
    pub fn has_shuffled_backfill(&self) -> bool {
        let stream_node = match self.inner.node.as_ref() {
            Some(node) => node,
            _ => return false,
        };
        let mut has_shuffled_backfill = false;
        let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
        visit_stream_node_cont(stream_node, |node| {
            let is_shuffled_backfill = if let Some(node) = &node.node_body
                && let Some(node) = node.as_stream_scan()
            {
                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
                    || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
            } else {
                false
            };
            if is_shuffled_backfill {
                *has_shuffled_backfill_mut_ref = true;
                false
            } else {
                true
            }
        });
        has_shuffled_backfill
    }
}

impl Deref for BuildingFragment {
    type Target = StreamFragment;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for BuildingFragment {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

/// The ID of an edge in the fragment graph. For different types of edges, the ID will be in
/// different variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub(super) enum EdgeId {
    /// The edge between two building (internal) fragments.
    Internal {
        /// The ID generated by the frontend, generally the operator ID of `Exchange`.
        /// See [`StreamFragmentEdgeProto`].
        link_id: u64,
    },

    /// The edge between an upstream external fragment and downstream building fragment. Used for
    /// MV on MV.
    UpstreamExternal {
        /// The ID of the upstream table or materialized view.
        upstream_job_id: JobId,
        /// The ID of the downstream fragment.
        downstream_fragment_id: GlobalFragmentId,
    },

    /// The edge between an upstream building fragment and downstream external fragment. Used for
    /// schema change (replace table plan).
    DownstreamExternal(DownstreamExternalEdgeId),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct DownstreamExternalEdgeId {
    /// The ID of the original upstream fragment (`Materialize`).
    pub(super) original_upstream_fragment_id: GlobalFragmentId,
    /// The ID of the downstream fragment.
    pub(super) downstream_fragment_id: GlobalFragmentId,
}

/// The edge in the fragment graph.
///
/// The edge can be either internal or external. This is distinguished by the [`EdgeId`].
#[derive(Debug, Clone)]
pub(super) struct StreamFragmentEdge {
    /// The ID of the edge.
    pub id: EdgeId,

    /// The strategy used for dispatching the data.
    pub dispatch_strategy: DispatchStrategy,
}

impl StreamFragmentEdge {
    fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
        Self {
            // By creating an edge from the protobuf, we know that the edge is from the frontend and
            // is internal.
            id: EdgeId::Internal {
                link_id: edge.link_id,
            },
            dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
        }
    }
}

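/// Clone a fragment, assigning a freshly generated fragment ID and fresh actor IDs while keeping
/// all other fields unchanged.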
fn clone_fragment(
    fragment: &Fragment,
    id_generator_manager: &IdGeneratorManager,
    actor_id_counter: &AtomicU32,
) -> Fragment {
    let fragment_id = GlobalFragmentIdGen::new(id_generator_manager, 1)
        .to_global_id(0)
        .as_global_id();
    let actor_id_gen = GlobalActorIdGen::new(actor_id_counter, fragment.actors.len() as _);
    Fragment {
        fragment_id,
        fragment_type_mask: fragment.fragment_type_mask,
        distribution_type: fragment.distribution_type,
        actors: fragment
            .actors
            .iter()
            .enumerate()
            .map(|(i, actor)| StreamActor {
                actor_id: actor_id_gen.to_global_id(i as _).as_global_id(),
                fragment_id,
                vnode_bitmap: actor.vnode_bitmap.clone(),
                mview_definition: actor.mview_definition.clone(),
                expr_context: actor.expr_context.clone(),
                config_override: actor.config_override.clone(),
            })
            .collect(),
        state_table_ids: fragment.state_table_ids.clone(),
        maybe_vnode_count: fragment.maybe_vnode_count,
        nodes: fragment.nodes.clone(),
    }
}

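/// Check that the fragments of a sink support auto schema refresh: there must be exactly one
/// fragment, shaped as `Sink -> StreamScan (ArrangementBackfill) -> [Merge, BatchPlan]`.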
pub fn check_sink_fragments_support_refresh_schema(
    fragments: &BTreeMap<FragmentId, Fragment>,
) -> MetaResult<()> {
    if fragments.len() != 1 {
        return Err(anyhow!(
            "sink with auto schema change should have only 1 fragment, but got {:?}",
            fragments.len()
        )
        .into());
    }
    let (_, fragment) = fragments.first_key_value().expect("non-empty");
    let sink_node = &fragment.nodes;
    let PbNodeBody::Sink(_) = sink_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
    };
    let [stream_scan_node] = sink_node.input.as_slice() else {
        panic!("Sink has more than 1 input: {:?}", sink_node.input);
    };
    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::StreamScan but got: {:?}",
            stream_scan_node.node_body
        )
        .into());
    };
    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
        return Err(anyhow!(
            "unsupported stream_scan_type for auto refresh schema: {:?}",
            stream_scan_type
        )
        .into());
    }
    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_slice() else {
        panic!(
            "the number of StreamScan inputs is not 2: {:?}",
            stream_scan_node.input
        );
    };
    let NodeBody::Merge(_) = merge_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::Merge but got: {:?}",
            merge_node.node_body
        )
        .into());
    };
    Ok(())
}

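/// Rewrite the sink fragment for auto schema refresh: clone it with fresh IDs, append the newly
/// added columns to the sink columns, the (optional) log store table, the `StreamScan` and the
/// `Merge` node, and point the `Merge` node at the new upstream table fragment.
///
/// Returns the new fragment, the extended sink columns, and the updated log store table, if any.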
pub fn rewrite_refresh_schema_sink_fragment(
    original_sink_fragment: &Fragment,
    sink: &PbSink,
    newly_added_columns: &[ColumnCatalog],
    upstream_table: &PbTable,
    upstream_table_fragment_id: FragmentId,
    id_generator_manager: &IdGeneratorManager,
    actor_id_counter: &AtomicU32,
) -> MetaResult<(Fragment, Vec<PbColumnCatalog>, Option<PbTable>)> {
    let mut new_sink_columns = sink.columns.clone();
    fn extend_sink_columns(
        sink_columns: &mut Vec<PbColumnCatalog>,
        new_columns: &[ColumnCatalog],
        get_column_name: impl Fn(&String) -> String,
    ) {
        let next_column_id = sink_columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().column_id + 1)
            .max()
            .unwrap_or(1);
        sink_columns.extend(new_columns.iter().enumerate().map(|(i, col)| {
            let mut col = col.to_protobuf();
            let column_desc = col.column_desc.as_mut().unwrap();
            column_desc.column_id = next_column_id + (i as i32);
            column_desc.name = get_column_name(&column_desc.name);
            col
        }));
    }
    extend_sink_columns(&mut new_sink_columns, newly_added_columns, |name| {
        name.clone()
    });

    let mut new_sink_fragment = clone_fragment(
        original_sink_fragment,
        id_generator_manager,
        actor_id_counter,
    );
    let sink_node = &mut new_sink_fragment.nodes;
    let PbNodeBody::Sink(sink_node_body) = sink_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
    };
    let [stream_scan_node] = sink_node.input.as_mut_slice() else {
        panic!("Sink has more than 1 input: {:?}", sink_node.input);
    };
    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::StreamScan but got: {:?}",
            stream_scan_node.node_body
        )
        .into());
    };
    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_mut_slice() else {
        panic!(
            "the number of StreamScan inputs is not 2: {:?}",
            stream_scan_node.input
        );
    };
    let NodeBody::Merge(merge) = merge_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::Merge but got: {:?}",
            merge_node.node_body
        )
        .into());
    };
    // update sink_node
    // following logic in <StreamSink as Explain>::distill
    sink_node.identity = {
        let sink_type = SinkType::from_proto(sink.sink_type());
        let sink_type_str = sink_type.type_str();
        let column_names = new_sink_columns
            .iter()
            .map(|col| {
                ColumnCatalog::from(col.clone())
                    .name_with_hidden()
                    .to_string()
            })
            .join(", ");
        let downstream_pk = if !sink_type.is_append_only() {
            let downstream_pk = sink
                .downstream_pk
                .iter()
                .map(|i| &sink.columns[*i as usize].column_desc.as_ref().unwrap().name)
                .collect_vec();
            format!(", downstream_pk: {downstream_pk:?}")
        } else {
            "".to_owned()
        };
        format!("StreamSink {{ type: {sink_type_str}, columns: [{column_names}]{downstream_pk} }}")
    };
    sink_node
        .fields
        .extend(newly_added_columns.iter().map(|col| {
            Field::new(
                format!("{}.{}", upstream_table.name, col.column_desc.name),
                col.data_type().clone(),
            )
            .to_prost()
        }));

    let new_log_store_table = if let Some(log_store_table) = &mut sink_node_body.table {
        extend_sink_columns(&mut log_store_table.columns, newly_added_columns, |name| {
            format!("{}_{}", upstream_table.name, name)
        });
        Some(log_store_table.clone())
    } else {
        None
    };
    sink_node_body.sink_desc.as_mut().unwrap().column_catalogs = new_sink_columns.clone();

    // update stream scan node
    stream_scan_node
        .fields
        .extend(newly_added_columns.iter().map(|col| {
            Field::new(
                format!("{}.{}", upstream_table.name, col.column_desc.name),
                col.data_type().clone(),
            )
            .to_prost()
        }));
    // following logic in <StreamTableScan as Explain>::distill
    stream_scan_node.identity = {
        let columns = stream_scan_node
            .fields
            .iter()
            .map(|col| &col.name)
            .join(", ");
        format!("StreamTableScan {{ table: t, columns: [{columns}] }}")
    };

    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
        return Err(anyhow!(
            "unsupported stream_scan_type for auto refresh schema: {:?}",
            stream_scan_type
        )
        .into());
    }
    scan.arrangement_table = Some(upstream_table.clone());
    scan.output_indices.extend(
        (0..newly_added_columns.len()).map(|i| (i + scan.upstream_column_ids.len()) as u32),
    );
    scan.upstream_column_ids.extend(
        newly_added_columns
            .iter()
            .map(|col| col.column_id().get_id()),
    );
    let table_desc = scan.table_desc.as_mut().unwrap();
    table_desc
        .value_indices
        .extend((0..newly_added_columns.len()).map(|i| (i + table_desc.columns.len()) as u32));
    table_desc.columns.extend(
        newly_added_columns
            .iter()
            .map(|col| col.column_desc.to_protobuf()),
    );

    // update merge node
    merge_node.fields = scan
        .upstream_column_ids
        .iter()
        .map(|&column_id| {
            let col = upstream_table
                .columns
                .iter()
                .find(|c| c.column_desc.as_ref().unwrap().column_id == column_id)
                .unwrap();
            let col_desc = col.column_desc.as_ref().unwrap();
            Field::new(
                col_desc.name.clone(),
                col_desc.column_type.as_ref().unwrap().into(),
            )
            .to_prost()
        })
        .collect();
    merge.upstream_fragment_id = upstream_table_fragment_id;
    Ok((new_sink_fragment, new_sink_columns, new_log_store_table))
}

/// Adjacency list (G) of backfill orders.
/// `G[10] -> [1, 2, 11]`
/// means the backfill node in `fragment 10`
/// should be backfilled before the backfill nodes in `fragments 1, 2 and 11`.
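///
/// # Example
///
/// A minimal sketch with hypothetical fragment IDs, matching the adjacency list above:
///
/// ```ignore
/// let order: FragmentBackfillOrder = HashMap::from([(10, vec![1, 2, 11])]);
/// ```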
pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;

/// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`]
/// from the frontend.
///
/// This only includes the nodes and edges of the current job itself. It will later be converted
/// to a [`CompleteStreamFragmentGraph`], which additionally contains the information of
/// pre-existing fragments connected to the graph's top-most or bottom-most fragments.
#[derive(Default, Debug)]
pub struct StreamFragmentGraph {
    /// Stores all the fragments in the graph.
    pub(super) fragments: HashMap<GlobalFragmentId, BuildingFragment>,

    /// Stores edges between fragments: upstream -> downstream.
    pub(super) downstreams:
        HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Stores edges between fragments: downstream -> upstream.
    pub(super) upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Dependent relations of this job.
    dependent_table_ids: HashSet<TableId>,

    /// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session
    /// variable. If not specified, all active worker slots will be used.
    specified_parallelism: Option<NonZeroUsize>,

    /// Specified max parallelism, i.e., expected vnode count for the graph.
    ///
    /// The scheduler on the meta service will use this as a hint to decide the vnode count
    /// for each fragment.
    ///
    /// Note that the actual vnode count may be different from this value.
    /// For example, a no-shuffle exchange between current fragment graph and an existing
    /// upstream fragment graph requires two fragments to be in the same distribution,
    /// thus the same vnode count.
    max_parallelism: usize,

    /// The backfill ordering strategy of the graph.
    backfill_order: BackfillOrder,
}

impl StreamFragmentGraph {
    /// Create a new [`StreamFragmentGraph`] from the given [`StreamFragmentGraphProto`], with all
    /// global IDs correctly filled.
    pub fn new(
        env: &MetaSrvEnv,
        proto: StreamFragmentGraphProto,
        job: &StreamingJob,
    ) -> MetaResult<Self> {
        let fragment_id_gen =
            GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
        // Note: in SQL backend, the ids generated here are fake and will be overwritten again
        // with `refill_internal_table_ids` later.
        // TODO: refactor the code to remove this step.
        let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);

        // Create nodes.
        let fragments: HashMap<_, _> = proto
            .fragments
            .into_iter()
            .map(|(id, fragment)| {
                let id = fragment_id_gen.to_global_id(id.as_raw_id());
                let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
                (id, fragment)
            })
            .collect();

        assert_eq!(
            fragments
                .values()
                .map(|f| f.extract_internal_tables().len() as u32)
                .sum::<u32>(),
            proto.table_ids_cnt
        );

        // Create edges.
        let mut downstreams = HashMap::new();
        let mut upstreams = HashMap::new();

        for edge in proto.edges {
            let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id.as_raw_id());
            let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id.as_raw_id());
            let edge = StreamFragmentEdge::from_protobuf(&edge);

            upstreams
                .entry(downstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(upstream_id, edge.clone())
                .unwrap();
            downstreams
                .entry(upstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(downstream_id, edge)
                .unwrap();
        }

        // Note: Here we directly use the field `dependent_table_ids` in the proto (resolved in
        // frontend), instead of visiting the graph ourselves.
        let dependent_table_ids = proto.dependent_table_ids.iter().copied().collect();

        let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
            Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
        } else {
            None
        };

        let max_parallelism = proto.max_parallelism as usize;
        let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
            order: Default::default(),
        });

        Ok(Self {
            fragments,
            downstreams,
            upstreams,
            dependent_table_ids,
            specified_parallelism,
            max_parallelism,
            backfill_order,
        })
    }

    /// Retrieve the **incomplete** internal tables map of the whole graph.
    ///
    /// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
    /// `fragment_id`, `vnode_count`. They will all be filled after a `TableFragments` is built.
    /// Be careful when using the returned values.
    pub fn incomplete_internal_tables(&self) -> BTreeMap<TableId, Table> {
        let mut tables = BTreeMap::new();
        for fragment in self.fragments.values() {
            for table in fragment.extract_internal_tables() {
                let table_id = table.id;
                tables
                    .try_insert(table_id, table)
                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
            }
        }
        tables
    }

    /// Refill the internal tables' `table_id`s according to the given map, typically obtained from
    /// `create_internal_table_catalog`.
    pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<TableId, TableId>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = table_id_map.get(&table.id).cloned().unwrap();
                    table.id = target;
                },
            );
        }
    }

    /// Match the internal tables of the new graph against the old ones for `ALTER TABLE` or
    /// `ALTER SOURCE`, using a trivial algorithm: sort both sides by table ID and zip them up.
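    ///
    /// # Example
    ///
    /// A minimal sketch with hypothetical IDs:
    ///
    /// ```ignore
    /// // New internal table IDs [102, 101] and old tables with IDs [3, 4] are sorted and
    /// // zipped, so table 101 is replaced by old table 3, and table 102 by old table 4.
    /// graph.fit_internal_tables_trivial(old_internal_tables)?;
    /// ```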
    pub fn fit_internal_tables_trivial(
        &mut self,
        mut old_internal_tables: Vec<Table>,
    ) -> MetaResult<()> {
        let mut new_internal_table_ids = Vec::new();
        for fragment in self.fragments.values() {
            for table in &fragment.extract_internal_tables() {
                new_internal_table_ids.push(table.id);
            }
        }

        if new_internal_table_ids.len() != old_internal_tables.len() {
            bail!(
                "Different number of internal tables. New: {}, Old: {}",
                new_internal_table_ids.len(),
                old_internal_tables.len()
            );
        }
        old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
        new_internal_table_ids.sort();

        let internal_table_id_map = new_internal_table_ids
            .into_iter()
            .zip_eq_fast(old_internal_tables.into_iter())
            .collect::<HashMap<_, _>>();

        // TODO(alter-mv): unify this with `fit_internal_table_ids_with_mapping` after we
        // confirm the behavior is the same.
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    // XXX: this replaces the entire table, instead of just the id!
                    let target = internal_table_id_map.get(&table.id).cloned().unwrap();
                    *table = target;
                },
            );
        }

        Ok(())
    }

    /// Fit the internal tables' `table_id`s according to the given mapping.
    pub fn fit_internal_table_ids_with_mapping(&mut self, mut matches: HashMap<TableId, Table>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = matches.remove(&table.id).unwrap_or_else(|| {
                        panic!("no matching table for table {}({})", table.id, table.name)
                    });
                    table.id = target.id;
                    table.maybe_vnode_count = target.maybe_vnode_count;
                },
            );
        }
    }

    /// Returns the fragment ID where the streaming job node is located.
    pub fn table_fragment_id(&self) -> FragmentId {
        self.fragments
            .values()
            .filter(|b| b.job_id.is_some())
            .map(|b| b.fragment_id)
            .exactly_one()
            .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
    }

    /// Returns the fragment ID where the table DML is received.
    pub fn dml_fragment_id(&self) -> Option<FragmentId> {
        self.fragments
            .values()
            .filter(|b| {
                FragmentTypeMask::from(b.fragment_type_mask).contains(FragmentTypeFlag::Dml)
            })
            .map(|b| b.fragment_id)
            .at_most_one()
            .expect("require at most 1 dml node when creating the streaming job")
    }

    /// Get the dependent streaming job ids of this job.
    pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
        &self.dependent_table_ids
    }

    /// Get the parallelism of the job, if specified by the user.
    pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_parallelism
    }

    /// Get the expected vnode count of the graph. See documentation of the field for more details.
    pub fn max_parallelism(&self) -> usize {
        self.max_parallelism
    }

    /// Get downstreams of a fragment.
    fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    /// Get upstreams of a fragment.
    fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

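    /// Collect the snapshot backfill info for all fragments in this graph.
    /// See [`Self::collect_snapshot_backfill_info_impl`] for the meaning of the return value.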
    pub fn collect_snapshot_backfill_info(
        &self,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        Self::collect_snapshot_backfill_info_impl(self.fragments.values().map(|fragment| {
            (
                fragment.node.as_ref().unwrap(),
                fragment.fragment_type_mask.into(),
            )
        }))
    }

    /// Returns `Ok((Some(snapshot_backfill_info), cross_db_snapshot_backfill_info))`.
    pub fn collect_snapshot_backfill_info_impl(
        fragments: impl IntoIterator<Item = (&PbStreamNode, FragmentTypeMask)>,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
        let mut cross_db_info = SnapshotBackfillInfo {
            upstream_mv_table_id_to_backfill_epoch: Default::default(),
        };
        let mut result = Ok(());
        for (node, fragment_type_mask) in fragments {
            visit_stream_node_cont(node, |node| {
                if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
                    let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
                        .expect("invalid stream_scan_type");
                    let is_snapshot_backfill = match stream_scan_type {
                        StreamScanType::SnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                            );
                            true
                        }
                        StreamScanType::CrossDbSnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan)
                            );
                            cross_db_info
                                .upstream_mv_table_id_to_backfill_epoch
                                .insert(stream_scan.table_id, stream_scan.snapshot_backfill_epoch);

                            return true;
                        }
                        _ => false,
                    };

                    match &mut prev_stream_scan {
                        Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
                            match (prev_snapshot_backfill_info, is_snapshot_backfill) {
                                (Some(prev_snapshot_backfill_info), true) => {
                                    prev_snapshot_backfill_info
                                        .upstream_mv_table_id_to_backfill_epoch
                                        .insert(
                                            stream_scan.table_id,
                                            stream_scan.snapshot_backfill_epoch,
                                        );
                                    true
                                }
                                (None, false) => true,
                                (_, _) => {
                                    result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
                                    false
                                }
                            }
                        }
                        None => {
                            prev_stream_scan = Some((
                                if is_snapshot_backfill {
                                    Some(SnapshotBackfillInfo {
                                        upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
                                            [(
                                                stream_scan.table_id,
                                                stream_scan.snapshot_backfill_epoch,
                                            )],
                                        ),
                                    })
                                } else {
                                    None
                                },
                                *stream_scan.clone(),
                            ));
                            true
                        }
                    }
                } else {
                    true
                }
            })
        }
        result.map(|_| {
            (
                prev_stream_scan
                    .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
                    .unwrap_or(None),
                cross_db_info,
            )
        })
    }

    /// Collect the mapping from each upstream table / source ID to the `fragment_id`s of the
    /// fragments that backfill from it.
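    ///
    /// # Example
    ///
    /// A minimal sketch with hypothetical IDs: if fragment 7 stream-scans table 1 and fragment 8
    /// backfills source 2, the result is:
    ///
    /// ```ignore
    /// HashMap::from([(1, vec![7]), (2, vec![8])])
    /// ```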
    pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
        let mut mapping = HashMap::new();
        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let fragment_mask = fragment.fragment_type_mask;
            let candidates = [FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan];
            let has_some_scan = candidates
                .into_iter()
                .any(|flag| (fragment_mask & flag as u32) > 0);
            if has_some_scan {
                visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
                    match node.node_body.as_ref() {
                        Some(NodeBody::StreamScan(stream_scan)) => {
                            let table_id = stream_scan.table_id;
                            let fragments: &mut Vec<_> =
                                mapping.entry(table_id.as_raw_id()).or_default();
                            fragments.push(fragment_id);
                            // each fragment should have only 1 scan node.
                            false
                        }
                        Some(NodeBody::SourceBackfill(source_backfill)) => {
                            let source_id = source_backfill.upstream_source_id;
                            let fragments: &mut Vec<_> =
                                mapping.entry(source_id.as_raw_id()).or_default();
                            fragments.push(fragment_id);
                            // each fragment should have only 1 scan node.
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        mapping
    }

    /// The mapping that comes from the frontend is initially between `table_id`s. We remap it to
    /// the fragment level, since we track backfill progress by actor, and a fragment <-> actor
    /// mapping is available.
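    ///
    /// # Example
    ///
    /// A minimal sketch with hypothetical IDs: if the frontend orders relation 1 before
    /// relation 2, and the backfill mapping is `{1: [10], 2: [20, 21]}`, step 1 yields:
    ///
    /// ```ignore
    /// HashMap::from([(10, vec![20, 21])])
    /// ```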
    pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
        let mapping = self.collect_backfill_mapping();
        let mut fragment_ordering: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();

        // 1. Add backfill dependencies
        for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
            let fragment_ids = mapping.get(rel_id).unwrap();
            for fragment_id in fragment_ids {
                let downstream_fragment_ids = downstream_rel_ids
                    .data
                    .iter()
                    .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
                    .copied()
                    .collect();
                fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
            }
        }

        // If no backfill order is specified, we still need to ensure that all backfill fragments
        // run before LocalityProvider fragments.
        if fragment_ordering.is_empty() {
            for value in mapping.values() {
                for &fragment_id in value {
                    fragment_ordering.entry(fragment_id).or_default();
                }
            }
        }

        // 2. Add dependencies: all backfill fragments should run before LocalityProvider fragments
        let locality_provider_dependencies = self.find_locality_provider_dependencies();

        let backfill_fragments: HashSet<FragmentId> = mapping.values().flatten().copied().collect();

        // Calculate LocalityProvider root fragments (zero indegree)
        // Root fragments are those that appear as keys but never appear as downstream dependencies
        let all_locality_provider_fragments: HashSet<FragmentId> =
            locality_provider_dependencies.keys().copied().collect();
        let downstream_locality_provider_fragments: HashSet<FragmentId> =
            locality_provider_dependencies
                .values()
                .flatten()
                .copied()
                .collect();
        let locality_provider_root_fragments: Vec<FragmentId> = all_locality_provider_fragments
            .difference(&downstream_locality_provider_fragments)
            .copied()
            .collect();

        // For each backfill fragment, add only the root LocalityProvider fragments as dependents
        // This ensures backfill completes before any LocalityProvider starts, while minimizing dependencies
        for &backfill_fragment_id in &backfill_fragments {
            fragment_ordering
                .entry(backfill_fragment_id)
                .or_default()
                .extend(locality_provider_root_fragments.iter().copied());
        }

        // 3. Add LocalityProvider internal dependencies
        for (fragment_id, downstream_fragments) in locality_provider_dependencies {
            fragment_ordering
                .entry(fragment_id)
                .or_default()
                .extend(downstream_fragments);
        }

        // Deduplicate downstream entries per fragment; overlaps are common when the same fragment
        // is reached via multiple paths (e.g., with StreamShare) and would otherwise appear
        // multiple times.
        for downstream in fragment_ordering.values_mut() {
            let mut seen = HashSet::new();
            downstream.retain(|id| seen.insert(*id));
        }

        fragment_ordering
    }

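    /// Collect, for each fragment that contains a `LocalityProvider` node, the ID of its state
    /// table (the progress table is excluded).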
    pub fn find_locality_provider_fragment_state_table_mapping(
        &self,
    ) -> HashMap<FragmentId, Vec<TableId>> {
        let mut mapping: HashMap<FragmentId, Vec<TableId>> = HashMap::new();

        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();

            // Check if this fragment contains a LocalityProvider node
            if let Some(node) = fragment.node.as_ref() {
                let mut state_table_ids = Vec::new();

                visit_stream_node_cont(node, |stream_node| {
                    if let Some(NodeBody::LocalityProvider(locality_provider)) =
                        stream_node.node_body.as_ref()
                    {
                        // Collect state table ID (except the progress table)
                        let state_table_id = locality_provider
                            .state_table
                            .as_ref()
                            .expect("must have state table")
                            .id;
                        state_table_ids.push(state_table_id);
                        false // Stop visiting once we find a LocalityProvider
                    } else {
                        true // Continue visiting
                    }
                });

                if !state_table_ids.is_empty() {
                    mapping.insert(fragment_id, state_table_ids);
                }
            }
        }

        mapping
    }

    /// Find dependency relationships among fragments containing `LocalityProvider` nodes.
    /// Returns a mapping where each fragment ID maps to a list of fragment IDs that should be processed after it.
    /// Following the same semantics as `FragmentBackfillOrder`:
    /// `G[10] -> [1, 2, 11]` means `LocalityProvider` in fragment 10 should be processed
    /// before `LocalityProviders` in fragments 1, 2, and 11.
    ///
    /// This method assumes each fragment contains at most one `LocalityProvider` node.
    pub fn find_locality_provider_dependencies(&self) -> HashMap<FragmentId, Vec<FragmentId>> {
        let mut locality_provider_fragments = HashSet::new();
        let mut dependencies: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();

        // First, identify all fragments that contain LocalityProvider nodes
        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let has_locality_provider = self.fragment_has_locality_provider(fragment);

            if has_locality_provider {
                locality_provider_fragments.insert(fragment_id);
                dependencies.entry(fragment_id).or_default();
            }
        }

        // Build dependency relationships between LocalityProvider fragments
        // For each LocalityProvider fragment, find all downstream LocalityProvider fragments
        // The upstream fragment should be processed before the downstream fragments
        for &provider_fragment_id in &locality_provider_fragments {
            let provider_fragment_global_id = GlobalFragmentId::new(provider_fragment_id);

            // Find all fragments downstream from this LocalityProvider fragment
            let mut visited = HashSet::new();
            let mut downstream_locality_providers = Vec::new();

            self.collect_downstream_locality_providers(
                provider_fragment_global_id,
                &locality_provider_fragments,
                &mut visited,
                &mut downstream_locality_providers,
            );

            // This fragment should be processed before all its downstream LocalityProvider fragments
            dependencies
                .entry(provider_fragment_id)
                .or_default()
                .extend(downstream_locality_providers);
        }

        dependencies
    }

    fn fragment_has_locality_provider(&self, fragment: &BuildingFragment) -> bool {
        let mut has_locality_provider = false;

        if let Some(node) = fragment.node.as_ref() {
            visit_stream_node_cont(node, |stream_node| {
                if let Some(NodeBody::LocalityProvider(_)) = stream_node.node_body.as_ref() {
                    has_locality_provider = true;
                    false // Stop visiting once we find a LocalityProvider
                } else {
                    true // Continue visiting
                }
            });
        }

        has_locality_provider
    }

    /// Recursively collect downstream `LocalityProvider` fragments
    fn collect_downstream_locality_providers(
        &self,
        current_fragment_id: GlobalFragmentId,
        locality_provider_fragments: &HashSet<FragmentId>,
        visited: &mut HashSet<GlobalFragmentId>,
        downstream_providers: &mut Vec<FragmentId>,
    ) {
        if visited.contains(&current_fragment_id) {
            return;
        }
        visited.insert(current_fragment_id);

        // Check all downstream fragments
        for &downstream_id in self.get_downstreams(current_fragment_id).keys() {
            let downstream_fragment_id = downstream_id.as_global_id();

            // If the downstream fragment is a LocalityProvider, add it to results
            if locality_provider_fragments.contains(&downstream_fragment_id) {
                downstream_providers.push(downstream_fragment_id);
            }

            // Recursively check further downstream
            self.collect_downstream_locality_providers(
                downstream_id,
                locality_provider_fragments,
                visited,
                downstream_providers,
            );
        }
    }
}

/// Fill the snapshot epoch for `StreamScanNode`s of `SnapshotBackfill`.
/// Returns `true` when a change has been applied.
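///
/// # Example
///
/// A hypothetical call site during graph building (variable names are illustrative):
///
/// ```ignore
/// let applied = fill_snapshot_backfill_epoch(
///     &mut stream_node,
///     snapshot_backfill_info.as_ref(),
///     &cross_db_snapshot_backfill_info,
/// )?;
/// ```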
1257pub fn fill_snapshot_backfill_epoch(
1258    node: &mut StreamNode,
1259    snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1260    cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1261) -> MetaResult<bool> {
1262    let mut result = Ok(());
1263    let mut applied = false;
1264    visit_stream_node_cont_mut(node, |node| {
1265        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
1266            && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
1267                || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
1268        {
1269            result = try {
1270                let table_id = stream_scan.table_id;
1271                let snapshot_epoch = cross_db_snapshot_backfill_info
1272                    .upstream_mv_table_id_to_backfill_epoch
1273                    .get(&table_id)
1274                    .or_else(|| {
1275                        snapshot_backfill_info.and_then(|snapshot_backfill_info| {
1276                            snapshot_backfill_info
1277                                .upstream_mv_table_id_to_backfill_epoch
1278                                .get(&table_id)
1279                        })
1280                    })
1281                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
1282                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
1283                if let Some(prev_snapshot_epoch) =
1284                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
1285                {
1286                    Err(anyhow!(
1287                        "snapshot backfill epoch set again: {} {} {}",
1288                        table_id,
1289                        prev_snapshot_epoch,
1290                        snapshot_epoch
1291                    ))?;
1292                }
1293                applied = true;
1294            };
1295            result.is_ok()
1296        } else {
1297            true
1298        }
1299    });
1300    result.map(|_| applied)
1301}
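
// The epoch-filling logic above relies on `Option::replace` to detect double
// assignment: the first fill returns `None`, while any later fill returns the
// previous value and is reported as an error. A minimal sketch of that
// pattern, detached from the protobuf types:
#[cfg(test)]
mod fill_once_sketch {
    /// Fills `slot` with `epoch`, failing if it was already set.
    fn fill_once(slot: &mut Option<u64>, epoch: u64) -> Result<(), String> {
        if let Some(prev) = slot.replace(epoch) {
            return Err(format!("epoch set again: {prev} -> {epoch}"));
        }
        Ok(())
    }

    #[test]
    fn second_fill_is_rejected() {
        let mut slot = None;
        fill_once(&mut slot, 100).unwrap();
        assert_eq!(slot, Some(100));
        assert!(fill_once(&mut slot, 200).is_err());
    }
}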

static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
    LazyLock::new(HashMap::new);

/// A fragment that is either being built or already exists. Used to generalize the logic of
/// [`crate::stream::ActorGraphBuilder`].
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EitherFragment {
    /// An internal fragment that is being built for the current streaming job.
    Building(BuildingFragment),

    /// An existing fragment that is external but connected to the fragments being built.
    Existing(SharedFragmentInfo),
}
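
// `EnumAsInner` derives accessors such as `is_building()`, `as_building()`,
// and the `into_building()` used by `seal_fragment` below. A hand-rolled
// sketch of what such an accessor amounts to (signatures simplified; the
// derive's actual return types may differ):
#[cfg(test)]
mod either_fragment_sketch {
    enum Either {
        Building(String),
        Existing(u32),
    }

    impl Either {
        /// Consumes the value, yielding the `Building` payload if applicable.
        fn into_building(self) -> Option<String> {
            match self {
                Either::Building(b) => Some(b),
                Either::Existing(_) => None,
            }
        }
    }

    #[test]
    fn accessor_behaves_as_expected() {
        assert_eq!(
            Either::Building("f".to_string()).into_building(),
            Some("f".to_string())
        );
        assert!(Either::Existing(1).into_building().is_none());
    }
}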

/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing
/// fragments, which are connected to the graph's top-most or bottom-most fragments.
///
/// For example,
/// - if we're going to build a mview on an existing mview, the upstream fragment containing the
///   `Materialize` node will be included in this structure.
/// - if we're going to replace the plan of a table with downstream mviews, the downstream fragments
///   containing the `StreamScan` nodes will be included in this structure.
#[derive(Debug)]
pub struct CompleteStreamFragmentGraph {
    /// The fragment graph of the streaming job being built.
    building_graph: StreamFragmentGraph,

    /// The required information of existing fragments.
    existing_fragments: HashMap<GlobalFragmentId, SharedFragmentInfo>,

    /// The location of the actors in the existing fragments.
    existing_actor_location: HashMap<ActorId, WorkerId>,

    /// Extra downstream edges connecting the building fragments with existing fragments,
    /// keyed by the upstream fragment.
    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Extra upstream edges connecting the building fragments with existing fragments,
    /// keyed by the downstream fragment.
    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
}

pub struct FragmentGraphUpstreamContext {
    /// The root fragment of each upstream stream graph, which can be a mview fragment,
    /// or a source fragment for a CDC source job.
    pub upstream_root_fragments: HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
    pub upstream_actor_location: HashMap<ActorId, WorkerId>,
}

pub struct FragmentGraphDownstreamContext {
    pub original_root_fragment_id: FragmentId,
    pub downstream_fragments: Vec<(DispatcherType, SharedFragmentInfo, PbStreamNode)>,
    pub downstream_actor_location: HashMap<ActorId, WorkerId>,
}
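
// The complete graph is never materialized as a single edge set; instead the
// building graph's edges and the `extra_*` edges are chained lazily (see
// `all_edges` below). A minimal sketch of that union-by-chaining, using plain
// integer IDs in place of `GlobalFragmentId`:
#[cfg(test)]
mod edge_chaining_sketch {
    use std::collections::HashMap;

    #[test]
    fn building_and_extra_edges_are_unioned_lazily() {
        let building: HashMap<u32, Vec<u32>> = HashMap::from([(1, vec![2])]);
        let extra: HashMap<u32, Vec<u32>> = HashMap::from([(0, vec![1])]);

        // Chain the two maps without copying them into a combined structure.
        let mut all: Vec<(u32, u32)> = building
            .iter()
            .chain(extra.iter())
            .flat_map(|(&from, tos)| tos.iter().map(move |&to| (from, to)))
            .collect();
        all.sort_unstable();
        assert_eq!(all, vec![(0, 1), (1, 2)]);
    }
}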

impl CompleteStreamFragmentGraph {
    /// Create a new [`CompleteStreamFragmentGraph`] with empty existing fragments, i.e., there are
    /// no upstream mviews.
    #[cfg(test)]
    pub fn for_test(graph: StreamFragmentGraph) -> Self {
        Self {
            building_graph: graph,
            existing_fragments: Default::default(),
            existing_actor_location: Default::default(),
            extra_downstreams: Default::default(),
            extra_upstreams: Default::default(),
        }
    }

    /// Create a new [`CompleteStreamFragmentGraph`] for a newly created job (which has no
    /// downstreams), e.g., an MV on MV, or a CDC/Source table whose upstream `Materialize`
    /// or `Source` fragments already exist.
    pub fn with_upstreams(
        graph: StreamFragmentGraph,
        upstream_context: FragmentGraphUpstreamContext,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(graph, Some(upstream_context), None, job_type)
    }

    /// Create a new [`CompleteStreamFragmentGraph`] for replacing an existing table/source,
    /// with the downstream existing `StreamScan`/`StreamSourceScan` fragments.
    pub fn with_downstreams(
        graph: StreamFragmentGraph,
        downstream_context: FragmentGraphDownstreamContext,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(graph, None, Some(downstream_context), job_type)
    }

    /// For replacing an existing table based on a shared CDC source, which has both upstreams
    /// and downstreams.
    pub fn with_upstreams_and_downstreams(
        graph: StreamFragmentGraph,
        upstream_context: FragmentGraphUpstreamContext,
        downstream_context: FragmentGraphDownstreamContext,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(upstream_context),
            Some(downstream_context),
            job_type,
        )
    }

    /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments.
    fn build_helper(
        mut graph: StreamFragmentGraph,
        upstream_ctx: Option<FragmentGraphUpstreamContext>,
        downstream_ctx: Option<FragmentGraphDownstreamContext>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        let mut extra_downstreams = HashMap::new();
        let mut extra_upstreams = HashMap::new();
        let mut existing_fragments = HashMap::new();

        let mut existing_actor_location = HashMap::new();

        if let Some(FragmentGraphUpstreamContext {
            upstream_root_fragments,
            upstream_actor_location,
        }) = upstream_ctx
        {
            for (&id, fragment) in &mut graph.fragments {
                let uses_shuffled_backfill = fragment.has_shuffled_backfill();

                for (&upstream_job_id, required_columns) in &fragment.upstream_job_columns {
                    let (upstream_fragment, nodes) = upstream_root_fragments
                        .get(&upstream_job_id)
                        .context("upstream fragment not found")?;
                    let upstream_root_fragment_id =
                        GlobalFragmentId::new(upstream_fragment.fragment_id);

                    let edge = match job_type {
                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                            // We traverse all fragments in the graph to find the `CdcFilter`
                            // fragment, then add an edge between the upstream source fragment
                            // and it.
                            assert_ne!(
                                (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
                                0
                            );

                            tracing::debug!(
                                ?upstream_root_fragment_id,
                                ?required_columns,
                                identity = ?fragment.inner.get_node().unwrap().get_identity(),
                                current_frag_id=?id,
                                "CdcFilter with upstream source fragment"
                            );

                            StreamFragmentEdge {
                                id: EdgeId::UpstreamExternal {
                                    upstream_job_id,
                                    downstream_fragment_id: id,
                                },
                                // We always use `NoShuffle` for the exchange between the upstream
                                // `Source` and the downstream `StreamScan` of the new CDC table.
                                dispatch_strategy: DispatchStrategy {
                                    r#type: DispatcherType::NoShuffle as _,
                                    dist_key_indices: vec![], // not used for `NoShuffle`
                                    output_mapping: DispatchOutputMapping::identical(
                                        CDC_SOURCE_COLUMN_NUM as _,
                                    )
                                    .into(),
                                },
                            }
                        }

                        // Handle MV on MV/Source.
                        StreamingJobType::MaterializedView
                        | StreamingJobType::Sink
                        | StreamingJobType::Index => {
                            // Build the extra edges between the upstream `Materialize` and
                            // the downstream `StreamScan` of the new job.
                            if upstream_fragment
                                .fragment_type_mask
                                .contains(FragmentTypeFlag::Mview)
                            {
                                // Resolve the required output columns from the upstream materialized view.
                                let (dist_key_indices, output_mapping) = {
                                    let mview_node =
                                        nodes.get_node_body().unwrap().as_materialize().unwrap();
                                    let all_columns = mview_node.column_descs();
                                    let dist_key_indices = mview_node.dist_key_indices();
                                    let output_mapping = gen_output_mapping(
                                        required_columns,
                                        &all_columns,
                                    )
                                    .context(
                                        "BUG: column not found in the upstream materialized view",
                                    )?;
                                    (dist_key_indices, output_mapping)
                                };
                                let dispatch_strategy = mv_on_mv_dispatch_strategy(
                                    uses_shuffled_backfill,
                                    dist_key_indices,
                                    output_mapping,
                                );

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_job_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy,
                                }
                            }
                            // Build the extra edges between the upstream `Source` and
                            // the downstream `SourceBackfill` of the new job.
                            else if upstream_fragment
                                .fragment_type_mask
                                .contains(FragmentTypeFlag::Source)
                            {
                                let output_mapping = {
                                    let source_node =
                                        nodes.get_node_body().unwrap().as_source().unwrap();

                                    let all_columns = source_node.column_descs().unwrap();
                                    gen_output_mapping(required_columns, &all_columns).context(
                                        "BUG: column not found in the upstream source node",
                                    )?
                                };

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_job_id,
                                        downstream_fragment_id: id,
                                    },
                                    // We always use `NoShuffle` for the exchange between the upstream
                                    // `Source` and the downstream `StreamScan` of the new MV.
                                    dispatch_strategy: DispatchStrategy {
                                        r#type: DispatcherType::NoShuffle as _,
                                        dist_key_indices: vec![], // not used for `NoShuffle`
                                        output_mapping: Some(output_mapping),
                                    },
                                }
                            } else {
                                bail!(
                                    "the upstream fragment should be a MView or Source, got fragment type: {:b}",
                                    upstream_fragment.fragment_type_mask
                                )
                            }
                        }
                        StreamingJobType::Source | StreamingJobType::Table(_) => {
                            bail!(
                                "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
                                job_type
                            )
                        }
                    };

                    // Put the edge into the extra edges.
                    extra_downstreams
                        .entry(upstream_root_fragment_id)
                        .or_insert_with(HashMap::new)
                        .try_insert(id, edge.clone())
                        .unwrap();
                    extra_upstreams
                        .entry(id)
                        .or_insert_with(HashMap::new)
                        .try_insert(upstream_root_fragment_id, edge)
                        .unwrap();
                }
            }

            existing_fragments.extend(
                upstream_root_fragments
                    .into_values()
                    .map(|(f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(upstream_actor_location);
        }

        if let Some(FragmentGraphDownstreamContext {
            original_root_fragment_id,
            downstream_fragments,
            downstream_actor_location,
        }) = downstream_ctx
        {
            let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
            let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());

            // Build the extra edges between the `Materialize` and the downstream `StreamScan` of
            // the existing materialized views.
            for (dispatcher_type, fragment, nodes) in &downstream_fragments {
                let id = GlobalFragmentId::new(fragment.fragment_id);

                // Similar to `extract_upstream_columns_except_cross_db_backfill`.
                let output_columns = {
                    let mut res = None;

                    stream_graph_visitor::visit_stream_node_body(nodes, |node_body| {
                        let columns = match node_body {
                            NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
                            NodeBody::SourceBackfill(source_backfill) => {
                                // FIXME: only pass required columns instead of all columns here
                                source_backfill.column_descs()
                            }
                            _ => return,
                        };
                        res = Some(columns);
                    });

                    res.context("failed to locate downstream scan")?
                };

                let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
                let nodes = table_fragment.node.as_ref().unwrap();

                let (dist_key_indices, output_mapping) = match job_type {
                    StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                        let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
                        let all_columns = mview_node.column_descs();
                        let dist_key_indices = mview_node.dist_key_indices();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                     being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        (dist_key_indices, output_mapping)
                    }

                    StreamingJobType::Source => {
                        let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
                        let all_columns = source_node.column_descs().unwrap();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                     being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
                        (
                            vec![], // not used for `NoShuffle`
                            output_mapping,
                        )
                    }

                    _ => bail!("unsupported job type for replacement: {job_type:?}"),
                };

                let edge = StreamFragmentEdge {
                    id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
                        original_upstream_fragment_id: original_table_fragment_id,
                        downstream_fragment_id: id,
                    }),
                    dispatch_strategy: DispatchStrategy {
                        r#type: *dispatcher_type as i32,
                        output_mapping: Some(output_mapping),
                        dist_key_indices,
                    },
                };

                extra_downstreams
                    .entry(table_fragment_id)
                    .or_insert_with(HashMap::new)
                    .try_insert(id, edge.clone())
                    .unwrap();
                extra_upstreams
                    .entry(id)
                    .or_insert_with(HashMap::new)
                    .try_insert(table_fragment_id, edge)
                    .unwrap();
            }

            existing_fragments.extend(
                downstream_fragments
                    .into_iter()
                    .map(|(_, f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(downstream_actor_location);
        }

        Ok(Self {
            building_graph: graph,
            existing_fragments,
            existing_actor_location,
            extra_downstreams,
            extra_upstreams,
        })
    }
}
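
// `build_helper` records every external edge twice so that it can be looked up
// from either endpoint: once under the upstream fragment in `extra_downstreams`
// and once under the downstream fragment in `extra_upstreams`. A minimal sketch
// of that symmetric bookkeeping with integer IDs; the production code uses the
// map `try_insert` API, for which `insert` plus an assertion stands in here.
#[cfg(test)]
mod symmetric_edge_bookkeeping_sketch {
    use std::collections::HashMap;

    #[test]
    fn edges_are_registered_on_both_endpoints() {
        let mut downstreams: HashMap<u32, HashMap<u32, &str>> = HashMap::new();
        let mut upstreams: HashMap<u32, HashMap<u32, &str>> = HashMap::new();

        let (up, down, edge) = (1u32, 2u32, "no-shuffle");
        // Each insertion must be fresh; a duplicate edge would be a bug.
        assert!(downstreams.entry(up).or_default().insert(down, edge).is_none());
        assert!(upstreams.entry(down).or_default().insert(up, edge).is_none());

        // The same edge is now reachable from both endpoints.
        assert_eq!(downstreams[&up][&down], upstreams[&down][&up]);
    }
}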

/// Generate the `output_mapping` for [`DispatchStrategy`] from the given columns.
fn gen_output_mapping(
    required_columns: &[PbColumnDesc],
    upstream_columns: &[PbColumnDesc],
) -> Option<DispatchOutputMapping> {
    let len = required_columns.len();
    let mut indices = vec![0; len];
    let mut types = None;

    for (i, r) in required_columns.iter().enumerate() {
        let (ui, u) = upstream_columns
            .iter()
            .find_position(|&u| u.column_id == r.column_id)?;
        indices[i] = ui as u32;

        // Only if we encounter a type change (`ALTER TABLE ALTER COLUMN TYPE`) will we generate a
        // non-empty `types`.
        if u.column_type != r.column_type {
            types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
                upstream: u.column_type.clone(),
                downstream: r.column_type.clone(),
            };
        }
    }

    // If there's no type change, indicate it with an empty `types`.
    let types = types.unwrap_or_default();

    Some(DispatchOutputMapping { indices, types })
}
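
// A minimal sketch of the index-mapping step above: for each required column,
// find its position in the upstream schema by column ID, failing (with `None`)
// if any required column is missing. Plain `i32` IDs stand in for
// `PbColumnDesc`, and the type-change handling is omitted.
#[cfg(test)]
mod output_mapping_sketch {
    /// Maps required column IDs to their indices in `upstream`.
    fn indices(required: &[i32], upstream: &[i32]) -> Option<Vec<u32>> {
        required
            .iter()
            .map(|r| upstream.iter().position(|u| u == r).map(|i| i as u32))
            .collect()
    }

    #[test]
    fn maps_by_column_id_and_detects_missing_columns() {
        // Upstream schema has column IDs [10, 11, 12]; we require [12, 10].
        assert_eq!(indices(&[12, 10], &[10, 11, 12]), Some(vec![2, 0]));
        // A dropped column (99) makes the whole mapping fail.
        assert_eq!(indices(&[99], &[10, 11, 12]), None);
    }
}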

fn mv_on_mv_dispatch_strategy(
    uses_shuffled_backfill: bool,
    dist_key_indices: Vec<u32>,
    output_mapping: DispatchOutputMapping,
) -> DispatchStrategy {
    if uses_shuffled_backfill {
        if !dist_key_indices.is_empty() {
            DispatchStrategy {
                r#type: DispatcherType::Hash as _,
                dist_key_indices,
                output_mapping: Some(output_mapping),
            }
        } else {
            DispatchStrategy {
                r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![], // empty for `Simple`
                output_mapping: Some(output_mapping),
            }
        }
    } else {
        DispatchStrategy {
            r#type: DispatcherType::NoShuffle as _,
            dist_key_indices: vec![], // not used for `NoShuffle`
            output_mapping: Some(output_mapping),
        }
    }
}
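
// The function above is a three-way choice: shuffled backfill with a
// distribution key hashes on that key, shuffled backfill without one falls
// back to a single (`Simple`) channel, and plain backfill always uses
// `NoShuffle` between upstream and downstream actors. A sketch of the same
// decision table with a local stand-in enum:
#[cfg(test)]
mod dispatch_choice_sketch {
    #[derive(Debug, PartialEq)]
    enum Choice {
        Hash,
        Simple,
        NoShuffle,
    }

    fn choose(shuffled_backfill: bool, has_dist_key: bool) -> Choice {
        match (shuffled_backfill, has_dist_key) {
            (true, true) => Choice::Hash,
            (true, false) => Choice::Simple,
            (false, _) => Choice::NoShuffle,
        }
    }

    #[test]
    fn matches_the_three_cases() {
        assert_eq!(choose(true, true), Choice::Hash);
        assert_eq!(choose(true, false), Choice::Simple);
        assert_eq!(choose(false, true), Choice::NoShuffle);
    }
}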

impl CompleteStreamFragmentGraph {
    /// Returns **all** fragment IDs in the complete graph, including the ones that are not in the
    /// building graph.
    pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
        self.building_graph
            .fragments
            .keys()
            .chain(self.existing_fragments.keys())
            .copied()
    }

    /// Returns an iterator of **all** edges in the complete graph, including the external edges.
    pub(super) fn all_edges(
        &self,
    ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
        self.building_graph
            .downstreams
            .iter()
            .chain(self.extra_downstreams.iter())
            .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
    }

    /// Returns the distribution of the existing fragments.
    pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
        self.existing_fragments
            .iter()
            .map(|(&id, f)| {
                (
                    id,
                    Distribution::from_fragment(f, &self.existing_actor_location),
                )
            })
            .collect()
    }

    /// Generate a topological order of **all** fragments in this graph, including the ones that
    /// are not in the building graph. Returns an error if the graph is not a DAG and a topological
    /// sort cannot be done.
    ///
    /// For MV on MV, the first fragment popped out of the queue will be the top-most node, i.e.,
    /// the `Sink` / `Materialize` of the stream graph.
    pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
        let mut topo = Vec::new();
        let mut downstream_cnts = HashMap::new();

        // Iterate over all fragments.
        for fragment_id in self.all_fragment_ids() {
            // Count how many downstreams we have for a given fragment.
            let downstream_cnt = self.get_downstreams(fragment_id).count();
            if downstream_cnt == 0 {
                topo.push(fragment_id);
            } else {
                downstream_cnts.insert(fragment_id, downstream_cnt);
            }
        }

        let mut i = 0;
        while let Some(&fragment_id) = topo.get(i) {
            i += 1;
            // Find out whether we can process more fragments.
            for (upstream_fragment_id, _) in self.get_upstreams(fragment_id) {
                let downstream_cnt = downstream_cnts.get_mut(&upstream_fragment_id).unwrap();
                *downstream_cnt -= 1;
                if *downstream_cnt == 0 {
                    downstream_cnts.remove(&upstream_fragment_id);
                    topo.push(upstream_fragment_id);
                }
            }
        }

        if !downstream_cnts.is_empty() {
            // There are fragments that are not processed yet.
            bail!("graph is not a DAG");
        }

        Ok(topo)
    }

    /// Seal a [`BuildingFragment`] from the graph into a [`Fragment`], which will be further used
    /// to build actors on the compute nodes and persisted into the meta store.
    pub(super) fn seal_fragment(
        &self,
        id: GlobalFragmentId,
        actors: Vec<StreamActor>,
        distribution: Distribution,
        stream_node: StreamNode,
    ) -> Fragment {
        let building_fragment = self.get_fragment(id).into_building().unwrap();
        let internal_tables = building_fragment.extract_internal_tables();
        let BuildingFragment {
            inner,
            job_id,
            upstream_job_columns: _,
        } = building_fragment;

        let distribution_type = distribution.to_distribution_type();
        let vnode_count = distribution.vnode_count();

        let materialized_fragment_id =
            if FragmentTypeMask::from(inner.fragment_type_mask).contains(FragmentTypeFlag::Mview) {
                job_id.map(JobId::as_mv_table_id)
            } else {
                None
            };

        let vector_index_fragment_id =
            if inner.fragment_type_mask & FragmentTypeFlag::VectorIndexWrite as u32 != 0 {
                job_id.map(JobId::as_mv_table_id)
            } else {
                None
            };

        let state_table_ids = internal_tables
            .iter()
            .map(|t| t.id)
            .chain(materialized_fragment_id)
            .chain(vector_index_fragment_id)
            .collect();

        Fragment {
            fragment_id: inner.fragment_id,
            fragment_type_mask: inner.fragment_type_mask.into(),
            distribution_type,
            actors,
            state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        }
    }

    /// Get a fragment from the complete graph, which can be either a building fragment or an
    /// existing fragment.
    pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
        if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
            EitherFragment::Existing(fragment.clone())
        } else {
            EitherFragment::Building(
                self.building_graph
                    .fragments
                    .get(&fragment_id)
                    .unwrap()
                    .clone(),
            )
        }
    }

    /// Get **all** downstreams of a fragment, including the ones that are not in the building
    /// graph.
    pub(super) fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_downstreams(fragment_id)
            .iter()
            .chain(
                self.extra_downstreams
                    .get(&fragment_id)
                    .into_iter()
                    .flatten(),
            )
            .map(|(&id, edge)| (id, edge))
    }

    /// Get **all** upstreams of a fragment, including the ones that are not in the building
    /// graph.
    pub(super) fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_upstreams(fragment_id)
            .iter()
            .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
            .map(|(&id, edge)| (id, edge))
    }

    /// Returns all building fragments in the graph.
    pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
        &self.building_graph.fragments
    }

    /// Returns all building fragments in the graph, mutable.
    pub(super) fn building_fragments_mut(
        &mut self,
    ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
        &mut self.building_graph.fragments
    }

    /// Get the expected vnode count of the building graph. See documentation of the field for more details.
    pub(super) fn max_parallelism(&self) -> usize {
        self.building_graph.max_parallelism()
    }
}
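
// `topo_order` above runs Kahn's algorithm on *reversed* edges: fragments with
// no downstreams are emitted first, so the `Materialize`/`Sink` fragment comes
// out before its upstreams, and any leftover non-zero count signals a cycle.
// A std-only sketch of the same procedure over integer IDs (assuming non-empty
// downstream lists in the adjacency map):
#[cfg(test)]
mod topo_order_sketch {
    use std::collections::HashMap;

    /// Returns fragments ordered from sinks to sources, or `None` on a cycle.
    fn topo(downstreams: &HashMap<u32, Vec<u32>>, all: &[u32]) -> Option<Vec<u32>> {
        // Count downstream edges per fragment and build the reverse adjacency.
        let mut cnts: HashMap<u32, usize> = HashMap::new();
        let mut upstreams: HashMap<u32, Vec<u32>> = HashMap::new();
        for (&from, tos) in downstreams {
            cnts.insert(from, tos.len());
            for &to in tos {
                upstreams.entry(to).or_default().push(from);
            }
        }
        // Fragments with zero downstream count seed the queue.
        let mut order: Vec<u32> = all
            .iter()
            .copied()
            .filter(|id| !cnts.contains_key(id))
            .collect();
        let mut i = 0;
        while let Some(&id) = order.get(i) {
            i += 1;
            for &up in upstreams.get(&id).into_iter().flatten() {
                let c = cnts.get_mut(&up).unwrap();
                *c -= 1;
                if *c == 0 {
                    cnts.remove(&up);
                    order.push(up);
                }
            }
        }
        // Anything left unprocessed means the graph is not a DAG.
        cnts.is_empty().then_some(order)
    }

    #[test]
    fn sinks_come_first_and_cycles_are_detected() {
        // 1 -> 2 -> 3 (3 is the sink-most fragment).
        let edges = HashMap::from([(1, vec![2]), (2, vec![3])]);
        assert_eq!(topo(&edges, &[1, 2, 3]), Some(vec![3, 2, 1]));
        // 1 -> 2 -> 1 is a cycle.
        let cyclic = HashMap::from([(1, vec![2]), (2, vec![1])]);
        assert!(topo(&cyclic, &[1, 2]).is_none());
    }
}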