risingwave_frontend/stream_fragmenter/
mod.rs1mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::catalog::Table;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22mod parallelism;
23mod rewrite;
24
25use std::collections::{HashMap, HashSet};
26use std::ops::Deref;
27use std::rc::Rc;
28
29use educe::Educe;
30use risingwave_common::catalog::{FragmentTypeFlag, TableId};
31use risingwave_common::session_config::SessionConfig;
32use risingwave_common::session_config::parallelism::ConfigParallelism;
33use risingwave_connector::source::cdc::CdcScanOptions;
34use risingwave_pb::plan_common::JoinType;
35use risingwave_pb::stream_plan::{
36 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
37 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
38 StreamNode, StreamScanType,
39};
40
41use self::rewrite::build_delta_join_without_arrange;
42use crate::catalog::FragmentId;
43use crate::error::ErrorCode::NotSupported;
44use crate::error::{Result, RwError};
45use crate::optimizer::plan_node::generic::GenericPlanRef;
46use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
47use crate::stream_fragmenter::parallelism::derive_parallelism;
48
/// Mutable state threaded through the construction of a stream fragment graph.
///
/// A fresh value is obtained through `Default` (see `build_graph_with_strategy`) and is then
/// updated while the stream plan is converted and cut into fragments.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph under construction; fragments and edges are added to it as they are
    /// built.
    fragment_graph: StreamFragmentGraph,
    /// Id that `new_stream_fragment` assigns to the next fragment it creates.
    next_local_fragment_id: FragmentId,

    /// Id that `gen_table_id` returns next. Its final value is reported as `table_ids_cnt` of
    /// the resulting graph.
    next_table_id: u32,

    /// Counter behind `gen_operator_id`. It starts at `u32::MAX - 1` and is decremented before
    /// every id is handed out.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Ids of tables/sources this job reads from, collected while visiting scan-like nodes.
    dependent_table_ids: HashSet<TableId>,

    /// Maps the operator id of a fragment's root node to the id of the fragment built for it,
    /// so that a node reached more than once reuses the same fragment.
    share_mapping: HashMap<u32, LocalFragmentId>,
    /// Stream nodes registered through `add_share_stream_node`, keyed by operator id.
    share_stream_node_mapping: HashMap<u32, StreamNode>,

    /// Set when a `StreamCdcScan` or `SourceBackfill` node has been visited.
    has_source_backfill: bool,
    /// Set when a stream scan of type `SnapshotBackfill` has been visited.
    has_snapshot_backfill: bool,
    /// Set when a stream scan of type `CrossDbSnapshotBackfill` has been visited.
    has_cross_db_snapshot_backfill: bool,
    /// Internal tables found on visited nodes (gap-fill tables, stream scan state tables),
    /// keyed by table id.
    tables: HashMap<TableId, Table>,
}
79
80impl BuildFragmentGraphState {
81 fn new_stream_fragment(&mut self) -> StreamFragment {
83 let fragment = StreamFragment::new(self.next_local_fragment_id);
84 self.next_local_fragment_id += 1;
85 fragment
86 }
87
88 fn gen_operator_id(&mut self) -> u32 {
90 self.next_operator_id -= 1;
91 self.next_operator_id
92 }
93
94 pub fn gen_table_id(&mut self) -> u32 {
96 let ret = self.next_table_id;
97 self.next_table_id += 1;
98 ret
99 }
100
101 pub fn gen_table_id_wrapped(&mut self) -> TableId {
103 TableId::new(self.gen_table_id())
104 }
105
106 pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
107 self.share_stream_node_mapping
108 .insert(operator_id, stream_node);
109 }
110
111 pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
112 self.share_stream_node_mapping.get(&operator_id)
113 }
114
115 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
118 StreamNode {
119 operator_id: self.gen_operator_id() as u64,
120 identity: "StreamNoOp".into(),
121 node_body: Some(NodeBody::NoOp(NoOpNode {})),
122
123 stream_key: input.stream_key.clone(),
125 stream_kind: input.stream_kind,
126 fields: input.fields.clone(),
127
128 input: vec![input],
129 }
130 }
131}
132
/// The kind of streaming job a fragment graph is built for.
///
/// It selects which per-job-type parallelism setting of the session config is consulted, see
/// `GraphJobType::to_parallelism`.
pub enum GraphJobType {
    /// Uses `streaming_parallelism_for_table`.
    Table,
    /// Uses `streaming_parallelism_for_materialized_view`.
    MaterializedView,
    /// Uses `streaming_parallelism_for_source`.
    Source,
    /// Uses `streaming_parallelism_for_sink`.
    Sink,
    /// Uses `streaming_parallelism_for_index`.
    Index,
}
141
142impl GraphJobType {
143 pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
144 match self {
145 GraphJobType::Table => config.streaming_parallelism_for_table(),
146 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
147 GraphJobType::Source => config.streaming_parallelism_for_source(),
148 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
149 GraphJobType::Index => config.streaming_parallelism_for_index(),
150 }
151 }
152}
153
154pub fn build_graph(
155 plan_node: PlanRef,
156 job_type: Option<GraphJobType>,
157) -> Result<StreamFragmentGraphProto> {
158 build_graph_with_strategy(plan_node, job_type, None)
159}
160
161pub fn build_graph_with_strategy(
162 plan_node: PlanRef,
163 job_type: Option<GraphJobType>,
164 backfill_order: Option<BackfillOrder>,
165) -> Result<StreamFragmentGraphProto> {
166 let ctx = plan_node.plan_base().ctx();
167 let plan_node = reorganize_elements_id(plan_node);
168
169 let mut state = BuildFragmentGraphState::default();
170 let stream_node = plan_node.to_stream_prost(&mut state)?;
171 generate_fragment_graph(&mut state, stream_node)?;
172 if state.has_source_backfill && state.has_snapshot_backfill {
173 return Err(RwError::from(NotSupported(
174 "Snapshot backfill with shared source backfill is not supported".to_owned(),
175 "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
176 `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
177 .to_owned(),
178 )));
179 }
180 if state.has_cross_db_snapshot_backfill
181 && let Some(ref backfill_order) = backfill_order
182 && !backfill_order.order.is_empty()
183 {
184 return Err(RwError::from(NotSupported(
185 "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
186 "Please remove backfill order specification from your query".to_owned(),
187 )));
188 }
189
190 let mut fragment_graph = state.fragment_graph.to_protobuf();
191
192 fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
194 fragment_graph.table_ids_cnt = state.next_table_id;
195
196 {
198 let config = ctx.session_ctx().config();
199 fragment_graph.parallelism = derive_parallelism(
200 job_type.map(|t| t.to_parallelism(config.deref())),
201 config.streaming_parallelism(),
202 );
203 fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
204 }
205
206 let config_override = ctx
208 .session_ctx()
209 .config()
210 .to_initial_streaming_config_override()
211 .context("invalid initial streaming config override")?;
212 fragment_graph.ctx = Some(StreamContext {
213 timezone: ctx.get_session_timezone(),
214 config_override,
215 });
216
217 fragment_graph.backfill_order = backfill_order;
218
219 Ok(fragment_graph)
220}
221
/// Returns whether the body of `stream_node` is one of the executors listed below (hash
/// aggregation, hash join, delta index join, stream scan, CDC scan, dynamic filter).
///
/// Panics if the node has no body.
///
/// NOTE: `#[cfg(any())]` is never satisfied, so this function is currently compiled out. It
/// is only referenced by the equally disabled `rewrite_stream_node` and the disabled
/// statement in `generate_fragment_graph`.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
234
/// Rewrites the inputs of `stream_node` so that a stateful input (see
/// `is_stateful_executor`) gets a `NoShuffle` exchange inserted above it when
/// `insert_exchange_flag` is set.
///
/// NOTE: `#[cfg(any())]` is never satisfied, so this function is currently compiled out.
///
/// NOTE(review): the body uses `output_indices`, `append_only` and an unboxed `ExchangeNode`,
/// while the live code in this file uses `output_mapping`, `stream_kind` and a boxed
/// `ExchangeNode`. It looks stale and probably needs updating before it can be re-enabled;
/// confirm against the current protobuf definitions.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        if is_stateful_executor(&child) {
            // Stateful child: everything below it is rewritten with the flag set.
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                // Put a no-shuffle exchange that forwards every column on top of the child.
                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![], output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An existing exchange resets the flag for the subtree below it.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Any other stateless node passes the current flag through unchanged.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    // Keep the node itself and only replace its inputs with their rewritten versions.
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
289
290fn generate_fragment_graph(
292 state: &mut BuildFragmentGraphState,
293 stream_node: StreamNode,
294) -> Result<()> {
295 #[cfg(any())]
299 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
300
301 build_and_add_fragment(state, stream_node)?;
302 Ok(())
303}
304
305fn build_and_add_fragment(
307 state: &mut BuildFragmentGraphState,
308 stream_node: StreamNode,
309) -> Result<Rc<StreamFragment>> {
310 let operator_id = stream_node.operator_id as u32;
311 match state.share_mapping.get(&operator_id) {
312 None => {
313 let mut fragment = state.new_stream_fragment();
314 let node = build_fragment(state, &mut fragment, stream_node)?;
315
316 let operator_id = node.operator_id as u32;
320
321 assert!(fragment.node.is_none());
322 fragment.node = Some(Box::new(node));
323 let fragment_ref = Rc::new(fragment);
324
325 state.fragment_graph.add_fragment(fragment_ref.clone());
326 state
327 .share_mapping
328 .insert(operator_id, fragment_ref.fragment_id);
329 Ok(fragment_ref)
330 }
331 Some(fragment_id) => Ok(state
332 .fragment_graph
333 .get_fragment(fragment_id)
334 .unwrap()
335 .clone()),
336 }
337}
338
/// Builds the node tree of `current_fragment` starting from `stream_node` and returns the
/// (possibly rewritten) node.
///
/// The function does three things, in this order:
/// 1. records the fragment-level attributes implied by the node body (fragment type flags,
///    singleton requirement, dependent tables, backfill markers) on `current_fragment` and
///    `state`;
/// 2. special-cases delta index joins and a node that is itself an exchange;
/// 3. visits the inputs: an exchange input ends the current fragment, its upstream side is
///    built as a fragment of its own and connected with an edge; any other input is built
///    recursively into `current_fragment`.
///
/// # Panics
///
/// Panics on a delta index join that is not an inner join without an extra condition, and
/// on an exchange input that has inputs but not exactly one.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // The body runs inside the recursion tracker because this function calls itself (directly
    // and through `build_and_add_fragment`).
    // NOTE(review): presumably this guards against stack overflow on deep plans; confirm in
    // `risingwave_common::util::recursive`.
    recursive::tracker!().recurse(|_t| {
        // Step 1: update fragment attributes and builder state according to the node body.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                // The fragment must be a singleton if the source is shared but not distributed,
                // or if its properties demand a singleton.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            // Gap fill nodes only contribute their internal tables to `state.tables`.
            NodeBody::EowcGapFill(node) => {
                let table = node.buffer_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
                let table = node.prev_row_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::GapFill(node) => {
                let table = node.state_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                // Snapshot backfill variants get an additional flag and are remembered in the
                // state so that `build_graph_with_strategy` can validate the combination.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // The scanned table is a dependency of this job.
                state.dependent_table_ids.insert(node.table_id);

                // Record the scan's state table, if it has one.
                if let Some(state_table) = &node.state_table {
                    let table = state_table.clone();
                    state.tables.insert(table.id, table);
                }
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Parallelized CDC backfill: flag the fragment as a CDC scan.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    // Otherwise it is flagged as a plain stream scan and forced to be a
                    // singleton.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // The upstream source is a dependency of this job.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.as_cdc_table_id());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                let source_id = node.upstream_source_id;
                // The upstream source is a dependency of this job.
                state
                    .dependent_table_ids
                    .insert(source_id.as_cdc_table_id());
                state.has_source_backfill = true;
            }

            NodeBody::Now(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            // All other node bodies do not influence fragment attributes.
            _ => {}
        };

        // Step 2a: delta index joins are handed over to a dedicated builder. The `unwrap` is
        // fine because `get_node_body()?` above already returned early on a missing body.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Step 2b: if this node itself is an exchange, wrap it in a no-op node. The exchange
        // thereby becomes an input of the no-op and is handled by the input loop below.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Step 3: visit the inputs.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange without inputs is kept as is; there is nothing upstream of it
                    // to build here.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // An exchange with an input marks a fragment boundary.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Detach the single input of the exchange (panics unless there is
                        // exactly one) and build it as its own fragment.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        // Connect upstream fragment -> current fragment. The exchange's operator
                        // id serves as the link id of the edge.
                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                link_id: child_node.operator_id,
                            },
                        );

                        // The edge could not be added directly.
                        // NOTE(review): presumably because an edge between these two fragments
                        // already exists; confirm in `StreamFragmentGraph::try_add_edge`.
                        // Route through an intermediate no-op fragment instead:
                        //   upstream --NoShuffle--> no-op fragment --original strategy--> current
                        if result.is_err() {
                            // Give the exchange in the current fragment a fresh operator id; it
                            // is the link id of the second edge below.
                            child_node.operator_id = state.gen_operator_id() as u64;

                            // The no-shuffle edge forwards all columns of the upstream root.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

                            // The intermediate fragment consists of a no-op node on top of an
                            // input-less no-shuffle exchange, with the schema of the upstream
                            // root.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            // upstream -> no-op fragment
                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id,
                                },
                            );
                            // no-op fragment -> current fragment
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id,
                                },
                            );
                        }

                        // The exchange, now without inputs, stays in the current fragment.
                        Ok(child_node)
                    }

                    // Any other input belongs to the current fragment; keep building it.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}