risingwave_frontend/stream_fragmenter/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::catalog::Table;
21use risingwave_pb::stream_plan::stream_node::NodeBody;
22mod parallelism;
23mod rewrite;
24
25use std::collections::{HashMap, HashSet};
26use std::ops::Deref;
27use std::rc::Rc;
28
29use educe::Educe;
30use risingwave_common::catalog::{FragmentTypeFlag, TableId};
31use risingwave_common::session_config::SessionConfig;
32use risingwave_common::session_config::parallelism::ConfigParallelism;
33use risingwave_connector::source::cdc::CdcScanOptions;
34use risingwave_pb::plan_common::JoinType;
35use risingwave_pb::stream_plan::{
36    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
37    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
38    StreamNode, StreamScanType,
39};
40
41use self::rewrite::build_delta_join_without_arrange;
42use crate::catalog::FragmentId;
43use crate::error::ErrorCode::NotSupported;
44use crate::error::{Result, RwError};
45use crate::optimizer::plan_node::generic::GenericPlanRef;
46use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
47use crate::stream_fragmenter::parallelism::derive_parallelism;
48
49/// The mutable state when building fragment graph.
50#[derive(Educe)]
51#[educe(Default)]
52pub struct BuildFragmentGraphState {
53    /// fragment graph field, transformed from input streaming plan.
54    fragment_graph: StreamFragmentGraph,
55    /// local fragment id
56    next_local_fragment_id: FragmentId,
57
58    /// Next local table id to be allocated. It equals to total table ids cnt when finish stream
59    /// node traversing.
60    next_table_id: u32,
61
62    /// rewrite will produce new operators, and we need to track next operator id
63    #[educe(Default(expression = u32::MAX - 1))]
64    next_operator_id: u32,
65
66    /// dependent streaming job ids.
67    dependent_table_ids: HashSet<TableId>,
68
69    /// operator id to `LocalFragmentId` mapping used by share operator.
70    share_mapping: HashMap<u32, LocalFragmentId>,
71    /// operator id to `StreamNode` mapping used by share operator.
72    share_stream_node_mapping: HashMap<u32, StreamNode>,
73
74    has_source_backfill: bool,
75    has_snapshot_backfill: bool,
76    has_cross_db_snapshot_backfill: bool,
77    tables: HashMap<TableId, Table>,
78}
79
80impl BuildFragmentGraphState {
81    /// Create a new stream fragment with given node with generating a fragment id.
82    fn new_stream_fragment(&mut self) -> StreamFragment {
83        let fragment = StreamFragment::new(self.next_local_fragment_id);
84        self.next_local_fragment_id += 1;
85        fragment
86    }
87
88    /// Generate an operator id
89    fn gen_operator_id(&mut self) -> u32 {
90        self.next_operator_id -= 1;
91        self.next_operator_id
92    }
93
94    /// Generate an table id
95    pub fn gen_table_id(&mut self) -> u32 {
96        let ret = self.next_table_id;
97        self.next_table_id += 1;
98        ret
99    }
100
101    /// Generate an table id
102    pub fn gen_table_id_wrapped(&mut self) -> TableId {
103        TableId::new(self.gen_table_id())
104    }
105
106    pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
107        self.share_stream_node_mapping
108            .insert(operator_id, stream_node);
109    }
110
111    pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
112        self.share_stream_node_mapping.get(&operator_id)
113    }
114
115    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
116    /// stream node will also be copied from the `input` node.
117    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
118        StreamNode {
119            operator_id: self.gen_operator_id() as u64,
120            identity: "StreamNoOp".into(),
121            node_body: Some(NodeBody::NoOp(NoOpNode {})),
122
123            // Take input's properties.
124            stream_key: input.stream_key.clone(),
125            stream_kind: input.stream_kind,
126            fields: input.fields.clone(),
127
128            input: vec![input],
129        }
130    }
131}
132
/// The type of streaming job.
///
/// It selects which job-type-specific session parallelism setting is used when building the
/// fragment graph (see `build_graph`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
141
142impl GraphJobType {
143    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
144        match self {
145            GraphJobType::Table => config.streaming_parallelism_for_table(),
146            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
147            GraphJobType::Source => config.streaming_parallelism_for_source(),
148            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
149            GraphJobType::Index => config.streaming_parallelism_for_index(),
150        }
151    }
152}
153
154pub fn build_graph(
155    plan_node: PlanRef,
156    job_type: Option<GraphJobType>,
157) -> Result<StreamFragmentGraphProto> {
158    build_graph_with_strategy(plan_node, job_type, None)
159}
160
161pub fn build_graph_with_strategy(
162    plan_node: PlanRef,
163    job_type: Option<GraphJobType>,
164    backfill_order: Option<BackfillOrder>,
165) -> Result<StreamFragmentGraphProto> {
166    let ctx = plan_node.plan_base().ctx();
167    let plan_node = reorganize_elements_id(plan_node);
168
169    let mut state = BuildFragmentGraphState::default();
170    let stream_node = plan_node.to_stream_prost(&mut state)?;
171    generate_fragment_graph(&mut state, stream_node)?;
172    if state.has_source_backfill && state.has_snapshot_backfill {
173        return Err(RwError::from(NotSupported(
174            "Snapshot backfill with shared source backfill is not supported".to_owned(),
175            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
176                    `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
177                .to_owned(),
178        )));
179    }
180    if state.has_cross_db_snapshot_backfill
181        && let Some(ref backfill_order) = backfill_order
182        && !backfill_order.order.is_empty()
183    {
184        return Err(RwError::from(NotSupported(
185            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
186            "Please remove backfill order specification from your query".to_owned(),
187        )));
188    }
189
190    let mut fragment_graph = state.fragment_graph.to_protobuf();
191
192    // Set table ids.
193    fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
194    fragment_graph.table_ids_cnt = state.next_table_id;
195
196    // Set parallelism and vnode count.
197    {
198        let config = ctx.session_ctx().config();
199        fragment_graph.parallelism = derive_parallelism(
200            job_type.map(|t| t.to_parallelism(config.deref())),
201            config.streaming_parallelism(),
202        );
203        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
204    }
205
206    // Set context for this streaming job.
207    let config_override = ctx
208        .session_ctx()
209        .config()
210        .to_initial_streaming_config_override()
211        .context("invalid initial streaming config override")?;
212    fragment_graph.ctx = Some(StreamContext {
213        timezone: ctx.get_session_timezone(),
214        config_override,
215    });
216
217    fragment_graph.backfill_order = backfill_order;
218
219    Ok(fragment_graph)
220}
221
/// Whether `stream_node` is one of the operators treated as "stateful" (high I/O) by the
/// disabled no-shuffle rewrite in `rewrite_stream_node`.
///
/// Compiled out via `#[cfg(any())]`. Panics if the node has no body.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::HashAgg(_)
            | NodeBody::DynamicFilter(_)
    )
}
234
/// Do some dirty rewrites before building the fragments.
/// Currently, it will split the fragment with multiple stateful operators (those have high I/O
/// throughput) into multiple fragments, which may help improve the I/O concurrency.
/// Known as "no-shuffle exchange" or "1v1 exchange".
///
/// `insert_exchange_flag` is true when a stateful operator has already been seen above
/// `stream_node` without an intervening `Exchange`. When the flag is set, the next stateful
/// child is wrapped in a `NoShuffle` exchange, which starts a new fragment. Every stateful
/// child is then visited with the flag set. Descending through an existing `Exchange` resets
/// the flag to false.
///
/// NOTE(review): this function is compiled out by `#[cfg(any())]` and appears stale. It builds
/// `DispatchStrategy { output_indices, .. }`, an unboxed `ExchangeNode`, and
/// `StreamNode { append_only, .. }`. Live code in this file instead uses `output_mapping`,
/// `Box::new(ExchangeNode { .. })`, and `stream_kind`. It likely needs updating before it can
/// be re-enabled.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                // Wrap the rewritten child in a `NoShuffle` exchange that forwards all of its
                // columns unchanged.
                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id() as u64,
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    // Rebuild the node with every input rewritten; all other fields are kept as-is.
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
289
290/// Generate fragment DAG from input streaming plan by their dependency.
291fn generate_fragment_graph(
292    state: &mut BuildFragmentGraphState,
293    stream_node: StreamNode,
294) -> Result<()> {
295    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
296    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
297    // #4614
298    #[cfg(any())]
299    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
300
301    build_and_add_fragment(state, stream_node)?;
302    Ok(())
303}
304
305/// Use the given `stream_node` to create a fragment and add it to graph.
306fn build_and_add_fragment(
307    state: &mut BuildFragmentGraphState,
308    stream_node: StreamNode,
309) -> Result<Rc<StreamFragment>> {
310    let operator_id = stream_node.operator_id as u32;
311    match state.share_mapping.get(&operator_id) {
312        None => {
313            let mut fragment = state.new_stream_fragment();
314            let node = build_fragment(state, &mut fragment, stream_node)?;
315
316            // It's possible that the stream node is rewritten while building the fragment, for
317            // example, empty fragment to no-op fragment. We get the operator id again instead of
318            // using the original one.
319            let operator_id = node.operator_id as u32;
320
321            assert!(fragment.node.is_none());
322            fragment.node = Some(Box::new(node));
323            let fragment_ref = Rc::new(fragment);
324
325            state.fragment_graph.add_fragment(fragment_ref.clone());
326            state
327                .share_mapping
328                .insert(operator_id, fragment_ref.fragment_id);
329            Ok(fragment_ref)
330        }
331        Some(fragment_id) => Ok(state
332            .fragment_graph
333            .get_fragment(fragment_id)
334            .unwrap()
335            .clone()),
336    }
337}
338
339/// Build new fragment and link dependencies by visiting children recursively, update
340/// `requires_singleton` and `fragment_type` properties for current fragment.
341fn build_fragment(
342    state: &mut BuildFragmentGraphState,
343    current_fragment: &mut StreamFragment,
344    mut stream_node: StreamNode,
345) -> Result<StreamNode> {
346    recursive::tracker!().recurse(|_t| {
347        // Update current fragment based on the node we're visiting.
348        match stream_node.get_node_body()? {
349            NodeBody::BarrierRecv(_) => current_fragment
350                .fragment_type_mask
351                .add(FragmentTypeFlag::BarrierRecv),
352
353            NodeBody::Source(node) => {
354                current_fragment
355                    .fragment_type_mask
356                    .add(FragmentTypeFlag::Source);
357
358                if let Some(source) = node.source_inner.as_ref()
359                    && let Some(source_info) = source.info.as_ref()
360                    && ((source_info.is_shared() && !source_info.is_distributed)
361                        || source.with_properties.requires_singleton())
362                {
363                    current_fragment.requires_singleton = true;
364                }
365            }
366
367            NodeBody::Dml(_) => {
368                current_fragment
369                    .fragment_type_mask
370                    .add(FragmentTypeFlag::Dml);
371            }
372
373            NodeBody::Materialize(_) => {
374                current_fragment
375                    .fragment_type_mask
376                    .add(FragmentTypeFlag::Mview);
377            }
378
379            NodeBody::Sink(_) => current_fragment
380                .fragment_type_mask
381                .add(FragmentTypeFlag::Sink),
382
383            NodeBody::TopN(_) => current_fragment.requires_singleton = true,
384
385            NodeBody::EowcGapFill(node) => {
386                let table = node.buffer_table.as_ref().unwrap().clone();
387                state.tables.insert(table.id, table);
388                let table = node.prev_row_table.as_ref().unwrap().clone();
389                state.tables.insert(table.id, table);
390            }
391
392            NodeBody::GapFill(node) => {
393                let table = node.state_table.as_ref().unwrap().clone();
394                state.tables.insert(table.id, table);
395            }
396
397            NodeBody::StreamScan(node) => {
398                current_fragment
399                    .fragment_type_mask
400                    .add(FragmentTypeFlag::StreamScan);
401                match node.stream_scan_type() {
402                    StreamScanType::SnapshotBackfill => {
403                        current_fragment
404                            .fragment_type_mask
405                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
406                        state.has_snapshot_backfill = true;
407                    }
408                    StreamScanType::CrossDbSnapshotBackfill => {
409                        current_fragment
410                            .fragment_type_mask
411                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
412                        state.has_cross_db_snapshot_backfill = true;
413                    }
414                    StreamScanType::Unspecified
415                    | StreamScanType::Chain
416                    | StreamScanType::Rearrange
417                    | StreamScanType::Backfill
418                    | StreamScanType::UpstreamOnly
419                    | StreamScanType::ArrangementBackfill => {}
420                }
421                // memorize table id for later use
422                // The table id could be a upstream CDC source
423                state.dependent_table_ids.insert(node.table_id);
424
425                // Add state table if present
426                if let Some(state_table) = &node.state_table {
427                    let table = state_table.clone();
428                    state.tables.insert(table.id, table);
429                }
430            }
431
432            NodeBody::StreamCdcScan(node) => {
433                if let Some(o) = node.options
434                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
435                {
436                    // Use parallel CDC backfill.
437                    current_fragment
438                        .fragment_type_mask
439                        .add(FragmentTypeFlag::StreamCdcScan);
440                } else {
441                    current_fragment
442                        .fragment_type_mask
443                        .add(FragmentTypeFlag::StreamScan);
444                    // the backfill algorithm is not parallel safe
445                    current_fragment.requires_singleton = true;
446                }
447                state.has_source_backfill = true;
448            }
449
450            NodeBody::CdcFilter(node) => {
451                current_fragment
452                    .fragment_type_mask
453                    .add(FragmentTypeFlag::CdcFilter);
454                // memorize upstream source id for later use
455                state
456                    .dependent_table_ids
457                    .insert(node.upstream_source_id.as_cdc_table_id());
458            }
459            NodeBody::SourceBackfill(node) => {
460                current_fragment
461                    .fragment_type_mask
462                    .add(FragmentTypeFlag::SourceScan);
463                // memorize upstream source id for later use
464                let source_id = node.upstream_source_id;
465                state
466                    .dependent_table_ids
467                    .insert(source_id.as_cdc_table_id());
468                state.has_source_backfill = true;
469            }
470
471            NodeBody::Now(_) => {
472                // TODO: Remove this and insert a `BarrierRecv` instead.
473                current_fragment
474                    .fragment_type_mask
475                    .add(FragmentTypeFlag::Now);
476                current_fragment.requires_singleton = true;
477            }
478
479            NodeBody::Values(_) => {
480                current_fragment
481                    .fragment_type_mask
482                    .add(FragmentTypeFlag::Values);
483                current_fragment.requires_singleton = true;
484            }
485
486            NodeBody::StreamFsFetch(_) => {
487                current_fragment
488                    .fragment_type_mask
489                    .add(FragmentTypeFlag::FsFetch);
490            }
491
492            NodeBody::VectorIndexWrite(_) => {
493                current_fragment
494                    .fragment_type_mask
495                    .add(FragmentTypeFlag::VectorIndexWrite);
496            }
497
498            NodeBody::UpstreamSinkUnion(_) => {
499                current_fragment
500                    .fragment_type_mask
501                    .add(FragmentTypeFlag::UpstreamSinkUnion);
502            }
503
504            NodeBody::LocalityProvider(_) => {
505                current_fragment
506                    .fragment_type_mask
507                    .add(FragmentTypeFlag::LocalityProvider);
508            }
509
510            _ => {}
511        };
512
513        // handle join logic
514        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
515        {
516            if delta_index_join.get_join_type()? == JoinType::Inner
517                && delta_index_join.condition.is_none()
518            {
519                return build_delta_join_without_arrange(state, current_fragment, stream_node);
520            } else {
521                panic!("only inner join without non-equal condition is supported for delta joins");
522            }
523        }
524
525        // Usually we do not expect exchange node to be visited here, which should be handled by the
526        // following logic of "visit children" instead. If it does happen (for example, `Share` will be
527        // transformed to an `Exchange`), it means we have an empty fragment and we need to add a no-op
528        // node to it, so that the meta service can handle it correctly.
529        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
530            stream_node = state.gen_no_op_stream_node(stream_node);
531        }
532
533        // Visit plan children.
534        stream_node.input = stream_node
535            .input
536            .into_iter()
537            .map(|mut child_node| {
538                match child_node.get_node_body()? {
539                    // When exchange node is generated when doing rewrites, it could be having
540                    // zero input. In this case, we won't recursively visit its children.
541                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
542                    // Exchange node indicates a new child fragment.
543                    NodeBody::Exchange(exchange_node) => {
544                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();
545
546                        // Exchange node should have only one input.
547                        let [input]: [_; 1] =
548                            std::mem::take(&mut child_node.input).try_into().unwrap();
549                        let child_fragment = build_and_add_fragment(state, input)?;
550
551                        let result = state.fragment_graph.try_add_edge(
552                            child_fragment.fragment_id,
553                            current_fragment.fragment_id,
554                            StreamFragmentEdge {
555                                dispatch_strategy: exchange_node_strategy.clone(),
556                                // Always use the exchange operator id as the link id.
557                                link_id: child_node.operator_id,
558                            },
559                        );
560
561                        // It's possible that there're multiple edges between two fragments, while the
562                        // meta service and the compute node does not expect this. In this case, we
563                        // manually insert a fragment of `NoOp` between the two fragments.
564                        if result.is_err() {
565                            // Assign a new operator id for the `Exchange`, so we can distinguish it
566                            // from duplicate edges and break the sharing.
567                            child_node.operator_id = state.gen_operator_id() as u64;
568
569                            // Take the upstream plan node as the reference for properties of `NoOp`.
570                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
571                            let no_shuffle_strategy = DispatchStrategy {
572                                r#type: DispatcherType::NoShuffle as i32,
573                                dist_key_indices: vec![],
574                                output_mapping: PbDispatchOutputMapping::identical(
575                                    ref_fragment_node.fields.len(),
576                                )
577                                .into(),
578                            };
579
580                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;
581
582                            let no_op_fragment = {
583                                let node = state.gen_no_op_stream_node(StreamNode {
584                                    operator_id: no_shuffle_exchange_operator_id,
585                                    identity: "StreamNoShuffleExchange".into(),
586                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
587                                        strategy: Some(no_shuffle_strategy.clone()),
588                                    }))),
589                                    input: vec![],
590
591                                    // Take reference's properties.
592                                    stream_key: ref_fragment_node.stream_key.clone(),
593                                    stream_kind: ref_fragment_node.stream_kind,
594                                    fields: ref_fragment_node.fields.clone(),
595                                });
596
597                                let mut fragment = state.new_stream_fragment();
598                                fragment.node = Some(node.into());
599                                Rc::new(fragment)
600                            };
601
602                            state.fragment_graph.add_fragment(no_op_fragment.clone());
603
604                            state.fragment_graph.add_edge(
605                                child_fragment.fragment_id,
606                                no_op_fragment.fragment_id,
607                                StreamFragmentEdge {
608                                    // Use `NoShuffle` exhcnage strategy for upstream edge.
609                                    dispatch_strategy: no_shuffle_strategy,
610                                    link_id: no_shuffle_exchange_operator_id,
611                                },
612                            );
613                            state.fragment_graph.add_edge(
614                                no_op_fragment.fragment_id,
615                                current_fragment.fragment_id,
616                                StreamFragmentEdge {
617                                    // Use the original exchange strategy for downstream edge.
618                                    dispatch_strategy: exchange_node_strategy,
619                                    link_id: child_node.operator_id,
620                                },
621                            );
622                        }
623
624                        Ok(child_node)
625                    }
626
627                    // For other children, visit recursively.
628                    _ => build_fragment(state, current_fragment, child_node),
629                }
630            })
631            .collect::<Result<_>>()?;
632        Ok(stream_node)
633    })
634}