risingwave_frontend/stream_fragmenter/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod graph;
16use graph::*;
17use risingwave_common::util::recursive::{self, Recurse as _};
18use risingwave_connector::WithPropertiesExt;
19use risingwave_pb::catalog::Table;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_connector::source::cdc::CdcScanOptions;
33use risingwave_pb::plan_common::JoinType;
34use risingwave_pb::stream_plan::{
35    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
36    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
37    StreamNode, StreamScanType,
38};
39
40use self::rewrite::build_delta_join_without_arrange;
41use crate::error::ErrorCode::NotSupported;
42use crate::error::{Result, RwError};
43use crate::optimizer::plan_node::generic::GenericPlanRef;
44use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
45use crate::stream_fragmenter::parallelism::derive_parallelism;
46
47/// The mutable state when building fragment graph.
48#[derive(Educe)]
49#[educe(Default)]
50pub struct BuildFragmentGraphState {
51    /// fragment graph field, transformed from input streaming plan.
52    fragment_graph: StreamFragmentGraph,
53    /// local fragment id
54    next_local_fragment_id: u32,
55
56    /// Next local table id to be allocated. It equals to total table ids cnt when finish stream
57    /// node traversing.
58    next_table_id: u32,
59
60    /// rewrite will produce new operators, and we need to track next operator id
61    #[educe(Default(expression = u32::MAX - 1))]
62    next_operator_id: u32,
63
64    /// dependent streaming job ids.
65    dependent_table_ids: HashSet<TableId>,
66
67    /// operator id to `LocalFragmentId` mapping used by share operator.
68    share_mapping: HashMap<u32, LocalFragmentId>,
69    /// operator id to `StreamNode` mapping used by share operator.
70    share_stream_node_mapping: HashMap<u32, StreamNode>,
71
72    has_source_backfill: bool,
73    has_snapshot_backfill: bool,
74    has_cross_db_snapshot_backfill: bool,
75    tables: HashMap<TableId, Table>,
76}
77
78impl BuildFragmentGraphState {
79    /// Create a new stream fragment with given node with generating a fragment id.
80    fn new_stream_fragment(&mut self) -> StreamFragment {
81        let fragment = StreamFragment::new(self.next_local_fragment_id);
82        self.next_local_fragment_id += 1;
83        fragment
84    }
85
86    /// Generate an operator id
87    fn gen_operator_id(&mut self) -> u32 {
88        self.next_operator_id -= 1;
89        self.next_operator_id
90    }
91
92    /// Generate an table id
93    pub fn gen_table_id(&mut self) -> u32 {
94        let ret = self.next_table_id;
95        self.next_table_id += 1;
96        ret
97    }
98
99    /// Generate an table id
100    pub fn gen_table_id_wrapped(&mut self) -> TableId {
101        TableId::new(self.gen_table_id())
102    }
103
104    pub fn add_share_stream_node(&mut self, operator_id: u32, stream_node: StreamNode) {
105        self.share_stream_node_mapping
106            .insert(operator_id, stream_node);
107    }
108
109    pub fn get_share_stream_node(&mut self, operator_id: u32) -> Option<&StreamNode> {
110        self.share_stream_node_mapping.get(&operator_id)
111    }
112
113    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
114    /// stream node will also be copied from the `input` node.
115    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
116        StreamNode {
117            operator_id: self.gen_operator_id() as u64,
118            identity: "StreamNoOp".into(),
119            node_body: Some(NodeBody::NoOp(NoOpNode {})),
120
121            // Take input's properties.
122            stream_key: input.stream_key.clone(),
123            stream_kind: input.stream_kind,
124            fields: input.fields.clone(),
125
126            input: vec![input],
127        }
128    }
129}
130
/// The type of streaming job. It is used to determine the parallelism of the job during
/// `build_graph`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphJobType {
    /// A table job.
    Table,
    /// A materialized view job.
    MaterializedView,
    /// A source job.
    Source,
    /// A sink job.
    Sink,
    /// An index job.
    Index,
}
139
140impl GraphJobType {
141    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
142        match self {
143            GraphJobType::Table => config.streaming_parallelism_for_table(),
144            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
145            GraphJobType::Source => config.streaming_parallelism_for_source(),
146            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
147            GraphJobType::Index => config.streaming_parallelism_for_index(),
148        }
149    }
150}
151
152pub fn build_graph(
153    plan_node: PlanRef,
154    job_type: Option<GraphJobType>,
155) -> Result<StreamFragmentGraphProto> {
156    build_graph_with_strategy(plan_node, job_type, None)
157}
158
159pub fn build_graph_with_strategy(
160    plan_node: PlanRef,
161    job_type: Option<GraphJobType>,
162    backfill_order: Option<BackfillOrder>,
163) -> Result<StreamFragmentGraphProto> {
164    let ctx = plan_node.plan_base().ctx();
165    let plan_node = reorganize_elements_id(plan_node);
166
167    let mut state = BuildFragmentGraphState::default();
168    let stream_node = plan_node.to_stream_prost(&mut state)?;
169    generate_fragment_graph(&mut state, stream_node)?;
170    if state.has_source_backfill && state.has_snapshot_backfill {
171        return Err(RwError::from(NotSupported(
172            "Snapshot backfill with shared source backfill is not supported".to_owned(),
173            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
174                    `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
175                .to_owned(),
176        )));
177    }
178    if state.has_cross_db_snapshot_backfill
179        && let Some(ref backfill_order) = backfill_order
180        && !backfill_order.order.is_empty()
181    {
182        return Err(RwError::from(NotSupported(
183            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
184            "Please remove backfill order specification from your query".to_owned(),
185        )));
186    }
187
188    let mut fragment_graph = state.fragment_graph.to_protobuf();
189
190    // Set table ids.
191    fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
192    fragment_graph.table_ids_cnt = state.next_table_id;
193
194    // Set parallelism and vnode count.
195    {
196        let config = ctx.session_ctx().config();
197        fragment_graph.parallelism = derive_parallelism(
198            job_type.map(|t| t.to_parallelism(config.deref())),
199            config.streaming_parallelism(),
200        );
201        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
202    }
203
204    // Set timezone.
205    fragment_graph.ctx = Some(StreamContext {
206        timezone: ctx.get_session_timezone(),
207    });
208
209    fragment_graph.backfill_order = backfill_order;
210
211    Ok(fragment_graph)
212}
213
/// Whether `stream_node` is one of the stateful operators (those with high I/O throughput)
/// that the disabled 1v1-exchange rewrite separates into their own fragments.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let node_body = stream_node.get_node_body().unwrap();
    matches!(
        node_body,
        NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::HashAgg(_)
            | NodeBody::DynamicFilter(_)
    )
}
226
/// Do some dirty rewrites before building the fragments.
/// Currently, it will split the fragment with multiple stateful operators (those have high I/O
/// throughput) into multiple fragments, which may help improve the I/O concurrency.
/// Known as "no-shuffle exchange" or "1v1 exchange".
///
/// `insert_exchange_flag` is true when a stateful operator has already been seen in the current
/// fragment. The next stateful descendant then gets a `NoShuffle` exchange inserted above it.
///
/// NOTE: this pass is compiled out with `#[cfg(any())]` (see `generate_fragment_graph`). The
/// synthesized exchange node is built with the same protobuf fields the live code uses
/// (`output_mapping`, `stream_kind`, boxed `ExchangeNode`), so the pass still compiles if it is
/// re-enabled.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child: StreamNode| -> Result<StreamNode> {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle as i32,
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_mapping: PbDispatchOutputMapping::identical(child_node.fields.len())
                        .into(),
                };
                Ok(StreamNode {
                    operator_id: state.gen_operator_id() as u64,
                    identity: "Exchange (NoShuffle)".to_owned(),
                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                        strategy: Some(strategy),
                    }))),
                    stream_key: child_node.stream_key.clone(),
                    stream_kind: child_node.stream_kind,
                    fields: child_node.fields.clone(),
                    // Must come last: moves `child_node`.
                    input: vec![child_node],
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
281
282/// Generate fragment DAG from input streaming plan by their dependency.
283fn generate_fragment_graph(
284    state: &mut BuildFragmentGraphState,
285    stream_node: StreamNode,
286) -> Result<()> {
287    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
288    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
289    // #4614
290    #[cfg(any())]
291    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
292
293    build_and_add_fragment(state, stream_node)?;
294    Ok(())
295}
296
297/// Use the given `stream_node` to create a fragment and add it to graph.
298fn build_and_add_fragment(
299    state: &mut BuildFragmentGraphState,
300    stream_node: StreamNode,
301) -> Result<Rc<StreamFragment>> {
302    let operator_id = stream_node.operator_id as u32;
303    match state.share_mapping.get(&operator_id) {
304        None => {
305            let mut fragment = state.new_stream_fragment();
306            let node = build_fragment(state, &mut fragment, stream_node)?;
307
308            // It's possible that the stream node is rewritten while building the fragment, for
309            // example, empty fragment to no-op fragment. We get the operator id again instead of
310            // using the original one.
311            let operator_id = node.operator_id as u32;
312
313            assert!(fragment.node.is_none());
314            fragment.node = Some(Box::new(node));
315            let fragment_ref = Rc::new(fragment);
316
317            state.fragment_graph.add_fragment(fragment_ref.clone());
318            state
319                .share_mapping
320                .insert(operator_id, fragment_ref.fragment_id);
321            Ok(fragment_ref)
322        }
323        Some(fragment_id) => Ok(state
324            .fragment_graph
325            .get_fragment(fragment_id)
326            .unwrap()
327            .clone()),
328    }
329}
330
331/// Build new fragment and link dependencies by visiting children recursively, update
332/// `requires_singleton` and `fragment_type` properties for current fragment.
333fn build_fragment(
334    state: &mut BuildFragmentGraphState,
335    current_fragment: &mut StreamFragment,
336    mut stream_node: StreamNode,
337) -> Result<StreamNode> {
338    recursive::tracker!().recurse(|_t| {
339        // Update current fragment based on the node we're visiting.
340        match stream_node.get_node_body()? {
341            NodeBody::BarrierRecv(_) => current_fragment
342                .fragment_type_mask
343                .add(FragmentTypeFlag::BarrierRecv),
344
345            NodeBody::Source(node) => {
346                current_fragment
347                    .fragment_type_mask
348                    .add(FragmentTypeFlag::Source);
349
350                if let Some(source) = node.source_inner.as_ref()
351                    && let Some(source_info) = source.info.as_ref()
352                    && ((source_info.is_shared() && !source_info.is_distributed)
353                        || source.with_properties.requires_singleton())
354                {
355                    current_fragment.requires_singleton = true;
356                }
357            }
358
359            NodeBody::Dml(_) => {
360                current_fragment
361                    .fragment_type_mask
362                    .add(FragmentTypeFlag::Dml);
363            }
364
365            NodeBody::Materialize(_) => {
366                current_fragment
367                    .fragment_type_mask
368                    .add(FragmentTypeFlag::Mview);
369            }
370
371            NodeBody::Sink(_) => current_fragment
372                .fragment_type_mask
373                .add(FragmentTypeFlag::Sink),
374
375            NodeBody::TopN(_) => current_fragment.requires_singleton = true,
376
377            NodeBody::EowcGapFill(node) => {
378                let table = node.buffer_table.as_ref().unwrap().clone();
379                state.tables.insert(table.id, table);
380                let table = node.prev_row_table.as_ref().unwrap().clone();
381                state.tables.insert(table.id, table);
382            }
383
384            NodeBody::GapFill(node) => {
385                let table = node.state_table.as_ref().unwrap().clone();
386                state.tables.insert(table.id, table);
387            }
388
389            NodeBody::StreamScan(node) => {
390                current_fragment
391                    .fragment_type_mask
392                    .add(FragmentTypeFlag::StreamScan);
393                match node.stream_scan_type() {
394                    StreamScanType::SnapshotBackfill => {
395                        current_fragment
396                            .fragment_type_mask
397                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
398                        state.has_snapshot_backfill = true;
399                    }
400                    StreamScanType::CrossDbSnapshotBackfill => {
401                        current_fragment
402                            .fragment_type_mask
403                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
404                        state.has_cross_db_snapshot_backfill = true;
405                    }
406                    StreamScanType::Unspecified
407                    | StreamScanType::Chain
408                    | StreamScanType::Rearrange
409                    | StreamScanType::Backfill
410                    | StreamScanType::UpstreamOnly
411                    | StreamScanType::ArrangementBackfill => {}
412                }
413                // memorize table id for later use
414                // The table id could be a upstream CDC source
415                state.dependent_table_ids.insert(node.table_id);
416
417                // Add state table if present
418                if let Some(state_table) = &node.state_table {
419                    let table = state_table.clone();
420                    state.tables.insert(table.id, table);
421                }
422            }
423
424            NodeBody::StreamCdcScan(node) => {
425                if let Some(o) = node.options
426                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
427                {
428                    // Use parallel CDC backfill.
429                    current_fragment
430                        .fragment_type_mask
431                        .add(FragmentTypeFlag::StreamCdcScan);
432                } else {
433                    current_fragment
434                        .fragment_type_mask
435                        .add(FragmentTypeFlag::StreamScan);
436                    // the backfill algorithm is not parallel safe
437                    current_fragment.requires_singleton = true;
438                }
439                state.has_source_backfill = true;
440            }
441
442            NodeBody::CdcFilter(node) => {
443                current_fragment
444                    .fragment_type_mask
445                    .add(FragmentTypeFlag::CdcFilter);
446                // memorize upstream source id for later use
447                state
448                    .dependent_table_ids
449                    .insert(node.upstream_source_id.into());
450            }
451            NodeBody::SourceBackfill(node) => {
452                current_fragment
453                    .fragment_type_mask
454                    .add(FragmentTypeFlag::SourceScan);
455                // memorize upstream source id for later use
456                let source_id = node.upstream_source_id;
457                state.dependent_table_ids.insert(source_id.into());
458                state.has_source_backfill = true;
459            }
460
461            NodeBody::Now(_) => {
462                // TODO: Remove this and insert a `BarrierRecv` instead.
463                current_fragment
464                    .fragment_type_mask
465                    .add(FragmentTypeFlag::Now);
466                current_fragment.requires_singleton = true;
467            }
468
469            NodeBody::Values(_) => {
470                current_fragment
471                    .fragment_type_mask
472                    .add(FragmentTypeFlag::Values);
473                current_fragment.requires_singleton = true;
474            }
475
476            NodeBody::StreamFsFetch(_) => {
477                current_fragment
478                    .fragment_type_mask
479                    .add(FragmentTypeFlag::FsFetch);
480            }
481
482            NodeBody::VectorIndexWrite(_) => {
483                current_fragment
484                    .fragment_type_mask
485                    .add(FragmentTypeFlag::VectorIndexWrite);
486            }
487
488            NodeBody::UpstreamSinkUnion(_) => {
489                current_fragment
490                    .fragment_type_mask
491                    .add(FragmentTypeFlag::UpstreamSinkUnion);
492            }
493
494            NodeBody::LocalityProvider(_) => {
495                current_fragment
496                    .fragment_type_mask
497                    .add(FragmentTypeFlag::LocalityProvider);
498            }
499
500            _ => {}
501        };
502
503        // handle join logic
504        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
505        {
506            if delta_index_join.get_join_type()? == JoinType::Inner
507                && delta_index_join.condition.is_none()
508            {
509                return build_delta_join_without_arrange(state, current_fragment, stream_node);
510            } else {
511                panic!("only inner join without non-equal condition is supported for delta joins");
512            }
513        }
514
515        // Usually we do not expect exchange node to be visited here, which should be handled by the
516        // following logic of "visit children" instead. If it does happen (for example, `Share` will be
517        // transformed to an `Exchange`), it means we have an empty fragment and we need to add a no-op
518        // node to it, so that the meta service can handle it correctly.
519        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
520            stream_node = state.gen_no_op_stream_node(stream_node);
521        }
522
523        // Visit plan children.
524        stream_node.input = stream_node
525            .input
526            .into_iter()
527            .map(|mut child_node| {
528                match child_node.get_node_body()? {
529                    // When exchange node is generated when doing rewrites, it could be having
530                    // zero input. In this case, we won't recursively visit its children.
531                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
532                    // Exchange node indicates a new child fragment.
533                    NodeBody::Exchange(exchange_node) => {
534                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();
535
536                        // Exchange node should have only one input.
537                        let [input]: [_; 1] =
538                            std::mem::take(&mut child_node.input).try_into().unwrap();
539                        let child_fragment = build_and_add_fragment(state, input)?;
540
541                        let result = state.fragment_graph.try_add_edge(
542                            child_fragment.fragment_id,
543                            current_fragment.fragment_id,
544                            StreamFragmentEdge {
545                                dispatch_strategy: exchange_node_strategy.clone(),
546                                // Always use the exchange operator id as the link id.
547                                link_id: child_node.operator_id,
548                            },
549                        );
550
551                        // It's possible that there're multiple edges between two fragments, while the
552                        // meta service and the compute node does not expect this. In this case, we
553                        // manually insert a fragment of `NoOp` between the two fragments.
554                        if result.is_err() {
555                            // Assign a new operator id for the `Exchange`, so we can distinguish it
556                            // from duplicate edges and break the sharing.
557                            child_node.operator_id = state.gen_operator_id() as u64;
558
559                            // Take the upstream plan node as the reference for properties of `NoOp`.
560                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
561                            let no_shuffle_strategy = DispatchStrategy {
562                                r#type: DispatcherType::NoShuffle as i32,
563                                dist_key_indices: vec![],
564                                output_mapping: PbDispatchOutputMapping::identical(
565                                    ref_fragment_node.fields.len(),
566                                )
567                                .into(),
568                            };
569
570                            let no_shuffle_exchange_operator_id = state.gen_operator_id() as u64;
571
572                            let no_op_fragment = {
573                                let node = state.gen_no_op_stream_node(StreamNode {
574                                    operator_id: no_shuffle_exchange_operator_id,
575                                    identity: "StreamNoShuffleExchange".into(),
576                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
577                                        strategy: Some(no_shuffle_strategy.clone()),
578                                    }))),
579                                    input: vec![],
580
581                                    // Take reference's properties.
582                                    stream_key: ref_fragment_node.stream_key.clone(),
583                                    stream_kind: ref_fragment_node.stream_kind,
584                                    fields: ref_fragment_node.fields.clone(),
585                                });
586
587                                let mut fragment = state.new_stream_fragment();
588                                fragment.node = Some(node.into());
589                                Rc::new(fragment)
590                            };
591
592                            state.fragment_graph.add_fragment(no_op_fragment.clone());
593
594                            state.fragment_graph.add_edge(
595                                child_fragment.fragment_id,
596                                no_op_fragment.fragment_id,
597                                StreamFragmentEdge {
598                                    // Use `NoShuffle` exhcnage strategy for upstream edge.
599                                    dispatch_strategy: no_shuffle_strategy,
600                                    link_id: no_shuffle_exchange_operator_id,
601                                },
602                            );
603                            state.fragment_graph.add_edge(
604                                no_op_fragment.fragment_id,
605                                current_fragment.fragment_id,
606                                StreamFragmentEdge {
607                                    // Use the original exchange strategy for downstream edge.
608                                    dispatch_strategy: exchange_node_strategy,
609                                    link_id: child_node.operator_id,
610                                },
611                            );
612                        }
613
614                        Ok(child_node)
615                    }
616
617                    // For other children, visit recursively.
618                    _ => build_fragment(state, current_fragment, child_node),
619                }
620            })
621            .collect::<Result<_>>()?;
622        Ok(stream_node)
623    })
624}