risingwave_frontend/stream_fragmenter/
mod.rs1mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::catalog::Table;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22mod parallelism;
23mod rewrite;
24
25use std::collections::{HashMap, HashSet};
26use std::ops::Deref;
27use std::rc::Rc;
28
29use educe::Educe;
30use risingwave_common::catalog::{FragmentTypeFlag, TableId};
31use risingwave_common::session_config::SessionConfig;
32use risingwave_common::session_config::parallelism::ConfigParallelism;
33use risingwave_connector::source::cdc::CdcScanOptions;
34use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
35use risingwave_pb::plan_common::JoinType;
36use risingwave_pb::stream_plan::{
37 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
38 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
39 StreamNode, StreamScanType,
40};
41
42use self::rewrite::build_delta_join_without_arrange;
43use crate::catalog::FragmentId;
44use crate::error::ErrorCode::NotSupported;
45use crate::error::{Result, RwError};
46use crate::optimizer::plan_node::generic::GenericPlanRef;
47use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
48use crate::stream_fragmenter::parallelism::derive_parallelism;
49
/// Mutable bookkeeping that is threaded through every step of turning a stream plan into a
/// fragment graph.
///
/// A fresh value is obtained through `Default`; all counters start at their type's default
/// except `next_operator_id`, which has an explicit starting value.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph under construction. Fragments and edges are added to it while the
    /// plan is traversed, and it is converted to protobuf at the end of
    /// `build_graph_with_strategy`.
    fragment_graph: StreamFragmentGraph,
    /// Id that `new_stream_fragment` gives to the next fragment; incremented on every use.
    next_local_fragment_id: FragmentId,

    /// Id that `gen_table_id` returns next; incremented on every use. Its final value is
    /// exported as `table_ids_cnt` of the resulting graph.
    next_table_id: u32,

    /// Counter behind `gen_operator_id`. It starts at `u32::MAX - 1` and is decremented before
    /// each id is produced, so generated ids count downwards.
    // NOTE(review): presumably counting down from the top of the range keeps generated ids away
    // from ids that come from the plan itself — confirm.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Ids collected from `StreamScan` (`table_id`), `CdcFilter` and `SourceBackfill`
    /// (`upstream_source_id`) nodes; exported as `dependent_table_ids` of the resulting graph.
    dependent_table_ids: HashSet<TableId>,

    /// Maps the operator id of a fragment's root node to the local id of that fragment, so
    /// that `build_and_add_fragment` returns the existing fragment when the same operator id
    /// is seen again.
    share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
    /// Stream nodes stored by `add_share_stream_node`, keyed by operator id, and looked up
    /// again by `get_share_stream_node`.
    share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,

    /// Set when a `SourceBackfill` or `StreamCdcScan` node is encountered.
    has_source_backfill: bool,
    /// Set when a `StreamScan` node of type `SnapshotBackfill` is encountered.
    has_snapshot_backfill: bool,
    /// Set when a `StreamScan` node of type `CrossDbSnapshotBackfill` is encountered.
    has_cross_db_snapshot_backfill: bool,
    /// Tables registered while visiting `StreamScan` (its optional state table), `GapFill`
    /// and `EowcGapFill` nodes, keyed by table id.
    tables: HashMap<TableId, Table>,
}
80
81impl BuildFragmentGraphState {
82 fn new_stream_fragment(&mut self) -> StreamFragment {
84 let fragment = StreamFragment::new(self.next_local_fragment_id);
85 self.next_local_fragment_id += 1;
86 fragment
87 }
88
89 fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
91 self.next_operator_id -= 1;
92 LocalOperatorId::new(self.next_operator_id).into()
93 }
94
95 pub fn gen_table_id(&mut self) -> u32 {
97 let ret = self.next_table_id;
98 self.next_table_id += 1;
99 ret
100 }
101
102 pub fn gen_table_id_wrapped(&mut self) -> TableId {
104 TableId::new(self.gen_table_id())
105 }
106
107 pub fn add_share_stream_node(
108 &mut self,
109 operator_id: StreamNodeLocalOperatorId,
110 stream_node: StreamNode,
111 ) {
112 self.share_stream_node_mapping
113 .insert(operator_id, stream_node);
114 }
115
116 pub fn get_share_stream_node(
117 &mut self,
118 operator_id: StreamNodeLocalOperatorId,
119 ) -> Option<&StreamNode> {
120 self.share_stream_node_mapping.get(&operator_id)
121 }
122
123 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
126 StreamNode {
127 operator_id: self.gen_operator_id(),
128 identity: "StreamNoOp".into(),
129 node_body: Some(NodeBody::NoOp(NoOpNode {})),
130
131 stream_key: input.stream_key.clone(),
133 stream_kind: input.stream_kind,
134 fields: input.fields.clone(),
135
136 input: vec![input],
137 }
138 }
139}
140
/// The kind of streaming job a fragment graph is built for.
///
/// The variant selects which job-specific parallelism setting of the session configuration
/// applies (see `GraphJobType::to_parallelism`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphJobType {
    /// The job creates a table.
    Table,
    /// The job creates a materialized view.
    MaterializedView,
    /// The job creates a source.
    Source,
    /// The job creates a sink.
    Sink,
    /// The job creates an index.
    Index,
}
149
150impl GraphJobType {
151 pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
152 match self {
153 GraphJobType::Table => config.streaming_parallelism_for_table(),
154 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
155 GraphJobType::Source => config.streaming_parallelism_for_source(),
156 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
157 GraphJobType::Index => config.streaming_parallelism_for_index(),
158 }
159 }
160}
161
162pub fn build_graph(
163 plan_node: PlanRef,
164 job_type: Option<GraphJobType>,
165) -> Result<StreamFragmentGraphProto> {
166 build_graph_with_strategy(plan_node, job_type, None)
167}
168
169pub fn build_graph_with_strategy(
170 plan_node: PlanRef,
171 job_type: Option<GraphJobType>,
172 backfill_order: Option<BackfillOrder>,
173) -> Result<StreamFragmentGraphProto> {
174 let ctx = plan_node.plan_base().ctx();
175 let plan_node = reorganize_elements_id(plan_node);
176
177 let mut state = BuildFragmentGraphState::default();
178 let stream_node = plan_node.to_stream_prost(&mut state)?;
179 generate_fragment_graph(&mut state, stream_node)?;
180 if state.has_source_backfill && state.has_snapshot_backfill {
181 return Err(RwError::from(NotSupported(
182 "Snapshot backfill with shared source backfill is not supported".to_owned(),
183 "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
184 `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
185 .to_owned(),
186 )));
187 }
188 if state.has_cross_db_snapshot_backfill
189 && let Some(ref backfill_order) = backfill_order
190 && !backfill_order.order.is_empty()
191 {
192 return Err(RwError::from(NotSupported(
193 "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
194 "Please remove backfill order specification from your query".to_owned(),
195 )));
196 }
197
198 let mut fragment_graph = state.fragment_graph.to_protobuf();
199
200 fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
202 fragment_graph.table_ids_cnt = state.next_table_id;
203
204 {
206 let config = ctx.session_ctx().config();
207 fragment_graph.parallelism = derive_parallelism(
208 job_type.map(|t| t.to_parallelism(config.deref())),
209 config.streaming_parallelism(),
210 );
211 fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
212 }
213
214 let config_override = ctx
216 .session_ctx()
217 .config()
218 .to_initial_streaming_config_override()
219 .context("invalid initial streaming config override")?;
220 fragment_graph.ctx = Some(StreamContext {
221 timezone: ctx.get_session_timezone(),
222 config_override,
223 });
224
225 fragment_graph.backfill_order = backfill_order;
226
227 Ok(fragment_graph)
228}
229
/// Tells whether `stream_node` is one of the executors this module treats as stateful: hash
/// aggregation, hash join, delta index join, stream scan, CDC scan or dynamic filter.
///
/// The item is gated behind `#[cfg(any())]`, which never holds, so it is not compiled.
///
/// # Panics
///
/// Panics if the node body cannot be obtained from `stream_node`.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
242
/// Rewrites the inputs of `stream_node` so that a `NoShuffle` exchange separates a stateful
/// executor from a stateful ancestor that is not already separated from it by an exchange.
///
/// `insert_exchange_flag` is `true` when a stateful executor has been seen on the path from
/// the root since the last exchange. For every input:
/// * a stateful child is rewritten with the flag set, and additionally wrapped in a
///   `NoShuffle` exchange when the flag was already set;
/// * an exchange child is rewritten with the flag cleared;
/// * any other child is rewritten with the flag unchanged.
///
/// The item is gated behind `#[cfg(any())]`, which never holds, so it is not compiled.
// NOTE(review): this disabled code looks stale relative to the live code in this file: it
// fills `output_indices` / `append_only` and an unboxed `ExchangeNode`, whereas `build_fragment`
// uses `output_mapping` / `stream_kind` and `Box::new(ExchangeNode { .. })`. Confirm before
// re-enabling.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    // Rewrites a single input of `stream_node`.
    let f = |child| {
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                // A stateful executor above has not been cut off by an exchange yet:
                // rewrite the child first, then put a no-shuffle exchange on top of it.
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    // Forward every column of the child unchanged.
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id(),
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                // First stateful executor since the last exchange: just remember it.
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An existing exchange resets the flag for everything below it.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Any other stateless node passes the flag through unchanged.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
297
298fn generate_fragment_graph(
300 state: &mut BuildFragmentGraphState,
301 stream_node: StreamNode,
302) -> Result<()> {
303 #[cfg(any())]
307 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
308
309 build_and_add_fragment(state, stream_node)?;
310 Ok(())
311}
312
313fn build_and_add_fragment(
315 state: &mut BuildFragmentGraphState,
316 stream_node: StreamNode,
317) -> Result<Rc<StreamFragment>> {
318 let operator_id = stream_node.operator_id;
319 match state.share_mapping.get(&operator_id) {
320 None => {
321 let mut fragment = state.new_stream_fragment();
322 let node = build_fragment(state, &mut fragment, stream_node)?;
323
324 let operator_id = node.operator_id;
328
329 assert!(fragment.node.is_none());
330 fragment.node = Some(Box::new(node));
331 let fragment_ref = Rc::new(fragment);
332
333 state.fragment_graph.add_fragment(fragment_ref.clone());
334 state
335 .share_mapping
336 .insert(operator_id, fragment_ref.fragment_id);
337 Ok(fragment_ref)
338 }
339 Some(fragment_id) => Ok(state
340 .fragment_graph
341 .get_fragment(fragment_id)
342 .unwrap()
343 .clone()),
344 }
345}
346
/// Builds the part of the plan rooted at `stream_node` that belongs to `current_fragment`.
///
/// The function
/// 1. inspects the node body and updates `current_fragment` (fragment type flags,
///    `requires_singleton`) and `state` (dependent table ids, backfill flags, tables),
/// 2. hands delta index joins over to `build_delta_join_without_arrange`,
/// 3. wraps the node in a no-op node if it is an exchange, and
/// 4. processes the inputs: an exchange input with an upstream becomes a fragment boundary
///    (the upstream is built as its own fragment and connected by an edge), every other
///    input is built recursively into the same fragment.
///
/// Returns the (possibly rewritten) node with its rewritten inputs.
///
/// # Errors
///
/// Propagates errors from reading the node body, the join type or the exchange strategy, and
/// from building child nodes and child fragments.
///
/// # Panics
///
/// Panics for a delta index join that is not an inner join or that has a condition, for an
/// exchange input that has inputs but not exactly one, and if a child fragment has no root
/// node when the fallback no-op fragment is created.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // NOTE(review): the body runs inside the recursion tracker from
    // `risingwave_common::util::recursive`; presumably this guards the deep recursion over the
    // plan — confirm against that module.
    recursive::tracker!().recurse(|_t| {
        // Step 1: update the fragment and the build state according to the kind of node.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                // The fragment must be a singleton when the source is shared but not
                // distributed, or when its connector properties require a singleton.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            // Gap-fill nodes only contribute their tables to `state.tables`.
            NodeBody::EowcGapFill(node) => {
                let table = node.buffer_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
                let table = node.prev_row_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::GapFill(node) => {
                let table = node.state_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                // Snapshot backfill variants get an extra flag and are remembered in `state`
                // so that `build_graph_with_strategy` can reject unsupported combinations.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // The scanned table is a dependency of the job being built.
                state.dependent_table_ids.insert(node.table_id);

                // The state table is optional; register it when present.
                if let Some(state_table) = &node.state_table {
                    let table = state_table.clone();
                    state.tables.insert(table.id, table);
                }
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Parallelized backfill: dedicated flag, no singleton requirement.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    // Otherwise the scan is flagged as a regular stream scan and the
                    // fragment is forced to be a singleton.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // The upstream source is recorded as a dependency, expressed as a table id.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.as_cdc_table_id());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                let source_id = node.upstream_source_id;
                // The upstream source is recorded as a dependency, expressed as a table id.
                state
                    .dependent_table_ids
                    .insert(source_id.as_cdc_table_id());
                state.has_source_backfill = true;
            }

            NodeBody::Now(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            // All other node kinds need no fragment-level bookkeeping.
            _ => {}
        };

        // Step 2: delta index joins are built by a dedicated rewrite, which takes over the
        // rest of the work for this node.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Step 3: an exchange at this position is wrapped in a no-op node, so that the
        // exchange becomes an input and is handled by the input loop below.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Step 4: visit the inputs.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange without inputs is kept as is.
                    // NOTE(review): presumably such an exchange is already a fragment leaf,
                    // e.g. one created by a rewrite — confirm.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // An exchange with an upstream marks a fragment boundary.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Detach the single upstream of the exchange and build it as a
                        // separate fragment; the exchange itself stays in the current
                        // fragment without inputs.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                link_id: child_node.operator_id.as_raw_id(),
                            },
                        );

                        // Fallback when the direct edge could not be added: route through an
                        // intermediate no-op fragment instead.
                        // NOTE(review): presumably `try_add_edge` fails when an edge between
                        // these two fragments already exists — confirm against
                        // `StreamFragmentGraph::try_add_edge`.
                        if result.is_err() {
                            // Give the exchange a fresh operator id; it is used as the link
                            // id of the edge into the current fragment below.
                            child_node.operator_id = state.gen_operator_id();

                            // The intermediate fragment forwards the child fragment's output
                            // unchanged through a no-shuffle dispatch.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id();

                            // New fragment: a no-op node on top of an input-less no-shuffle
                            // exchange that mirrors the schema of the child fragment's root.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            // child fragment --(no shuffle)--> no-op fragment
                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id.as_raw_id(),
                                },
                            );
                            // no-op fragment --(original strategy)--> current fragment
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id.as_raw_id(),
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // Any other input stays in the current fragment.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}