// risingwave_frontend/src/stream_fragmenter/mod.rs
mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_common::system_param::AdaptiveParallelismStrategy;
33use risingwave_connector::source::cdc::CdcScanOptions;
34use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
35use risingwave_pb::plan_common::JoinType;
36use risingwave_pb::stream_plan::{
37 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
38 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
39 StreamNode, StreamScanType,
40};
41
42use self::rewrite::build_delta_join_without_arrange;
43use crate::catalog::FragmentId;
44use crate::error::ErrorCode::NotSupported;
45use crate::error::{Result, RwError};
46use crate::optimizer::plan_node::generic::GenericPlanRef;
47use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
48use crate::stream_fragmenter::parallelism::{
49 ResolvedParallelism, derive_backfill_parallelism, derive_parallelism,
50};
51
/// Mutable state threaded through the fragmenter while a single streaming
/// job's plan is converted into a [`StreamFragmentGraph`].
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph under construction.
    fragment_graph: StreamFragmentGraph,
    /// Local (per-graph) id to assign to the next fragment created.
    next_local_fragment_id: FragmentId,

    /// Id to assign to the next internal table generated, counting up from 0.
    next_table_id: u32,

    /// Next operator id handed out by `gen_operator_id`. Starts near
    /// `u32::MAX` and counts *down* — presumably so fragmenter-generated ids
    /// stay disjoint from ids already assigned in the plan (TODO confirm).
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Ids of the upstream tables/sources this job depends on, collected
    /// while walking the plan in `build_fragment`.
    dependent_table_ids: HashSet<TableId>,

    /// Shared-operator id -> fragment already built for it; used by
    /// `build_and_add_fragment` to deduplicate shared subtrees.
    share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
    /// Shared-operator id -> stream node already built for it
    /// (see `add_share_stream_node` / `get_share_stream_node`).
    share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,

    // Flags observed while walking the plan; consumed for validation and
    // parallelism derivation in `build_graph_with_strategy`.
    has_source_backfill: bool,
    has_snapshot_backfill: bool,
    has_cross_db_snapshot_backfill: bool,
    has_any_backfill: bool,
}
82
83impl BuildFragmentGraphState {
84 fn new_stream_fragment(&mut self) -> StreamFragment {
86 let fragment = StreamFragment::new(self.next_local_fragment_id);
87 self.next_local_fragment_id += 1;
88 fragment
89 }
90
91 fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
93 self.next_operator_id -= 1;
94 LocalOperatorId::new(self.next_operator_id).into()
95 }
96
97 pub fn gen_table_id(&mut self) -> u32 {
99 let ret = self.next_table_id;
100 self.next_table_id += 1;
101 ret
102 }
103
104 pub fn gen_table_id_wrapped(&mut self) -> TableId {
106 TableId::new(self.gen_table_id())
107 }
108
109 pub fn add_share_stream_node(
110 &mut self,
111 operator_id: StreamNodeLocalOperatorId,
112 stream_node: StreamNode,
113 ) {
114 self.share_stream_node_mapping
115 .insert(operator_id, stream_node);
116 }
117
118 pub fn get_share_stream_node(
119 &mut self,
120 operator_id: StreamNodeLocalOperatorId,
121 ) -> Option<&StreamNode> {
122 self.share_stream_node_mapping.get(&operator_id)
123 }
124
125 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
128 StreamNode {
129 operator_id: self.gen_operator_id(),
130 identity: "StreamNoOp".into(),
131 node_body: Some(NodeBody::NoOp(NoOpNode {})),
132
133 stream_key: input.stream_key.clone(),
135 stream_kind: input.stream_kind,
136 fields: input.fields.clone(),
137
138 input: vec![input],
139 }
140 }
141}
142
/// The kind of streaming job a fragment graph is being built for.
/// Used to pick the job-type-specific parallelism from the session config.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
152
impl GraphJobType {
    /// Returns the parallelism configured in the session for this job type
    /// (e.g. `streaming_parallelism_for_table` for `Table`).
    pub fn to_parallelism(self, config: &SessionConfig) -> ConfigParallelism {
        match self {
            GraphJobType::Table => config.streaming_parallelism_for_table(),
            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
            GraphJobType::Source => config.streaming_parallelism_for_source(),
            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
            GraphJobType::Index => config.streaming_parallelism_for_index(),
        }
    }
}
164
/// Builds the fragment graph for `plan_node` without a backfill order.
/// Convenience wrapper over [`build_graph_with_strategy`].
pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> Result<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}
171
/// Builds the fragment graph proto for `plan_node`, optionally carrying a
/// backfill order specification.
///
/// Beyond fragmentation itself, this function:
/// - rejects unsupported combinations (snapshot backfill together with shared
///   source backfill; a non-empty backfill order together with cross-db
///   snapshot backfill);
/// - derives normal and backfill parallelism from the session config;
/// - attaches the `StreamContext` (timezone, streaming config override,
///   adaptive parallelism strategies) to the resulting proto.
pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> Result<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    // Renumber plan element ids before fragmenting (see `reorganize_elements_id`).
    let plan_node = reorganize_elements_id(plan_node);

    // Convert the plan into a `StreamNode` tree, then cut it into fragments.
    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node)?;
    // Snapshot backfill cannot be combined with shared source backfill.
    if state.has_source_backfill && state.has_snapshot_backfill {
        return Err(RwError::from(NotSupported(
            "Snapshot backfill with shared source backfill is not supported".to_owned(),
            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
            `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
                .to_owned(),
        )));
    }
    // Backfill order control cannot be combined with cross-db snapshot backfill.
    if state.has_cross_db_snapshot_backfill
        && let Some(ref backfill_order) = backfill_order
        && !backfill_order.order.is_empty()
    {
        return Err(RwError::from(NotSupported(
            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
            "Please remove backfill order specification from your query".to_owned(),
        )));
    }

    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Record upstream dependencies and the number of internal tables generated.
    fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Derive parallelism settings. The backfill parallelism is only derived
    // when the job actually contains a backfill node; otherwise it stays unset.
    let parallelism_strategy = {
        let config = ctx.session_ctx().config();
        let streaming_parallelism = config.streaming_parallelism();
        let job_parallelism = job_type.map(|t| t.to_parallelism(config.deref()));
        let normal_parallelism =
            derive_parallelism(job_type, job_parallelism, streaming_parallelism);
        let backfill_parallelism = if state.has_any_backfill {
            derive_backfill_parallelism(config.streaming_parallelism_for_backfill())
        } else {
            ResolvedParallelism {
                parallelism: None,
                adaptive_strategy: None,
            }
        };
        fragment_graph.parallelism = normal_parallelism.parallelism;
        fragment_graph.backfill_parallelism = backfill_parallelism.parallelism;
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
        // (normal, backfill) adaptive strategies, stringified below.
        (
            normal_parallelism.adaptive_strategy,
            backfill_parallelism.adaptive_strategy,
        )
    };

    let config_override = ctx
        .session_ctx()
        .config()
        .to_initial_streaming_config_override()
        .context("invalid initial streaming config override")?;
    // A `None` strategy is encoded as an empty string in the proto.
    let adaptive_parallelism_strategy = parallelism_strategy
        .0
        .as_ref()
        .map(AdaptiveParallelismStrategy::to_string)
        .unwrap_or_default();
    let backfill_adaptive_parallelism_strategy = parallelism_strategy
        .1
        .as_ref()
        .map(AdaptiveParallelismStrategy::to_string)
        .unwrap_or_default();
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
        config_override,
        adaptive_parallelism_strategy,
        backfill_adaptive_parallelism_strategy,
    });

    fragment_graph.backfill_order = backfill_order;

    Ok(fragment_graph)
}
258
/// Returns whether the node body is one of the operator kinds treated as
/// stateful by the (disabled) exchange-insertion pass below.
///
/// NOTE: `#[cfg(any())]` never evaluates to true, so this helper is
/// currently compiled out; it is only referenced by the equally disabled
/// `rewrite_stream_node`.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
271
/// Rewrite pass that inserts a `NoShuffle` exchange above stateful executors,
/// presumably to isolate each stateful operator into its own fragment — TODO
/// confirm if ever re-enabled.
///
/// NOTE: `#[cfg(any())]` never evaluates to true, so this pass is currently
/// compiled out. It also still populates the legacy `output_indices` field of
/// `DispatchStrategy` (live code below uses `output_mapping`), so it would
/// need updating before being re-enabled.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                // Already below a stateful executor: cut a fragment boundary
                // by inserting a no-shuffle exchange above this child.
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id(),
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                // First stateful executor on this path: set the flag so any
                // nested stateful executor gets an exchange inserted.
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An existing exchange already forms a fragment boundary, so
                // the flag is reset below it.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
326
/// Cuts the built `StreamNode` plan into fragments stored in
/// `state.fragment_graph`.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // Disabled: the exchange-insertion rewrite is compiled out via `#[cfg(any())]`.
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}
341
342fn build_and_add_fragment(
344 state: &mut BuildFragmentGraphState,
345 stream_node: StreamNode,
346) -> Result<Rc<StreamFragment>> {
347 let operator_id = stream_node.operator_id;
348 match state.share_mapping.get(&operator_id) {
349 None => {
350 let mut fragment = state.new_stream_fragment();
351 let node = build_fragment(state, &mut fragment, stream_node)?;
352
353 let operator_id = node.operator_id;
357
358 assert!(fragment.node.is_none());
359 fragment.node = Some(Box::new(node));
360 let fragment_ref = Rc::new(fragment);
361
362 state.fragment_graph.add_fragment(fragment_ref.clone());
363 state
364 .share_mapping
365 .insert(operator_id, fragment_ref.fragment_id);
366 Ok(fragment_ref)
367 }
368 Some(fragment_id) => Ok(state
369 .fragment_graph
370 .get_fragment(fragment_id)
371 .unwrap()
372 .clone()),
373 }
374}
375
/// Builds `current_fragment` by DFS over `stream_node`, cutting the plan at
/// every `Exchange`: the exchange's subtree becomes a separate child fragment
/// (via [`build_and_add_fragment`]) connected to `current_fragment` by an
/// edge carrying the exchange's dispatch strategy. All non-exchange
/// descendants stay inside `current_fragment`.
///
/// While walking, node bodies are inspected to set fragment type flags and
/// singleton requirements, and to record backfill/dependency information on
/// `state`.
///
/// Returns the (possibly rewritten) root stream node of `current_fragment`.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // Guard against stack overflow on deeply nested plans.
    recursive::tracker!().recurse(|_t| {
        // Classify the current node: stamp fragment type flags, singleton
        // requirements and state-wide backfill/dependency bookkeeping.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                // Non-distributed shared sources and connectors that require
                // a singleton force this fragment to a single parallelism.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                // Record which backfill flavors appear; these flags drive the
                // validations and backfill parallelism in
                // `build_graph_with_strategy`.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
                        state.has_any_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::UpstreamOnly => {}
                }
                state.dependent_table_ids.insert(node.table_id);
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    // Non-parallelized CDC backfill runs as a singleton scan.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    current_fragment.requires_singleton = true;
                }
                // NOTE(review): CDC scans also set `has_source_backfill`,
                // which makes them mutually exclusive with snapshot backfill
                // (checked in `build_graph_with_strategy`) — confirm intended.
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.as_cdc_table_id());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                // The upstream source is a dependency of this job.
                let source_id = node.upstream_source_id;
                state
                    .dependent_table_ids
                    .insert(source_id.as_cdc_table_id());
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::Now(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            _ => {}
        };

        // Delta index joins take a dedicated fragmentation path; only inner
        // joins without a non-equi condition are supported.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // A fragment root must not itself be an exchange: wrap it in a no-op
        // node so the exchange becomes a child handled by the loop below.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit children: exchanges cut child fragments and become edges;
        // everything else recurses within the current fragment.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange with no input is kept as-is — presumably a
                    // placeholder whose upstream is resolved elsewhere (TODO
                    // confirm against callers that construct such nodes).
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Build the exchange's single input as a child
                        // fragment; the exchange node itself stays behind
                        // (input emptied) as the fragment boundary.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                link_id: child_node.operator_id.as_raw_id(),
                            },
                        );

                        // If the edge could not be added — presumably because
                        // an edge between this fragment pair already exists
                        // (shared child; confirm in `graph.rs`) — route the
                        // connection through a fresh no-op fragment so each
                        // edge stays unique.
                        if result.is_err() {
                            // Fresh id so the new downstream edge's link id
                            // does not collide with the existing edge's.
                            child_node.operator_id = state.gen_operator_id();

                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id();

                            // The intermediate fragment is a no-op over a
                            // no-shuffle exchange, preserving the child
                            // fragment's schema/stream key/kind.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            // child --(no-shuffle)--> no-op --(original
                            // strategy)--> current.
                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id.as_raw_id(),
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id.as_raw_id(),
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // Non-exchange child: stays in the current fragment.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}