risingwave_frontend/stream_fragmenter/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod graph;
16use graph::*;
17use risingwave_common::util::recursive::{self, Recurse as _};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_pb::stream_plan::stream_node::NodeBody;
20mod parallelism;
21mod rewrite;
22
23use std::collections::{HashMap, HashSet};
24use std::ops::Deref;
25use std::rc::Rc;
26
27use educe::Educe;
28use risingwave_common::catalog::TableId;
29use risingwave_common::session_config::SessionConfig;
30use risingwave_common::session_config::parallelism::ConfigParallelism;
31use risingwave_pb::plan_common::JoinType;
32use risingwave_pb::stream_plan::{
33    DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag, NoOpNode, StreamContext,
34    StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanType,
35};
36
37use self::rewrite::build_delta_join_without_arrange;
38use crate::error::Result;
39use crate::optimizer::PlanRef;
40use crate::optimizer::plan_node::generic::GenericPlanRef;
41use crate::optimizer::plan_node::reorganize_elements_id;
42use crate::scheduler::SchedulerResult;
43use crate::stream_fragmenter::parallelism::derive_parallelism;
44
45/// The mutable state when building fragment graph.
46#[derive(Educe)]
47#[educe(Default)]
48pub struct BuildFragmentGraphState {
49    /// fragment graph field, transformed from input streaming plan.
50    fragment_graph: StreamFragmentGraph,
51    /// local fragment id
52    next_local_fragment_id: u32,
53
54    /// Next local table id to be allocated. It equals to total table ids cnt when finish stream
55    /// node traversing.
56    next_table_id: u32,
57
58    /// rewrite will produce new operators, and we need to track next operator id
59    #[educe(Default(expression = u32::MAX - 1))]
60    next_operator_id: u32,
61
62    /// dependent streaming job ids.
63    dependent_table_ids: HashSet<TableId>,
64
65    /// operator id to `LocalFragmentId` mapping used by share operator.
66    share_mapping: HashMap<u32, LocalFragmentId>,
67    /// operator id to `StreamNode` mapping used by share operator.
68    share_stream_node_mapping: HashMap<u32, StreamNode>,
69}
70
71impl BuildFragmentGraphState {
72    /// Create a new stream fragment with given node with generating a fragment id.
73    fn new_stream_fragment(&mut self) -> StreamFragment {
74        let fragment = StreamFragment::new(self.next_local_fragment_id);
75        self.next_local_fragment_id += 1;
76        fragment
77    }
78
79    /// Generate an operator id
80    fn gen_operator_id(&mut self) -> u32 {
81        self.next_operator_id -= 1;
82        self.next_operator_id
83    }
84
85    /// Generate an table id
86    pub fn gen_table_id(&mut self) -> u32 {
87        let ret = self.next_table_id;
88        self.next_table_id += 1;
89        ret
90    }
91
92    /// Generate an table id
93    pub fn gen_table_id_wrapped(&mut self) -> TableId {
94        TableId::new(self.gen_table_id())
95    }
96
97    pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
98        self.share_stream_node_mapping
99            .insert(operator_id, stream_node);
100    }
101
102    pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
103        self.share_stream_node_mapping.get(&operator_id)
104    }
105
106    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
107    /// stream node will also be copied from the `input` node.
108    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
109        StreamNode {
110            operator_id: self.gen_operator_id() as u64,
111            identity: "StreamNoOp".into(),
112            node_body: Some(NodeBody::NoOp(NoOpNode {})),
113
114            // Take input's properties.
115            stream_key: input.stream_key.clone(),
116            append_only: input.append_only,
117            fields: input.fields.clone(),
118
119            input: vec![input],
120        }
121    }
122}
123
/// The type of streaming job. It is used to determine the parallelism of the job during
/// `build_graph`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
132
133impl GraphJobType {
134    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
135        match self {
136            GraphJobType::Table => config.streaming_parallelism_for_table(),
137            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
138            GraphJobType::Source => config.streaming_parallelism_for_source(),
139            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
140            GraphJobType::Index => config.streaming_parallelism_for_index(),
141        }
142    }
143}
144
145pub fn build_graph(
146    plan_node: PlanRef,
147    job_type: Option<GraphJobType>,
148) -> SchedulerResult<StreamFragmentGraphProto> {
149    let ctx = plan_node.plan_base().ctx();
150    let plan_node = reorganize_elements_id(plan_node);
151
152    let mut state = BuildFragmentGraphState::default();
153    let stream_node = plan_node.to_stream_prost(&mut state)?;
154    generate_fragment_graph(&mut state, stream_node).unwrap();
155    let mut fragment_graph = state.fragment_graph.to_protobuf();
156
157    // Set table ids.
158    fragment_graph.dependent_table_ids = state
159        .dependent_table_ids
160        .into_iter()
161        .map(|id| id.table_id)
162        .collect();
163    fragment_graph.table_ids_cnt = state.next_table_id;
164
165    // Set parallelism and vnode count.
166    {
167        let config = ctx.session_ctx().config();
168        fragment_graph.parallelism = derive_parallelism(
169            job_type.map(|t| t.to_parallelism(config.deref())),
170            config.streaming_parallelism(),
171        );
172        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
173    }
174
175    // Set timezone.
176    fragment_graph.ctx = Some(StreamContext {
177        timezone: ctx.get_session_timezone(),
178    });
179
180    Ok(fragment_graph)
181}
182
/// Whether `stream_node` is one of the operators treated as stateful by the (currently
/// disabled) no-shuffle exchange rewrite.
///
/// Panics if the node has no body.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
195
/// Do some dirty rewrites before building the fragments.
///
/// Currently, it splits a fragment containing multiple stateful operators (those with high I/O
/// throughput) into multiple fragments by inserting `NoShuffle` exchanges between them, which may
/// help improve the I/O concurrency. Known as "no-shuffle exchange" or "1v1 exchange".
///
/// `insert_exchange_flag` is `true` when a stateful operator has already been seen on the path
/// from the current fragment's root down to `stream_node`. A stateful child met while the flag is
/// set gets a `NoShuffle` exchange inserted above it. Crossing an existing exchange resets the
/// flag.
///
/// Disabled via `#[cfg(any())]`: the 1v1 exchange breaks the assumption of independent scaling
/// of fragments (#4614).
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child: StreamNode| -> Result<StreamNode> {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    // `NodeBody::Exchange` holds a boxed `ExchangeNode`.
                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                        strategy: Some(strategy),
                    }))),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
