risingwave_frontend/stream_fragmenter/graph/
fragment_graph.rs1use std::collections::HashMap;
16use std::rc::Rc;
17
18use risingwave_common::catalog::FragmentTypeMask;
19use risingwave_pb::id::FragmentId;
20use risingwave_pb::stream_plan::stream_fragment_graph::{
21 StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto,
22};
23use risingwave_pb::stream_plan::{DispatchStrategy, StreamNode};
24use thiserror_ext::AsReport;
25
/// Identifier of a fragment stored in a [`StreamFragmentGraph`].
///
/// Currently a plain alias of [`FragmentId`]. NOTE(review): the "local" qualifier presumably
/// distinguishes ids assigned while building the graph from globally allocated ones — confirm.
pub type LocalFragmentId = FragmentId;
27
28#[derive(Clone, Debug)]
30pub struct StreamFragment {
31 pub fragment_id: LocalFragmentId,
33
34 pub node: Option<Box<StreamNode>>,
36
37 pub fragment_type_mask: FragmentTypeMask,
39
40 pub requires_singleton: bool,
42}
43
44#[derive(Debug, Clone)]
46pub struct StreamFragmentEdge {
47 pub dispatch_strategy: DispatchStrategy,
49
50 pub link_id: u64,
54}
55
56impl StreamFragment {
57 pub fn new(fragment_id: LocalFragmentId) -> Self {
58 Self {
59 fragment_id,
60 fragment_type_mask: FragmentTypeMask::empty(),
61 requires_singleton: false,
62 node: None,
63 }
64 }
65
66 pub fn to_protobuf(&self) -> StreamFragmentProto {
67 StreamFragmentProto {
68 fragment_id: self.fragment_id,
69 node: self.node.clone().map(|n| *n),
70 fragment_type_mask: self.fragment_type_mask.into(),
71 requires_singleton: self.requires_singleton,
72 }
73 }
74}
75
/// In-memory graph of stream fragments and the edges connecting them.
#[derive(Default)]
pub struct StreamFragmentGraph {
    /// Fragments registered in the graph, keyed by their fragment id.
    pub(in crate::stream_fragmenter) fragments: HashMap<LocalFragmentId, Rc<StreamFragment>>,

    /// Edges in protobuf form, keyed by `(upstream_id, downstream_id)`; the key shape means at
    /// most one edge is kept per ordered pair of fragments.
    pub(in crate::stream_fragmenter) edges:
        HashMap<(LocalFragmentId, LocalFragmentId), StreamFragmentEdgeProto>,
}
86
87impl StreamFragmentGraph {
88 pub fn add_fragment(&mut self, stream_fragment: Rc<StreamFragment>) {
90 let id = stream_fragment.fragment_id;
91 let ret = self.fragments.insert(id, stream_fragment);
92 assert!(ret.is_none(), "fragment already exists: {:?}", id);
93 }
94
95 pub fn get_fragment(&self, fragment_id: &LocalFragmentId) -> Option<&Rc<StreamFragment>> {
96 self.fragments.get(fragment_id)
97 }
98
99 pub fn add_edge(
101 &mut self,
102 upstream_id: LocalFragmentId,
103 downstream_id: LocalFragmentId,
104 edge: StreamFragmentEdge,
105 ) {
106 self.try_add_edge(upstream_id, downstream_id, edge).unwrap();
107 }
108
109 pub fn try_add_edge(
113 &mut self,
114 upstream_id: LocalFragmentId,
115 downstream_id: LocalFragmentId,
116 edge: StreamFragmentEdge,
117 ) -> Result<(), String> {
118 let edge = StreamFragmentEdgeProto {
119 upstream_id,
120 downstream_id,
121 dispatch_strategy: Some(edge.dispatch_strategy),
122 link_id: edge.link_id,
123 };
124
125 self.edges
126 .try_insert((upstream_id, downstream_id), edge)
127 .map(|_| ())
128 .map_err(|e| {
129 format!(
130 "edge between {} and {} already exists: {}",
131 upstream_id,
132 downstream_id,
133 e.to_report_string()
134 )
135 })
136 }
137}