risingwave_frontend/stream_fragmenter/graph/
fragment_graph.rs1use std::collections::HashMap;
16use std::rc::Rc;
17
18use risingwave_common::catalog::FragmentTypeMask;
19use risingwave_pb::id::FragmentId;
20use risingwave_pb::stream_plan::stream_fragment_graph::{
21 StreamFragment as StreamFragmentProto, StreamFragmentEdge as StreamFragmentEdgeProto,
22};
23use risingwave_pb::stream_plan::{DispatchStrategy, StreamNode};
24
25pub type LocalFragmentId = FragmentId;
26
27#[derive(Clone, Debug)]
29pub struct StreamFragment {
30 pub fragment_id: LocalFragmentId,
32
33 pub node: Option<Box<StreamNode>>,
35
36 pub fragment_type_mask: FragmentTypeMask,
38
39 pub requires_singleton: bool,
41}
42
43#[derive(Debug, Clone)]
45pub struct StreamFragmentEdge {
46 pub dispatch_strategy: DispatchStrategy,
48
49 pub link_id: u64,
53}
54
55impl StreamFragment {
56 pub fn new(fragment_id: LocalFragmentId) -> Self {
57 Self {
58 fragment_id,
59 fragment_type_mask: FragmentTypeMask::empty(),
60 requires_singleton: false,
61 node: None,
62 }
63 }
64
65 pub fn to_protobuf(&self) -> StreamFragmentProto {
66 StreamFragmentProto {
67 fragment_id: self.fragment_id,
68 node: self.node.clone().map(|n| *n),
69 fragment_type_mask: self.fragment_type_mask.into(),
70 requires_singleton: self.requires_singleton,
71 }
72 }
73}
74
75#[derive(Default)]
77pub struct StreamFragmentGraph {
78 pub(in crate::stream_fragmenter) fragments: HashMap<LocalFragmentId, Rc<StreamFragment>>,
80
81 pub(in crate::stream_fragmenter) edges:
83 HashMap<(LocalFragmentId, LocalFragmentId), StreamFragmentEdgeProto>,
84}
85
86impl StreamFragmentGraph {
87 pub fn add_fragment(&mut self, stream_fragment: Rc<StreamFragment>) {
89 let id = stream_fragment.fragment_id;
90 let ret = self.fragments.insert(id, stream_fragment);
91 assert!(ret.is_none(), "fragment already exists: {:?}", id);
92 }
93
94 pub fn get_fragment(&self, fragment_id: &LocalFragmentId) -> Option<&Rc<StreamFragment>> {
95 self.fragments.get(fragment_id)
96 }
97
98 pub fn add_edge(
100 &mut self,
101 upstream_id: LocalFragmentId,
102 downstream_id: LocalFragmentId,
103 edge: StreamFragmentEdge,
104 ) {
105 self.try_add_edge(upstream_id, downstream_id, edge).unwrap();
106 }
107
108 pub fn try_add_edge(
112 &mut self,
113 upstream_id: LocalFragmentId,
114 downstream_id: LocalFragmentId,
115 edge: StreamFragmentEdge,
116 ) -> Result<(), String> {
117 let edge = StreamFragmentEdgeProto {
118 upstream_id,
119 downstream_id,
120 dispatch_strategy: Some(edge.dispatch_strategy),
121 link_id: edge.link_id,
122 };
123
124 self.edges
125 .try_insert((upstream_id, downstream_id), edge)
126 .map(|_| ())
127 .map_err(|e| {
128 format!(
129 "edge between {upstream_id} and {downstream_id} already exists (existing: {:?})",
130 e.entry.get(),
131 )
132 })
133 }
134}