risingwave_frontend/stream_fragmenter/graph/
fragment_graph.rs1use std::collections::HashMap;
16use std::rc::Rc;
17
18use risingwave_common::catalog::FragmentTypeMask;
19use risingwave_pb::id::FragmentId;
20use risingwave_pb::stream_plan::stream_fragment_graph::{
21 StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto,
22};
23use risingwave_pb::stream_plan::{
24 DispatchStrategy, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode,
25};
26use thiserror_ext::AsReport;
27
28pub type LocalFragmentId = FragmentId;
29
30#[derive(Clone, Debug)]
32pub struct StreamFragment {
33 pub fragment_id: LocalFragmentId,
35
36 pub node: Option<Box<StreamNode>>,
38
39 pub fragment_type_mask: FragmentTypeMask,
41
42 pub requires_singleton: bool,
44}
45
46#[derive(Debug, Clone)]
48pub struct StreamFragmentEdge {
49 pub dispatch_strategy: DispatchStrategy,
51
52 pub link_id: u64,
56}
57
58impl StreamFragment {
59 pub fn new(fragment_id: LocalFragmentId) -> Self {
60 Self {
61 fragment_id,
62 fragment_type_mask: FragmentTypeMask::empty(),
63 requires_singleton: false,
64 node: None,
65 }
66 }
67
68 pub fn to_protobuf(&self) -> StreamFragmentProto {
69 StreamFragmentProto {
70 fragment_id: self.fragment_id,
71 node: self.node.clone().map(|n| *n),
72 fragment_type_mask: self.fragment_type_mask.into(),
73 requires_singleton: self.requires_singleton,
74 }
75 }
76}
77
78#[derive(Default)]
80pub struct StreamFragmentGraph {
81 fragments: HashMap<LocalFragmentId, Rc<StreamFragment>>,
83
84 edges: HashMap<(LocalFragmentId, LocalFragmentId), StreamFragmentEdgeProto>,
86}
87
88impl StreamFragmentGraph {
89 pub fn to_protobuf(&self) -> StreamFragmentGraphProto {
90 StreamFragmentGraphProto {
91 fragments: self
92 .fragments
93 .iter()
94 .map(|(k, v)| (*k, v.to_protobuf()))
95 .collect(),
96 edges: self.edges.values().cloned().collect(),
97
98 ctx: None,
100 dependent_table_ids: vec![],
101 table_ids_cnt: 0,
102 parallelism: None,
103 max_parallelism: 0,
104 backfill_parallelism: None,
105 backfill_order: Default::default(),
106 }
107 }
108
109 pub fn add_fragment(&mut self, stream_fragment: Rc<StreamFragment>) {
111 let id = stream_fragment.fragment_id;
112 let ret = self.fragments.insert(id, stream_fragment);
113 assert!(ret.is_none(), "fragment already exists: {:?}", id);
114 }
115
116 pub fn get_fragment(&self, fragment_id: &LocalFragmentId) -> Option<&Rc<StreamFragment>> {
117 self.fragments.get(fragment_id)
118 }
119
120 pub fn add_edge(
122 &mut self,
123 upstream_id: LocalFragmentId,
124 downstream_id: LocalFragmentId,
125 edge: StreamFragmentEdge,
126 ) {
127 self.try_add_edge(upstream_id, downstream_id, edge).unwrap();
128 }
129
130 pub fn try_add_edge(
134 &mut self,
135 upstream_id: LocalFragmentId,
136 downstream_id: LocalFragmentId,
137 edge: StreamFragmentEdge,
138 ) -> Result<(), String> {
139 let edge = StreamFragmentEdgeProto {
140 upstream_id,
141 downstream_id,
142 dispatch_strategy: Some(edge.dispatch_strategy),
143 link_id: edge.link_id,
144 };
145
146 self.edges
147 .try_insert((upstream_id, downstream_id), edge)
148 .map(|_| ())
149 .map_err(|e| {
150 format!(
151 "edge between {} and {} already exists: {}",
152 upstream_id,
153 downstream_id,
154 e.to_report_string()
155 )
156 })
157 }
158}