risingwave_frontend/stream_fragmenter/graph/
fragment_graph.rs1use std::collections::HashMap;
16use std::rc::Rc;
17
18use risingwave_common::catalog::FragmentTypeMask;
19use risingwave_pb::stream_plan::stream_fragment_graph::{
20 StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto,
21};
22use risingwave_pb::stream_plan::{
23 DispatchStrategy, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode,
24};
25use thiserror_ext::AsReport;
26
27pub type LocalFragmentId = u32;
28
29#[derive(Clone, Debug)]
31pub struct StreamFragment {
32 pub fragment_id: LocalFragmentId,
34
35 pub node: Option<Box<StreamNode>>,
37
38 pub fragment_type_mask: FragmentTypeMask,
40
41 pub requires_singleton: bool,
43
44 pub table_ids_cnt: u32,
46
47 pub upstream_table_ids: Vec<u32>,
49}
50
51#[derive(Debug, Clone)]
53pub struct StreamFragmentEdge {
54 pub dispatch_strategy: DispatchStrategy,
56
57 pub link_id: u64,
61}
62
63impl StreamFragment {
64 pub fn new(fragment_id: LocalFragmentId) -> Self {
65 Self {
66 fragment_id,
67 fragment_type_mask: FragmentTypeMask::empty(),
68 requires_singleton: false,
69 node: None,
70 table_ids_cnt: 0,
71 upstream_table_ids: vec![],
72 }
73 }
74
75 pub fn to_protobuf(&self) -> StreamFragmentProto {
76 StreamFragmentProto {
77 fragment_id: self.fragment_id,
78 node: self.node.clone().map(|n| *n),
79 fragment_type_mask: self.fragment_type_mask.into(),
80 requires_singleton: self.requires_singleton,
81 table_ids_cnt: self.table_ids_cnt,
82 upstream_table_ids: self.upstream_table_ids.clone(),
83 }
84 }
85}
86
87#[derive(Default)]
89pub struct StreamFragmentGraph {
90 fragments: HashMap<LocalFragmentId, Rc<StreamFragment>>,
92
93 edges: HashMap<(LocalFragmentId, LocalFragmentId), StreamFragmentEdgeProto>,
95}
96
97impl StreamFragmentGraph {
98 pub fn to_protobuf(&self) -> StreamFragmentGraphProto {
99 StreamFragmentGraphProto {
100 fragments: self
101 .fragments
102 .iter()
103 .map(|(k, v)| (*k, v.to_protobuf()))
104 .collect(),
105 edges: self.edges.values().cloned().collect(),
106
107 ctx: None,
109 dependent_table_ids: vec![],
110 table_ids_cnt: 0,
111 parallelism: None,
112 max_parallelism: 0,
113 backfill_order: Default::default(),
114 }
115 }
116
117 pub fn add_fragment(&mut self, stream_fragment: Rc<StreamFragment>) {
119 let id = stream_fragment.fragment_id;
120 let ret = self.fragments.insert(id, stream_fragment);
121 assert!(ret.is_none(), "fragment already exists: {:?}", id);
122 }
123
124 pub fn get_fragment(&self, fragment_id: &LocalFragmentId) -> Option<&Rc<StreamFragment>> {
125 self.fragments.get(fragment_id)
126 }
127
128 pub fn add_edge(
130 &mut self,
131 upstream_id: LocalFragmentId,
132 downstream_id: LocalFragmentId,
133 edge: StreamFragmentEdge,
134 ) {
135 self.try_add_edge(upstream_id, downstream_id, edge).unwrap();
136 }
137
138 pub fn try_add_edge(
142 &mut self,
143 upstream_id: LocalFragmentId,
144 downstream_id: LocalFragmentId,
145 edge: StreamFragmentEdge,
146 ) -> Result<(), String> {
147 let edge = StreamFragmentEdgeProto {
148 upstream_id,
149 downstream_id,
150 dispatch_strategy: Some(edge.dispatch_strategy),
151 link_id: edge.link_id,
152 };
153
154 self.edges
155 .try_insert((upstream_id, downstream_id), edge)
156 .map(|_| ())
157 .map_err(|e| {
158 format!(
159 "edge between {} and {} already exists: {}",
160 upstream_id,
161 downstream_id,
162 e.to_report_string()
163 )
164 })
165 }
166}