risingwave_frontend/stream_fragmenter/graph/
fragment_graph.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::rc::Rc;
17
18use risingwave_common::catalog::FragmentTypeMask;
19use risingwave_pb::stream_plan::stream_fragment_graph::{
20    StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto,
21};
22use risingwave_pb::stream_plan::{
23    DispatchStrategy, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode,
24};
25use thiserror_ext::AsReport;
26
27pub type LocalFragmentId = u32;
28
29/// [`StreamFragment`] represent a fragment node in fragment DAG.
30#[derive(Clone, Debug)]
31pub struct StreamFragment {
32    /// the allocated fragment id.
33    pub fragment_id: LocalFragmentId,
34
35    /// root stream node in this fragment.
36    pub node: Option<Box<StreamNode>>,
37
38    /// Bitwise-OR of type Flags of this fragment.
39    pub fragment_type_mask: FragmentTypeMask,
40
41    /// Mark whether this fragment requires exactly one actor.
42    pub requires_singleton: bool,
43
44    /// Number of table ids (stateful states) for this fragment.
45    pub table_ids_cnt: u32,
46
47    /// Mark the upstream table ids of this fragment.
48    pub upstream_table_ids: Vec<u32>,
49}
50
51/// An edge between the nodes in the fragment graph.
52#[derive(Debug, Clone)]
53pub struct StreamFragmentEdge {
54    /// Dispatch strategy for the fragment.
55    pub dispatch_strategy: DispatchStrategy,
56
57    /// A unique identifier of this edge. Generally it should be exchange node's operator id. When
58    /// rewriting fragments into delta joins or when inserting 1-to-1 exchange, there will be
59    /// virtual links generated.
60    pub link_id: u64,
61}
62
63impl StreamFragment {
64    pub fn new(fragment_id: LocalFragmentId) -> Self {
65        Self {
66            fragment_id,
67            fragment_type_mask: FragmentTypeMask::empty(),
68            requires_singleton: false,
69            node: None,
70            table_ids_cnt: 0,
71            upstream_table_ids: vec![],
72        }
73    }
74
75    pub fn to_protobuf(&self) -> StreamFragmentProto {
76        StreamFragmentProto {
77            fragment_id: self.fragment_id,
78            node: self.node.clone().map(|n| *n),
79            fragment_type_mask: self.fragment_type_mask.into(),
80            requires_singleton: self.requires_singleton,
81            table_ids_cnt: self.table_ids_cnt,
82            upstream_table_ids: self.upstream_table_ids.clone(),
83        }
84    }
85}
86
87/// [`StreamFragmentGraph`] stores a fragment graph (DAG).
88#[derive(Default)]
89pub struct StreamFragmentGraph {
90    /// stores all the fragments in the graph.
91    fragments: HashMap<LocalFragmentId, Rc<StreamFragment>>,
92
93    /// stores edges between fragments: (upstream, downstream) => edge.
94    edges: HashMap<(LocalFragmentId, LocalFragmentId), StreamFragmentEdgeProto>,
95}
96
97impl StreamFragmentGraph {
98    pub fn to_protobuf(&self) -> StreamFragmentGraphProto {
99        StreamFragmentGraphProto {
100            fragments: self
101                .fragments
102                .iter()
103                .map(|(k, v)| (*k, v.to_protobuf()))
104                .collect(),
105            edges: self.edges.values().cloned().collect(),
106
107            // Following fields will be filled later in `build_graph` based on session context.
108            ctx: None,
109            dependent_table_ids: vec![],
110            table_ids_cnt: 0,
111            parallelism: None,
112            max_parallelism: 0,
113            backfill_order: Default::default(),
114        }
115    }
116
117    /// Adds a fragment to the graph.
118    pub fn add_fragment(&mut self, stream_fragment: Rc<StreamFragment>) {
119        let id = stream_fragment.fragment_id;
120        let ret = self.fragments.insert(id, stream_fragment);
121        assert!(ret.is_none(), "fragment already exists: {:?}", id);
122    }
123
124    pub fn get_fragment(&self, fragment_id: &LocalFragmentId) -> Option<&Rc<StreamFragment>> {
125        self.fragments.get(fragment_id)
126    }
127
128    /// Links upstream to downstream in the graph.
129    pub fn add_edge(
130        &mut self,
131        upstream_id: LocalFragmentId,
132        downstream_id: LocalFragmentId,
133        edge: StreamFragmentEdge,
134    ) {
135        self.try_add_edge(upstream_id, downstream_id, edge).unwrap();
136    }
137
138    /// Try to link upstream to downstream in the graph.
139    ///
140    /// If the edge between upstream and downstream already exists, return an error.
141    pub fn try_add_edge(
142        &mut self,
143        upstream_id: LocalFragmentId,
144        downstream_id: LocalFragmentId,
145        edge: StreamFragmentEdge,
146    ) -> Result<(), String> {
147        let edge = StreamFragmentEdgeProto {
148            upstream_id,
149            downstream_id,
150            dispatch_strategy: Some(edge.dispatch_strategy),
151            link_id: edge.link_id,
152        };
153
154        self.edges
155            .try_insert((upstream_id, downstream_id), edge)
156            .map(|_| ())
157            .map_err(|e| {
158                format!(
159                    "edge between {} and {} already exists: {}",
160                    upstream_id,
161                    downstream_id,
162                    e.to_report_string()
163                )
164            })
165    }
166}