Skip to main content

risingwave_frontend/stream_fragmenter/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_common::system_param::AdaptiveParallelismStrategy;
33use risingwave_connector::source::cdc::CdcScanOptions;
34use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
35use risingwave_pb::plan_common::JoinType;
36use risingwave_pb::stream_plan::{
37    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
38    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
39    StreamNode, StreamScanType,
40};
41
42use self::rewrite::build_delta_join_without_arrange;
43use crate::catalog::FragmentId;
44use crate::error::ErrorCode::NotSupported;
45use crate::error::{Result, RwError};
46use crate::optimizer::plan_node::generic::GenericPlanRef;
47use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
48use crate::stream_fragmenter::parallelism::{
49    ResolvedParallelism, derive_backfill_parallelism, derive_parallelism,
50};
51
52/// The mutable state when building fragment graph.
53#[derive(Educe)]
54#[educe(Default)]
55pub struct BuildFragmentGraphState {
56    /// fragment graph field, transformed from input streaming plan.
57    fragment_graph: StreamFragmentGraph,
58    /// local fragment id
59    next_local_fragment_id: FragmentId,
60
61    /// Next local table id to be allocated. It equals to total table ids cnt when finish stream
62    /// node traversing.
63    next_table_id: u32,
64
65    /// rewrite will produce new operators, and we need to track next operator id
66    #[educe(Default(expression = u32::MAX - 1))]
67    next_operator_id: u32,
68
69    /// dependent streaming job ids.
70    dependent_table_ids: HashSet<TableId>,
71
72    /// operator id to `LocalFragmentId` mapping used by share operator.
73    share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
74    /// operator id to `StreamNode` mapping used by share operator.
75    share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,
76
77    has_source_backfill: bool,
78    has_snapshot_backfill: bool,
79    has_cross_db_snapshot_backfill: bool,
80    has_any_backfill: bool,
81}
82
83impl BuildFragmentGraphState {
84    /// Create a new stream fragment with given node with generating a fragment id.
85    fn new_stream_fragment(&mut self) -> StreamFragment {
86        let fragment = StreamFragment::new(self.next_local_fragment_id);
87        self.next_local_fragment_id += 1;
88        fragment
89    }
90
91    /// Generate an operator id
92    fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
93        self.next_operator_id -= 1;
94        LocalOperatorId::new(self.next_operator_id).into()
95    }
96
97    /// Generate an table id
98    pub fn gen_table_id(&mut self) -> u32 {
99        let ret = self.next_table_id;
100        self.next_table_id += 1;
101        ret
102    }
103
104    /// Generate an table id
105    pub fn gen_table_id_wrapped(&mut self) -> TableId {
106        TableId::new(self.gen_table_id())
107    }
108
109    pub fn add_share_stream_node(
110        &mut self,
111        operator_id: StreamNodeLocalOperatorId,
112        stream_node: StreamNode,
113    ) {
114        self.share_stream_node_mapping
115            .insert(operator_id, stream_node);
116    }
117
118    pub fn get_share_stream_node(
119        &mut self,
120        operator_id: StreamNodeLocalOperatorId,
121    ) -> Option<&StreamNode> {
122        self.share_stream_node_mapping.get(&operator_id)
123    }
124
125    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
126    /// stream node will also be copied from the `input` node.
127    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
128        StreamNode {
129            operator_id: self.gen_operator_id(),
130            identity: "StreamNoOp".into(),
131            node_body: Some(NodeBody::NoOp(NoOpNode {})),
132
133            // Take input's properties.
134            stream_key: input.stream_key.clone(),
135            stream_kind: input.stream_kind,
136            fields: input.fields.clone(),
137
138            input: vec![input],
139        }
140    }
141}
142
/// The kind of streaming job being built. It is consulted during `build_graph` to determine
/// the parallelism of the job.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GraphJobType {
    /// The job is a table.
    Table,
    /// The job is a materialized view.
    MaterializedView,
    /// The job is a source.
    Source,
    /// The job is a sink.
    Sink,
    /// The job is an index.
    Index,
}
152
153impl GraphJobType {
154    pub fn to_parallelism(self, config: &SessionConfig) -> ConfigParallelism {
155        match self {
156            GraphJobType::Table => config.streaming_parallelism_for_table(),
157            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
158            GraphJobType::Source => config.streaming_parallelism_for_source(),
159            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
160            GraphJobType::Index => config.streaming_parallelism_for_index(),
161        }
162    }
163}
164
165pub fn build_graph(
166    plan_node: PlanRef,
167    job_type: Option<GraphJobType>,
168) -> Result<StreamFragmentGraphProto> {
169    build_graph_with_strategy(plan_node, job_type, None)
170}
171
172pub fn build_graph_with_strategy(
173    plan_node: PlanRef,
174    job_type: Option<GraphJobType>,
175    backfill_order: Option<BackfillOrder>,
176) -> Result<StreamFragmentGraphProto> {
177    let ctx = plan_node.plan_base().ctx();
178    let plan_node = reorganize_elements_id(plan_node);
179
180    let mut state = BuildFragmentGraphState::default();
181    let stream_node = plan_node.to_stream_prost(&mut state)?;
182    generate_fragment_graph(&mut state, stream_node)?;
183    if state.has_source_backfill && state.has_snapshot_backfill {
184        return Err(RwError::from(NotSupported(
185            "Snapshot backfill with shared source backfill is not supported".to_owned(),
186            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
187                    `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
188                .to_owned(),
189        )));
190    }
191    if state.has_cross_db_snapshot_backfill
192        && let Some(ref backfill_order) = backfill_order
193        && !backfill_order.order.is_empty()
194    {
195        return Err(RwError::from(NotSupported(
196            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
197            "Please remove backfill order specification from your query".to_owned(),
198        )));
199    }
200
201    let (
202        normal_parallelism,
203        backfill_parallelism,
204        adaptive_parallelism_strategy,
205        backfill_adaptive_parallelism_strategy,
206        max_parallelism,
207    ) = {
208        let config = ctx.session_ctx().config();
209        let streaming_parallelism = config.streaming_parallelism();
210        let job_parallelism = job_type.map(|t| t.to_parallelism(config.deref()));
211        let normal_parallelism =
212            derive_parallelism(job_type, job_parallelism, streaming_parallelism);
213        let backfill_parallelism = if state.has_any_backfill {
214            derive_backfill_parallelism(config.streaming_parallelism_for_backfill())
215        } else {
216            ResolvedParallelism {
217                parallelism: None,
218                adaptive_strategy: None,
219            }
220        };
221        (
222            normal_parallelism.parallelism,
223            backfill_parallelism.parallelism,
224            normal_parallelism
225                .adaptive_strategy
226                .as_ref()
227                .map(AdaptiveParallelismStrategy::to_string)
228                .unwrap_or_default(),
229            backfill_parallelism
230                .adaptive_strategy
231                .as_ref()
232                .map(AdaptiveParallelismStrategy::to_string)
233                .unwrap_or_default(),
234            config.streaming_max_parallelism() as _,
235        )
236    };
237
238    let config_override = ctx
239        .session_ctx()
240        .config()
241        .to_initial_streaming_config_override()
242        .context("invalid initial streaming config override")?;
243    let fragments = state
244        .fragment_graph
245        .fragments
246        .into_iter()
247        .map(|(k, v)| (k, v.to_protobuf()))
248        .collect();
249    let edges = state.fragment_graph.edges.into_values().collect();
250
251    Ok(StreamFragmentGraphProto {
252        fragments,
253        edges,
254        dependent_table_ids: state.dependent_table_ids.into_iter().collect(),
255        table_ids_cnt: state.next_table_id,
256        ctx: Some(StreamContext {
257            timezone: ctx.get_session_timezone(),
258            config_override,
259        }),
260        parallelism: normal_parallelism,
261        backfill_parallelism,
262        adaptive_parallelism_strategy,
263        backfill_adaptive_parallelism_strategy,
264        max_parallelism,
265        backfill_order,
266    })
267}
268
/// Returns whether `stream_node` is one of the executors treated as stateful by the (currently
/// disabled) `rewrite_stream_node` pass.
///
/// Panics if the node has no body.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
281
/// Do some dirty rewrites before building the fragments.
/// Currently, it will split the fragment with multiple stateful operators (those have high I/O
/// throughput) into multiple fragments, which may help improve the I/O concurrency.
/// Known as "no-shuffle exchange" or "1v1 exchange".
///
/// `insert_exchange_flag` tells whether a stateful operator has already been seen since the last
/// exchange: when it is set and another stateful child is met, a `NoShuffle` exchange is
/// inserted above that child.
///
/// This function is compiled out via `#[cfg(any())]`. Its node construction is kept in line
/// with the `StreamNode`, `DispatchStrategy` and `ExchangeNode` shapes used by the live code in
/// this file, so that it can be re-enabled without first repairing stale field names.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child: StreamNode| -> Result<StreamNode> {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle as i32,
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_mapping: PbDispatchOutputMapping::identical(child_node.fields.len())
                        .into(),
                };
                Ok(StreamNode {
                    operator_id: state.gen_operator_id(),
                    identity: "Exchange (NoShuffle)".to_string(),
                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                        strategy: Some(strategy),
                    }))),

                    // Take the child's properties before it is moved into `input`.
                    stream_key: child_node.stream_key.clone(),
                    stream_kind: child_node.stream_kind,
                    fields: child_node.fields.clone(),

                    input: vec![child_node],
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
336
337/// Generate fragment DAG from input streaming plan by their dependency.
338fn generate_fragment_graph(
339    state: &mut BuildFragmentGraphState,
340    stream_node: StreamNode,
341) -> Result<()> {
342    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
343    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
344    // #4614
345    #[cfg(any())]
346    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
347
348    build_and_add_fragment(state, stream_node)?;
349    Ok(())
350}
351
352/// Use the given `stream_node` to create a fragment and add it to graph.
353fn build_and_add_fragment(
354    state: &mut BuildFragmentGraphState,
355    stream_node: StreamNode,
356) -> Result<Rc<StreamFragment>> {
357    let operator_id = stream_node.operator_id;
358    match state.share_mapping.get(&operator_id) {
359        None => {
360            let mut fragment = state.new_stream_fragment();
361            let node = build_fragment(state, &mut fragment, stream_node)?;
362
363            // It's possible that the stream node is rewritten while building the fragment, for
364            // example, empty fragment to no-op fragment. We get the operator id again instead of
365            // using the original one.
366            let operator_id = node.operator_id;
367
368            assert!(fragment.node.is_none());
369            fragment.node = Some(Box::new(node));
370            let fragment_ref = Rc::new(fragment);
371
372            state.fragment_graph.add_fragment(fragment_ref.clone());
373            state
374                .share_mapping
375                .insert(operator_id, fragment_ref.fragment_id);
376            Ok(fragment_ref)
377        }
378        Some(fragment_id) => Ok(state
379            .fragment_graph
380            .get_fragment(fragment_id)
381            .unwrap()
382            .clone()),
383    }
384}
385
386/// Build new fragment and link dependencies by visiting children recursively, update
387/// `requires_singleton` and `fragment_type` properties for current fragment.
388fn build_fragment(
389    state: &mut BuildFragmentGraphState,
390    current_fragment: &mut StreamFragment,
391    mut stream_node: StreamNode,
392) -> Result<StreamNode> {
393    recursive::tracker!().recurse(|_t| {
394        // Update current fragment based on the node we're visiting.
395        match stream_node.get_node_body()? {
396            NodeBody::BarrierRecv(_) => current_fragment
397                .fragment_type_mask
398                .add(FragmentTypeFlag::BarrierRecv),
399
400            NodeBody::Source(node) => {
401                current_fragment
402                    .fragment_type_mask
403                    .add(FragmentTypeFlag::Source);
404
405                if let Some(source) = node.source_inner.as_ref()
406                    && let Some(source_info) = source.info.as_ref()
407                    && ((source_info.is_shared() && !source_info.is_distributed)
408                        || source.with_properties.requires_singleton())
409                {
410                    current_fragment.requires_singleton = true;
411                }
412            }
413
414            NodeBody::Dml(_) => {
415                current_fragment
416                    .fragment_type_mask
417                    .add(FragmentTypeFlag::Dml);
418            }
419
420            NodeBody::Materialize(_) => {
421                current_fragment
422                    .fragment_type_mask
423                    .add(FragmentTypeFlag::Mview);
424            }
425
426            NodeBody::Sink(_) => current_fragment
427                .fragment_type_mask
428                .add(FragmentTypeFlag::Sink),
429
430            NodeBody::TopN(_) => current_fragment.requires_singleton = true,
431
432            NodeBody::StreamScan(node) => {
433                current_fragment
434                    .fragment_type_mask
435                    .add(FragmentTypeFlag::StreamScan);
436                #[expect(deprecated)]
437                match node.stream_scan_type() {
438                    StreamScanType::SnapshotBackfill => {
439                        current_fragment
440                            .fragment_type_mask
441                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
442                        state.has_snapshot_backfill = true;
443                        state.has_any_backfill = true;
444                    }
445                    StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
446                        state.has_any_backfill = true;
447                    }
448                    StreamScanType::CrossDbSnapshotBackfill => {
449                        current_fragment
450                            .fragment_type_mask
451                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
452                        state.has_cross_db_snapshot_backfill = true;
453                        state.has_any_backfill = true;
454                    }
455                    StreamScanType::Unspecified
456                    | StreamScanType::Chain
457                    | StreamScanType::Rearrange
458                    | StreamScanType::UpstreamOnly => {}
459                }
460                // memorize table id for later use
461                // The table id could be a upstream CDC source
462                state.dependent_table_ids.insert(node.table_id);
463            }
464
465            NodeBody::StreamCdcScan(node) => {
466                if let Some(o) = node.options
467                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
468                {
469                    // Use parallel CDC backfill.
470                    current_fragment
471                        .fragment_type_mask
472                        .add(FragmentTypeFlag::StreamCdcScan);
473                } else {
474                    current_fragment
475                        .fragment_type_mask
476                        .add(FragmentTypeFlag::StreamScan);
477                    // the backfill algorithm is not parallel safe
478                    current_fragment.requires_singleton = true;
479                }
480                state.has_source_backfill = true;
481                state.has_any_backfill = true;
482            }
483
484            NodeBody::CdcFilter(node) => {
485                current_fragment
486                    .fragment_type_mask
487                    .add(FragmentTypeFlag::CdcFilter);
488                // memorize upstream source id for later use
489                state
490                    .dependent_table_ids
491                    .insert(node.upstream_source_id.as_cdc_table_id());
492            }
493            NodeBody::SourceBackfill(node) => {
494                current_fragment
495                    .fragment_type_mask
496                    .add(FragmentTypeFlag::SourceScan);
497                // memorize upstream source id for later use
498                let source_id = node.upstream_source_id;
499                state
500                    .dependent_table_ids
501                    .insert(source_id.as_cdc_table_id());
502                state.has_source_backfill = true;
503                state.has_any_backfill = true;
504            }
505
506            NodeBody::Now(_) => {
507                // TODO: Remove this and insert a `BarrierRecv` instead.
508                current_fragment
509                    .fragment_type_mask
510                    .add(FragmentTypeFlag::Now);
511                current_fragment.requires_singleton = true;
512            }
513
514            NodeBody::Values(_) => {
515                current_fragment
516                    .fragment_type_mask
517                    .add(FragmentTypeFlag::Values);
518                current_fragment.requires_singleton = true;
519            }
520
521            NodeBody::StreamFsFetch(_) => {
522                current_fragment
523                    .fragment_type_mask
524                    .add(FragmentTypeFlag::FsFetch);
525            }
526
527            NodeBody::VectorIndexWrite(_) => {
528                current_fragment
529                    .fragment_type_mask
530                    .add(FragmentTypeFlag::VectorIndexWrite);
531            }
532
533            NodeBody::UpstreamSinkUnion(_) => {
534                current_fragment
535                    .fragment_type_mask
536                    .add(FragmentTypeFlag::UpstreamSinkUnion);
537            }
538
539            NodeBody::LocalityProvider(_) => {
540                current_fragment
541                    .fragment_type_mask
542                    .add(FragmentTypeFlag::LocalityProvider);
543            }
544
545            _ => {}
546        };
547
548        // handle join logic
549        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
550        {
551            if delta_index_join.get_join_type()? == JoinType::Inner
552                && delta_index_join.condition.is_none()
553            {
554                return build_delta_join_without_arrange(state, current_fragment, stream_node);
555            } else {
556                panic!("only inner join without non-equal condition is supported for delta joins");
557            }
558        }
559
560        // Usually we do not expect exchange node to be visited here, which should be handled by the
561        // following logic of "visit children" instead. If it does happen (for example, `Share` will be
562        // transformed to an `Exchange`), it means we have an empty fragment and we need to add a no-op
563        // node to it, so that the meta service can handle it correctly.
564        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
565            stream_node = state.gen_no_op_stream_node(stream_node);
566        }
567
568        // Visit plan children.
569        stream_node.input = stream_node
570            .input
571            .into_iter()
572            .map(|mut child_node| {
573                match child_node.get_node_body()? {
574                    // When exchange node is generated when doing rewrites, it could be having
575                    // zero input. In this case, we won't recursively visit its children.
576                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
577                    // Exchange node indicates a new child fragment.
578                    NodeBody::Exchange(exchange_node) => {
579                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();
580
581                        // Exchange node should have only one input.
582                        let [input]: [_; 1] =
583                            std::mem::take(&mut child_node.input).try_into().unwrap();
584                        let child_fragment = build_and_add_fragment(state, input)?;
585
586                        let result = state.fragment_graph.try_add_edge(
587                            child_fragment.fragment_id,
588                            current_fragment.fragment_id,
589                            StreamFragmentEdge {
590                                dispatch_strategy: exchange_node_strategy.clone(),
591                                // Always use the exchange operator id as the link id.
592                                link_id: child_node.operator_id.as_raw_id(),
593                            },
594                        );
595
596                        // It's possible that there're multiple edges between two fragments, while the
597                        // meta service and the compute node does not expect this. In this case, we
598                        // manually insert a fragment of `NoOp` between the two fragments.
599                        if result.is_err() {
600                            // Assign a new operator id for the `Exchange`, so we can distinguish it
601                            // from duplicate edges and break the sharing.
602                            child_node.operator_id = state.gen_operator_id();
603
604                            // Take the upstream plan node as the reference for properties of `NoOp`.
605                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
606                            let no_shuffle_strategy = DispatchStrategy {
607                                r#type: DispatcherType::NoShuffle as i32,
608                                dist_key_indices: vec![],
609                                output_mapping: PbDispatchOutputMapping::identical(
610                                    ref_fragment_node.fields.len(),
611                                )
612                                .into(),
613                            };
614
615                            let no_shuffle_exchange_operator_id = state.gen_operator_id();
616
617                            let no_op_fragment = {
618                                let node = state.gen_no_op_stream_node(StreamNode {
619                                    operator_id: no_shuffle_exchange_operator_id,
620                                    identity: "StreamNoShuffleExchange".into(),
621                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
622                                        strategy: Some(no_shuffle_strategy.clone()),
623                                    }))),
624                                    input: vec![],
625
626                                    // Take reference's properties.
627                                    stream_key: ref_fragment_node.stream_key.clone(),
628                                    stream_kind: ref_fragment_node.stream_kind,
629                                    fields: ref_fragment_node.fields.clone(),
630                                });
631
632                                let mut fragment = state.new_stream_fragment();
633                                fragment.node = Some(node.into());
634                                Rc::new(fragment)
635                            };
636
637                            state.fragment_graph.add_fragment(no_op_fragment.clone());
638
639                            state.fragment_graph.add_edge(
640                                child_fragment.fragment_id,
641                                no_op_fragment.fragment_id,
642                                StreamFragmentEdge {
643                                    // Use `NoShuffle` exhcnage strategy for upstream edge.
644                                    dispatch_strategy: no_shuffle_strategy,
645                                    link_id: no_shuffle_exchange_operator_id.as_raw_id(),
646                                },
647                            );
648                            state.fragment_graph.add_edge(
649                                no_op_fragment.fragment_id,
650                                current_fragment.fragment_id,
651                                StreamFragmentEdge {
652                                    // Use the original exchange strategy for downstream edge.
653                                    dispatch_strategy: exchange_node_strategy,
654                                    link_id: child_node.operator_id.as_raw_id(),
655                                },
656                            );
657                        }
658
659                        Ok(child_node)
660                    }
661
662                    // For other children, visit recursively.
663                    _ => build_fragment(state, current_fragment, child_node),
664                }
665            })
666            .collect::<Result<_>>()?;
667        Ok(stream_node)
668    })
669}