risingwave_frontend/stream_fragmenter/mod.rs

mod graph;
use graph::*;
use risingwave_common::util::recursive::{self, Recurse as _};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::stream_plan::stream_node::NodeBody;
mod parallelism;
mod rewrite;

use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;

use educe::Educe;
use risingwave_common::catalog::TableId;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::session_config::parallelism::ConfigParallelism;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag, NoOpNode,
    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
    StreamNode, StreamScanType,
};

use self::rewrite::build_delta_join_without_arrange;
use crate::error::Result;
use crate::optimizer::PlanRef;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::reorganize_elements_id;
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::parallelism::derive_parallelism;

/// Mutable state accumulated while building the fragment graph.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph under construction, transformed from the input streaming plan.
    fragment_graph: StreamFragmentGraph,
    /// The next local fragment id to be allocated.
    next_local_fragment_id: u32,

    /// The next local table id to be allocated. It equals the total count of table ids
    /// once the stream node traversal finishes.
    next_table_id: u32,

    /// Rewrites may synthesize new operators, so we track the next operator id here.
    /// It counts downwards from `u32::MAX - 1` so that generated ids cannot clash with
    /// the ids already assigned to plan nodes by the optimizer.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Ids of the upstream tables/sources that this streaming job depends on.
    dependent_table_ids: HashSet<TableId>,

    /// Mapping from operator id to `LocalFragmentId`, used to deduplicate fragments
    /// built from shared (`Share`) operators.
    share_mapping: HashMap<u32, LocalFragmentId>,
    /// Mapping from operator id to `StreamNode`, used by the share operator.
    share_stream_node_mapping: HashMap<u32, StreamNode>,
}

impl BuildFragmentGraphState {
    /// Create a new stream fragment with a fresh local fragment id.
    fn new_stream_fragment(&mut self) -> StreamFragment {
        let fragment = StreamFragment::new(self.next_local_fragment_id);
        self.next_local_fragment_id += 1;
        fragment
    }

    /// Generate an operator id for a synthesized operator, counting downwards.
    fn gen_operator_id(&mut self) -> u32 {
        self.next_operator_id -= 1;
        self.next_operator_id
    }

    /// Generate a table id for an internal table, counting upwards.
    pub fn gen_table_id(&mut self) -> u32 {
        let ret = self.next_table_id;
        self.next_table_id += 1;
        ret
    }

    pub fn gen_table_id_wrapped(&mut self) -> TableId {
        TableId::new(self.gen_table_id())
    }

    pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
        self.share_stream_node_mapping
            .insert(operator_id, stream_node);
    }

    pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
        self.share_stream_node_mapping.get(&operator_id)
    }

    /// Generate a new stream node with a `NoOp` body wrapping the given `input`.
    /// Schema, stream key and append-only properties are inherited from `input`.
    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
        StreamNode {
            operator_id: self.gen_operator_id() as u64,
            identity: "StreamNoOp".into(),
            node_body: Some(NodeBody::NoOp(NoOpNode {})),

            // Take the input's properties.
            stream_key: input.stream_key.clone(),
            append_only: input.append_only,
            fields: input.fields.clone(),

            input: vec![input],
        }
    }
}

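// A minimal sketch (not part of the original source) exercising the id
// allocation scheme above: table ids count upwards from 0, while operator ids
// for synthesized nodes count downwards from `u32::MAX - 1`, so the two
// ranges cannot collide. The module and test names are hypothetical.
#[cfg(test)]
mod id_allocation_sketch {
    use super::*;

    #[test]
    fn table_and_operator_ids_grow_in_opposite_directions() {
        let mut state = BuildFragmentGraphState::default();

        // Table ids start at 0 and increase.
        assert_eq!(state.gen_table_id(), 0);
        assert_eq!(state.gen_table_id(), 1);

        // `gen_operator_id` decrements before returning, so the first
        // generated id is `u32::MAX - 2`.
        assert_eq!(state.gen_operator_id(), u32::MAX - 2);
        assert_eq!(state.gen_operator_id(), u32::MAX - 3);
    }
}
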
/// The type of the streaming job, which determines which session-config
/// parallelism setting applies to it.
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}

impl GraphJobType {
    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
        match self {
            GraphJobType::Table => config.streaming_parallelism_for_table(),
            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
            GraphJobType::Source => config.streaming_parallelism_for_source(),
            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
            GraphJobType::Index => config.streaming_parallelism_for_index(),
        }
    }
}

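// Illustrative only (not from the original source): how the per-job-type
// setting combines with the generic one in `build_graph_with_strategy` below.
// For an index job and a session config `config`, the resolved parallelism is
// roughly
//
//     let parallelism = derive_parallelism(
//         Some(GraphJobType::Index.to_parallelism(&config)),
//         config.streaming_parallelism(),
//     );
//
// with the job-type-specific setting presumably taking precedence over the
// generic `streaming_parallelism` when it is set.
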
/// Build the fragment graph for the given streaming plan, using the default
/// backfill order.
pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> SchedulerResult<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}

pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> SchedulerResult<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node).unwrap();
    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Set table ids.
    fragment_graph.dependent_table_ids = state
        .dependent_table_ids
        .into_iter()
        .map(|id| id.table_id)
        .collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Set parallelism.
    {
        let config = ctx.session_ctx().config();
        fragment_graph.parallelism = derive_parallelism(
            job_type.map(|t| t.to_parallelism(config.deref())),
            config.streaming_parallelism(),
        );
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
    }

    // Set timezone.
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
    });

    fragment_graph.backfill_order = backfill_order;

    Ok(fragment_graph)
}

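// A hedged usage sketch (hypothetical caller code): turning an optimized
// streaming plan into the fragment graph proto that is later submitted to the
// meta service for scheduling.
//
//     let graph = build_graph(plan, Some(GraphJobType::MaterializedView))?;
//     // Or, with an explicit backfill order:
//     let graph =
//         build_graph_with_strategy(plan, Some(GraphJobType::Table), Some(backfill_order))?;
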
/// Note: `#[cfg(any())]` never evaluates to true, so this item is compiled
/// out; it is kept for reference only.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}

/// Rewrite the stream plan before building fragments, splitting stateful
/// operators into separate fragments by inserting no-shuffle exchanges.
/// Note: `#[cfg(any())]` compiles this out; it is kept for reference only.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force-add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, keep the flag and recurse into the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}

/// Generate the fragment DAG from the input streaming plan by visiting nodes
/// and cutting the plan into fragments at exchange boundaries.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // The stateful-operator rewrite is currently disabled (see
    // `rewrite_stream_node` above).
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}

/// Use the given `stream_node` to create a fragment and add it to the graph.
/// For a shared operator id that has been seen before, the existing fragment
/// is returned instead of building a duplicate.
fn build_and_add_fragment(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<Rc<StreamFragment>> {
    let operator_id = stream_node.operator_id as u32;
    match state.share_mapping.get(&operator_id) {
        None => {
            let mut fragment = state.new_stream_fragment();
            let node = build_fragment(state, &mut fragment, stream_node)?;

            // It's possible that the stream node is rewritten while building the fragment,
            // e.g., an exchange root wrapped into a no-op node. So we fetch the operator id
            // again instead of reusing the original one.
            let operator_id = node.operator_id as u32;

            assert!(fragment.node.is_none());
            fragment.node = Some(Box::new(node));
            let fragment_ref = Rc::new(fragment);

            state.fragment_graph.add_fragment(fragment_ref.clone());
            state
                .share_mapping
                .insert(operator_id, fragment_ref.fragment_id);
            Ok(fragment_ref)
        }
        Some(fragment_id) => Ok(state
            .fragment_graph
            .get_fragment(fragment_id)
            .unwrap()
            .clone()),
    }
}

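// Illustrative only (not from the original source): because every built
// fragment is recorded in `share_mapping`, building a shared subtree twice
// yields the same fragment rather than a duplicate. `shared_node` below is a
// hypothetical `StreamNode` produced by a `Share` operator.
//
//     let a = build_and_add_fragment(state, shared_node.clone())?;
//     let b = build_and_add_fragment(state, shared_node)?;
//     assert_eq!(a.fragment_id, b.fragment_id);
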
/// Build a new fragment and link dependencies by visiting children recursively,
/// updating the `requires_singleton` and `fragment_type` properties of the
/// current fragment along the way.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    recursive::tracker!().recurse(|_t| {
        // Update the current fragment based on the node we're visiting.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32
            }

            NodeBody::Source(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32;

                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.is_new_fs_connector()
                        || source.with_properties.is_iceberg_connector())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Dml as u32;
            }

            NodeBody::Materialize(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Mview as u32;
            }

            NodeBody::Sink(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32
            }

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment.fragment_type_mask |=
                            FragmentTypeFlag::SnapshotBackfillStreamScan as u32;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment.fragment_type_mask |=
                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // Memorize the table id for later use. The table id may also refer to an
                // upstream CDC source.
                state
                    .dependent_table_ids
                    .insert(TableId::new(node.table_id));
                current_fragment.upstream_table_ids.push(node.table_id);
            }

            NodeBody::StreamCdcScan(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
                // The CDC backfill algorithm is not parallel-safe.
                current_fragment.requires_singleton = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32;
                // Memorize the upstream source id for later use.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.into());
                current_fragment
                    .upstream_table_ids
                    .push(node.upstream_source_id);
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceScan as u32;
                // Memorize the upstream source id for later use.
                let source_id = node.upstream_source_id;
                state.dependent_table_ids.insert(source_id.into());
                current_fragment.upstream_table_ids.push(source_id);
            }

            NodeBody::Now(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::FsFetch as u32;
            }

            _ => {}
        };

        // Handle the delta join rewrite.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // If the root of the fragment is an exchange, the fragment would otherwise be
        // empty. Wrap it in a no-op node so that every fragment contains at least one
        // non-exchange node.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit the children.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange generated by rewrites may have no input; don't recurse
                    // into it.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // An exchange node indicates a new child fragment.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // An exchange node must have exactly one input.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                // Always use the exchange operator id as the link id.
                                link_id: child_node.operator_id,
                            },
                        );

                        // There may be multiple edges between the same pair of fragments,
                        // which the fragment graph does not allow. In that case, reroute
                        // this edge through an extra no-op fragment.
                        if result.is_err() {
                            // Assign a new operator id for the exchange, so that the edge
                            // through the no-op fragment gets a distinct link id.
                            child_node.operator_id = state.gen_operator_id() as u64;

                            // Take the child fragment's root node as the reference for the
                            // properties of the no-op node.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    // Take the reference node's properties.
                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    append_only: ref_fragment_node.append_only,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use `NoShuffle` for the upstream edge.
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id,
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the original strategy for the downstream edge.
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id,
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // For other children, visit recursively.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}
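
// Illustration (not from the original source) of how `build_fragment` cuts a
// plan at exchanges. A plan tree like
//
//     Materialize
//       └─ Exchange (Hash)
//            └─ Project
//                 └─ Source
//
// becomes two fragments: the subtree below the exchange is built into a child
// fragment, while the exchange itself stays in the parent fragment as a
// placeholder with emptied input. Its operator id becomes the `link_id` of the
// graph edge that carries the `Hash` dispatch strategy:
//
//     Fragment 1 [Source → Project] ──(Hash)──▶ Fragment 0 [Exchange → Materialize]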