risingwave_frontend/stream_fragmenter/
mod.rs1mod graph;
16use graph::*;
17use risingwave_common::util::recursive::{self, Recurse as _};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_pb::catalog::Table;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_connector::source::cdc::CdcScanOptions;
33use risingwave_pb::plan_common::JoinType;
34use risingwave_pb::stream_plan::{
35 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
36 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
37 StreamNode, StreamScanType,
38};
39
40use self::rewrite::build_delta_join_without_arrange;
41use crate::error::ErrorCode::NotSupported;
42use crate::error::{Result, RwError};
43use crate::optimizer::plan_node::generic::GenericPlanRef;
44use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
45use crate::stream_fragmenter::parallelism::derive_parallelism;
46
47#[derive(Educe)]
49#[educe(Default)]
50pub struct BuildFragmentGraphState {
51 fragment_graph: StreamFragmentGraph,
53 next_local_fragment_id: u32,
55
56 next_table_id: u32,
59
60 #[educe(Default(expression = u32::MAX - 1))]
62 next_operator_id: u32,
63
64 dependent_table_ids: HashSet<TableId>,
66
67 share_mapping: HashMap<u32, LocalFragmentId>,
69 share_stream_node_mapping: HashMap<u32, StreamNode>,
71
72 has_source_backfill: bool,
73 has_snapshot_backfill: bool,
74 has_cross_db_snapshot_backfill: bool,
75 tables: HashMap<TableId, Table>,
76}
77
78impl BuildFragmentGraphState {
79 fn new_stream_fragment(&mut self) -> StreamFragment {
81 let fragment = StreamFragment::new(self.next_local_fragment_id);
82 self.next_local_fragment_id += 1;
83 fragment
84 }
85
86 fn gen_operator_id(&mut self) -> u32 {
88 self.next_operator_id -= 1;
89 self.next_operator_id
90 }
91
92 pub fn gen_table_id(&mut self) -> u32 {
94 let ret = self.next_table_id;
95 self.next_table_id += 1;
96 ret
97 }
98
99 pub fn gen_table_id_wrapped(&mut self) -> TableId {
101 TableId::new(self.gen_table_id())
102 }
103
104 pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
105 self.share_stream_node_mapping
106 .insert(operator_id, stream_node);
107 }
108
109 pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
110 self.share_stream_node_mapping.get(&operator_id)
111 }
112
113 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
116 StreamNode {
117 operator_id: self.gen_operator_id() as u64,
118 identity: "StreamNoOp".into(),
119 node_body: Some(NodeBody::NoOp(NoOpNode {})),
120
121 stream_key: input.stream_key.clone(),
123 stream_kind: input.stream_kind,
124 fields: input.fields.clone(),
125
126 input: vec![input],
127 }
128 }
129}
130
/// The kind of streaming job a fragment graph is being built for.
///
/// [`GraphJobType::to_parallelism`] uses it to pick the per-job-type streaming parallelism
/// setting from the session config.
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
139
140impl GraphJobType {
141 pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
142 match self {
143 GraphJobType::Table => config.streaming_parallelism_for_table(),
144 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
145 GraphJobType::Source => config.streaming_parallelism_for_source(),
146 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
147 GraphJobType::Index => config.streaming_parallelism_for_index(),
148 }
149 }
150}
151
152pub fn build_graph(
153 plan_node: PlanRef,
154 job_type: Option<GraphJobType>,
155) -> Result<StreamFragmentGraphProto> {
156 build_graph_with_strategy(plan_node, job_type, None)
157}
158
159pub fn build_graph_with_strategy(
160 plan_node: PlanRef,
161 job_type: Option<GraphJobType>,
162 backfill_order: Option<BackfillOrder>,
163) -> Result<StreamFragmentGraphProto> {
164 let ctx = plan_node.plan_base().ctx();
165 let plan_node = reorganize_elements_id(plan_node);
166
167 let mut state = BuildFragmentGraphState::default();
168 let stream_node = plan_node.to_stream_prost(&mut state)?;
169 generate_fragment_graph(&mut state, stream_node)?;
170 if state.has_source_backfill && state.has_snapshot_backfill {
171 return Err(RwError::from(NotSupported(
172 "Snapshot backfill with shared source backfill is not supported".to_owned(),
173 "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
174 `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
175 .to_owned(),
176 )));
177 }
178 if state.has_cross_db_snapshot_backfill
179 && let Some(ref backfill_order) = backfill_order
180 && !backfill_order.order.is_empty()
181 {
182 return Err(RwError::from(NotSupported(
183 "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
184 "Please remove backfill order specification from your query".to_owned(),
185 )));
186 }
187
188 let mut fragment_graph = state.fragment_graph.to_protobuf();
189
190 fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
192 fragment_graph.table_ids_cnt = state.next_table_id;
193
194 {
196 let config = ctx.session_ctx().config();
197 fragment_graph.parallelism = derive_parallelism(
198 job_type.map(|t| t.to_parallelism(config.deref())),
199 config.streaming_parallelism(),
200 );
201 fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
202 }
203
204 fragment_graph.ctx = Some(StreamContext {
206 timezone: ctx.get_session_timezone(),
207 });
208
209 fragment_graph.backfill_order = backfill_order;
210
211 Ok(fragment_graph)
212}
213
/// Whether `stream_node` is one of the stateful executors the (disabled) exchange-insertion
/// rewrite cares about.
///
/// Compiled out: `cfg(any())` with no predicates is always false.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
226
/// Recursively rewrites the children of `stream_node`, inserting a `NoShuffle` exchange above a
/// stateful child when `insert_exchange_flag` is set.
///
/// For each child:
/// - If the child is stateful (per `is_stateful_executor`) and `insert_exchange_flag` is
///   true, it is rewritten with the flag set and then wrapped in a new
///   `Exchange (NoShuffle)` node.
/// - If the child is stateful and the flag is false, it is only rewritten with the flag set.
/// - An `Exchange` child resets the flag to false for its subtree.
/// - Any other child inherits the current flag.
///
/// The net effect is that two stateful executors are never left directly connected without
/// an exchange between them.
///
/// Compiled out: `cfg(any())` with no predicates is always false.
///
/// NOTE(review): this disabled code builds `DispatchStrategy { output_indices, .. }`,
/// `StreamNode { append_only, .. }` and an unboxed `ExchangeNode`. The live construction sites
/// in this file use `output_mapping`, `stream_kind` and `Box::new(ExchangeNode { .. })`, so this
/// likely would not compile if re-enabled. Update it before removing the `cfg`.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An existing exchange already separates fragments; reset the flag below it.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Any other operator propagates the current flag unchanged.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
281
282fn generate_fragment_graph(
284 state: &mut BuildFragmentGraphState,
285 stream_node: StreamNode,
286) -> Result<()> {
287 #[cfg(any())]
291 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
292
293 build_and_add_fragment(state, stream_node)?;
294 Ok(())
295}
296
297fn build_and_add_fragment(
299 state: &mut BuildFragmentGraphState,
300 stream_node: StreamNode,
301) -> Result<Rc<StreamFragment>> {
302 let operator_id = stream_node.operator_id as u32;
303 match state.share_mapping.get(&operator_id) {
304 None => {
305 let mut fragment = state.new_stream_fragment();
306 let node = build_fragment(state, &mut fragment, stream_node)?;
307
308 let operator_id = node.operator_id as u32;
312
313 assert!(fragment.node.is_none());
314 fragment.node = Some(Box::new(node));
315 let fragment_ref = Rc::new(fragment);
316
317 state.fragment_graph.add_fragment(fragment_ref.clone());
318 state
319 .share_mapping
320 .insert(operator_id, fragment_ref.fragment_id);
321 Ok(fragment_ref)
322 }
323 Some(fragment_id) => Ok(state
324 .fragment_graph
325 .get_fragment(fragment_id)
326 .unwrap()
327 .clone()),
328 }
329}
330
/// Builds the portion of `current_fragment` rooted at `stream_node`, recursing into children
/// that stay in the same fragment.
///
/// For the node itself:
/// - `current_fragment.fragment_type_mask` and `requires_singleton` are updated according to
///   the node body.
/// - Bookkeeping in `state` is updated: dependent table ids, collected state tables, and
///   backfill flags.
/// - A `DeltaIndexJoin` node is handed to `build_delta_join_without_arrange`, and its result
///   is returned directly.
/// - An `Exchange` node is wrapped in a no-op node. Presumably this is so a fragment never
///   has a bare exchange at its root; TODO confirm.
///
/// For each child:
/// - An `Exchange` with no inputs is kept as is.
/// - An `Exchange` with one input cuts the tree. The input is built (or reused, if shared) as
///   its own fragment via `build_and_add_fragment`, and an edge from that fragment to
///   `current_fragment` is recorded.
/// - Any other child stays in `current_fragment` and is visited recursively.
///
/// If the edge cannot be added with `try_add_edge`, a fresh no-op fragment is inserted as
/// `child --NoShuffle--> no-op --original strategy--> current`, and the exchange gets a new
/// operator id. The conditions under which `try_add_edge` fails are not visible here;
/// presumably it is a duplicate edge between the same pair of fragments.
///
/// Returns the (possibly rewritten) node, which the caller stores in the fragment.
///
/// # Errors
/// Propagates errors from reading node bodies, join types and exchange strategies, and from
/// building child fragments.
///
/// # Panics
/// Panics on a `DeltaIndexJoin` that is not an inner join without a non-equal condition. It
/// also panics if an expected field is missing, for example:
/// - gap-fill state tables;
/// - the child fragment's root node;
/// - an exchange with more than one input.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // NOTE(review): `recursive::tracker!` presumably guards against stack overflow on deep
    // plans; TODO confirm against the `recursive` utility.
    recursive::tracker!().recurse(|_t| {
        // Classify the fragment and record per-node bookkeeping.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                // Shared sources that are not distributed, and sources whose properties
                // demand it, must run as a singleton fragment.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            // Collect the gap-fill state tables into `state.tables`.
            NodeBody::EowcGapFill(node) => {
                let table = node.buffer_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
                let table = node.prev_row_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::GapFill(node) => {
                let table = node.state_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                // Snapshot-backfill variants get an extra flag and are remembered so that
                // `build_graph_with_strategy` can reject unsupported combinations.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // The scanned upstream table becomes a dependency of the job.
                state.dependent_table_ids.insert(node.table_id);

                if let Some(state_table) = &node.state_table {
                    let table = state_table.clone();
                    state.tables.insert(table.id, table);
                }
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Parallelized CDC backfill: tagged as a dedicated CDC scan fragment.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    // Non-parallelized CDC backfill runs as a singleton stream scan.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // The upstream source is recorded as a dependency (converted into a table id).
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.into());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                let source_id = node.upstream_source_id;
                state.dependent_table_ids.insert(source_id.into());
                state.has_source_backfill = true;
            }

            NodeBody::Now(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            _ => {}
        };

        // Delta joins are lowered by a dedicated builder; only the inner-join,
        // no-extra-condition form is accepted.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Wrap an `Exchange` reached here in a no-op node, so the returned node is the no-op.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Process children: exchanges cut fragment boundaries, everything else recurses.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange with no input is kept unchanged.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Detach the single input; it becomes (or reuses) its own fragment.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                link_id: child_node.operator_id,
                            },
                        );

                        if result.is_err() {
                            // The direct edge was rejected. Route through a new no-op
                            // fragment instead. The exchange gets a fresh operator id,
                            // presumably so the new edge's link id does not clash with
                            // the existing one.
                            child_node.operator_id = state.gen_operator_id() as u64;

                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            // Identity NoShuffle dispatch from the child into the no-op fragment.
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

                            // The no-op fragment's root: a no-op over an input-less NoShuffle
                            // exchange that mirrors the child fragment's schema.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            // child --NoShuffle--> no-op
                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id,
                                },
                            );
                            // no-op --original strategy--> current
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id,
                                },
                            );
                        }

                        // The exchange stays in this fragment as an input-less placeholder.
                        Ok(child_node)
                    }

                    // Non-exchange children belong to the current fragment.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}