// risingwave_frontend/stream_fragmenter/mod.rs
mod graph;
mod parallelism;
mod rewrite;

use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;

use educe::Educe;
use graph::*;
use risingwave_common::catalog::TableId;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::session_config::parallelism::ConfigParallelism;
use risingwave_common::util::recursive::{self, Recurse as _};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag, NoOpNode,
    StreamContext, StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanType,
};

use self::rewrite::build_delta_join_without_arrange;
use crate::error::Result;
use crate::optimizer::PlanRef;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::reorganize_elements_id;
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::parallelism::derive_parallelism;
/// Mutable state accumulated while cutting a [`StreamNode`] plan tree into a
/// [`StreamFragmentGraph`].
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph under construction.
    fragment_graph: StreamFragmentGraph,
    /// Local fragment id allocator, counting upwards from 0.
    next_local_fragment_id: u32,

    /// Next table id to allocate; its final value is published as
    /// `table_ids_cnt` on the protobuf graph.
    next_table_id: u32,

    /// Operator id allocator, counting *downwards* from `u32::MAX - 1`.
    /// NOTE(review): presumably decrements to avoid colliding with operator
    /// ids already assigned by the optimizer — confirm against its id scheme.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Tables / sources this streaming job depends on, collected while
    /// traversing scan-like nodes.
    dependent_table_ids: HashSet<TableId>,

    /// Maps a shared operator id to the fragment it was first built into, so
    /// later occurrences reuse that fragment instead of building a new one.
    share_mapping: HashMap<u32, LocalFragmentId>,
    /// Caches the stream node registered for each shared operator id.
    share_stream_node_mapping: HashMap<u32, StreamNode>,
}
70
71impl BuildFragmentGraphState {
72 fn new_stream_fragment(&mut self) -> StreamFragment {
74 let fragment = StreamFragment::new(self.next_local_fragment_id);
75 self.next_local_fragment_id += 1;
76 fragment
77 }
78
79 fn gen_operator_id(&mut self) -> u32 {
81 self.next_operator_id -= 1;
82 self.next_operator_id
83 }
84
85 pub fn gen_table_id(&mut self) -> u32 {
87 let ret = self.next_table_id;
88 self.next_table_id += 1;
89 ret
90 }
91
92 pub fn gen_table_id_wrapped(&mut self) -> TableId {
94 TableId::new(self.gen_table_id())
95 }
96
97 pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
98 self.share_stream_node_mapping
99 .insert(operator_id, stream_node);
100 }
101
102 pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
103 self.share_stream_node_mapping.get(&operator_id)
104 }
105
106 pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
109 StreamNode {
110 operator_id: self.gen_operator_id() as u64,
111 identity: "StreamNoOp".into(),
112 node_body: Some(NodeBody::NoOp(NoOpNode {})),
113
114 stream_key: input.stream_key.clone(),
116 append_only: input.append_only,
117 fields: input.fields.clone(),
118
119 input: vec![input],
120 }
121 }
122}
123
/// The kind of streaming job a fragment graph is built for; used to select
/// the job-type-specific parallelism from the session config.
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
132
impl GraphJobType {
    /// Resolve the session-configured streaming parallelism for this job type.
    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
        match self {
            GraphJobType::Table => config.streaming_parallelism_for_table(),
            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
            GraphJobType::Source => config.streaming_parallelism_for_source(),
            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
            GraphJobType::Index => config.streaming_parallelism_for_index(),
        }
    }
}
144
/// Build a fragment graph for the given stream plan, with no explicit
/// backfill ordering. Convenience wrapper over [`build_graph_with_strategy`].
pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> SchedulerResult<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}
151
152pub fn build_graph_with_strategy(
153 plan_node: PlanRef,
154 job_type: Option<GraphJobType>,
155 backfill_order: Option<BackfillOrder>,
156) -> SchedulerResult<StreamFragmentGraphProto> {
157 let ctx = plan_node.plan_base().ctx();
158 let plan_node = reorganize_elements_id(plan_node);
159
160 let mut state = BuildFragmentGraphState::default();
161 let stream_node = plan_node.to_stream_prost(&mut state)?;
162 generate_fragment_graph(&mut state, stream_node).unwrap();
163 let mut fragment_graph = state.fragment_graph.to_protobuf();
164
165 fragment_graph.dependent_table_ids = state
167 .dependent_table_ids
168 .into_iter()
169 .map(|id| id.table_id)
170 .collect();
171 fragment_graph.table_ids_cnt = state.next_table_id;
172
173 {
175 let config = ctx.session_ctx().config();
176 fragment_graph.parallelism = derive_parallelism(
177 job_type.map(|t| t.to_parallelism(config.deref())),
178 config.streaming_parallelism(),
179 );
180 fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
181 }
182
183 fragment_graph.ctx = Some(StreamContext {
185 timezone: ctx.get_session_timezone(),
186 });
187
188 fragment_graph.backfill_order = backfill_order;
189
190 Ok(fragment_graph)
191}
192
/// Whether the node's executor is treated as "stateful" (aggregations, joins,
/// scans, dynamic filter) by the disabled exchange-insertion pass below.
///
/// NOTE: `#[cfg(any())]` never evaluates to true, so this function is
/// compiled out — presumably kept for reference alongside
/// `rewrite_stream_node`, which is disabled the same way.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
205
/// Rewrite the node tree by inserting a `NoShuffle` exchange in front of a
/// stateful child whenever its nearest stateful ancestor is not already
/// separated from it by an exchange (tracked via `insert_exchange_flag`).
///
/// NOTE: `#[cfg(any())]` never evaluates to true, so this pass is compiled
/// out and not part of the active fragmenter.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                // Two stateful executors in a row: recurse first, then wrap
                // the rewritten child in a no-shuffle exchange.
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                // First stateful executor on this path: just raise the flag.
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // An existing exchange resets the flag for the subtree below.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise propagate the flag unchanged.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
260
/// Fragment the given root stream node into `state.fragment_graph`.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // Compiled out: the legacy exchange-insertion rewrite is disabled via
    // `#[cfg(any())]` (see `rewrite_stream_node` above).
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}
275
276fn build_and_add_fragment(
278 state: &mut BuildFragmentGraphState,
279 stream_node: StreamNode,
280) -> Result<Rc<StreamFragment>> {
281 let operator_id = stream_node.operator_id as u32;
282 match state.share_mapping.get(&operator_id) {
283 None => {
284 let mut fragment = state.new_stream_fragment();
285 let node = build_fragment(state, &mut fragment, stream_node)?;
286
287 let operator_id = node.operator_id as u32;
291
292 assert!(fragment.node.is_none());
293 fragment.node = Some(Box::new(node));
294 let fragment_ref = Rc::new(fragment);
295
296 state.fragment_graph.add_fragment(fragment_ref.clone());
297 state
298 .share_mapping
299 .insert(operator_id, fragment_ref.fragment_id);
300 Ok(fragment_ref)
301 }
302 Some(fragment_id) => Ok(state
303 .fragment_graph
304 .get_fragment(fragment_id)
305 .unwrap()
306 .clone()),
307 }
308}
309
/// Build a stream fragment in place from `stream_node`, recursing into its
/// inputs. Each `Exchange` input cuts the tree: its subtree becomes a child
/// fragment and an edge in the fragment graph instead of staying inline.
///
/// Returns the (possibly rewritten) node tree that remains inside
/// `current_fragment`.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    recursive::tracker!().recurse(|_t| {
        // First pass over the node body: set the fragment's type flags,
        // singleton requirement, and upstream dependencies.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32
            }

            NodeBody::Source(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32;

                // Non-distributed shared sources, new fs connectors, and
                // iceberg connectors must run as a singleton fragment.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.is_new_fs_connector()
                        || source.with_properties.is_iceberg_connector())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Dml as u32;
            }

            NodeBody::Materialize(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Mview as u32;
            }

            NodeBody::Sink(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32
            }

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
                // Snapshot-backfill variants get an extra type flag.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment.fragment_type_mask |=
                            FragmentTypeFlag::SnapshotBackfillStreamScan as u32;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment.fragment_type_mask |=
                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // Record the scanned table as an upstream dependency.
                state
                    .dependent_table_ids
                    .insert(TableId::new(node.table_id));
                current_fragment.upstream_table_ids.push(node.table_id);
            }

            NodeBody::StreamCdcScan(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32;
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.into());
                current_fragment
                    .upstream_table_ids
                    .push(node.upstream_source_id);
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceScan as u32;
                // The backfilled source is an upstream dependency.
                let source_id = node.upstream_source_id;
                state.dependent_table_ids.insert(source_id.into());
                current_fragment.upstream_table_ids.push(source_id);
            }

            NodeBody::Now(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::FsFetch as u32;
            }

            _ => {}
        };

        // Delta index joins take a dedicated build path; only the inner join
        // without a non-equal condition is supported.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // A root exchange is wrapped in a no-op node so the fragment's root
        // is not an exchange (exchanges are turned into edges below).
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Second pass: visit each input. Exchange inputs become fragment
        // edges; anything else is built inline into the current fragment.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange with no input is left as-is.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // An exchange has exactly one input; take it out and
                        // build it as a separate child fragment.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                link_id: child_node.operator_id,
                            },
                        );

                        // Adding the edge can fail — presumably when an edge
                        // between these two fragments already exists. In that
                        // case, route through a fresh no-op fragment so each
                        // fragment pair keeps at most one direct edge.
                        if result.is_err() {
                            // Give this exchange a fresh operator id for the
                            // new edge's link id.
                            child_node.operator_id = state.gen_operator_id() as u64;

                            // The intermediate no-op mirrors the child
                            // fragment root's schema and is fed by a
                            // no-shuffle exchange.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_indices: (0..ref_fragment_node.fields.len() as u32)
                                    .collect(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    append_only: ref_fragment_node.append_only,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            // child -> no-op via no-shuffle, then
                            // no-op -> current with the original strategy.
                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id,
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id,
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // Non-exchange children stay in the current fragment.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}