risingwave_frontend/stream_fragmenter/
mod.rs1mod graph;
16use graph::*;
17use risingwave_common::util::recursive::{self, Recurse as _};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_pb::catalog::Table;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_connector::source::cdc::CdcScanOptions;
33use risingwave_pb::plan_common::JoinType;
34use risingwave_pb::stream_plan::{
35 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
36 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
37 StreamNode, StreamScanType,
38};
39
40use self::rewrite::build_delta_join_without_arrange;
41use crate::catalog::FragmentId;
42use crate::error::ErrorCode::NotSupported;
43use crate::error::{Result, RwError};
44use crate::optimizer::plan_node::generic::GenericPlanRef;
45use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
46use crate::stream_fragmenter::parallelism::derive_parallelism;
47
48#[derive(Educe)]
50#[educe(Default)]
51pub struct BuildFragmentGraphState {
52 fragment_graph: StreamFragmentGraph,
54 next_local_fragment_id: FragmentId,
56
57 next_table_id: u32,
60
61 #[educe(Default(expression = u32::MAX - 1))]
63 next_operator_id: u32,
64
65 dependent_table_ids: HashSet<TableId>,
67
68 share_mapping: HashMap<u32, LocalFragmentId>,
70 share_stream_node_mapping: HashMap<u32, StreamNode>,
72
73 has_source_backfill: bool,
74 has_snapshot_backfill: bool,
75 has_cross_db_snapshot_backfill: bool,
76 tables: HashMap<TableId, Table>,
77}
78
79impl BuildFragmentGraphState {
80 fn new_stream_fragment(&mut self) -> StreamFragment {
82 let fragment = StreamFragment::new(self.next_local_fragment_id);
83 self.next_local_fragment_id += 1;
84 fragment
85 }
86
87 fn gen_operator_id(&mut self) -> u32 {
89 self.next_operator_id -= 1;
90 self.next_operator_id
91 }
92
93 pub fn gen_table_id(&mut self) -> u32 {
95 let ret = self.next_table_id;
96 self.next_table_id += 1;
97 ret
98 }
99
100 pub fn gen_table_id_wrapped(&mut self) -> TableId {
102 TableId::new(self.gen_table_id())
103 }
104
105 pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
106 self.share_stream_node_mapping
107 .insert(operator_id, stream_node);
108 }
109
110 pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
111 self.share_stream_node_mapping.get(&operator_id)
112 }
113
114 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
117 StreamNode {
118 operator_id: self.gen_operator_id() as u64,
119 identity: "StreamNoOp".into(),
120 node_body: Some(NodeBody::NoOp(NoOpNode {})),
121
122 stream_key: input.stream_key.clone(),
124 stream_kind: input.stream_kind,
125 fields: input.fields.clone(),
126
127 input: vec![input],
128 }
129 }
130}
131
/// The kind of streaming job a fragment graph is built for.
///
/// The variant selects which per-job-type parallelism setting of the session config applies
/// (see `to_parallelism`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphJobType {
    /// A table.
    Table,
    /// A materialized view.
    MaterializedView,
    /// A source.
    Source,
    /// A sink.
    Sink,
    /// An index.
    Index,
}
140
141impl GraphJobType {
142 pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
143 match self {
144 GraphJobType::Table => config.streaming_parallelism_for_table(),
145 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
146 GraphJobType::Source => config.streaming_parallelism_for_source(),
147 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
148 GraphJobType::Index => config.streaming_parallelism_for_index(),
149 }
150 }
151}
152
153pub fn build_graph(
154 plan_node: PlanRef,
155 job_type: Option<GraphJobType>,
156) -> Result<StreamFragmentGraphProto> {
157 build_graph_with_strategy(plan_node, job_type, None)
158}
159
160pub fn build_graph_with_strategy(
161 plan_node: PlanRef,
162 job_type: Option<GraphJobType>,
163 backfill_order: Option<BackfillOrder>,
164) -> Result<StreamFragmentGraphProto> {
165 let ctx = plan_node.plan_base().ctx();
166 let plan_node = reorganize_elements_id(plan_node);
167
168 let mut state = BuildFragmentGraphState::default();
169 let stream_node = plan_node.to_stream_prost(&mut state)?;
170 generate_fragment_graph(&mut state, stream_node)?;
171 if state.has_source_backfill && state.has_snapshot_backfill {
172 return Err(RwError::from(NotSupported(
173 "Snapshot backfill with shared source backfill is not supported".to_owned(),
174 "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
175 `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
176 .to_owned(),
177 )));
178 }
179 if state.has_cross_db_snapshot_backfill
180 && let Some(ref backfill_order) = backfill_order
181 && !backfill_order.order.is_empty()
182 {
183 return Err(RwError::from(NotSupported(
184 "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
185 "Please remove backfill order specification from your query".to_owned(),
186 )));
187 }
188
189 let mut fragment_graph = state.fragment_graph.to_protobuf();
190
191 fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
193 fragment_graph.table_ids_cnt = state.next_table_id;
194
195 {
197 let config = ctx.session_ctx().config();
198 fragment_graph.parallelism = derive_parallelism(
199 job_type.map(|t| t.to_parallelism(config.deref())),
200 config.streaming_parallelism(),
201 );
202 fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
203 }
204
205 fragment_graph.ctx = Some(StreamContext {
207 timezone: ctx.get_session_timezone(),
208 });
209
210 fragment_graph.backfill_order = backfill_order;
211
212 Ok(fragment_graph)
213}
214
/// Tells whether `stream_node` is one of the node kinds this module treats as stateful.
///
/// Compiled out: `#[cfg(any())]` never holds.
///
/// # Panics
///
/// Panics if `get_node_body` fails for `stream_node`.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
227
/// Recursively rewrites the children of `stream_node`, putting a `NoShuffle` exchange on top
/// of a stateful child whenever `insert_exchange_flag` is set.
///
/// The flag passed down to a child is:
/// * `true` below a stateful child,
/// * `false` below a child that already is an exchange,
/// * unchanged below any other child.
///
/// Compiled out: `#[cfg(any())]` never holds.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let rewrite_child = |child: StreamNode| -> Result<StreamNode> {
        if !is_stateful_executor(&child) {
            // An existing exchange resets the flag for its subtree; other nodes inherit it.
            let flag_below = match child.get_node_body()? {
                NodeBody::Exchange(_) => false,
                _ => insert_exchange_flag,
            };
            return rewrite_stream_node(state, child, flag_below);
        }

        let child_node = rewrite_stream_node(state, child, true)?;
        if !insert_exchange_flag {
            return Ok(child_node);
        }

        // Separate the stateful child from its parent with a no-shuffle exchange.
        let strategy = DispatchStrategy {
            r#type: DispatcherType::NoShuffle.into(),
            dist_key_indices: vec![],
            output_indices: (0..(child_node.fields.len() as u32)).collect(),
        };
        Ok(StreamNode {
            stream_key: child_node.stream_key.clone(),
            fields: child_node.fields.clone(),
            node_body: Some(NodeBody::Exchange(ExchangeNode {
                strategy: Some(strategy),
            })),
            operator_id: state.gen_operator_id() as u64,
            append_only: child_node.append_only,
            input: vec![child_node],
            identity: "Exchange (NoShuffle)".to_string(),
        })
    };

    let mut stream_node = stream_node;
    let children = std::mem::take(&mut stream_node.input);
    stream_node.input = children
        .into_iter()
        .map(rewrite_child)
        .collect::<Result<_>>()?;
    Ok(stream_node)
}
282
283fn generate_fragment_graph(
285 state: &mut BuildFragmentGraphState,
286 stream_node: StreamNode,
287) -> Result<()> {
288 #[cfg(any())]
292 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
293
294 build_and_add_fragment(state, stream_node)?;
295 Ok(())
296}
297
298fn build_and_add_fragment(
300 state: &mut BuildFragmentGraphState,
301 stream_node: StreamNode,
302) -> Result<Rc<StreamFragment>> {
303 let operator_id = stream_node.operator_id as u32;
304 match state.share_mapping.get(&operator_id) {
305 None => {
306 let mut fragment = state.new_stream_fragment();
307 let node = build_fragment(state, &mut fragment, stream_node)?;
308
309 let operator_id = node.operator_id as u32;
313
314 assert!(fragment.node.is_none());
315 fragment.node = Some(Box::new(node));
316 let fragment_ref = Rc::new(fragment);
317
318 state.fragment_graph.add_fragment(fragment_ref.clone());
319 state
320 .share_mapping
321 .insert(operator_id, fragment_ref.fragment_id);
322 Ok(fragment_ref)
323 }
324 Some(fragment_id) => Ok(state
325 .fragment_graph
326 .get_fragment(fragment_id)
327 .unwrap()
328 .clone()),
329 }
330}
331
332fn build_fragment(
335 state: &mut BuildFragmentGraphState,
336 current_fragment: &mut StreamFragment,
337 mut stream_node: StreamNode,
338) -> Result<StreamNode> {
339 recursive::tracker!().recurse(|_t| {
340 match stream_node.get_node_body()? {
342 NodeBody::BarrierRecv(_) => current_fragment
343 .fragment_type_mask
344 .add(FragmentTypeFlag::BarrierRecv),
345
346 NodeBody::Source(node) => {
347 current_fragment
348 .fragment_type_mask
349 .add(FragmentTypeFlag::Source);
350
351 if let Some(source) = node.source_inner.as_ref()
352 && let Some(source_info) = source.info.as_ref()
353 && ((source_info.is_shared() && !source_info.is_distributed)
354 || source.with_properties.requires_singleton())
355 {
356 current_fragment.requires_singleton = true;
357 }
358 }
359
360 NodeBody::Dml(_) => {
361 current_fragment
362 .fragment_type_mask
363 .add(FragmentTypeFlag::Dml);
364 }
365
366 NodeBody::Materialize(_) => {
367 current_fragment
368 .fragment_type_mask
369 .add(FragmentTypeFlag::Mview);
370 }
371
372 NodeBody::Sink(_) => current_fragment
373 .fragment_type_mask
374 .add(FragmentTypeFlag::Sink),
375
376 NodeBody::TopN(_) => current_fragment.requires_singleton = true,
377
378 NodeBody::EowcGapFill(node) => {
379 let table = node.buffer_table.as_ref().unwrap().clone();
380 state.tables.insert(table.id, table);
381 let table = node.prev_row_table.as_ref().unwrap().clone();
382 state.tables.insert(table.id, table);
383 }
384
385 NodeBody::GapFill(node) => {
386 let table = node.state_table.as_ref().unwrap().clone();
387 state.tables.insert(table.id, table);
388 }
389
390 NodeBody::StreamScan(node) => {
391 current_fragment
392 .fragment_type_mask
393 .add(FragmentTypeFlag::StreamScan);
394 match node.stream_scan_type() {
395 StreamScanType::SnapshotBackfill => {
396 current_fragment
397 .fragment_type_mask
398 .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
399 state.has_snapshot_backfill = true;
400 }
401 StreamScanType::CrossDbSnapshotBackfill => {
402 current_fragment
403 .fragment_type_mask
404 .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
405 state.has_cross_db_snapshot_backfill = true;
406 }
407 StreamScanType::Unspecified
408 | StreamScanType::Chain
409 | StreamScanType::Rearrange
410 | StreamScanType::Backfill
411 | StreamScanType::UpstreamOnly
412 | StreamScanType::ArrangementBackfill => {}
413 }
414 state.dependent_table_ids.insert(node.table_id);
417
418 if let Some(state_table) = &node.state_table {
420 let table = state_table.clone();
421 state.tables.insert(table.id, table);
422 }
423 }
424
425 NodeBody::StreamCdcScan(node) => {
426 if let Some(o) = node.options
427 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
428 {
429 current_fragment
431 .fragment_type_mask
432 .add(FragmentTypeFlag::StreamCdcScan);
433 } else {
434 current_fragment
435 .fragment_type_mask
436 .add(FragmentTypeFlag::StreamScan);
437 current_fragment.requires_singleton = true;
439 }
440 state.has_source_backfill = true;
441 }
442
443 NodeBody::CdcFilter(node) => {
444 current_fragment
445 .fragment_type_mask
446 .add(FragmentTypeFlag::CdcFilter);
447 state
449 .dependent_table_ids
450 .insert(node.upstream_source_id.as_cdc_table_id());
451 }
452 NodeBody::SourceBackfill(node) => {
453 current_fragment
454 .fragment_type_mask
455 .add(FragmentTypeFlag::SourceScan);
456 let source_id = node.upstream_source_id;
458 state
459 .dependent_table_ids
460 .insert(source_id.as_cdc_table_id());
461 state.has_source_backfill = true;
462 }
463
464 NodeBody::Now(_) => {
465 current_fragment
467 .fragment_type_mask
468 .add(FragmentTypeFlag::Now);
469 current_fragment.requires_singleton = true;
470 }
471
472 NodeBody::Values(_) => {
473 current_fragment
474 .fragment_type_mask
475 .add(FragmentTypeFlag::Values);
476 current_fragment.requires_singleton = true;
477 }
478
479 NodeBody::StreamFsFetch(_) => {
480 current_fragment
481 .fragment_type_mask
482 .add(FragmentTypeFlag::FsFetch);
483 }
484
485 NodeBody::VectorIndexWrite(_) => {
486 current_fragment
487 .fragment_type_mask
488 .add(FragmentTypeFlag::VectorIndexWrite);
489 }
490
491 NodeBody::UpstreamSinkUnion(_) => {
492 current_fragment
493 .fragment_type_mask
494 .add(FragmentTypeFlag::UpstreamSinkUnion);
495 }
496
497 NodeBody::LocalityProvider(_) => {
498 current_fragment
499 .fragment_type_mask
500 .add(FragmentTypeFlag::LocalityProvider);
501 }
502
503 _ => {}
504 };
505
506 if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
508 {
509 if delta_index_join.get_join_type()? == JoinType::Inner
510 && delta_index_join.condition.is_none()
511 {
512 return build_delta_join_without_arrange(state, current_fragment, stream_node);
513 } else {
514 panic!("only inner join without non-equal condition is supported for delta joins");
515 }
516 }
517
518 if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
523 stream_node = state.gen_no_op_stream_node(stream_node);
524 }
525
526 stream_node.input = stream_node
528 .input
529 .into_iter()
530 .map(|mut child_node| {
531 match child_node.get_node_body()? {
532 NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
535 NodeBody::Exchange(exchange_node) => {
537 let exchange_node_strategy = exchange_node.get_strategy()?.clone();
538
539 let [input]: [_; 1] =
541 std::mem::take(&mut child_node.input).try_into().unwrap();
542 let child_fragment = build_and_add_fragment(state, input)?;
543
544 let result = state.fragment_graph.try_add_edge(
545 child_fragment.fragment_id,
546 current_fragment.fragment_id,
547 StreamFragmentEdge {
548 dispatch_strategy: exchange_node_strategy.clone(),
549 link_id: child_node.operator_id,
551 },
552 );
553
554 if result.is_err() {
558 child_node.operator_id = state.gen_operator_id() as u64;
561
562 let ref_fragment_node = child_fragment.node.as_ref().unwrap();
564 let no_shuffle_strategy = DispatchStrategy {
565 r#type: DispatcherType::NoShuffle as i32,
566 dist_key_indices: vec![],
567 output_mapping: PbDispatchOutputMapping::identical(
568 ref_fragment_node.fields.len(),
569 )
570 .into(),
571 };
572
573 let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;
574
575 let no_op_fragment = {
576 let node = state.gen_no_op_stream_node(StreamNode {
577 operator_id: no_shuffle_exchange_operator_id,
578 identity: "StreamNoShuffleExchange".into(),
579 node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
580 strategy: Some(no_shuffle_strategy.clone()),
581 }))),
582 input: vec![],
583
584 stream_key: ref_fragment_node.stream_key.clone(),
586 stream_kind: ref_fragment_node.stream_kind,
587 fields: ref_fragment_node.fields.clone(),
588 });
589
590 let mut fragment = state.new_stream_fragment();
591 fragment.node = Some(node.into());
592 Rc::new(fragment)
593 };
594
595 state.fragment_graph.add_fragment(no_op_fragment.clone());
596
597 state.fragment_graph.add_edge(
598 child_fragment.fragment_id,
599 no_op_fragment.fragment_id,
600 StreamFragmentEdge {
601 dispatch_strategy: no_shuffle_strategy,
603 link_id: no_shuffle_exchange_operator_id,
604 },
605 );
606 state.fragment_graph.add_edge(
607 no_op_fragment.fragment_id,
608 current_fragment.fragment_id,
609 StreamFragmentEdge {
610 dispatch_strategy: exchange_node_strategy,
612 link_id: child_node.operator_id,
613 },
614 );
615 }
616
617 Ok(child_node)
618 }
619
620 _ => build_fragment(state, current_fragment, child_node),
622 }
623 })
624 .collect::<Result<_>>()?;
625 Ok(stream_node)
626 })
627}