risingwave_frontend/stream_fragmenter/mod.rs

mod graph;
use graph::*;
use risingwave_common::util::recursive::{self, Recurse as _};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::stream_plan::stream_node::NodeBody;
mod parallelism;
mod rewrite;

use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;

use educe::Educe;
use risingwave_common::catalog::TableId;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::session_config::parallelism::ConfigParallelism;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::{
    DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag, NoOpNode, StreamContext,
    StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanType,
};

use self::rewrite::build_delta_join_without_arrange;
use crate::error::Result;
use crate::optimizer::PlanRef;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::reorganize_elements_id;
use crate::scheduler::SchedulerResult;
use crate::stream_fragmenter::parallelism::derive_parallelism;

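/// Mutable state accumulated while building the fragment graph from a stream plan.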
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph built so far, transformed from the input streaming plan.
    fragment_graph: StreamFragmentGraph,
    /// Next local fragment id to allocate.
    next_local_fragment_id: u32,

    /// Next local table id to allocate. It equals the total number of table ids
    /// once the stream-node traversal finishes.
    next_table_id: u32,

    /// Next operator id for operators generated during fragmentation,
    /// allocated in decreasing order from `u32::MAX - 1`.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Table (or source) ids that this streaming job depends on.
    dependent_table_ids: HashSet<TableId>,

    /// Operator id to `LocalFragmentId` mapping, used to reuse fragments for shared operators.
    share_mapping: HashMap<u32, LocalFragmentId>,
    /// Operator id to `StreamNode` mapping, used to reuse stream nodes for shared operators.
    share_stream_node_mapping: HashMap<u32, StreamNode>,
}

impl BuildFragmentGraphState {
    /// Create a new stream fragment with a freshly allocated local fragment id.
    fn new_stream_fragment(&mut self) -> StreamFragment {
        let fragment = StreamFragment::new(self.next_local_fragment_id);
        self.next_local_fragment_id += 1;
        fragment
    }

    /// Generate an operator id for a newly created operator, counting downwards.
    fn gen_operator_id(&mut self) -> u32 {
        self.next_operator_id -= 1;
        self.next_operator_id
    }

    /// Generate a new table id.
    pub fn gen_table_id(&mut self) -> u32 {
        let ret = self.next_table_id;
        self.next_table_id += 1;
        ret
    }

    /// Generate a new table id wrapped in a `TableId`.
    pub fn gen_table_id_wrapped(&mut self) -> TableId {
        TableId::new(self.gen_table_id())
    }

    /// Record the stream node of a share operator so that later occurrences can reuse it.
    pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
        self.share_stream_node_mapping
            .insert(operator_id, stream_node);
    }

    /// Look up the recorded stream node of a share operator, if any.
    pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
        self.share_stream_node_mapping.get(&operator_id)
    }

    /// Wrap `input` with a no-op node that simply forwards its output.
    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
        StreamNode {
            operator_id: self.gen_operator_id() as u64,
            identity: "StreamNoOp".into(),
            node_body: Some(NodeBody::NoOp(NoOpNode {})),

            // Take the properties of the input node.
            stream_key: input.stream_key.clone(),
            append_only: input.append_only,
            fields: input.fields.clone(),

            input: vec![input],
        }
    }
}

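/// The type of the streaming job, used to look up the job-type-specific parallelism
/// setting in the session config.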
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}

impl GraphJobType {
    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
        match self {
            GraphJobType::Table => config.streaming_parallelism_for_table(),
            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
            GraphJobType::Source => config.streaming_parallelism_for_source(),
            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
            GraphJobType::Index => config.streaming_parallelism_for_index(),
        }
    }
}

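/// Build the proto fragment graph for a streaming job from its optimized stream plan:
/// convert the plan into a `StreamNode` tree, cut the tree into fragments at `Exchange`
/// nodes, then fill in the dependent table ids, the parallelism and the stream context.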
pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> SchedulerResult<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node).unwrap();
    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Set table ids.
    fragment_graph.dependent_table_ids = state
        .dependent_table_ids
        .into_iter()
        .map(|id| id.table_id)
        .collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Derive the parallelism from the job type and the session config.
    {
        let config = ctx.session_ctx().config();
        fragment_graph.parallelism = derive_parallelism(
            job_type.map(|t| t.to_parallelism(config.deref())),
            config.streaming_parallelism(),
        );
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
    }

    // Attach the stream context (timezone) from the session.
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
    });

    Ok(fragment_graph)
}

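// NOTE: the two helpers below are compiled out via `#[cfg(any())]`, which always evaluates
// to false.

/// Whether the given stream node is a stateful executor, i.e. one that maintains state
/// tables (aggregations, joins, scans, dynamic filters).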
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}

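/// (Disabled) Rewrite the stream node tree by inserting a no-shuffle `Exchange` in front of
/// a stateful executor whenever another stateful executor has already been seen in the same
/// exchange-free region, so that each fragment ends up with at most one stateful operator.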
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        // For stateful operators, set the exchange flag. If it's already set,
        // force an exchange in front of the child.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![],
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, keep the flag and visit the children recursively.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}

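/// Generate the fragment graph from the root stream node by recursively cutting the node
/// tree into fragments.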
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // The rewrite pass is disabled: `#[cfg(any())]` compiles this statement out.
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}

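/// Build a fragment from the given stream node and add it to the graph. If a fragment was
/// already built for this operator id (i.e. a shared operator), reuse it instead.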
fn build_and_add_fragment(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<Rc<StreamFragment>> {
    let operator_id = stream_node.operator_id as u32;
    match state.share_mapping.get(&operator_id) {
        None => {
            let mut fragment = state.new_stream_fragment();
            let node = build_fragment(state, &mut fragment, stream_node)?;

            // The stream node may be rewritten while building the fragment (e.g. wrapped
            // in a no-op node), so fetch the operator id again from the returned node.
            let operator_id = node.operator_id as u32;

            assert!(fragment.node.is_none());
            fragment.node = Some(Box::new(node));
            let fragment_ref = Rc::new(fragment);

            state.fragment_graph.add_fragment(fragment_ref.clone());
            state
                .share_mapping
                .insert(operator_id, fragment_ref.fragment_id);
            Ok(fragment_ref)
        }
        Some(fragment_id) => Ok(state
            .fragment_graph
            .get_fragment(fragment_id)
            .unwrap()
            .clone()),
    }
}

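/// Build a fragment from the given stream node recursively: set the fragment type flags and
/// the singleton requirement according to the node bodies it contains, and cut the tree at
/// every `Exchange` child, turning the subtree below it into a separate child fragment
/// connected to the current one by an edge.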
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    recursive::tracker!().recurse(|_t| {
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32
            }

            NodeBody::Source(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32;

                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.is_new_fs_connector()
                        || source.with_properties.is_iceberg_connector())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Dml as u32;
            }

            NodeBody::Materialize(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Mview as u32;
            }

            NodeBody::Sink(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32
            }

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment.fragment_type_mask |=
                            FragmentTypeFlag::SnapshotBackfillStreamScan as u32;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment.fragment_type_mask |=
                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // Record the upstream table this scan depends on.
                state
                    .dependent_table_ids
                    .insert(TableId::new(node.table_id));
                current_fragment.upstream_table_ids.push(node.table_id);
            }

            NodeBody::StreamCdcScan(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
                // CDC backfill runs as a singleton fragment.
                current_fragment.requires_singleton = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32;
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.into());
                current_fragment
                    .upstream_table_ids
                    .push(node.upstream_source_id);
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceScan as u32;
                let source_id = node.upstream_source_id;
                state.dependent_table_ids.insert(source_id.into());
                current_fragment.upstream_table_ids.push(source_id);
            }

            NodeBody::Now(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::FsFetch as u32;
            }

            _ => {}
        };

        // Handle `DeltaIndexJoin` specially: rewrite it with `build_delta_join_without_arrange`.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // If the root of this fragment is itself an `Exchange`, wrap it in a no-op node so
        // that the fragment has a non-exchange root.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit the children: cut the tree at each `Exchange` node into a child fragment and
        // connect it to the current fragment with an edge.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange node that already comes with no input (e.g. generated by a
                    // rewrite) is kept as-is.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // An exchange node indicates a new child fragment.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // An exchange node must have exactly one input here.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                // Use the exchange operator id as the link id.
                                link_id: child_node.operator_id,
                            },
                        );

                        // `try_add_edge` fails if an edge between the two fragments already
                        // exists (e.g. the same upstream is read twice, as in a self-join).
                        // In that case, insert an extra no-op fragment in between, connected
                        // via a no-shuffle exchange, so that each edge stays unique.
                        if result.is_err() {
                            // Assign a new operator id for the exchange.
                            child_node.operator_id = state.gen_operator_id() as u64;

                            // Take the properties from the root node of the child fragment.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_indices: (0..ref_fragment_node.fields.len() as u32)
                                    .collect(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    // Take the properties of the child fragment's root node.
                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    append_only: ref_fragment_node.append_only,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id,
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id,
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // For other children, keep visiting within the current fragment.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}