// risingwave_frontend/stream_fragmenter/mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod graph;
16use anyhow::Context;
17use graph::*;
18use risingwave_common::util::recursive::{self, Recurse as _};
19use risingwave_connector::WithPropertiesExt;
20use risingwave_pb::stream_plan::stream_node::NodeBody;
21mod parallelism;
22mod rewrite;
23
24use std::collections::{HashMap, HashSet};
25use std::ops::Deref;
26use std::rc::Rc;
27
28use educe::Educe;
29use risingwave_common::catalog::{FragmentTypeFlag, TableId};
30use risingwave_common::session_config::SessionConfig;
31use risingwave_common::session_config::parallelism::ConfigParallelism;
32use risingwave_common::system_param::AdaptiveParallelismStrategy;
33use risingwave_connector::source::cdc::CdcScanOptions;
34use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
35use risingwave_pb::plan_common::JoinType;
36use risingwave_pb::stream_plan::{
37    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
38    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
39    StreamNode, StreamScanType,
40};
41
42use self::rewrite::build_delta_join_without_arrange;
43use crate::catalog::FragmentId;
44use crate::error::ErrorCode::NotSupported;
45use crate::error::{Result, RwError};
46use crate::optimizer::plan_node::generic::GenericPlanRef;
47use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
48use crate::stream_fragmenter::parallelism::{
49    ResolvedParallelism, derive_backfill_parallelism, derive_parallelism,
50};
51
/// The mutable state when building fragment graph.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// fragment graph field, transformed from input streaming plan.
    fragment_graph: StreamFragmentGraph,
    /// local fragment id
    next_local_fragment_id: FragmentId,

    /// Next local table id to be allocated. It equals to total table ids cnt when finish stream
    /// node traversing.
    next_table_id: u32,

    /// rewrite will produce new operators, and we need to track next operator id.
    /// Allocated downwards from `u32::MAX - 1` (see `gen_operator_id`) so generated ids
    /// do not collide with the ids already assigned by the planner.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// dependent streaming job ids.
    dependent_table_ids: HashSet<TableId>,

    /// operator id to `LocalFragmentId` mapping used by share operator.
    share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
    /// operator id to `StreamNode` mapping used by share operator.
    share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,

    // Flags collected while traversing the plan; consumed by `build_graph_with_strategy`
    // for compatibility checks and backfill parallelism resolution.
    /// Set when the plan contains a `StreamCdcScan` or `SourceBackfill` node.
    has_source_backfill: bool,
    /// Set when a `StreamScan` uses `StreamScanType::SnapshotBackfill`.
    has_snapshot_backfill: bool,
    /// Set when a `StreamScan` uses `StreamScanType::CrossDbSnapshotBackfill`.
    has_cross_db_snapshot_backfill: bool,
    /// Set when the plan contains any kind of backfill scan.
    has_any_backfill: bool,
}
82
83impl BuildFragmentGraphState {
84    /// Create a new stream fragment with given node with generating a fragment id.
85    fn new_stream_fragment(&mut self) -> StreamFragment {
86        let fragment = StreamFragment::new(self.next_local_fragment_id);
87        self.next_local_fragment_id += 1;
88        fragment
89    }
90
91    /// Generate an operator id
92    fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
93        self.next_operator_id -= 1;
94        LocalOperatorId::new(self.next_operator_id).into()
95    }
96
97    /// Generate an table id
98    pub fn gen_table_id(&mut self) -> u32 {
99        let ret = self.next_table_id;
100        self.next_table_id += 1;
101        ret
102    }
103
104    /// Generate an table id
105    pub fn gen_table_id_wrapped(&mut self) -> TableId {
106        TableId::new(self.gen_table_id())
107    }
108
109    pub fn add_share_stream_node(
110        &mut self,
111        operator_id: StreamNodeLocalOperatorId,
112        stream_node: StreamNode,
113    ) {
114        self.share_stream_node_mapping
115            .insert(operator_id, stream_node);
116    }
117
118    pub fn get_share_stream_node(
119        &mut self,
120        operator_id: StreamNodeLocalOperatorId,
121    ) -> Option<&StreamNode> {
122        self.share_stream_node_mapping.get(&operator_id)
123    }
124
125    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
126    /// stream node will also be copied from the `input` node.
127    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
128        StreamNode {
129            operator_id: self.gen_operator_id(),
130            identity: "StreamNoOp".into(),
131            node_body: Some(NodeBody::NoOp(NoOpNode {})),
132
133            // Take input's properties.
134            stream_key: input.stream_key.clone(),
135            stream_kind: input.stream_kind,
136            fields: input.fields.clone(),
137
138            input: vec![input],
139        }
140    }
141}
142
/// The type of streaming job. It is used to determine the parallelism of the job during `build_graph`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}
152
153impl GraphJobType {
154    pub fn to_parallelism(self, config: &SessionConfig) -> ConfigParallelism {
155        match self {
156            GraphJobType::Table => config.streaming_parallelism_for_table(),
157            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
158            GraphJobType::Source => config.streaming_parallelism_for_source(),
159            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
160            GraphJobType::Index => config.streaming_parallelism_for_index(),
161        }
162    }
163}
164
/// Build the fragment graph for a streaming job from its stream plan, without any
/// backfill order constraint. See [`build_graph_with_strategy`] for details.
pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> Result<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}
171
/// Build the fragment graph (in protobuf form) for a streaming job from its stream plan.
///
/// Besides the fragmentation itself (see [`generate_fragment_graph`]), this resolves the
/// job's parallelism and max parallelism from the session config, fills in the streaming
/// context (timezone, config override, adaptive parallelism strategies), and attaches the
/// optional `backfill_order` constraint.
///
/// # Errors
///
/// Returns `NotSupported` when snapshot backfill is combined with shared source backfill,
/// or when backfill order control is combined with cross-db snapshot backfill.
pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> Result<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    // Traverse the plan and cut it into fragments, collecting backfill flags on the way.
    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node)?;
    if state.has_source_backfill && state.has_snapshot_backfill {
        return Err(RwError::from(NotSupported(
            "Snapshot backfill with shared source backfill is not supported".to_owned(),
            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
                    `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
                .to_owned(),
        )));
    }
    if state.has_cross_db_snapshot_backfill
        && let Some(ref backfill_order) = backfill_order
        && !backfill_order.order.is_empty()
    {
        return Err(RwError::from(NotSupported(
            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
            "Please remove backfill order specification from your query".to_owned(),
        )));
    }

    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Set table ids.
    fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Set parallelism and vnode count.
    // The resulting pair is the `(normal, backfill)` adaptive strategies, consumed below
    // when building the `StreamContext`.
    let parallelism_strategy = {
        let config = ctx.session_ctx().config();
        let streaming_parallelism = config.streaming_parallelism();
        let job_parallelism = job_type.map(|t| t.to_parallelism(config.deref()));
        let normal_parallelism =
            derive_parallelism(job_type, job_parallelism, streaming_parallelism);
        // Backfill parallelism only applies when the plan actually contains a backfill.
        let backfill_parallelism = if state.has_any_backfill {
            derive_backfill_parallelism(config.streaming_parallelism_for_backfill())
        } else {
            ResolvedParallelism {
                parallelism: None,
                adaptive_strategy: None,
            }
        };
        fragment_graph.parallelism = normal_parallelism.parallelism;
        fragment_graph.backfill_parallelism = backfill_parallelism.parallelism;
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
        (
            normal_parallelism.adaptive_strategy,
            backfill_parallelism.adaptive_strategy,
        )
    };

    // Set context for this streaming job.
    let config_override = ctx
        .session_ctx()
        .config()
        .to_initial_streaming_config_override()
        .context("invalid initial streaming config override")?;
    let adaptive_parallelism_strategy = parallelism_strategy
        .0
        .as_ref()
        .map(AdaptiveParallelismStrategy::to_string)
        .unwrap_or_default();
    let backfill_adaptive_parallelism_strategy = parallelism_strategy
        .1
        .as_ref()
        .map(AdaptiveParallelismStrategy::to_string)
        .unwrap_or_default();
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
        config_override,
        adaptive_parallelism_strategy,
        backfill_adaptive_parallelism_strategy,
    });

    fragment_graph.backfill_order = backfill_order;

    Ok(fragment_graph)
}
258
/// Returns whether the executor for this node keeps (potentially large) state, i.e. has
/// high I/O throughput.
///
/// NOTE(review): compiled out via `#[cfg(any())]` — kept only for the also-disabled
/// `rewrite_stream_node` pass below.
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
271
/// Do some dirty rewrites before building the fragments.
/// Currently, it will split the fragment with multiple stateful operators (those have high I/O
/// throughput) into multiple fragments, which may help improve the I/O concurrency.
/// Known as "no-shuffle exchange" or "1v1 exchange".
///
/// NOTE(review): compiled out via `#[cfg(any())]` — this optimization is disabled for now;
/// see the comment in `generate_fragment_graph` and issue #4614. The body may have drifted
/// from the current `StreamNode` definition (e.g. the `append_only` field).
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id(),
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}
326
/// Generate fragment DAG from input streaming plan by their dependency.
///
/// The whole plan is handed to [`build_and_add_fragment`], which recursively cuts it at
/// every `Exchange` node and registers the resulting fragments and edges into
/// `state.fragment_graph`.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
    // #4614
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}
341
342/// Use the given `stream_node` to create a fragment and add it to graph.
343fn build_and_add_fragment(
344    state: &mut BuildFragmentGraphState,
345    stream_node: StreamNode,
346) -> Result<Rc<StreamFragment>> {
347    let operator_id = stream_node.operator_id;
348    match state.share_mapping.get(&operator_id) {
349        None => {
350            let mut fragment = state.new_stream_fragment();
351            let node = build_fragment(state, &mut fragment, stream_node)?;
352
353            // It's possible that the stream node is rewritten while building the fragment, for
354            // example, empty fragment to no-op fragment. We get the operator id again instead of
355            // using the original one.
356            let operator_id = node.operator_id;
357
358            assert!(fragment.node.is_none());
359            fragment.node = Some(Box::new(node));
360            let fragment_ref = Rc::new(fragment);
361
362            state.fragment_graph.add_fragment(fragment_ref.clone());
363            state
364                .share_mapping
365                .insert(operator_id, fragment_ref.fragment_id);
366            Ok(fragment_ref)
367        }
368        Some(fragment_id) => Ok(state
369            .fragment_graph
370            .get_fragment(fragment_id)
371            .unwrap()
372            .clone()),
373    }
374}
375
/// Build new fragment and link dependencies by visiting children recursively, update
/// `requires_singleton` and `fragment_type` properties for current fragment.
///
/// Returns the (possibly rewritten) root stream node of `current_fragment`. Children
/// behind an `Exchange` node are cut off into separate fragments connected by graph
/// edges; all other children stay inside `current_fragment`.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    // Guarded recursion to avoid stack overflow on deeply nested plans.
    recursive::tracker!().recurse(|_t| {
        // Update current fragment based on the node we're visiting.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                // A shared but non-distributed source, or a connector that demands it,
                // must run as a singleton fragment.
                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                // Record which backfill flavors appear in the plan; checked later in
                // `build_graph_with_strategy`.
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
                        state.has_any_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::UpstreamOnly => {}
                }
                // memorize table id for later use
                // The table id could be an upstream CDC source
                state.dependent_table_ids.insert(node.table_id);
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    // the backfill algorithm is not parallel safe
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // memorize upstream source id for later use
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.as_cdc_table_id());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                // memorize upstream source id for later use
                let source_id = node.upstream_source_id;
                state
                    .dependent_table_ids
                    .insert(source_id.as_cdc_table_id());
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::Now(_) => {
                // TODO: Remove this and insert a `BarrierRecv` instead.
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            _ => {}
        };

        // handle join logic
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                // Delta joins get a dedicated fragmentation path and return early here.
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Usually we do not expect exchange node to be visited here, which should be handled by the
        // following logic of "visit children" instead. If it does happen (for example, `Share` will be
        // transformed to an `Exchange`), it means we have an empty fragment and we need to add a no-op
        // node to it, so that the meta service can handle it correctly.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit plan children.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // When exchange node is generated when doing rewrites, it could be having
                    // zero input. In this case, we won't recursively visit its children.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // Exchange node indicates a new child fragment.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // Exchange node should have only one input.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                // Always use the exchange operator id as the link id.
                                link_id: child_node.operator_id.as_raw_id(),
                            },
                        );

                        // It's possible that there're multiple edges between two fragments, while the
                        // meta service and the compute node does not expect this. In this case, we
                        // manually insert a fragment of `NoOp` between the two fragments.
                        if result.is_err() {
                            // Assign a new operator id for the `Exchange`, so we can distinguish it
                            // from duplicate edges and break the sharing.
                            child_node.operator_id = state.gen_operator_id();

                            // Take the upstream plan node as the reference for properties of `NoOp`.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id();

                            // Build the intermediate fragment: a `NoOp` on top of a
                            // `NoShuffle` exchange from the child fragment.
                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    // Take reference's properties.
                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use `NoShuffle` exchange strategy for upstream edge.
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id.as_raw_id(),
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the original exchange strategy for downstream edge.
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id.as_raw_id(),
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // For other children, visit recursively.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}