250
251/// Generate fragment DAG from input streaming plan by their dependency.
252fn generate_fragment_graph(
253    state: &mut BuildFragmentGraphState,
254    stream_node: StreamNode,
255) -> Result<()> {
256    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
257    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
258    // #4614
259    #[cfg(any())]
260    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
261
262    build_and_add_fragment(state, stream_node)?;
263    Ok(())
264}
265
266/// Use the given `stream_node` to create a fragment and add it to graph.
267fn build_and_add_fragment(
268    state: &mut BuildFragmentGraphState,
269    stream_node: StreamNode,
270) -> Result<Rc<StreamFragment>> {
271    let operator_id = stream_node.operator_id as u32;
272    match state.share_mapping.get(&operator_id) {
273        None => {
274            let mut fragment = state.new_stream_fragment();
275            let node = build_fragment(state, &mut fragment, stream_node)?;
276
277            // It's possible that the stream node is rewritten while building the fragment, for
278            // example, empty fragment to no-op fragment. We get the operator id again instead of
279            // using the original one.
280            let operator_id = node.operator_id as u32;
281
282            assert!(fragment.node.is_none());
283            fragment.node = Some(Box::new(node));
284            let fragment_ref = Rc::new(fragment);
285
286            state.fragment_graph.add_fragment(fragment_ref.clone());
287            state
288                .share_mapping
289                .insert(operator_id, fragment_ref.fragment_id);
290            Ok(fragment_ref)
291        }
292        Some(fragment_id) => Ok(state
293            .fragment_graph
294            .get_fragment(fragment_id)
295            .unwrap()
296            .clone()),
297    }
298}
299
/// Fill in `current_fragment` starting from `stream_node`, and link dependencies by visiting
/// children recursively.
///
/// While walking the nodes that belong to this fragment, it updates `fragment_type_mask`,
/// `requires_singleton` and `upstream_table_ids` of `current_fragment`. It also records upstream
/// dependencies in `state.dependent_table_ids`.
///
/// Each `Exchange` child that has an input starts a new child fragment, built via
/// `build_and_add_fragment`. That fragment is connected to `current_fragment` by an edge whose
/// link id is the exchange's operator id. If such an edge already exists, a `NoOp` fragment is
/// inserted in between instead.
///
/// Returns the (possibly rewritten) `stream_node` that should be stored in the fragment.
///
/// # Panics
///
/// Panics in either of these cases:
/// - A `DeltaIndexJoin` is not an inner join without a non-equi condition.
/// - An `Exchange` with inputs does not have exactly one input.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // NOTE(review): `recursive::tracker!()` presumably guards against stack overflow on deep
    // plans — TODO confirm.
    recursive::tracker!().recurse(|_t| {
        // Update current fragment based on the node we're visiting.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32
            }

            NodeBody::Source(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32;

                // Shared non-distributed sources, new FS connectors and Iceberg connectors
                // must run in a singleton fragment.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.is_new_fs_connector()
                        || source.with_properties.is_iceberg_connector())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Dml as u32;
            }

            NodeBody::Materialize(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Mview as u32;
            }

            NodeBody::Sink(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32
            }

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
                // Snapshot-backfill variants get an extra flag; the other scan types need none.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment.fragment_type_mask |=
                            FragmentTypeFlag::SnapshotBackfillStreamScan as u32;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment.fragment_type_mask |=
                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // Memorize the upstream table id for later use.
                // The table id could be an upstream CDC source.
                state
                    .dependent_table_ids
                    .insert(TableId::new(node.table_id));
                current_fragment.upstream_table_ids.push(node.table_id);
            }

            NodeBody::StreamCdcScan(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
                // The backfill algorithm is not parallel safe.
                current_fragment.requires_singleton = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32;
                // Memorize the upstream source id for later use.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.into());
                current_fragment
                    .upstream_table_ids
                    .push(node.upstream_source_id);
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceScan as u32;
                // Memorize the upstream source id for later use.
                let source_id = node.upstream_source_id;
                state.dependent_table_ids.insert(source_id.into());
                current_fragment.upstream_table_ids.push(source_id);
            }

            NodeBody::Now(_) => {
                // TODO: Remove this and insert a `BarrierRecv` instead.
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::FsFetch as u32;
            }

            _ => {}
        };

        // Handle delta join: it is built by a dedicated routine, which takes over the rest of
        // this node's processing.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Usually we do not expect an exchange node to be visited here; it should be handled by
        // the "visit children" logic below instead. If it does happen (for example, `Share` is
        // transformed into an `Exchange`), we have an empty fragment and must add a no-op node
        // to it, so that the meta service can handle it correctly.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit plan children.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange node generated by rewrites may have no input. In that case,
                    // we don't recursively visit its children.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // An exchange node indicates a new child fragment.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // An exchange node should have exactly one input; panics otherwise.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                // Always use the exchange operator id as the link id.
                                link_id: child_node.operator_id,
                            },
                        );

                        // There may be multiple edges between the same two fragments, which the
                        // meta service and the compute node do not expect. In this case, we
                        // manually insert a `NoOp` fragment between the two fragments.
                        if result.is_err() {
                            // Assign a new operator id for the `Exchange`, so we can distinguish it
                            // from duplicate edges and break the sharing.
                            child_node.operator_id = state.gen_operator_id() as u64;

                            // Take the upstream plan node as the reference for properties of `NoOp`.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_indices: (0..ref_fragment_node.fields.len() as u32)
                                    .collect(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

                            // The `NoOp` fragment: a no-op node on top of an input-less
                            // `NoShuffle` exchange.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    // Take reference's properties.
                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    append_only: ref_fragment_node.append_only,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use `NoShuffle` exchange strategy for upstream edge.
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id,
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the original exchange strategy for downstream edge.
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id,
                                },
                            );
                        }

                        // The exchange node stays in the current fragment with its input removed.
                        Ok(child_node)
                    }

                    // For other children, visit recursively.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}