risingwave_frontend/stream_fragmenter/
mod.rs1mod graph;
16use graph::*;
17use risingwave_common::util::recursive::{self, Recurse as _};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod rewrite;
22
23use std::collections::{HashMap, HashSet};
24use std::rc::Rc;
25
26use educe::Educe;
27use risingwave_common::catalog::TableId;
28use risingwave_pb::plan_common::JoinType;
29use risingwave_pb::stream_plan::{
30 DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag, NoOpNode, StreamContext,
31 StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanType,
32};
33
34use self::rewrite::build_delta_join_without_arrange;
35use crate::error::Result;
36use crate::optimizer::PlanRef;
37use crate::optimizer::plan_node::generic::GenericPlanRef;
38use crate::optimizer::plan_node::reorganize_elements_id;
39use crate::scheduler::SchedulerResult;
40
41#[derive(Educe)]
43#[educe(Default)]
44pub struct BuildFragmentGraphState {
45 fragment_graph: StreamFragmentGraph,
47 next_local_fragment_id: u32,
49
50 next_table_id: u32,
53
54 #[educe(Default(expression = u32::MAX - 1))]
56 next_operator_id: u32,
57
58 dependent_table_ids: HashSet<TableId>,
60
61 share_mapping: HashMap<u32, LocalFragmentId>,
63 share_stream_node_mapping: HashMap<u32, StreamNode>,
65}
66
67impl BuildFragmentGraphState {
68 fn new_stream_fragment(&mut self) -> StreamFragment {
70 let fragment = StreamFragment::new(self.next_local_fragment_id);
71 self.next_local_fragment_id += 1;
72 fragment
73 }
74
75 fn gen_operator_id(&mut self) -> u32 {
77 self.next_operator_id -= 1;
78 self.next_operator_id
79 }
80
81 pub fn gen_table_id(&mut self) -> u32 {
83 let ret = self.next_table_id;
84 self.next_table_id += 1;
85 ret
86 }
87
88 pub fn gen_table_id_wrapped(&mut self) -> TableId {
90 TableId::new(self.gen_table_id())
91 }
92
93 pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
94 self.share_stream_node_mapping
95 .insert(operator_id, stream_node);
96 }
97
98 pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
99 self.share_stream_node_mapping.get(&operator_id)
100 }
101
102 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
105 StreamNode {
106 operator_id: self.gen_operator_id() as u64,
107 identity: "StreamNoOp".into(),
108 node_body: Some(NodeBody::NoOp(NoOpNode {})),
109
110 stream_key: input.stream_key.clone(),
112 append_only: input.append_only,
113 fields: input.fields.clone(),
114
115 input: vec![input],
116 }
117 }
118}
119
120pub fn build_graph(plan_node: PlanRef) -> SchedulerResult<StreamFragmentGraphProto> {
121 let ctx = plan_node.plan_base().ctx();
122 let plan_node = reorganize_elements_id(plan_node);
123
124 let mut state = BuildFragmentGraphState::default();
125 let stream_node = plan_node.to_stream_prost(&mut state)?;
126 generate_fragment_graph(&mut state, stream_node).unwrap();
127 let mut fragment_graph = state.fragment_graph.to_protobuf();
128
129 fragment_graph.dependent_table_ids = state
131 .dependent_table_ids
132 .into_iter()
133 .map(|id| id.table_id)
134 .collect();
135 fragment_graph.table_ids_cnt = state.next_table_id;
136
137 {
139 let config = ctx.session_ctx().config();
140
141 fragment_graph.parallelism =
142 config
143 .streaming_parallelism()
144 .map(|parallelism| Parallelism {
145 parallelism: parallelism.get(),
146 });
147 fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
148 }
149
150 fragment_graph.ctx = Some(StreamContext {
152 timezone: ctx.get_session_timezone(),
153 });
154
155 Ok(fragment_graph)
156}
157
/// Tells whether `stream_node` is one of the node kinds that
/// `rewrite_stream_node` treats as stateful.
///
/// Compiled out: `cfg(any())` with no predicates never holds.
///
/// # Panics
/// Panics if the node has no body.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
170
/// Recursively rewrites the inputs of `stream_node`, putting a `NoShuffle`
/// exchange above stateful children where requested.
///
/// Compiled out: `cfg(any())` with no predicates never holds.
///
/// For every direct input of `stream_node`:
/// * a stateful child (see `is_stateful_executor`) is rewritten with the flag
///   set to `true` for its own subtree; if `insert_exchange_flag` is set, the
///   rewritten child is additionally wrapped in a new `Exchange (NoShuffle)`
///   node that forwards all of its columns;
/// * an existing exchange child is rewritten with the flag reset to `false`;
/// * any other child is rewritten with the flag unchanged.
///
/// # Errors
/// Fails if a child has no node body, or if rewriting any subtree fails.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    // The exchange payload is boxed, matching how
                    // `build_fragment` constructs `NodeBody::Exchange`.
                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                        strategy: Some(strategy),
                    }))),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // Below an existing exchange the flag starts over as `false`.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
