risingwave_frontend/stream_fragmenter/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod graph;
16use graph::*;
17use risingwave_common::util::recursive::{self, Recurse as _};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_pb::catalog::Table;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_connector::source::cdc::CdcScanOptions;
33use risingwave_pb::plan_common::JoinType;
34use risingwave_pb::stream_plan::{
35    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
36    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
37    StreamNode, StreamScanType,
38};
39
40use self::rewrite::build_delta_join_without_arrange;
41use crate::catalog::FragmentId;
42use crate::error::ErrorCode::NotSupported;
43use crate::error::{Result, RwError};
44use crate::optimizer::plan_node::generic::GenericPlanRef;
45use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
46use crate::stream_fragmenter::parallelism::derive_parallelism;
47
48/// The mutable state when building fragment graph.
49#[derive(Educe)]
50#[educe(Default)]
51pub struct BuildFragmentGraphState {
52    /// fragment graph field, transformed from input streaming plan.
53    fragment_graph: StreamFragmentGraph,
54    /// local fragment id
55    next_local_fragment_id: FragmentId,
56
57    /// Next local table id to be allocated. It equals to total table ids cnt when finish stream
58    /// node traversing.
59    next_table_id: u32,
60
61    /// rewrite will produce new operators, and we need to track next operator id
62    #[educe(Default(expression = u32::MAX - 1))]
63    next_operator_id: u32,
64
65    /// dependent streaming job ids.
66    dependent_table_ids: HashSet<TableId>,
67
68    /// operator id to `LocalFragmentId` mapping used by share operator.
69    share_mapping: HashMap<u32, LocalFragmentId>,
70    /// operator id to `StreamNode` mapping used by share operator.
71    share_stream_node_mapping: HashMap<u32, StreamNode>,
72
73    has_source_backfill: bool,
74    has_snapshot_backfill: bool,
75    has_cross_db_snapshot_backfill: bool,
76    tables: HashMap<TableId, Table>,
77}
78
79impl BuildFragmentGraphState {
80    /// Create a new stream fragment with given node with generating a fragment id.
81    fn new_stream_fragment(&mut self) -> StreamFragment {
82        let fragment = StreamFragment::new(self.next_local_fragment_id);
83        self.next_local_fragment_id += 1;
84        fragment
85    }
86
87    /// Generate an operator id
88    fn gen_operator_id(&mut self) -> u32 {
89        self.next_operator_id -= 1;
90        self.next_operator_id
91    }
92
93    /// Generate an table id
94    pub fn gen_table_id(&mut self) -> u32 {
95        let ret = self.next_table_id;
96        self.next_table_id += 1;
97        ret
98    }
99
100    /// Generate an table id
101    pub fn gen_table_id_wrapped(&mut self) -> TableId {
102        TableId::new(self.gen_table_id())
103    }
104
105    pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
106        self.share_stream_node_mapping
107            .insert(operator_id, stream_node);
108    }
109
110    pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
111        self.share_stream_node_mapping.get(&operator_id)
112    }
113
114    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
115    /// stream node will also be copied from the `input` node.
116    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
117        StreamNode {
118            operator_id: self.gen_operator_id() as u64,
119            identity: "StreamNoOp".into(),
120            node_body: Some(NodeBody::NoOp(NoOpNode {})),
121
122            // Take input's properties.
123            stream_key: input.stream_key.clone(),
124            stream_kind: input.stream_kind,
125            fields: input.fields.clone(),
126
127            input: vec![input],
128        }
129    }
130}
131
/// The type of a streaming job.
///
/// It is used to determine the parallelism of the job during `build_graph`: each variant selects
/// its own streaming-parallelism setting from the session config.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
140
141impl GraphJobType {
142    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
143        match self {
144            GraphJobType::Table => config.streaming_parallelism_for_table(),
145            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
146            GraphJobType::Source => config.streaming_parallelism_for_source(),
147            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
148            GraphJobType::Index => config.streaming_parallelism_for_index(),
149        }
150    }
151}
152
153pub fn build_graph(
154    plan_node: PlanRef,
155    job_type: Option<GraphJobType>,
156) -> Result<StreamFragmentGraphProto> {
157    build_graph_with_strategy(plan_node, job_type, None)
158}
159
160pub fn build_graph_with_strategy(
161    plan_node: PlanRef,
162    job_type: Option<GraphJobType>,
163    backfill_order: Option<BackfillOrder>,
164) -> Result<StreamFragmentGraphProto> {
165    let ctx = plan_node.plan_base().ctx();
166    let plan_node = reorganize_elements_id(plan_node);
167
168    let mut state = BuildFragmentGraphState::default();
169    let stream_node = plan_node.to_stream_prost(&mut state)?;
170    generate_fragment_graph(&mut state, stream_node)?;
171    if state.has_source_backfill && state.has_snapshot_backfill {
172        return Err(RwError::from(NotSupported(
173            "Snapshot backfill with shared source backfill is not supported".to_owned(),
174            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
175                    `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
176                .to_owned(),
177        )));
178    }
179    if state.has_cross_db_snapshot_backfill
180        && let Some(ref backfill_order) = backfill_order
181        && !backfill_order.order.is_empty()
182    {
183        return Err(RwError::from(NotSupported(
184            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
185            "Please remove backfill order specification from your query".to_owned(),
186        )));
187    }
188
189    let mut fragment_graph = state.fragment_graph.to_protobuf();
190
191    // Set table ids.
192    fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
193    fragment_graph.table_ids_cnt = state.next_table_id;
194
195    // Set parallelism and vnode count.
196    {
197        let config = ctx.session_ctx().config();
198        fragment_graph.parallelism = derive_parallelism(
199            job_type.map(|t| t.to_parallelism(config.deref())),
200            config.streaming_parallelism(),
201        );
202        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
203    }
204
205    // Set timezone.
206    fragment_graph.ctx = Some(StreamContext {
207        timezone: ctx.get_session_timezone(),
208    });
209
210    fragment_graph.backfill_order = backfill_order;
211
212    Ok(fragment_graph)
213}
214
/// Returns whether `stream_node` is one of the I/O-heavy stateful operators (hash aggregation,
/// hash / delta index join, stream (CDC) scan, dynamic filter) that the disabled 1v1-exchange
/// rewrite would split into separate fragments.
///
/// Panics (via `unwrap`) if the node body cannot be read.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
227
/// Do some dirty rewrites before building the fragments.
/// Currently, it will split the fragment with multiple stateful operators (those have high I/O
/// throughput) into multiple fragments, which may help improve the I/O concurrency.
/// Known as "no-shuffle exchange" or "1v1 exchange".
///
/// This rewrite is disabled via `#[cfg(any())]` (see `generate_fragment_graph`). The exchange
/// node it builds mirrors the one constructed by the live code in `build_fragment`
/// (`output_mapping`, `stream_kind`, boxed `ExchangeNode`), so that it still compiles against
/// the current protobuf definitions if re-enabled.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child: StreamNode| -> Result<StreamNode> {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_mapping: PbDispatchOutputMapping::identical(child_node.fields.len())
                        .into(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    stream_kind: child_node.stream_kind,
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                        strategy: Some(strategy),
                    }))),
                    operator_id: state.gen_operator_id() as u64,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
282
283/// Generate fragment DAG from input streaming plan by their dependency.
284fn generate_fragment_graph(
285    state: &mut BuildFragmentGraphState,
286    stream_node: StreamNode,
287) -> Result<()> {
288    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
289    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
290    // #4614
291    #[cfg(any())]
292    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
293
294    build_and_add_fragment(state, stream_node)?;
295    Ok(())
296}
297
298/// Use the given `stream_node` to create a fragment and add it to graph.
299fn build_and_add_fragment(
300    state: &mut BuildFragmentGraphState,
301    stream_node: StreamNode,
302) -> Result<Rc<StreamFragment>> {
303    let operator_id = stream_node.operator_id as u32;
304    match state.share_mapping.get(&operator_id) {
305        None => {
306            let mut fragment = state.new_stream_fragment();
307            let node = build_fragment(state, &mut fragment, stream_node)?;
308
309            // It's possible that the stream node is rewritten while building the fragment, for
310            // example, empty fragment to no-op fragment. We get the operator id again instead of
311            // using the original one.
312            let operator_id = node.operator_id as u32;
313
314            assert!(fragment.node.is_none());
315            fragment.node = Some(Box::new(node));
316            let fragment_ref = Rc::new(fragment);
317
318            state.fragment_graph.add_fragment(fragment_ref.clone());
319            state
320                .share_mapping
321                .insert(operator_id, fragment_ref.fragment_id);
322            Ok(fragment_ref)
323        }
324        Some(fragment_id) => Ok(state
325            .fragment_graph
326            .get_fragment(fragment_id)
327            .unwrap()
328            .clone()),
329    }
330}
331
332/// Build new fragment and link dependencies by visiting children recursively, update
333/// `requires_singleton` and `fragment_type` properties for current fragment.
334fn build_fragment(
335    state: &mut BuildFragmentGraphState,
336    current_fragment: &mut StreamFragment,
337    mut stream_node: StreamNode,
338) -> Result<StreamNode> {
339    recursive::tracker!().recurse(|_t| {
340        // Update current fragment based on the node we're visiting.
341        match stream_node.get_node_body()? {
342            NodeBody::BarrierRecv(_) => current_fragment
343                .fragment_type_mask
344                .add(FragmentTypeFlag::BarrierRecv),
345
346            NodeBody::Source(node) => {
347                current_fragment
348                    .fragment_type_mask
349                    .add(FragmentTypeFlag::Source);
350
351                if let Some(source) = node.source_inner.as_ref()
352                    && let Some(source_info) = source.info.as_ref()
353                    && ((source_info.is_shared() && !source_info.is_distributed)
354                        || source.with_properties.requires_singleton())
355                {
356                    current_fragment.requires_singleton = true;
357                }
358            }
359
360            NodeBody::Dml(_) => {
361                current_fragment
362                    .fragment_type_mask
363                    .add(FragmentTypeFlag::Dml);
364            }
365
366            NodeBody::Materialize(_) => {
367                current_fragment
368                    .fragment_type_mask
369                    .add(FragmentTypeFlag::Mview);
370            }
371
372            NodeBody::Sink(_) => current_fragment
373                .fragment_type_mask
374                .add(FragmentTypeFlag::Sink),
375
376            NodeBody::TopN(_) => current_fragment.requires_singleton = true,
377
378            NodeBody::EowcGapFill(node) => {
379                let table = node.buffer_table.as_ref().unwrap().clone();
380                state.tables.insert(table.id, table);
381                let table = node.prev_row_table.as_ref().unwrap().clone();
382                state.tables.insert(table.id, table);
383            }
384
385            NodeBody::GapFill(node) => {
386                let table = node.state_table.as_ref().unwrap().clone();
387                state.tables.insert(table.id, table);
388            }
389
390            NodeBody::StreamScan(node) => {
391                current_fragment
392                    .fragment_type_mask
393                    .add(FragmentTypeFlag::StreamScan);
394                match node.stream_scan_type() {
395                    StreamScanType::SnapshotBackfill => {
396                        current_fragment
397                            .fragment_type_mask
398                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
399                        state.has_snapshot_backfill = true;
400                    }
401                    StreamScanType::CrossDbSnapshotBackfill => {
402                        current_fragment
403                            .fragment_type_mask
404                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
405                        state.has_cross_db_snapshot_backfill = true;
406                    }
407                    StreamScanType::Unspecified
408                    | StreamScanType::Chain
409                    | StreamScanType::Rearrange
410                    | StreamScanType::Backfill
411                    | StreamScanType::UpstreamOnly
412                    | StreamScanType::ArrangementBackfill => {}
413                }
414                // memorize table id for later use
415                // The table id could be a upstream CDC source
416                state.dependent_table_ids.insert(node.table_id);
417
418                // Add state table if present
419                if let Some(state_table) = &node.state_table {
420                    let table = state_table.clone();
421                    state.tables.insert(table.id, table);
422                }
423            }
424
425            NodeBody::StreamCdcScan(node) => {
426                if let Some(o) = node.options
427                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
428                {
429                    // Use parallel CDC backfill.
430                    current_fragment
431                        .fragment_type_mask
432                        .add(FragmentTypeFlag::StreamCdcScan);
433                } else {
434                    current_fragment
435                        .fragment_type_mask
436                        .add(FragmentTypeFlag::StreamScan);
437                    // the backfill algorithm is not parallel safe
438                    current_fragment.requires_singleton = true;
439                }
440                state.has_source_backfill = true;
441            }
442
443            NodeBody::CdcFilter(node) => {
444                current_fragment
445                    .fragment_type_mask
446                    .add(FragmentTypeFlag::CdcFilter);
447                // memorize upstream source id for later use
448                state
449                    .dependent_table_ids
450                    .insert(node.upstream_source_id.as_cdc_table_id());
451            }
452            NodeBody::SourceBackfill(node) => {
453                current_fragment
454                    .fragment_type_mask
455                    .add(FragmentTypeFlag::SourceScan);
456                // memorize upstream source id for later use
457                let source_id = node.upstream_source_id;
458                state
459                    .dependent_table_ids
460                    .insert(source_id.as_cdc_table_id());
461                state.has_source_backfill = true;
462            }
463
464            NodeBody::Now(_) => {
465                // TODO: Remove this and insert a `BarrierRecv` instead.
466                current_fragment
467                    .fragment_type_mask
468                    .add(FragmentTypeFlag::Now);
469                current_fragment.requires_singleton = true;
470            }
471
472            NodeBody::Values(_) => {
473                current_fragment
474                    .fragment_type_mask
475                    .add(FragmentTypeFlag::Values);
476                current_fragment.requires_singleton = true;
477            }
478
479            NodeBody::StreamFsFetch(_) => {
480                current_fragment
481                    .fragment_type_mask
482                    .add(FragmentTypeFlag::FsFetch);
483            }
484
485            NodeBody::VectorIndexWrite(_) => {
486                current_fragment
487                    .fragment_type_mask
488                    .add(FragmentTypeFlag::VectorIndexWrite);
489            }
490
491            NodeBody::UpstreamSinkUnion(_) => {
492                current_fragment
493                    .fragment_type_mask
494                    .add(FragmentTypeFlag::UpstreamSinkUnion);
495            }
496
497            NodeBody::LocalityProvider(_) => {
498                current_fragment
499                    .fragment_type_mask
500                    .add(FragmentTypeFlag::LocalityProvider);
501            }
502
503            _ => {}
504        };
505
506        // handle join logic
507        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
508        {
509            if delta_index_join.get_join_type()? == JoinType::Inner
510                && delta_index_join.condition.is_none()
511            {
512                return build_delta_join_without_arrange(state, current_fragment, stream_node);
513            } else {
514                panic!("only inner join without non-equal condition is supported for delta joins");
515            }
516        }
517
518        // Usually we do not expect exchange node to be visited here, which should be handled by the
519        // following logic of "visit children" instead. If it does happen (for example, `Share` will be
520        // transformed to an `Exchange`), it means we have an empty fragment and we need to add a no-op
521        // node to it, so that the meta service can handle it correctly.
522        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
523            stream_node = state.gen_no_op_stream_node(stream_node);
524        }
525
526        // Visit plan children.
527        stream_node.input = stream_node
528            .input
529            .into_iter()
530            .map(|mut child_node| {
531                match child_node.get_node_body()? {
532                    // When exchange node is generated when doing rewrites, it could be having
533                    // zero input. In this case, we won't recursively visit its children.
534                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
535                    // Exchange node indicates a new child fragment.
536                    NodeBody::Exchange(exchange_node) => {
537                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();
538
539                        // Exchange node should have only one input.
540                        let [input]: [_; 1] =
541                            std::mem::take(&mut child_node.input).try_into().unwrap();
542                        let child_fragment = build_and_add_fragment(state, input)?;
543
544                        let result = state.fragment_graph.try_add_edge(
545                            child_fragment.fragment_id,
546                            current_fragment.fragment_id,
547                            StreamFragmentEdge {
548                                dispatch_strategy: exchange_node_strategy.clone(),
549                                // Always use the exchange operator id as the link id.
550                                link_id: child_node.operator_id,
551                            },
552                        );
553
554                        // It's possible that there're multiple edges between two fragments, while the
555                        // meta service and the compute node does not expect this. In this case, we
556                        // manually insert a fragment of `NoOp` between the two fragments.
557                        if result.is_err() {
558                            // Assign a new operator id for the `Exchange`, so we can distinguish it
559                            // from duplicate edges and break the sharing.
560                            child_node.operator_id = state.gen_operator_id() as u64;
561
562                            // Take the upstream plan node as the reference for properties of `NoOp`.
563                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
564                            let no_shuffle_strategy = DispatchStrategy {
565                                r#type: DispatcherType::NoShuffle as i32,
566                                dist_key_indices: vec![],
567                                output_mapping: PbDispatchOutputMapping::identical(
568                                    ref_fragment_node.fields.len(),
569                                )
570                                .into(),
571                            };
572
573                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;
574
575                            let no_op_fragment = {
576                                let node = state.gen_no_op_stream_node(StreamNode {
577                                    operator_id: no_shuffle_exchange_operator_id,
578                                    identity: "StreamNoShuffleExchange".into(),
579                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
580                                        strategy: Some(no_shuffle_strategy.clone()),
581                                    }))),
582                                    input: vec![],
583
584                                    // Take reference's properties.
585                                    stream_key: ref_fragment_node.stream_key.clone(),
586                                    stream_kind: ref_fragment_node.stream_kind,
587                                    fields: ref_fragment_node.fields.clone(),
588                                });
589
590                                let mut fragment = state.new_stream_fragment();
591                                fragment.node = Some(node.into());
592                                Rc::new(fragment)
593                            };
594
595                            state.fragment_graph.add_fragment(no_op_fragment.clone());
596
597                            state.fragment_graph.add_edge(
598                                child_fragment.fragment_id,
599                                no_op_fragment.fragment_id,
600                                StreamFragmentEdge {
601                                    // Use `NoShuffle` exhcnage strategy for upstream edge.
602                                    dispatch_strategy: no_shuffle_strategy,
603                                    link_id: no_shuffle_exchange_operator_id,
604                                },
605                            );
606                            state.fragment_graph.add_edge(
607                                no_op_fragment.fragment_id,
608                                current_fragment.fragment_id,
609                                StreamFragmentEdge {
610                                    // Use the original exchange strategy for downstream edge.
611                                    dispatch_strategy: exchange_node_strategy,
612                                    link_id: child_node.operator_id,
613                                },
614                            );
615                        }
616
617                        Ok(child_node)
618                    }
619
620                    // For other children, visit recursively.
621                    _ => build_fragment(state, current_fragment, child_node),
622                }
623            })
624            .collect::<Result<_>>()?;
625        Ok(stream_node)
626    })
627}