// risingwave_frontend/stream_fragmenter/mod.rs
mod graph;
16use graph::*;
17use risingwave_common::util::recursive::{self, Recurse as _};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_pb::catalog::Table;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_connector::source::cdc::CdcScanOptions;
33use risingwave_pb::plan_common::JoinType;
34use risingwave_pb::stream_plan::{
35 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
36 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
37 StreamNode, StreamScanType,
38};
39
40use self::rewrite::build_delta_join_without_arrange;
41use crate::catalog::FragmentId;
42use crate::error::ErrorCode::NotSupported;
43use crate::error::{Result, RwError};
44use crate::optimizer::plan_node::generic::GenericPlanRef;
45use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
46use crate::stream_fragmenter::parallelism::derive_parallelism;
47
/// Mutable state accumulated while lowering a stream plan into a fragment graph.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph under construction.
    fragment_graph: StreamFragmentGraph,
    /// Local (graph-scoped) id to assign to the next new fragment.
    next_local_fragment_id: FragmentId,

    /// Next local table id to assign, counting upwards from 0.
    next_table_id: u32,

    /// Next operator id to assign; counts *downwards* from `u32::MAX - 1`
    /// (see `gen_operator_id`).
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Ids of upstream tables/sources this streaming job depends on,
    /// collected while visiting scan nodes.
    dependent_table_ids: HashSet<TableId>,

    /// For shared operators: operator id -> the fragment that was built for it,
    /// so later occurrences reuse the same fragment.
    share_mapping: HashMap<u32, LocalFragmentId>,
    /// For shared operators: operator id -> the built stream node to reuse.
    share_stream_node_mapping: HashMap<u32, StreamNode>,

    // Flags recording which backfill flavors appear in the plan; checked for
    // unsupported combinations in `build_graph_with_strategy`.
    has_source_backfill: bool,
    has_snapshot_backfill: bool,
    has_cross_db_snapshot_backfill: bool,
    /// Internal state tables collected during the build, keyed by table id.
    tables: HashMap<TableId, Table>,
}
78
79impl BuildFragmentGraphState {
80 fn new_stream_fragment(&mut self) -> StreamFragment {
82 let fragment = StreamFragment::new(self.next_local_fragment_id);
83 self.next_local_fragment_id += 1;
84 fragment
85 }
86
87 fn gen_operator_id(&mut self) -> u32 {
89 self.next_operator_id -= 1;
90 self.next_operator_id
91 }
92
93 pub fn gen_table_id(&mut self) -> u32 {
95 let ret = self.next_table_id;
96 self.next_table_id += 1;
97 ret
98 }
99
100 pub fn gen_table_id_wrapped(&mut self) -> TableId {
102 TableId::new(self.gen_table_id())
103 }
104
105 pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
106 self.share_stream_node_mapping
107 .insert(operator_id, stream_node);
108 }
109
110 pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
111 self.share_stream_node_mapping.get(&operator_id)
112 }
113
114 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
117 StreamNode {
118 operator_id: self.gen_operator_id() as u64,
119 identity: "StreamNoOp".into(),
120 node_body: Some(NodeBody::NoOp(NoOpNode {})),
121
122 stream_key: input.stream_key.clone(),
124 stream_kind: input.stream_kind,
125 fields: input.fields.clone(),
126
127 input: vec![input],
128 }
129 }
130}
131
/// The kind of streaming job a fragment graph is built for; used to pick the
/// matching per-job-type parallelism from the session config.
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
140
141impl GraphJobType {
142 pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
143 match self {
144 GraphJobType::Table => config.streaming_parallelism_for_table(),
145 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
146 GraphJobType::Source => config.streaming_parallelism_for_source(),
147 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
148 GraphJobType::Index => config.streaming_parallelism_for_index(),
149 }
150 }
151}
152
/// Build a fragment graph from a stream plan with no backfill-order
/// specification; see [`build_graph_with_strategy`] for the full version.
pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> Result<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}
159
/// Build a fragment graph protobuf from a stream plan.
///
/// `job_type` selects which per-job-type session parallelism applies;
/// `backfill_order` optionally constrains backfill ordering.
///
/// # Errors
///
/// Returns `NotSupported` when the plan combines snapshot backfill with
/// shared source backfill, or specifies a backfill order together with
/// cross-db snapshot backfill.
pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> Result<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    // Lower the plan to a stream node tree, then cut it into fragments.
    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node)?;
    // Reject unsupported combinations detected while building the graph.
    if state.has_source_backfill && state.has_snapshot_backfill {
        return Err(RwError::from(NotSupported(
            "Snapshot backfill with shared source backfill is not supported".to_owned(),
            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
             `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
                .to_owned(),
        )));
    }
    if state.has_cross_db_snapshot_backfill
        && let Some(ref backfill_order) = backfill_order
        && !backfill_order.order.is_empty()
    {
        return Err(RwError::from(NotSupported(
            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
            "Please remove backfill order specification from your query".to_owned(),
        )));
    }

    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Record dependencies and how many internal table ids were assigned.
    fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Derive parallelism limits from the session config.
    {
        let config = ctx.session_ctx().config();
        fragment_graph.parallelism = derive_parallelism(
            job_type.map(|t| t.to_parallelism(config.deref())),
            config.streaming_parallelism(),
        );
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
    }

    // Attach the stream context (currently only the session timezone).
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
        config_override: "".to_owned(),
    });

    fragment_graph.backfill_order = backfill_order;

    Ok(fragment_graph)
}
215
/// Whether the node body is one of the executor kinds treated as stateful by
/// the (disabled) exchange-insertion rewrite below.
///
/// NOTE: `#[cfg(any())]` is never true, so this function is compiled out and
/// kept for reference only.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
228
/// Rewrite the stream node tree by inserting a `NoShuffle` exchange between
/// two adjacent stateful operators (as classified by `is_stateful_executor`).
///
/// NOTE: `#[cfg(any())]` is never true, so this function is compiled out and
/// kept for reference only.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                // Two stateful operators in a row: wrap the (recursively
                // rewritten) child in a NoShuffle exchange.
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                // First stateful operator on this path: just set the flag.
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An existing exchange resets the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Any other node keeps the flag as-is.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
283
/// Generate the fragment graph from the stream node tree rooted at
/// `stream_node`, storing the result in `state.fragment_graph`.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // The stateful-operator exchange rewrite is currently disabled:
    // `#[cfg(any())]` compiles this statement out.
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}
298
299fn build_and_add_fragment(
301 state: &mut BuildFragmentGraphState,
302 stream_node: StreamNode,
303) -> Result<Rc<StreamFragment>> {
304 let operator_id = stream_node.operator_id as u32;
305 match state.share_mapping.get(&operator_id) {
306 None => {
307 let mut fragment = state.new_stream_fragment();
308 let node = build_fragment(state, &mut fragment, stream_node)?;
309
310 let operator_id = node.operator_id as u32;
314
315 assert!(fragment.node.is_none());
316 fragment.node = Some(Box::new(node));
317 let fragment_ref = Rc::new(fragment);
318
319 state.fragment_graph.add_fragment(fragment_ref.clone());
320 state
321 .share_mapping
322 .insert(operator_id, fragment_ref.fragment_id);
323 Ok(fragment_ref)
324 }
325 Some(fragment_id) => Ok(state
326 .fragment_graph
327 .get_fragment(fragment_id)
328 .unwrap()
329 .clone()),
330 }
331}
332
333fn build_fragment(
336 state: &mut BuildFragmentGraphState,
337 current_fragment: &mut StreamFragment,
338 mut stream_node: StreamNode,
339) -> Result<StreamNode> {
340 recursive::tracker!().recurse(|_t| {
341 match stream_node.get_node_body()? {
343 NodeBody::BarrierRecv(_) => current_fragment
344 .fragment_type_mask
345 .add(FragmentTypeFlag::BarrierRecv),
346
347 NodeBody::Source(node) => {
348 current_fragment
349 .fragment_type_mask
350 .add(FragmentTypeFlag::Source);
351
352 if let Some(source) = node.source_inner.as_ref()
353 && let Some(source_info) = source.info.as_ref()
354 && ((source_info.is_shared() && !source_info.is_distributed)
355 || source.with_properties.requires_singleton())
356 {
357 current_fragment.requires_singleton = true;
358 }
359 }
360
361 NodeBody::Dml(_) => {
362 current_fragment
363 .fragment_type_mask
364 .add(FragmentTypeFlag::Dml);
365 }
366
367 NodeBody::Materialize(_) => {
368 current_fragment
369 .fragment_type_mask
370 .add(FragmentTypeFlag::Mview);
371 }
372
373 NodeBody::Sink(_) => current_fragment
374 .fragment_type_mask
375 .add(FragmentTypeFlag::Sink),
376
377 NodeBody::TopN(_) => current_fragment.requires_singleton = true,
378
379 NodeBody::EowcGapFill(node) => {
380 let table = node.buffer_table.as_ref().unwrap().clone();
381 state.tables.insert(table.id, table);
382 let table = node.prev_row_table.as_ref().unwrap().clone();
383 state.tables.insert(table.id, table);
384 }
385
386 NodeBody::GapFill(node) => {
387 let table = node.state_table.as_ref().unwrap().clone();
388 state.tables.insert(table.id, table);
389 }
390
391 NodeBody::StreamScan(node) => {
392 current_fragment
393 .fragment_type_mask
394 .add(FragmentTypeFlag::StreamScan);
395 match node.stream_scan_type() {
396 StreamScanType::SnapshotBackfill => {
397 current_fragment
398 .fragment_type_mask
399 .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
400 state.has_snapshot_backfill = true;
401 }
402 StreamScanType::CrossDbSnapshotBackfill => {
403 current_fragment
404 .fragment_type_mask
405 .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
406 state.has_cross_db_snapshot_backfill = true;
407 }
408 StreamScanType::Unspecified
409 | StreamScanType::Chain
410 | StreamScanType::Rearrange
411 | StreamScanType::Backfill
412 | StreamScanType::UpstreamOnly
413 | StreamScanType::ArrangementBackfill => {}
414 }
415 state.dependent_table_ids.insert(node.table_id);
418
419 if let Some(state_table) = &node.state_table {
421 let table = state_table.clone();
422 state.tables.insert(table.id, table);
423 }
424 }
425
426 NodeBody::StreamCdcScan(node) => {
427 if let Some(o) = node.options
428 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
429 {
430 current_fragment
432 .fragment_type_mask
433 .add(FragmentTypeFlag::StreamCdcScan);
434 } else {
435 current_fragment
436 .fragment_type_mask
437 .add(FragmentTypeFlag::StreamScan);
438 current_fragment.requires_singleton = true;
440 }
441 state.has_source_backfill = true;
442 }
443
444 NodeBody::CdcFilter(node) => {
445 current_fragment
446 .fragment_type_mask
447 .add(FragmentTypeFlag::CdcFilter);
448 state
450 .dependent_table_ids
451 .insert(node.upstream_source_id.as_cdc_table_id());
452 }
453 NodeBody::SourceBackfill(node) => {
454 current_fragment
455 .fragment_type_mask
456 .add(FragmentTypeFlag::SourceScan);
457 let source_id = node.upstream_source_id;
459 state
460 .dependent_table_ids
461 .insert(source_id.as_cdc_table_id());
462 state.has_source_backfill = true;
463 }
464
465 NodeBody::Now(_) => {
466 current_fragment
468 .fragment_type_mask
469 .add(FragmentTypeFlag::Now);
470 current_fragment.requires_singleton = true;
471 }
472
473 NodeBody::Values(_) => {
474 current_fragment
475 .fragment_type_mask
476 .add(FragmentTypeFlag::Values);
477 current_fragment.requires_singleton = true;
478 }
479
480 NodeBody::StreamFsFetch(_) => {
481 current_fragment
482 .fragment_type_mask
483 .add(FragmentTypeFlag::FsFetch);
484 }
485
486 NodeBody::VectorIndexWrite(_) => {
487 current_fragment
488 .fragment_type_mask
489 .add(FragmentTypeFlag::VectorIndexWrite);
490 }
491
492 NodeBody::UpstreamSinkUnion(_) => {
493 current_fragment
494 .fragment_type_mask
495 .add(FragmentTypeFlag::UpstreamSinkUnion);
496 }
497
498 NodeBody::LocalityProvider(_) => {
499 current_fragment
500 .fragment_type_mask
501 .add(FragmentTypeFlag::LocalityProvider);
502 }
503
504 _ => {}
505 };
506
507 if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
509 {
510 if delta_index_join.get_join_type()? == JoinType::Inner
511 && delta_index_join.condition.is_none()
512 {
513 return build_delta_join_without_arrange(state, current_fragment, stream_node);
514 } else {
515 panic!("only inner join without non-equal condition is supported for delta joins");
516 }
517 }
518
519 if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
524 stream_node = state.gen_no_op_stream_node(stream_node);
525 }
526
527 stream_node.input = stream_node
529 .input
530 .into_iter()
531 .map(|mut child_node| {
532 match child_node.get_node_body()? {
533 NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
536 NodeBody::Exchange(exchange_node) => {
538 let exchange_node_strategy = exchange_node.get_strategy()?.clone();
539
540 let [input]: [_; 1] =
542 std::mem::take(&mut child_node.input).try_into().unwrap();
543 let child_fragment = build_and_add_fragment(state, input)?;
544
545 let result = state.fragment_graph.try_add_edge(
546 child_fragment.fragment_id,
547 current_fragment.fragment_id,
548 StreamFragmentEdge {
549 dispatch_strategy: exchange_node_strategy.clone(),
550 link_id: child_node.operator_id,
552 },
553 );
554
555 if result.is_err() {
559 child_node.operator_id = state.gen_operator_id() as u64;
562
563 let ref_fragment_node = child_fragment.node.as_ref().unwrap();
565 let no_shuffle_strategy = DispatchStrategy {
566 r#type: DispatcherType::NoShuffle as i32,
567 dist_key_indices: vec![],
568 output_mapping: PbDispatchOutputMapping::identical(
569 ref_fragment_node.fields.len(),
570 )
571 .into(),
572 };
573
574 let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;
575
576 let no_op_fragment = {
577 let node = state.gen_no_op_stream_node(StreamNode {
578 operator_id: no_shuffle_exchange_operator_id,
579 identity: "StreamNoShuffleExchange".into(),
580 node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
581 strategy: Some(no_shuffle_strategy.clone()),
582 }))),
583 input: vec![],
584
585 stream_key: ref_fragment_node.stream_key.clone(),
587 stream_kind: ref_fragment_node.stream_kind,
588 fields: ref_fragment_node.fields.clone(),
589 });
590
591 let mut fragment = state.new_stream_fragment();
592 fragment.node = Some(node.into());
593 Rc::new(fragment)
594 };
595
596 state.fragment_graph.add_fragment(no_op_fragment.clone());
597
598 state.fragment_graph.add_edge(
599 child_fragment.fragment_id,
600 no_op_fragment.fragment_id,
601 StreamFragmentEdge {
602 dispatch_strategy: no_shuffle_strategy,
604 link_id: no_shuffle_exchange_operator_id,
605 },
606 );
607 state.fragment_graph.add_edge(
608 no_op_fragment.fragment_id,
609 current_fragment.fragment_id,
610 StreamFragmentEdge {
611 dispatch_strategy: exchange_node_strategy,
613 link_id: child_node.operator_id,
614 },
615 );
616 }
617
618 Ok(child_node)
619 }
620
621 _ => build_fragment(state, current_fragment, child_node),
623 }
624 })
625 .collect::<Result<_>>()?;
626 Ok(stream_node)
627 })
628}