1mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::catalog::Table;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22mod parallelism;
23mod rewrite;
24
25use std::collections::{HashMap, HashSet};
26use std::ops::Deref;
27use std::rc::Rc;
28
29use educe::Educe;
30use risingwave_common::catalog::{FragmentTypeFlag, TableId};
31use risingwave_common::session_config::SessionConfig;
32use risingwave_common::session_config::parallelism::{
33 ConfigAdaptiveParallelismStrategy, ConfigParallelism,
34};
35use risingwave_common::system_param::AdaptiveParallelismStrategy;
36use risingwave_connector::source::cdc::CdcScanOptions;
37use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
38use risingwave_pb::plan_common::JoinType;
39use risingwave_pb::stream_plan::{
40 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
41 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
42 StreamNode, StreamScanType,
43};
44
45use self::rewrite::build_delta_join_without_arrange;
46use crate::catalog::FragmentId;
47use crate::error::ErrorCode::NotSupported;
48use crate::error::{Result, RwError};
49use crate::optimizer::plan_node::generic::GenericPlanRef;
50use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
51use crate::stream_fragmenter::parallelism::{derive_parallelism, derive_parallelism_strategy};
52
/// Mutable state that is threaded through the conversion of a stream plan into a fragment graph.
///
/// `Default` is derived through `educe`: every field starts from its type's default value except
/// `next_operator_id`, which has an explicit initial value.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph under construction; fragments and edges are added to it while building.
    fragment_graph: StreamFragmentGraph,
    /// Id given to the next fragment created by `new_stream_fragment`.
    next_local_fragment_id: FragmentId,

    /// Next id handed out by `gen_table_id`. `build_graph_with_strategy` reports its final value
    /// as `table_ids_cnt` of the resulting graph.
    next_table_id: u32,

    /// Counter behind `gen_operator_id`, which decrements it before each use, so generated
    /// operator ids count downwards from just below this initial value.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Table ids the job depends on; copied into `dependent_table_ids` of the resulting graph.
    dependent_table_ids: HashSet<TableId>,

    /// Maps the operator id of a fragment's root node to the id of the fragment built for it, so
    /// `build_and_add_fragment` can reuse an existing fragment instead of building it again.
    share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
    /// Stream nodes stored by `add_share_stream_node` and read by `get_share_stream_node`.
    share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,

    /// Set by `build_fragment` when a `StreamCdcScan` or `SourceBackfill` node is visited.
    has_source_backfill: bool,
    /// Set by `build_fragment` when a stream scan of type `SnapshotBackfill` is visited.
    has_snapshot_backfill: bool,
    /// Set by `build_fragment` when a stream scan of type `CrossDbSnapshotBackfill` is visited.
    has_cross_db_snapshot_backfill: bool,
    /// Set by `build_fragment` whenever any backfilling node is visited; decides whether
    /// `build_graph_with_strategy` computes a backfill parallelism.
    has_any_backfill: bool,
    /// Tables collected while building, keyed by their id. `build_fragment` stores the state
    /// tables of gap-fill and stream-scan nodes here.
    tables: HashMap<TableId, Table>,
}
84
85impl BuildFragmentGraphState {
86 fn new_stream_fragment(&mut self) -> StreamFragment {
88 let fragment = StreamFragment::new(self.next_local_fragment_id);
89 self.next_local_fragment_id += 1;
90 fragment
91 }
92
93 fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
95 self.next_operator_id -= 1;
96 LocalOperatorId::new(self.next_operator_id).into()
97 }
98
99 pub fn gen_table_id(&mut self) -> u32 {
101 let ret = self.next_table_id;
102 self.next_table_id += 1;
103 ret
104 }
105
106 pub fn gen_table_id_wrapped(&mut self) -> TableId {
108 TableId::new(self.gen_table_id())
109 }
110
111 pub fn add_share_stream_node(
112 &mut self,
113 operator_id: StreamNodeLocalOperatorId,
114 stream_node: StreamNode,
115 ) {
116 self.share_stream_node_mapping
117 .insert(operator_id, stream_node);
118 }
119
120 pub fn get_share_stream_node(
121 &mut self,
122 operator_id: StreamNodeLocalOperatorId,
123 ) -> Option<&StreamNode> {
124 self.share_stream_node_mapping.get(&operator_id)
125 }
126
127 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
130 StreamNode {
131 operator_id: self.gen_operator_id(),
132 identity: "StreamNoOp".into(),
133 node_body: Some(NodeBody::NoOp(NoOpNode {})),
134
135 stream_key: input.stream_key.clone(),
137 stream_kind: input.stream_kind,
138 fields: input.fields.clone(),
139
140 input: vec![input],
141 }
142 }
143}
144
/// Kind of streaming job a fragment graph is built for.
///
/// The variant selects which job-specific session settings are consulted by `to_parallelism`
/// and `to_parallelism_strategy`.
pub enum GraphJobType {
    /// Uses the `..._for_table` session settings.
    Table,
    /// Uses the `..._for_materialized_view` session settings.
    MaterializedView,
    /// Uses the `..._for_source` session settings.
    Source,
    /// Uses the `..._for_sink` session settings.
    Sink,
    /// Uses the `..._for_index` session settings.
    Index,
}
153
154impl GraphJobType {
155 pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
156 match self {
157 GraphJobType::Table => config.streaming_parallelism_for_table(),
158 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
159 GraphJobType::Source => config.streaming_parallelism_for_source(),
160 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
161 GraphJobType::Index => config.streaming_parallelism_for_index(),
162 }
163 }
164
165 pub fn to_parallelism_strategy(
166 &self,
167 config: &SessionConfig,
168 ) -> ConfigAdaptiveParallelismStrategy {
169 match self {
170 GraphJobType::Table => config.streaming_parallelism_strategy_for_table(),
171 GraphJobType::MaterializedView => {
172 config.streaming_parallelism_strategy_for_materialized_view()
173 }
174 GraphJobType::Source => config.streaming_parallelism_strategy_for_source(),
175 GraphJobType::Sink => config.streaming_parallelism_strategy_for_sink(),
176 GraphJobType::Index => config.streaming_parallelism_strategy_for_index(),
177 }
178 }
179}
180
181pub fn build_graph(
182 plan_node: PlanRef,
183 job_type: Option<GraphJobType>,
184) -> Result<StreamFragmentGraphProto> {
185 build_graph_with_strategy(plan_node, job_type, None)
186}
187
188pub fn build_graph_with_strategy(
189 plan_node: PlanRef,
190 job_type: Option<GraphJobType>,
191 backfill_order: Option<BackfillOrder>,
192) -> Result<StreamFragmentGraphProto> {
193 let ctx = plan_node.plan_base().ctx();
194 let plan_node = reorganize_elements_id(plan_node);
195
196 let mut state = BuildFragmentGraphState::default();
197 let stream_node = plan_node.to_stream_prost(&mut state)?;
198 generate_fragment_graph(&mut state, stream_node)?;
199 if state.has_source_backfill && state.has_snapshot_backfill {
200 return Err(RwError::from(NotSupported(
201 "Snapshot backfill with shared source backfill is not supported".to_owned(),
202 "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
203 `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
204 .to_owned(),
205 )));
206 }
207 if state.has_cross_db_snapshot_backfill
208 && let Some(ref backfill_order) = backfill_order
209 && !backfill_order.order.is_empty()
210 {
211 return Err(RwError::from(NotSupported(
212 "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
213 "Please remove backfill order specification from your query".to_owned(),
214 )));
215 }
216
217 let mut fragment_graph = state.fragment_graph.to_protobuf();
218
219 fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
221 fragment_graph.table_ids_cnt = state.next_table_id;
222
223 let parallelism_strategy = {
225 let config = ctx.session_ctx().config();
226 let streaming_parallelism = config.streaming_parallelism();
227 let job_parallelism = job_type.as_ref().map(|t| t.to_parallelism(config.deref()));
228 let normal_parallelism = derive_parallelism(job_parallelism, streaming_parallelism);
229 let backfill_parallelism = if state.has_any_backfill {
230 match config.streaming_parallelism_for_backfill() {
231 ConfigParallelism::Default => None,
232 override_parallelism => {
233 derive_parallelism(Some(override_parallelism), streaming_parallelism)
234 .or(normal_parallelism)
235 }
236 }
237 } else {
238 None
239 };
240 fragment_graph.parallelism = normal_parallelism;
241 fragment_graph.backfill_parallelism = backfill_parallelism;
242 fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
243
244 let job_strategy = job_type
245 .as_ref()
246 .map(|t| t.to_parallelism_strategy(config.deref()));
247 derive_parallelism_strategy(job_strategy, config.streaming_parallelism_strategy())
248 };
249
250 let config_override = ctx
252 .session_ctx()
253 .config()
254 .to_initial_streaming_config_override()
255 .context("invalid initial streaming config override")?;
256 let adaptive_parallelism_strategy = parallelism_strategy
257 .as_ref()
258 .map(AdaptiveParallelismStrategy::to_string)
259 .unwrap_or_default();
260 fragment_graph.ctx = Some(StreamContext {
261 timezone: ctx.get_session_timezone(),
262 config_override,
263 adaptive_parallelism_strategy,
264 });
265
266 fragment_graph.backfill_order = backfill_order;
267
268 Ok(fragment_graph)
269}
270
/// Tells whether the body of `stream_node` is one of the executor kinds listed below.
///
/// Disabled through `cfg(any())`, so this function is never compiled.
///
/// # Panics
///
/// Panics if the node has no body.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    match stream_node.get_node_body().unwrap() {
        NodeBody::HashAgg(_)
        | NodeBody::HashJoin(_)
        | NodeBody::DeltaIndexJoin(_)
        | NodeBody::StreamScan(_)
        | NodeBody::StreamCdcScan(_)
        | NodeBody::DynamicFilter(_) => true,
        _ => false,
    }
}
283
/// Recursively rewrites the inputs of `stream_node`, placing a `NoShuffle` exchange above a
/// stateful input whenever `insert_exchange_flag` is already set.
///
/// Below a stateful input the flag is passed down as `true`, below an exchange it is reset to
/// `false`, and otherwise it is passed down unchanged.
///
/// Disabled through `cfg(any())`, so this function is never compiled.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let mut stream_node = stream_node;
    let children = std::mem::take(&mut stream_node.input);
    let mut rewritten_children = Vec::with_capacity(children.len());

    for child in children {
        let rewritten = if !is_stateful_executor(&child) {
            let flag_for_child = match child.get_node_body()? {
                NodeBody::Exchange(_) => false,
                _ => insert_exchange_flag,
            };
            rewrite_stream_node(state, child, flag_for_child)?
        } else if insert_exchange_flag {
            let child_node = rewrite_stream_node(state, child, true)?;

            let strategy = DispatchStrategy {
                r#type: DispatcherType::NoShuffle.into(),
                dist_key_indices: vec![],
                output_indices: (0..(child_node.fields.len() as u32)).collect(),
            };
            StreamNode {
                stream_key: child_node.stream_key.clone(),
                fields: child_node.fields.clone(),
                node_body: Some(NodeBody::Exchange(ExchangeNode {
                    strategy: Some(strategy),
                })),
                operator_id: state.gen_operator_id(),
                append_only: child_node.append_only,
                input: vec![child_node],
                identity: "Exchange (NoShuffle)".to_string(),
            }
        } else {
            rewrite_stream_node(state, child, true)?
        };
        rewritten_children.push(rewritten);
    }

    stream_node.input = rewritten_children;
    Ok(stream_node)
}
338
339fn generate_fragment_graph(
341 state: &mut BuildFragmentGraphState,
342 stream_node: StreamNode,
343) -> Result<()> {
344 #[cfg(any())]
348 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
349
350 build_and_add_fragment(state, stream_node)?;
351 Ok(())
352}
353
354fn build_and_add_fragment(
356 state: &mut BuildFragmentGraphState,
357 stream_node: StreamNode,
358) -> Result<Rc<StreamFragment>> {
359 let operator_id = stream_node.operator_id;
360 match state.share_mapping.get(&operator_id) {
361 None => {
362 let mut fragment = state.new_stream_fragment();
363 let node = build_fragment(state, &mut fragment, stream_node)?;
364
365 let operator_id = node.operator_id;
369
370 assert!(fragment.node.is_none());
371 fragment.node = Some(Box::new(node));
372 let fragment_ref = Rc::new(fragment);
373
374 state.fragment_graph.add_fragment(fragment_ref.clone());
375 state
376 .share_mapping
377 .insert(operator_id, fragment_ref.fragment_id);
378 Ok(fragment_ref)
379 }
380 Some(fragment_id) => Ok(state
381 .fragment_graph
382 .get_fragment(fragment_id)
383 .unwrap()
384 .clone()),
385 }
386}
387
388fn build_fragment(
391 state: &mut BuildFragmentGraphState,
392 current_fragment: &mut StreamFragment,
393 mut stream_node: StreamNode,
394) -> Result<StreamNode> {
395 recursive::tracker!().recurse(|_t| {
396 match stream_node.get_node_body()? {
398 NodeBody::BarrierRecv(_) => current_fragment
399 .fragment_type_mask
400 .add(FragmentTypeFlag::BarrierRecv),
401
402 NodeBody::Source(node) => {
403 current_fragment
404 .fragment_type_mask
405 .add(FragmentTypeFlag::Source);
406
407 if let Some(source) = node.source_inner.as_ref()
408 && let Some(source_info) = source.info.as_ref()
409 && ((source_info.is_shared() && !source_info.is_distributed)
410 || source.with_properties.requires_singleton())
411 {
412 current_fragment.requires_singleton = true;
413 }
414 }
415
416 NodeBody::Dml(_) => {
417 current_fragment
418 .fragment_type_mask
419 .add(FragmentTypeFlag::Dml);
420 }
421
422 NodeBody::Materialize(_) => {
423 current_fragment
424 .fragment_type_mask
425 .add(FragmentTypeFlag::Mview);
426 }
427
428 NodeBody::Sink(_) => current_fragment
429 .fragment_type_mask
430 .add(FragmentTypeFlag::Sink),
431
432 NodeBody::TopN(_) => current_fragment.requires_singleton = true,
433
434 NodeBody::EowcGapFill(node) => {
435 let table = node.buffer_table.as_ref().unwrap().clone();
436 state.tables.insert(table.id, table);
437 let table = node.prev_row_table.as_ref().unwrap().clone();
438 state.tables.insert(table.id, table);
439 }
440
441 NodeBody::GapFill(node) => {
442 let table = node.state_table.as_ref().unwrap().clone();
443 state.tables.insert(table.id, table);
444 }
445
446 NodeBody::StreamScan(node) => {
447 current_fragment
448 .fragment_type_mask
449 .add(FragmentTypeFlag::StreamScan);
450 match node.stream_scan_type() {
451 StreamScanType::SnapshotBackfill => {
452 current_fragment
453 .fragment_type_mask
454 .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
455 state.has_snapshot_backfill = true;
456 state.has_any_backfill = true;
457 }
458 StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
459 state.has_any_backfill = true;
460 }
461 StreamScanType::CrossDbSnapshotBackfill => {
462 current_fragment
463 .fragment_type_mask
464 .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
465 state.has_cross_db_snapshot_backfill = true;
466 state.has_any_backfill = true;
467 }
468 StreamScanType::Unspecified
469 | StreamScanType::Chain
470 | StreamScanType::Rearrange
471 | StreamScanType::UpstreamOnly => {}
472 }
473 state.dependent_table_ids.insert(node.table_id);
476
477 if let Some(state_table) = &node.state_table {
479 let table = state_table.clone();
480 state.tables.insert(table.id, table);
481 }
482 }
483
484 NodeBody::StreamCdcScan(node) => {
485 if let Some(o) = node.options
486 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
487 {
488 current_fragment
490 .fragment_type_mask
491 .add(FragmentTypeFlag::StreamCdcScan);
492 } else {
493 current_fragment
494 .fragment_type_mask
495 .add(FragmentTypeFlag::StreamScan);
496 current_fragment.requires_singleton = true;
498 }
499 state.has_source_backfill = true;
500 state.has_any_backfill = true;
501 }
502
503 NodeBody::CdcFilter(node) => {
504 current_fragment
505 .fragment_type_mask
506 .add(FragmentTypeFlag::CdcFilter);
507 state
509 .dependent_table_ids
510 .insert(node.upstream_source_id.as_cdc_table_id());
511 }
512 NodeBody::SourceBackfill(node) => {
513 current_fragment
514 .fragment_type_mask
515 .add(FragmentTypeFlag::SourceScan);
516 let source_id = node.upstream_source_id;
518 state
519 .dependent_table_ids
520 .insert(source_id.as_cdc_table_id());
521 state.has_source_backfill = true;
522 state.has_any_backfill = true;
523 }
524
525 NodeBody::Now(_) => {
526 current_fragment
528 .fragment_type_mask
529 .add(FragmentTypeFlag::Now);
530 current_fragment.requires_singleton = true;
531 }
532
533 NodeBody::Values(_) => {
534 current_fragment
535 .fragment_type_mask
536 .add(FragmentTypeFlag::Values);
537 current_fragment.requires_singleton = true;
538 }
539
540 NodeBody::StreamFsFetch(_) => {
541 current_fragment
542 .fragment_type_mask
543 .add(FragmentTypeFlag::FsFetch);
544 }
545
546 NodeBody::VectorIndexWrite(_) => {
547 current_fragment
548 .fragment_type_mask
549 .add(FragmentTypeFlag::VectorIndexWrite);
550 }
551
552 NodeBody::UpstreamSinkUnion(_) => {
553 current_fragment
554 .fragment_type_mask
555 .add(FragmentTypeFlag::UpstreamSinkUnion);
556 }
557
558 NodeBody::LocalityProvider(_) => {
559 current_fragment
560 .fragment_type_mask
561 .add(FragmentTypeFlag::LocalityProvider);
562 }
563
564 _ => {}
565 };
566
567 if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
569 {
570 if delta_index_join.get_join_type()? == JoinType::Inner
571 && delta_index_join.condition.is_none()
572 {
573 return build_delta_join_without_arrange(state, current_fragment, stream_node);
574 } else {
575 panic!("only inner join without non-equal condition is supported for delta joins");
576 }
577 }
578
579 if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
584 stream_node = state.gen_no_op_stream_node(stream_node);
585 }
586
587 stream_node.input = stream_node
589 .input
590 .into_iter()
591 .map(|mut child_node| {
592 match child_node.get_node_body()? {
593 NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
596 NodeBody::Exchange(exchange_node) => {
598 let exchange_node_strategy = exchange_node.get_strategy()?.clone();
599
600 let [input]: [_; 1] =
602 std::mem::take(&mut child_node.input).try_into().unwrap();
603 let child_fragment = build_and_add_fragment(state, input)?;
604
605 let result = state.fragment_graph.try_add_edge(
606 child_fragment.fragment_id,
607 current_fragment.fragment_id,
608 StreamFragmentEdge {
609 dispatch_strategy: exchange_node_strategy.clone(),
610 link_id: child_node.operator_id.as_raw_id(),
612 },
613 );
614
615 if result.is_err() {
619 child_node.operator_id = state.gen_operator_id();
622
623 let ref_fragment_node = child_fragment.node.as_ref().unwrap();
625 let no_shuffle_strategy = DispatchStrategy {
626 r#type: DispatcherType::NoShuffle as i32,
627 dist_key_indices: vec![],
628 output_mapping: PbDispatchOutputMapping::identical(
629 ref_fragment_node.fields.len(),
630 )
631 .into(),
632 };
633
634 let no_shuffle_exchange_operator_id = state.gen_operator_id();
635
636 let no_op_fragment = {
637 let node = state.gen_no_op_stream_node(StreamNode {
638 operator_id: no_shuffle_exchange_operator_id,
639 identity: "StreamNoShuffleExchange".into(),
640 node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
641 strategy: Some(no_shuffle_strategy.clone()),
642 }))),
643 input: vec![],
644
645 stream_key: ref_fragment_node.stream_key.clone(),
647 stream_kind: ref_fragment_node.stream_kind,
648 fields: ref_fragment_node.fields.clone(),
649 });
650
651 let mut fragment = state.new_stream_fragment();
652 fragment.node = Some(node.into());
653 Rc::new(fragment)
654 };
655
656 state.fragment_graph.add_fragment(no_op_fragment.clone());
657
658 state.fragment_graph.add_edge(
659 child_fragment.fragment_id,
660 no_op_fragment.fragment_id,
661 StreamFragmentEdge {
662 dispatch_strategy: no_shuffle_strategy,
664 link_id: no_shuffle_exchange_operator_id.as_raw_id(),
665 },
666 );
667 state.fragment_graph.add_edge(
668 no_op_fragment.fragment_id,
669 current_fragment.fragment_id,
670 StreamFragmentEdge {
671 dispatch_strategy: exchange_node_strategy,
673 link_id: child_node.operator_id.as_raw_id(),
674 },
675 );
676 }
677
678 Ok(child_node)
679 }
680
681 _ => build_fragment(state, current_fragment, child_node),
683 }
684 })
685 .collect::<Result<_>>()?;
686 Ok(stream_node)
687 })
688}