225
226fn generate_fragment_graph(
228 state: &mut BuildFragmentGraphState,
229 stream_node: StreamNode,
230) -> Result<()> {
231 #[cfg(any())]
235 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
236
237 build_and_add_fragment(state, stream_node)?;
238 Ok(())
239}
240
241fn build_and_add_fragment(
243 state: &mut BuildFragmentGraphState,
244 stream_node: StreamNode,
245) -> Result<Rc<StreamFragment>> {
246 let operator_id = stream_node.operator_id as u32;
247 match state.share_mapping.get(&operator_id) {
248 None => {
249 let mut fragment = state.new_stream_fragment();
250 let node = build_fragment(state, &mut fragment, stream_node)?;
251
252 let operator_id = node.operator_id as u32;
256
257 assert!(fragment.node.is_none());
258 fragment.node = Some(Box::new(node));
259 let fragment_ref = Rc::new(fragment);
260
261 state.fragment_graph.add_fragment(fragment_ref.clone());
262 state
263 .share_mapping
264 .insert(operator_id, fragment_ref.fragment_id);
265 Ok(fragment_ref)
266 }
267 Some(fragment_id) => Ok(state
268 .fragment_graph
269 .get_fragment(fragment_id)
270 .unwrap()
271 .clone()),
272 }
273}
274
275fn build_fragment(
278 state: &mut BuildFragmentGraphState,
279 current_fragment: &mut StreamFragment,
280 mut stream_node: StreamNode,
281) -> Result<StreamNode> {
282 recursive::tracker!().recurse(|_t| {
283 match stream_node.get_node_body()? {
285 NodeBody::BarrierRecv(_) => {
286 current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32
287 }
288
289 NodeBody::Source(node) => {
290 current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32;
291
292 if let Some(source) = node.source_inner.as_ref()
293 && let Some(source_info) = source.info.as_ref()
294 && ((source_info.is_shared() && !source_info.is_distributed)
295 || source.with_properties.is_new_fs_connector()
296 || source.with_properties.is_iceberg_connector())
297 {
298 current_fragment.requires_singleton = true;
299 }
300 }
301
302 NodeBody::Dml(_) => {
303 current_fragment.fragment_type_mask |= FragmentTypeFlag::Dml as u32;
304 }
305
306 NodeBody::Materialize(_) => {
307 current_fragment.fragment_type_mask |= FragmentTypeFlag::Mview as u32;
308 }
309
310 NodeBody::Sink(_) => {
311 current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32
312 }
313
314 NodeBody::TopN(_) => current_fragment.requires_singleton = true,
315
316 NodeBody::StreamScan(node) => {
317 current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
318 match node.stream_scan_type() {
319 StreamScanType::SnapshotBackfill => {
320 current_fragment.fragment_type_mask |=
321 FragmentTypeFlag::SnapshotBackfillStreamScan as u32;
322 }
323 StreamScanType::CrossDbSnapshotBackfill => {
324 current_fragment.fragment_type_mask |=
325 FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32;
326 }
327 StreamScanType::Unspecified
328 | StreamScanType::Chain
329 | StreamScanType::Rearrange
330 | StreamScanType::Backfill
331 | StreamScanType::UpstreamOnly
332 | StreamScanType::ArrangementBackfill => {}
333 }
334 state
337 .dependent_table_ids
338 .insert(TableId::new(node.table_id));
339 current_fragment.upstream_table_ids.push(node.table_id);
340 }
341
342 NodeBody::StreamCdcScan(_) => {
343 current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
344 current_fragment.requires_singleton = true;
346 }
347
348 NodeBody::CdcFilter(node) => {
349 current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32;
350 state
352 .dependent_table_ids
353 .insert(node.upstream_source_id.into());
354 current_fragment
355 .upstream_table_ids
356 .push(node.upstream_source_id);
357 }
358 NodeBody::SourceBackfill(node) => {
359 current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceScan as u32;
360 let source_id = node.upstream_source_id;
362 state.dependent_table_ids.insert(source_id.into());
363 current_fragment.upstream_table_ids.push(source_id);
364 }
365
366 NodeBody::Now(_) => {
367 current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
369 current_fragment.requires_singleton = true;
370 }
371
372 NodeBody::Values(_) => {
373 current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32;
374 current_fragment.requires_singleton = true;
375 }
376
377 NodeBody::StreamFsFetch(_) => {
378 current_fragment.fragment_type_mask |= FragmentTypeFlag::FsFetch as u32;
379 }
380
381 _ => {}
382 };
383
384 if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
386 {
387 if delta_index_join.get_join_type()? == JoinType::Inner
388 && delta_index_join.condition.is_none()
389 {
390 return build_delta_join_without_arrange(state, current_fragment, stream_node);
391 } else {
392 panic!("only inner join without non-equal condition is supported for delta joins");
393 }
394 }
395
396 if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
401 stream_node = state.gen_no_op_stream_node(stream_node);
402 }
403
404 stream_node.input = stream_node
406 .input
407 .into_iter()
408 .map(|mut child_node| {
409 match child_node.get_node_body()? {
410 NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
413 NodeBody::Exchange(exchange_node) => {
415 let exchange_node_strategy = exchange_node.get_strategy()?.clone();
416
417 let [input]: [_; 1] =
419 std::mem::take(&mut child_node.input).try_into().unwrap();
420 let child_fragment = build_and_add_fragment(state, input)?;
421
422 let result = state.fragment_graph.try_add_edge(
423 child_fragment.fragment_id,
424 current_fragment.fragment_id,
425 StreamFragmentEdge {
426 dispatch_strategy: exchange_node_strategy.clone(),
427 link_id: child_node.operator_id,
429 },
430 );
431
432 if result.is_err() {
436 child_node.operator_id = state.gen_operator_id() as u64;
439
440 let ref_fragment_node = child_fragment.node.as_ref().unwrap();
442 let no_shuffle_strategy = DispatchStrategy {
443 r#type: DispatcherType::NoShuffle as i32,
444 dist_key_indices: vec![],
445 output_indices: (0..ref_fragment_node.fields.len() as u32)
446 .collect(),
447 };
448
449 let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;
450
451 let no_op_fragment = {
452 let node = state.gen_no_op_stream_node(StreamNode {
453 operator_id: no_shuffle_exchange_operator_id,
454 identity: "StreamNoShuffleExchange".into(),
455 node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
456 strategy: Some(no_shuffle_strategy.clone()),
457 }))),
458 input: vec![],
459
460 stream_key: ref_fragment_node.stream_key.clone(),
462 append_only: ref_fragment_node.append_only,
463 fields: ref_fragment_node.fields.clone(),
464 });
465
466 let mut fragment = state.new_stream_fragment();
467 fragment.node = Some(node.into());
468 Rc::new(fragment)
469 };
470
471 state.fragment_graph.add_fragment(no_op_fragment.clone());
472
473 state.fragment_graph.add_edge(
474 child_fragment.fragment_id,
475 no_op_fragment.fragment_id,
476 StreamFragmentEdge {
477 dispatch_strategy: no_shuffle_strategy,
479 link_id: no_shuffle_exchange_operator_id,
480 },
481 );
482 state.fragment_graph.add_edge(
483 no_op_fragment.fragment_id,
484 current_fragment.fragment_id,
485 StreamFragmentEdge {
486 dispatch_strategy: exchange_node_strategy,
488 link_id: child_node.operator_id,
489 },
490 );
491 }
492
493 Ok(child_node)
494 }
495
496 _ => build_fragment(state, current_fragment, child_node),
498 }
499 })
500 .collect::<Result<_>>()?;
501 Ok(stream_node)
502 })
503}