risingwave_frontend/stream_fragmenter/
mod.rs1mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::catalog::Table;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22mod parallelism;
23mod rewrite;
24
25use std::collections::{HashMap, HashSet};
26use std::ops::Deref;
27use std::rc::Rc;
28
29use educe::Educe;
30use risingwave_common::catalog::{FragmentTypeFlag, TableId};
31use risingwave_common::session_config::SessionConfig;
32use risingwave_common::session_config::parallelism::ConfigParallelism;
33use risingwave_connector::source::cdc::CdcScanOptions;
34use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
35use risingwave_pb::plan_common::JoinType;
36use risingwave_pb::stream_plan::{
37 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
38 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
39 StreamNode, StreamScanType,
40};
41
42use self::rewrite::build_delta_join_without_arrange;
43use crate::catalog::FragmentId;
44use crate::error::ErrorCode::NotSupported;
45use crate::error::{Result, RwError};
46use crate::optimizer::plan_node::generic::GenericPlanRef;
47use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
48use crate::stream_fragmenter::parallelism::derive_parallelism;
49
50#[derive(Educe)]
52#[educe(Default)]
53pub struct BuildFragmentGraphState {
54 fragment_graph: StreamFragmentGraph,
56 next_local_fragment_id: FragmentId,
58
59 next_table_id: u32,
62
63 #[educe(Default(expression = u32::MAX - 1))]
65 next_operator_id: u32,
66
67 dependent_table_ids: HashSet<TableId>,
69
70 share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
72 share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,
74
75 has_source_backfill: bool,
76 has_snapshot_backfill: bool,
77 has_cross_db_snapshot_backfill: bool,
78 has_any_backfill: bool,
79 tables: HashMap<TableId, Table>,
80}
81
82impl BuildFragmentGraphState {
83 fn new_stream_fragment(&mut self) -> StreamFragment {
85 let fragment = StreamFragment::new(self.next_local_fragment_id);
86 self.next_local_fragment_id += 1;
87 fragment
88 }
89
90 fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
92 self.next_operator_id -= 1;
93 LocalOperatorId::new(self.next_operator_id).into()
94 }
95
96 pub fn gen_table_id(&mut self) -> u32 {
98 let ret = self.next_table_id;
99 self.next_table_id += 1;
100 ret
101 }
102
103 pub fn gen_table_id_wrapped(&mut self) -> TableId {
105 TableId::new(self.gen_table_id())
106 }
107
108 pub fn add_share_stream_node(
109 &mut self,
110 operator_id: StreamNodeLocalOperatorId,
111 stream_node: StreamNode,
112 ) {
113 self.share_stream_node_mapping
114 .insert(operator_id, stream_node);
115 }
116
117 pub fn get_share_stream_node(
118 &mut self,
119 operator_id: StreamNodeLocalOperatorId,
120 ) -> Option<&StreamNode> {
121 self.share_stream_node_mapping.get(&operator_id)
122 }
123
124 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
127 StreamNode {
128 operator_id: self.gen_operator_id(),
129 identity: "StreamNoOp".into(),
130 node_body: Some(NodeBody::NoOp(NoOpNode {})),
131
132 stream_key: input.stream_key.clone(),
134 stream_kind: input.stream_kind,
135 fields: input.fields.clone(),
136
137 input: vec![input],
138 }
139 }
140}
141
/// Kind of streaming job whose fragment graph is being built.
///
/// Used to pick the matching per-job-type session parallelism setting via `to_parallelism`.
/// It is a plain fieldless enum, so the usual value-type traits are derived, letting it
/// be copied, compared and debug-printed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
150
151impl GraphJobType {
152 pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
153 match self {
154 GraphJobType::Table => config.streaming_parallelism_for_table(),
155 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
156 GraphJobType::Source => config.streaming_parallelism_for_source(),
157 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
158 GraphJobType::Index => config.streaming_parallelism_for_index(),
159 }
160 }
161}
162
163pub fn build_graph(
164 plan_node: PlanRef,
165 job_type: Option<GraphJobType>,
166) -> Result<StreamFragmentGraphProto> {
167 build_graph_with_strategy(plan_node, job_type, None)
168}
169
170pub fn build_graph_with_strategy(
171 plan_node: PlanRef,
172 job_type: Option<GraphJobType>,
173 backfill_order: Option<BackfillOrder>,
174) -> Result<StreamFragmentGraphProto> {
175 let ctx = plan_node.plan_base().ctx();
176 let plan_node = reorganize_elements_id(plan_node);
177
178 let mut state = BuildFragmentGraphState::default();
179 let stream_node = plan_node.to_stream_prost(&mut state)?;
180 generate_fragment_graph(&mut state, stream_node)?;
181 if state.has_source_backfill && state.has_snapshot_backfill {
182 return Err(RwError::from(NotSupported(
183 "Snapshot backfill with shared source backfill is not supported".to_owned(),
184 "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
185 `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
186 .to_owned(),
187 )));
188 }
189 if state.has_cross_db_snapshot_backfill
190 && let Some(ref backfill_order) = backfill_order
191 && !backfill_order.order.is_empty()
192 {
193 return Err(RwError::from(NotSupported(
194 "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
195 "Please remove backfill order specification from your query".to_owned(),
196 )));
197 }
198
199 let mut fragment_graph = state.fragment_graph.to_protobuf();
200
201 fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
203 fragment_graph.table_ids_cnt = state.next_table_id;
204
205 {
207 let config = ctx.session_ctx().config();
208 let streaming_parallelism = config.streaming_parallelism();
209 let normal_parallelism = derive_parallelism(
210 job_type.map(|t| t.to_parallelism(config.deref())),
211 streaming_parallelism,
212 );
213 let backfill_parallelism = if state.has_any_backfill {
214 match config.streaming_parallelism_for_backfill() {
215 ConfigParallelism::Default => None,
216 override_parallelism => {
217 derive_parallelism(Some(override_parallelism), streaming_parallelism)
218 .or(normal_parallelism)
219 }
220 }
221 } else {
222 None
223 };
224 fragment_graph.parallelism = normal_parallelism;
225 fragment_graph.backfill_parallelism = backfill_parallelism;
226 fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
227 }
228
229 let config_override = ctx
231 .session_ctx()
232 .config()
233 .to_initial_streaming_config_override()
234 .context("invalid initial streaming config override")?;
235 fragment_graph.ctx = Some(StreamContext {
236 timezone: ctx.get_session_timezone(),
237 config_override,
238 });
239
240 fragment_graph.backfill_order = backfill_order;
241
242 Ok(fragment_graph)
243}
244
/// Compiled out: `#[cfg(any())]` never holds.
///
/// Reports whether the node's body is one of the listed stateful operators. It panics
/// if the node has no body.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    match body {
        NodeBody::HashAgg(_)
        | NodeBody::HashJoin(_)
        | NodeBody::DeltaIndexJoin(_)
        | NodeBody::StreamScan(_)
        | NodeBody::StreamCdcScan(_)
        | NodeBody::DynamicFilter(_) => true,
        _ => false,
    }
}
257
/// Compiled out: `#[cfg(any())]` never holds.
///
/// Recursively rewrites the inputs of `stream_node`:
/// - A stateful child under `insert_exchange_flag = true` is rewritten (with the flag set)
///   and then wrapped in a `NoShuffle` exchange.
/// - A stateful child under the flag unset is only rewritten, with the flag set.
/// - A non-stateful child is rewritten with the flag cleared below an `Exchange`, and
///   with the flag inherited otherwise.
///
/// Kept in sync with the current proto shapes used by the live code in this file:
/// `DispatchStrategy::output_mapping`, `StreamNode::stream_kind` and a boxed
/// `ExchangeNode`. This way it still compiles if it is ever re-enabled.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child: StreamNode| -> Result<StreamNode> {
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle as i32,
                    dist_key_indices: vec![],
                    output_mapping: PbDispatchOutputMapping::identical(child_node.fields.len())
                        .into(),
                };
                Ok(StreamNode {
                    operator_id: state.gen_operator_id(),
                    identity: "Exchange (NoShuffle)".into(),
                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                        strategy: Some(strategy),
                    }))),
                    stream_key: child_node.stream_key.clone(),
                    stream_kind: child_node.stream_kind,
                    fields: child_node.fields.clone(),
                    input: vec![child_node],
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An exchange already separates fragments; no extra exchange is needed below it.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
312
313fn generate_fragment_graph(
315 state: &mut BuildFragmentGraphState,
316 stream_node: StreamNode,
317) -> Result<()> {
318 #[cfg(any())]
322 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
323
324 build_and_add_fragment(state, stream_node)?;
325 Ok(())
326}
327
328fn build_and_add_fragment(
330 state: &mut BuildFragmentGraphState,
331 stream_node: StreamNode,
332) -> Result<Rc<StreamFragment>> {
333 let operator_id = stream_node.operator_id;
334 match state.share_mapping.get(&operator_id) {
335 None => {
336 let mut fragment = state.new_stream_fragment();
337 let node = build_fragment(state, &mut fragment, stream_node)?;
338
339 let operator_id = node.operator_id;
343
344 assert!(fragment.node.is_none());
345 fragment.node = Some(Box::new(node));
346 let fragment_ref = Rc::new(fragment);
347
348 state.fragment_graph.add_fragment(fragment_ref.clone());
349 state
350 .share_mapping
351 .insert(operator_id, fragment_ref.fragment_id);
352 Ok(fragment_ref)
353 }
354 Some(fragment_id) => Ok(state
355 .fragment_graph
356 .get_fragment(fragment_id)
357 .unwrap()
358 .clone()),
359 }
360}
361
362fn build_fragment(
365 state: &mut BuildFragmentGraphState,
366 current_fragment: &mut StreamFragment,
367 mut stream_node: StreamNode,
368) -> Result<StreamNode> {
369 recursive::tracker!().recurse(|_t| {
370 match stream_node.get_node_body()? {
372 NodeBody::BarrierRecv(_) => current_fragment
373 .fragment_type_mask
374 .add(FragmentTypeFlag::BarrierRecv),
375
376 NodeBody::Source(node) => {
377 current_fragment
378 .fragment_type_mask
379 .add(FragmentTypeFlag::Source);
380
381 if let Some(source) = node.source_inner.as_ref()
382 && let Some(source_info) = source.info.as_ref()
383 && ((source_info.is_shared() && !source_info.is_distributed)
384 || source.with_properties.requires_singleton())
385 {
386 current_fragment.requires_singleton = true;
387 }
388 }
389
390 NodeBody::Dml(_) => {
391 current_fragment
392 .fragment_type_mask
393 .add(FragmentTypeFlag::Dml);
394 }
395
396 NodeBody::Materialize(_) => {
397 current_fragment
398 .fragment_type_mask
399 .add(FragmentTypeFlag::Mview);
400 }
401
402 NodeBody::Sink(_) => current_fragment
403 .fragment_type_mask
404 .add(FragmentTypeFlag::Sink),
405
406 NodeBody::TopN(_) => current_fragment.requires_singleton = true,
407
408 NodeBody::EowcGapFill(node) => {
409 let table = node.buffer_table.as_ref().unwrap().clone();
410 state.tables.insert(table.id, table);
411 let table = node.prev_row_table.as_ref().unwrap().clone();
412 state.tables.insert(table.id, table);
413 }
414
415 NodeBody::GapFill(node) => {
416 let table = node.state_table.as_ref().unwrap().clone();
417 state.tables.insert(table.id, table);
418 }
419
420 NodeBody::StreamScan(node) => {
421 current_fragment
422 .fragment_type_mask
423 .add(FragmentTypeFlag::StreamScan);
424 match node.stream_scan_type() {
425 StreamScanType::SnapshotBackfill => {
426 current_fragment
427 .fragment_type_mask
428 .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
429 state.has_snapshot_backfill = true;
430 state.has_any_backfill = true;
431 }
432 StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
433 state.has_any_backfill = true;
434 }
435 StreamScanType::CrossDbSnapshotBackfill => {
436 current_fragment
437 .fragment_type_mask
438 .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
439 state.has_cross_db_snapshot_backfill = true;
440 state.has_any_backfill = true;
441 }
442 StreamScanType::Unspecified
443 | StreamScanType::Chain
444 | StreamScanType::Rearrange
445 | StreamScanType::UpstreamOnly => {}
446 }
447 state.dependent_table_ids.insert(node.table_id);
450
451 if let Some(state_table) = &node.state_table {
453 let table = state_table.clone();
454 state.tables.insert(table.id, table);
455 }
456 }
457
458 NodeBody::StreamCdcScan(node) => {
459 if let Some(o) = node.options
460 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
461 {
462 current_fragment
464 .fragment_type_mask
465 .add(FragmentTypeFlag::StreamCdcScan);
466 } else {
467 current_fragment
468 .fragment_type_mask
469 .add(FragmentTypeFlag::StreamScan);
470 current_fragment.requires_singleton = true;
472 }
473 state.has_source_backfill = true;
474 state.has_any_backfill = true;
475 }
476
477 NodeBody::CdcFilter(node) => {
478 current_fragment
479 .fragment_type_mask
480 .add(FragmentTypeFlag::CdcFilter);
481 state
483 .dependent_table_ids
484 .insert(node.upstream_source_id.as_cdc_table_id());
485 }
486 NodeBody::SourceBackfill(node) => {
487 current_fragment
488 .fragment_type_mask
489 .add(FragmentTypeFlag::SourceScan);
490 let source_id = node.upstream_source_id;
492 state
493 .dependent_table_ids
494 .insert(source_id.as_cdc_table_id());
495 state.has_source_backfill = true;
496 state.has_any_backfill = true;
497 }
498
499 NodeBody::Now(_) => {
500 current_fragment
502 .fragment_type_mask
503 .add(FragmentTypeFlag::Now);
504 current_fragment.requires_singleton = true;
505 }
506
507 NodeBody::Values(_) => {
508 current_fragment
509 .fragment_type_mask
510 .add(FragmentTypeFlag::Values);
511 current_fragment.requires_singleton = true;
512 }
513
514 NodeBody::StreamFsFetch(_) => {
515 current_fragment
516 .fragment_type_mask
517 .add(FragmentTypeFlag::FsFetch);
518 }
519
520 NodeBody::VectorIndexWrite(_) => {
521 current_fragment
522 .fragment_type_mask
523 .add(FragmentTypeFlag::VectorIndexWrite);
524 }
525
526 NodeBody::UpstreamSinkUnion(_) => {
527 current_fragment
528 .fragment_type_mask
529 .add(FragmentTypeFlag::UpstreamSinkUnion);
530 }
531
532 NodeBody::LocalityProvider(_) => {
533 current_fragment
534 .fragment_type_mask
535 .add(FragmentTypeFlag::LocalityProvider);
536 }
537
538 _ => {}
539 };
540
541 if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
543 {
544 if delta_index_join.get_join_type()? == JoinType::Inner
545 && delta_index_join.condition.is_none()
546 {
547 return build_delta_join_without_arrange(state, current_fragment, stream_node);
548 } else {
549 panic!("only inner join without non-equal condition is supported for delta joins");
550 }
551 }
552
553 if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
558 stream_node = state.gen_no_op_stream_node(stream_node);
559 }
560
561 stream_node.input = stream_node
563 .input
564 .into_iter()
565 .map(|mut child_node| {
566 match child_node.get_node_body()? {
567 NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
570 NodeBody::Exchange(exchange_node) => {
572 let exchange_node_strategy = exchange_node.get_strategy()?.clone();
573
574 let [input]: [_; 1] =
576 std::mem::take(&mut child_node.input).try_into().unwrap();
577 let child_fragment = build_and_add_fragment(state, input)?;
578
579 let result = state.fragment_graph.try_add_edge(
580 child_fragment.fragment_id,
581 current_fragment.fragment_id,
582 StreamFragmentEdge {
583 dispatch_strategy: exchange_node_strategy.clone(),
584 link_id: child_node.operator_id.as_raw_id(),
586 },
587 );
588
589 if result.is_err() {
593 child_node.operator_id = state.gen_operator_id();
596
597 let ref_fragment_node = child_fragment.node.as_ref().unwrap();
599 let no_shuffle_strategy = DispatchStrategy {
600 r#type: DispatcherType::NoShuffle as i32,
601 dist_key_indices: vec![],
602 output_mapping: PbDispatchOutputMapping::identical(
603 ref_fragment_node.fields.len(),
604 )
605 .into(),
606 };
607
608 let no_shuffle_exchange_operator_id = state.gen_operator_id();
609
610 let no_op_fragment = {
611 let node = state.gen_no_op_stream_node(StreamNode {
612 operator_id: no_shuffle_exchange_operator_id,
613 identity: "StreamNoShuffleExchange".into(),
614 node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
615 strategy: Some(no_shuffle_strategy.clone()),
616 }))),
617 input: vec![],
618
619 stream_key: ref_fragment_node.stream_key.clone(),
621 stream_kind: ref_fragment_node.stream_kind,
622 fields: ref_fragment_node.fields.clone(),
623 });
624
625 let mut fragment = state.new_stream_fragment();
626 fragment.node = Some(node.into());
627 Rc::new(fragment)
628 };
629
630 state.fragment_graph.add_fragment(no_op_fragment.clone());
631
632 state.fragment_graph.add_edge(
633 child_fragment.fragment_id,
634 no_op_fragment.fragment_id,
635 StreamFragmentEdge {
636 dispatch_strategy: no_shuffle_strategy,
638 link_id: no_shuffle_exchange_operator_id.as_raw_id(),
639 },
640 );
641 state.fragment_graph.add_edge(
642 no_op_fragment.fragment_id,
643 current_fragment.fragment_id,
644 StreamFragmentEdge {
645 dispatch_strategy: exchange_node_strategy,
647 link_id: child_node.operator_id.as_raw_id(),
648 },
649 );
650 }
651
652 Ok(child_node)
653 }
654
655 _ => build_fragment(state, current_fragment, child_node),
657 }
658 })
659 .collect::<Result<_>>()?;
660 Ok(stream_node)
661 })
662}