// risingwave_frontend/stream_fragmenter/mod.rs
mod graph;
16use graph::*;
17use risingwave_common::util::recursive::{self, Recurse as _};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20mod parallelism;
21mod rewrite;
22
23use std::collections::{HashMap, HashSet};
24use std::ops::Deref;
25use std::rc::Rc;
26
27use educe::Educe;
28use risingwave_common::catalog::{FragmentTypeFlag, TableId};
29use risingwave_common::session_config::SessionConfig;
30use risingwave_common::session_config::parallelism::ConfigParallelism;
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_pb::plan_common::JoinType;
33use risingwave_pb::stream_plan::{
34 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
35 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
36 StreamNode, StreamScanType,
37};
38
39use self::rewrite::build_delta_join_without_arrange;
40use crate::error::ErrorCode::NotSupported;
41use crate::error::{Result, RwError};
42use crate::optimizer::plan_node::generic::GenericPlanRef;
43use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
44use crate::stream_fragmenter::parallelism::derive_parallelism;
45
/// Mutable state accumulated while cutting a lowered [`StreamNode`] tree into a
/// [`StreamFragmentGraph`].
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph under construction.
    fragment_graph: StreamFragmentGraph,
    /// Local (graph-scoped) id to assign to the next new fragment.
    next_local_fragment_id: u32,

    /// Count of table ids handed out so far; exported as `table_ids_cnt`
    /// on the resulting protobuf graph (see `build_graph_with_strategy`).
    next_table_id: u32,

    /// Next operator id to hand out. Starts near `u32::MAX` and is
    /// decremented by `gen_operator_id`, i.e. generated ids grow downwards.
    /// NOTE(review): presumably this avoids colliding with operator ids
    /// already assigned to plan nodes — confirm against the planner.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Ids of the upstream tables/sources this job reads from (collected from
    /// `StreamScan`, `CdcFilter` and `SourceBackfill` nodes in `build_fragment`).
    dependent_table_ids: HashSet<TableId>,

    /// Maps a shared operator id to the fragment it was first built into, so
    /// later references to the same operator reuse that fragment.
    share_mapping: HashMap<u32, LocalFragmentId>,
    /// Maps a shared operator id to its materialized stream node.
    share_stream_node_mapping: HashMap<u32, StreamNode>,

    /// Set when a `StreamCdcScan` or `SourceBackfill` node is seen.
    has_source_backfill: bool,
    /// Set when a `StreamScan` with `SnapshotBackfill` scan type is seen.
    has_snapshot_backfill: bool,
    /// Set when a `StreamScan` with `CrossDbSnapshotBackfill` scan type is seen.
    has_cross_db_snapshot_backfill: bool,
}
75
76impl BuildFragmentGraphState {
77 fn new_stream_fragment(&mut self) -> StreamFragment {
79 let fragment = StreamFragment::new(self.next_local_fragment_id);
80 self.next_local_fragment_id += 1;
81 fragment
82 }
83
84 fn gen_operator_id(&mut self) -> u32 {
86 self.next_operator_id -= 1;
87 self.next_operator_id
88 }
89
90 pub fn gen_table_id(&mut self) -> u32 {
92 let ret = self.next_table_id;
93 self.next_table_id += 1;
94 ret
95 }
96
97 pub fn gen_table_id_wrapped(&mut self) -> TableId {
99 TableId::new(self.gen_table_id())
100 }
101
102 pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
103 self.share_stream_node_mapping
104 .insert(operator_id, stream_node);
105 }
106
107 pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
108 self.share_stream_node_mapping.get(&operator_id)
109 }
110
111 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
114 StreamNode {
115 operator_id: self.gen_operator_id() as u64,
116 identity: "StreamNoOp".into(),
117 node_body: Some(NodeBody::NoOp(NoOpNode {})),
118
119 stream_key: input.stream_key.clone(),
121 stream_kind: input.stream_kind,
122 fields: input.fields.clone(),
123
124 input: vec![input],
125 }
126 }
127}
128
/// The kind of streaming job a fragment graph is being built for; used to pick
/// the job-type-specific streaming parallelism from the session config.
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
137
138impl GraphJobType {
139 pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
140 match self {
141 GraphJobType::Table => config.streaming_parallelism_for_table(),
142 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
143 GraphJobType::Source => config.streaming_parallelism_for_source(),
144 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
145 GraphJobType::Index => config.streaming_parallelism_for_index(),
146 }
147 }
148}
149
/// Build the fragment graph for a streaming job with no explicit backfill
/// ordering. Thin wrapper over [`build_graph_with_strategy`] with
/// `backfill_order = None`.
pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> Result<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}
156
/// Build the protobuf fragment graph for a streaming job.
///
/// Pipeline: reorganize plan element ids, lower the plan to a `StreamNode`
/// tree, cut the tree into fragments, then fill in job-level metadata
/// (dependent table ids, parallelism, session timezone, backfill order).
///
/// # Errors
/// - Snapshot backfill combined with shared source backfill is rejected.
/// - Backfill order control combined with cross-db snapshot backfill is
///   rejected.
pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> Result<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    // Lower the plan to a stream node tree, then cut it into fragments.
    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node)?;
    // These two backfill modes cannot coexist in one job.
    if state.has_source_backfill && state.has_snapshot_backfill {
        return Err(RwError::from(NotSupported(
            "Snapshot backfill with shared source backfill is not supported".to_owned(),
            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
             `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
                .to_owned(),
        )));
    }
    // A non-empty backfill order is incompatible with cross-db snapshot backfill.
    if state.has_cross_db_snapshot_backfill
        && let Some(ref backfill_order) = backfill_order
        && !backfill_order.order.is_empty()
    {
        return Err(RwError::from(NotSupported(
            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
            "Please remove backfill order specification from your query".to_owned(),
        )));
    }

    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Record upstream dependencies and how many table ids were consumed.
    fragment_graph.dependent_table_ids = state
        .dependent_table_ids
        .into_iter()
        .map(|id| id.table_id)
        .collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    {
        // Derive parallelism from the job-type-specific session setting,
        // falling back to the generic streaming parallelism.
        let config = ctx.session_ctx().config();
        fragment_graph.parallelism = derive_parallelism(
            job_type.map(|t| t.to_parallelism(config.deref())),
            config.streaming_parallelism(),
        );
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
    }

    // Propagate the session timezone into the stream context.
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
    });

    fragment_graph.backfill_order = backfill_order;

    Ok(fragment_graph)
}
215
/// Whether this node denotes a stateful executor (used by the disabled
/// `rewrite_stream_node` pass below to decide where to insert exchanges).
///
/// NOTE: compiled out via `#[cfg(any())]` — kept for reference only.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
228
/// Insert a no-shuffle exchange above a stateful executor whose parent is also
/// stateful, so that adjacent stateful executors end up in separate fragments.
///
/// NOTE: compiled out via `#[cfg(any())]` — kept for reference only.
/// NOTE(review): this dead code still builds `DispatchStrategy` with an
/// `output_indices` field, while the live code in `build_fragment` uses
/// `output_mapping`; it would need updating before re-enabling.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        // A stateful child under a stateful parent gets an exchange inserted.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                // Wrap the child in a no-shuffle exchange that mirrors its schema.
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                // First stateful node on this path: set the flag for descendants.
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An existing exchange resets the flag: the next stateful
                // descendant does not need an extra exchange above it.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
283
/// Cut the stream node tree into fragments and record them into `state`'s
/// fragment graph.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // Disabled: the stateful-executor exchange-insertion rewrite is compiled
    // out via `#[cfg(any())]` (see `rewrite_stream_node` above).
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}
298
299fn build_and_add_fragment(
301 state: &mut BuildFragmentGraphState,
302 stream_node: StreamNode,
303) -> Result<Rc<StreamFragment>> {
304 let operator_id = stream_node.operator_id as u32;
305 match state.share_mapping.get(&operator_id) {
306 None => {
307 let mut fragment = state.new_stream_fragment();
308 let node = build_fragment(state, &mut fragment, stream_node)?;
309
310 let operator_id = node.operator_id as u32;
314
315 assert!(fragment.node.is_none());
316 fragment.node = Some(Box::new(node));
317 let fragment_ref = Rc::new(fragment);
318
319 state.fragment_graph.add_fragment(fragment_ref.clone());
320 state
321 .share_mapping
322 .insert(operator_id, fragment_ref.fragment_id);
323 Ok(fragment_ref)
324 }
325 Some(fragment_id) => Ok(state
326 .fragment_graph
327 .get_fragment(fragment_id)
328 .unwrap()
329 .clone()),
330 }
331}
332
/// Fill `current_fragment` from the stream node tree rooted at `stream_node`.
///
/// Walks the tree recursively, setting fragment type flags and singleton
/// requirements according to each node body, and cutting the tree at every
/// `Exchange` node: the exchange's input becomes a new (or shared) child
/// fragment, connected to the current fragment by an edge carrying the
/// exchange's dispatch strategy.
///
/// Returns the (possibly rewritten) root node to store in the fragment.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // Deep plans can recurse deeply; the tracker guards the stack.
    recursive::tracker!().recurse(|_t| {
        // First, update fragment-level properties based on this node's body.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                // A shared but non-distributed source, or a connector whose
                // properties require a single reader, pins this fragment to a
                // single actor.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            // Global TopN must run on a single actor.
            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                // Snapshot-backfill variants carry extra flags and feed the
                // compatibility checks in `build_graph_with_strategy`.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // Record the scanned table as an upstream dependency.
                state
                    .dependent_table_ids
                    .insert(TableId::new(node.table_id));
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    // Non-parallelized CDC backfill is flagged as a plain
                    // stream scan and must run as a singleton.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // The upstream CDC source is a dependency of this job.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.into());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                // The backfilled source is a dependency of this job.
                let source_id = node.upstream_source_id;
                state.dependent_table_ids.insert(source_id.into());
                state.has_source_backfill = true;
            }

            // `Now` and `Values` generate data themselves and run on a
            // single actor.
            NodeBody::Now(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            _ => {}
        };

        // Delta joins take a completely different fragment-building path.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // If the root of this fragment is an exchange, wrap it in a no-op
        // node so the fragment's root is never an exchange itself.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit children: every exchange child marks a fragment boundary.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange with no input is left as-is — presumably
                    // already wired up by the delta-join rewrite; confirm.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // An exchange has exactly one input; take it out and
                        // build it as a separate (possibly shared) fragment.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                link_id: child_node.operator_id,
                            },
                        );

                        // An edge between these two fragments already exists:
                        // insert a no-op fragment in between, connected by a
                        // no-shuffle exchange, so each fragment pair has at
                        // most one direct edge.
                        if result.is_err() {
                            // Give the duplicated exchange a fresh operator id
                            // so link ids stay unique.
                            child_node.operator_id = state.gen_operator_id() as u64;

                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            // Identity mapping over the child's full schema.
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

                            // Build the intermediate fragment: a no-op node on
                            // top of a no-shuffle exchange, mirroring the
                            // child fragment root's stream properties.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            // child --(no-shuffle)--> no-op fragment.
                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id,
                                },
                            );
                            // no-op fragment --(original strategy)--> current.
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id,
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // Non-exchange children stay within the current fragment.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}