risingwave_frontend/stream_fragmenter/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_common::system_param::AdaptiveParallelismStrategy;
33use risingwave_connector::source::cdc::CdcScanOptions;
34use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
35use risingwave_pb::plan_common::JoinType;
36use risingwave_pb::stream_plan::{
37    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
38    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
39    StreamNode, StreamScanType,
40};
41
42use self::rewrite::build_delta_join_without_arrange;
43use crate::catalog::FragmentId;
44use crate::error::ErrorCode::NotSupported;
45use crate::error::{Result, RwError};
46use crate::optimizer::plan_node::generic::GenericPlanRef;
47use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
48use crate::stream_fragmenter::parallelism::{
49    ResolvedParallelism, derive_backfill_parallelism, derive_parallelism,
50};
51
/// The mutable state when building fragment graph.
///
/// A fresh instance is created through `Default` by `build_graph_with_strategy` and threaded
/// through the whole plan traversal. Every field starts at its type's default value except
/// `next_operator_id`, which starts at `u32::MAX - 1`.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// fragment graph field, transformed from input streaming plan.
    fragment_graph: StreamFragmentGraph,
    /// local fragment id
    ///
    /// The id handed to the next fragment created by `new_stream_fragment`, which increments it
    /// afterwards.
    next_local_fragment_id: FragmentId,

    /// Next local table id to be allocated. It equals to total table ids cnt when finish stream
    /// node traversing.
    ///
    /// The final value is exported as `table_ids_cnt` of the resulting graph.
    next_table_id: u32,

    /// rewrite will produce new operators, and we need to track next operator id
    ///
    /// `gen_operator_id` decrements this counter before using it, so generated ids count
    /// downwards from the initial value below.
    /// NOTE(review): counting down from the top of the `u32` range presumably keeps generated ids
    /// apart from the ids already present in the plan — confirm.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// dependent streaming job ids.
    ///
    /// Collected in `build_fragment` from `StreamScan` table ids and from the upstream source ids
    /// of `CdcFilter` and `SourceBackfill` nodes.
    dependent_table_ids: HashSet<TableId>,

    /// operator id to `LocalFragmentId` mapping used by share operator.
    ///
    /// `build_and_add_fragment` records the root operator id of every fragment it builds here
    /// and returns the already built fragment when the same operator id is seen again.
    share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
    /// operator id to `StreamNode` mapping used by share operator.
    share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,

    /// Set when a `StreamCdcScan` or `SourceBackfill` node is visited.
    has_source_backfill: bool,
    /// Set when a `StreamScan` node of type `SnapshotBackfill` is visited.
    has_snapshot_backfill: bool,
    /// Set when a `StreamScan` node of type `CrossDbSnapshotBackfill` is visited.
    has_cross_db_snapshot_backfill: bool,
    /// Set when any backfilling node is visited (`StreamScan` with a backfill scan type,
    /// `StreamCdcScan` or `SourceBackfill`). The backfill parallelism is only derived from the
    /// session config when this flag is set.
    has_any_backfill: bool,
}
82
83impl BuildFragmentGraphState {
84    /// Create a new stream fragment with given node with generating a fragment id.
85    fn new_stream_fragment(&mut self) -> StreamFragment {
86        let fragment = StreamFragment::new(self.next_local_fragment_id);
87        self.next_local_fragment_id += 1;
88        fragment
89    }
90
91    /// Generate an operator id
92    fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
93        self.next_operator_id -= 1;
94        LocalOperatorId::new(self.next_operator_id).into()
95    }
96
97    /// Generate an table id
98    pub fn gen_table_id(&mut self) -> u32 {
99        let ret = self.next_table_id;
100        self.next_table_id += 1;
101        ret
102    }
103
104    /// Generate an table id
105    pub fn gen_table_id_wrapped(&mut self) -> TableId {
106        TableId::new(self.gen_table_id())
107    }
108
109    pub fn add_share_stream_node(
110        &mut self,
111        operator_id: StreamNodeLocalOperatorId,
112        stream_node: StreamNode,
113    ) {
114        self.share_stream_node_mapping
115            .insert(operator_id, stream_node);
116    }
117
118    pub fn get_share_stream_node(
119        &mut self,
120        operator_id: StreamNodeLocalOperatorId,
121    ) -> Option<&StreamNode> {
122        self.share_stream_node_mapping.get(&operator_id)
123    }
124
125    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
126    /// stream node will also be copied from the `input` node.
127    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
128        StreamNode {
129            operator_id: self.gen_operator_id(),
130            identity: "StreamNoOp".into(),
131            node_body: Some(NodeBody::NoOp(NoOpNode {})),
132
133            // Take input's properties.
134            stream_key: input.stream_key.clone(),
135            stream_kind: input.stream_kind,
136            fields: input.fields.clone(),
137
138            input: vec![input],
139        }
140    }
141}
142
/// The type of streaming job. It is used to determine the parallelism of the job during `build_graph`.
///
/// Each variant selects its own per-job-type parallelism setting from the session config, see
/// `to_parallelism`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GraphJobType {
    /// Resolved through `streaming_parallelism_for_table`.
    Table,
    /// Resolved through `streaming_parallelism_for_materialized_view`.
    MaterializedView,
    /// Resolved through `streaming_parallelism_for_source`.
    Source,
    /// Resolved through `streaming_parallelism_for_sink`.
    Sink,
    /// Resolved through `streaming_parallelism_for_index`.
    Index,
}
152
153impl GraphJobType {
154    pub fn to_parallelism(self, config: &SessionConfig) -> ConfigParallelism {
155        match self {
156            GraphJobType::Table => config.streaming_parallelism_for_table(),
157            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
158            GraphJobType::Source => config.streaming_parallelism_for_source(),
159            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
160            GraphJobType::Index => config.streaming_parallelism_for_index(),
161        }
162    }
163}
164
165pub fn build_graph(
166    plan_node: PlanRef,
167    job_type: Option<GraphJobType>,
168) -> Result<StreamFragmentGraphProto> {
169    build_graph_with_strategy(plan_node, job_type, None)
170}
171
172pub fn build_graph_with_strategy(
173    plan_node: PlanRef,
174    job_type: Option<GraphJobType>,
175    backfill_order: Option<BackfillOrder>,
176) -> Result<StreamFragmentGraphProto> {
177    let ctx = plan_node.plan_base().ctx();
178    let plan_node = reorganize_elements_id(plan_node);
179
180    let mut state = BuildFragmentGraphState::default();
181    let stream_node = plan_node.to_stream_prost(&mut state)?;
182    generate_fragment_graph(&mut state, stream_node)?;
183    if state.has_source_backfill && state.has_snapshot_backfill {
184        return Err(RwError::from(NotSupported(
185            "Snapshot backfill with shared source backfill is not supported".to_owned(),
186            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
187                    `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
188                .to_owned(),
189        )));
190    }
191    if state.has_cross_db_snapshot_backfill
192        && let Some(ref backfill_order) = backfill_order
193        && !backfill_order.order.is_empty()
194    {
195        return Err(RwError::from(NotSupported(
196            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
197            "Please remove backfill order specification from your query".to_owned(),
198        )));
199    }
200
201    let (
202        normal_parallelism,
203        backfill_parallelism,
204        adaptive_parallelism_strategy,
205        backfill_adaptive_parallelism_strategy,
206        max_parallelism,
207    ) = {
208        let config = ctx.session_ctx().config();
209        let streaming_parallelism = config.streaming_parallelism();
210        let job_parallelism = job_type.map(|t| t.to_parallelism(config.deref()));
211        let normal_parallelism =
212            derive_parallelism(job_type, job_parallelism, streaming_parallelism);
213        let backfill_parallelism = if state.has_any_backfill {
214            derive_backfill_parallelism(config.streaming_parallelism_for_backfill())
215        } else {
216            ResolvedParallelism {
217                parallelism: None,
218                adaptive_strategy: None,
219            }
220        };
221        (
222            normal_parallelism.parallelism,
223            backfill_parallelism.parallelism,
224            normal_parallelism
225                .adaptive_strategy
226                .as_ref()
227                .map(AdaptiveParallelismStrategy::to_string)
228                .unwrap_or_default(),
229            backfill_parallelism
230                .adaptive_strategy
231                .as_ref()
232                .map(AdaptiveParallelismStrategy::to_string)
233                .unwrap_or_default(),
234            config.streaming_max_parallelism() as _,
235        )
236    };
237
238    let config_override = ctx
239        .session_ctx()
240        .config()
241        .to_initial_streaming_config_override()
242        .context("invalid initial streaming config override")?;
243    let fragments = state
244        .fragment_graph
245        .fragments
246        .into_iter()
247        .map(|(k, v)| (k, v.to_protobuf()))
248        .collect();
249    let edges = state.fragment_graph.edges.into_values().collect();
250
251    Ok(StreamFragmentGraphProto {
252        fragments,
253        edges,
254        dependent_table_ids: state.dependent_table_ids.into_iter().collect(),
255        table_ids_cnt: state.next_table_id,
256        ctx: Some(StreamContext {
257            timezone: ctx.get_session_timezone(),
258            config_override,
259        }),
260        parallelism: normal_parallelism,
261        backfill_parallelism,
262        adaptive_parallelism_strategy,
263        backfill_adaptive_parallelism_strategy,
264        max_parallelism,
265        backfill_order,
266    })
267}
268
/// Whether the body of `stream_node` is one of the executors treated as stateful by
/// `rewrite_stream_node`.
///
/// Compiled out together with that rewrite: `cfg(any())` is never satisfied.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    let body = stream_node.get_node_body().unwrap();
    matches!(
        body,
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
281
/// Do some dirty rewrites before building the fragments.
/// Currently, it will split the fragment with multiple stateful operators (those have high I/O
/// throughput) into multiple fragments, which may help improve the I/O concurrency.
/// Known as "no-shuffle exchange" or "1v1 exchange".
///
/// `insert_exchange_flag` tells whether a stateful operator has already been seen above in the
/// current fragment. When it is set and another stateful child is found, a `NoShuffle` exchange
/// is inserted above that child.
///
/// This function is excluded from compilation: `cfg(any())` is never satisfied.
/// NOTE(review): the body builds `DispatchStrategy`, `ExchangeNode` and `StreamNode` with a
/// different field set than the live code in this file (`output_indices`, `append_only`, unboxed
/// `ExchangeNode`), so it likely needs updating before it can be re-enabled — confirm.
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    // Rewrites a single child of `stream_node`, recursing into its subtree.
    let f = |child| {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                // Wrap the rewritten child into an exchange that copies the child's properties.
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id(),
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                // First stateful operator in this fragment: keep it in place and raise the flag
                // for its subtree.
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    // Rebuild the node with rewritten inputs; all other fields are kept as they are.
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
336
337/// Generate fragment DAG from input streaming plan by their dependency.
338fn generate_fragment_graph(
339    state: &mut BuildFragmentGraphState,
340    stream_node: StreamNode,
341) -> Result<()> {
342    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
343    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
344    // #4614
345    #[cfg(any())]
346    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;
347
348    build_and_add_fragment(state, stream_node)?;
349    Ok(())
350}
351
352/// Use the given `stream_node` to create a fragment and add it to graph.
353fn build_and_add_fragment(
354    state: &mut BuildFragmentGraphState,
355    stream_node: StreamNode,
356) -> Result<Rc<StreamFragment>> {
357    let operator_id = stream_node.operator_id;
358    match state.share_mapping.get(&operator_id) {
359        None => {
360            let mut fragment = state.new_stream_fragment();
361            let node = build_fragment(state, &mut fragment, stream_node)?;
362
363            // It's possible that the stream node is rewritten while building the fragment, for
364            // example, empty fragment to no-op fragment. We get the operator id again instead of
365            // using the original one.
366            let operator_id = node.operator_id;
367
368            assert!(fragment.node.is_none());
369            fragment.node = Some(Box::new(node));
370            let fragment_ref = Rc::new(fragment);
371
372            state.fragment_graph.add_fragment(fragment_ref.clone());
373            state
374                .share_mapping
375                .insert(operator_id, fragment_ref.fragment_id);
376            Ok(fragment_ref)
377        }
378        Some(fragment_id) => Ok(state
379            .fragment_graph
380            .get_fragment(fragment_id)
381            .unwrap()
382            .clone()),
383    }
384}
385
/// Build new fragment and link dependencies by visiting children recursively, update
/// `requires_singleton` and `fragment_type` properties for current fragment.
///
/// For every visited node, fragment-level properties are first recorded on `current_fragment`
/// (type flags, singleton requirement) and job-level facts on `state` (dependent table ids,
/// backfill flags). Each `Exchange` child that has an input marks a fragment boundary: the
/// subtree below it is built as a separate fragment through `build_and_add_fragment` and
/// connected to `current_fragment` by an edge, while the `Exchange` node itself stays in the
/// current fragment with its input list emptied.
///
/// Returns the node that takes the place of `stream_node` in the current fragment. It may differ
/// from the node passed in (delta join rewrite, no-op wrapping of an `Exchange`).
///
/// # Errors
///
/// Propagates errors from `get_node_body`, `get_join_type`, `get_strategy` and from the
/// recursive calls.
///
/// # Panics
///
/// Panics on a `DeltaIndexJoin` that is not an inner join or that carries a condition, and on an
/// `Exchange` child with more than one input.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // NOTE(review): the tracker presumably monitors the recursion depth of this function —
    // confirm in `risingwave_common::util::recursive`.
    recursive::tracker!().recurse(|_t| {
        // Update current fragment based on the node we're visiting.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                // The fragment must be a singleton if the source is shared but not distributed,
                // or if its properties demand a singleton.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                // Record which kind of backfill (if any) this scan performs.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
                        state.has_any_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    // These scan types do not set any backfill flag.
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::UpstreamOnly => {}
                }
                // memorize table id for later use
                // The table id could be a upstream CDC source
                state.dependent_table_ids.insert(node.table_id);
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    // the backfill algorithm is not parallel safe
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // memorize upstream source id for later use
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.as_cdc_table_id());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                // memorize upstream source id for later use
                let source_id = node.upstream_source_id;
                state
                    .dependent_table_ids
                    .insert(source_id.as_cdc_table_id());
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::Now(_) => {
                // TODO: Remove this and insert a `BarrierRecv` instead.
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            // All other node bodies leave the fragment properties untouched.
            _ => {}
        };

        // handle join logic
        // A delta index join is handed over to a dedicated rewrite, which takes care of the whole
        // subtree; this function returns right away with its result.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Usually we do not expect exchange node to be visited here, which should be handled by the
        // following logic of "visit children" instead. If it does happen (for example, `Share` will be
        // transformed to an `Exchange`), it means we have an empty fragment and we need to add a no-op
        // node to it, so that the meta service can handle it correctly.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit plan children.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // When exchange node is generated when doing rewrites, it could be having
                    // zero input. In this case, we won't recursively visit its children.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // Exchange node indicates a new child fragment.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Exchange node should have only one input.
                        // Taking the input leaves the exchange node without children in the
                        // current fragment.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                // Always use the exchange operator id as the link id.
                                link_id: child_node.operator_id.as_raw_id(),
                            },
                        );

                        // It's possible that there're multiple edges between two fragments, while the
                        // meta service and the compute node does not expect this. In this case, we
                        // manually insert a fragment of `NoOp` between the two fragments.
                        if result.is_err() {
                            // Assign a new operator id for the `Exchange`, so we can distinguish it
                            // from duplicate edges and break the sharing.
                            child_node.operator_id = state.gen_operator_id();

                            // Take the upstream plan node as the reference for properties of `NoOp`.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id();

                            // The intermediate fragment consists of a `NoOp` node on top of an
                            // input-less `NoShuffle` exchange.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    // Take reference's properties.
                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            // Link: child fragment -> no-op fragment -> current fragment.
                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use `NoShuffle` exchange strategy for upstream edge.
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id.as_raw_id(),
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the original exchange strategy for downstream edge.
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id.as_raw_id(),
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // For other children, visit recursively.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}
666        Ok(stream_node)
667    })
668}