risingwave_meta/stream/stream_graph/fragment.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::ops::{Deref, DerefMut};
use std::sync::LazyLock;

use anyhow::{Context, anyhow};
use enum_as_inner::EnumAsInner;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{
    CDC_SOURCE_COLUMN_NUM, ColumnCatalog, Field, FragmentTypeFlag, FragmentTypeMask, TableId,
    generate_internal_table_name_with_type,
};
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::stream_graph_visitor::{
    self, visit_stream_node_cont, visit_stream_node_cont_mut,
};
use risingwave_connector::sink::catalog::SinkType;
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{PbSink, PbTable, Table};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc};
use risingwave_pb::stream_plan::dispatch_output_mapping::TypePair;
use risingwave_pb::stream_plan::stream_fragment_graph::{
    Parallelism, StreamFragment, StreamFragmentEdge as StreamFragmentEdgeProto,
};
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchOutputMapping, DispatchStrategy, DispatcherType, PbStreamNode,
    PbStreamScanType, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanNode,
    StreamScanType,
};

use crate::barrier::SnapshotBackfillInfo;
use crate::controller::id::IdGeneratorManager;
use crate::manager::{MetaSrvEnv, StreamingJob, StreamingJobType};
use crate::model::{ActorId, Fragment, FragmentId, StreamActor};
use crate::stream::stream_graph::id::{
    GlobalActorIdGen, GlobalFragmentId, GlobalFragmentIdGen, GlobalTableIdGen,
};
use crate::stream::stream_graph::schedule::Distribution;
use crate::{MetaError, MetaResult};

/// The fragment in the building phase, including the [`StreamFragment`] from the frontend and
/// several additional helper fields.
#[derive(Debug, Clone)]
pub(super) struct BuildingFragment {
    /// The fragment structure from the frontend, with the global fragment ID.
    inner: StreamFragment,

    /// The ID of the job if it contains the streaming job node.
    job_id: Option<u32>,

    /// The required column IDs of each upstream table.
    /// Will be converted to indices when building the edge connected to the upstream.
    ///
    /// For a shared CDC table on source, it's `vec![]`, since the upstream source's output schema is fixed.
    upstream_table_columns: HashMap<TableId, Vec<PbColumnDesc>>,
}

impl BuildingFragment {
    /// Create a new [`BuildingFragment`] from a [`StreamFragment`]. The global fragment ID and
    /// global table IDs will be correctly filled with the given `id` and `table_id_gen`.
    fn new(
        id: GlobalFragmentId,
        fragment: StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) -> Self {
        let mut fragment = StreamFragment {
            fragment_id: id.as_global_id(),
            ..fragment
        };

        // Fill the information of the internal tables in the fragment.
        Self::fill_internal_tables(&mut fragment, job, table_id_gen);

        let job_id = Self::fill_job(&mut fragment, job).then(|| job.id());
        let upstream_table_columns =
            Self::extract_upstream_table_columns_except_cross_db_backfill(&fragment);

        Self {
            inner: fragment,
            job_id,
            upstream_table_columns,
        }
    }

    /// Extract the internal tables from the fragment.
    fn extract_internal_tables(&self) -> Vec<Table> {
        let mut fragment = self.inner.to_owned();
        let mut tables = Vec::new();
        stream_graph_visitor::visit_internal_tables(&mut fragment, |table, _| {
            tables.push(table.clone());
        });
        tables
    }

    /// Fill the information of the internal tables in the fragment.
    fn fill_internal_tables(
        fragment: &mut StreamFragment,
        job: &StreamingJob,
        table_id_gen: GlobalTableIdGen,
    ) {
        let fragment_id = fragment.fragment_id;
        stream_graph_visitor::visit_internal_tables(fragment, |table, table_type_name| {
            table.id = table_id_gen.to_global_id(table.id).as_global_id();
            table.schema_id = job.schema_id();
            table.database_id = job.database_id();
            table.name = generate_internal_table_name_with_type(
                &job.name(),
                fragment_id,
                table.id,
                table_type_name,
            );
            table.fragment_id = fragment_id;
            table.owner = job.owner();
            table.job_id = Some(job.id());
        });
    }

    /// Fill the job information into the fragment.
    fn fill_job(fragment: &mut StreamFragment, job: &StreamingJob) -> bool {
        let job_id = job.id();
        let fragment_id = fragment.fragment_id;
        let mut has_job = false;

        stream_graph_visitor::visit_fragment_mut(fragment, |node_body| match node_body {
            NodeBody::Materialize(materialize_node) => {
                materialize_node.table_id = job_id;

                // Fill the table field of `MaterializeNode` from the job.
                let table = materialize_node.table.insert(job.table().unwrap().clone());
                table.fragment_id = fragment_id; // this will later be synced back to `job.table` with `set_info_from_graph`
                // In production, do not include the full definition in the table of the plan node.
                if cfg!(not(debug_assertions)) {
                    table.definition = job.name();
                }

                has_job = true;
            }
            NodeBody::Sink(sink_node) => {
                sink_node.sink_desc.as_mut().unwrap().id = job_id;

                has_job = true;
            }
            NodeBody::Dml(dml_node) => {
                dml_node.table_id = job_id;
                dml_node.table_version_id = job.table_version_id().unwrap();
            }
            NodeBody::StreamFsFetch(fs_fetch_node) => {
                if let StreamingJob::Table(table_source, _, _) = job
                    && let Some(node_inner) = fs_fetch_node.node_inner.as_mut()
                    && let Some(source) = table_source
                {
                    node_inner.source_id = source.id;
                }
            }
            NodeBody::Source(source_node) => {
                match job {
                    // Note: a table without a connector has a dummy `Source` node.
                    // Note: for a table with a connector, its source node has a source ID different
                    // from the table ID (job ID), assigned in `create_job_catalog`.
                    StreamingJob::Table(source, _table, _table_job_type) => {
                        if let Some(source_inner) = source_node.source_inner.as_mut()
                            && let Some(source) = source
                        {
                            debug_assert_ne!(source.id, job_id);
                            source_inner.source_id = source.id;
                        }
                    }
                    StreamingJob::Source(source) => {
                        has_job = true;
                        if let Some(source_inner) = source_node.source_inner.as_mut() {
                            debug_assert_eq!(source.id, job_id);
                            source_inner.source_id = source.id;
                        }
                    }
                    // For other job types, there is no need to fill the source ID, since it
                    // refers to an existing source.
                    _ => {}
                }
            }
            NodeBody::StreamCdcScan(node) => {
                if let Some(table_desc) = node.cdc_table_desc.as_mut() {
                    table_desc.table_id = job_id;
                }
            }
            NodeBody::VectorIndexWrite(node) => {
                let table = node.table.as_mut().unwrap();
                table.id = job_id;
                table.database_id = job.database_id();
                table.schema_id = job.schema_id();
                table.fragment_id = fragment_id;
                #[cfg(not(debug_assertions))]
                {
                    table.definition = job.name();
                }

                has_job = true;
            }
            _ => {}
        });

        has_job
    }

    /// Extract the required columns of each upstream table except for cross-db backfill.
    fn extract_upstream_table_columns_except_cross_db_backfill(
        fragment: &StreamFragment,
    ) -> HashMap<TableId, Vec<PbColumnDesc>> {
        let mut table_columns = HashMap::new();

        stream_graph_visitor::visit_fragment(fragment, |node_body| {
            let (table_id, column_ids) = match node_body {
                NodeBody::StreamScan(stream_scan) => {
                    if stream_scan.get_stream_scan_type().unwrap()
                        == StreamScanType::CrossDbSnapshotBackfill
                    {
                        return;
                    }
                    (stream_scan.table_id.into(), stream_scan.upstream_columns())
                }
                NodeBody::CdcFilter(cdc_filter) => (cdc_filter.upstream_source_id.into(), vec![]),
                NodeBody::SourceBackfill(backfill) => (
                    backfill.upstream_source_id.into(),
                    // FIXME: only pass required columns instead of all columns here
                    backfill.column_descs(),
                ),
                _ => return,
            };
            table_columns
                .try_insert(table_id, column_ids)
                .expect("currently there should be no two same upstream tables in a fragment");
        });

        table_columns
    }

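    /// Returns `true` if this fragment contains a `StreamScan` node that uses shuffled backfill,
    /// i.e., `ArrangementBackfill` or `SnapshotBackfill`.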
    pub fn has_shuffled_backfill(&self) -> bool {
        let stream_node = match self.inner.node.as_ref() {
            Some(node) => node,
            _ => return false,
        };
        let mut has_shuffled_backfill = false;
        let has_shuffled_backfill_mut_ref = &mut has_shuffled_backfill;
        visit_stream_node_cont(stream_node, |node| {
            let is_shuffled_backfill = if let Some(node) = &node.node_body
                && let Some(node) = node.as_stream_scan()
            {
                node.stream_scan_type == StreamScanType::ArrangementBackfill as i32
                    || node.stream_scan_type == StreamScanType::SnapshotBackfill as i32
            } else {
                false
            };
            if is_shuffled_backfill {
                *has_shuffled_backfill_mut_ref = true;
                false
            } else {
                true
            }
        });
        has_shuffled_backfill
    }
}

impl Deref for BuildingFragment {
    type Target = StreamFragment;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for BuildingFragment {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

/// The ID of an edge in the fragment graph. For different types of edges, the ID will be in
/// different variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
pub(super) enum EdgeId {
    /// The edge between two building (internal) fragments.
    Internal {
        /// The ID generated by the frontend, generally the operator ID of `Exchange`.
        /// See [`StreamFragmentEdgeProto`].
        link_id: u64,
    },

    /// The edge between an upstream external fragment and a downstream building fragment. Used for
    /// MV on MV.
    UpstreamExternal {
        /// The ID of the upstream table or materialized view.
        upstream_table_id: TableId,
        /// The ID of the downstream fragment.
        downstream_fragment_id: GlobalFragmentId,
    },

    /// The edge between an upstream building fragment and a downstream external fragment. Used for
    /// schema change (replace table plan).
    DownstreamExternal(DownstreamExternalEdgeId),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) struct DownstreamExternalEdgeId {
    /// The ID of the original upstream fragment (`Materialize`).
    pub(super) original_upstream_fragment_id: GlobalFragmentId,
    /// The ID of the downstream fragment.
    pub(super) downstream_fragment_id: GlobalFragmentId,
}

/// The edge in the fragment graph.
///
/// The edge can be either internal or external. This is distinguished by the [`EdgeId`].
#[derive(Debug, Clone)]
pub(super) struct StreamFragmentEdge {
    /// The ID of the edge.
    pub id: EdgeId,

    /// The strategy used for dispatching the data.
    pub dispatch_strategy: DispatchStrategy,
}

impl StreamFragmentEdge {
    fn from_protobuf(edge: &StreamFragmentEdgeProto) -> Self {
        Self {
            // By creating an edge from the protobuf, we know that the edge is from the frontend and
            // is internal.
            id: EdgeId::Internal {
                link_id: edge.link_id,
            },
            dispatch_strategy: edge.get_dispatch_strategy().unwrap().clone(),
        }
    }
}

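/// Clone a fragment with freshly generated fragment and actor IDs, keeping the distribution type,
/// vnode bitmaps, state table IDs and node bodies of the original fragment.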
fn clone_fragment(fragment: &Fragment, id_generator_manager: &IdGeneratorManager) -> Fragment {
    let fragment_id = GlobalFragmentIdGen::new(id_generator_manager, 1)
        .to_global_id(0)
        .as_global_id();
    let actor_id_gen = GlobalActorIdGen::new(id_generator_manager, fragment.actors.len() as _);
    Fragment {
        fragment_id,
        fragment_type_mask: fragment.fragment_type_mask,
        distribution_type: fragment.distribution_type,
        actors: fragment
            .actors
            .iter()
            .enumerate()
            .map(|(i, actor)| StreamActor {
                actor_id: actor_id_gen.to_global_id(i as _).as_global_id() as _,
                fragment_id,
                vnode_bitmap: actor.vnode_bitmap.clone(),
                mview_definition: actor.mview_definition.clone(),
                expr_context: actor.expr_context.clone(),
            })
            .collect(),
        state_table_ids: fragment.state_table_ids.clone(),
        maybe_vnode_count: fragment.maybe_vnode_count,
        nodes: fragment.nodes.clone(),
    }
}

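/// Check that the sink job consists of exactly one fragment with the shape
/// `Sink -> StreamScan(ArrangementBackfill) -> [Merge, BatchPlan]`, which is the only shape
/// supported by auto schema refresh.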
pub fn check_sink_fragments_support_refresh_schema(
    fragments: &BTreeMap<FragmentId, Fragment>,
) -> MetaResult<()> {
    if fragments.len() != 1 {
        return Err(anyhow!(
            "sink with auto schema change should have only 1 fragment, but got {:?}",
            fragments.len()
        )
        .into());
    }
    let (_, fragment) = fragments.first_key_value().expect("non-empty");
    let sink_node = &fragment.nodes;
    let PbNodeBody::Sink(_) = sink_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
    };
    let [stream_scan_node] = sink_node.input.as_slice() else {
        panic!("Sink should have exactly 1 input: {:?}", sink_node.input);
    };
    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::StreamScan but got: {:?}",
            stream_scan_node.node_body
        )
        .into());
    };
    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
        return Err(anyhow!(
            "unsupported stream_scan_type for auto refresh schema: {:?}",
            stream_scan_type
        )
        .into());
    }
    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_slice() else {
        panic!(
            "the number of StreamScan inputs is not 2: {:?}",
            stream_scan_node.input
        );
    };
    let NodeBody::Merge(_) = merge_node.node_body.as_ref().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::Merge but got: {:?}",
            merge_node.node_body
        )
        .into());
    };
    Ok(())
}

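/// Rewrite the sink fragment for auto schema refresh to include the newly added upstream columns.
/// Returns the rewritten fragment, the extended sink columns, and the extended log store table
/// (if the sink has one).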
pub fn rewrite_refresh_schema_sink_fragment(
    original_sink_fragment: &Fragment,
    sink: &PbSink,
    newly_added_columns: &[ColumnCatalog],
    upstream_table: &PbTable,
    upstream_table_fragment_id: FragmentId,
    id_generator_manager: &IdGeneratorManager,
) -> MetaResult<(Fragment, Vec<PbColumnCatalog>, Option<PbTable>)> {
    let mut new_sink_columns = sink.columns.clone();
    fn extend_sink_columns(
        sink_columns: &mut Vec<PbColumnCatalog>,
        new_columns: &[ColumnCatalog],
        get_column_name: impl Fn(&String) -> String,
    ) {
        let next_column_id = sink_columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().column_id + 1)
            .max()
            .unwrap_or(1);
        sink_columns.extend(new_columns.iter().enumerate().map(|(i, col)| {
            let mut col = col.to_protobuf();
            let column_desc = col.column_desc.as_mut().unwrap();
            column_desc.column_id = next_column_id + (i as i32);
            column_desc.name = get_column_name(&column_desc.name);
            col
        }));
    }
    extend_sink_columns(&mut new_sink_columns, newly_added_columns, |name| {
        name.clone()
    });

    let mut new_sink_fragment = clone_fragment(original_sink_fragment, id_generator_manager);
    let sink_node = &mut new_sink_fragment.nodes;
    let PbNodeBody::Sink(sink_node_body) = sink_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!("expect PbNodeBody::Sink but got: {:?}", sink_node.node_body).into());
    };
    let [stream_scan_node] = sink_node.input.as_mut_slice() else {
        panic!("Sink should have exactly 1 input: {:?}", sink_node.input);
    };
    let PbNodeBody::StreamScan(scan) = stream_scan_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::StreamScan but got: {:?}",
            stream_scan_node.node_body
        )
        .into());
    };
    let [merge_node, _batch_plan_node] = stream_scan_node.input.as_mut_slice() else {
        panic!(
            "the number of StreamScan inputs is not 2: {:?}",
            stream_scan_node.input
        );
    };
    let NodeBody::Merge(merge) = merge_node.node_body.as_mut().unwrap() else {
        return Err(anyhow!(
            "expect PbNodeBody::Merge but got: {:?}",
            merge_node.node_body
        )
        .into());
    };
    // Update the sink node, following the logic in `<StreamSink as Explain>::distill`.
    sink_node.identity = {
        let sink_type = SinkType::from_proto(sink.sink_type());
        let sink_type_str = if sink_type.is_append_only() {
            "append-only"
        } else {
            "upsert"
        };
        let column_names = new_sink_columns
            .iter()
            .map(|col| {
                ColumnCatalog::from(col.clone())
                    .name_with_hidden()
                    .to_string()
            })
            .join(", ");
        let downstream_pk = if sink_type.is_upsert() {
            let downstream_pk = sink
                .downstream_pk
                .iter()
                .map(|i| &sink.columns[*i as usize].column_desc.as_ref().unwrap().name)
                .collect_vec();
            format!(", downstream_pk: {downstream_pk:?}")
        } else {
            "".to_owned()
        };
        format!("StreamSink {{ type: {sink_type_str}, columns: [{column_names}]{downstream_pk} }}")
    };
    sink_node
        .fields
        .extend(newly_added_columns.iter().map(|col| {
            Field::new(
                format!("{}.{}", upstream_table.name, col.column_desc.name),
                col.data_type().clone(),
            )
            .to_prost()
        }));

    let new_log_store_table = if let Some(log_store_table) = &mut sink_node_body.table {
        extend_sink_columns(&mut log_store_table.columns, newly_added_columns, |name| {
            format!("{}_{}", upstream_table.name, name)
        });
        Some(log_store_table.clone())
    } else {
        None
    };
    sink_node_body.sink_desc.as_mut().unwrap().column_catalogs = new_sink_columns.clone();

    // Update the stream scan node.
    stream_scan_node
        .fields
        .extend(newly_added_columns.iter().map(|col| {
            Field::new(
                format!("{}.{}", upstream_table.name, col.column_desc.name),
                col.data_type().clone(),
            )
            .to_prost()
        }));
    // Following the logic in `<StreamTableScan as Explain>::distill`.
    stream_scan_node.identity = {
        let columns = stream_scan_node
            .fields
            .iter()
            .map(|col| &col.name)
            .join(", ");
        format!("StreamTableScan {{ table: t, columns: [{columns}] }}")
    };

    let stream_scan_type = PbStreamScanType::try_from(scan.stream_scan_type).unwrap();
    if stream_scan_type != PbStreamScanType::ArrangementBackfill {
        return Err(anyhow!(
            "unsupported stream_scan_type for auto refresh schema: {:?}",
            stream_scan_type
        )
        .into());
    }
    scan.arrangement_table = Some(upstream_table.clone());
    scan.output_indices.extend(
        (0..newly_added_columns.len()).map(|i| (i + scan.upstream_column_ids.len()) as u32),
    );
    scan.upstream_column_ids.extend(
        newly_added_columns
            .iter()
            .map(|col| col.column_id().get_id()),
    );
    let table_desc = scan.table_desc.as_mut().unwrap();
    table_desc
        .value_indices
        .extend((0..newly_added_columns.len()).map(|i| (i + table_desc.columns.len()) as u32));
    table_desc.columns.extend(
        newly_added_columns
            .iter()
            .map(|col| col.column_desc.to_protobuf()),
    );

    // Update the merge node.
    merge_node.fields = scan
        .upstream_column_ids
        .iter()
        .map(|&column_id| {
            let col = upstream_table
                .columns
                .iter()
                .find(|c| c.column_desc.as_ref().unwrap().column_id == column_id)
                .unwrap();
            let col_desc = col.column_desc.as_ref().unwrap();
            Field::new(
                col_desc.name.clone(),
                col_desc.column_type.as_ref().unwrap().into(),
            )
            .to_prost()
        })
        .collect();
    merge.upstream_fragment_id = upstream_table_fragment_id;
    Ok((new_sink_fragment, new_sink_columns, new_log_store_table))
}

/// Adjacency list (G) of backfill orders.
/// `G[10] -> [1, 2, 11]`
/// means that the backfill node in fragment 10
/// should be backfilled before the backfill nodes in fragments 1, 2 and 11.
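///
/// A minimal sketch of constructing such an order (hypothetical fragment IDs):
///
/// ```ignore
/// use std::collections::HashMap;
///
/// let mut order: FragmentBackfillOrder = HashMap::new();
/// // Fragment 10 must finish backfilling before fragments 1, 2 and 11 start.
/// order.insert(10, vec![1, 2, 11]);
/// ```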
pub type FragmentBackfillOrder = HashMap<FragmentId, Vec<FragmentId>>;

/// In-memory representation of a **Fragment** Graph, built from the [`StreamFragmentGraphProto`]
/// from the frontend.
///
/// This only includes the nodes and edges of the current job itself. It will be converted to
/// [`CompleteStreamFragmentGraph`] later, which contains the additional information of
/// pre-existing fragments that are connected to the graph's top-most or bottom-most fragments.
#[derive(Default, Debug)]
pub struct StreamFragmentGraph {
    /// Stores all the fragments in the graph.
    pub(super) fragments: HashMap<GlobalFragmentId, BuildingFragment>,

    /// Stores edges between fragments: upstream => downstream.
    pub(super) downstreams:
        HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Stores edges between fragments: downstream -> upstream.
    pub(super) upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Dependent relations of this job.
    dependent_table_ids: HashSet<TableId>,

    /// The default parallelism of the job, specified by the `STREAMING_PARALLELISM` session
    /// variable. If not specified, all active worker slots will be used.
    specified_parallelism: Option<NonZeroUsize>,

    /// Specified max parallelism, i.e., expected vnode count for the graph.
    ///
    /// The scheduler on the meta service will use this as a hint to decide the vnode count
    /// for each fragment.
    ///
    /// Note that the actual vnode count may be different from this value.
    /// For example, a no-shuffle exchange between the current fragment graph and an existing
    /// upstream fragment graph requires the two fragments to be in the same distribution,
    /// thus the same vnode count.
    max_parallelism: usize,

    /// The backfill ordering strategy of the graph.
    backfill_order: BackfillOrder,
}

impl StreamFragmentGraph {
    /// Create a new [`StreamFragmentGraph`] from the given [`StreamFragmentGraphProto`], with all
    /// global IDs correctly filled.
    pub fn new(
        env: &MetaSrvEnv,
        proto: StreamFragmentGraphProto,
        job: &StreamingJob,
    ) -> MetaResult<Self> {
        let fragment_id_gen =
            GlobalFragmentIdGen::new(env.id_gen_manager(), proto.fragments.len() as u64);
        // Note: in SQL backend, the ids generated here are fake and will be overwritten again
        // with `refill_internal_table_ids` later.
        // TODO: refactor the code to remove this step.
        let table_id_gen = GlobalTableIdGen::new(env.id_gen_manager(), proto.table_ids_cnt as u64);

        // Create nodes.
        let fragments: HashMap<_, _> = proto
            .fragments
            .into_iter()
            .map(|(id, fragment)| {
                let id = fragment_id_gen.to_global_id(id);
                let fragment = BuildingFragment::new(id, fragment, job, table_id_gen);
                (id, fragment)
            })
            .collect();

        assert_eq!(
            fragments
                .values()
                .map(|f| f.extract_internal_tables().len() as u32)
                .sum::<u32>(),
            proto.table_ids_cnt
        );

        // Create edges.
        let mut downstreams = HashMap::new();
        let mut upstreams = HashMap::new();

        for edge in proto.edges {
            let upstream_id = fragment_id_gen.to_global_id(edge.upstream_id);
            let downstream_id = fragment_id_gen.to_global_id(edge.downstream_id);
            let edge = StreamFragmentEdge::from_protobuf(&edge);

            upstreams
                .entry(downstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(upstream_id, edge.clone())
                .unwrap();
            downstreams
                .entry(upstream_id)
                .or_insert_with(HashMap::new)
                .try_insert(downstream_id, edge)
                .unwrap();
        }

        // Note: Here we directly use the field `dependent_table_ids` in the proto (resolved in
        // frontend), instead of visiting the graph ourselves.
        let dependent_table_ids = proto
            .dependent_table_ids
            .iter()
            .map(TableId::from)
            .collect();

        let specified_parallelism = if let Some(Parallelism { parallelism }) = proto.parallelism {
            Some(NonZeroUsize::new(parallelism as usize).context("parallelism should not be 0")?)
        } else {
            None
        };

        let max_parallelism = proto.max_parallelism as usize;
        let backfill_order = proto.backfill_order.unwrap_or(BackfillOrder {
            order: Default::default(),
        });

        Ok(Self {
            fragments,
            downstreams,
            upstreams,
            dependent_table_ids,
            specified_parallelism,
            max_parallelism,
            backfill_order,
        })
    }

    /// Retrieve the **incomplete** internal tables map of the whole graph.
    ///
    /// Note that some fields in the table catalogs are not filled during the current phase, e.g.,
    /// `fragment_id` and `vnode_count`. They will all be filled after a `TableFragments` is built.
    /// Be careful when using the returned values.
    pub fn incomplete_internal_tables(&self) -> BTreeMap<u32, Table> {
        let mut tables = BTreeMap::new();
        for fragment in self.fragments.values() {
            for table in fragment.extract_internal_tables() {
                let table_id = table.id;
                tables
                    .try_insert(table_id, table)
                    .unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
            }
        }
        tables
    }

    /// Refill the internal tables' `table_id`s according to the given map, typically obtained from
    /// `create_internal_table_catalog`.
    pub fn refill_internal_table_ids(&mut self, table_id_map: HashMap<u32, u32>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = table_id_map.get(&table.id).cloned().unwrap();
                    table.id = target;
                },
            );
        }
    }

    /// Use a trivial algorithm to match the internal tables of the new graph for
    /// `ALTER TABLE` or `ALTER SOURCE`.
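    ///
    /// The "trivial" matching sorts both the old and the new internal tables by table ID and
    /// pairs them positionally, which requires both sides to have the same number of tables.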
    pub fn fit_internal_tables_trivial(
        &mut self,
        mut old_internal_tables: Vec<Table>,
    ) -> MetaResult<()> {
        let mut new_internal_table_ids = Vec::new();
        for fragment in self.fragments.values() {
            for table in &fragment.extract_internal_tables() {
                new_internal_table_ids.push(table.id);
            }
        }

        if new_internal_table_ids.len() != old_internal_tables.len() {
            bail!(
                "Different number of internal tables. New: {}, Old: {}",
                new_internal_table_ids.len(),
                old_internal_tables.len()
            );
        }
        old_internal_tables.sort_by(|a, b| a.id.cmp(&b.id));
        new_internal_table_ids.sort();

        let internal_table_id_map = new_internal_table_ids
            .into_iter()
            .zip_eq_fast(old_internal_tables.into_iter())
            .collect::<HashMap<_, _>>();

        // TODO(alter-mv): unify this with `fit_internal_table_ids_with_mapping` after we
        // confirm the behavior is the same.
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    // XXX: this replaces the entire table, instead of just the id!
                    let target = internal_table_id_map.get(&table.id).cloned().unwrap();
                    *table = target;
                },
            );
        }

        Ok(())
    }

    /// Fit the internal tables' `table_id`s according to the given mapping.
    pub fn fit_internal_table_ids_with_mapping(&mut self, mut matches: HashMap<u32, Table>) {
        for fragment in self.fragments.values_mut() {
            stream_graph_visitor::visit_internal_tables(
                &mut fragment.inner,
                |table, _table_type_name| {
                    let target = matches.remove(&table.id).unwrap_or_else(|| {
                        panic!("no matching table for table {}({})", table.id, table.name)
                    });
                    table.id = target.id;
                    table.maybe_vnode_count = target.maybe_vnode_count;
                },
            );
        }
    }

    /// Returns the fragment ID where the streaming job node is located.
    pub fn table_fragment_id(&self) -> FragmentId {
        self.fragments
            .values()
            .filter(|b| b.job_id.is_some())
            .map(|b| b.fragment_id)
            .exactly_one()
            .expect("require exactly 1 materialize/sink/cdc source node when creating the streaming job")
    }

    /// Returns the fragment ID where the table DML is received.
    pub fn dml_fragment_id(&self) -> Option<FragmentId> {
        self.fragments
            .values()
            .filter(|b| {
                FragmentTypeMask::from(b.fragment_type_mask).contains(FragmentTypeFlag::Dml)
            })
            .map(|b| b.fragment_id)
            .at_most_one()
            .expect("require at most 1 dml node when creating the streaming job")
    }

    /// Get the dependent streaming job ids of this job.
    pub fn dependent_table_ids(&self) -> &HashSet<TableId> {
        &self.dependent_table_ids
    }

    /// Get the parallelism of the job, if specified by the user.
    pub fn specified_parallelism(&self) -> Option<NonZeroUsize> {
        self.specified_parallelism
    }

    /// Get the expected vnode count of the graph. See documentation of the field for more details.
    pub fn max_parallelism(&self) -> usize {
        self.max_parallelism
    }

    /// Get downstreams of a fragment.
    fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.downstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    /// Get upstreams of a fragment.
    fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> &HashMap<GlobalFragmentId, StreamFragmentEdge> {
        self.upstreams.get(&fragment_id).unwrap_or(&EMPTY_HASHMAP)
    }

    pub fn collect_snapshot_backfill_info(
        &self,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        Self::collect_snapshot_backfill_info_impl(self.fragments.values().map(|fragment| {
            (
                fragment.node.as_ref().unwrap(),
                fragment.fragment_type_mask.into(),
            )
        }))
    }

    /// Returns `Ok((Some(snapshot_backfill_info), cross_db_snapshot_backfill_info))`.
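    ///
    /// All `StreamScan` nodes in the job must either all use snapshot backfill or none of them;
    /// otherwise, an error is returned. Cross-db snapshot backfill scans are collected separately
    /// and are exempt from this check.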
    pub fn collect_snapshot_backfill_info_impl(
        fragments: impl IntoIterator<Item = (&PbStreamNode, FragmentTypeMask)>,
    ) -> MetaResult<(Option<SnapshotBackfillInfo>, SnapshotBackfillInfo)> {
        let mut prev_stream_scan: Option<(Option<SnapshotBackfillInfo>, StreamScanNode)> = None;
        let mut cross_db_info = SnapshotBackfillInfo {
            upstream_mv_table_id_to_backfill_epoch: Default::default(),
        };
        let mut result = Ok(());
        for (node, fragment_type_mask) in fragments {
            visit_stream_node_cont(node, |node| {
                if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_ref() {
                    let stream_scan_type = StreamScanType::try_from(stream_scan.stream_scan_type)
                        .expect("invalid stream_scan_type");
                    let is_snapshot_backfill = match stream_scan_type {
                        StreamScanType::SnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                            );
                            true
                        }
                        StreamScanType::CrossDbSnapshotBackfill => {
                            assert!(
                                fragment_type_mask
                                    .contains(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan)
                            );
                            cross_db_info.upstream_mv_table_id_to_backfill_epoch.insert(
                                TableId::new(stream_scan.table_id),
                                stream_scan.snapshot_backfill_epoch,
                            );

                            return true;
                        }
                        _ => false,
                    };

                    match &mut prev_stream_scan {
                        Some((prev_snapshot_backfill_info, prev_stream_scan)) => {
                            match (prev_snapshot_backfill_info, is_snapshot_backfill) {
                                (Some(prev_snapshot_backfill_info), true) => {
                                    prev_snapshot_backfill_info
                                        .upstream_mv_table_id_to_backfill_epoch
                                        .insert(
                                            TableId::new(stream_scan.table_id),
                                            stream_scan.snapshot_backfill_epoch,
                                        );
                                    true
                                }
                                (None, false) => true,
                                (_, _) => {
                                    result = Err(anyhow!("must be either all snapshot_backfill or no snapshot_backfill. Curr: {stream_scan:?} Prev: {prev_stream_scan:?}").into());
                                    false
                                }
                            }
                        }
                        None => {
                            prev_stream_scan = Some((
                                if is_snapshot_backfill {
                                    Some(SnapshotBackfillInfo {
                                        upstream_mv_table_id_to_backfill_epoch: HashMap::from_iter(
                                            [(
                                                TableId::new(stream_scan.table_id),
                                                stream_scan.snapshot_backfill_epoch,
                                            )],
                                        ),
                                    })
                                } else {
                                    None
                                },
                                *stream_scan.clone(),
                            ));
                            true
                        }
                    }
                } else {
                    true
                }
            })
        }
        result.map(|_| {
            (
                prev_stream_scan
                    .map(|(snapshot_backfill_info, _)| snapshot_backfill_info)
                    .unwrap_or(None),
                cross_db_info,
            )
        })
    }

    /// Collect the mapping from table / source ID to the `fragment_id`s that backfill from it.
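    ///
    /// For example (hypothetical IDs), if fragment 3 stream-scans table 1 and fragment 4
    /// backfills from source 2, the resulting mapping is `{1: [3], 2: [4]}`.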
    pub fn collect_backfill_mapping(&self) -> HashMap<u32, Vec<FragmentId>> {
        let mut mapping = HashMap::new();
        for (fragment_id, fragment) in &self.fragments {
            let fragment_id = fragment_id.as_global_id();
            let fragment_mask = fragment.fragment_type_mask;
            let candidates = [FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan];
            let has_some_scan = candidates
                .into_iter()
                .any(|flag| (fragment_mask & flag as u32) > 0);
            if has_some_scan {
                visit_stream_node_cont(fragment.node.as_ref().unwrap(), |node| {
                    match node.node_body.as_ref() {
                        Some(NodeBody::StreamScan(stream_scan)) => {
                            let table_id = stream_scan.table_id;
                            let fragments: &mut Vec<_> = mapping.entry(table_id).or_default();
                            fragments.push(fragment_id);
                            // each fragment should have only 1 scan node.
                            false
                        }
                        Some(NodeBody::SourceBackfill(source_backfill)) => {
                            let source_id = source_backfill.upstream_source_id;
                            let fragments: &mut Vec<_> = mapping.entry(source_id).or_default();
                            fragments.push(fragment_id);
                            // each fragment should have only 1 scan node.
                            false
                        }
                        _ => true,
                    }
                })
            }
        }
        mapping
    }

    /// The backfill order that comes from the frontend is specified between `table_id`s.
    /// We remap it to the fragment level, since backfill progress is tracked per actor, and a
    /// fragment <-> actor mapping is available.
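    ///
    /// For example (hypothetical IDs), if the table-level order is `{1: [2]}`, table 1 backfills
    /// in fragment 10, and table 2 backfills in fragments 20 and 21, the resulting fragment-level
    /// order is `{10: [20, 21]}`.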
    pub fn create_fragment_backfill_ordering(&self) -> FragmentBackfillOrder {
        let mapping = self.collect_backfill_mapping();
        let mut fragment_ordering: HashMap<u32, Vec<u32>> = HashMap::new();
        for (rel_id, downstream_rel_ids) in &self.backfill_order.order {
            let fragment_ids = mapping.get(rel_id).unwrap();
            for fragment_id in fragment_ids {
                let downstream_fragment_ids = downstream_rel_ids
                    .data
                    .iter()
                    .flat_map(|downstream_rel_id| mapping.get(downstream_rel_id).unwrap().iter())
                    .copied()
                    .collect();
                fragment_ordering.insert(*fragment_id, downstream_fragment_ids);
            }
        }
        fragment_ordering
    }
}

/// Fill the snapshot epoch for `StreamScanNode`s of `SnapshotBackfill`.
/// Returns `true` if any change was applied.
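///
/// The epoch is looked up in `cross_db_snapshot_backfill_info` first, then in
/// `snapshot_backfill_info`; it is an error if the upstream table is covered by neither,
/// or if the epoch has already been set.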
pub fn fill_snapshot_backfill_epoch(
    node: &mut StreamNode,
    snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
    cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
) -> MetaResult<bool> {
    let mut result = Ok(());
    let mut applied = false;
    visit_stream_node_cont_mut(node, |node| {
        if let Some(NodeBody::StreamScan(stream_scan)) = node.node_body.as_mut()
            && (stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32
                || stream_scan.stream_scan_type == StreamScanType::CrossDbSnapshotBackfill as i32)
        {
            result = try {
                let table_id = TableId::new(stream_scan.table_id);
                let snapshot_epoch = cross_db_snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .get(&table_id)
                    .or_else(|| {
                        snapshot_backfill_info.and_then(|snapshot_backfill_info| {
                            snapshot_backfill_info
                                .upstream_mv_table_id_to_backfill_epoch
                                .get(&table_id)
                        })
                    })
                    .ok_or_else(|| anyhow!("upstream table id not covered: {}", table_id))?
                    .ok_or_else(|| anyhow!("upstream table id not set: {}", table_id))?;
                if let Some(prev_snapshot_epoch) =
                    stream_scan.snapshot_backfill_epoch.replace(snapshot_epoch)
                {
                    Err(anyhow!(
                        "snapshot backfill epoch set again: {} {} {}",
                        table_id,
                        prev_snapshot_epoch,
                        snapshot_epoch
                    ))?;
                }
                applied = true;
            };
            result.is_ok()
        } else {
            true
        }
    });
    result.map(|_| applied)
}

static EMPTY_HASHMAP: LazyLock<HashMap<GlobalFragmentId, StreamFragmentEdge>> =
    LazyLock::new(HashMap::new);

/// A fragment that is either being built or already exists. Used to generalize the logic of
/// [`crate::stream::ActorGraphBuilder`].
#[derive(Debug, Clone, EnumAsInner)]
pub(super) enum EitherFragment {
    /// An internal fragment that is being built for the current streaming job.
    Building(BuildingFragment),

    /// An existing fragment that is external but connected to the fragments being built.
    Existing(Fragment),
}

/// A wrapper of [`StreamFragmentGraph`] that contains the additional information of pre-existing
/// fragments, which are connected to the graph's top-most or bottom-most fragments.
///
/// For example,
/// - if we're going to build a mview on an existing mview, the upstream fragment containing the
///   `Materialize` node will be included in this structure.
/// - if we're going to replace the plan of a table with downstream mviews, the downstream fragments
///   containing the `StreamScan` nodes will be included in this structure.
#[derive(Debug)]
pub struct CompleteStreamFragmentGraph {
    /// The fragment graph of the streaming job being built.
    building_graph: StreamFragmentGraph,

    /// The required information of existing fragments.
    existing_fragments: HashMap<GlobalFragmentId, Fragment>,

    /// The location of the actors in the existing fragments.
    existing_actor_location: HashMap<ActorId, WorkerId>,

    /// Extra downstream edges between existing fragments and the building fragments.
    extra_downstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,

    /// Extra upstream edges between existing fragments and the building fragments.
    extra_upstreams: HashMap<GlobalFragmentId, HashMap<GlobalFragmentId, StreamFragmentEdge>>,
}

pub struct FragmentGraphUpstreamContext {
    /// The root fragment of each upstream stream graph, which can be a mview fragment, or a
    /// source fragment for a CDC source job.
    upstream_root_fragments: HashMap<TableId, Fragment>,
    upstream_actor_location: HashMap<ActorId, WorkerId>,
}

pub struct FragmentGraphDownstreamContext {
    original_root_fragment_id: FragmentId,
    downstream_fragments: Vec<(DispatcherType, Fragment)>,
    downstream_actor_location: HashMap<ActorId, WorkerId>,
}

impl CompleteStreamFragmentGraph {
    /// Create a new [`CompleteStreamFragmentGraph`] with empty existing fragments, i.e., there's no
    /// upstream mviews.
    #[cfg(test)]
    pub fn for_test(graph: StreamFragmentGraph) -> Self {
        Self {
            building_graph: graph,
            existing_fragments: Default::default(),
            existing_actor_location: Default::default(),
            extra_downstreams: Default::default(),
            extra_upstreams: Default::default(),
        }
    }

    /// Create a new [`CompleteStreamFragmentGraph`] for a newly created job (which has no
    /// downstreams), e.g., MV on MV or a CDC/source table, with the existing upstream
    /// `Materialize` or `Source` fragments.
    pub fn with_upstreams(
        graph: StreamFragmentGraph,
        upstream_root_fragments: HashMap<TableId, Fragment>,
        existing_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location: existing_actor_location,
            }),
            None,
            job_type,
        )
    }

    /// Create a new [`CompleteStreamFragmentGraph`] for replacing an existing table/source,
    /// with the downstream existing `StreamScan`/`StreamSourceScan` fragments.
    pub fn with_downstreams(
        graph: StreamFragmentGraph,
        original_root_fragment_id: FragmentId,
        downstream_fragments: Vec<(DispatcherType, Fragment)>,
        existing_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            None,
            Some(FragmentGraphDownstreamContext {
                original_root_fragment_id,
                downstream_fragments,
                downstream_actor_location: existing_actor_location,
            }),
            job_type,
        )
    }

    /// For replacing an existing table based on shared cdc source, which has both upstreams and downstreams.
    pub fn with_upstreams_and_downstreams(
        graph: StreamFragmentGraph,
        upstream_root_fragments: HashMap<TableId, Fragment>,
        upstream_actor_location: HashMap<ActorId, WorkerId>,
        original_root_fragment_id: FragmentId,
        downstream_fragments: Vec<(DispatcherType, Fragment)>,
        downstream_actor_location: HashMap<ActorId, WorkerId>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        Self::build_helper(
            graph,
            Some(FragmentGraphUpstreamContext {
                upstream_root_fragments,
                upstream_actor_location,
            }),
            Some(FragmentGraphDownstreamContext {
                original_root_fragment_id,
                downstream_fragments,
                downstream_actor_location,
            }),
            job_type,
        )
    }

    /// The core logic of building a [`CompleteStreamFragmentGraph`], i.e., adding extra upstream/downstream fragments.
    fn build_helper(
        mut graph: StreamFragmentGraph,
        upstream_ctx: Option<FragmentGraphUpstreamContext>,
        downstream_ctx: Option<FragmentGraphDownstreamContext>,
        job_type: StreamingJobType,
    ) -> MetaResult<Self> {
        let mut extra_downstreams = HashMap::new();
        let mut extra_upstreams = HashMap::new();
        let mut existing_fragments = HashMap::new();

        let mut existing_actor_location = HashMap::new();

        if let Some(FragmentGraphUpstreamContext {
            upstream_root_fragments,
            upstream_actor_location,
        }) = upstream_ctx
        {
            for (&id, fragment) in &mut graph.fragments {
                let uses_shuffled_backfill = fragment.has_shuffled_backfill();

                for (&upstream_table_id, required_columns) in &fragment.upstream_table_columns {
                    let upstream_fragment = upstream_root_fragments
                        .get(&upstream_table_id)
                        .context("upstream fragment not found")?;
                    let upstream_root_fragment_id =
                        GlobalFragmentId::new(upstream_fragment.fragment_id);

                    let edge = match job_type {
                        StreamingJobType::Table(TableJobType::SharedCdcSource) => {
                            // We traverse all fragments in the graph to find the `CdcFilter`
                            // fragment, and add an edge between the upstream source fragment and it.
                            assert_ne!(
                                (fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32),
                                0
                            );

                            tracing::debug!(
                                ?upstream_root_fragment_id,
                                ?required_columns,
                                identity = ?fragment.inner.get_node().unwrap().get_identity(),
                                current_frag_id=?id,
                                "CdcFilter with upstream source fragment"
                            );

                            StreamFragmentEdge {
                                id: EdgeId::UpstreamExternal {
                                    upstream_table_id,
                                    downstream_fragment_id: id,
                                },
                                // We always use `NoShuffle` for the exchange between the upstream
                                // `Source` and the downstream `StreamScan` of the new cdc table.
                                dispatch_strategy: DispatchStrategy {
                                    r#type: DispatcherType::NoShuffle as _,
                                    dist_key_indices: vec![], // not used for `NoShuffle`
                                    output_mapping: DispatchOutputMapping::identical(
                                        CDC_SOURCE_COLUMN_NUM as _,
                                    )
                                    .into(),
                                },
                            }
                        }

1283                        // handle MV on MV/Source
1284                        StreamingJobType::MaterializedView
1285                        | StreamingJobType::Sink
1286                        | StreamingJobType::Index => {
1287                            // Build the extra edges between the upstream `Materialize` and
1288                            // the downstream `StreamScan` of the new job.
1289                            if upstream_fragment
1290                                .fragment_type_mask
1291                                .contains(FragmentTypeFlag::Mview)
1292                            {
1293                                // Resolve the required output columns from the upstream materialized view.
1294                                let (dist_key_indices, output_mapping) = {
1295                                    let nodes = &upstream_fragment.nodes;
1296                                    let mview_node =
1297                                        nodes.get_node_body().unwrap().as_materialize().unwrap();
1298                                    let all_columns = mview_node.column_descs();
1299                                    let dist_key_indices = mview_node.dist_key_indices();
1300                                    let output_mapping = gen_output_mapping(
1301                                        required_columns,
1302                                        &all_columns,
1303                                    )
1304                                    .context(
1305                                        "BUG: column not found in the upstream materialized view",
1306                                    )?;
1307                                    (dist_key_indices, output_mapping)
1308                                };
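                                // Pick `Hash`/`Simple` for shuffled backfill, or pin with
                                // `NoShuffle` otherwise; see `mv_on_mv_dispatch_strategy` below.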
                                let dispatch_strategy = mv_on_mv_dispatch_strategy(
                                    uses_shuffled_backfill,
                                    dist_key_indices,
                                    output_mapping,
                                );

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    dispatch_strategy,
                                }
                            }
                            // Build the extra edges between the upstream `Source` and
                            // the downstream `SourceBackfill` of the new job.
                            else if upstream_fragment
                                .fragment_type_mask
                                .contains(FragmentTypeFlag::Source)
                            {
                                let output_mapping = {
                                    let nodes = &upstream_fragment.nodes;
                                    let source_node =
                                        nodes.get_node_body().unwrap().as_source().unwrap();

                                    let all_columns = source_node.column_descs().unwrap();
                                    gen_output_mapping(required_columns, &all_columns).context(
                                        "BUG: column not found in the upstream source node",
                                    )?
                                };

                                StreamFragmentEdge {
                                    id: EdgeId::UpstreamExternal {
                                        upstream_table_id,
                                        downstream_fragment_id: id,
                                    },
                                    // We always use `NoShuffle` for the exchange between the upstream
                                    // `Source` and the downstream `SourceBackfill` of the new job.
                                    dispatch_strategy: DispatchStrategy {
                                        r#type: DispatcherType::NoShuffle as _,
                                        dist_key_indices: vec![], // not used for `NoShuffle`
                                        output_mapping: Some(output_mapping),
                                    },
                                }
                            } else {
                                bail!(
                                    "the upstream fragment should be an MView or Source, got fragment type: {:b}",
                                    upstream_fragment.fragment_type_mask
                                )
                            }
                        }
                        }
                        StreamingJobType::Source | StreamingJobType::Table(_) => {
                            bail!(
                                "the streaming job shouldn't have an upstream fragment, job_type: {:?}",
                                job_type
                            )
                        }
                    };

                    // Record the edge in the extra edges, in both directions.
                    extra_downstreams
                        .entry(upstream_root_fragment_id)
                        .or_default()
                        .try_insert(id, edge.clone())
                        .unwrap();
                    extra_upstreams
                        .entry(id)
                        .or_default()
                        .try_insert(upstream_root_fragment_id, edge)
                        .unwrap();
                }
            }

            existing_fragments.extend(
                upstream_root_fragments
                    .into_values()
                    .map(|f| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(upstream_actor_location);
        }

        if let Some(FragmentGraphDownstreamContext {
            original_root_fragment_id,
            downstream_fragments,
            downstream_actor_location,
        }) = downstream_ctx
        {
            let original_table_fragment_id = GlobalFragmentId::new(original_root_fragment_id);
            let table_fragment_id = GlobalFragmentId::new(graph.table_fragment_id());
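            // `original_root_fragment_id` is the root fragment of the job being replaced,
            // while `table_fragment_id` is its counterpart in the new (building) graph.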

            // Build the extra edges between the `Materialize` and the downstream `StreamScan` of the
            // existing materialized views.
            for (dispatcher_type, fragment) in &downstream_fragments {
                let id = GlobalFragmentId::new(fragment.fragment_id);

                // Similar to `extract_upstream_table_columns_except_cross_db_backfill`.
                let output_columns = {
                    let mut res = None;

                    stream_graph_visitor::visit_stream_node_body(&fragment.nodes, |node_body| {
                        let columns = match node_body {
                            NodeBody::StreamScan(stream_scan) => stream_scan.upstream_columns(),
                            NodeBody::SourceBackfill(source_backfill) => {
                                // FIXME: only pass required columns instead of all columns here
                                source_backfill.column_descs()
                            }
                            _ => return,
                        };
                        res = Some(columns);
                    });

                    res.context("failed to locate downstream scan")?
                };
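                // Recompute the output mapping against the new root fragment's columns,
                // failing if a column still referenced downstream no longer exists.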

                let table_fragment = graph.fragments.get(&table_fragment_id).unwrap();
                let nodes = table_fragment.node.as_ref().unwrap();

                let (dist_key_indices, output_mapping) = match job_type {
                    StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                        let mview_node = nodes.get_node_body().unwrap().as_materialize().unwrap();
                        let all_columns = mview_node.column_descs();
                        let dist_key_indices = mview_node.dist_key_indices();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                     being referenced by downstream materialized views or sinks",
                                )
                            })?;
                        (dist_key_indices, output_mapping)
                    }

                    StreamingJobType::Source => {
                        let source_node = nodes.get_node_body().unwrap().as_source().unwrap();
                        let all_columns = source_node.column_descs().unwrap();
                        let output_mapping = gen_output_mapping(&output_columns, &all_columns)
                            .ok_or_else(|| {
                                MetaError::invalid_parameter(
                                    "unable to drop the column due to \
                                     being referenced by downstream materialized views or sinks",
                                )
                            })?;
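                        // Replacing a source only rewires `SourceBackfill` fragments, whose
                        // exchange with the source is always `NoShuffle` by construction.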
                        assert_eq!(*dispatcher_type, DispatcherType::NoShuffle);
                        (
                            vec![], // not used for `NoShuffle`
                            output_mapping,
                        )
                    }

                    _ => bail!("unsupported job type for replacement: {job_type:?}"),
                };

                let edge = StreamFragmentEdge {
                    id: EdgeId::DownstreamExternal(DownstreamExternalEdgeId {
                        original_upstream_fragment_id: original_table_fragment_id,
                        downstream_fragment_id: id,
                    }),
                    dispatch_strategy: DispatchStrategy {
                        r#type: *dispatcher_type as i32,
                        output_mapping: Some(output_mapping),
                        dist_key_indices,
                    },
                };

                extra_downstreams
                    .entry(table_fragment_id)
                    .or_default()
                    .try_insert(id, edge.clone())
                    .unwrap();
                extra_upstreams
                    .entry(id)
                    .or_default()
                    .try_insert(table_fragment_id, edge)
                    .unwrap();
            }

            existing_fragments.extend(
                downstream_fragments
                    .into_iter()
                    .map(|(_, f)| (GlobalFragmentId::new(f.fragment_id), f)),
            );

            existing_actor_location.extend(downstream_actor_location);
        }

        Ok(Self {
            building_graph: graph,
            existing_fragments,
            existing_actor_location,
            extra_downstreams,
            extra_upstreams,
        })
    }
}

/// Generate the `output_mapping` for [`DispatchStrategy`] from the given columns.
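///
/// Returns `None` if any required column cannot be found in the upstream columns.
///
/// As an illustration, with upstream columns `[a: Int32, b: Int64, c: Varchar]` and
/// required columns `[c, a]`, the result is `indices = [2, 0]` with empty `types`,
/// since no column changes its type.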
fn gen_output_mapping(
    required_columns: &[PbColumnDesc],
    upstream_columns: &[PbColumnDesc],
) -> Option<DispatchOutputMapping> {
    let len = required_columns.len();
    let mut indices = vec![0; len];
    let mut types = None;

    for (i, r) in required_columns.iter().enumerate() {
        let (ui, u) = upstream_columns
            .iter()
            .find_position(|&u| u.column_id == r.column_id)?;
        indices[i] = ui as u32;

        // Only if we encounter a type change (`ALTER TABLE ALTER COLUMN TYPE`) do we
        // generate a non-empty `types`.
        if u.column_type != r.column_type {
            types.get_or_insert_with(|| vec![TypePair::default(); len])[i] = TypePair {
                upstream: u.column_type.clone(),
                downstream: r.column_type.clone(),
            };
        }
    }

    // If there's no type change, indicate this with an empty `types`.
    let types = types.unwrap_or_default();

    Some(DispatchOutputMapping { indices, types })
}

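/// Decide how the upstream `Materialize` dispatches to the downstream backfill fragment:
/// `Hash` for shuffled backfill with a non-empty distribution key, `Simple` for shuffled
/// backfill on a singleton upstream, and `NoShuffle` otherwise, where the backfill actors
/// are aligned one-to-one with the upstream `Materialize` actors.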
fn mv_on_mv_dispatch_strategy(
    uses_shuffled_backfill: bool,
    dist_key_indices: Vec<u32>,
    output_mapping: DispatchOutputMapping,
) -> DispatchStrategy {
    if uses_shuffled_backfill {
        if !dist_key_indices.is_empty() {
            DispatchStrategy {
                r#type: DispatcherType::Hash as _,
                dist_key_indices,
                output_mapping: Some(output_mapping),
            }
        } else {
            DispatchStrategy {
                r#type: DispatcherType::Simple as _,
                dist_key_indices: vec![], // empty for Simple
                output_mapping: Some(output_mapping),
            }
        }
    } else {
        DispatchStrategy {
            r#type: DispatcherType::NoShuffle as _,
            dist_key_indices: vec![], // not used for `NoShuffle`
            output_mapping: Some(output_mapping),
        }
    }
}

impl CompleteStreamFragmentGraph {
    /// Returns **all** fragment IDs in the complete graph, including the ones that are not in the
    /// building graph.
    pub(super) fn all_fragment_ids(&self) -> impl Iterator<Item = GlobalFragmentId> + '_ {
        self.building_graph
            .fragments
            .keys()
            .chain(self.existing_fragments.keys())
            .copied()
    }

    /// Returns an iterator of **all** edges in the complete graph, including the external edges.
    pub(super) fn all_edges(
        &self,
    ) -> impl Iterator<Item = (GlobalFragmentId, GlobalFragmentId, &StreamFragmentEdge)> + '_ {
        self.building_graph
            .downstreams
            .iter()
            .chain(self.extra_downstreams.iter())
            .flat_map(|(&from, tos)| tos.iter().map(move |(&to, edge)| (from, to, edge)))
    }

    /// Returns the distribution of the existing fragments.
    pub(super) fn existing_distribution(&self) -> HashMap<GlobalFragmentId, Distribution> {
        self.existing_fragments
            .iter()
            .map(|(&id, f)| {
                (
                    id,
                    Distribution::from_fragment(f, &self.existing_actor_location),
                )
            })
            .collect()
    }

    /// Generate the topological order of **all** fragments in this graph, including the ones that
    /// are not in the building graph. Returns an error if the graph is not a DAG and a topological
    /// sort cannot be done.
    ///
    /// For MV on MV, the first fragment popped from the queue will be the top-most node, i.e., the
    /// `Sink` / `Materialize` in the stream graph.
    pub(super) fn topo_order(&self) -> MetaResult<Vec<GlobalFragmentId>> {
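        // This is Kahn's algorithm run on reversed edges: fragments with no downstream
        // are emitted first, so sink-side fragments precede their upstreams.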
        let mut topo = Vec::new();
        let mut downstream_cnts = HashMap::new();

        // Iterate all fragments.
        for fragment_id in self.all_fragment_ids() {
            // Count how many downstreams we have for a given fragment.
            let downstream_cnt = self.get_downstreams(fragment_id).count();
            if downstream_cnt == 0 {
                topo.push(fragment_id);
            } else {
                downstream_cnts.insert(fragment_id, downstream_cnt);
            }
        }

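        // Reuse `topo` itself as the traversal queue, with `i` pointing at its head.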
        let mut i = 0;
        while let Some(&fragment_id) = topo.get(i) {
            i += 1;
            // Check whether we can process more fragments.
            for (upstream_id, _) in self.get_upstreams(fragment_id) {
                let downstream_cnt = downstream_cnts.get_mut(&upstream_id).unwrap();
                *downstream_cnt -= 1;
                if *downstream_cnt == 0 {
                    downstream_cnts.remove(&upstream_id);
                    topo.push(upstream_id);
                }
            }
        }

        if !downstream_cnts.is_empty() {
            // There are fragments that are not processed yet.
            bail!("graph is not a DAG");
        }

        Ok(topo)
    }

    /// Seal a [`BuildingFragment`] from the graph into a [`Fragment`], which will be further used
    /// to build actors on the compute nodes and then persisted into the meta store.
    pub(super) fn seal_fragment(
        &self,
        id: GlobalFragmentId,
        actors: Vec<StreamActor>,
        distribution: Distribution,
        stream_node: StreamNode,
    ) -> Fragment {
        let building_fragment = self.get_fragment(id).into_building().unwrap();
        let internal_tables = building_fragment.extract_internal_tables();
        let BuildingFragment {
            inner,
            job_id,
            upstream_table_columns: _,
        } = building_fragment;

        let distribution_type = distribution.to_distribution_type();
        let vnode_count = distribution.vnode_count();

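        // For `Materialize` (and `VectorIndexWrite`) fragments, the job's own table acts
        // as a state table of the fragment, so its ID (equal to the job ID) is chained
        // into `state_table_ids` below.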
        let materialized_fragment_id =
            if FragmentTypeMask::from(inner.fragment_type_mask).contains(FragmentTypeFlag::Mview) {
                job_id
            } else {
                None
            };

        let vector_index_fragment_id = if FragmentTypeMask::from(inner.fragment_type_mask)
            .contains(FragmentTypeFlag::VectorIndexWrite)
        {
            job_id
        } else {
            None
        };

        let state_table_ids = internal_tables
            .iter()
            .map(|t| t.id)
            .chain(materialized_fragment_id)
            .chain(vector_index_fragment_id)
            .collect();

        Fragment {
            fragment_id: inner.fragment_id,
            fragment_type_mask: inner.fragment_type_mask.into(),
            distribution_type,
            actors,
            state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        }
    }

    /// Get a fragment from the complete graph, which can be either a building fragment or an
    /// existing fragment.
    pub(super) fn get_fragment(&self, fragment_id: GlobalFragmentId) -> EitherFragment {
        if let Some(fragment) = self.existing_fragments.get(&fragment_id) {
            EitherFragment::Existing(fragment.clone())
        } else {
            EitherFragment::Building(
                self.building_graph
                    .fragments
                    .get(&fragment_id)
                    .unwrap()
                    .clone(),
            )
        }
    }

    /// Get **all** downstreams of a fragment, including the ones that are not in the building
    /// graph.
    pub(super) fn get_downstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_downstreams(fragment_id)
            .iter()
            .chain(
                self.extra_downstreams
                    .get(&fragment_id)
                    .into_iter()
                    .flatten(),
            )
            .map(|(&id, edge)| (id, edge))
    }

    /// Get **all** upstreams of a fragment, including the ones that are not in the building
    /// graph.
    pub(super) fn get_upstreams(
        &self,
        fragment_id: GlobalFragmentId,
    ) -> impl Iterator<Item = (GlobalFragmentId, &StreamFragmentEdge)> {
        self.building_graph
            .get_upstreams(fragment_id)
            .iter()
            .chain(self.extra_upstreams.get(&fragment_id).into_iter().flatten())
            .map(|(&id, edge)| (id, edge))
    }

    /// Returns all building fragments in the graph.
    pub(super) fn building_fragments(&self) -> &HashMap<GlobalFragmentId, BuildingFragment> {
        &self.building_graph.fragments
    }

    /// Returns all building fragments in the graph, mutable.
    pub(super) fn building_fragments_mut(
        &mut self,
    ) -> &mut HashMap<GlobalFragmentId, BuildingFragment> {
        &mut self.building_graph.fragments
    }

    /// Get the expected vnode count of the building graph. See the documentation of the field for
    /// more details.
    pub(super) fn max_parallelism(&self) -> usize {
        self.building_graph.max_parallelism()
    }
}