risingwave_meta/stream/stream_graph/fragment.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17use std::ops::{Deref, DerefMut};
18use std::sync::LazyLock;
19use std::sync::atomic::AtomicU32;
20
21use anyhow::{Context, anyhow};
22use enum_as_inner::EnumAsInner;
23use itertools::Itertools;
24use risingwave_common::bail;
25use risingwave_common::catalog::{
26    CDC_SOURCE_COLUMN_NUM, ColumnCatalog, Field, FragmentTypeFlag, FragmentTypeMask, TableId,
27    generate_internal_table_name_with_type,
28};
29use risingwave_common::hash::VnodeCount;
30use risingwave_common::util::iter_util::ZipEqFast;
31use risingwave_common::util::stream_graph_visitor::{
32    self, visit_stream_node_cont, visit_stream_node_cont_mut,
33};
34use risingwave_connector::sink::catalog::SinkType;
35use risingwave_meta_model::WorkerId;
36use risingwave_pb::catalog::{PbSink, PbTable, Table};
37use risingwave_pb::ddl_service::TableJobType;
38use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
39use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
40use risingwave_pb::stream_plan::stream_fragment_graph::{
41    Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
42};
43use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
44use risingwave_pb::stream_plan::{
45    BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, PbStreamNode,
46    PbStreamScanType, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode,
47    StreamScanType,
48};
49
50use crate::barrier::{SharedFragmentInfo, SnapshotBackfillInfo};
51use crate::controller::id::IdGeneratorManager;
52use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
53use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
54use crate::stream::stream_graph::id::{
55    GlobalActorIdGen, GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen,
56};
57use crate::stream::stream_graph::schedule::Distribution;
58use crate::{MetaError, MetaResult};
59
60/// The fragment in the building phase, including the [`StreamFragment`] from the frontend and
61/// several additional helper fields.
62#[derive(Debug, Clone)]
63pub(super) struct BuildingFragment {
64    /// The fragment structure from the frontend, with the global fragment ID.
65    inner: StreamFragment,
66
67    /// The ID of the job if this fragment contains the streaming job node.
68    job_id: Option<u32>,
69
70    /// The required columns of each upstream table.
71    /// Will be converted to indices when building the edge connected to the upstream.
72    ///
73    /// For a shared CDC table on source, it's `vec![]`, since the upstream source's output schema is fixed.
74    upstream_table_columns: HashMap<TableId, Vec<PbColumnDesc>>,
75}
76
77impl BuildingFragment {
78    /// Create a new [`BuildingFragment`] from a [`StreamFragment`]. The global fragment ID and
79    /// global table IDs will be correctly filled with the given `id` and `table_id_gen`.
80    fn new(
81        id: GlobalFragmentId,
82        fragment: StreamFragment,
83        job: &StreamingJob,
84        table_id_gen: GlobalTableIdGen,
85    ) -> Self {
86        let mut fragment = StreamFragment {
87            fragment_id: id.as_global_id(),
88            ..fragment
89        };
90
91        // Fill the information of the internal tables in the fragment.
92        Self::fill_internal_tables(&mut fragment, job, table_id_gen);
93
94        let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
95        let upstream_table_columns =
96            Self::extract_upstream_table_columns_except_cross_db_backfill(&fragment);
97
98        Self {
99            inner: fragment,
100            job_id,
101            upstream_table_columns,
102        }
103    }
104
105    /// Extract the internal tables from the fragment.
106    fn extract_internal_tables(&self) -> Vec<Table> {
107        let mut fragment = self.inner.clone();
108        let mut tables = Vec::new();
109        stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
110            tables.push(table.clone());
111        });
112        tables
113    }
114
115    /// Fill in the information of the internal tables in the fragment.
116    fn fill_internal_tables(
117        fragment: &mut StreamFragment,
118        job: &StreamingJob,
119        table_id_gen: GlobalTableIdGen,
120    ) {
121        let fragment_id = fragment.fragment_id;
122        stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
123            table.id = table_id_gen.to_global_id(table.id).as_global_id();
124            table.schema_id = job.schema_id();
125            table.database_id = job.database_id();
126            table.name = generate_internal_table_name_with_type(
127                &job.name(),
128                fragment_id,
129                table.id,
130                table_type_name,
131            );
132            table.fragment_id = fragment_id;
133            table.owner = job.owner();
134            table.job_id = Some(job.id());
135        });
136    }
137
138    /// Fill in the job-related information in the fragment, returning whether it contains the streaming job node.
139    fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
140        let job_id = job.id();
141        let fragment_id = fragment.fragment_id;
142        let mut has_job = false;
143
144        stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
145            NodeBody::Materialize(materialize_node) => {
146                materialize_node.table_id = job_id;
147
148                // Fill the table field of `MaterializeNode` from the job.
149                let table = materialize_node.table.insert(job.table().unwrap().clone());
150                table.fragment_id = fragment_id; // this will later be synced back to `job.table` with `set_info_from_graph`
151                // In production, do not include the full definition of the table in the plan node.
152                if cfg!(not(debug_assertions)) {
153                    table.definition = job.name();
154                }
155
156                has_job = true;
157            }
158            NodeBody::Sink(sink_node) => {
159                sink_node.sink_desc.as_mut().unwrap().id = job_id;
160
161                has_job = true;
162            }
163            NodeBody::Dml(dml_node) => {
164                dml_node.table_id = job_id;
165                dml_node.table_version_id = job.table_version_id().unwrap();
166            }
167            NodeBody::StreamFsFetch(fs_fetch_node) => {
168                if let StreamingJob::Table(table_source, _, _) = job
169                    && let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
170                    && let Some(source) = table_source
171                {
172                    node_inner.source_id = source.id;
173                }
174            }
175            NodeBody::Source(source_node) => {
176                match job {
177                    // Note: For a table without a connector, it has a dummy `Source` node.
178                    // Note: For a table with a connector, its source node has a source ID different from the table ID (job ID), assigned in `create_job_catalog`.
179                    StreamingJob::Table(source, _table, _table_job_type) => {
180                        if let Some(source_inner) = source_node.source_inner.as_mut()
181                            && let Some(source) = source
182                        {
183                            debug_assert_ne!(source.id, job_id);
184                            source_inner.source_id = source.id;
185                        }
186                    }
187                    StreamingJob::Source(source) => {
188                        has_job = true;
189                        if let Some(source_inner) = source_node.source_inner.as_mut() {
190                            debug_assert_eq!(source.id, job_id);
191                            source_inner.source_id = source.id;
192                        }
193                    }
194                    // For other job types, no need to fill the source id, since it refers to an existing source.
195                    _ => {}
196                }
197            }
198            NodeBody::StreamCdcScan(node) => {
199                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
200                    table_desc.table_id = job_id;
201                }
202            }
203            NodeBody::VectorIndexWrite(node) => {
204                let table = node.table.as_mut().unwrap();
205                table.id = job_id;
206                table.database_id = job.database_id();
207                table.schema_id = job.schema_id();
208                table.fragment_id = fragment_id;
209                #[cfg(not(debug_assertions))]
210                {
211                    table.definition = job.name();
212                }
213
214                has_job = true;
215            }
216            _ => {}
217        });
218
219        has_job
220    }
221
222    /// Extract the required columns of each upstream table except for cross-db backfill.
223    fn extract_upstream_table_columns_except_cross_db_backfill(
224        fragment: &StreamFragment,
225    ) -> HashMap<TableId, Vec<PbColumnDesc>> {
226        let mut table_columns = HashMap::new();
227
228        stream_graph_visitor::visit_fragment(fragment, |node_body| {
229            let (table_id, column_ids) = match node_body {
230                NodeBody::StreamScan(stream_scan) => {
231                    if stream_scan.get_stream_scan_type().unwrap()
232                        == StreamScanType::CrossDbSnapshotBackfill
233                    {
234                        return;
235                    }
236                    (stream_scan.table_id.into(), stream_scan.upstream_columns())
237                }
238                NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]),
239                NodeBody::SourceBackfill(backfill) => (
240                    backfill.upstream_source_id.into(),
241                    // FIXME: only pass required columns instead of all columns here
242                    backfill.column_descs(),
243                ),
244                _ => return,
245            };
246            table_columns
247                .try_insert(table_id, column_ids)
248                .expect("currently there should be no two same upstream tables in a fragment");
249        });
250
251        table_columns
252    }
253
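    /// Returns whether the fragment contains an `ArrangementBackfill` or `SnapshotBackfill`
    /// stream scan, i.e., a backfill whose input from the upstream is shuffled rather than
    /// no-shuffle.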
254    pub fn has_shuffled_backfill(&self) -> bool {
255        let stream_node = match self.inner.node.as_ref() {
256            Some(node) => node,
257            _ => return false,
258        };
259        let mut has_shuffled_backfill = false;
260        let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
261        visit_stream_node_cont(stream_node, |node| {
262            let is_shuffled_backfill = if let Some(node) = &node.node_body
263                && let Some(node) = node.as_stream_scan()
264            {
265                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
266                    || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
267            } else {
268                false
269            };
270            if is_shuffled_backfill {
271                *has_shuffled_backfill_mut_ref = true;
272                false
273            } else {
274                true
275            }
276        });
277        has_shuffled_backfill
278    }
279}
280
281impl Deref for BuildingFragment {
282    type Target = StreamFragment;
283
284    fn deref(&self) -> &Self::Target {
285        &self.inner
286    }
287}
288
289impl DerefMut for BuildingFragment {
290    fn deref_mut(&mut self) -> &mut Self::Target {
291        &mut self.inner
292    }
293}
294
295/// The ID of an edge in the fragment graph. For different types of edges, the ID will be in
296/// different variants.
297#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
298pub(super) enum EdgeId {
299    /// The edge between two building (internal) fragments.
300    Internal {
301        /// The ID generated by the frontend, generally the operator ID of `Exchange`.
302        /// See [`StreamFragmentEdgeProto`].
303        link_id: u64,
304    },
305
306    /// The edge between an upstream external fragment and downstream building fragment. Used for
307    /// MV on MV.
308    UpstreamExternal {
309        /// The ID of the upstream table or materialized view.
310        upstream_table_id: TableId,
311        /// The ID of the downstream fragment.
312        downstream_fragment_id: GlobalFragmentId,
313    },
314
315    /// The edge between an upstream building fragment and downstream external fragment. Used for
316    /// schema change (replace table plan).
317    DownstreamExternal(DownstreamExternalEdgeId),
318}
319
320#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
321pub(super) struct DownstreamExternalEdgeId {
322    /// The ID of the original upstream fragment (`Materialize`).
323    pub(super) original_upstream_fragment_id: GlobalFragmentId,
324    /// The ID of the downstream fragment.
325    pub(super) downstream_fragment_id: GlobalFragmentId,
326}
327
328/// The edge in the fragment graph.
329///
330/// The edge can be either internal or external. This is distinguished by the [`EdgeId`].
331#[derive(Debug, Clone)]
332pub(super) struct StreamFragmentEdge {
333    /// The ID of the edge.
334    pub id: EdgeId,
335
336    /// The strategy used for dispatching the data.
337    pub dispatch_strategy: DispatchStrategy,
338}
339
340impl StreamFragmentEdge {
341    fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
342        Self {
343            // By creating an edge from the protobuf, we know that the edge is from the frontend and
344            // is internal.
345            id: EdgeId::Internal {
346                link_id: edge.link_id,
347            },
348            dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
349        }
350    }
351}
352
353fn clone_fragment(
354    fragment: &Fragment,
355    id_generator_manager: &IdGeneratorManager,
356    actor_id_counter: &AtomicU32,
357) -> Fragment {
358    let fragment_id = GlobalFragmentIdGen::new(id_generator_manager, 1)
359        .to_global_id(0)
360        .as_global_id();
361    let actor_id_gen = GlobalActorIdGen::new(actor_id_counter, fragment.actors.len() as _);
362    Fragment {
363        fragment_id,
364        fragment_type_mask: fragment.fragment_type_mask,
365        distribution_type: fragment.distribution_type,
366        actors: fragment
367            .actors
368            .iter()
369            .enumerate()
370            .map(|(i, actor)| StreamActor {
371                actor_id: actor_id_gen.to_global_id(i as _).as_global_id() as _,
372                fragment_id,
373                vnode_bitmap: actor.vnode_bitmap.clone(),
374                mview_definition: actor.mview_definition.clone(),
375                expr_context: actor.expr_context.clone(),
376            })
377            .collect(),
378        state_table_ids: fragment.state_table_ids.clone(),
379        maybe_vnode_count: fragment.maybe_vnode_count,
380        nodes: fragment.nodes.clone(),
381    }
382}
383
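/// Check that a sink job is eligible for auto schema refresh: it must consist of exactly one
/// fragment whose plan has the shape validated below. A sketch of the expected node tree (the
/// second scan input is assumed to be the batch plan node, per the variable naming below):
///
/// ```text
/// Sink
/// └─ StreamScan (ArrangementBackfill)
///    ├─ Merge
///    └─ BatchPlan
/// ```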
384pub fn check_sink_fragments_support_refresh_schema(
385    fragments: &BTreeMap<FragmentId, Fragment>,
386) -> MetaResult<()> {
387    if fragments.len() != 1 {
388        return Err(anyhow!(
389            "sink with auto schema change should have only 1 fragment, but got {:?}",
390            fragments.len()
391        )
392        .into());
393    }
394    let (_, fragment) = fragments.first_key_value().expect("non-empty");
395    let sink_node = &fragment.nodes;
396    let PbNodeBody::Sink(_) = sink_node.node_body.as_ref().unwrap() else {
397        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
398    };
399    let [stream_scan_node] = sink_node.input.as_slice() else {
400        panic!("Sink has more than 1 input: {:?}", sink_node.input);
401    };
402    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_ref().unwrap() else {
403        return Err(anyhow!(
404            "expect PbNodeBody::StreamScan but got: {:?}",
405            stream_scan_node.node_body
406        )
407        .into());
408    };
409    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
410    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
411        return Err(anyhow!(
412            "unsupported stream_scan_type for auto refresh schema: {:?}",
413            stream_scan_type
414        )
415        .into());
416    }
417    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_slice() else {
418        panic!(
419            "the number of StreamScan inputs is not 2: {:?}",
420            stream_scan_node.input
421        );
422    };
423    let NodeBody::Merge(_) = merge_node.node_body.as_ref().unwrap() else {
424        return Err(anyhow!(
425            "expect PbNodeBody::Merge but got: {:?}",
426            merge_node.node_body
427        )
428        .into());
429    };
430    Ok(())
431}
432
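/// Rewrite the sink fragment for auto schema refresh: clone the fragment with freshly generated
/// fragment and actor IDs, append the newly added columns to the sink columns, the sink / scan
/// output fields, the upstream table desc, and the log store table (if any), and point the
/// `Merge` node at the new upstream table fragment.
///
/// Returns the rewritten fragment, the new sink columns, and the new log store table (if any).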
433pub fn rewrite_refresh_schema_sink_fragment(
434    original_sink_fragment: &Fragment,
435    sink: &PbSink,
436    newly_added_columns: &[ColumnCatalog],
437    upstream_table: &PbTable,
438    upstream_table_fragment_id: FragmentId,
439    id_generator_manager: &IdGeneratorManager,
440    actor_id_counter: &AtomicU32,
441) -> MetaResult<(Fragment, Vec<PbColumnCatalog>, Option<PbTable>)> {
442    let mut new_sink_columns = sink.columns.clone();
443    fn extend_sink_columns(
444        sink_columns: &mut Vec<PbColumnCatalog>,
445        new_columns: &[ColumnCatalog],
446        get_column_name: impl Fn(&String) -> String,
447    ) {
448        let next_column_id = sink_columns
449            .iter()
450            .map(|col| col.column_desc.as_ref().unwrap().column_id + 1)
451            .max()
452            .unwrap_or(1);
453        sink_columns.extend(new_columns.iter().enumerate().map(|(i, col)| {
454            let mut col = col.to_protobuf();
455            let column_desc = col.column_desc.as_mut().unwrap();
456            column_desc.column_id = next_column_id + (i as i32);
457            column_desc.name = get_column_name(&column_desc.name);
458            col
459        }));
460    }
461    extend_sink_columns(&mut new_sink_columns, newly_added_columns, |name| {
462        name.clone()
463    });
464
465    let mut new_sink_fragment = clone_fragment(
466        original_sink_fragment,
467        id_generator_manager,
468        actor_id_counter,
469    );
470    let sink_node = &mut new_sink_fragment.nodes;
471    let PbNodeBody::Sink(sink_node_body) = sink_node.node_body.as_mut().unwrap() else {
472        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
473    };
474    let [stream_scan_node] = sink_node.input.as_mut_slice() else {
475        panic!("Sink has more than 1 input: {:?}", sink_node.input);
476    };
477    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_mut().unwrap() else {
478        return Err(anyhow!(
479            "expect PbNodeBody::StreamScan but got: {:?}",
480            stream_scan_node.node_body
481        )
482        .into());
483    };
484    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_mut_slice() else {
485        panic!(
486            "the number of StreamScan inputs is not 2: {:?}",
487            stream_scan_node.input
488        );
489    };
490    let NodeBody::Merge(merge) = merge_node.node_body.as_mut().unwrap() else {
491        return Err(anyhow!(
492            "expect PbNodeBody::Merge but got: {:?}",
493            merge_node.node_body
494        )
495        .into());
496    };
497    // update sink_node
498    // following logic in <StreamSink as Explain>::distill
499    sink_node.identity = {
500        let sink_type = SinkType::from_proto(sink.sink_type());
501        let sink_type_str = sink_type.type_str();
502        let column_names = new_sink_columns
503            .iter()
504            .map(|col| {
505                ColumnCatalog::from(col.clone())
506                    .name_with_hidden()
507                    .to_string()
508            })
509            .join(", ");
510        let downstream_pk = if !sink_type.is_append_only() {
511            let downstream_pk = sink
512                .downstream_pk
513                .iter()
514                .map(|i| &sink.columns[*i as usize].column_desc.as_ref().unwrap().name)
515                .collect_vec();
516            format!(", downstream_pk: {downstream_pk:?}")
517        } else {
518            "".to_owned()
519        };
520        format!("StreamSink {{ type: {sink_type_str}, columns: [{column_names}]{downstream_pk} }}")
521    };
522    sink_node
523        .fields
524        .extend(newly_added_columns.iter().map(|col| {
525            Field::new(
526                format!("{}.{}", upstream_table.name, col.column_desc.name),
527                col.data_type().clone(),
528            )
529            .to_prost()
530        }));
531
532    let new_log_store_table = if let Some(log_store_table) = &mut sink_node_body.table {
533        extend_sink_columns(&mut log_store_table.columns, newly_added_columns, |name| {
534            format!("{}_{}", upstream_table.name, name)
535        });
536        Some(log_store_table.clone())
537    } else {
538        None
539    };
540    sink_node_body.sink_desc.as_mut().unwrap().column_catalogs = new_sink_columns.clone();
541
542    // update stream scan node
543    stream_scan_node
544        .fields
545        .extend(newly_added_columns.iter().map(|col| {
546            Field::new(
547                format!("{}.{}", upstream_table.name, col.column_desc.name),
548                col.data_type().clone(),
549            )
550            .to_prost()
551        }));
552    // following logic in <StreamTableScan as Explain>::distill
553    stream_scan_node.identity = {
554        let columns = stream_scan_node
555            .fields
556            .iter()
557            .map(|col| &col.name)
558            .join(", ");
559        format!("StreamTableScan {{ table: t, columns: [{columns}] }}")
560    };
561
562    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
563    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
564        return Err(anyhow!(
565            "unsupported stream_scan_type for auto refresh schema: {:?}",
566            stream_scan_type
567        )
568        .into());
569    }
570    scan.arrangement_table = Some(upstream_table.clone());
571    scan.output_indices.extend(
572        (0..newly_added_columns.len()).map(|i| (i + scan.upstream_column_ids.len()) as u32),
573    );
574    scan.upstream_column_ids.extend(
575        newly_added_columns
576            .iter()
577            .map(|col| col.column_id().get_id()),
578    );
579    let table_desc = scan.table_desc.as_mut().unwrap();
580    table_desc
581        .value_indices
582        .extend((0..newly_added_columns.len()).map(|i| (i + table_desc.columns.len()) as u32));
583    table_desc.columns.extend(
584        newly_added_columns
585            .iter()
586            .map(|col| col.column_desc.to_protobuf()),
587    );
588
589    // update merge node
590    merge_node.fields = scan
591        .upstream_column_ids
592        .iter()
593        .map(|&column_id| {
594            let col = upstream_table
595                .columns
596                .iter()
597                .find(|c| c.column_desc.as_ref().unwrap().column_id == column_id)
598                .unwrap();
599            let col_desc = col.column_desc.as_ref().unwrap();
600            Field::new(
601                col_desc.name.clone(),
602                col_desc.column_type.as_ref().unwrap().into(),
603            )
604            .to_prost()
605        })
606        .collect();
607    merge.upstream_fragment_id = upstream_table_fragment_id;
608    Ok((new_sink_fragment, new_sink_columns, new_log_store_table))
609}
610
611/// Adjacency list (G) of backfill orders.
612/// `G[10] -> [1, 2, 11]`
613/// means that the backfill node in `fragment 10`
614/// should be backfilled before the backfill nodes in `fragments 1, 2 and 11`.
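///
/// A minimal illustrative sketch (hypothetical fragment IDs):
///
/// ```ignore
/// let mut order: FragmentBackfillOrder = HashMap::new();
/// // The backfill node in fragment 10 must finish before those in fragments 1, 2 and 11 start.
/// order.insert(10, vec![1, 2, 11]);
/// ```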
615pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;
616
617/// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`]
618/// from the frontend.
619///
620/// This only includes the nodes and edges of the current job itself. It will be converted to [`CompleteStreamFragmentGraph`] later,
621/// which additionally contains the information of pre-existing
622/// fragments connected to the graph's top-most or bottom-most fragments.
623#[derive(Default, Debug)]
624pub struct StreamFragmentGraph {
625    /// Stores all the fragments in the graph.
626    pub(super) fragments: HashMap<GlobalFragmentId, BuildingFragment>,
627
628    /// Stores edges between fragments: upstream -> downstream.
629    pub(super) downstreams:
630        HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
631
632    /// Stores edges between fragments: downstream -> upstream.
633    pub(super) upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
634
635    /// Dependent relations of this job.
636    dependent_table_ids: HashSet<TableId>,
637
638    /// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session
639    /// variable. If not specified, all active worker slots will be used.
640    specified_parallelism: Option<NonZeroUsize>,
641
642    /// Specified max parallelism, i.e., expected vnode count for the graph.
643    ///
644    /// The scheduler on the meta service will use this as a hint to decide the vnode count
645    /// for each fragment.
646    ///
647    /// Note that the actual vnode count may be different from this value.
648    /// For example, a no-shuffle exchange between current fragment graph and an existing
649    /// upstream fragment graph requires two fragments to be in the same distribution,
650    /// thus the same vnode count.
651    max_parallelism: usize,
652
653    /// The backfill ordering strategy of the graph.
654    backfill_order: BackfillOrder,
655}
656
657impl StreamFragmentGraph {
658    /// Create a new [`StreamFragmentGraph`] from the given [`StreamFragmentGraphProto`], with all
659    /// global IDs correctly filled.
660    pub fn new(
661        env: &MetaSrvEnv,
662        proto: StreamFragmentGraphProto,
663        job: &StreamingJob,
664    ) -> MetaResult<Self> {
665        let fragment_id_gen =
666            GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
667        // Note: in the SQL backend, the IDs generated here are fake and will be overwritten
668        // with `refill_internal_table_ids` later.
669        // TODO: refactor the code to remove this step.
670        let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);
671
672        // Create nodes.
673        let fragments: HashMap<_, _> = proto
674            .fragments
675            .into_iter()
676            .map(|(id, fragment)| {
677                let id = fragment_id_gen.to_global_id(id);
678                let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
679                (id, fragment)
680            })
681            .collect();
682
683        assert_eq!(
684            fragments
685                .values()
686                .map(|f| f.extract_internal_tables().len() as u32)
687                .sum::<u32>(),
688            proto.table_ids_cnt
689        );
690
691        // Create edges.
692        let mut downstreams = HashMap::new();
693        let mut upstreams = HashMap::new();
694
695        for edge in proto.edges {
696            let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
697            let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
698            let edge = StreamFragmentEdge::from_protobuf(&edge);
699
700            upstreams
701                .entry(downstream_id)
702                .or_insert_with(HashMap::new)
703                .try_insert(upstream_id, edge.clone())
704                .unwrap();
705            downstreams
706                .entry(upstream_id)
707                .or_insert_with(HashMap::new)
708                .try_insert(downstream_id, edge)
709                .unwrap();
710        }
711
712        // Note: Here we directly use the field `dependent_table_ids` in the proto (resolved in
713        // frontend), instead of visiting the graph ourselves.
714        let dependent_table_ids = proto
715            .dependent_table_ids
716            .iter()
717            .map(TableId::from)
718            .collect();
719
720        let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
721            Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
722        } else {
723            None
724        };
725
726        let max_parallelism = proto.max_parallelism as usize;
727        let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
728            order: Default::default(),
729        });
730
731        Ok(Self {
732            fragments,
733            downstreams,
734            upstreams,
735            dependent_table_ids,
736            specified_parallelism,
737            max_parallelism,
738            backfill_order,
739        })
740    }
741
742    /// Retrieve the **incomplete** internal tables map of the whole graph.
743    ///
744    /// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
745    /// `fragment_id`, `vnode_count`. They will all be filled after a `TableFragments` is built.
746    /// Be careful when using the returned values.
747    pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
748        let mut tables = BTreeMap::new();
749        for fragment in self.fragments.values() {
750            for table in fragment.extract_internal_tables() {
751                let table_id = table.id;
752                tables
753                    .try_insert(table_id, table)
754                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
755            }
756        }
757        tables
758    }
759
760    /// Refill the internal tables' `table_id`s according to the given map, typically obtained from
761    /// `create_internal_table_catalog`.
762    pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
763        for fragment in self.fragments.values_mut() {
764            stream_graph_visitor::visit_internal_tables(
765                &mut fragment.inner,
766                |table, _table_type_name| {
767                    let target = table_id_map.get(&table.id).cloned().unwrap();
768                    table.id = target;
769                },
770            );
771        }
772    }
773
774    /// Use a trivial algorithm to match the internal tables of the new graph against the old ones for
775    /// `ALTER TABLE` or `ALTER SOURCE`.
776    pub fn fit_internal_tables_trivial(
777        &mut self,
778        mut old_internal_tables: Vec<Table>,
779    ) -> MetaResult<()> {
780        let mut new_internal_table_ids = Vec::new();
781        for fragment in self.fragments.values() {
782            for table in &fragment.extract_internal_tables() {
783                new_internal_table_ids.push(table.id);
784            }
785        }
786
787        if new_internal_table_ids.len() != old_internal_tables.len() {
788            bail!(
789                "Different number of internal tables. New: {}, Old: {}",
790                new_internal_table_ids.len(),
791                old_internal_tables.len()
792            );
793        }
794        old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
795        new_internal_table_ids.sort();
796
797        let internal_table_id_map = new_internal_table_ids
798            .into_iter()
799            .zip_eq_fast(old_internal_tables.into_iter())
800            .collect::<HashMap<_, _>>();
801
802        // TODO(alter-mv): unify this with `fit_internal_table_ids_with_mapping` after we
803        // confirm the behavior is the same.
804        for fragment in self.fragments.values_mut() {
805            stream_graph_visitor::visit_internal_tables(
806                &mut fragment.inner,
807                |table, _table_type_name| {
808                    // XXX: this replaces the entire table, instead of just the id!
809                    let target = internal_table_id_map.get(&table.id).cloned().unwrap();
810                    *table = target;
811                },
812            );
813        }
814
815        Ok(())
816    }
817
818    /// Fit the internal tables' `table_id`s according to the given mapping.
819    pub fn fit_internal_table_ids_with_mapping(&mut self, mut matches: HashMap<u32, Table>) {
820        for fragment in self.fragments.values_mut() {
821            stream_graph_visitor::visit_internal_tables(
822                &mut fragment.inner,
823                |table, _table_type_name| {
824                    let target = matches.remove(&table.id).unwrap_or_else(|| {
825                        panic!("no matching table for table {}({})", table.id, table.name)
826                    });
827                    table.id = target.id;
828                    table.maybe_vnode_count = target.maybe_vnode_count;
829                },
830            );
831        }
832    }
833
834    /// Returns the fragment ID where the streaming job node is located.
835    pub fn table_fragment_id(&self) -> FragmentId {
836        self.fragments
837            .values()
838            .filter(|b| b.job_id.is_some())
839            .map(|b| b.fragment_id)
840            .exactly_one()
841            .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
842    }
843
844    /// Returns the fragment ID where the table DML is received.
845    pub fn dml_fragment_id(&self) -> Option<FragmentId> {
846        self.fragments
847            .values()
848            .filter(|b| {
849                FragmentTypeMask::from(b.fragment_type_mask).contains(FragmentTypeFlag::Dml)
850            })
851            .map(|b| b.fragment_id)
852            .at_most_one()
853            .expect("require at most 1 dml node when creating the streaming job")
854    }
855
856    /// Get the dependent streaming job ids of this job.
857    pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
858        &self.dependent_table_ids
859    }
860
861    /// Get the parallelism of the job, if specified by the user.
862    pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
863        self.specified_parallelism
864    }
865
866    /// Get the expected vnode count of the graph. See documentation of the field for more details.
867    pub fn max_parallelism(&self) -> usize {
868        self.max_parallelism
869    }
870
871    /// Get downstreams of a fragment.
872    fn get_downstreams(
873        &self,
874        fragment_id: GlobalFragmentId,
875    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
876        self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
877    }
878
879    /// Get upstreams of a fragment.
880    fn get_upstreams(
881        &self,
882        fragment_id: GlobalFragmentId,
883    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
884        self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
885    }
886
887    pub fn collect_snapshot_backfill_info(
888        &self,
889    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
890        Self::collect_snapshot_backfill_info_impl(self.fragments.values().map(|fragment| {
891            (
892                fragment.node.as_ref().unwrap(),
893                fragment.fragment_type_mask.into(),
894            )
895        }))
896    }
897
898    /// Returns `Ok((Some(snapshot_backfill_info), cross_db_snapshot_backfill_info))`.
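    ///
    /// All non-cross-db stream scans in the job must agree: either all of them use snapshot
    /// backfill (yielding `Some`), or none of them do (yielding `None`); otherwise an error is
    /// returned. Cross-db snapshot backfill scans are collected separately into the second
    /// element of the returned tuple.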
899    pub fn collect_snapshot_backfill_info_impl(
900        fragments: impl IntoIterator<Item = (&PbStreamNode, FragmentTypeMask)>,
901    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
902        let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
903        let mut cross_db_info = SnapshotBackfillInfo {
904            upstream_mv_table_id_to_backfill_epoch: Default::default(),
905        };
906        let mut result = Ok(());
907        for (node, fragment_type_mask) in fragments {
908            visit_stream_node_cont(node, |node| {
909                if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
910                    let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
911                        .expect("invalid stream_scan_type");
912                    let is_snapshot_backfill = match stream_scan_type {
913                        StreamScanType::SnapshotBackfill => {
914                            assert!(
915                                fragment_type_mask
916                                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
917                            );
918                            true
919                        }
920                        StreamScanType::CrossDbSnapshotBackfill => {
921                            assert!(
922                                fragment_type_mask
923                                    .contains(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan)
924                            );
925                            cross_db_info.upstream_mv_table_id_to_backfill_epoch.insert(
926                                TableId::new(stream_scan.table_id),
927                                stream_scan.snapshot_backfill_epoch,
928                            );
929
930                            return true;
931                        }
932                        _ => false,
933                    };
934
935                    match &mut prev_stream_scan {
936                        Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
937                            match (prev_snapshot_backfill_info, is_snapshot_backfill) {
938                                (Some(prev_snapshot_backfill_info), true) => {
939                                    prev_snapshot_backfill_info
940                                        .upstream_mv_table_id_to_backfill_epoch
941                                        .insert(
942                                            TableId::new(stream_scan.table_id),
943                                            stream_scan.snapshot_backfill_epoch,
944                                        );
945                                    true
946                                }
947                                (None, false) => true,
948                                (_, _) => {
949                                    result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
950                                    false
951                                }
952                            }
953                        }
954                        None => {
955                            prev_stream_scan = Some((
956                                if is_snapshot_backfill {
957                                    Some(SnapshotBackfillInfo {
958                                        upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
959                                            [(
960                                                TableId::new(stream_scan.table_id),
961                                                stream_scan.snapshot_backfill_epoch,
962                                            )],
963                                        ),
964                                    })
965                                } else {
966                                    None
967                                },
968                                *stream_scan.clone(),
969                            ));
970                            true
971                        }
972                    }
973                } else {
974                    true
975                }
976            })
977        }
978        result.map(|_| {
979            (
980                prev_stream_scan
981                    .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
982                    .unwrap_or(None),
983                cross_db_info,
984            )
985        })
986    }
987
988    /// Collect the mapping from upstream table / source ID to the `fragment_id`s that backfill from it.
989    pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
990        let mut mapping = HashMap::new();
991        for (fragment_id, fragment) in &self.fragments {
992            let fragment_id = fragment_id.as_global_id();
993            let fragment_mask = fragment.fragment_type_mask;
994            let candidates = [FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan];
995            let has_some_scan = candidates
996                .into_iter()
997                .any(|flag| (fragment_mask & flag as u32) > 0);
998            if has_some_scan {
999                visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
1000                    match node.node_body.as_ref() {
1001                        Some(NodeBody::StreamScan(stream_scan)) => {
1002                            let table_id = stream_scan.table_id;
1003                            let fragments: &mut Vec<_> = mapping.entry(table_id).or_default();
1004                            fragments.push(fragment_id);
1005                            // each fragment should have only 1 scan node.
1006                            false
1007                        }
1008                        Some(NodeBody::SourceBackfill(source_backfill)) => {
1009                            let source_id = source_backfill.upstream_source_id;
1010                            let fragments: &mut Vec<_> = mapping.entry(source_id).or_default();
1011                            fragments.push(fragment_id);
1012                            // each fragment should have only 1 scan node.
1013                            false
1014                        }
1015                        _ => true,
1016                    }
1017                })
1018            }
1019        }
1020        mapping
1021    }
1022
1023    /// Initially, the mapping that comes from the frontend is between `table_id`s.
1024    /// We remap it to the fragment level, since we track progress by actor, and we can get
1025    /// a fragment <-> actor mapping.
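    ///
    /// A minimal illustrative sketch (hypothetical IDs): if the frontend specifies
    /// `backfill_order = { table 1 -> [table 2] }` and the scans live in
    /// `{ table 1 -> [fragment 10], table 2 -> [fragments 11, 12] }`, the resulting ordering
    /// contains `{ fragment 10 -> [fragment 11, fragment 12] }`, plus any `LocalityProvider`
    /// dependencies added by this method.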
1026    pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
1027        let mapping = self.collect_backfill_mapping();
1028        let mut fragment_ordering: HashMap<u32, Vec<u32>> = HashMap::new();
1029
1030        // 1. Add backfill dependencies
1031        for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
1032            let fragment_ids = mapping.get(rel_id).unwrap();
1033            for fragment_id in fragment_ids {
1034                let downstream_fragment_ids = downstream_rel_ids
1035                    .data
1036                    .iter()
1037                    .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
1038                    .copied()
1039                    .collect();
1040                fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
1041            }
1042        }
1043
1044        // If no backfill order is specified, we still need to ensure that all backfill fragments
1045        // run before LocalityProvider fragments.
1046        if fragment_ordering.is_empty() {
1047            for value in mapping.values() {
1048                for &fragment_id in value {
1049                    fragment_ordering.entry(fragment_id).or_default();
1050                }
1051            }
1052        }
1053
1054        // 2. Add dependencies: all backfill fragments should run before LocalityProvider fragments
1055        let locality_provider_dependencies = self.find_locality_provider_dependencies();
1056
1057        let backfill_fragments: HashSet<u32> = mapping.values().flatten().copied().collect();
1058
1059        // Calculate LocalityProvider root fragments (zero indegree)
1060        // Root fragments are those that appear as keys but never appear as downstream dependencies
1061        let all_locality_provider_fragments: HashSet<u32> =
1062            locality_provider_dependencies.keys().copied().collect();
1063        let downstream_locality_provider_fragments: HashSet<u32> = locality_provider_dependencies
1064            .values()
1065            .flatten()
1066            .copied()
1067            .collect();
1068        let locality_provider_root_fragments: Vec<u32> = all_locality_provider_fragments
1069            .difference(&downstream_locality_provider_fragments)
1070            .copied()
1071            .collect();
1072
1073        // For each backfill fragment, add only the root LocalityProvider fragments as dependents
1074        // This ensures backfill completes before any LocalityProvider starts, while minimizing dependencies
1075        for &backfill_fragment_id in &backfill_fragments {
1076            fragment_ordering
1077                .entry(backfill_fragment_id)
1078                .or_default()
1079                .extend(locality_provider_root_fragments.iter().copied());
1080        }
1081
1082        // 3. Add LocalityProvider internal dependencies
1083        for (fragment_id, downstream_fragments) in locality_provider_dependencies {
1084            fragment_ordering
1085                .entry(fragment_id)
1086                .or_default()
1087                .extend(downstream_fragments);
1088        }
1089
1090        fragment_ordering
1091    }
1092
1093    pub fn find_locality_provider_fragment_state_table_mapping(
1094        &self,
1095    ) -> HashMap<FragmentId, Vec<TableId>> {
1096        let mut mapping: HashMap<FragmentId, Vec<TableId>> = HashMap::new();
1097
1098        for (fragment_id, fragment) in &self.fragments {
1099            let fragment_id = fragment_id.as_global_id();
1100
1101            // Check if this fragment contains a LocalityProvider node
1102            if let Some(node) = fragment.node.as_ref() {
1103                let mut state_table_ids = Vec::new();
1104
1105                visit_stream_node_cont(node, |stream_node| {
1106                    if let Some(NodeBody::LocalityProvider(locality_provider)) =
1107                        stream_node.node_body.as_ref()
1108                    {
1109                        // Collect state table ID (except the progress table)
1110                        let state_table_id = locality_provider
1111                            .state_table
1112                            .as_ref()
1113                            .expect("must have state table")
1114                            .id;
1115                        state_table_ids.push(TableId::new(state_table_id));
1116                        false // Stop visiting once we find a LocalityProvider
1117                    } else {
1118                        true // Continue visiting
1119                    }
1120                });
1121
1122                if !state_table_ids.is_empty() {
1123                    mapping.insert(fragment_id, state_table_ids);
1124                }
1125            }
1126        }
1127
1128        mapping
1129    }
1130
1131    /// Find dependency relationships among fragments containing `LocalityProvider` nodes.
1132    /// Returns a mapping where each fragment ID maps to a list of fragment IDs that should be processed after it.
1133    /// Following the same semantics as `FragmentBackfillOrder`:
1134    /// `G[10] -> [1, 2, 11]` means `LocalityProvider` in fragment 10 should be processed
1135    /// before `LocalityProviders` in fragments 1, 2, and 11.
1136    ///
1137    /// This method assumes each fragment contains at most one `LocalityProvider` node.
1138    pub fn find_locality_provider_dependencies(&self) -> HashMap<FragmentId, Vec<FragmentId>> {
1139        let mut locality_provider_fragments = HashSet::new();
1140        let mut dependencies: HashMap<FragmentId, Vec<FragmentId>> = HashMap::new();
1141
1142        // First, identify all fragments that contain LocalityProvider nodes
1143        for (fragment_id, fragment) in &self.fragments {
1144            let fragment_id = fragment_id.as_global_id();
1145            let has_locality_provider = self.fragment_has_locality_provider(fragment);
1146
1147            if has_locality_provider {
1148                locality_provider_fragments.insert(fragment_id);
1149                dependencies.entry(fragment_id).or_default();
1150            }
1151        }
1152
1153        // Build dependency relationships between LocalityProvider fragments
1154        // For each LocalityProvider fragment, find all downstream LocalityProvider fragments
1155        // The upstream fragment should be processed before the downstream fragments
1156        for &provider_fragment_id in &locality_provider_fragments {
1157            let provider_fragment_global_id = GlobalFragmentId::new(provider_fragment_id);
1158
1159            // Find all fragments downstream from this LocalityProvider fragment
1160            let mut visited = HashSet::new();
1161            let mut downstream_locality_providers = Vec::new();
1162
1163            self.collect_downstream_locality_providers(
1164                provider_fragment_global_id,
1165                &locality_provider_fragments,
1166                &mut visited,
1167                &mut downstream_locality_providers,
1168            );
1169
1170            // This fragment should be processed before all its downstream LocalityProvider fragments
1171            dependencies
1172                .entry(provider_fragment_id)
1173                .or_default()
1174                .extend(downstream_locality_providers);
1175        }
1176
1177        dependencies
1178    }
1179
1180    fn fragment_has_locality_provider(&self, fragment: &BuildingFragment) -> bool {
1181        let mut has_locality_provider = false;
1182
1183        if let Some(node) = fragment.node.as_ref() {
1184            visit_stream_node_cont(node, |stream_node| {
1185                if let Some(NodeBody::LocalityProvider(_)) = stream_node.node_body.as_ref() {
1186                    has_locality_provider = true;
1187                    false // Stop visiting once we find a LocalityProvider
1188                } else {
1189                    true // Continue visiting
1190                }
1191            });
1192        }
1193
1194        has_locality_provider
1195    }
1196
1197    /// Recursively collect downstream `LocalityProvider` fragments
1198    fn collect_downstream_locality_providers(
1199        &self,
1200        current_fragment_id: GlobalFragmentId,
1201        locality_provider_fragments: &HashSet<FragmentId>,
1202        visited: &mut HashSet<GlobalFragmentId>,
1203        downstream_providers: &mut Vec<FragmentId>,
1204    ) {
1205        if visited.contains(&current_fragment_id) {
1206            return;
1207        }
1208        visited.insert(current_fragment_id);
1209
1210        // Check all downstream fragments
1211        for &downstream_id in self.get_downstreams(current_fragment_id).keys() {
1212            let downstream_fragment_id = downstream_id.as_global_id();
1213
1214            // If the downstream fragment is a LocalityProvider, add it to results
1215            if locality_provider_fragments.contains(&downstream_fragment_id) {
1216                downstream_providers.push(downstream_fragment_id);
1217            }
1218
1219            // Recursively check further downstream
1220            self.collect_downstream_locality_providers(
1221                downstream_id,
1222                locality_provider_fragments,
1223                visited,
1224                downstream_providers,
1225            );
1226        }
1227    }
1228}
1229
1230/// Fill the snapshot epoch for the `StreamScanNode`s of `SnapshotBackfill`.
1231/// Returns `true` when a change has been applied.
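///
/// The epoch for each scan is looked up first in `cross_db_snapshot_backfill_info` and then in
/// `snapshot_backfill_info`; it is an error if the upstream table is not covered, the epoch is
/// unset, or the epoch has already been filled.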
1232pub fn fill_snapshot_backfill_epoch(
1233    node: &mut StreamNode,
1234    snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
1235    cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
1236) -> MetaResult<bool> {
1237    let mut result = Ok(());
1238    let mut applied = false;
1239    visit_stream_node_cont_mut(node, |node| {
1240        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
1241            && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
1242                || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
1243        {
1244            result = try {
1245                let table_id = TableId::new(stream_scan.table_id);
1246                let snapshot_epoch = cross_db_snapshot_backfill_info
1247                    .upstream_mv_table_id_to_backfill_epoch
1248                    .get(&table_id)
1249                    .or_else(|| {
1250                        snapshot_backfill_info.and_then(|snapshot_backfill_info| {
1251                            snapshot_backfill_info
1252                                .upstream_mv_table_id_to_backfill_epoch
1253                                .get(&table_id)
1254                        })
1255                    })
1256                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
1257                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
1258                if let Some(prev_snapshot_epoch) =
1259                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
1260                {
1261                    Err(anyhow!(
1262                        "snapshot backfill epoch set again: {} {} {}",
1263                        table_id,
1264                        prev_snapshot_epoch,
1265                        snapshot_epoch
1266                    ))?;
1267                }
1268                applied = true;
1269            };
1270            result.is_ok()
1271        } else {
1272            true
1273        }
1274    });
1275    result.map(|_| applied)
1276}
1277
1278static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
1279    LazyLock::new(HashMap::new);
1280
1281/// A fragment that is either being built or already exists. Used to generalize the logic of
1282/// [`crate::stream::ActorGraphBuilder`].
1283#[derive(Debug, Clone, EnumAsInner)]
1284pub(super) enum EitherFragment {
1285    /// An internal fragment that is being built for the current streaming job.
1286    Building(BuildingFragment),
1287
1288    /// An existing fragment that is external but connected to the fragments being built.
1289    Existing(SharedFragmentInfo),
1290}
1291
1292/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing
1293/// fragments, which are connected to the graph's top-most or bottom-most fragments.
1294///
1295/// For example,
1296/// - if we're going to build a mview on an existing mview, the upstream fragment containing the
1297///   `Materialize` node will be included in this structure.
1298/// - if we're going to replace the plan of a table with downstream mviews, the downstream fragments
1299///   containing the `StreamScan` nodes will be included in this structure.
1300#[derive(Debug)]
1301pub struct CompleteStreamFragmentGraph {
1302    /// The fragment graph of the streaming job being built.
1303    building_graph: StreamFragmentGraph,
1304
1305    /// The required information of existing fragments.
1306    existing_fragments: HashMap<GlobalFragmentId, SharedFragmentInfo>,
1307
1308    /// The location of the actors in the existing fragments.
1309    existing_actor_location: HashMap<ActorId, WorkerId>,
1310
1311    /// Extra downstream edges between existing and building fragments, keyed by the upstream side.
1312    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
1313
1314    /// Extra upstream edges between existing and building fragments, keyed by the downstream side.
1315    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
1316}
1317
1318pub struct FragmentGraphUpstreamContext {
1319    /// The root fragment of each upstream stream graph, which can be either a
1320    /// mview fragment, or a source fragment for a CDC source job.
1321    pub upstream_root_fragments: HashMap<TableId, (SharedFragmentInfo, PbStreamNode)>,
1322    pub upstream_actor_location: HashMap<ActorId, WorkerId>,
1323}
1324
1325pub struct FragmentGraphDownstreamContext {
1326    pub original_root_fragment_id: FragmentId,
1327    pub downstream_fragments: Vec<(DispatcherType, SharedFragmentInfo, PbStreamNode)>,
1328    pub downstream_actor_location: HashMap<ActorId, WorkerId>,
1329}
1330
1331impl CompleteStreamFragmentGraph {
1332    /// Create a new [`CompleteStreamFragmentGraph`] with empty existing fragments, i.e., there are no
1333    /// upstream mviews.
1334    #[cfg(test)]
1335    pub fn for_test(graph: StreamFragmentGraph) -> Self {
1336        Self {
1337            building_graph: graph,
1338            existing_fragments: Default::default(),
1339            existing_actor_location: Default::default(),
1340            extra_downstreams: Default::default(),
1341            extra_upstreams: Default::default(),
1342        }
1343    }
1344
1345    /// Create a new [`CompleteStreamFragmentGraph`] for a newly created job (which has no downstreams),
1346    /// e.g., an MV on MV or a table on a CDC/shared source, with the existing upstream
1347    /// `Materialize` or `Source` fragments.
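    ///
    /// A minimal usage sketch (`ignore`d; the surrounding variables are assumptions):
    ///
    /// ```ignore
    /// let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
    ///     fragment_graph,
    ///     FragmentGraphUpstreamContext {
    ///         upstream_root_fragments,
    ///         upstream_actor_location,
    ///     },
    ///     StreamingJobType::MaterializedView,
    /// )?;
    /// ```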
1348    pub fn with_upstreams(
1349        graph: StreamFragmentGraph,
1350        upstream_context: FragmentGraphUpstreamContext,
1351        job_type: StreamingJobType,
1352    ) -> MetaResult<Self> {
1353        Self::build_helper(graph, Some(upstream_context), None, job_type)
1354    }
1355
1356    /// Create a new [`CompleteStreamFragmentGraph`] for replacing an existing table/source,
1357    /// with the existing downstream `StreamScan`/`StreamSourceScan` fragments.
1358    pub fn with_downstreams(
1359        graph: StreamFragmentGraph,
1360        downstream_context: FragmentGraphDownstreamContext,
1361        job_type: StreamingJobType,
1362    ) -> MetaResult<Self> {
1363        Self::build_helper(graph, None, Some(downstream_context), job_type)
1364    }
1365
1366    /// For replacing an existing table based on a shared CDC source, which has both upstream and downstream fragments.
1367    pub fn with_upstreams_and_downstreams(
1368        graph: StreamFragmentGraph,
1369        upstream_context: FragmentGraphUpstreamContext,
1370        downstream_context: FragmentGraphDownstreamContext,
1371        job_type: StreamingJobType,
1372    ) -> MetaResult<Self> {
1373        Self::build_helper(
1374            graph,
1375            Some(upstream_context),
1376            Some(downstream_context),
1377            job_type,
1378        )
1379    }
1380
1381    /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments.
1382    fn build_helper(
1383        mut graph: StreamFragmentGraph,
1384        upstream_ctx: Option<FragmentGraphUpstreamContext>,
1385        downstream_ctx: Option<FragmentGraphDownstreamContext>,
1386        job_type: StreamingJobType,
1387    ) -> MetaResult<Self> {
1388        let mut extra_downstreams = HashMap::new();
1389        let mut extra_upstreams = HashMap::new();
1390        let mut existing_fragments = HashMap::new();
1391
1392        let mut existing_actor_location = HashMap::new();
1393
1394        if let Some(FragmentGraphUpstreamContext {
1395            upstream_root_fragments,
1396            upstream_actor_location,
1397        }) = upstream_ctx
1398        {
1399            for (&id, fragment) in &mut graph.fragments {
1400                let uses_shuffled_backfill = fragment.has_shuffled_backfill();
1401
1402                for (&upstream_table_id, required_columns) in &fragment.upstream_table_columns {
1403                    let (upstream_fragment, nodes) = upstream_root_fragments
1404                        .get(&upstream_table_id)
1405                        .context("upstream fragment not found")?;
1406                    let upstream_root_fragment_id =
1407                        GlobalFragmentId::new(upstream_fragment.fragment_id);
1408
1409                    let edge = match job_type {
1410                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
1411                            // We traverse all fragments in the graph to find the `CdcFilter` fragment
1412                            // and add an edge between the upstream source fragment and it.
1413                            assert_ne!(
1414                                (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
1415                                0
1416                            );
1417
1418                            tracing::debug!(
1419                                ?upstream_root_fragment_id,
1420                                ?required_columns,
1421                                identity = ?fragment.inner.get_node().unwrap().get_identity(),
1422                                current_frag_id=?id,
1423                                "CdcFilter with upstream source fragment"
1424                            );
1425
1426                            StreamFragmentEdge {
1427                                id: EdgeId::UpstreamExternal {
1428                                    upstream_table_id,
1429                                    downstream_fragment_id: id,
1430                                },
1431                                // We always use `NoShuffle` for the exchange between the upstream
1432                                // `Source` and the downstream `StreamScan` of the new cdc table.
1433                                dispatch_strategy: DispatchStrategy {
1434                                    r#type: DispatcherType::NoShuffle as _,
1435                                    dist_key_indices: vec![], // not used for `NoShuffle`
1436                                    output_mapping: DispatchOutputMapping::identical(
1437                                        CDC_SOURCE_COLUMN_NUM as _,
1438                                    )
1439                                    .into(),
1440                                },
1441                            }
1442                        }
1443
1444                        // handle MV on MV/Source
1445                        StreamingJobType::MaterializedView
1446                        | StreamingJobType::Sink
1447                        | StreamingJobType::Index => {
1448                            // Build the extra edges between the upstream `Materialize` and
1449                            // the downstream `StreamScan` of the new job.
1450                            if upstream_fragment
1451                                .fragment_type_mask
1452                                .contains(FragmentTypeFlag::Mview)
1453                            {
1454                                // Resolve the required output columns from the upstream materialized view.
1455                                let (dist_key_indices, output_mapping) = {
1456                                    let mview_node =
1457                                        nodes.get_node_body().unwrap().as_materialize().unwrap();
1458                                    let all_columns = mview_node.column_descs();
1459                                    let dist_key_indices = mview_node.dist_key_indices();
1460                                    let output_mapping = gen_output_mapping(
1461                                        required_columns,
1462                                        &all_columns,
1463                                    )
1464                                    .context(
1465                                        "BUG: column not found in the upstream materialized view",
1466                                    )?;
1467                                    (dist_key_indices, output_mapping)
1468                                };
1469                                let dispatch_strategy = mv_on_mv_dispatch_strategy(
1470                                    uses_shuffled_backfill,
1471                                    dist_key_indices,
1472                                    output_mapping,
1473                                );
1474
1475                                StreamFragmentEdge {
1476                                    id: EdgeId::UpstreamExternal {
1477                                        upstream_table_id,
1478                                        downstream_fragment_id: id,
1479                                    },
1480                                    dispatch_strategy,
1481                                }
1482                            }
1483                            // Build the extra edges between the upstream `Source` and
1484                            // the downstream `SourceBackfill` of the new job.
1485                            else if upstream_fragment
1486                                .fragment_type_mask
1487                                .contains(FragmentTypeFlag::Source)
1488                            {
1489                                let output_mapping = {
1490                                    let source_node =
1491                                        nodes.get_node_body().unwrap().as_source().unwrap();
1492
1493                                    let all_columns = source_node.column_descs().unwrap();
1494                                    gen_output_mapping(required_columns, &all_columns).context(
1495                                        "BUG: column not found in the upstream source node",
1496                                    )?
1497                                };
1498
1499                                StreamFragmentEdge {
1500                                    id: EdgeId::UpstreamExternal {
1501                                        upstream_table_id,
1502                                        downstream_fragment_id: id,
1503                                    },
1504                                    // We always use `NoShuffle` for the exchange between the upstream
1505                                    // `Source` and the downstream `SourceBackfill` of the new MV.
1506                                    dispatch_strategy: DispatchStrategy {
1507                                        r#type: DispatcherType::NoShuffle as _,
1508                                        dist_key_indices: vec![], // not used for `NoShuffle`
1509                                        output_mapping: Some(output_mapping),
1510                                    },
1511                                }
1512                            } else {
1513                                bail!(
1514                                    "the upstream fragment should be a MView or Source, got fragment type: {:b}",
1515                                    upstream_fragment.fragment_type_mask
1516                                )
1517                            }
1518                        }
1519                        StreamingJobType::Source | StreamingJobType::Table(_) => {
1520                            bail!(
1521                                "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
1522                                job_type
1523                            )
1524                        }
1525                    };
1526
1527                    // put the edge into the extra edges
1528                    extra_downstreams
1529                        .entry(upstream_root_fragment_id)
1530                        .or_insert_with(HashMap::new)
1531                        .try_insert(id, edge.clone())
1532                        .unwrap();
1533                    extra_upstreams
1534                        .entry(id)
1535                        .or_insert_with(HashMap::new)
1536                        .try_insert(upstream_root_fragment_id, edge)
1537                        .unwrap();
1538                }
1539            }
1540
1541            existing_fragments.extend(
1542                upstream_root_fragments
1543                    .into_values()
1544                    .map(|(f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
1545            );
1546
1547            existing_actor_location.extend(upstream_actor_location);
1548        }
1549
1550        if let Some(FragmentGraphDownstreamContext {
1551            original_root_fragment_id,
1552            downstream_fragments,
1553            downstream_actor_location,
1554        }) = downstream_ctx
1555        {
1556            let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
1557            let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());
1558
1559            // Build the extra edges between the `Materialize` and the downstream `StreamScan` of the
1560            // existing materialized views.
1561            for (dispatcher_type, fragment, nodes) in &downstream_fragments {
1562                let id = GlobalFragmentId::new(fragment.fragment_id);
1563
1564                // Similar to `extract_upstream_table_columns_except_cross_db_backfill`.
1565                let output_columns = {
1566                    let mut res = None;
1567
1568                    stream_graph_visitor::visit_stream_node_body(nodes, |node_body| {
1569                        let columns = match node_body {
1570                            NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
1571                            NodeBody::SourceBackfill(source_backfill) => {
1572                                // FIXME: only pass required columns instead of all columns here
1573                                source_backfill.column_descs()
1574                            }
1575                            _ => return,
1576                        };
1577                        res = Some(columns);
1578                    });
1579
1580                    res.context("failed to locate downstream scan")?
1581                };
1582
1583                let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
1584                let nodes = table_fragment.node.as_ref().unwrap();
1585
1586                let (dist_key_indices, output_mapping) = match job_type {
1587                    StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1588                        let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
1589                        let all_columns = mview_node.column_descs();
1590                        let dist_key_indices = mview_node.dist_key_indices();
1591                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
1592                            .ok_or_else(|| {
1593                                MetaError::invalid_parameter(
1594                                    "unable to drop the column due to \
1595                                     being referenced by downstream materialized views or sinks",
1596                                )
1597                            })?;
1598                        (dist_key_indices, output_mapping)
1599                    }
1600
1601                    StreamingJobType::Source => {
1602                        let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
1603                        let all_columns = source_node.column_descs().unwrap();
1604                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
1605                            .ok_or_else(|| {
1606                                MetaError::invalid_parameter(
1607                                    "unable to drop the column due to \
1608                                     being referenced by downstream materialized views or sinks",
1609                                )
1610                            })?;
1611                        assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
1612                        (
1613                            vec![], // not used for `NoShuffle`
1614                            output_mapping,
1615                        )
1616                    }
1617
1618                    _ => bail!("unsupported job type for replacement: {job_type:?}"),
1619                };
1620
1621                let edge = StreamFragmentEdge {
1622                    id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
1623                        original_upstream_fragment_id: original_table_fragment_id,
1624                        downstream_fragment_id: id,
1625                    }),
1626                    dispatch_strategy: DispatchStrategy {
1627                        r#type: *dispatcher_type as i32,
1628                        output_mapping: Some(output_mapping),
1629                        dist_key_indices,
1630                    },
1631                };
1632
1633                extra_downstreams
1634                    .entry(table_fragment_id)
1635                    .or_insert_with(HashMap::new)
1636                    .try_insert(id, edge.clone())
1637                    .unwrap();
1638                extra_upstreams
1639                    .entry(id)
1640                    .or_insert_with(HashMap::new)
1641                    .try_insert(table_fragment_id, edge)
1642                    .unwrap();
1643            }
1644
1645            existing_fragments.extend(
1646                downstream_fragments
1647                    .into_iter()
1648                    .map(|(_, f, _)| (GlobalFragmentId::new(f.fragment_id), f)),
1649            );
1650
1651            existing_actor_location.extend(downstream_actor_location);
1652        }
1653
1654        Ok(Self {
1655            building_graph: graph,
1656            existing_fragments,
1657            existing_actor_location,
1658            extra_downstreams,
1659            extra_upstreams,
1660        })
1661    }
1662}
1663
1664/// Generate the `output_mapping` for [`DispatchStrategy`] from the given columns, or `None` if a
/// required column is missing from the upstream columns.
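///
/// An illustrative example (assumed column ids after `#`): with upstream columns `[a#1, b#2, c#3]`
/// and required columns `[c#3, a#1]`, the resulting `indices` are `[2, 0]`, and `types` stays
/// empty as long as no required column changed its type.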
1665fn gen_output_mapping(
1666    required_columns: &[PbColumnDesc],
1667    upstream_columns: &[PbColumnDesc],
1668) -> Option<DispatchOutputMapping> {
1669    let len = required_columns.len();
1670    let mut indices = vec![0; len];
1671    let mut types = None;
1672
1673    for (i, r) in required_columns.iter().enumerate() {
1674        let (ui, u) = upstream_columns
1675            .iter()
1676            .find_position(|&u| u.column_id == r.column_id)?;
1677        indices[i] = ui as u32;
1678
1679        // Only if we encounter type change (`ALTER TABLE ALTER COLUMN TYPE`) will we generate a
1680        // non-empty `types`.
1681        if u.column_type != r.column_type {
1682            types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
1683                upstream: u.column_type.clone(),
1684                downstream: r.column_type.clone(),
1685            };
1686        }
1687    }
1688
1689    // If there's no type change, indicate it by empty `types`.
1690    let types = types.unwrap_or(Vec::new());
1691
1692    Some(DispatchOutputMapping { indices, types })
1693}
1694
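/// Decide the dispatch strategy from the upstream `Materialize` fragment to the backfill fragment
/// of a new MV: with shuffled backfill, dispatch by `Hash` on the upstream distribution key, or
/// `Simple` when the upstream has no distribution key (i.e., it is a singleton); otherwise, use
/// `NoShuffle`.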
1695fn mv_on_mv_dispatch_strategy(
1696    uses_shuffled_backfill: bool,
1697    dist_key_indices: Vec<u32>,
1698    output_mapping: DispatchOutputMapping,
1699) -> DispatchStrategy {
1700    if uses_shuffled_backfill {
1701        if !dist_key_indices.is_empty() {
1702            DispatchStrategy {
1703                r#type: DispatcherType::Hash as _,
1704                dist_key_indices,
1705                output_mapping: Some(output_mapping),
1706            }
1707        } else {
1708            DispatchStrategy {
1709                r#type: DispatcherType::Simple as _,
1710                dist_key_indices: vec![], // empty for Simple
1711                output_mapping: Some(output_mapping),
1712            }
1713        }
1714    } else {
1715        DispatchStrategy {
1716            r#type: DispatcherType::NoShuffle as _,
1717            dist_key_indices: vec![], // not used for `NoShuffle`
1718            output_mapping: Some(output_mapping),
1719        }
1720    }
1721}
1722
1723impl CompleteStreamFragmentGraph {
1724    /// Returns **all** fragment IDs in the complete graph, including the ones that are not in the
1725    /// building graph.
1726    pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
1727        self.building_graph
1728            .fragments
1729            .keys()
1730            .chain(self.existing_fragments.keys())
1731            .copied()
1732    }
1733
1734    /// Returns an iterator of **all** edges in the complete graph, including the external edges.
1735    pub(super) fn all_edges(
1736        &self,
1737    ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
1738        self.building_graph
1739            .downstreams
1740            .iter()
1741            .chain(self.extra_downstreams.iter())
1742            .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
1743    }
1744
1745    /// Returns the distribution of the existing fragments.
1746    pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
1747        self.existing_fragments
1748            .iter()
1749            .map(|(&id, f)| {
1750                (
1751                    id,
1752                    Distribution::from_fragment(f, &self.existing_actor_location),
1753                )
1754            })
1755            .collect()
1756    }
1757
1758    /// Generate topological order of **all** fragments in this graph, including the ones that are
1759    /// not in the building graph. Returns an error if the graph is not a DAG and topological sort
1760    /// cannot be done.
1761    ///
1762    /// For MV on MV, the first fragment popped out of the queue will be the top-most node, i.e., the
1763    /// `Sink` / `Materialize` in the stream graph.
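    ///
    /// An illustrative example: for a chain of fragments
    ///
    /// ```text
    /// A (existing upstream) --> B --> C (building, top-most)
    /// ```
    ///
    /// the resulting order is `[C, B, A]`: fragments with no downstreams come first, and every
    /// fragment appears only after all of its downstreams.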
1764    pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
1765        let mut topo = Vec::new();
1766        let mut downstream_cnts = HashMap::new();
1767
1768        // Iterate all fragments.
1769        for fragment_id in self.all_fragment_ids() {
1770            // Count how many downstreams we have for a given fragment.
1771            let downstream_cnt = self.get_downstreams(fragment_id).count();
1772            if downstream_cnt == 0 {
1773                topo.push(fragment_id);
1774            } else {
1775                downstream_cnts.insert(fragment_id, downstream_cnt);
1776            }
1777        }
1778
1779        let mut i = 0;
1780        while let Some(&fragment_id) = topo.get(i) {
1781            i += 1;
1782            // Find if we can process more fragments.
1783            for (upstream_id, _) in self.get_upstreams(fragment_id) {
1784                let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
1785                *downstream_cnt -= 1;
1786                if *downstream_cnt == 0 {
1787                    downstream_cnts.remove(&upstream_id);
1788                    topo.push(upstream_id);
1789                }
1790            }
1791        }
1792
1793        if !downstream_cnts.is_empty() {
1794            // There are fragments that are not processed yet.
1795            bail!("graph is not a DAG");
1796        }
1797
1798        Ok(topo)
1799    }
1800
1801    /// Seal a [`BuildingFragment`] from the graph into a [`Fragment`], which will be further used
1802    /// to build actors on the compute nodes and be persisted into the meta store.
1803    pub(super) fn seal_fragment(
1804        &self,
1805        id: GlobalFragmentId,
1806        actors: Vec<StreamActor>,
1807        distribution: Distribution,
1808        stream_node: StreamNode,
1809    ) -> Fragment {
1810        let building_fragment = self.get_fragment(id).into_building().unwrap();
1811        let internal_tables = building_fragment.extract_internal_tables();
1812        let BuildingFragment {
1813            inner,
1814            job_id,
1815            upstream_table_columns: _,
1816        } = building_fragment;
1817
1818        let distribution_type = distribution.to_distribution_type();
1819        let vnode_count = distribution.vnode_count();
1820
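        // For a `Mview` fragment, the table written by the `Materialize` node shares its id with
        // the streaming job, so it is also recorded as one of this fragment's state tables
        // (likewise for vector index writes).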
1821        let materialized_fragment_id =
1822            if FragmentTypeMask::from(inner.fragment_type_mask).contains(FragmentTypeFlag::Mview) {
1823                job_id
1824            } else {
1825                None
1826            };
1827
1828        let vector_index_fragment_id =
1829            if inner.fragment_type_mask & FragmentTypeFlag::VectorIndexWrite as u32 != 0 {
1830                job_id
1831            } else {
1832                None
1833            };
1834
1835        let state_table_ids = internal_tables
1836            .iter()
1837            .map(|t| t.id)
1838            .chain(materialized_fragment_id)
1839            .chain(vector_index_fragment_id)
1840            .collect();
1841
1842        Fragment {
1843            fragment_id: inner.fragment_id,
1844            fragment_type_mask: inner.fragment_type_mask.into(),
1845            distribution_type,
1846            actors,
1847            state_table_ids,
1848            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
1849            nodes: stream_node,
1850        }
1851    }
1852
1853    /// Get a fragment from the complete graph, which can be either a building fragment or an
1854    /// existing fragment.
1855    pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
1856        if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
1857            EitherFragment::Existing(fragment.clone())
1858        } else {
1859            EitherFragment::Building(
1860                self.building_graph
1861                    .fragments
1862                    .get(&fragment_id)
1863                    .unwrap()
1864                    .clone(),
1865            )
1866        }
1867    }
1868
1869    /// Get **all** downstreams of a fragment, including the ones that are not in the building
1870    /// graph.
1871    pub(super) fn get_downstreams(
1872        &self,
1873        fragment_id: GlobalFragmentId,
1874    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1875        self.building_graph
1876            .get_downstreams(fragment_id)
1877            .iter()
1878            .chain(
1879                self.extra_downstreams
1880                    .get(&fragment_id)
1881                    .into_iter()
1882                    .flatten(),
1883            )
1884            .map(|(&id, edge)| (id, edge))
1885    }
1886
1887    /// Get **all** upstreams of a fragment, including the ones that are not in the building
1888    /// graph.
1889    pub(super) fn get_upstreams(
1890        &self,
1891        fragment_id: GlobalFragmentId,
1892    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
1893        self.building_graph
1894            .get_upstreams(fragment_id)
1895            .iter()
1896            .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
1897            .map(|(&id, edge)| (id, edge))
1898    }
1899
1900    /// Returns all building fragments in the graph.
1901    pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
1902        &self.building_graph.fragments
1903    }
1904
1905    /// Returns all building fragments in the graph, mutable.
1906    pub(super) fn building_fragments_mut(
1907        &mut self,
1908    ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
1909        &mut self.building_graph.fragments
1910    }
1911
1912    /// Get the expected vnode count of the building graph. See documentation of the field for more details.
1913    pub(super) fn max_parallelism(&self) -> usize {
1914        self.building_graph.max_parallelism()
1915    }
1916}