risingwave_frontend/stream_fragmenter/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod graph;
use graph::*;
use risingwave_common::util::recursive::{self, Recurse as _};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::NodeBody;
mod rewrite;

use std::collections::{HashMap, HashSet};
use std::rc::Rc;

use educe::Educe;
use risingwave_common::catalog::TableId;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::{
    DispatchStrategy, DispatcherType, ExchangeNode, FragmentTypeFlag, NoOpNode, StreamContext,
    StreamFragmentGraph as StreamFragmentGraphProto, StreamNode, StreamScanType,
};

use self::rewrite::build_delta_join_without_arrange;
use crate::error::Result;
use crate::optimizer::PlanRef;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::reorganize_elements_id;
use crate::scheduler::SchedulerResult;

/// The mutable state used while building the fragment graph.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph, transformed from the input streaming plan.
    fragment_graph: StreamFragmentGraph,
    /// Next local fragment id to be allocated.
    next_local_fragment_id: u32,

    /// Next local table id to be allocated. It equals the total number of table ids
    /// once stream node traversal finishes.
    next_table_id: u32,

    /// Rewrites may produce new operators, so we track the next operator id to allocate.
    /// Ids are allocated downwards from `u32::MAX - 1` to avoid clashing with the ids
    /// already assigned to operators in the plan.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Dependent streaming job ids.
    dependent_table_ids: HashSet<TableId>,

    /// Operator id to `LocalFragmentId` mapping used by the share operator.
    share_mapping: HashMap<u32, LocalFragmentId>,
    /// Operator id to `StreamNode` mapping used by the share operator.
    share_stream_node_mapping: HashMap<u32, StreamNode>,
}

impl BuildFragmentGraphState {
    /// Create a new stream fragment, generating a fragment id for it.
    fn new_stream_fragment(&mut self) -> StreamFragment {
        let fragment = StreamFragment::new(self.next_local_fragment_id);
        self.next_local_fragment_id += 1;
        fragment
    }

    /// Generate an operator id.
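    ///
    /// A minimal sketch of how the two id counters move (operator ids count down,
    /// table ids count up):
    ///
    /// ```ignore
    /// let mut state = BuildFragmentGraphState::default();
    /// assert_eq!(state.gen_operator_id(), u32::MAX - 2); // decrements, then returns
    /// assert_eq!(state.gen_table_id(), 0); // returns, then increments
    /// ```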
    fn gen_operator_id(&mut self) -> u32 {
        self.next_operator_id -= 1;
        self.next_operator_id
    }

    /// Generate a table id.
    pub fn gen_table_id(&mut self) -> u32 {
        let ret = self.next_table_id;
        self.next_table_id += 1;
        ret
    }

    /// Generate a table id and wrap it as a `TableId`.
    pub fn gen_table_id_wrapped(&mut self) -> TableId {
        TableId::new(self.gen_table_id())
    }

    pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
        self.share_stream_node_mapping
            .insert(operator_id, stream_node);
    }

    pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
        self.share_stream_node_mapping.get(&operator_id)
    }

    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of
    /// the stream node are also copied from the `input` node.
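    ///
    /// For example (a sketch), wrapping an `Exchange` node yields a `NoOp` node whose single
    /// input is that `Exchange`, sharing its stream key, append-only flag and fields.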
    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
        StreamNode {
            operator_id: self.gen_operator_id() as u64,
            identity: "StreamNoOp".into(),
            node_body: Some(NodeBody::NoOp(NoOpNode {})),

            // Take input's properties.
            stream_key: input.stream_key.clone(),
            append_only: input.append_only,
            fields: input.fields.clone(),

            input: vec![input],
        }
    }
}

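/// Build the fragment graph (in protobuf) from the given streaming plan.
///
/// Besides fragmenting the plan, this fills in the dependent table ids, the total table
/// id count, the parallelism settings from the session config, and the stream context
/// (timezone).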
pub fn build_graph(plan_node: PlanRef) -> SchedulerResult<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node).unwrap();
    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Set table ids.
    fragment_graph.dependent_table_ids = state
        .dependent_table_ids
        .into_iter()
        .map(|id| id.table_id)
        .collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Set parallelism and vnode count.
    {
        let config = ctx.session_ctx().config();

        fragment_graph.parallelism =
            config
                .streaming_parallelism()
                .map(|parallelism| Parallelism {
                    parallelism: parallelism.get(),
                });
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
    }

    // Set timezone.
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
    });

    Ok(fragment_graph)
}

#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}

/// Do some dirty rewrites before building the fragments.
/// Currently, it splits a fragment containing multiple stateful operators (those with high
/// I/O throughput) into multiple fragments, which may help improve I/O concurrency.
/// Known as "no-shuffle exchange" or "1v1 exchange".
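///
/// For example (a sketch): a `HashJoin` whose input is a `HashAgg` would get a `NoShuffle`
/// exchange inserted in between, so that the two stateful operators end up in separate
/// (but 1-to-1 aligned) fragments.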
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}

/// Generate the fragment DAG from the input streaming plan according to the dependencies
/// between its nodes.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
    // #4614
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}

/// Use the given `stream_node` to create a fragment and add it to the graph.
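///
/// If a fragment has already been created for this operator (tracked in `share_mapping`,
/// used by the share operator), the existing fragment is returned instead.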
fn build_and_add_fragment(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<Rc<StreamFragment>> {
    let operator_id = stream_node.operator_id as u32;
    match state.share_mapping.get(&operator_id) {
        None => {
            let mut fragment = state.new_stream_fragment();
            let node = build_fragment(state, &mut fragment, stream_node)?;

            // It's possible that the stream node is rewritten while building the fragment, for
            // example, empty fragment to no-op fragment. We get the operator id again instead of
            // using the original one.
            let operator_id = node.operator_id as u32;

            assert!(fragment.node.is_none());
            fragment.node = Some(Box::new(node));
            let fragment_ref = Rc::new(fragment);

            state.fragment_graph.add_fragment(fragment_ref.clone());
            state
                .share_mapping
                .insert(operator_id, fragment_ref.fragment_id);
            Ok(fragment_ref)
        }
        Some(fragment_id) => Ok(state
            .fragment_graph
            .get_fragment(fragment_id)
            .unwrap()
            .clone()),
    }
}

/// Build a new fragment and link its dependencies by visiting children recursively, and
/// update the `requires_singleton` and `fragment_type` properties of the current fragment.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    recursive::tracker!().recurse(|_t| {
        // Update current fragment based on the node we're visiting.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::BarrierRecv as u32
            }

            NodeBody::Source(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Source as u32;

                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.is_new_fs_connector()
                        || source.with_properties.is_iceberg_connector())
                {
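                    // These source variants are not run distributed (e.g., a shared source
                    // that is not distributed), so the fragment must be a singleton.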
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Dml as u32;
            }

            NodeBody::Materialize(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Mview as u32;
            }

            NodeBody::Sink(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Sink as u32
            }

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment.fragment_type_mask |=
                            FragmentTypeFlag::SnapshotBackfillStreamScan as u32;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment.fragment_type_mask |=
                            FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan as u32;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // Memorize the table id for later use. The table id could be an upstream
                // CDC source.
                state
                    .dependent_table_ids
                    .insert(TableId::new(node.table_id));
                current_fragment.upstream_table_ids.push(node.table_id);
            }

            NodeBody::StreamCdcScan(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::StreamScan as u32;
                // The backfill algorithm is not parallel safe.
                current_fragment.requires_singleton = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::CdcFilter as u32;
                // Memorize the upstream source id for later use.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.into());
                current_fragment
                    .upstream_table_ids
                    .push(node.upstream_source_id);
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::SourceScan as u32;
                // Memorize the upstream source id for later use.
                let source_id = node.upstream_source_id;
                state.dependent_table_ids.insert(source_id.into());
                current_fragment.upstream_table_ids.push(source_id);
            }

            NodeBody::Now(_) => {
                // TODO: Remove this and insert a `BarrierRecv` instead.
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Now as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::Values as u32;
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment.fragment_type_mask |= FragmentTypeFlag::FsFetch as u32;
            }

            _ => {}
        };

        // Handle join logic.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Usually we do not expect an exchange node to be visited here, as it should be handled
        // by the "visit children" logic below. If it does happen (for example, `Share` will be
        // transformed to an `Exchange`), it means we have an empty fragment, and we need to add
        // a no-op node to it so that the meta service can handle it correctly.
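        // For example (a sketch): a fragment consisting of a bare `Exchange` becomes
        // `NoOp { input: [Exchange] }` after this rewrite.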
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit plan children.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange node generated during rewrites may have zero inputs. In
                    // this case, we don't recursively visit its children.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // Exchange node indicates a new child fragment.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Exchange node should have exactly one input.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                // Always use the exchange operator id as the link id.
                                link_id: child_node.operator_id,
                            },
                        );

                        // It's possible that there are multiple edges between two fragments,
                        // while the meta service and the compute nodes do not expect this. In
                        // this case, we manually insert a fragment of `NoOp` between the two
                        // fragments.
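                        //
                        //   Before: child ==(two edges)==> current  // rejected by `try_add_edge`
                        //   After:  child --NoShuffle--> NoOp --(original strategy)--> current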
                        if result.is_err() {
                            // Assign a new operator id for the `Exchange`, so we can distinguish it
                            // from duplicate edges and break the sharing.
                            child_node.operator_id = state.gen_operator_id() as u64;

                            // Take the upstream plan node as the reference for properties of `NoOp`.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_indices: (0..ref_fragment_node.fields.len() as u32)
                                    .collect(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    // Take reference's properties.
                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    append_only: ref_fragment_node.append_only,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use `NoShuffle` exchange strategy for the upstream edge.
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id,
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the original exchange strategy for the downstream edge.
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id,
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // For other children, visit recursively.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}