risingwave_frontend/stream_fragmenter/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod graph;
use graph::*;
use risingwave_common::util::recursive::{self, Recurse as _};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::stream_plan::stream_node::NodeBody;
mod parallelism;
mod rewrite;

use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;

use educe::Educe;
use risingwave_common::catalog::{FragmentTypeFlag, TableId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::session_config::parallelism::ConfigParallelism;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
    StreamNode, StreamScanType,
};

use self::rewrite::build_delta_join_without_arrange;
use crate::error::ErrorCode::NotSupported;
use crate::error::{Result, RwError};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
use crate::stream_fragmenter::parallelism::derive_parallelism;
/// The mutable state used while building the fragment graph.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph, transformed from the input streaming plan.
    fragment_graph: StreamFragmentGraph,
    /// Local fragment id.
    next_local_fragment_id: u32,

    /// Next local table id to be allocated. It equals the total count of table ids once the
    /// stream node traversal finishes.
    next_table_id: u32,

    /// Rewrites may produce new operators, so we track the next operator id to assign. Ids are
    /// allocated downward from `u32::MAX - 1`, keeping them clear of the ids of operators from
    /// the original plan.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Dependent streaming job ids.
    dependent_table_ids: HashSet<TableId>,

    /// Operator id to `LocalFragmentId` mapping used by the share operator.
    share_mapping: HashMap<u32, LocalFragmentId>,
    /// Operator id to `StreamNode` mapping used by the share operator.
    share_stream_node_mapping: HashMap<u32, StreamNode>,

    has_source_backfill: bool,
    has_snapshot_backfill: bool,
    has_cross_db_snapshot_backfill: bool,
}

impl BuildFragmentGraphState {
    /// Create a new stream fragment, generating a new local fragment id for it.
    fn new_stream_fragment(&mut self) -> StreamFragment {
        let fragment = StreamFragment::new(self.next_local_fragment_id);
        self.next_local_fragment_id += 1;
        fragment
    }

    /// Generate an operator id.
    fn gen_operator_id(&mut self) -> u32 {
        self.next_operator_id -= 1;
        self.next_operator_id
    }

    /// Generate a table id.
    pub fn gen_table_id(&mut self) -> u32 {
        let ret = self.next_table_id;
        self.next_table_id += 1;
        ret
    }

    /// Generate a table id, wrapped in a [`TableId`].
    pub fn gen_table_id_wrapped(&mut self) -> TableId {
        TableId::new(self.gen_table_id())
    }

    pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
        self.share_stream_node_mapping
            .insert(operator_id, stream_node);
    }

    pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
        self.share_stream_node_mapping.get(&operator_id)
    }

    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
    /// stream node will also be copied from the `input` node.
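    ///
    /// # Example
    ///
    /// A minimal sketch (not a compiled doctest; the `exchange` node here is hypothetical and
    /// most of its fields are elided):
    ///
    /// ```ignore
    /// // `exchange` is some `StreamNode` whose body is `NodeBody::Exchange`.
    /// let no_op = state.gen_no_op_stream_node(exchange);
    ///
    /// // The new node wraps the input and inherits its stream key, fields and
    /// // append-only flag.
    /// assert!(matches!(no_op.node_body, Some(NodeBody::NoOp(_))));
    /// assert_eq!(no_op.input.len(), 1);
    /// ```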
    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
        StreamNode {
            operator_id: self.gen_operator_id() as u64,
            identity: "StreamNoOp".into(),
            node_body: Some(NodeBody::NoOp(NoOpNode {})),

            // Take input's properties.
            stream_key: input.stream_key.clone(),
            append_only: input.append_only,
            fields: input.fields.clone(),

            input: vec![input],
        }
    }
}

/// The type of streaming job. It is used to determine the parallelism of the job during
/// `build_graph`.
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}

impl GraphJobType {
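    /// Map the job type to its job-specific parallelism setting in the session config,
    /// e.g. `streaming_parallelism_for_table` for `Table`. How this interacts with the
    /// generic `streaming_parallelism` setting is resolved later by `derive_parallelism`
    /// in `build_graph_with_strategy`.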
    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
        match self {
            GraphJobType::Table => config.streaming_parallelism_for_table(),
            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
            GraphJobType::Source => config.streaming_parallelism_for_source(),
            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
            GraphJobType::Index => config.streaming_parallelism_for_index(),
        }
    }
}

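/// Build the proto fragment graph from the given streaming plan, without an explicit backfill
/// order. Equivalent to [`build_graph_with_strategy`] with `backfill_order: None`.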
pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> Result<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}

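/// Build the proto fragment graph from the given streaming plan, optionally constrained by a
/// [`BackfillOrder`] that controls the order in which upstream relations are backfilled.
///
/// A minimal sketch of a call site (the plan value and job type are assumptions for
/// illustration):
///
/// ```ignore
/// let plan: PlanRef = optimizer_output; // root of the optimized stream plan
/// let graph = build_graph_with_strategy(plan, Some(GraphJobType::MaterializedView), None)?;
/// // `graph` is a `StreamFragmentGraphProto`, ready to be submitted to the meta service.
/// ```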
pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> Result<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node)?;
    if state.has_source_backfill && state.has_snapshot_backfill {
        return Err(RwError::from(NotSupported(
            "Snapshot backfill with shared source backfill is not supported".to_owned(),
            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
                    `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
                .to_owned(),
        )));
    }
    if state.has_cross_db_snapshot_backfill
        && let Some(ref backfill_order) = backfill_order
        && !backfill_order.order.is_empty()
    {
        return Err(RwError::from(NotSupported(
            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
            "Please remove backfill order specification from your query".to_owned(),
        )));
    }

    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Set table ids.
    fragment_graph.dependent_table_ids = state
        .dependent_table_ids
        .into_iter()
        .map(|id| id.table_id)
        .collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Set parallelism and vnode count.
    {
        let config = ctx.session_ctx().config();
        fragment_graph.parallelism = derive_parallelism(
            job_type.map(|t| t.to_parallelism(config.deref())),
            config.streaming_parallelism(),
        );
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
    }

    // Set timezone.
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
    });

    fragment_graph.backfill_order = backfill_order;

    Ok(fragment_graph)
}

#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}

/// Do some dirty rewrites before building the fragments.
/// Currently, it splits a fragment containing multiple stateful operators (those with high I/O
/// throughput) into multiple fragments, which may help improve I/O concurrency.
/// This is known as "no-shuffle exchange" or "1v1 exchange".
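///
/// For example, a sketch with hypothetical operators (the inserted exchange uses the
/// `NoShuffle` dispatch strategy, so the two fragments remain aligned one-to-one):
///
/// ```ignore
/// HashAgg -> HashJoin                          // one fragment, two stateful operators
/// // becomes
/// HashAgg -> Exchange(NoShuffle) -> HashJoin   // two fragments, linked 1v1
/// ```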
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}

/// Generate the fragment DAG from the input streaming plan according to its dependencies.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
    // #4614
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}

/// Use the given `stream_node` to create a fragment and add it to the graph.
fn build_and_add_fragment(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<Rc<StreamFragment>> {
    let operator_id = stream_node.operator_id as u32;
    match state.share_mapping.get(&operator_id) {
        None => {
            let mut fragment = state.new_stream_fragment();
            let node = build_fragment(state, &mut fragment, stream_node)?;

            // It's possible that the stream node is rewritten while building the fragment, for
            // example, an empty fragment rewritten to a no-op fragment. We get the operator id
            // again instead of using the original one.
            let operator_id = node.operator_id as u32;

            assert!(fragment.node.is_none());
            fragment.node = Some(Box::new(node));
            let fragment_ref = Rc::new(fragment);

            state.fragment_graph.add_fragment(fragment_ref.clone());
            state
                .share_mapping
                .insert(operator_id, fragment_ref.fragment_id);
            Ok(fragment_ref)
        }
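        // The stream node has been visited before (i.e. it is shared), so reuse the fragment
        // built for it instead of building a new one.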
        Some(fragment_id) => Ok(state
            .fragment_graph
            .get_fragment(fragment_id)
            .unwrap()
            .clone()),
    }
}

/// Build a new fragment and link dependencies by visiting children recursively; update the
/// `requires_singleton` and `fragment_type` properties of the current fragment.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    recursive::tracker!().recurse(|_t| {
        // Update current fragment based on the node we're visiting.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // Memorize the table id for later use. Note that the table id could refer to
                // an upstream CDC source.
                state
                    .dependent_table_ids
                    .insert(TableId::new(node.table_id));
                current_fragment.upstream_table_ids.push(node.table_id);
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    // The backfill algorithm is not parallel-safe.
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // Memorize the upstream source id for later use.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.into());
                current_fragment
                    .upstream_table_ids
                    .push(node.upstream_source_id);
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                // Memorize the upstream source id for later use.
                let source_id = node.upstream_source_id;
                state.dependent_table_ids.insert(source_id.into());
                current_fragment.upstream_table_ids.push(source_id);
                state.has_source_backfill = true;
            }

            NodeBody::Now(_) => {
                // TODO: Remove this and insert a `BarrierRecv` instead.
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            _ => {}
        };

        // handle join logic
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Usually we do not expect an exchange node to be visited here; it should be handled
        // by the "visit children" logic below instead. If one does appear here (for example, a
        // `Share` that has been transformed into an `Exchange`), the current fragment would be
        // empty, so we add a no-op node to it so that the meta service can handle it correctly.
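        //
        //     before:  Fragment [ Exchange ]          (nothing but the exchange itself)
        //     after:   Fragment [ NoOp <- Exchange ]  (no-op becomes the fragment root)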
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit plan children.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // When an exchange node is generated during rewrites, it could have zero
                    // inputs. In this case, we won't recursively visit its children.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // An exchange node indicates a new child fragment.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Exchange node should have only one input.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                // Always use the exchange operator id as the link id.
                                link_id: child_node.operator_id,
                            },
                        );

                        // It's possible that there are multiple edges between two fragments,
                        // which the meta service and the compute node do not expect. In this
                        // case, we manually insert a fragment of `NoOp` between the two
                        // fragments.
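                        //
                        // A sketch of the rewrite (edge labels are dispatch strategies):
                        //
                        //     child ==duplicate edge==> current
                        //
                        // becomes
                        //
                        //     child --NoShuffle--> [NoOp] --original strategy--> current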
                        if result.is_err() {
                            // Assign a new operator id for the `Exchange`, so we can distinguish it
                            // from duplicate edges and break the sharing.
                            child_node.operator_id = state.gen_operator_id() as u64;

                            // Take the upstream plan node as the reference for properties of `NoOp`.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;

                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    // Take reference's properties.
                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    append_only: ref_fragment_node.append_only,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use `NoShuffle` exchange strategy for upstream edge.
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id,
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the original exchange strategy for downstream edge.
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id,
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // For other children, visit recursively.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}