1mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_common::system_param::AdaptiveParallelismStrategy;
33use risingwave_connector::source::cdc::CdcScanOptions;
34use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
35use risingwave_pb::plan_common::JoinType;
36use risingwave_pb::stream_plan::{
37 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
38 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
39 StreamNode, StreamScanType,
40};
41
42use self::rewrite::build_delta_join_without_arrange;
43use crate::catalog::FragmentId;
44use crate::error::ErrorCode::NotSupported;
45use crate::error::{Result, RwError};
46use crate::optimizer::plan_node::generic::GenericPlanRef;
47use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
48use crate::stream_fragmenter::parallelism::{
49 ResolvedParallelism, derive_backfill_parallelism, derive_parallelism,
50};
51
52#[derive(Educe)]
54#[educe(Default)]
55pub struct BuildFragmentGraphState {
56 fragment_graph: StreamFragmentGraph,
58 next_local_fragment_id: FragmentId,
60
61 next_table_id: u32,
64
65 #[educe(Default(expression = u32::MAX - 1))]
67 next_operator_id: u32,
68
69 dependent_table_ids: HashSet<TableId>,
71
72 share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
74 share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,
76
77 has_source_backfill: bool,
78 has_snapshot_backfill: bool,
79 has_cross_db_snapshot_backfill: bool,
80 has_any_backfill: bool,
81}
82
83impl BuildFragmentGraphState {
84 fn new_stream_fragment(&mut self) -> StreamFragment {
86 let fragment = StreamFragment::new(self.next_local_fragment_id);
87 self.next_local_fragment_id += 1;
88 fragment
89 }
90
91 fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
93 self.next_operator_id -= 1;
94 LocalOperatorId::new(self.next_operator_id).into()
95 }
96
97 pub fn gen_table_id(&mut self) -> u32 {
99 let ret = self.next_table_id;
100 self.next_table_id += 1;
101 ret
102 }
103
104 pub fn gen_table_id_wrapped(&mut self) -> TableId {
106 TableId::new(self.gen_table_id())
107 }
108
109 pub fn add_share_stream_node(
110 &mut self,
111 operator_id: StreamNodeLocalOperatorId,
112 stream_node: StreamNode,
113 ) {
114 self.share_stream_node_mapping
115 .insert(operator_id, stream_node);
116 }
117
118 pub fn get_share_stream_node(
119 &mut self,
120 operator_id: StreamNodeLocalOperatorId,
121 ) -> Option<&StreamNode> {
122 self.share_stream_node_mapping.get(&operator_id)
123 }
124
125 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
128 StreamNode {
129 operator_id: self.gen_operator_id(),
130 identity: "StreamNoOp".into(),
131 node_body: Some(NodeBody::NoOp(NoOpNode {})),
132
133 stream_key: input.stream_key.clone(),
135 stream_kind: input.stream_kind,
136 fields: input.fields.clone(),
137
138 input: vec![input],
139 }
140 }
141}
142
/// Kind of streaming job a fragment graph is built for.
///
/// `GraphJobType::to_parallelism` uses it to select the job-specific streaming parallelism
/// session setting.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum GraphJobType {
    /// Uses `streaming_parallelism_for_table`.
    Table,
    /// Uses `streaming_parallelism_for_materialized_view`.
    MaterializedView,
    /// Uses `streaming_parallelism_for_source`.
    Source,
    /// Uses `streaming_parallelism_for_sink`.
    Sink,
    /// Uses `streaming_parallelism_for_index`.
    Index,
}
152
153impl GraphJobType {
154 pub fn to_parallelism(self, config: &SessionConfig) -> ConfigParallelism {
155 match self {
156 GraphJobType::Table => config.streaming_parallelism_for_table(),
157 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
158 GraphJobType::Source => config.streaming_parallelism_for_source(),
159 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
160 GraphJobType::Index => config.streaming_parallelism_for_index(),
161 }
162 }
163}
164
165pub fn build_graph(
166 plan_node: PlanRef,
167 job_type: Option<GraphJobType>,
168) -> Result<StreamFragmentGraphProto> {
169 build_graph_with_strategy(plan_node, job_type, None)
170}
171
/// Builds the protobuf fragment graph for `plan_node`, optionally carrying a backfill order.
///
/// Steps:
/// 1. reassign plan element ids (`reorganize_elements_id`) and lower the plan to a
///    `StreamNode` tree via `to_stream_prost`, recording state in a fresh
///    `BuildFragmentGraphState`;
/// 2. split the stream node tree into fragments (`generate_fragment_graph`);
/// 3. reject unsupported backfill combinations;
/// 4. resolve normal and backfill parallelism from the session config;
/// 5. assemble the `StreamFragmentGraphProto`.
///
/// # Errors
///
/// - `NotSupported` if the plan contains both source backfill and snapshot backfill.
/// - `NotSupported` if cross-db snapshot backfill is combined with a non-empty
///   `backfill_order`.
/// - Propagates errors from lowering, from fragmenting, and from building the initial
///   streaming config override.
pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> Result<StreamFragmentGraphProto> {
    // Taken from the original plan, before ids are reorganized.
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    // Lowering and fragmenting both record ids, dependencies and backfill flags in `state`.
    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node)?;
    // `has_source_backfill` is set by `build_fragment` for both `SourceBackfill` and
    // `StreamCdcScan` nodes.
    if state.has_source_backfill && state.has_snapshot_backfill {
        return Err(RwError::from(NotSupported(
            "Snapshot backfill with shared source backfill is not supported".to_owned(),
            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
            `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
                .to_owned(),
        )));
    }
    // A backfill order is only rejected here when it actually specifies something.
    if state.has_cross_db_snapshot_backfill
        && let Some(ref backfill_order) = backfill_order
        && !backfill_order.order.is_empty()
    {
        return Err(RwError::from(NotSupported(
            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
            "Please remove backfill order specification from your query".to_owned(),
        )));
    }

    // Resolve parallelism settings inside a block, so `config` is dropped before `config()`
    // is called again further below. NOTE(review): the concrete return type of `config()`
    // (possibly a lock guard) is not visible here.
    let (
        normal_parallelism,
        backfill_parallelism,
        adaptive_parallelism_strategy,
        backfill_adaptive_parallelism_strategy,
        max_parallelism,
    ) = {
        let config = ctx.session_ctx().config();
        let streaming_parallelism = config.streaming_parallelism();
        let job_parallelism = job_type.map(|t| t.to_parallelism(config.deref()));
        let normal_parallelism =
            derive_parallelism(job_type, job_parallelism, streaming_parallelism);
        // Backfill parallelism is only derived when some backfill node was seen; otherwise
        // both the parallelism and the strategy are left unset.
        let backfill_parallelism = if state.has_any_backfill {
            derive_backfill_parallelism(config.streaming_parallelism_for_backfill())
        } else {
            ResolvedParallelism {
                parallelism: None,
                adaptive_strategy: None,
            }
        };
        (
            normal_parallelism.parallelism,
            backfill_parallelism.parallelism,
            // An absent strategy is encoded as the default (empty) string.
            normal_parallelism
                .adaptive_strategy
                .as_ref()
                .map(AdaptiveParallelismStrategy::to_string)
                .unwrap_or_default(),
            backfill_parallelism
                .adaptive_strategy
                .as_ref()
                .map(AdaptiveParallelismStrategy::to_string)
                .unwrap_or_default(),
            config.streaming_max_parallelism() as _,
        )
    };

    let config_override = ctx
        .session_ctx()
        .config()
        .to_initial_streaming_config_override()
        .context("invalid initial streaming config override")?;
    let fragments = state
        .fragment_graph
        .fragments
        .into_iter()
        .map(|(k, v)| (k, v.to_protobuf()))
        .collect();
    let edges = state.fragment_graph.edges.into_values().collect();

    Ok(StreamFragmentGraphProto {
        fragments,
        edges,
        dependent_table_ids: state.dependent_table_ids.into_iter().collect(),
        // Number of table ids handed out by `gen_table_id` (the counter starts at 0).
        table_ids_cnt: state.next_table_id,
        ctx: Some(StreamContext {
            timezone: ctx.get_session_timezone(),
            config_override,
        }),
        parallelism: normal_parallelism,
        backfill_parallelism,
        adaptive_parallelism_strategy,
        backfill_adaptive_parallelism_strategy,
        max_parallelism,
        backfill_order,
    })
}
268
/// Whether `stream_node` is one of the executor kinds treated as stateful by
/// `rewrite_stream_node`.
///
/// Compiled out: `cfg(any())` never holds.
///
/// # Panics
///
/// Panics if the node has no body.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
281
/// Inserts `NoShuffle` exchanges above stateful children so that they land in separate
/// fragments.
///
/// For each child of `stream_node`:
/// - a stateful child (see `is_stateful_executor`) is rewritten recursively with the flag set.
///   When `insert_exchange_flag` is `true`, it is also wrapped in a new `NoShuffle` exchange;
/// - an `Exchange` child is rewritten with the flag cleared;
/// - any other child is rewritten with the flag unchanged.
///
/// Compiled out (`cfg(any())` never holds). The exchange node is built the same way as the
/// live exchange construction in `build_fragment`: a boxed `ExchangeNode`, an identity
/// `output_mapping`, and a propagated `stream_kind`. It therefore matches the current
/// `StreamNode`/`DispatchStrategy` shape if re-enabled.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child: StreamNode| -> Result<StreamNode> {
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle as i32,
                    dist_key_indices: vec![],
                    output_mapping: PbDispatchOutputMapping::identical(child_node.fields.len())
                        .into(),
                };
                // Schema fields are cloned before `child_node` is moved into `input`.
                Ok(StreamNode {
                    operator_id: state.gen_operator_id(),
                    identity: "Exchange (NoShuffle)".into(),
                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                        strategy: Some(strategy),
                    }))),
                    stream_key: child_node.stream_key.clone(),
                    stream_kind: child_node.stream_kind,
                    fields: child_node.fields.clone(),
                    input: vec![child_node],
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An existing exchange already separates fragments; reset the flag below it.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
336
337fn generate_fragment_graph(
339 state: &mut BuildFragmentGraphState,
340 stream_node: StreamNode,
341) -> Result<()> {
342 #[cfg(any())]
346 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
347
348 build_and_add_fragment(state, stream_node)?;
349 Ok(())
350}
351
352fn build_and_add_fragment(
354 state: &mut BuildFragmentGraphState,
355 stream_node: StreamNode,
356) -> Result<Rc<StreamFragment>> {
357 let operator_id = stream_node.operator_id;
358 match state.share_mapping.get(&operator_id) {
359 None => {
360 let mut fragment = state.new_stream_fragment();
361 let node = build_fragment(state, &mut fragment, stream_node)?;
362
363 let operator_id = node.operator_id;
367
368 assert!(fragment.node.is_none());
369 fragment.node = Some(Box::new(node));
370 let fragment_ref = Rc::new(fragment);
371
372 state.fragment_graph.add_fragment(fragment_ref.clone());
373 state
374 .share_mapping
375 .insert(operator_id, fragment_ref.fragment_id);
376 Ok(fragment_ref)
377 }
378 Some(fragment_id) => Ok(state
379 .fragment_graph
380 .get_fragment(fragment_id)
381 .unwrap()
382 .clone()),
383 }
384}
385
/// Fills `current_fragment` starting from `stream_node` and recurses into its children.
///
/// For every node that stays inside the current fragment, this:
/// - sets fragment type flags and/or `requires_singleton` on `current_fragment`, and
///   updates the backfill flags and `dependent_table_ids` in `state`, depending on the node
///   body;
/// - hands a `DeltaIndexJoin` node to `build_delta_join_without_arrange` and returns its
///   result;
/// - wraps a node whose body is `Exchange` in a `NoOp` node, so that the exchange is then
///   handled as a child below;
/// - for each child:
///   - keeps an `Exchange` with no input unchanged;
///   - turns an `Exchange` with an input into a fragment boundary. The upstream fragment is
///     built via `build_and_add_fragment` and connected with an edge;
///   - recurses into any other child within the same fragment.
///
/// Returns the (possibly rewritten) node that belongs to `current_fragment`.
///
/// # Panics
///
/// - If a `DeltaIndexJoin` is not an inner join without a condition.
/// - If an `Exchange` child with a non-empty input has more than one input.
///
/// # Errors
///
/// Propagates errors from reading node bodies, join types and exchange strategies, and from
/// building upstream fragments.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // NOTE(review): `recursive::tracker!()` presumably guards deep recursion on large plans;
    // confirm against `risingwave_common::util::recursive`.
    recursive::tracker!().recurse(|_t| {
        // Classify the current node: fragment type flags, singleton requirement, backfill
        // kinds seen, and upstream dependencies.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                // A shared, non-distributed source, or one whose properties require it, must
                // run as a singleton.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
                        state.has_any_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    // These scan types do not set any backfill flag.
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::UpstreamOnly => {}
                }
                state.dependent_table_ids.insert(node.table_id);
            }

            NodeBody::StreamCdcScan(node) => {
                // Parallelized CDC backfill gets its own flag. Otherwise the node is flagged
                // as a regular stream scan and the fragment is forced to be a singleton.
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    current_fragment.requires_singleton = true;
                }
                // CDC scans count as source backfill for the checks in
                // `build_graph_with_strategy`.
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.as_cdc_table_id());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                let source_id = node.upstream_source_id;
                state
                    .dependent_table_ids
                    .insert(source_id.as_cdc_table_id());
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::Now(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            _ => {}
        };

        // Delta joins are built by a dedicated routine; only inner joins without a condition
        // are accepted.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // If this node is itself an `Exchange`, wrap it in a `NoOp`. The exchange becomes a
        // child and is handled as a fragment boundary by the loop below.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An input-less exchange is kept as-is. This function leaves processed
                    // exchanges with an empty `input` (via `mem::take` below), and the `NoOp`
                    // fragments created below hold an exchange with `input: vec![]`.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // An exchange with an input marks a fragment boundary: build the upstream
                    // fragment and connect it to the current one.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Move the single input out, leaving the exchange input-less.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                link_id: child_node.operator_id.as_raw_id(),
                            },
                        );

                        // If the direct edge cannot be added (presumably because an edge
                        // between these two fragments already exists, e.g. the same upstream
                        // fragment consumed twice; TODO confirm against `try_add_edge`), route
                        // through an extra `NoOp` fragment:
                        // child --NoShuffle--> no-op --original strategy--> current.
                        if result.is_err() {
                            // Give the exchange a fresh operator id; it is used as the
                            // `link_id` of the no-op -> current edge below.
                            child_node.operator_id = state.gen_operator_id();

                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id();

                            // The no-op fragment is a `NoOp` over an input-less `NoShuffle`
                            // exchange that mirrors the upstream fragment's schema.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id.as_raw_id(),
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id.as_raw_id(),
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // Any other child stays in the current fragment.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}