1mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::catalog::Table;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22mod parallelism;
23mod rewrite;
24
25use std::collections::{HashMap, HashSet};
26use std::ops::Deref;
27use std::rc::Rc;
28
29use educe::Educe;
30use risingwave_common::catalog::{FragmentTypeFlag, TableId};
31use risingwave_common::session_config::SessionConfig;
32use risingwave_common::session_config::parallelism::ConfigParallelism;
33use risingwave_common::system_param::AdaptiveParallelismStrategy;
34use risingwave_connector::source::cdc::CdcScanOptions;
35use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
36use risingwave_pb::plan_common::JoinType;
37use risingwave_pb::stream_plan::{
38 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
39 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
40 StreamNode, StreamScanType,
41};
42
43use self::rewrite::build_delta_join_without_arrange;
44use crate::catalog::FragmentId;
45use crate::error::ErrorCode::NotSupported;
46use crate::error::{Result, RwError};
47use crate::optimizer::plan_node::generic::GenericPlanRef;
48use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
49use crate::stream_fragmenter::parallelism::{
50 ResolvedParallelism, derive_backfill_parallelism, derive_parallelism,
51};
52
/// Mutable state threaded through the stream fragmenter while it splits a
/// stream plan (`StreamNode` tree) into a `StreamFragmentGraph`.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph being built; fragments and edges are added while
    /// traversing the plan tree.
    fragment_graph: StreamFragmentGraph,
    /// Local (job-scoped) id for the next fragment; incremented by
    /// `new_stream_fragment`.
    next_local_fragment_id: FragmentId,

    /// Id for the next internal table; incremented by `gen_table_id`.
    next_table_id: u32,

    /// Id for the next operator generated by the fragmenter itself. Counts
    /// *down* from `u32::MAX - 1` (see `gen_operator_id`) — presumably to
    /// avoid colliding with operator ids already assigned to plan nodes;
    /// TODO confirm.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Ids of upstream tables/sources this job depends on, collected from
    /// scan/backfill/CDC nodes in `build_fragment`.
    dependent_table_ids: HashSet<TableId>,

    /// Root operator id -> fragment id of an already-built fragment, used by
    /// `build_and_add_fragment` to reuse fragments for shared subtrees.
    share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
    /// Operator id -> cached stream node for shared subtrees
    /// (see `add_share_stream_node` / `get_share_stream_node`).
    share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,

    /// Plan contains a source backfill (`SourceBackfill`) or CDC scan node.
    has_source_backfill: bool,
    /// Plan contains a same-db snapshot-backfill stream scan.
    has_snapshot_backfill: bool,
    /// Plan contains a cross-db snapshot-backfill stream scan.
    has_cross_db_snapshot_backfill: bool,
    /// Plan contains any kind of backfill node.
    has_any_backfill: bool,
    /// Internal state tables discovered while building fragments, keyed by id.
    tables: HashMap<TableId, Table>,
}
84
85impl BuildFragmentGraphState {
86 fn new_stream_fragment(&mut self) -> StreamFragment {
88 let fragment = StreamFragment::new(self.next_local_fragment_id);
89 self.next_local_fragment_id += 1;
90 fragment
91 }
92
93 fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
95 self.next_operator_id -= 1;
96 LocalOperatorId::new(self.next_operator_id).into()
97 }
98
99 pub fn gen_table_id(&mut self) -> u32 {
101 let ret = self.next_table_id;
102 self.next_table_id += 1;
103 ret
104 }
105
106 pub fn gen_table_id_wrapped(&mut self) -> TableId {
108 TableId::new(self.gen_table_id())
109 }
110
111 pub fn add_share_stream_node(
112 &mut self,
113 operator_id: StreamNodeLocalOperatorId,
114 stream_node: StreamNode,
115 ) {
116 self.share_stream_node_mapping
117 .insert(operator_id, stream_node);
118 }
119
120 pub fn get_share_stream_node(
121 &mut self,
122 operator_id: StreamNodeLocalOperatorId,
123 ) -> Option<&StreamNode> {
124 self.share_stream_node_mapping.get(&operator_id)
125 }
126
127 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
130 StreamNode {
131 operator_id: self.gen_operator_id(),
132 identity: "StreamNoOp".into(),
133 node_body: Some(NodeBody::NoOp(NoOpNode {})),
134
135 stream_key: input.stream_key.clone(),
137 stream_kind: input.stream_kind,
138 fields: input.fields.clone(),
139
140 input: vec![input],
141 }
142 }
143}
144
/// The kind of streaming job a fragment graph is built for. Used to select
/// the job-type-specific parallelism setting from the session config
/// (see [`GraphJobType::to_parallelism`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
154
155impl GraphJobType {
156 pub fn to_parallelism(self, config: &SessionConfig) -> ConfigParallelism {
157 match self {
158 GraphJobType::Table => config.streaming_parallelism_for_table(),
159 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
160 GraphJobType::Source => config.streaming_parallelism_for_source(),
161 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
162 GraphJobType::Index => config.streaming_parallelism_for_index(),
163 }
164 }
165}
166
/// Build a fragment graph from a stream plan.
///
/// Convenience wrapper around [`build_graph_with_strategy`] with no explicit
/// backfill order.
pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> Result<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}
173
/// Build a [`StreamFragmentGraphProto`] from a stream plan, optionally with
/// an explicit backfill order.
///
/// Converts the plan to a `StreamNode` tree, cuts it into fragments,
/// rejects unsupported backfill combinations, and fills in parallelism
/// settings and the stream context from the session config.
pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> Result<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    // Lower the plan to a stream node tree, then split it into fragments.
    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node)?;
    // Snapshot backfill cannot be combined with shared source backfill.
    if state.has_source_backfill && state.has_snapshot_backfill {
        return Err(RwError::from(NotSupported(
            "Snapshot backfill with shared source backfill is not supported".to_owned(),
            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
            `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
                .to_owned(),
        )));
    }
    // Backfill order control is not supported together with cross-db
    // snapshot backfill.
    if state.has_cross_db_snapshot_backfill
        && let Some(ref backfill_order) = backfill_order
        && !backfill_order.order.is_empty()
    {
        return Err(RwError::from(NotSupported(
            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
            "Please remove backfill order specification from your query".to_owned(),
        )));
    }

    let mut fragment_graph = state.fragment_graph.to_protobuf();

    fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Resolve normal and backfill parallelism from the session config; the
    // adaptive strategies are kept for the stream context below.
    let parallelism_strategy = {
        let config = ctx.session_ctx().config();
        let streaming_parallelism = config.streaming_parallelism();
        let job_parallelism = job_type.map(|t| t.to_parallelism(config.deref()));
        let normal_parallelism =
            derive_parallelism(job_type, job_parallelism, streaming_parallelism);
        // Backfill-specific parallelism only applies when the plan backfills.
        let backfill_parallelism = if state.has_any_backfill {
            derive_backfill_parallelism(config.streaming_parallelism_for_backfill())
        } else {
            ResolvedParallelism {
                parallelism: None,
                adaptive_strategy: None,
            }
        };
        fragment_graph.parallelism = normal_parallelism.parallelism;
        fragment_graph.backfill_parallelism = backfill_parallelism.parallelism;
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
        (
            normal_parallelism.adaptive_strategy,
            backfill_parallelism.adaptive_strategy,
        )
    };

    let config_override = ctx
        .session_ctx()
        .config()
        .to_initial_streaming_config_override()
        .context("invalid initial streaming config override")?;
    // Strategies are serialized as strings; unset becomes the empty string.
    let adaptive_parallelism_strategy = parallelism_strategy
        .0
        .as_ref()
        .map(AdaptiveParallelismStrategy::to_string)
        .unwrap_or_default();
    let backfill_adaptive_parallelism_strategy = parallelism_strategy
        .1
        .as_ref()
        .map(AdaptiveParallelismStrategy::to_string)
        .unwrap_or_default();
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
        config_override,
        adaptive_parallelism_strategy,
        backfill_adaptive_parallelism_strategy,
    });

    fragment_graph.backfill_order = backfill_order;

    Ok(fragment_graph)
}
260
/// Whether the executor for this node keeps internal state (and would be
/// isolated into its own fragment by `rewrite_stream_node`).
///
/// NOTE(review): `#[cfg(any())]` is always false, so this function is
/// currently compiled out; it is kept for reference together with
/// `rewrite_stream_node` below.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
273
/// Rewrite the plan tree so that a stateful executor nested under another
/// stateful executor (see `is_stateful_executor`) gets a no-shuffle exchange
/// inserted above it, putting it into its own fragment.
///
/// NOTE(review): `#[cfg(any())]` is always false, so this function is
/// currently compiled out and kept for reference only. It appears stale:
/// it builds a `DispatchStrategy` with `output_indices`, while the live code
/// in `build_fragment` uses `output_mapping` — confirm before re-enabling.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    // Rewrites one child; `insert_exchange_flag` says whether an enclosing
    // stateful executor has already been seen in this fragment.
    let f = |child| {
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                // Nested stateful child: recurse first, then wrap it in a
                // no-shuffle exchange so it becomes a fragment boundary.
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id(),
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                // First stateful executor on this path: just set the flag.
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An exchange already starts a new fragment, so the flag
                // resets below it.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
328
/// Build the fragment graph in `state` from the root `stream_node` of a
/// stream plan.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // Disabled (`#[cfg(any())]` is always false): the stateful-executor
    // isolation pass (`rewrite_stream_node`) is currently not applied.
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}
343
344fn build_and_add_fragment(
346 state: &mut BuildFragmentGraphState,
347 stream_node: StreamNode,
348) -> Result<Rc<StreamFragment>> {
349 let operator_id = stream_node.operator_id;
350 match state.share_mapping.get(&operator_id) {
351 None => {
352 let mut fragment = state.new_stream_fragment();
353 let node = build_fragment(state, &mut fragment, stream_node)?;
354
355 let operator_id = node.operator_id;
359
360 assert!(fragment.node.is_none());
361 fragment.node = Some(Box::new(node));
362 let fragment_ref = Rc::new(fragment);
363
364 state.fragment_graph.add_fragment(fragment_ref.clone());
365 state
366 .share_mapping
367 .insert(operator_id, fragment_ref.fragment_id);
368 Ok(fragment_ref)
369 }
370 Some(fragment_id) => Ok(state
371 .fragment_graph
372 .get_fragment(fragment_id)
373 .unwrap()
374 .clone()),
375 }
376}
377
/// Fill in `current_fragment` from `stream_node`, recursing into children.
///
/// Sets fragment type flags and singleton requirements based on the node
/// bodies encountered, records dependent table ids and internal state tables
/// into `state`, and cuts the tree at every `Exchange`: the subtree below an
/// exchange becomes a child fragment (via `build_and_add_fragment`) connected
/// by a graph edge, while the exchange itself remains in this fragment as a
/// leaf. Returns the (possibly rewritten) root node of this fragment.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // Track recursion depth to cope with deeply nested plans.
    recursive::tracker!().recurse(|_t| {
        // First, set fragment-level properties implied by this node's body.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                // A shared but non-distributed source, or a connector that
                // requires a singleton, pins the fragment to one parallelism.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            // Gap-fill executors: register their internal state tables.
            NodeBody::EowcGapFill(node) => {
                let table = node.buffer_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
                let table = node.prev_row_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::GapFill(node) => {
                let table = node.state_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                // Record which backfill variants appear in the plan; these
                // flags drive validation and backfill parallelism upstream.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
                        state.has_any_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::UpstreamOnly => {}
                }
                state.dependent_table_ids.insert(node.table_id);

                if let Some(state_table) = &node.state_table {
                    let table = state_table.clone();
                    state.tables.insert(table.id, table);
                }
            }

            NodeBody::StreamCdcScan(node) => {
                // Parallelized CDC backfill gets its own fragment type;
                // otherwise the CDC scan must run as a singleton.
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.as_cdc_table_id());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                let source_id = node.upstream_source_id;
                state
                    .dependent_table_ids
                    .insert(source_id.as_cdc_table_id());
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::Now(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            _ => {}
        };

        // Delta index joins take a dedicated build path; only inner joins
        // without extra (non-equi) conditions are supported.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // If the root of this fragment is an exchange, wrap it in a no-op
        // node so the exchange becomes a child and is handled by the
        // child-rewrite loop below.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Rewrite children: exchanges become fragment boundaries, everything
        // else is recursed into within the current fragment.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange with no input is already a fragment-boundary
                    // placeholder; keep it as-is.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Detach the exchange's single input and build it as
                        // a separate (possibly shared/reused) fragment.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                link_id: child_node.operator_id.as_raw_id(),
                            },
                        );

                        // If the edge cannot be added directly — presumably an
                        // edge between this fragment pair already exists, e.g.
                        // the same shared fragment is consumed twice — insert
                        // an extra no-op fragment in between so each edge
                        // connects a distinct pair of fragments.
                        if result.is_err() {
                            // Fresh operator id for the exchange feeding the
                            // current fragment, so link ids stay unique.
                            child_node.operator_id = state.gen_operator_id();

                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id();

                            // The intermediate fragment is a no-op over a
                            // no-shuffle exchange, mirroring the child's
                            // schema so rows pass through unchanged.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            // child --(no-shuffle)--> no-op --(original
                            // strategy)--> current fragment.
                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id.as_raw_id(),
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id.as_raw_id(),
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // Non-exchange children stay in the current fragment.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}