1mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_common::system_param::AdaptiveParallelismStrategy;
33use risingwave_connector::source::cdc::CdcScanOptions;
34use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
35use risingwave_pb::plan_common::JoinType;
36use risingwave_pb::stream_plan::{
37 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
38 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
39 StreamNode, StreamScanType,
40};
41
42use self::rewrite::build_delta_join_without_arrange;
43use crate::catalog::FragmentId;
44use crate::error::ErrorCode::NotSupported;
45use crate::error::{Result, RwError};
46use crate::optimizer::plan_node::generic::GenericPlanRef;
47use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
48use crate::stream_fragmenter::parallelism::{
49 ResolvedParallelism, derive_backfill_parallelism, derive_parallelism,
50};
51
52#[derive(Educe)]
54#[educe(Default)]
55pub struct BuildFragmentGraphState {
56 fragment_graph: StreamFragmentGraph,
58 next_local_fragment_id: FragmentId,
60
61 next_table_id: u32,
64
65 #[educe(Default(expression = u32::MAX - 1))]
67 next_operator_id: u32,
68
69 dependent_table_ids: HashSet<TableId>,
71
72 share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
74 share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,
76
77 has_source_backfill: bool,
78 has_snapshot_backfill: bool,
79 has_cross_db_snapshot_backfill: bool,
80 has_any_backfill: bool,
81}
82
83impl BuildFragmentGraphState {
84 fn new_stream_fragment(&mut self) -> StreamFragment {
86 let fragment = StreamFragment::new(self.next_local_fragment_id);
87 self.next_local_fragment_id += 1;
88 fragment
89 }
90
91 fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
93 self.next_operator_id -= 1;
94 LocalOperatorId::new(self.next_operator_id).into()
95 }
96
97 pub fn gen_table_id(&mut self) -> u32 {
99 let ret = self.next_table_id;
100 self.next_table_id += 1;
101 ret
102 }
103
104 pub fn gen_table_id_wrapped(&mut self) -> TableId {
106 TableId::new(self.gen_table_id())
107 }
108
109 pub fn add_share_stream_node(
110 &mut self,
111 operator_id: StreamNodeLocalOperatorId,
112 stream_node: StreamNode,
113 ) {
114 self.share_stream_node_mapping
115 .insert(operator_id, stream_node);
116 }
117
118 pub fn get_share_stream_node(
119 &mut self,
120 operator_id: StreamNodeLocalOperatorId,
121 ) -> Option<&StreamNode> {
122 self.share_stream_node_mapping.get(&operator_id)
123 }
124
125 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
128 StreamNode {
129 operator_id: self.gen_operator_id(),
130 identity: "StreamNoOp".into(),
131 node_body: Some(NodeBody::NoOp(NoOpNode {})),
132
133 stream_key: input.stream_key.clone(),
135 stream_kind: input.stream_kind,
136 fields: input.fields.clone(),
137
138 input: vec![input],
139 }
140 }
141}
142
/// Kind of streaming job a fragment graph is built for.
///
/// The kind selects which per-job-type parallelism session setting applies; see
/// `GraphJobType::to_parallelism`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphJobType {
    /// The job creates a table.
    Table,
    /// The job creates a materialized view.
    MaterializedView,
    /// The job creates a source.
    Source,
    /// The job creates a sink.
    Sink,
    /// The job creates an index.
    Index,
}
152
153impl GraphJobType {
154 pub fn to_parallelism(self, config: &SessionConfig) -> ConfigParallelism {
155 match self {
156 GraphJobType::Table => config.streaming_parallelism_for_table(),
157 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
158 GraphJobType::Source => config.streaming_parallelism_for_source(),
159 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
160 GraphJobType::Index => config.streaming_parallelism_for_index(),
161 }
162 }
163}
164
165pub fn build_graph(
166 plan_node: PlanRef,
167 job_type: Option<GraphJobType>,
168) -> Result<StreamFragmentGraphProto> {
169 build_graph_with_strategy(plan_node, job_type, None)
170}
171
172pub fn build_graph_with_strategy(
173 plan_node: PlanRef,
174 job_type: Option<GraphJobType>,
175 backfill_order: Option<BackfillOrder>,
176) -> Result<StreamFragmentGraphProto> {
177 let ctx = plan_node.plan_base().ctx();
178 let plan_node = reorganize_elements_id(plan_node);
179
180 let mut state = BuildFragmentGraphState::default();
181 let stream_node = plan_node.to_stream_prost(&mut state)?;
182 generate_fragment_graph(&mut state, stream_node)?;
183 if state.has_source_backfill && state.has_snapshot_backfill {
184 return Err(RwError::from(NotSupported(
185 "Snapshot backfill with shared source backfill is not supported".to_owned(),
186 "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
187 `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
188 .to_owned(),
189 )));
190 }
191 if state.has_cross_db_snapshot_backfill
192 && let Some(ref backfill_order) = backfill_order
193 && !backfill_order.order.is_empty()
194 {
195 return Err(RwError::from(NotSupported(
196 "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
197 "Please remove backfill order specification from your query".to_owned(),
198 )));
199 }
200
201 let (
202 normal_parallelism,
203 backfill_parallelism,
204 adaptive_parallelism_strategy,
205 backfill_adaptive_parallelism_strategy,
206 max_parallelism,
207 ) = {
208 let config = ctx.session_ctx().config();
209 let streaming_parallelism = config.streaming_parallelism();
210 let job_parallelism = job_type.map(|t| t.to_parallelism(config.deref()));
211 let normal_parallelism =
212 derive_parallelism(job_type, job_parallelism, streaming_parallelism);
213 let backfill_parallelism = if state.has_any_backfill {
214 derive_backfill_parallelism(config.streaming_parallelism_for_backfill())
215 } else {
216 ResolvedParallelism {
217 parallelism: None,
218 adaptive_strategy: None,
219 }
220 };
221 (
222 normal_parallelism.parallelism,
223 backfill_parallelism.parallelism,
224 normal_parallelism
225 .adaptive_strategy
226 .as_ref()
227 .map(AdaptiveParallelismStrategy::to_string)
228 .unwrap_or_default(),
229 backfill_parallelism
230 .adaptive_strategy
231 .as_ref()
232 .map(AdaptiveParallelismStrategy::to_string)
233 .unwrap_or_default(),
234 config.streaming_max_parallelism() as _,
235 )
236 };
237
238 let config_override = ctx
239 .session_ctx()
240 .config()
241 .to_initial_streaming_config_override()
242 .context("invalid initial streaming config override")?;
243 let fragments = state
244 .fragment_graph
245 .fragments
246 .into_iter()
247 .map(|(k, v)| (k, v.to_protobuf()))
248 .collect();
249 let edges = state.fragment_graph.edges.into_values().collect();
250
251 Ok(StreamFragmentGraphProto {
252 fragments,
253 edges,
254 dependent_table_ids: state.dependent_table_ids.into_iter().collect(),
255 table_ids_cnt: state.next_table_id,
256 ctx: Some(StreamContext {
257 timezone: ctx.get_session_timezone(),
258 config_override,
259 }),
260 parallelism: normal_parallelism,
261 backfill_parallelism,
262 adaptive_parallelism_strategy,
263 backfill_adaptive_parallelism_strategy,
264 max_parallelism,
265 backfill_order,
266 })
267}
268
/// Tells whether `stream_node` is one of the node kinds this module treats as stateful.
///
/// Compiled out: `cfg(any())` is never satisfied.
///
/// # Panics
///
/// Panics when the node has no body.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    match stream_node.get_node_body().unwrap() {
        NodeBody::HashAgg(_)
        | NodeBody::HashJoin(_)
        | NodeBody::DeltaIndexJoin(_)
        | NodeBody::StreamScan(_)
        | NodeBody::StreamCdcScan(_)
        | NodeBody::DynamicFilter(_) => true,
        _ => false,
    }
}
281
/// Rewrites the children of `stream_node` so that a stateful child reached while
/// `insert_exchange_flag` is set gets a no-shuffle exchange inserted above it.
///
/// Flag propagation to the recursive calls:
/// * below a stateful child the flag is always `true`;
/// * below an exchange child the flag is reset to `false`;
/// * below any other child the flag is passed on unchanged.
///
/// Compiled out: `cfg(any())` is never satisfied.
// NOTE(review): the struct literals below differ from the ones used by the live code in
// `build_fragment` (`output_indices` / `append_only` / unboxed `ExchangeNode` here versus
// `output_mapping` / `stream_kind` / boxed `ExchangeNode` there), so this function
// probably needs updating before it can be re-enabled -- confirm.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let rewrite_child = |child: StreamNode| -> Result<StreamNode> {
        if !is_stateful_executor(&child) {
            // An exchange already separates the operators, so the flag is reset below it.
            let flag_below = match child.get_node_body()? {
                NodeBody::Exchange(_) => false,
                _ => insert_exchange_flag,
            };
            return rewrite_stream_node(state, child, flag_below);
        }

        // Stateful child: everything below it is rewritten with the flag raised.
        let child_node = rewrite_stream_node(state, child, true)?;
        if !insert_exchange_flag {
            return Ok(child_node);
        }

        let strategy = DispatchStrategy {
            r#type: DispatcherType::NoShuffle.into(),
            dist_key_indices: vec![],
            output_indices: (0..(child_node.fields.len() as u32)).collect(),
        };
        Ok(StreamNode {
            stream_key: child_node.stream_key.clone(),
            fields: child_node.fields.clone(),
            node_body: Some(NodeBody::Exchange(ExchangeNode {
                strategy: Some(strategy),
            })),
            operator_id: state.gen_operator_id(),
            append_only: child_node.append_only,
            input: vec![child_node],
            identity: "Exchange (NoShuffle)".to_string(),
        })
    };

    let input = stream_node
        .input
        .into_iter()
        .map(rewrite_child)
        .collect::<Result<_>>()?;
    Ok(StreamNode {
        input,
        ..stream_node
    })
}
336
337fn generate_fragment_graph(
339 state: &mut BuildFragmentGraphState,
340 stream_node: StreamNode,
341) -> Result<()> {
342 #[cfg(any())]
346 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
347
348 build_and_add_fragment(state, stream_node)?;
349 Ok(())
350}
351
352fn build_and_add_fragment(
354 state: &mut BuildFragmentGraphState,
355 stream_node: StreamNode,
356) -> Result<Rc<StreamFragment>> {
357 let operator_id = stream_node.operator_id;
358 match state.share_mapping.get(&operator_id) {
359 None => {
360 let mut fragment = state.new_stream_fragment();
361 let node = build_fragment(state, &mut fragment, stream_node)?;
362
363 let operator_id = node.operator_id;
367
368 assert!(fragment.node.is_none());
369 fragment.node = Some(Box::new(node));
370 let fragment_ref = Rc::new(fragment);
371
372 state.fragment_graph.add_fragment(fragment_ref.clone());
373 state
374 .share_mapping
375 .insert(operator_id, fragment_ref.fragment_id);
376 Ok(fragment_ref)
377 }
378 Some(fragment_id) => Ok(state
379 .fragment_graph
380 .get_fragment(fragment_id)
381 .unwrap()
382 .clone()),
383 }
384}
385
386fn build_fragment(
389 state: &mut BuildFragmentGraphState,
390 current_fragment: &mut StreamFragment,
391 mut stream_node: StreamNode,
392) -> Result<StreamNode> {
393 recursive::tracker!().recurse(|_t| {
394 match stream_node.get_node_body()? {
396 NodeBody::BarrierRecv(_) => current_fragment
397 .fragment_type_mask
398 .add(FragmentTypeFlag::BarrierRecv),
399
400 NodeBody::Source(node) => {
401 current_fragment
402 .fragment_type_mask
403 .add(FragmentTypeFlag::Source);
404
405 if let Some(source) = node.source_inner.as_ref()
406 && let Some(source_info) = source.info.as_ref()
407 && ((source_info.is_shared() && !source_info.is_distributed)
408 || source.with_properties.requires_singleton())
409 {
410 current_fragment.requires_singleton = true;
411 }
412 }
413
414 NodeBody::Dml(_) => {
415 current_fragment
416 .fragment_type_mask
417 .add(FragmentTypeFlag::Dml);
418 }
419
420 NodeBody::Materialize(_) => {
421 current_fragment
422 .fragment_type_mask
423 .add(FragmentTypeFlag::Mview);
424 }
425
426 NodeBody::Sink(_) => current_fragment
427 .fragment_type_mask
428 .add(FragmentTypeFlag::Sink),
429
430 NodeBody::TopN(_) => current_fragment.requires_singleton = true,
431
432 NodeBody::StreamScan(node) => {
433 current_fragment
434 .fragment_type_mask
435 .add(FragmentTypeFlag::StreamScan);
436 #[expect(deprecated)]
437 match node.stream_scan_type() {
438 StreamScanType::SnapshotBackfill => {
439 current_fragment
440 .fragment_type_mask
441 .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
442 state.has_snapshot_backfill = true;
443 state.has_any_backfill = true;
444 }
445 StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
446 state.has_any_backfill = true;
447 }
448 StreamScanType::CrossDbSnapshotBackfill => {
449 current_fragment
450 .fragment_type_mask
451 .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
452 state.has_cross_db_snapshot_backfill = true;
453 state.has_any_backfill = true;
454 }
455 StreamScanType::Unspecified
456 | StreamScanType::Chain
457 | StreamScanType::Rearrange
458 | StreamScanType::UpstreamOnly => {}
459 }
460 state.dependent_table_ids.insert(node.table_id);
463 }
464
465 NodeBody::StreamCdcScan(node) => {
466 if let Some(o) = node.options
467 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
468 {
469 current_fragment
471 .fragment_type_mask
472 .add(FragmentTypeFlag::StreamCdcScan);
473 } else {
474 current_fragment
475 .fragment_type_mask
476 .add(FragmentTypeFlag::StreamScan);
477 current_fragment.requires_singleton = true;
479 }
480 state.has_source_backfill = true;
481 state.has_any_backfill = true;
482 }
483
484 NodeBody::CdcFilter(node) => {
485 current_fragment
486 .fragment_type_mask
487 .add(FragmentTypeFlag::CdcFilter);
488 state
490 .dependent_table_ids
491 .insert(node.upstream_source_id.as_cdc_table_id());
492 }
493 NodeBody::SourceBackfill(node) => {
494 current_fragment
495 .fragment_type_mask
496 .add(FragmentTypeFlag::SourceScan);
497 let source_id = node.upstream_source_id;
499 state
500 .dependent_table_ids
501 .insert(source_id.as_cdc_table_id());
502 state.has_source_backfill = true;
503 state.has_any_backfill = true;
504 }
505
506 NodeBody::Now(_) => {
507 current_fragment
509 .fragment_type_mask
510 .add(FragmentTypeFlag::Now);
511 current_fragment.requires_singleton = true;
512 }
513
514 NodeBody::Values(_) => {
515 current_fragment
516 .fragment_type_mask
517 .add(FragmentTypeFlag::Values);
518 current_fragment.requires_singleton = true;
519 }
520
521 NodeBody::StreamFsFetch(_) => {
522 current_fragment
523 .fragment_type_mask
524 .add(FragmentTypeFlag::FsFetch);
525 }
526
527 NodeBody::VectorIndexWrite(_) => {
528 current_fragment
529 .fragment_type_mask
530 .add(FragmentTypeFlag::VectorIndexWrite);
531 }
532
533 NodeBody::UpstreamSinkUnion(_) => {
534 current_fragment
535 .fragment_type_mask
536 .add(FragmentTypeFlag::UpstreamSinkUnion);
537 }
538
539 NodeBody::LocalityProvider(_) => {
540 current_fragment
541 .fragment_type_mask
542 .add(FragmentTypeFlag::LocalityProvider);
543 }
544
545 _ => {}
546 };
547
548 if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
550 {
551 if delta_index_join.get_join_type()? == JoinType::Inner
552 && delta_index_join.condition.is_none()
553 {
554 return build_delta_join_without_arrange(state, current_fragment, stream_node);
555 } else {
556 panic!("only inner join without non-equal condition is supported for delta joins");
557 }
558 }
559
560 if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
565 stream_node = state.gen_no_op_stream_node(stream_node);
566 }
567
568 stream_node.input = stream_node
570 .input
571 .into_iter()
572 .map(|mut child_node| {
573 match child_node.get_node_body()? {
574 NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
577 NodeBody::Exchange(exchange_node) => {
579 let exchange_node_strategy = exchange_node.get_strategy()?.clone();
580
581 let [input]: [_; 1] =
583 std::mem::take(&mut child_node.input).try_into().unwrap();
584 let child_fragment = build_and_add_fragment(state, input)?;
585
586 let result = state.fragment_graph.try_add_edge(
587 child_fragment.fragment_id,
588 current_fragment.fragment_id,
589 StreamFragmentEdge {
590 dispatch_strategy: exchange_node_strategy.clone(),
591 link_id: child_node.operator_id.as_raw_id(),
593 },
594 );
595
596 if result.is_err() {
600 child_node.operator_id = state.gen_operator_id();
603
604 let ref_fragment_node = child_fragment.node.as_ref().unwrap();
606 let no_shuffle_strategy = DispatchStrategy {
607 r#type: DispatcherType::NoShuffle as i32,
608 dist_key_indices: vec![],
609 output_mapping: PbDispatchOutputMapping::identical(
610 ref_fragment_node.fields.len(),
611 )
612 .into(),
613 };
614
615 let no_shuffle_exchange_operator_id = state.gen_operator_id();
616
617 let no_op_fragment = {
618 let node = state.gen_no_op_stream_node(StreamNode {
619 operator_id: no_shuffle_exchange_operator_id,
620 identity: "StreamNoShuffleExchange".into(),
621 node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
622 strategy: Some(no_shuffle_strategy.clone()),
623 }))),
624 input: vec![],
625
626 stream_key: ref_fragment_node.stream_key.clone(),
628 stream_kind: ref_fragment_node.stream_kind,
629 fields: ref_fragment_node.fields.clone(),
630 });
631
632 let mut fragment = state.new_stream_fragment();
633 fragment.node = Some(node.into());
634 Rc::new(fragment)
635 };
636
637 state.fragment_graph.add_fragment(no_op_fragment.clone());
638
639 state.fragment_graph.add_edge(
640 child_fragment.fragment_id,
641 no_op_fragment.fragment_id,
642 StreamFragmentEdge {
643 dispatch_strategy: no_shuffle_strategy,
645 link_id: no_shuffle_exchange_operator_id.as_raw_id(),
646 },
647 );
648 state.fragment_graph.add_edge(
649 no_op_fragment.fragment_id,
650 current_fragment.fragment_id,
651 StreamFragmentEdge {
652 dispatch_strategy: exchange_node_strategy,
654 link_id: child_node.operator_id.as_raw_id(),
655 },
656 );
657 }
658
659 Ok(child_node)
660 }
661
662 _ => build_fragment(state, current_fragment, child_node),
664 }
665 })
666 .collect::<Result<_>>()?;
667 Ok(stream_node)
668 })
669}