risingwave_frontend/stream_fragmenter/graph/
fragment_graph.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::rc::Rc;
17
18use risingwave_common::catalog::FragmentTypeMask;
19use risingwave_pb::id::FragmentId;
20use risingwave_pb::stream_plan::stream_fragment_graph::{
21    StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto,
22};
23use risingwave_pb::stream_plan::{
24    DispatchStrategy, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode,
25};
26use thiserror_ext::AsReport;
27
28pub type LocalFragmentId = FragmentId;
29
30/// [`StreamFragment`] represent a fragment node in fragment DAG.
31#[derive(Clone, Debug)]
32pub struct StreamFragment {
33    /// the allocated fragment id.
34    pub fragment_id: LocalFragmentId,
35
36    /// root stream node in this fragment.
37    pub node: Option<Box<StreamNode>>,
38
39    /// Bitwise-OR of type Flags of this fragment.
40    pub fragment_type_mask: FragmentTypeMask,
41
42    /// Mark whether this fragment requires exactly one actor.
43    pub requires_singleton: bool,
44}
45
46/// An edge between the nodes in the fragment graph.
47#[derive(Debug, Clone)]
48pub struct StreamFragmentEdge {
49    /// Dispatch strategy for the fragment.
50    pub dispatch_strategy: DispatchStrategy,
51
52    /// A unique identifier of this edge. Generally it should be exchange node's operator id. When
53    /// rewriting fragments into delta joins or when inserting 1-to-1 exchange, there will be
54    /// virtual links generated.
55    pub link_id: u64,
56}
57
58impl StreamFragment {
59    pub fn new(fragment_id: LocalFragmentId) -> Self {
60        Self {
61            fragment_id,
62            fragment_type_mask: FragmentTypeMask::empty(),
63            requires_singleton: false,
64            node: None,
65        }
66    }
67
68    pub fn to_protobuf(&self) -> StreamFragmentProto {
69        StreamFragmentProto {
70            fragment_id: self.fragment_id,
71            node: self.node.clone().map(|n| *n),
72            fragment_type_mask: self.fragment_type_mask.into(),
73            requires_singleton: self.requires_singleton,
74        }
75    }
76}
77
78/// [`StreamFragmentGraph`] stores a fragment graph (DAG).
79#[derive(Default)]
80pub struct StreamFragmentGraph {
81    /// stores all the fragments in the graph.
82    fragments: HashMap<LocalFragmentId, Rc<StreamFragment>>,
83
84    /// stores edges between fragments: (upstream, downstream) => edge.
85    edges: HashMap<(LocalFragmentId, LocalFragmentId), StreamFragmentEdgeProto>,
86}
87
88impl StreamFragmentGraph {
89    pub fn to_protobuf(&self) -> StreamFragmentGraphProto {
90        StreamFragmentGraphProto {
91            fragments: self
92                .fragments
93                .iter()
94                .map(|(k, v)| (*k, v.to_protobuf()))
95                .collect(),
96            edges: self.edges.values().cloned().collect(),
97
98            // Following fields will be filled later in `build_graph` based on session context.
99            ctx: None,
100            dependent_table_ids: vec![],
101            table_ids_cnt: 0,
102            parallelism: None,
103            max_parallelism: 0,
104            backfill_parallelism: None,
105            backfill_order: Default::default(),
106        }
107    }
108
109    /// Adds a fragment to the graph.
110    pub fn add_fragment(&mut self, stream_fragment: Rc<StreamFragment>) {
111        let id = stream_fragment.fragment_id;
112        let ret = self.fragments.insert(id, stream_fragment);
113        assert!(ret.is_none(), "fragment already exists: {:?}", id);
114    }
115
116    pub fn get_fragment(&self, fragment_id: &LocalFragmentId) -> Option<&Rc<StreamFragment>> {
117        self.fragments.get(fragment_id)
118    }
119
120    /// Links upstream to downstream in the graph.
121    pub fn add_edge(
122        &mut self,
123        upstream_id: LocalFragmentId,
124        downstream_id: LocalFragmentId,
125        edge: StreamFragmentEdge,
126    ) {
127        self.try_add_edge(upstream_id, downstream_id, edge).unwrap();
128    }
129
130    /// Try to link upstream to downstream in the graph.
131    ///
132    /// If the edge between upstream and downstream already exists, return an error.
133    pub fn try_add_edge(
134        &mut self,
135        upstream_id: LocalFragmentId,
136        downstream_id: LocalFragmentId,
137        edge: StreamFragmentEdge,
138    ) -> Result<(), String> {
139        let edge = StreamFragmentEdgeProto {
140            upstream_id,
141            downstream_id,
142            dispatch_strategy: Some(edge.dispatch_strategy),
143            link_id: edge.link_id,
144        };
145
146        self.edges
147            .try_insert((upstream_id, downstream_id), edge)
148            .map(|_| ())
149            .map_err(|e| {
150                format!(
151                    "edge between {} and {} already exists: {}",
152                    upstream_id,
153                    downstream_id,
154                    e.to_report_string()
155                )
156            })
157    }
158}