// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::ops::{Deref, DerefMut};
use std::sync::LazyLock;
use std::sync::atomic::AtomicU32;

use anyhow::{Context, anyhow};
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{
    CDC_SOURCE_COLUMN_NUM, ColumnCatalog, Field, FragmentTypeFlag, FragmentTypeMask, TableId,
    generate_internal_table_name_with_type,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::id::JobId;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor::{
    self, visit_stream_node_cont, visit_stream_node_cont_mut,
};
use risingwave_connector::sink::catalog::SinkType;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{PbSink, PbTable, Table};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::id::StreamNodeLocalOperatorId;
use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
use risingwave_pb::stream_plan::stream_fragment_graph::{
    Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, PbStreamNode,
    PbStreamScanType, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode,
    StreamScanType,
};

use crate::barrier::{SharedFragmentInfo, SnapshotBackfillInfo};
use crate::controller::id::IdGeneratorManager;
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
use crate::stream::stream_graph::id::{
    GlobalActorIdGen, GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen,
};
use crate::stream::stream_graph::schedule::Distribution;
use crate::{MetaError, MetaResult};

/// The fragment in the building phase, including the [`StreamFragment`] from the frontend and
/// several additional helper fields.
#[derive(Debug, Clone)]
pub(super) struct BuildingFragment {
    /// The fragment structure from the frontend, with the global fragment ID.
    inner: StreamFragment,

    /// The ID of the job if it contains the streaming job node.
    job_id: Option<JobId>,

    /// The required column IDs of each upstream table.
    /// Will be converted to indices when building the edge connected to the upstream.
    ///
    /// For a shared CDC table on source, it's `vec![]`, since the upstream source's output schema is fixed.
    upstream_job_columns: HashMap<JobId, Vec<PbColumnDesc>>,
}

impl BuildingFragment {
    /// Create a new [`BuildingFragment`] from a [`StreamFragment`]. The global fragment ID and
    /// global table IDs will be correctly filled with the given `id` and `table_id_gen`.
    fn new(
        id: GlobalFragmentId,
        fragment: StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) -> Self {
        let mut fragment = StreamFragment {
            fragment_id: id.as_global_id(),
            ..fragment
        };

        // Fill the information of the internal tables in the fragment.
        Self::fill_internal_tables(&mut fragment, job, table_id_gen);

        let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
        let upstream_job_columns =
            Self::extract_upstream_columns_except_cross_db_backfill(&fragment);

        Self {
            inner: fragment,
            job_id,
            upstream_job_columns,
        }
    }

    /// Extract the internal tables from the fragment.
    fn extract_internal_tables(&self) -> Vec<Table> {
        let mut fragment = self.inner.clone();
        let mut tables = Vec::new();
        stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
            tables.push(table.clone());
        });
        tables
    }

    /// Fill the information of the internal tables in the fragment.
    fn fill_internal_tables(
        fragment: &mut StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) {
        let fragment_id = fragment.fragment_id;
        stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
            table.id = table_id_gen
                .to_global_id(table.id.as_raw_id())
                .as_global_id();
            table.schema_id = job.schema_id();
            table.database_id = job.database_id();
            table.name = generate_internal_table_name_with_type(
                &job.name(),
                fragment_id,
                table.id,
                table_type_name,
            );
            table.fragment_id = fragment_id;
            table.owner = job.owner();
            table.job_id = Some(job.id());
        });
    }

    /// Fill the information of the job in the fragment.
    fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
        let job_id = job.id();
        let fragment_id = fragment.fragment_id;
        let mut has_job = false;

        stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
            NodeBody::Materialize(materialize_node) => {
                materialize_node.table_id = job_id.as_mv_table_id();

                // Fill the table field of `MaterializeNode` from the job.
                let table = materialize_node.table.insert(job.table().unwrap().clone());
                table.fragment_id = fragment_id; // this will later be synced back to `job.table` with `set_info_from_graph`
                // In production, do not include the full definition in the table in the plan node.
                if cfg!(not(debug_assertions)) {
                    table.definition = job.name();
                }

                has_job = true;
            }
            NodeBody::Sink(sink_node) => {
                sink_node.sink_desc.as_mut().unwrap().id = job_id.as_sink_id();

                has_job = true;
            }
            NodeBody::Dml(dml_node) => {
                dml_node.table_id = job_id.as_mv_table_id();
                dml_node.table_version_id = job.table_version_id().unwrap();
            }
            NodeBody::StreamFsFetch(fs_fetch_node) => {
                if let StreamingJob::Table(table_source, _, _) = job
                    && let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
                    && let Some(source) = table_source
                {
                    node_inner.source_id = source.id;
                    if let Some(id) = source.optional_associated_table_id {
                        node_inner.associated_table_id = Some(id.into());
                    }
                }
            }
            NodeBody::Source(source_node) => {
                match job {
                    // Note: a table without a connector has a dummy `Source` node.
                    // Note: for a table with a connector, its source node has a source ID different from the table ID (job ID), assigned in `create_job_catalog`.
                    StreamingJob::Table(source, _table, _table_job_type) => {
                        if let Some(source_inner) = source_node.source_inner.as_mut()
                            && let Some(source) = source
                        {
                            debug_assert_ne!(source.id, job_id.as_raw_id());
                            source_inner.source_id = source.id;
                            if let Some(id) = source.optional_associated_table_id {
                                source_inner.associated_table_id = Some(id.into());
                            }
                        }
                    }
                    StreamingJob::Source(source) => {
                        has_job = true;
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            debug_assert_eq!(source.id, job_id.as_raw_id());
                            source_inner.source_id = source.id;
                            if let Some(id) = source.optional_associated_table_id {
                                source_inner.associated_table_id = Some(id.into());
                            }
                        }
                    }
                    // For other job types, there is no need to fill the source ID, since the node refers to an existing source.
                    _ => {}
                }
            }
            NodeBody::StreamCdcScan(node) => {
                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
                    table_desc.table_id = job_id.as_mv_table_id();
                }
            }
            NodeBody::VectorIndexWrite(node) => {
                let table = node.table.as_mut().unwrap();
                table.id = job_id.as_mv_table_id();
                table.database_id = job.database_id();
                table.schema_id = job.schema_id();
                table.fragment_id = fragment_id;
                #[cfg(not(debug_assertions))]
                {
                    table.definition = job.name();
                }

                has_job = true;
            }
            _ => {}
        });

        has_job
    }

    /// Extract the required columns of each upstream table except for cross-db backfill.
    fn extract_upstream_columns_except_cross_db_backfill(
        fragment: &StreamFragment,
    ) -> HashMap<JobId, Vec<PbColumnDesc>> {
        let mut table_columns = HashMap::new();

        stream_graph_visitor::visit_fragment(fragment, |node_body| {
            let (table_id, column_ids) = match node_body {
                NodeBody::StreamScan(stream_scan) => {
                    if stream_scan.get_stream_scan_type().unwrap()
                        == StreamScanType::CrossDbSnapshotBackfill
                    {
                        return;
                    }
                    (
                        stream_scan.table_id.as_job_id(),
                        stream_scan.upstream_columns(),
                    )
                }
                NodeBody::CdcFilter(cdc_filter) => (
                    cdc_filter.upstream_source_id.as_share_source_job_id(),
                    vec![],
                ),
                NodeBody::SourceBackfill(backfill) => (
                    backfill.upstream_source_id.as_share_source_job_id(),
                    // FIXME: only pass required columns instead of all columns here
                    backfill.column_descs(),
                ),
                _ => return,
            };
            table_columns
                .try_insert(table_id, column_ids)
                .expect("currently there should be no two same upstream tables in a fragment");
        });

        table_columns
    }

    pub fn has_shuffled_backfill(&self) -> bool {
        let stream_node = match self.inner.node.as_ref() {
            Some(node) => node,
            _ => return false,
        };
        let mut has_shuffled_backfill = false;
        let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
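        // `visit_stream_node_cont` keeps traversing while the closure returns `true`, so the
        // closure below returns `false` to stop at the first shuffled-backfill scan it finds.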
        visit_stream_node_cont(stream_node, |node| {
            let is_shuffled_backfill = if let Some(node) = &node.node_body
                && let Some(node) = node.as_stream_scan()
            {
                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
                    || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
            } else {
                false
            };
            if is_shuffled_backfill {
                *has_shuffled_backfill_mut_ref = true;
                false
            } else {
                true
            }
        });
        has_shuffled_backfill
    }
}

impl Deref for BuildingFragment {
    type Target = StreamFragment;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for BuildingFragment {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

/// The ID of an edge in the fragment graph. For different types of edges, the ID will be in
/// different variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub(super) enum EdgeId {
    /// The edge between two building (internal) fragments.
    Internal {
        /// The ID generated by the frontend, generally the operator ID of `Exchange`.
        /// See [`StreamFragmentEdgeProto`].
        link_id: u64,
    },

    /// The edge between an upstream external fragment and a downstream building fragment. Used for
    /// MV on MV.
    UpstreamExternal {
        /// The ID of the upstream table or materialized view.
        upstream_job_id: JobId,
        /// The ID of the downstream fragment.
        downstream_fragment_id: GlobalFragmentId,
    },

    /// The edge between an upstream building fragment and a downstream external fragment. Used for
    /// schema change (replace table plan).
    DownstreamExternal(DownstreamExternalEdgeId),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct DownstreamExternalEdgeId {
    /// The ID of the original upstream fragment (`Materialize`).
    pub(super) original_upstream_fragment_id: GlobalFragmentId,
    /// The ID of the downstream fragment.
    pub(super) downstream_fragment_id: GlobalFragmentId,
}

/// The edge in the fragment graph.
///
/// The edge can be either internal or external. This is distinguished by the [`EdgeId`].
#[derive(Debug, Clone)]
pub(super) struct StreamFragmentEdge {
    /// The ID of the edge.
    pub id: EdgeId,

    /// The strategy used for dispatching the data.
    pub dispatch_strategy: DispatchStrategy,
}

impl StreamFragmentEdge {
    fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
        Self {
            // By creating an edge from the protobuf, we know that the edge is from the frontend and
            // is internal.
            id: EdgeId::Internal {
                link_id: edge.link_id,
            },
            dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
        }
    }
}

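/// Clone a fragment, assigning a freshly generated fragment ID and fresh actor IDs while
/// keeping the nodes, distribution, state table IDs, and per-actor metadata of the original.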
fn clone_fragment(
    fragment: &Fragment,
    id_generator_manager: &IdGeneratorManager,
    actor_id_counter: &AtomicU32,
) -> Fragment {
    let fragment_id = GlobalFragmentIdGen::new(id_generator_manager, 1)
        .to_global_id(0)
        .as_global_id();
    let actor_id_gen = GlobalActorIdGen::new(actor_id_counter, fragment.actors.len() as _);
    Fragment {
        fragment_id,
        fragment_type_mask: fragment.fragment_type_mask,
        distribution_type: fragment.distribution_type,
        actors: fragment
            .actors
            .iter()
            .enumerate()
            .map(|(i, actor)| StreamActor {
                actor_id: actor_id_gen.to_global_id(i as _).as_global_id(),
                fragment_id,
                vnode_bitmap: actor.vnode_bitmap.clone(),
                mview_definition: actor.mview_definition.clone(),
                expr_context: actor.expr_context.clone(),
                config_override: actor.config_override.clone(),
            })
            .collect(),
        state_table_ids: fragment.state_table_ids.clone(),
        maybe_vnode_count: fragment.maybe_vnode_count,
        nodes: fragment.nodes.clone(),
    }
}

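/// Check that the sink's plan has the exact shape required for auto schema refresh: a single
/// fragment of `Sink -> StreamScan (arrangement backfill) -> [Merge, BatchPlan]`.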
pub fn check_sink_fragments_support_refresh_schema(
    fragments: &BTreeMap<FragmentId, Fragment>,
) -> MetaResult<()> {
    if fragments.len() != 1 {
        return Err(anyhow!(
            "sink with auto schema change should have only 1 fragment, but got {:?}",
            fragments.len()
        )
        .into());
    }
    let (_, fragment) = fragments.first_key_value().expect("non-empty");
    let sink_node = &fragment.nodes;
    let PbNodeBody::Sink(_) = sink_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
    };
    let [stream_scan_node] = sink_node.input.as_slice() else {
        panic!("Sink has more than 1 input: {:?}", sink_node.input);
    };
    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::StreamScan but got: {:?}",
            stream_scan_node.node_body
        )
        .into());
    };
    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
        return Err(anyhow!(
            "unsupported stream_scan_type for auto refresh schema: {:?}",
            stream_scan_type
        )
        .into());
    }
    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_slice() else {
        panic!(
            "the number of StreamScan inputs is not 2: {:?}",
            stream_scan_node.input
        );
    };
    let NodeBody::Merge(_) = merge_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::Merge but got: {:?}",
            merge_node.node_body
        )
        .into());
    };
    Ok(())
}

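/// Rebuild the sink fragment for a refreshed upstream schema: `newly_added_columns` are
/// appended to the sink columns, the log store table (if any), and the `StreamScan` / `Merge`
/// nodes. Returns the new fragment together with the new sink columns and log store table.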
pub fn rewrite_refresh_schema_sink_fragment(
    original_sink_fragment: &Fragment,
    sink: &PbSink,
    newly_added_columns: &[ColumnCatalog],
    upstream_table: &PbTable,
    upstream_table_fragment_id: FragmentId,
    id_generator_manager: &IdGeneratorManager,
    actor_id_counter: &AtomicU32,
) -> MetaResult<(Fragment, Vec<PbColumnCatalog>, Option<PbTable>)> {
    let mut new_sink_columns = sink.columns.clone();
    fn extend_sink_columns(
        sink_columns: &mut Vec<PbColumnCatalog>,
        new_columns: &[ColumnCatalog],
        get_column_name: impl Fn(&String) -> String,
    ) {
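        // Allocate fresh column IDs right after the current maximum: e.g. existing IDs
        // [1, 2, 5] yield 6 for the first new column, and an empty column list starts at 1.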
        let next_column_id = sink_columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().column_id + 1)
            .max()
            .unwrap_or(1);
        sink_columns.extend(new_columns.iter().enumerate().map(|(i, col)| {
            let mut col = col.to_protobuf();
            let column_desc = col.column_desc.as_mut().unwrap();
            column_desc.column_id = next_column_id + (i as i32);
            column_desc.name = get_column_name(&column_desc.name);
            col
        }));
    }
    extend_sink_columns(&mut new_sink_columns, newly_added_columns, |name| {
        name.clone()
    });

    let mut new_sink_fragment = clone_fragment(
        original_sink_fragment,
        id_generator_manager,
        actor_id_counter,
    );
    let sink_node = &mut new_sink_fragment.nodes;
    let PbNodeBody::Sink(sink_node_body) = sink_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
    };
    let [stream_scan_node] = sink_node.input.as_mut_slice() else {
        panic!("Sink has more than 1 input: {:?}", sink_node.input);
    };
    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::StreamScan but got: {:?}",
            stream_scan_node.node_body
        )
        .into());
    };
    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_mut_slice() else {
        panic!(
            "the number of StreamScan inputs is not 2: {:?}",
            stream_scan_node.input
        );
    };
    let NodeBody::Merge(merge) = merge_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::Merge but got: {:?}",
            merge_node.node_body
        )
        .into());
    };
    // update sink_node
    // following logic in <StreamSink as Explain>::distill
    sink_node.identity = {
        let sink_type = SinkType::from_proto(sink.sink_type());
        let sink_type_str = sink_type.type_str();
        let column_names = new_sink_columns
            .iter()
            .map(|col| {
                ColumnCatalog::from(col.clone())
                    .name_with_hidden()
                    .to_string()
            })
            .join(", ");
        let downstream_pk = if !sink_type.is_append_only() {
            let downstream_pk = sink
                .downstream_pk
                .iter()
                .map(|i| &sink.columns[*i as usize].column_desc.as_ref().unwrap().name)
                .collect_vec();
            format!(", downstream_pk: {downstream_pk:?}")
        } else {
            "".to_owned()
        };
        format!("StreamSink {{ type: {sink_type_str}, columns: [{column_names}]{downstream_pk} }}")
    };
    sink_node
        .fields
        .extend(newly_added_columns.iter().map(|col| {
            Field::new(
                format!("{}.{}", upstream_table.name, col.column_desc.name),
                col.data_type().clone(),
            )
            .to_prost()
        }));

    let new_log_store_table = if let Some(log_store_table) = &mut sink_node_body.table {
        extend_sink_columns(&mut log_store_table.columns, newly_added_columns, |name| {
            format!("{}_{}", upstream_table.name, name)
        });
        Some(log_store_table.clone())
    } else {
        None
    };
    sink_node_body.sink_desc.as_mut().unwrap().column_catalogs = new_sink_columns.clone();

    // update stream scan node
    stream_scan_node
        .fields
        .extend(newly_added_columns.iter().map(|col| {
            Field::new(
                format!("{}.{}", upstream_table.name, col.column_desc.name),
                col.data_type().clone(),
            )
            .to_prost()
        }));
    // following logic in <StreamTableScan as Explain>::distill
    stream_scan_node.identity = {
        let columns = stream_scan_node
            .fields
            .iter()
            .map(|col| &col.name)
            .join(", ");
        format!("StreamTableScan {{ table: t, columns: [{columns}] }}")
    };

    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
        return Err(anyhow!(
            "unsupported stream_scan_type for auto refresh schema: {:?}",
            stream_scan_type
        )
        .into());
    }
    scan.arrangement_table = Some(upstream_table.clone());
    scan.output_indices.extend(
        (0..newly_added_columns.len()).map(|i| (i + scan.upstream_column_ids.len()) as u32),
    );
    scan.upstream_column_ids.extend(
        newly_added_columns
            .iter()
            .map(|col| col.column_id().get_id()),
    );
    let table_desc = scan.table_desc.as_mut().unwrap();
    table_desc
        .value_indices
        .extend((0..newly_added_columns.len()).map(|i| (i + table_desc.columns.len()) as u32));
    table_desc.columns.extend(
        newly_added_columns
            .iter()
            .map(|col| col.column_desc.to_protobuf()),
    );

    // update merge node
    merge_node.fields = scan
        .upstream_column_ids
        .iter()
        .map(|&column_id| {
            let col = upstream_table
                .columns
                .iter()
                .find(|c| c.column_desc.as_ref().unwrap().column_id == column_id)
                .unwrap();
            let col_desc = col.column_desc.as_ref().unwrap();
            Field::new(
                col_desc.name.clone(),
                col_desc.column_type.as_ref().unwrap().into(),
            )
            .to_prost()
        })
        .collect();
    merge.upstream_fragment_id = upstream_table_fragment_id;
    Ok((new_sink_fragment, new_sink_columns, new_log_store_table))
}

/// Adjacency list (G) of backfill orders.
/// `G[10] -> [1, 2, 11]`
/// means the backfill node in fragment 10
/// should be backfilled before the backfill nodes in fragments 1, 2 and 11.
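///
/// A minimal sketch of constructing such an order (the fragment IDs are made up for
/// illustration):
/// ```ignore
/// let order: FragmentBackfillOrder = HashMap::from([(10, vec![1, 2, 11])]);
/// ```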
pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;

/// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`]
/// from the frontend.
///
/// This only includes the nodes and edges of the current job itself. It will be converted to
/// [`CompleteStreamFragmentGraph`] later, which additionally contains the information of
/// pre-existing fragments connected to the graph's top-most or bottom-most fragments.
#[derive(Default, Debug)]
pub struct StreamFragmentGraph {
    /// Stores all the fragments in the graph.
    pub(super) fragments: HashMap<GlobalFragmentId, BuildingFragment>,

    /// Stores edges between fragments: upstream -> downstream.
    pub(super) downstreams:
        HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Stores edges between fragments: downstream -> upstream.
    pub(super) upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Dependent relations of this job.
    dependent_table_ids: HashSet<TableId>,

    /// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session
    /// variable. If not specified, all active worker slots will be used.
    specified_parallelism: Option<NonZeroUsize>,
    /// The parallelism to use during backfill, specified by the `STREAMING_PARALLELISM_FOR_BACKFILL`
    /// session variable. If not specified, falls back to `specified_parallelism`.
    specified_backfill_parallelism: Option<NonZeroUsize>,

    /// Specified max parallelism, i.e., expected vnode count for the graph.
    ///
    /// The scheduler on the meta service will use this as a hint to decide the vnode count
    /// for each fragment.
    ///
    /// Note that the actual vnode count may be different from this value.
    /// For example, a no-shuffle exchange between the current fragment graph and an existing
    /// upstream fragment graph requires the two fragments to be in the same distribution,
    /// thus the same vnode count.
    max_parallelism: usize,

    /// The backfill ordering strategy of the graph.
    backfill_order: BackfillOrder,
}

impl StreamFragmentGraph {
    /// Create a new [`StreamFragmentGraph`] from the given [`StreamFragmentGraphProto`], with all
    /// global IDs correctly filled.
    pub fn new(
        env: &MetaSrvEnv,
        proto: StreamFragmentGraphProto,
        job: &StreamingJob,
    ) -> MetaResult<Self> {
        let fragment_id_gen =
            GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
        // Note: in SQL backend, the ids generated here are fake and will be overwritten again
        // with `refill_internal_table_ids` later.
        // TODO: refactor the code to remove this step.
        let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);

        // Create nodes.
        let fragments: HashMap<_, _> = proto
            .fragments
            .into_iter()
            .map(|(id, fragment)| {
                let id = fragment_id_gen.to_global_id(id.as_raw_id());
                let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
                (id, fragment)
            })
            .collect();

        assert_eq!(
            fragments
                .values()
                .map(|f| f.extract_internal_tables().len() as u32)
                .sum::<u32>(),
            proto.table_ids_cnt
        );

        // Create edges.
        let mut downstreams = HashMap::new();
        let mut upstreams = HashMap::new();

        for edge in proto.edges {
            let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id.as_raw_id());
            let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id.as_raw_id());
            let edge = StreamFragmentEdge::from_protobuf(&edge);

            upstreams
                .entry(downstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(upstream_id, edge.clone())
                .unwrap();
            downstreams
                .entry(upstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(downstream_id, edge)
                .unwrap();
        }

        // Note: Here we directly use the field `dependent_table_ids` in the proto (resolved in
        // frontend), instead of visiting the graph ourselves.
        let dependent_table_ids = proto.dependent_table_ids.iter().copied().collect();

        let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
            Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
        } else {
            None
        };
        let specified_backfill_parallelism =
            if let Some(Parallelism { parallelism }) = proto.backfill_parallelism {
                Some(
                    NonZeroUsize::new(parallelism as usize)
                        .context("backfill parallelism should not be 0")?,
                )
            } else {
                None
            };

        let max_parallelism = proto.max_parallelism as usize;
        let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
            order: Default::default(),
        });

        Ok(Self {
            fragments,
            downstreams,
            upstreams,
            dependent_table_ids,
            specified_parallelism,
            specified_backfill_parallelism,
            max_parallelism,
            backfill_order,
        })
    }

    /// Retrieve the **incomplete** internal tables map of the whole graph.
    ///
    /// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
    /// `fragment_id`, `vnode_count`. They will all be filled after a `TableFragments` is built.
    /// Be careful when using the returned values.
    pub fn incomplete_internal_tables(&self) -> BTreeMap<TableId, Table> {
        let mut tables = BTreeMap::new();
        for fragment in self.fragments.values() {
            for table in fragment.extract_internal_tables() {
                let table_id = table.id;
                tables
                    .try_insert(table_id, table)
                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
            }
        }
        tables
    }

    /// Refill the internal tables' `table_id`s according to the given map, typically obtained from
    /// `create_internal_table_catalog`.
    pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<TableId, TableId>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = table_id_map.get(&table.id).cloned().unwrap();
                    table.id = target;
                },
            );
        }
    }

    /// Use a trivial algorithm to match the internal tables of the new graph for
    /// `ALTER TABLE` or `ALTER SOURCE`.
    pub fn fit_internal_tables_trivial(
        &mut self,
        mut old_internal_tables: Vec<Table>,
    ) -> MetaResult<()> {
        let mut new_internal_table_ids = Vec::new();
        for fragment in self.fragments.values() {
            for table in &fragment.extract_internal_tables() {
                new_internal_table_ids.push(table.id);
            }
        }

        if new_internal_table_ids.len() != old_internal_tables.len() {
            bail!(
                "Different number of internal tables. New: {}, Old: {}",
                new_internal_table_ids.len(),
                old_internal_tables.len()
            );
        }
        old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
        new_internal_table_ids.sort();

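        // Pair old and new tables by their sorted-ID order. This "trivial" matching assumes
        // that the relative ID order of internal tables is preserved across the two plans.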
        let internal_table_id_map = new_internal_table_ids
            .into_iter()
            .zip_eq_fast(old_internal_tables.into_iter())
            .collect::<HashMap<_, _>>();

        // TODO(alter-mv): unify this with `fit_internal_table_ids_with_mapping` after we
        // confirm the behavior is the same.
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    // XXX: this replaces the entire table, instead of just the id!
                    let target = internal_table_id_map.get(&table.id).cloned().unwrap();
                    *table = target;
                },
            );
        }

        Ok(())
    }

    /// Fit the internal tables' `table_id`s according to the given mapping.
    pub fn fit_internal_table_ids_with_mapping(&mut self, mut matches: HashMap<TableId, Table>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = matches.remove(&table.id).unwrap_or_else(|| {
                        panic!("no matching table for table {}({})", table.id, table.name)
                    });
                    table.id = target.id;
                    table.maybe_vnode_count = target.maybe_vnode_count;
                },
            );
        }
    }

    pub fn fit_snapshot_backfill_epochs(
        &mut self,
        mut snapshot_backfill_epochs: HashMap<StreamNodeLocalOperatorId, u64>,
    ) {
        for fragment in self.fragments.values_mut() {
            visit_stream_node_cont_mut(fragment.node.as_mut().unwrap(), |node| {
                if let PbNodeBody::StreamScan(scan) = node.node_body.as_mut().unwrap()
                    && let StreamScanType::SnapshotBackfill
                    | StreamScanType::CrossDbSnapshotBackfill = scan.stream_scan_type()
                {
                    let Some(epoch) = snapshot_backfill_epochs.remove(&node.operator_id) else {
                        panic!("no snapshot epoch found for node {:?}", node)
                    };
                    scan.snapshot_backfill_epoch = Some(epoch);
                }
                true
            })
        }
    }

    /// Returns the fragment id where the streaming job node is located.
    pub fn table_fragment_id(&self) -> FragmentId {
        self.fragments
            .values()
            .filter(|b| b.job_id.is_some())
            .map(|b| b.fragment_id)
            .exactly_one()
            .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
    }

    /// Returns the fragment id where the table DML is received.
    pub fn dml_fragment_id(&self) -> Option<FragmentId> {
        self.fragments
            .values()
            .filter(|b| {
                FragmentTypeMask::from(b.fragment_type_mask).contains(FragmentTypeFlag::Dml)
            })
            .map(|b| b.fragment_id)
            .at_most_one()
            .expect("require at most 1 dml node when creating the streaming job")
    }

    /// Get the dependent streaming job ids of this job.
    pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
        &self.dependent_table_ids
    }

    /// Get the parallelism of the job, if specified by the user.
    pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_parallelism
    }

    /// Get the backfill parallelism of the job, if specified by the user.
    pub fn specified_backfill_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_backfill_parallelism
    }

    /// Get the expected vnode count of the graph. See documentation of the field for more details.
    pub fn max_parallelism(&self) -> usize {
        self.max_parallelism
    }

    /// Get downstreams of a fragment.
    fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    /// Get upstreams of a fragment.
    fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    pub fn collect_snapshot_backfill_info(
        &self,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        Self::collect_snapshot_backfill_info_impl(self.fragments.values().map(|fragment| {
            (
                fragment.node.as_ref().unwrap(),
                fragment.fragment_type_mask.into(),
            )
        }))
    }

    /// Returns `Ok((Some(snapshot_backfill_info), cross_db_snapshot_backfill_info))`.
    pub fn collect_snapshot_backfill_info_impl(
        fragments: impl IntoIterator<Item = (&PbStreamNode, FragmentTypeMask)>,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
        let mut cross_db_info = SnapshotBackfillInfo {
            upstream_mv_table_id_to_backfill_epoch: Default::default(),
        };
        let mut result = Ok(());
        for (node, fragment_type_mask) in fragments {
            visit_stream_node_cont(node, |node| {
                if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
                    let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
                        .expect("invalid stream_scan_type");
                    let is_snapshot_backfill = match stream_scan_type {
                        StreamScanType::SnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                            );
                            true
                        }
                        StreamScanType::CrossDbSnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan)
                            );
                            cross_db_info
                                .upstream_mv_table_id_to_backfill_epoch
                                .insert(stream_scan.table_id, stream_scan.snapshot_backfill_epoch);

                            return true;
                        }
                        _ => false,
                    };

                    match &mut prev_stream_scan {
                        Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
                            match (prev_snapshot_backfill_info, is_snapshot_backfill) {
                                (Some(prev_snapshot_backfill_info), true) => {
                                    prev_snapshot_backfill_info
                                        .upstream_mv_table_id_to_backfill_epoch
                                        .insert(
                                            stream_scan.table_id,
                                            stream_scan.snapshot_backfill_epoch,
                                        );
                                    true
                                }
                                (None, false) => true,
                                (_, _) => {
                                    result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
                                    false
                                }
                            }
                        }
                        None => {
                            prev_stream_scan = Some((
                                if is_snapshot_backfill {
                                    Some(SnapshotBackfillInfo {
                                        upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
                                            [(
                                                stream_scan.table_id,
                                                stream_scan.snapshot_backfill_epoch,
                                            )],
                                        ),
                                    })
                                } else {
                                    None
                                },
                                *stream_scan.clone(),
                            ));
                            true
                        }
                    }
                } else {
                    true
                }
            })
        }
        result.map(|_| {
            (
                prev_stream_scan
                    .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
                    .unwrap_or(None),
                cross_db_info,
            )
        })
    }

    /// Collect the mapping from table / `source_id` -> `fragment_id`.
    pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
        let mut mapping = HashMap::new();
        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let fragment_mask = fragment.fragment_type_mask;
            let candidates = [FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan];
            let has_some_scan = candidates
                .into_iter()
                .any(|flag| (fragment_mask & flag as u32) > 0);
            if has_some_scan {
                visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
                    match node.node_body.as_ref() {
                        Some(NodeBody::StreamScan(stream_scan)) => {
                            let table_id = stream_scan.table_id;
                            let fragments: &mut Vec<_> =
                                mapping.entry(table_id.as_raw_id()).or_default();
                            fragments.push(fragment_id);
                            // each fragment should have only 1 scan node.
                            false
                        }
                        Some(NodeBody::SourceBackfill(source_backfill)) => {
                            let source_id = source_backfill.upstream_source_id;
                            let fragments: &mut Vec<_> =
                                mapping.entry(source_id.as_raw_id()).or_default();
                            fragments.push(fragment_id);
                            // each fragment should have only 1 scan node.
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        mapping
    }

    /// Initially, the mapping that comes from the frontend is between `table_id`s.
    /// We remap it to the fragment level, since we track progress by actor, and we can get
    /// a fragment <-> actor mapping.
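    ///
    /// A sketch of the remapping (IDs made up for illustration): a table-level order
    /// `{10: [1, 2]}` combined with a backfill mapping `{10: [100], 1: [101], 2: [102]}`
    /// yields the fragment-level order `{100: [101, 102]}`.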
    pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
        let mapping = self.collect_backfill_mapping();
        let mut fragment_ordering: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();

        // 1. Add backfill dependencies
        for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
            let fragment_ids = mapping.get(rel_id).unwrap();
            for fragment_id in fragment_ids {
                let downstream_fragment_ids = downstream_rel_ids
                    .data
                    .iter()
                    .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
                    .copied()
                    .collect();
                fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
            }
        }

        // If no backfill order is specified, we still need to ensure that all backfill fragments
        // run before LocalityProvider fragments.
        if fragment_ordering.is_empty() {
            for value in mapping.values() {
                for &fragment_id in value {
                    fragment_ordering.entry(fragment_id).or_default();
                }
            }
        }

        // 2. Add dependencies: all backfill fragments should run before LocalityProvider fragments
        let locality_provider_dependencies = self.find_locality_provider_dependencies();

        let backfill_fragments: HashSet<FragmentId> = mapping.values().flatten().copied().collect();

        // Calculate LocalityProvider root fragments (zero indegree).
        // Root fragments are those that appear as keys but never appear as downstream dependencies.
        let all_locality_provider_fragments: HashSet<FragmentId> =
            locality_provider_dependencies.keys().copied().collect();
        let downstream_locality_provider_fragments: HashSet<FragmentId> =
            locality_provider_dependencies
                .values()
                .flatten()
                .copied()
                .collect();
        let locality_provider_root_fragments: Vec<FragmentId> = all_locality_provider_fragments
            .difference(&downstream_locality_provider_fragments)
            .copied()
            .collect();

        // For each backfill fragment, add only the root LocalityProvider fragments as dependents.
        // This ensures backfill completes before any LocalityProvider starts, while minimizing dependencies.
        for &backfill_fragment_id in &backfill_fragments {
            fragment_ordering
                .entry(backfill_fragment_id)
                .or_default()
                .extend(locality_provider_root_fragments.iter().copied());
        }

        // 3. Add LocalityProvider internal dependencies
        for (fragment_id, downstream_fragments) in locality_provider_dependencies {
            fragment_ordering
                .entry(fragment_id)
                .or_default()
                .extend(downstream_fragments);
        }

        // Deduplicate downstream entries per fragment; overlaps are common when the same fragment
        // is reached via multiple paths (e.g., with StreamShare) and would otherwise appear
        // multiple times.
        for downstream in fragment_ordering.values_mut() {
            let mut seen = HashSet::new();
            downstream.retain(|id| seen.insert(*id));
        }

        fragment_ordering
    }

    pub fn find_locality_provider_fragment_state_table_mapping(
        &self,
    ) -> HashMap<FragmentId, Vec<TableId>> {
        let mut mapping: HashMap<FragmentId, Vec<TableId>> = HashMap::new();

        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();

            // Check if this fragment contains a LocalityProvider node
            if let Some(node) = fragment.node.as_ref() {
                let mut state_table_ids = Vec::new();

                visit_stream_node_cont(node, |stream_node| {
                    if let Some(NodeBody::LocalityProvider(locality_provider)) =
                        stream_node.node_body.as_ref()
                    {
                        // Collect the state table ID (except the progress table)
                        let state_table_id = locality_provider
                            .state_table
                            .as_ref()
                            .expect("must have state table")
                            .id;
                        state_table_ids.push(state_table_id);
                        false // Stop visiting once we find a LocalityProvider
                    } else {
                        true // Continue visiting
                    }
                });

                if !state_table_ids.is_empty() {
                    mapping.insert(fragment_id, state_table_ids);
                }
            }
        }

        mapping
    }

    /// Find dependency relationships among fragments containing `LocalityProvider` nodes.
    /// Returns a mapping where each fragment ID maps to a list of fragment IDs that should be processed after it.
    /// Following the same semantics as `FragmentBackfillOrder`:
    /// `G[10] -> [1, 2, 11]` means the `LocalityProvider` in fragment 10 should be processed
    /// before the `LocalityProvider`s in fragments 1, 2, and 11.
    ///
    /// This method assumes each fragment contains at most one `LocalityProvider` node.
    pub fn find_locality_provider_dependencies(&self) -> HashMap<FragmentId, Vec<FragmentId>> {
        let mut locality_provider_fragments = HashSet::new();
        let mut dependencies: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();

        // First, identify all fragments that contain LocalityProvider nodes
        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let has_locality_provider = self.fragment_has_locality_provider(fragment);

            if has_locality_provider {
                locality_provider_fragments.insert(fragment_id);
                dependencies.entry(fragment_id).or_default();
            }
        }

        // Build dependency relationships between LocalityProvider fragments.
        // For each LocalityProvider fragment, find all downstream LocalityProvider fragments.
        // The upstream fragment should be processed before the downstream fragments.
        for &provider_fragment_id in &locality_provider_fragments {
            let provider_fragment_global_id = GlobalFragmentId::new(provider_fragment_id);

            // Find all fragments downstream from this LocalityProvider fragment
            let mut visited = HashSet::new();
            let mut downstream_locality_providers = Vec::new();

            self.collect_downstream_locality_providers(
                provider_fragment_global_id,
                &locality_provider_fragments,
                &mut visited,
                &mut downstream_locality_providers,
            );

            // This fragment should be processed before all its downstream LocalityProvider fragments
            dependencies
                .entry(provider_fragment_id)
                .or_default()
                .extend(downstream_locality_providers);
        }

        dependencies
    }

    fn fragment_has_locality_provider(&self, fragment: &BuildingFragment) -> bool {
        let mut has_locality_provider = false;

        if let Some(node) = fragment.node.as_ref() {
            visit_stream_node_cont(node, |stream_node| {
                if let Some(NodeBody::LocalityProvider(_)) = stream_node.node_body.as_ref() {
                    has_locality_provider = true;
                    false // Stop visiting once we find a LocalityProvider
                } else {
                    true // Continue visiting
                }
            });
        }

        has_locality_provider
    }

    /// Recursively collect downstream `LocalityProvider` fragments.
    fn collect_downstream_locality_providers(
        &self,
        current_fragment_id: GlobalFragmentId,
        locality_provider_fragments: &HashSet<FragmentId>,
        visited: &mut HashSet<GlobalFragmentId>,
        downstream_providers: &mut Vec<FragmentId>,
    ) {
        if visited.contains(&current_fragment_id) {
            return;
        }
        visited.insert(current_fragment_id);

        // Check all downstream fragments
        for &downstream_id in self.get_downstreams(current_fragment_id).keys() {
            let downstream_fragment_id = downstream_id.as_global_id();

            // If the downstream fragment is a LocalityProvider, add it to the results
            if locality_provider_fragments.contains(&downstream_fragment_id) {
                downstream_providers.push(downstream_fragment_id);
            }

            // Recursively check further downstream
            self.collect_downstream_locality_providers(
                downstream_id,
                locality_provider_fragments,
                visited,
                downstream_providers,
            );
        }
    }
}

/// Fill the snapshot epoch for `StreamScanNode`s of `SnapshotBackfill`.
/// Returns `true` when a change has been applied.
pub fn fill_snapshot_backfill_epoch(
    node: &mut StreamNode,
    snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
    cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
) -> MetaResult<bool> {
    let mut result = Ok(());
    let mut applied = false;
1303    visit_stream_node_cont_mut(node, |node| {
1304        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
1305            && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
1306                || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
1307        {
1308            result = try {
1309                let table_id = stream_scan.table_id;
1310                let snapshot_epoch = cross_db_snapshot_backfill_info
1311                    .upstream_mv_table_id_to_backfill_epoch
1312                    .get(&table_id)
1313                    .or_else(|| {
1314                        snapshot_backfill_info.and_then(|snapshot_backfill_info| {
1315                            snapshot_backfill_info
1316                                .upstream_mv_table_id_to_backfill_epoch
1317                                .get(&table_id)
1318                        })
1319                    })
1320                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
1321                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
1322                if let Some(prev_snapshot_epoch) =
1323                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
1324                {
1325                    Err(anyhow!(
1326                        "snapshot backfill epoch set again: {} {} {}",
1327                        table_id,
1328                        prev_snapshot_epoch,
1329                        snapshot_epoch
1330                    ))?;
1331                }
1332                applied = true;
1333            };
1334            result.is_ok()
1335        } else {
1336            true
1337        }
1338    });
1339    result.map(|_| applied)
1340}
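
// Sketch (illustrative) of the epoch lookup above: each per-table map stores an
// `Option<u64>` epoch, so `get` yields `Option<&Option<u64>>`: the outer layer
// answers "is this table covered at all", the inner one "has its snapshot epoch
// been decided yet"; hence the two chained `ok_or_else` calls. The cross-db map
// takes precedence over the per-job map.
#[cfg(test)]
mod snapshot_epoch_lookup_sketch {
    use std::collections::HashMap;

    #[test]
    fn cross_db_map_takes_precedence() {
        let cross_db: HashMap<u32, Option<u64>> = HashMap::from([(1, Some(100))]);
        let per_job: HashMap<u32, Option<u64>> = HashMap::from([(1, Some(7)), (2, None)]);

        let lookup = |table_id: u32| -> Result<u64, &'static str> {
            cross_db
                .get(&table_id)
                .or_else(|| per_job.get(&table_id))
                .ok_or("not covered")? // outer `Option`: table is not an upstream
                .ok_or("not set") // inner `Option`: epoch not decided yet
        };

        assert_eq!(lookup(1), Ok(100)); // cross-db's 100 wins over per-job's 7
        assert_eq!(lookup(2), Err("not set"));
        assert_eq!(lookup(3), Err("not covered"));
    }
}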
1341
1342static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
1343    LazyLock::new(HashMap::new);
1344
1345/// A fragment that is either being built or already exists. Used to generalize the logic of
1346/// [`crate::stream::ActorGraphBuilder`].
1347#[derive(Debug, Clone, EnumAsInner)]
1348pub(super) enum EitherFragment {
1349    /// An internal fragment that is being built for the current streaming job.
1350    Building(BuildingFragment),
1351
1352    /// An existing fragment that is external but connected to the fragments being built.
1353    Existing(SharedFragmentInfo),
1354}
1355
1356/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing
1357/// fragments, which are connected to the graph's top-most or bottom-most fragments.
1358///
1359/// For example,
1360/// - if we're going to build a mview on an existing mview, the upstream fragment containing the
1361///   `Materialize` node will be included in this structure.
1362/// - if we're going to replace the plan of a table with downstream mviews, the downstream fragments
1363///   containing the `StreamScan` nodes will be included in this structure.
1364#[derive(Debug)]
1365pub struct CompleteStreamFragmentGraph {
1366    /// The fragment graph of the streaming job being built.
1367    building_graph: StreamFragmentGraph,
1368
1369    /// The required information of existing fragments.
1370    existing_fragments: HashMap<GlobalFragmentId, SharedFragmentInfo>,
1371
1372    /// The location of the actors in the existing fragments.
1373    existing_actor_location: HashMap<ActorId, WorkerId>,
1374
1375    /// Extra downstream edges between existing fragments and the building fragments.
1376    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
1377
1378    /// Extra upstream edges between existing fragments and the building fragments.
1379    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
1380}
1381
1382pub struct FragmentGraphUpstreamContext {
1383    /// The root fragment of each upstream stream graph, which can be a
1384    /// mview fragment, or a source fragment for a cdc source job.
1385    pub upstream_root_fragments: HashMap<JobId, (SharedFragmentInfo, PbStreamNode)>,
1386    pub upstream_actor_location: HashMap<ActorId, WorkerId>,
1387}
1388
1389pub struct FragmentGraphDownstreamContext {
1390    pub original_root_fragment_id: FragmentId,
1391    pub downstream_fragments: Vec<(DispatcherType, SharedFragmentInfo, PbStreamNode)>,
1392    pub downstream_actor_location: HashMap<ActorId, WorkerId>,
1393}
1394
1395impl CompleteStreamFragmentGraph {
1396    /// Create a new [`CompleteStreamFragmentGraph`] with empty existing fragments, i.e., there
1397    /// are no upstream mviews.
1398    #[cfg(test)]
1399    pub fn for_test(graph: StreamFragmentGraph) -> Self {
1400        Self {
1401            building_graph: graph,
1402            existing_fragments: Default::default(),
1403            existing_actor_location: Default::default(),
1404            extra_downstreams: Default::default(),
1405            extra_upstreams: Default::default(),
1406        }
1407    }
1408
1409    /// Create a new [`CompleteStreamFragmentGraph`] for a newly created job (which has no
1410    /// downstreams), e.g., an MV on MV, or a CDC/Source table with existing upstream
1411    /// `Materialize` or `Source` fragments.
1412    pub fn with_upstreams(
1413        graph: StreamFragmentGraph,
1414        upstream_context: FragmentGraphUpstreamContext,
1415        job_type: StreamingJobType,
1416    ) -> MetaResult<Self> {
1417        Self::build_helper(graph, Some(upstream_context), None, job_type)
1418    }
1419
1420    /// Create a new [`CompleteStreamFragmentGraph`] for replacing an existing table/source,
1421    /// with the downstream existing `StreamScan`/`StreamSourceScan` fragments.
1422    pub fn with_downstreams(
1423        graph: StreamFragmentGraph,
1424        downstream_context: FragmentGraphDownstreamContext,
1425        job_type: StreamingJobType,
1426    ) -> MetaResult<Self> {
1427        Self::build_helper(graph, None, Some(downstream_context), job_type)
1428    }
1429
1430    /// For replacing an existing table based on a shared cdc source, which has both upstreams and downstreams.
1431    pub fn with_upstreams_and_downstreams(
1432        graph: StreamFragmentGraph,
1433        upstream_context: FragmentGraphUpstreamContext,
1434        downstream_context: FragmentGraphDownstreamContext,
1435        job_type: StreamingJobType,
1436    ) -> MetaResult<Self> {
1437        Self::build_helper(
1438            graph,
1439            Some(upstream_context),
1440            Some(downstream_context),
1441            job_type,
1442        )
1443    }
1444
1445    /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments.
1446    fn build_helper(
1447        mut graph: StreamFragmentGraph,
1448        upstream_ctx: Option<FragmentGraphUpstreamContext>,
1449        downstream_ctx: Option<FragmentGraphDownstreamContext>,
1450        job_type: StreamingJobType,
1451    ) -> MetaResult<Self> {
1452        let mut extra_downstreams = HashMap::new();
1453        let mut extra_upstreams = HashMap::new();
1454        let mut existing_fragments = HashMap::new();
1455
1456        let mut existing_actor_location = HashMap::new();
1457
1458        if let Some(FragmentGraphUpstreamContext {
1459            upstream_root_fragments,
1460            upstream_actor_location,
1461        }) = upstream_ctx
1462        {
1463            for (&id, fragment) in &mut graph.fragments {
1464                let uses_shuffled_backfill = fragment.has_shuffled_backfill();
1465
1466                for (&upstream_job_id, required_columns) in &fragment.upstream_job_columns {
1467                    let (upstream_fragment, nodes) = upstream_root_fragments
1468                        .get(&upstream_job_id)
1469                        .context("upstream fragment not found")?;
1470                    let upstream_root_fragment_id =
1471                        GlobalFragmentId::new(upstream_fragment.fragment_id);
1472
1473                    let edge = match job_type {
1474                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
1475                            // We traverse all fragments in the graph to find the CdcFilter
1476                            // fragment, and add an edge between the upstream source fragment and it.
1477                            assert_ne!(
1478                                (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
1479                                0
1480                            );
1481
1482                            tracing::debug!(
1483                                ?upstream_root_fragment_id,
1484                                ?required_columns,
1485                                identity = ?fragment.inner.get_node().unwrap().get_identity(),
1486                                current_frag_id=?id,
1487                                "CdcFilter with upstream source fragment"
1488                            );
1489
1490                            StreamFragmentEdge {
1491                                id: EdgeId::UpstreamExternal {
1492                                    upstream_job_id,
1493                                    downstream_fragment_id: id,
1494                                },
1495                                // We always use `NoShuffle` for the exchange between the upstream
1496                                // `Source` and the downstream `CdcFilter` of the new cdc table.
1497                                dispatch_strategy: DispatchStrategy {
1498                                    r#type: DispatcherType::NoShuffle as _,
1499                                    dist_key_indices: vec![], // not used for `NoShuffle`
1500                                    output_mapping: DispatchOutputMapping::identical(
1501                                        CDC_SOURCE_COLUMN_NUM as _,
1502                                    )
1503                                    .into(),
1504                                },
1505                            }
1506                        }
1507
1508                        // handle MV on MV/Source
1509                        StreamingJobType::MaterializedView
1510                        | StreamingJobType::Sink
1511                        | StreamingJobType::Index => {
1512                            // Build the extra edges between the upstream `Materialize` and
1513                            // the downstream `StreamScan` of the new job.
1514                            if upstream_fragment
1515                                .fragment_type_mask
1516                                .contains(FragmentTypeFlag::Mview)
1517                            {
1518                                // Resolve the required output columns from the upstream materialized view.
1519                                let (dist_key_indices, output_mapping) = {
1520                                    let mview_node =
1521                                        nodes.get_node_body().unwrap().as_materialize().unwrap();
1522                                    let all_columns = mview_node.column_descs();
1523                                    let dist_key_indices = mview_node.dist_key_indices();
1524                                    let output_mapping = gen_output_mapping(
1525                                        required_columns,
1526                                        &all_columns,
1527                                    )
1528                                    .context(
1529                                        "BUG: column not found in the upstream materialized view",
1530                                    )?;
1531                                    (dist_key_indices, output_mapping)
1532                                };
1533                                let dispatch_strategy = mv_on_mv_dispatch_strategy(
1534                                    uses_shuffled_backfill,
1535                                    dist_key_indices,
1536                                    output_mapping,
1537                                );
1538
1539                                StreamFragmentEdge {
1540                                    id: EdgeId::UpstreamExternal {
1541                                        upstream_job_id,
1542                                        downstream_fragment_id: id,
1543                                    },
1544                                    dispatch_strategy,
1545                                }
1546                            }
1547                            // Build the extra edges between the upstream `Source` and
1548                            // the downstream `SourceBackfill` of the new job.
1549                            else if upstream_fragment
1550                                .fragment_type_mask
1551                                .contains(FragmentTypeFlag::Source)
1552                            {
1553                                let output_mapping = {
1554                                    let source_node =
1555                                        nodes.get_node_body().unwrap().as_source().unwrap();
1556
1557                                    let all_columns = source_node.column_descs().unwrap();
1558                                    gen_output_mapping(required_columns, &all_columns).context(
1559                                        "BUG: column not found in the upstream source node",
1560                                    )?
1561                                };
1562
1563                                StreamFragmentEdge {
1564                                    id: EdgeId::UpstreamExternal {
1565                                        upstream_job_id,
1566                                        downstream_fragment_id: id,
1567                                    },
1568                                    // We always use `NoShuffle` for the exchange between the upstream
1569                                    // `Source` and the downstream `SourceBackfill` of the new MV.
1570                                    dispatch_strategy: DispatchStrategy {
1571                                        r#type: DispatcherType::NoShuffle as _,
1572                                        dist_key_indices: vec![], // not used for `NoShuffle`
1573                                        output_mapping: Some(output_mapping),
1574                                    },
1575                                }
1576                            } else {
1577                                bail!(
1578                                    "the upstream fragment should be a MView or Source, got fragment type: {:b}",
1579                                    upstream_fragment.fragment_type_mask
1580                                )
1581                            }
1582                        }
1583                        StreamingJobType::Source | StreamingJobType::Table(_) => {
1584                            bail!(
1585                                "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
1586                                job_type
1587                            )
1588                        }
1589                    };
1590
1591                    // put the edge into the extra edges
1592                    extra_downstreams
1593                        .entry(upstream_root_fragment_id)
1594                        .or_default()
1595                        .try_insert(id, edge.clone())
1596                        .unwrap();
1597                    extra_upstreams
1598                        .entry(id)
1599                        .or_default()
1600                        .try_insert(upstream_root_fragment_id, edge)
1601                        .unwrap();
1602                }
1603            }
1604
1605            existing_fragments.extend(
1606                upstream_root_fragments
1607                    .into_values()
1608                    .map(|(f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
1609            );
1610
1611            existing_actor_location.extend(upstream_actor_location);
1612        }
1613
1614        if let Some(FragmentGraphDownstreamContext {
1615            original_root_fragment_id,
1616            downstream_fragments,
1617            downstream_actor_location,
1618        }) = downstream_ctx
1619        {
1620            let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
1621            let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());
1622
1623            // Build the extra edges between the `Materialize` and the downstream `StreamScan` of the
1624            // existing materialized views.
1625            for (dispatcher_type, fragment, nodes) in &downstream_fragments {
1626                let id = GlobalFragmentId::new(fragment.fragment_id);
1627
1628                // Similar to `extract_upstream_columns_except_cross_db_backfill`.
1629                let output_columns = {
1630                    let mut res = None;
1631
1632                    stream_graph_visitor::visit_stream_node_body(nodes, |node_body| {
1633                        let columns = match node_body {
1634                            NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
1635                            NodeBody::SourceBackfill(source_backfill) => {
1636                                // FIXME: only pass required columns instead of all columns here
1637                                source_backfill.column_descs()
1638                            }
1639                            _ => return,
1640                        };
1641                        res = Some(columns);
1642                    });
1643
1644                    res.context("failed to locate downstream scan")?
1645                };
1646
1647                let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
1648                let nodes = table_fragment.node.as_ref().unwrap();
1649
1650                let (dist_key_indices, output_mapping) = match job_type {
1651                    StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1652                        let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
1653                        let all_columns = mview_node.column_descs();
1654                        let dist_key_indices = mview_node.dist_key_indices();
1655                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
1656                            .ok_or_else(|| {
1657                                MetaError::invalid_parameter(
1658                                    "unable to drop the column due to \
1659                                     being referenced by downstream materialized views or sinks",
1660                                )
1661                            })?;
1662                        (dist_key_indices, output_mapping)
1663                    }
1664
1665                    StreamingJobType::Source => {
1666                        let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
1667                        let all_columns = source_node.column_descs().unwrap();
1668                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
1669                            .ok_or_else(|| {
1670                                MetaError::invalid_parameter(
1671                                    "unable to drop the column due to \
1672                                     being referenced by downstream materialized views or sinks",
1673                                )
1674                            })?;
1675                        assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
1676                        (
1677                            vec![], // not used for `NoShuffle`
1678                            output_mapping,
1679                        )
1680                    }
1681
1682                    _ => bail!("unsupported job type for replacement: {job_type:?}"),
1683                };
1684
1685                let edge = StreamFragmentEdge {
1686                    id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
1687                        original_upstream_fragment_id: original_table_fragment_id,
1688                        downstream_fragment_id: id,
1689                    }),
1690                    dispatch_strategy: DispatchStrategy {
1691                        r#type: *dispatcher_type as i32,
1692                        output_mapping: Some(output_mapping),
1693                        dist_key_indices,
1694                    },
1695                };
1696
1697                extra_downstreams
1698                    .entry(table_fragment_id)
1699                    .or_default()
1700                    .try_insert(id, edge.clone())
1701                    .unwrap();
1702                extra_upstreams
1703                    .entry(id)
1704                    .or_default()
1705                    .try_insert(table_fragment_id, edge)
1706                    .unwrap();
1707            }
1708
1709            existing_fragments.extend(
1710                downstream_fragments
1711                    .into_iter()
1712                    .map(|(_, f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
1713            );
1714
1715            existing_actor_location.extend(downstream_actor_location);
1716        }
1717
1718        Ok(Self {
1719            building_graph: graph,
1720            existing_fragments,
1721            existing_actor_location,
1722            extra_downstreams,
1723            extra_upstreams,
1724        })
1725    }
1726}
1727
1728/// Generate the `output_mapping` for [`DispatchStrategy`] from the given columns, or `None` if a required column is not found upstream.
1729fn gen_output_mapping(
1730    required_columns: &[PbColumnDesc],
1731    upstream_columns: &[PbColumnDesc],
1732) -> Option<DispatchOutputMapping> {
1733    let len = required_columns.len();
1734    let mut indices = vec![0; len];
1735    let mut types = None;
1736
1737    for (i, r) in required_columns.iter().enumerate() {
1738        let (ui, u) = upstream_columns
1739            .iter()
1740            .find_position(|&u| u.column_id == r.column_id)?;
1741        indices[i] = ui as u32;
1742
1743        // Only if we encounter a type change (`ALTER TABLE ALTER COLUMN TYPE`) will we generate
1744        // a non-empty `types`.
1745        if u.column_type != r.column_type {
1746            types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
1747                upstream: u.column_type.clone(),
1748                downstream: r.column_type.clone(),
1749            };
1750        }
1751    }
1752
1753    // If there's no type change, indicate it with an empty `types`.
1754    let types = types.unwrap_or_default();
1755
1756    Some(DispatchOutputMapping { indices, types })
1757}
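
// A minimal sketch (illustrative) of `gen_output_mapping`: `indices` maps each
// required column to its position upstream (matched by `column_id`), and
// `types` stays empty when no column type changed. Field values other than
// `column_id` use the prost-generated defaults.
#[cfg(test)]
mod gen_output_mapping_sketch {
    use super::*;

    fn col(id: i32) -> PbColumnDesc {
        PbColumnDesc {
            column_id: id,
            ..Default::default()
        }
    }

    #[test]
    fn maps_required_columns_by_id() {
        let upstream = [col(10), col(11), col(12)];
        let required = [col(12), col(10)];

        let mapping = gen_output_mapping(&required, &upstream).unwrap();
        assert_eq!(mapping.indices, vec![2, 0]);
        // No `ALTER TABLE ALTER COLUMN TYPE` involved, so `types` is empty.
        assert!(mapping.types.is_empty());

        // A required column absent upstream cannot be resolved.
        assert!(gen_output_mapping(&[col(99)], &upstream).is_none());
    }
}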
1758
1759fn mv_on_mv_dispatch_strategy(
1760    uses_shuffled_backfill: bool,
1761    dist_key_indices: Vec<u32>,
1762    output_mapping: DispatchOutputMapping,
1763) -> DispatchStrategy {
1764    if uses_shuffled_backfill {
1765        if !dist_key_indices.is_empty() {
1766            DispatchStrategy {
1767                r#type: DispatcherType::Hash as _,
1768                dist_key_indices,
1769                output_mapping: Some(output_mapping),
1770            }
1771        } else {
1772            DispatchStrategy {
1773                r#type: DispatcherType::Simple as _,
1774                dist_key_indices: vec![], // empty for Simple
1775                output_mapping: Some(output_mapping),
1776            }
1777        }
1778    } else {
1779        DispatchStrategy {
1780            r#type: DispatcherType::NoShuffle as _,
1781            dist_key_indices: vec![], // not used for `NoShuffle`
1782            output_mapping: Some(output_mapping),
1783        }
1784    }
1785}
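
// A minimal sketch (illustrative) of the three-way choice above, reusing the
// `DispatchOutputMapping::identical` helper seen earlier in this file:
// shuffled backfill with dist keys => `Hash`; shuffled backfill on a
// singleton upstream => `Simple`; plain backfill => `NoShuffle`.
#[cfg(test)]
mod mv_on_mv_dispatch_sketch {
    use super::*;

    #[test]
    fn picks_dispatcher_by_backfill_kind() {
        let mapping = || DispatchOutputMapping::identical(2);

        // Shuffled backfill scales with the upstream's hash distribution.
        let hash = mv_on_mv_dispatch_strategy(true, vec![0], mapping());
        assert_eq!(hash.r#type, DispatcherType::Hash as i32);

        // No dist key means the upstream is a singleton.
        let simple = mv_on_mv_dispatch_strategy(true, vec![], mapping());
        assert_eq!(simple.r#type, DispatcherType::Simple as i32);

        // Non-shuffled backfill pairs actors 1:1 with the upstream.
        let no_shuffle = mv_on_mv_dispatch_strategy(false, vec![0], mapping());
        assert_eq!(no_shuffle.r#type, DispatcherType::NoShuffle as i32);
        assert!(no_shuffle.dist_key_indices.is_empty());
    }
}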
1786
1787impl CompleteStreamFragmentGraph {
1788    /// Returns **all** fragment IDs in the complete graph, including the ones that are not in the
1789    /// building graph.
1790    pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
1791        self.building_graph
1792            .fragments
1793            .keys()
1794            .chain(self.existing_fragments.keys())
1795            .copied()
1796    }
1797
1798    /// Returns an iterator of **all** edges in the complete graph, including the external edges.
1799    pub(super) fn all_edges(
1800        &self,
1801    ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
1802        self.building_graph
1803            .downstreams
1804            .iter()
1805            .chain(self.extra_downstreams.iter())
1806            .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
1807    }
1808
1809    /// Returns the distribution of the existing fragments.
1810    pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
1811        self.existing_fragments
1812            .iter()
1813            .map(|(&id, f)| {
1814                (
1815                    id,
1816                    Distribution::from_fragment(f, &self.existing_actor_location),
1817                )
1818            })
1819            .collect()
1820    }
1821
1822    /// Generate the topological order of **all** fragments in this graph, including the ones
1823    /// that are not in the building graph. Returns an error if the graph is not a DAG and the
1824    /// topological sort cannot be done.
1825    ///
1826    /// For MV on MV, the first fragment popped out from the queue will be the top-most node,
1827    /// i.e., the `Sink` / `Materialize` in the stream graph.
1828    pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
1829        let mut topo = Vec::new();
1830        let mut downstream_cnts = HashMap::new();
1831
1832        // Iterate all fragments.
1833        for fragment_id in self.all_fragment_ids() {
1834            // Count how many downstreams we have for a given fragment.
1835            let downstream_cnt = self.get_downstreams(fragment_id).count();
1836            if downstream_cnt == 0 {
1837                topo.push(fragment_id);
1838            } else {
1839                downstream_cnts.insert(fragment_id, downstream_cnt);
1840            }
1841        }
1842
1843        let mut i = 0;
1844        while let Some(&fragment_id) = topo.get(i) {
1845            i += 1;
1846            // Find if we can process more fragments.
1847            for (upstream_fragment_id, _) in self.get_upstreams(fragment_id) {
1848                let downstream_cnt = downstream_cnts.get_mut(&upstream_fragment_id).unwrap();
1849                *downstream_cnt -= 1;
1850                if *downstream_cnt == 0 {
1851                    downstream_cnts.remove(&upstream_fragment_id);
1852                    topo.push(upstream_fragment_id);
1853                }
1853                }
1854            }
1855        }
1856
1857        if !downstream_cnts.is_empty() {
1858            // There are fragments that are not processed yet.
1859            bail!("graph is not a DAG");
1860        }
1861
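        // Worked example (illustrative): with downstream edges A -> B -> C, fragment
        // C has no downstreams and is pushed first, then B, then A, yielding
        // [C, B, A], i.e., the downstream-most fragment (e.g. `Materialize`) comes first.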
1862        Ok(topo)
1863    }
1864
1865    /// Seal a [`BuildingFragment`] from the graph into a [`Fragment`], which will be further used
1866    /// to build actors on the compute nodes and persist into meta store.
1867    pub(super) fn seal_fragment(
1868        &self,
1869        id: GlobalFragmentId,
1870        actors: Vec<StreamActor>,
1871        distribution: Distribution,
1872        stream_node: StreamNode,
1873    ) -> Fragment {
1874        let building_fragment = self.get_fragment(id).into_building().unwrap();
1875        let internal_tables = building_fragment.extract_internal_tables();
1876        let BuildingFragment {
1877            inner,
1878            job_id,
1879            upstream_job_columns: _,
1880        } = building_fragment;
1881
1882        let distribution_type = distribution.to_distribution_type();
1883        let vnode_count = distribution.vnode_count();
1884
1885        let fragment_type_mask = FragmentTypeMask::from(inner.fragment_type_mask);
1886        let materialized_table_id = if fragment_type_mask.contains(FragmentTypeFlag::Mview) {
1887            job_id.map(JobId::as_mv_table_id)
1888        } else {
1889            None
1890        };
1891
1892        let vector_index_table_id =
1893            if fragment_type_mask.contains(FragmentTypeFlag::VectorIndexWrite) {
1894                job_id.map(JobId::as_mv_table_id)
1895            } else {
1896                None
1897            };
1898
1899        let state_table_ids = internal_tables
1900            .iter()
1901            .map(|t| t.id)
1902            .chain(materialized_table_id)
1903            .chain(vector_index_table_id)
1904            .collect();
1905
1906        Fragment {
1907            fragment_id: inner.fragment_id,
1908            fragment_type_mask: inner.fragment_type_mask.into(),
1909            distribution_type,
1910            actors,
1911            state_table_ids,
1912            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
1913            nodes: stream_node,
1914        }
1915    }
1916
1917    /// Get a fragment from the complete graph, which can be either a building fragment or an
1918    /// existing fragment.
1919    pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
1920        if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
1921            EitherFragment::Existing(fragment.clone())
1922        } else {
1923            EitherFragment::Building(
1924                self.building_graph
1925                    .fragments
1926                    .get(&fragment_id)
1927                    .unwrap()
1928                    .clone(),
1929            )
1930        }
1931    }
1932
1933    /// Get **all** downstreams of a fragment, including the ones that are not in the building
1934    /// graph.
1935    pub(super) fn get_downstreams(
1936        &self,
1937        fragment_id: GlobalFragmentId,
1938    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1939        self.building_graph
1940            .get_downstreams(fragment_id)
1941            .iter()
1942            .chain(
1943                self.extra_downstreams
1944                    .get(&fragment_id)
1945                    .into_iter()
1946                    .flatten(),
1947            )
1948            .map(|(&id, edge)| (id, edge))
1949    }
1950
1951    /// Get **all** upstreams of a fragment, including the ones that are not in the building
1952    /// graph.
1953    pub(super) fn get_upstreams(
1954        &self,
1955        fragment_id: GlobalFragmentId,
1956    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1957        self.building_graph
1958            .get_upstreams(fragment_id)
1959            .iter()
1960            .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
1961            .map(|(&id, edge)| (id, edge))
1962    }
1963
1964    /// Returns all building fragments in the graph.
1965    pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
1966        &self.building_graph.fragments
1967    }
1968
1969    /// Returns all building fragments in the graph, mutable.
1970    pub(super) fn building_fragments_mut(
1971        &mut self,
1972    ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
1973        &mut self.building_graph.fragments
1974    }
1975
1976    /// Get the expected vnode count of the building graph. See documentation of the field for more details.
1977    pub(super) fn max_parallelism(&self) -> usize {
1978        self.building_graph.max_parallelism()
1979    }
1980}