risingwave_frontend/stream_fragmenter/graph/
fragment_graph.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::rc::Rc;
17
18use risingwave_common::catalog::FragmentTypeMask;
19use risingwave_pb::id::FragmentId;
20use risingwave_pb::stream_plan::stream_fragment_graph::{
21    StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto,
22};
23use risingwave_pb::stream_plan::{DispatchStrategy, StreamNode};
24use thiserror_ext::AsReport;
25
26pub type LocalFragmentId = FragmentId;
27
28/// [`StreamFragment`] represent a fragment node in fragment DAG.
29#[derive(Clone, Debug)]
30pub struct StreamFragment {
31    /// the allocated fragment id.
32    pub fragment_id: LocalFragmentId,
33
34    /// root stream node in this fragment.
35    pub node: Option<Box<StreamNode>>,
36
37    /// Bitwise-OR of type Flags of this fragment.
38    pub fragment_type_mask: FragmentTypeMask,
39
40    /// Mark whether this fragment requires exactly one actor.
41    pub requires_singleton: bool,
42}
43
44/// An edge between the nodes in the fragment graph.
45#[derive(Debug, Clone)]
46pub struct StreamFragmentEdge {
47    /// Dispatch strategy for the fragment.
48    pub dispatch_strategy: DispatchStrategy,
49
50    /// A unique identifier of this edge. Generally it should be exchange node's operator id. When
51    /// rewriting fragments into delta joins or when inserting 1-to-1 exchange, there will be
52    /// virtual links generated.
53    pub link_id: u64,
54}
55
56impl StreamFragment {
57    pub fn new(fragment_id: LocalFragmentId) -> Self {
58        Self {
59            fragment_id,
60            fragment_type_mask: FragmentTypeMask::empty(),
61            requires_singleton: false,
62            node: None,
63        }
64    }
65
66    pub fn to_protobuf(&self) -> StreamFragmentProto {
67        StreamFragmentProto {
68            fragment_id: self.fragment_id,
69            node: self.node.clone().map(|n| *n),
70            fragment_type_mask: self.fragment_type_mask.into(),
71            requires_singleton: self.requires_singleton,
72        }
73    }
74}
75
76/// [`StreamFragmentGraph`] stores a fragment graph (DAG).
77#[derive(Default)]
78pub struct StreamFragmentGraph {
79    /// stores all the fragments in the graph.
80    pub(in crate::stream_fragmenter) fragments: HashMap<LocalFragmentId, Rc<StreamFragment>>,
81
82    /// stores edges between fragments: (upstream, downstream) => edge.
83    pub(in crate::stream_fragmenter) edges:
84        HashMap<(LocalFragmentId, LocalFragmentId), StreamFragmentEdgeProto>,
85}
86
87impl StreamFragmentGraph {
88    /// Adds a fragment to the graph.
89    pub fn add_fragment(&mut self, stream_fragment: Rc<StreamFragment>) {
90        let id = stream_fragment.fragment_id;
91        let ret = self.fragments.insert(id, stream_fragment);
92        assert!(ret.is_none(), "fragment already exists: {:?}", id);
93    }
94
95    pub fn get_fragment(&self, fragment_id: &LocalFragmentId) -> Option<&Rc<StreamFragment>> {
96        self.fragments.get(fragment_id)
97    }
98
99    /// Links upstream to downstream in the graph.
100    pub fn add_edge(
101        &mut self,
102        upstream_id: LocalFragmentId,
103        downstream_id: LocalFragmentId,
104        edge: StreamFragmentEdge,
105    ) {
106        self.try_add_edge(upstream_id, downstream_id, edge).unwrap();
107    }
108
109    /// Try to link upstream to downstream in the graph.
110    ///
111    /// If the edge between upstream and downstream already exists, return an error.
112    pub fn try_add_edge(
113        &mut self,
114        upstream_id: LocalFragmentId,
115        downstream_id: LocalFragmentId,
116        edge: StreamFragmentEdge,
117    ) -> Result<(), String> {
118        let edge = StreamFragmentEdgeProto {
119            upstream_id,
120            downstream_id,
121            dispatch_strategy: Some(edge.dispatch_strategy),
122            link_id: edge.link_id,
123        };
124
125        self.edges
126            .try_insert((upstream_id, downstream_id), edge)
127            .map(|_| ())
128            .map_err(|e| {
129                format!(
130                    "edge between {} and {} already exists: {}",
131                    upstream_id,
132                    downstream_id,
133                    e.to_report_string()
134                )
135            })
136    }
137}