risingwave_frontend/stream_fragmenter/
mod.rs1mod graph;
16use graph::*;
17use risingwave_common::util::recursive::{self, Recurse as _};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20mod parallelism;
21mod rewrite;
22
23use std::collections::{HashMap, HashSet};
24use std::ops::Deref;
25use std::rc::Rc;
26
27use educe::Educe;
28use risingwave_common::catalog::{FragmentTypeFlag, TableId};
29use risingwave_common::session_config::SessionConfig;
30use risingwave_common::session_config::parallelism::ConfigParallelism;
31use risingwave_pb::plan_common::JoinType;
32use risingwave_pb::stream_plan::{
33 BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
34 PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
35 StreamNode, StreamScanType,
36};
37
38use self::rewrite::build_delta_join_without_arrange;
39use crate::error::ErrorCode::NotSupported;
40use crate::error::{Result, RwError};
41use crate::optimizer::PlanRef;
42use crate::optimizer::plan_node::generic::GenericPlanRef;
43use crate::optimizer::plan_node::reorganize_elements_id;
44use crate::stream_fragmenter::parallelism::derive_parallelism;
45
/// Mutable state threaded through the construction of a stream fragment graph.
///
/// A fresh value is obtained through the derived `Default`; every counter starts at its
/// type default except `next_operator_id` (see the field comment).
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph under construction; fragments and edges are added to it while
    /// the stream plan is being cut into fragments.
    fragment_graph: StreamFragmentGraph,
    /// Id handed to the next fragment created by `new_stream_fragment`.
    next_local_fragment_id: u32,

    /// Id returned by the next call to `gen_table_id`. Its final value is also exported
    /// as `table_ids_cnt` on the resulting protobuf graph.
    next_table_id: u32,

    /// Counter behind `gen_operator_id`. It starts at `u32::MAX - 1` and is decremented
    /// before each use, so generated ids count downwards.
    // NOTE(review): presumably this keeps generated ids apart from operator ids that were
    // assigned elsewhere counting upwards — confirm against the plan-node id allocator.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Ids of upstream tables/sources the job depends on. Filled while visiting
    /// `StreamScan`, `CdcFilter` and `SourceBackfill` nodes and exported on the protobuf graph.
    dependent_table_ids: HashSet<TableId>,

    /// Maps the operator id of a fragment's root node to the local id of the fragment
    /// already built for it, so that a subtree reached twice reuses the same fragment.
    share_mapping: HashMap<u32, LocalFragmentId>,
    /// Stream nodes registered through `add_share_stream_node`, keyed by operator id.
    share_stream_node_mapping: HashMap<u32, StreamNode>,

    /// Set when a `SourceBackfill` or `StreamCdcScan` node was visited.
    has_source_backfill: bool,
    /// Set when a `StreamScan` node of type `SnapshotBackfill` was visited.
    has_snapshot_backfill: bool,
    /// Set when a `StreamScan` node of type `CrossDbSnapshotBackfill` was visited.
    has_cross_db_snapshot_backfill: bool,
}
75
76impl BuildFragmentGraphState {
77 fn new_stream_fragment(&mut self) -> StreamFragment {
79 let fragment = StreamFragment::new(self.next_local_fragment_id);
80 self.next_local_fragment_id += 1;
81 fragment
82 }
83
84 fn gen_operator_id(&mut self) -> u32 {
86 self.next_operator_id -= 1;
87 self.next_operator_id
88 }
89
90 pub fn gen_table_id(&mut self) -> u32 {
92 let ret = self.next_table_id;
93 self.next_table_id += 1;
94 ret
95 }
96
97 pub fn gen_table_id_wrapped(&mut self) -> TableId {
99 TableId::new(self.gen_table_id())
100 }
101
102 pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
103 self.share_stream_node_mapping
104 .insert(operator_id, stream_node);
105 }
106
107 pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
108 self.share_stream_node_mapping.get(&operator_id)
109 }
110
111 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
114 StreamNode {
115 operator_id: self.gen_operator_id() as u64,
116 identity: "StreamNoOp".into(),
117 node_body: Some(NodeBody::NoOp(NoOpNode {})),
118
119 stream_key: input.stream_key.clone(),
121 append_only: input.append_only,
122 fields: input.fields.clone(),
123
124 input: vec![input],
125 }
126 }
127}
128
/// The kind of streaming job a fragment graph is built for.
///
/// It selects which per-job-type parallelism setting of the session config applies
/// (see `GraphJobType::to_parallelism`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
137
138impl GraphJobType {
139 pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
140 match self {
141 GraphJobType::Table => config.streaming_parallelism_for_table(),
142 GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
143 GraphJobType::Source => config.streaming_parallelism_for_source(),
144 GraphJobType::Sink => config.streaming_parallelism_for_sink(),
145 GraphJobType::Index => config.streaming_parallelism_for_index(),
146 }
147 }
148}
149
150pub fn build_graph(
151 plan_node: PlanRef,
152 job_type: Option<GraphJobType>,
153) -> Result<StreamFragmentGraphProto> {
154 build_graph_with_strategy(plan_node, job_type, None)
155}
156
157pub fn build_graph_with_strategy(
158 plan_node: PlanRef,
159 job_type: Option<GraphJobType>,
160 backfill_order: Option<BackfillOrder>,
161) -> Result<StreamFragmentGraphProto> {
162 let ctx = plan_node.plan_base().ctx();
163 let plan_node = reorganize_elements_id(plan_node);
164
165 let mut state = BuildFragmentGraphState::default();
166 let stream_node = plan_node.to_stream_prost(&mut state)?;
167 generate_fragment_graph(&mut state, stream_node)?;
168 if state.has_source_backfill && state.has_snapshot_backfill {
169 return Err(RwError::from(NotSupported(
170 "Snapshot backfill with shared source backfill is not supported".to_owned(),
171 "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
172 `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
173 .to_owned(),
174 )));
175 }
176 if state.has_cross_db_snapshot_backfill
177 && let Some(ref backfill_order) = backfill_order
178 && !backfill_order.order.is_empty()
179 {
180 return Err(RwError::from(NotSupported(
181 "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
182 "Please remove backfill order specification from your query".to_owned(),
183 )));
184 }
185
186 let mut fragment_graph = state.fragment_graph.to_protobuf();
187
188 fragment_graph.dependent_table_ids = state
190 .dependent_table_ids
191 .into_iter()
192 .map(|id| id.table_id)
193 .collect();
194 fragment_graph.table_ids_cnt = state.next_table_id;
195
196 {
198 let config = ctx.session_ctx().config();
199 fragment_graph.parallelism = derive_parallelism(
200 job_type.map(|t| t.to_parallelism(config.deref())),
201 config.streaming_parallelism(),
202 );
203 fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
204 }
205
206 fragment_graph.ctx = Some(StreamContext {
208 timezone: ctx.get_session_timezone(),
209 });
210
211 fragment_graph.backfill_order = backfill_order;
212
213 Ok(fragment_graph)
214}
215
/// Tells whether `stream_node` is one of the executors treated as stateful by the
/// (currently disabled) exchange-insertion rewrite.
///
/// Compiled out: `#[cfg(any())]` never holds.
///
/// # Panics
/// Panics if the node has no body.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
228
/// Rewrites the inputs of `stream_node` so that a stateful executor sitting below another
/// stateful executor, with no exchange in between, gets a `NoShuffle` exchange inserted
/// above it.
///
/// `insert_exchange_flag` tells whether the next stateful executor met on the way down
/// must be separated by an exchange: it is set when descending into a stateful executor,
/// reset when an existing `Exchange` is crossed, and passed through unchanged otherwise.
///
/// Compiled out: `#[cfg(any())]` never holds. The body is kept in sync with the
/// `DispatchStrategy` / `NodeBody::Exchange` shapes used by the live code in
/// `build_fragment`, so that it still type-checks if it is ever re-enabled.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let rewrite_child = |child: StreamNode| -> Result<StreamNode> {
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                // Two stateful executors in a row: rewrite the child first, then put a
                // no-shuffle exchange on top of it.
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_mapping: PbDispatchOutputMapping::identical(child_node.fields.len())
                        .into(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                        strategy: Some(strategy),
                    }))),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                // First stateful executor on this path: request an exchange before the
                // next one further down.
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An existing exchange already separates the executors.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Stateless executors just pass the flag along.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(rewrite_child)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
283
284fn generate_fragment_graph(
286 state: &mut BuildFragmentGraphState,
287 stream_node: StreamNode,
288) -> Result<()> {
289 #[cfg(any())]
293 let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
294
295 build_and_add_fragment(state, stream_node)?;
296 Ok(())
297}
298
299fn build_and_add_fragment(
301 state: &mut BuildFragmentGraphState,
302 stream_node: StreamNode,
303) -> Result<Rc<StreamFragment>> {
304 let operator_id = stream_node.operator_id as u32;
305 match state.share_mapping.get(&operator_id) {
306 None => {
307 let mut fragment = state.new_stream_fragment();
308 let node = build_fragment(state, &mut fragment, stream_node)?;
309
310 let operator_id = node.operator_id as u32;
314
315 assert!(fragment.node.is_none());
316 fragment.node = Some(Box::new(node));
317 let fragment_ref = Rc::new(fragment);
318
319 state.fragment_graph.add_fragment(fragment_ref.clone());
320 state
321 .share_mapping
322 .insert(operator_id, fragment_ref.fragment_id);
323 Ok(fragment_ref)
324 }
325 Some(fragment_id) => Ok(state
326 .fragment_graph
327 .get_fragment(fragment_id)
328 .unwrap()
329 .clone()),
330 }
331}
332
/// Builds the part of the plan rooted at `stream_node` that belongs to `current_fragment`.
///
/// For every visited node this function
/// 1. records fragment-level metadata derived from the node body: fragment type flags,
///    `requires_singleton`, upstream table ids, plus the dependent table ids and backfill
///    flags kept on `state`;
/// 2. hands a `DeltaIndexJoin` node over to `build_delta_join_without_arrange`;
/// 3. walks the inputs and cuts the tree at every `Exchange` child that still has an
///    input: that input becomes its own fragment (via `build_and_add_fragment`) which is
///    linked to `current_fragment` by an edge carrying the exchange's dispatch strategy.
///
/// Returns the (possibly rewritten) node that stays inside `current_fragment`.
///
/// # Errors
/// Propagates errors from reading protobuf fields (`get_node_body`, `get_join_type`,
/// `get_strategy`) and from building child fragments.
///
/// # Panics
/// Panics on a `DeltaIndexJoin` that is not an inner join or that carries a condition,
/// and on an `Exchange` child whose non-empty input list does not hold exactly one node.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // NOTE(review): `recursive::tracker!().recurse` presumably guards the depth of this
    // recursive walk — confirm against `risingwave_common::util::recursive`.
    recursive::tracker!().recurse(|_t| {
        // Step 1: update the fragment's metadata according to the kind of this node.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                // A source forces a singleton fragment when it is shared but not
                // distributed, or when it uses the new fs connector or the iceberg connector.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.is_new_fs_connector()
                        || source.with_properties.is_iceberg_connector())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                // The two snapshot-backfill scan types get an additional flag and are
                // remembered on `state` for the compatibility checks done by the caller.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // The scanned table is both a dependency of the whole job and an
                // upstream of this fragment.
                state
                    .dependent_table_ids
                    .insert(TableId::new(node.table_id));
                current_fragment.upstream_table_ids.push(node.table_id);
            }

            NodeBody::StreamCdcScan(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                current_fragment.requires_singleton = true;
                // Recorded as source backfill, so it takes part in the
                // "source backfill + snapshot backfill" check of `build_graph_with_strategy`.
                state.has_source_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // The upstream source is a dependency of the job and an upstream of this fragment.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.into());
                current_fragment
                    .upstream_table_ids
                    .push(node.upstream_source_id);
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                let source_id = node.upstream_source_id;
                // Same bookkeeping as for `CdcFilter`, plus the source-backfill flag.
                state.dependent_table_ids.insert(source_id.into());
                current_fragment.upstream_table_ids.push(source_id);
                state.has_source_backfill = true;
            }

            NodeBody::Now(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            // Every other node kind leaves the fragment metadata untouched.
            _ => {}
        };

        // Step 2: delta index joins are built by a dedicated routine, which takes over
        // the rest of the work for this subtree.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // An exchange at the root of the fragment is wrapped in a no-op node, so that the
        // exchange becomes a child and is handled by the input loop below.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Step 3: visit the inputs, cutting a new fragment at every exchange.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange without input has nothing below it to build; keep it as is.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Detach the single input of the exchange (the exchange itself
                        // stays in this fragment as a leaf) and build it as its own fragment.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        // Link child fragment -> current fragment; the exchange's operator
                        // id serves as the link id.
                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                link_id: child_node.operator_id,
                            },
                        );

                        // NOTE(review): `try_add_edge` presumably fails when an edge between
                        // these two fragments already exists — confirm in `graph.rs`.
                        // In that case the connection is routed through an extra no-op fragment:
                        //   child --NoShuffle--> no-op --original strategy--> current
                        if result.is_err() {
                            // Give the exchange a fresh id; it is the link id of the
                            // no-op -> current edge added below.
                            child_node.operator_id = state.gen_operator_id() as u64;

                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

                            // The intermediate fragment: a no-op node on top of an
                            // input-less no-shuffle exchange, with the same stream key,
                            // append-only flag and fields as the child fragment's root.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    append_only: ref_fragment_node.append_only,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            // child -> no-op, without shuffling.
                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id,
                                },
                            );
                            // no-op -> current, with the strategy of the original exchange.
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id,
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // Any other child stays in the current fragment; keep descending.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}