risingwave_frontend/stream_fragmenter/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod graph;
use anyhow::Context;
use graph::*;
use risingwave_common::util::recursive::{self, Recurse as _};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::Table;
use risingwave_pb::stream_plan::stream_node::NodeBody;
mod parallelism;
mod rewrite;

use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;

use educe::Educe;
use risingwave_common::catalog::{FragmentTypeFlag, TableId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::session_config::parallelism::ConfigParallelism;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
    StreamNode, StreamScanType,
};

use self::rewrite::build_delta_join_without_arrange;
use crate::catalog::FragmentId;
use crate::error::ErrorCode::NotSupported;
use crate::error::{Result, RwError};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
use crate::stream_fragmenter::parallelism::derive_parallelism;

/// The mutable state when building the fragment graph.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph, transformed from the input streaming plan.
    fragment_graph: StreamFragmentGraph,
    /// Next local fragment id to be allocated.
    next_local_fragment_id: FragmentId,

    /// Next local table id to be allocated. It equals the total number of table ids
    /// when stream node traversal finishes.
    next_table_id: u32,

    /// Rewrites may produce new operators, so we need to track the next operator id.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Dependent streaming job ids.
    dependent_table_ids: HashSet<TableId>,

    /// Operator id to `LocalFragmentId` mapping used by the share operator.
    share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
    /// Operator id to `StreamNode` mapping used by the share operator.
    share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,

    has_source_backfill: bool,
    has_snapshot_backfill: bool,
    has_cross_db_snapshot_backfill: bool,
    tables: HashMap<TableId, Table>,
}

impl BuildFragmentGraphState {
    /// Create a new stream fragment, allocating a fresh local fragment id for it.
    fn new_stream_fragment(&mut self) -> StreamFragment {
        let fragment = StreamFragment::new(self.next_local_fragment_id);
        self.next_local_fragment_id += 1;
        fragment
    }

    /// Generate an operator id.
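    ///
    /// Ids for operators introduced by rewrites are allocated downwards from
    /// `u32::MAX - 1`, so they cannot collide with the ids already assigned to plan
    /// nodes by the optimizer, which grow upwards from zero. A minimal sketch of the
    /// allocation order:
    ///
    /// ```ignore
    /// let mut state = BuildFragmentGraphState::default();
    /// let first = state.gen_operator_id(); // raw value: u32::MAX - 2
    /// let second = state.gen_operator_id(); // raw value: u32::MAX - 3
    /// ```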
    fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
        self.next_operator_id -= 1;
        LocalOperatorId::new(self.next_operator_id).into()
    }

    /// Generate a table id.
    pub fn gen_table_id(&mut self) -> u32 {
        let ret = self.next_table_id;
        self.next_table_id += 1;
        ret
    }

    /// Generate a table id, wrapped as a `TableId`.
    pub fn gen_table_id_wrapped(&mut self) -> TableId {
        TableId::new(self.gen_table_id())
    }

    pub fn add_share_stream_node(
        &mut self,
        operator_id: StreamNodeLocalOperatorId,
        stream_node: StreamNode,
    ) {
        self.share_stream_node_mapping
            .insert(operator_id, stream_node);
    }

    pub fn get_share_stream_node(
        &mut self,
        operator_id: StreamNodeLocalOperatorId,
    ) -> Option<&StreamNode> {
        self.share_stream_node_mapping.get(&operator_id)
    }

    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
    /// stream node will also be copied from the `input` node.
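    ///
    /// A minimal sketch of the resulting shape, assuming some existing `input` node:
    ///
    /// ```ignore
    /// let no_op = state.gen_no_op_stream_node(input);
    /// assert!(matches!(no_op.node_body, Some(NodeBody::NoOp(_))));
    /// assert_eq!(no_op.input.len(), 1); // the original node becomes the sole input
    /// ```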
    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
        StreamNode {
            operator_id: self.gen_operator_id(),
            identity: "StreamNoOp".into(),
            node_body: Some(NodeBody::NoOp(NoOpNode {})),

            // Take input's properties.
            stream_key: input.stream_key.clone(),
            stream_kind: input.stream_kind,
            fields: input.fields.clone(),

            input: vec![input],
        }
    }
}

/// The type of streaming job. It is used to determine the parallelism of the job during
/// `build_graph`.
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}

impl GraphJobType {
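    /// Map the job type to its job-specific parallelism setting in the session config,
    /// e.g. `streaming_parallelism_for_table` for tables. How this interacts with the
    /// generic `streaming_parallelism` setting is decided later by `derive_parallelism`.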
    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
        match self {
            GraphJobType::Table => config.streaming_parallelism_for_table(),
            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
            GraphJobType::Source => config.streaming_parallelism_for_source(),
            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
            GraphJobType::Index => config.streaming_parallelism_for_index(),
        }
    }
}

pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> Result<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}

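/// Build the fragment graph from a streaming plan, with an optional backfill order
/// strategy.
///
/// This converts the plan to a `StreamNode` tree, cuts it into fragments at exchange
/// boundaries, rejects unsupported backfill combinations, and fills in parallelism,
/// vnode count, and the streaming context.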
pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> Result<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node)?;
    if state.has_source_backfill && state.has_snapshot_backfill {
        return Err(RwError::from(NotSupported(
            "Snapshot backfill with shared source backfill is not supported".to_owned(),
            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
                    `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
                .to_owned(),
        )));
    }
    if state.has_cross_db_snapshot_backfill
        && let Some(ref backfill_order) = backfill_order
        && !backfill_order.order.is_empty()
    {
        return Err(RwError::from(NotSupported(
            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
            "Please remove backfill order specification from your query".to_owned(),
        )));
    }

    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Set table ids.
    fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Set parallelism and vnode count.
    {
        let config = ctx.session_ctx().config();
        fragment_graph.parallelism = derive_parallelism(
            job_type.map(|t| t.to_parallelism(config.deref())),
            config.streaming_parallelism(),
        );
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
    }

    // Set context for this streaming job.
    let config_override = ctx
        .session_ctx()
        .config()
        .to_initial_streaming_config_override()
        .context("invalid initial streaming config override")?;
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
        config_override,
    });

    fragment_graph.backfill_order = backfill_order;

    Ok(fragment_graph)
}

#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}

/// Do some dirty rewrites before building the fragments.
/// Currently, it splits a fragment with multiple stateful operators (those with high I/O
/// throughput) into multiple fragments, which may help improve I/O concurrency.
/// This is known as "no-shuffle exchange" or "1v1 exchange".
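///
/// For example, a fragment containing two stateful operators, e.g. a `HashJoin` whose
/// input is a `HashAgg`, would get a `NoShuffle` exchange inserted between them, so
/// that each stateful operator ends up in its own fragment.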
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force adding an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id(),
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}

/// Generate the fragment DAG from the input streaming plan according to the dependencies
/// between nodes.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
    // #4614
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}

/// Use the given `stream_node` to create a fragment and add it to the graph.
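///
/// If a fragment has already been built for the same (shared) operator id, it is
/// reused via `share_mapping` instead of being built again.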
fn build_and_add_fragment(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<Rc<StreamFragment>> {
    let operator_id = stream_node.operator_id;
    match state.share_mapping.get(&operator_id) {
        None => {
            let mut fragment = state.new_stream_fragment();
            let node = build_fragment(state, &mut fragment, stream_node)?;

            // It's possible that the stream node is rewritten while building the fragment, for
            // example, empty fragment to no-op fragment. We get the operator id again instead of
            // using the original one.
            let operator_id = node.operator_id;

            assert!(fragment.node.is_none());
            fragment.node = Some(Box::new(node));
            let fragment_ref = Rc::new(fragment);

            state.fragment_graph.add_fragment(fragment_ref.clone());
            state
                .share_mapping
                .insert(operator_id, fragment_ref.fragment_id);
            Ok(fragment_ref)
        }
        Some(fragment_id) => Ok(state
            .fragment_graph
            .get_fragment(fragment_id)
            .unwrap()
            .clone()),
    }
}

/// Build a new fragment and link its dependencies by visiting children recursively, updating the
/// `requires_singleton` and `fragment_type` properties of the current fragment.
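///
/// Exchange nodes delimit fragment boundaries: each `Exchange` child encountered while
/// visiting spawns (or reuses) a child fragment and becomes an edge in the fragment graph.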
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    recursive::tracker!().recurse(|_t| {
        // Update current fragment based on the node we're visiting.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::EowcGapFill(node) => {
                let table = node.buffer_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
                let table = node.prev_row_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::GapFill(node) => {
                let table = node.state_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::Backfill
                    | StreamScanType::UpstreamOnly
                    | StreamScanType::ArrangementBackfill => {}
                }
                // Memorize the table id for later use. It could refer to an upstream CDC source.
                state.dependent_table_ids.insert(node.table_id);

                // Add the state table if present.
                if let Some(state_table) = &node.state_table {
                    let table = state_table.clone();
                    state.tables.insert(table.id, table);
                }
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    // The backfill algorithm is not parallel-safe.
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // Memorize the upstream source id for later use.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.as_cdc_table_id());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                // Memorize the upstream source id for later use.
                let source_id = node.upstream_source_id;
                state
                    .dependent_table_ids
                    .insert(source_id.as_cdc_table_id());
                state.has_source_backfill = true;
            }

            NodeBody::Now(_) => {
                // TODO: Remove this and insert a `BarrierRecv` instead.
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            _ => {}
        };

        // Handle join logic.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Usually we do not expect an exchange node to be visited here, as it should be handled
        // by the "visit children" logic below. If it does happen (for example, a `Share` will be
        // transformed to an `Exchange`), it means we have an empty fragment, and we need to add
        // a no-op node to it so that the meta service can handle it correctly.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit plan children.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // When an exchange node is generated during rewrites, it may have zero
                    // inputs. In this case, we won't recursively visit its children.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // An exchange node indicates a new child fragment.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // An exchange node should have exactly one input.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                // Always use the exchange operator id as the link id.
                                link_id: child_node.operator_id.as_raw_id(),
                            },
                        );

                        // It's possible that there are multiple edges between two fragments, while
                        // the meta service and the compute node do not expect this. In this case,
                        // we manually insert a fragment of `NoOp` between the two fragments.
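                        //
                        // Before: child ==(exchange strategy, duplicate link)==> current
                        // After:  child --(NoShuffle)--> NoOp --(exchange strategy)--> current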
                        if result.is_err() {
                            // Assign a new operator id for the `Exchange`, so we can distinguish it
                            // from duplicate edges and break the sharing.
                            child_node.operator_id = state.gen_operator_id();

                            // Take the upstream plan node as the reference for properties of `NoOp`.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id();

                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    // Take reference's properties.
                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the `NoShuffle` exchange strategy for the upstream edge.
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id.as_raw_id(),
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the original exchange strategy for the downstream edge.
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id.as_raw_id(),
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // For other children, visit recursively.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}