risingwave_frontend/stream_fragmenter/
mod.rs1mod graph;
16use graph::*;
17use risingwave_common::util::recursive::{self, Recurse as _};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20mod parallelism;
21mod rewrite;
22
23use std::collections::{HashMap, HashSet};
24use std::ops::Deref;
25use std::rc::Rc;
26
27use educe::Educe;
28use risingwave_common::catalog::{FragmentTypeFlag, TableId};
29use risingwave_common::session_config::SessionConfig;
30use risingwave_common::session_config::parallelism::ConfigParallelism;
31use risingwave_connector::source::cdc::CdcScanOptions;
32use risingwave_pb::plan_common::JoinType;
33use risingwave_pb::stream_plan::{
34 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
35 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
36 StreamNode, StreamScanType,
37};
38
39use self::rewrite::build_delta_join_without_arrange;
40use crate::error::ErrorCode::NotSupported;
41use crate::error::{Result, RwError};
42use crate::optimizer::plan_node::generic::GenericPlanRef;
43use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
44use crate::stream_fragmenter::parallelism::derive_parallelism;
45
46#[derive(Educe)]
48#[educe(Default)]
49pub struct BuildFragmentGraphState {
50 fragment_graph: StreamFragmentGraph,
52 next_local_fragment_id: u32,
54
55 next_table_id: u32,
58
59 #[educe(Default(expression = u32::MAX - 1))]
61 next_operator_id: u32,
62
63 dependent_table_ids: HashSet<TableId>,
65
66 share_mapping: HashMap<u32, LocalFragmentId>,
68 share_stream_node_mapping: HashMap<u32, StreamNode>,
70
71 has_source_backfill: bool,
72 has_snapshot_backfill: bool,
73 has_cross_db_snapshot_backfill: bool,
74}
75
76impl BuildFragmentGraphState {
77 fn new_stream_fragment(&mut self) -> StreamFragment {
79 let fragment = StreamFragment::new(self.next_local_fragment_id);
80 self.next_local_fragment_id += 1;
81 fragment
82 }
83
84 fn gen_operator_id(&mut self) -> u32 {
86 self.next_operator_id -= 1;
87 self.next_operator_id
88 }
89
90 pub fn gen_table_id(&mut self) -> u32 {
92 let ret = self.next_table_id;
93 self.next_table_id += 1;
94 ret
95 }
96
97 pub fn gen_table_id_wrapped(&mut self) -> TableId {
99 TableId::new(self.gen_table_id())
100 }
101
102 pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
103 self.share_stream_node_mapping
104 .insert(operator_id, stream_node);
105 }
106
107 pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
108 self.share_stream_node_mapping.get(&operator_id)
109 }
110
111 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
114 StreamNode {
115 operator_id: self.gen_operator_id() as u64,
116 identity: "StreamNoOp".into(),
117 node_body: Some(NodeBody::NoOp(NoOpNode {})),
118
119 stream_key: input.stream_key.clone(),
121 append_only: input.append_only,
122 fields: input.fields.clone(),
123
124 input: vec![input],
125 }
126 }
127}
128
/// Kind of streaming job a fragment graph is built for.
///
/// `to_parallelism` uses it to pick the matching per-job-type parallelism setting from the
/// session configuration.
pub enum GraphJobType {
    /// A table.
    Table,
    /// A materialized view.
    MaterializedView,
    /// A source.
    Source,
    /// A sink.
    Sink,
    /// An index.
    Index,
}
137
138impl GraphJobType {
139 pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
140 match self {
141 GraphJobType::Table => config.streaming_parallelism_for_table(),
142 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
143 GraphJobType::Source => config.streaming_parallelism_for_source(),
144 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
145 GraphJobType::Index => config.streaming_parallelism_for_index(),
146 }
147 }
148}
149
150pub fn build_graph(
151 plan_node: PlanRef,
152 job_type: Option<GraphJobType>,
153) -> Result<StreamFragmentGraphProto> {
154 build_graph_with_strategy(plan_node, job_type, None)
155}
156
157pub fn build_graph_with_strategy(
158 plan_node: PlanRef,
159 job_type: Option<GraphJobType>,
160 backfill_order: Option<BackfillOrder>,
161) -> Result<StreamFragmentGraphProto> {
162 let ctx = plan_node.plan_base().ctx();
163 let plan_node = reorganize_elements_id(plan_node);
164
165 let mut state = BuildFragmentGraphState::default();
166 let stream_node = plan_node.to_stream_prost(&mut state)?;
167 generate_fragment_graph(&mut state, stream_node)?;
168 if state.has_source_backfill && state.has_snapshot_backfill {
169 return Err(RwError::from(NotSupported(
170 "Snapshot backfill with shared source backfill is not supported".to_owned(),
171 "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
172 `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
173 .to_owned(),
174 )));
175 }
176 if state.has_cross_db_snapshot_backfill
177 && let Some(ref backfill_order) = backfill_order
178 && !backfill_order.order.is_empty()
179 {
180 return Err(RwError::from(NotSupported(
181 "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
182 "Please remove backfill order specification from your query".to_owned(),
183 )));
184 }
185
186 let mut fragment_graph = state.fragment_graph.to_protobuf();
187
188 fragment_graph.dependent_table_ids = state
190 .dependent_table_ids
191 .into_iter()
192 .map(|id| id.table_id)
193 .collect();
194 fragment_graph.table_ids_cnt = state.next_table_id;
195
196 {
198 let config = ctx.session_ctx().config();
199 fragment_graph.parallelism = derive_parallelism(
200 job_type.map(|t| t.to_parallelism(config.deref())),
201 config.streaming_parallelism(),
202 );
203 fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
204 }
205
206 fragment_graph.ctx = Some(StreamContext {
208 timezone: ctx.get_session_timezone(),
209 });
210
211 fragment_graph.backfill_order = backfill_order;
212
213 Ok(fragment_graph)
214}
215
/// Tells whether `stream_node` is one of the executors the disabled rewrite pass treats as
/// stateful.
///
/// Compiled out: `#[cfg(any())]` is never satisfied.
///
/// # Panics
///
/// Panics when the node has no body.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
228
/// Recursively rewrites the inputs of `stream_node`, putting a `NoShuffle` exchange above
/// stateful children while `insert_exchange_flag` is set.
///
/// Compiled out: `#[cfg(any())]` is never satisfied.
///
/// For each child:
/// * a stateful child is rewritten with the flag set to `true`; if the current flag is set, the
///   rewritten child is additionally wrapped into an `Exchange (NoShuffle)` node;
/// * an `Exchange` child is rewritten with the flag reset to `false`;
/// * any other child is rewritten with the current flag passed through.
// NOTE(review): the field names used here (`output_indices`, unboxed `ExchangeNode`) differ from
// the ones used by the live code in this file, so this likely no longer compiles if re-enabled.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let mut stream_node = stream_node;
    let children = std::mem::take(&mut stream_node.input);
    let mut rewritten = Vec::with_capacity(children.len());

    for child in children {
        let new_child = if is_stateful_executor(&child) {
            let child_node = rewrite_stream_node(state, child, true)?;
            if insert_exchange_flag {
                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                }
            } else {
                child_node
            }
        } else {
            // Below an existing exchange no further exchange is inserted until the next
            // stateful executor switches the flag back on.
            let flag_for_child = match child.get_node_body()? {
                NodeBody::Exchange(_) => false,
                _ => insert_exchange_flag,
            };
            rewrite_stream_node(state, child, flag_for_child)?
        };
        rewritten.push(new_child);
    }

    stream_node.input = rewritten;
    Ok(stream_node)
}
283
284fn generate_fragment_graph(
286 state: &mut BuildFragmentGraphState,
287 stream_node: StreamNode,
288) -> Result<()> {
289 #[cfg(any())]
293 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
294
295 build_and_add_fragment(state, stream_node)?;
296 Ok(())
297}
298
299fn build_and_add_fragment(
301 state: &mut BuildFragmentGraphState,
302 stream_node: StreamNode,
303) -> Result<Rc<StreamFragment>> {
304 let operator_id = stream_node.operator_id as u32;
305 match state.share_mapping.get(&operator_id) {
306 None => {
307 let mut fragment = state.new_stream_fragment();
308 let node = build_fragment(state, &mut fragment, stream_node)?;
309
310 let operator_id = node.operator_id as u32;
314
315 assert!(fragment.node.is_none());
316 fragment.node = Some(Box::new(node));
317 let fragment_ref = Rc::new(fragment);
318
319 state.fragment_graph.add_fragment(fragment_ref.clone());
320 state
321 .share_mapping
322 .insert(operator_id, fragment_ref.fragment_id);
323 Ok(fragment_ref)
324 }
325 Some(fragment_id) => Ok(state
326 .fragment_graph
327 .get_fragment(fragment_id)
328 .unwrap()
329 .clone()),
330 }
331}
332
333fn build_fragment(
336 state: &mut BuildFragmentGraphState,
337 current_fragment: &mut StreamFragment,
338 mut stream_node: StreamNode,
339) -> Result<StreamNode> {
340 recursive::tracker!().recurse(|_t| {
341 match stream_node.get_node_body()? {
343 NodeBody::BarrierRecv(_) => current_fragment
344 .fragment_type_mask
345 .add(FragmentTypeFlag::BarrierRecv),
346
347 NodeBody::Source(node) => {
348 current_fragment
349 .fragment_type_mask
350 .add(FragmentTypeFlag::Source);
351
352 if let Some(source) = node.source_inner.as_ref()
353 && let Some(source_info) = source.info.as_ref()
354 && ((source_info.is_shared() && !source_info.is_distributed)
355 || source.with_properties.requires_singleton())
356 {
357 current_fragment.requires_singleton = true;
358 }
359 }
360
361 NodeBody::Dml(_) => {
362 current_fragment
363 .fragment_type_mask
364 .add(FragmentTypeFlag::Dml);
365 }
366
367 NodeBody::Materialize(_) => {
368 current_fragment
369 .fragment_type_mask
370 .add(FragmentTypeFlag::Mview);
371 }
372
373 NodeBody::Sink(_) => current_fragment
374 .fragment_type_mask
375 .add(FragmentTypeFlag::Sink),
376
377 NodeBody::TopN(_) => current_fragment.requires_singleton = true,
378
379 NodeBody::StreamScan(node) => {
380 current_fragment
381 .fragment_type_mask
382 .add(FragmentTypeFlag::StreamScan);
383 match node.stream_scan_type() {
384 StreamScanType::SnapshotBackfill => {
385 current_fragment
386 .fragment_type_mask
387 .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
388 state.has_snapshot_backfill = true;
389 }
390 StreamScanType::CrossDbSnapshotBackfill => {
391 current_fragment
392 .fragment_type_mask
393 .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
394 state.has_cross_db_snapshot_backfill = true;
395 }
396 StreamScanType::Unspecified
397 | StreamScanType::Chain
398 | StreamScanType::Rearrange
399 | StreamScanType::Backfill
400 | StreamScanType::UpstreamOnly
401 | StreamScanType::ArrangementBackfill => {}
402 }
403 state
406 .dependent_table_ids
407 .insert(TableId::new(node.table_id));
408 current_fragment.upstream_table_ids.push(node.table_id);
409 }
410
411 NodeBody::StreamCdcScan(node) => {
412 if let Some(o) = node.options
413 && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
414 {
415 current_fragment
417 .fragment_type_mask
418 .add(FragmentTypeFlag::StreamCdcScan);
419 } else {
420 current_fragment
421 .fragment_type_mask
422 .add(FragmentTypeFlag::StreamScan);
423 current_fragment.requires_singleton = true;
425 }
426 state.has_source_backfill = true;
427 }
428
429 NodeBody::CdcFilter(node) => {
430 current_fragment
431 .fragment_type_mask
432 .add(FragmentTypeFlag::CdcFilter);
433 state
435 .dependent_table_ids
436 .insert(node.upstream_source_id.into());
437 current_fragment
438 .upstream_table_ids
439 .push(node.upstream_source_id);
440 }
441 NodeBody::SourceBackfill(node) => {
442 current_fragment
443 .fragment_type_mask
444 .add(FragmentTypeFlag::SourceScan);
445 let source_id = node.upstream_source_id;
447 state.dependent_table_ids.insert(source_id.into());
448 current_fragment.upstream_table_ids.push(source_id);
449 state.has_source_backfill = true;
450 }
451
452 NodeBody::Now(_) => {
453 current_fragment
455 .fragment_type_mask
456 .add(FragmentTypeFlag::Now);
457 current_fragment.requires_singleton = true;
458 }
459
460 NodeBody::Values(_) => {
461 current_fragment
462 .fragment_type_mask
463 .add(FragmentTypeFlag::Values);
464 current_fragment.requires_singleton = true;
465 }
466
467 NodeBody::StreamFsFetch(_) => {
468 current_fragment
469 .fragment_type_mask
470 .add(FragmentTypeFlag::FsFetch);
471 }
472
473 NodeBody::VectorIndexWrite(_) => {
474 current_fragment
475 .fragment_type_mask
476 .add(FragmentTypeFlag::VectorIndexWrite);
477 }
478
479 _ => {}
480 };
481
482 if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
484 {
485 if delta_index_join.get_join_type()? == JoinType::Inner
486 && delta_index_join.condition.is_none()
487 {
488 return build_delta_join_without_arrange(state, current_fragment, stream_node);
489 } else {
490 panic!("only inner join without non-equal condition is supported for delta joins");
491 }
492 }
493
494 if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
499 stream_node = state.gen_no_op_stream_node(stream_node);
500 }
501
502 stream_node.input = stream_node
504 .input
505 .into_iter()
506 .map(|mut child_node| {
507 match child_node.get_node_body()? {
508 NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
511 NodeBody::Exchange(exchange_node) => {
513 let exchange_node_strategy = exchange_node.get_strategy()?.clone();
514
515 let [input]: [_; 1] =
517 std::mem::take(&mut child_node.input).try_into().unwrap();
518 let child_fragment = build_and_add_fragment(state, input)?;
519
520 let result = state.fragment_graph.try_add_edge(
521 child_fragment.fragment_id,
522 current_fragment.fragment_id,
523 StreamFragmentEdge {
524 dispatch_strategy: exchange_node_strategy.clone(),
525 link_id: child_node.operator_id,
527 },
528 );
529
530 if result.is_err() {
534 child_node.operator_id = state.gen_operator_id() as u64;
537
538 let ref_fragment_node = child_fragment.node.as_ref().unwrap();
540 let no_shuffle_strategy = DispatchStrategy {
541 r#type: DispatcherType::NoShuffle as i32,
542 dist_key_indices: vec![],
543 output_mapping: PbDispatchOutputMapping::identical(
544 ref_fragment_node.fields.len(),
545 )
546 .into(),
547 };
548
549 let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;
550
551 let no_op_fragment = {
552 let node = state.gen_no_op_stream_node(StreamNode {
553 operator_id: no_shuffle_exchange_operator_id,
554 identity: "StreamNoShuffleExchange".into(),
555 node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
556 strategy: Some(no_shuffle_strategy.clone()),
557 }))),
558 input: vec![],
559
560 stream_key: ref_fragment_node.stream_key.clone(),
562 append_only: ref_fragment_node.append_only,
563 fields: ref_fragment_node.fields.clone(),
564 });
565
566 let mut fragment = state.new_stream_fragment();
567 fragment.node = Some(node.into());
568 Rc::new(fragment)
569 };
570
571 state.fragment_graph.add_fragment(no_op_fragment.clone());
572
573 state.fragment_graph.add_edge(
574 child_fragment.fragment_id,
575 no_op_fragment.fragment_id,
576 StreamFragmentEdge {
577 dispatch_strategy: no_shuffle_strategy,
579 link_id: no_shuffle_exchange_operator_id,
580 },
581 );
582 state.fragment_graph.add_edge(
583 no_op_fragment.fragment_id,
584 current_fragment.fragment_id,
585 StreamFragmentEdge {
586 dispatch_strategy: exchange_node_strategy,
588 link_id: child_node.operator_id,
589 },
590 );
591 }
592
593 Ok(child_node)
594 }
595
596 _ => build_fragment(state, current_fragment, child_node),
598 }
599 })
600 .collect::<Result<_>>()?;
601 Ok(stream_node)
602 })
603}