risingwave_frontend/stream_fragmenter/graph/
fragment_graph.rs1use std::collections::HashMap;
16use std::rc::Rc;
17
18use risingwave_common::catalog::FragmentTypeMask;
19use risingwave_pb::stream_plan::stream_fragment_graph::{
20 StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto,
21};
22use risingwave_pb::stream_plan::{
23 DispatchStrategy, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode,
24};
25use thiserror_ext::AsReport;
26
/// Identifier of a fragment inside a single [`StreamFragmentGraph`]; used as the key of the
/// graph's fragment map and as the endpoints of its edges.
pub type LocalFragmentId = u32;
28
29#[derive(Clone, Debug)]
31pub struct StreamFragment {
32 pub fragment_id: LocalFragmentId,
34
35 pub node: Option<Box<StreamNode>>,
37
38 pub fragment_type_mask: FragmentTypeMask,
40
41 pub requires_singleton: bool,
43}
44
45#[derive(Debug, Clone)]
47pub struct StreamFragmentEdge {
48 pub dispatch_strategy: DispatchStrategy,
50
51 pub link_id: u64,
55}
56
57impl StreamFragment {
58 pub fn new(fragment_id: LocalFragmentId) -> Self {
59 Self {
60 fragment_id,
61 fragment_type_mask: FragmentTypeMask::empty(),
62 requires_singleton: false,
63 node: None,
64 }
65 }
66
67 pub fn to_protobuf(&self) -> StreamFragmentProto {
68 StreamFragmentProto {
69 fragment_id: self.fragment_id,
70 node: self.node.clone().map(|n| *n),
71 fragment_type_mask: self.fragment_type_mask.into(),
72 requires_singleton: self.requires_singleton,
73 }
74 }
75}
76
77#[derive(Default)]
79pub struct StreamFragmentGraph {
80 fragments: HashMap<LocalFragmentId, Rc<StreamFragment>>,
82
83 edges: HashMap<(LocalFragmentId, LocalFragmentId), StreamFragmentEdgeProto>,
85}
86
87impl StreamFragmentGraph {
88 pub fn to_protobuf(&self) -> StreamFragmentGraphProto {
89 StreamFragmentGraphProto {
90 fragments: self
91 .fragments
92 .iter()
93 .map(|(k, v)| (*k, v.to_protobuf()))
94 .collect(),
95 edges: self.edges.values().cloned().collect(),
96
97 ctx: None,
99 dependent_table_ids: vec![],
100 table_ids_cnt: 0,
101 parallelism: None,
102 max_parallelism: 0,
103 backfill_order: Default::default(),
104 }
105 }
106
107 pub fn add_fragment(&mut self, stream_fragment: Rc<StreamFragment>) {
109 let id = stream_fragment.fragment_id;
110 let ret = self.fragments.insert(id, stream_fragment);
111 assert!(ret.is_none(), "fragment already exists: {:?}", id);
112 }
113
114 pub fn get_fragment(&self, fragment_id: &LocalFragmentId) -> Option<&Rc<StreamFragment>> {
115 self.fragments.get(fragment_id)
116 }
117
118 pub fn add_edge(
120 &mut self,
121 upstream_id: LocalFragmentId,
122 downstream_id: LocalFragmentId,
123 edge: StreamFragmentEdge,
124 ) {
125 self.try_add_edge(upstream_id, downstream_id, edge).unwrap();
126 }
127
128 pub fn try_add_edge(
132 &mut self,
133 upstream_id: LocalFragmentId,
134 downstream_id: LocalFragmentId,
135 edge: StreamFragmentEdge,
136 ) -> Result<(), String> {
137 let edge = StreamFragmentEdgeProto {
138 upstream_id,
139 downstream_id,
140 dispatch_strategy: Some(edge.dispatch_strategy),
141 link_id: edge.link_id,
142 };
143
144 self.edges
145 .try_insert((upstream_id, downstream_id), edge)
146 .map(|_| ())
147 .map_err(|e| {
148 format!(
149 "edge between {} and {} already exists: {}",
150 upstream_id,
151 downstream_id,
152 e.to_report_string()
153 )
154 })
155 }
156}