risingwave_frontend/stream_fragmenter/mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod graph;
use anyhow::Context;
use graph::*;
use risingwave_common::util::recursive::{self, Recurse as _};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::Table;
use risingwave_pb::stream_plan::stream_node::NodeBody;
mod parallelism;
mod rewrite;

use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;

use educe::Educe;
use risingwave_common::catalog::{FragmentTypeFlag, TableId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::session_config::parallelism::ConfigParallelism;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
    StreamNode, StreamScanType,
};

use self::rewrite::build_delta_join_without_arrange;
use crate::catalog::FragmentId;
use crate::error::ErrorCode::NotSupported;
use crate::error::{Result, RwError};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
use crate::stream_fragmenter::parallelism::derive_parallelism;

/// The mutable state used when building the fragment graph.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph, transformed from the input streaming plan.
    fragment_graph: StreamFragmentGraph,
    /// Next local fragment id to be allocated.
    next_local_fragment_id: FragmentId,

    /// Next local table id to be allocated. It equals the total number of table
    /// ids once stream node traversal finishes.
    next_table_id: u32,

    /// Rewrites will produce new operators, so we need to track the next
    /// operator id. It counts down from `u32::MAX - 1`.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Dependent streaming job ids.
    dependent_table_ids: HashSet<TableId>,

    /// Operator id to `LocalFragmentId` mapping used by the share operator.
    share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
    /// Operator id to `StreamNode` mapping used by the share operator.
    share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,

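    // Backfill flavors observed during traversal; checked in
    // `build_graph_with_strategy` for validation and for the backfill
    // parallelism override.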
    has_source_backfill: bool,
    has_snapshot_backfill: bool,
    has_cross_db_snapshot_backfill: bool,
    has_any_backfill: bool,
    tables: HashMap<TableId, Table>,
}

impl BuildFragmentGraphState {
    /// Create a new stream fragment, generating a local fragment id for it.
    fn new_stream_fragment(&mut self) -> StreamFragment {
        let fragment = StreamFragment::new(self.next_local_fragment_id);
        self.next_local_fragment_id += 1;
        fragment
    }

    /// Generate an operator id.
    fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
        self.next_operator_id -= 1;
        LocalOperatorId::new(self.next_operator_id).into()
    }

    /// Generate a table id.
    pub fn gen_table_id(&mut self) -> u32 {
        let ret = self.next_table_id;
        self.next_table_id += 1;
        ret
    }

    /// Generate a table id and wrap it as a `TableId`.
    pub fn gen_table_id_wrapped(&mut self) -> TableId {
        TableId::new(self.gen_table_id())
    }

    pub fn add_share_stream_node(
        &mut self,
        operator_id: StreamNodeLocalOperatorId,
        stream_node: StreamNode,
    ) {
        self.share_stream_node_mapping
            .insert(operator_id, stream_node);
    }

    pub fn get_share_stream_node(
        &mut self,
        operator_id: StreamNodeLocalOperatorId,
    ) -> Option<&StreamNode> {
        self.share_stream_node_mapping.get(&operator_id)
    }

    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
    /// stream node will also be copied from the `input` node.
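    ///
    /// Used, e.g., when a fragment root would otherwise be a bare `Exchange`
    /// (see `build_fragment`), so that the meta service always sees a fragment
    /// with a real root node.
    ///
    /// A hedged sketch (not a runnable doctest; `exchange` is any built
    /// `StreamNode`):
    ///
    /// ```ignore
    /// let no_op = state.gen_no_op_stream_node(exchange);
    /// assert_eq!(no_op.input.len(), 1); // the input is wrapped as the single child
    /// ```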
    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
        StreamNode {
            operator_id: self.gen_operator_id(),
            identity: "StreamNoOp".into(),
            node_body: Some(NodeBody::NoOp(NoOpNode {})),

            // Take input's properties.
            stream_key: input.stream_key.clone(),
            stream_kind: input.stream_kind,
            fields: input.fields.clone(),

            input: vec![input],
        }
    }
}
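
// A minimal sketch (hypothetical unit test; assumes the `Default` state is
// constructible in tests) of the two id spaces: table ids count up from 0,
// while operator ids generated during rewrites count down from `u32::MAX - 1`,
// presumably to stay clear of the ids the optimizer has already assigned.
#[cfg(test)]
mod id_allocation_sketch {
    use super::*;

    #[test]
    fn ids_grow_in_opposite_directions() {
        let mut state = BuildFragmentGraphState::default();
        assert_eq!(state.gen_table_id(), 0);
        assert_eq!(state.gen_table_id(), 1);
        // `gen_operator_id` decrements before use, so the first id handed out
        // corresponds to `u32::MAX - 2`.
        let _op: StreamNodeLocalOperatorId = state.gen_operator_id();
    }
}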

/// The type of streaming job. It is used to determine the parallelism of the
/// job during `build_graph`.
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}

impl GraphJobType {
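    /// Pick the parallelism setting dedicated to this job type from the session
    /// config. A hedged usage sketch (`config` is a `SessionConfig`; the
    /// fallback to the generic `streaming_parallelism` happens later, in
    /// `parallelism::derive_parallelism`, not here):
    ///
    /// ```ignore
    /// let p = GraphJobType::Sink.to_parallelism(&config);
    /// let parallelism = derive_parallelism(Some(p), config.streaming_parallelism());
    /// ```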
    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
        match self {
            GraphJobType::Table => config.streaming_parallelism_for_table(),
            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
            GraphJobType::Source => config.streaming_parallelism_for_source(),
            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
            GraphJobType::Index => config.streaming_parallelism_for_index(),
        }
    }
}

pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> Result<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}

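/// Build the fragment graph for a streaming job from its stream plan.
///
/// `job_type` selects the per-job-type parallelism setting from the session
/// config, and `backfill_order` optionally constrains the order in which
/// backfills are scheduled.
///
/// A hedged sketch of a call site (not a runnable doctest; `plan` is a
/// `StreamPlanRef` produced by the optimizer):
///
/// ```ignore
/// let graph = build_graph_with_strategy(plan, Some(GraphJobType::MaterializedView), None)?;
/// ```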
pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> Result<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node)?;
    if state.has_source_backfill && state.has_snapshot_backfill {
        return Err(RwError::from(NotSupported(
            "Snapshot backfill with shared source backfill is not supported".to_owned(),
            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
                    `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
                .to_owned(),
        )));
    }
    if state.has_cross_db_snapshot_backfill
        && let Some(ref backfill_order) = backfill_order
        && !backfill_order.order.is_empty()
    {
        return Err(RwError::from(NotSupported(
            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
            "Please remove backfill order specification from your query".to_owned(),
        )));
    }

    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Set table ids.
    fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Set parallelism and vnode count.
    {
        let config = ctx.session_ctx().config();
        let streaming_parallelism = config.streaming_parallelism();
        let normal_parallelism = derive_parallelism(
            job_type.map(|t| t.to_parallelism(config.deref())),
            streaming_parallelism,
        );
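        // Backfill fragments may get a dedicated parallelism: an explicit
        // `streaming_parallelism_for_backfill` setting takes precedence;
        // otherwise they fall back to the job's normal parallelism.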
        let backfill_parallelism = if state.has_any_backfill {
            match config.streaming_parallelism_for_backfill() {
                ConfigParallelism::Default => None,
                override_parallelism => {
                    derive_parallelism(Some(override_parallelism), streaming_parallelism)
                        .or(normal_parallelism)
                }
            }
        } else {
            None
        };
        fragment_graph.parallelism = normal_parallelism;
        fragment_graph.backfill_parallelism = backfill_parallelism;
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;
    }

    // Set context for this streaming job.
    let config_override = ctx
        .session_ctx()
        .config()
        .to_initial_streaming_config_override()
        .context("invalid initial streaming config override")?;
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
        config_override,
    });

    fragment_graph.backfill_order = backfill_order;

    Ok(fragment_graph)
}

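// NOTE: `#[cfg(any())]` never evaluates to true, so the two functions below are
// compiled out. They are kept for reference; see the TODO in
// `generate_fragment_graph` (#4614) for why this rewrite is disabled.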
#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}

/// Do some dirty rewrites before building the fragments.
/// Currently, it splits any fragment containing multiple stateful operators (those with high I/O
/// throughput) into multiple fragments, which may help improve I/O concurrency.
/// The inserted exchange is known as a "no-shuffle exchange" or "1v1 exchange".
#[cfg(any())]
fn rewrite_stream_node(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
    insert_exchange_flag: bool,
) -> Result<StreamNode> {
    let f = |child| {
        // For stateful operators, set `exchange_flag = true`. If it's already true,
        // force add an exchange.
        if is_stateful_executor(&child) {
            if insert_exchange_flag {
                let child_node = rewrite_stream_node(state, child, true)?;

                let strategy = DispatchStrategy {
                    r#type: DispatcherType::NoShuffle.into(),
                    dist_key_indices: vec![], // TODO: use distribution key
                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
                };
                Ok(StreamNode {
                    stream_key: child_node.stream_key.clone(),
                    fields: child_node.fields.clone(),
                    node_body: Some(NodeBody::Exchange(ExchangeNode {
                        strategy: Some(strategy),
                    })),
                    operator_id: state.gen_operator_id(),
                    append_only: child_node.append_only,
                    input: vec![child_node],
                    identity: "Exchange (NoShuffle)".to_string(),
                })
            } else {
                rewrite_stream_node(state, child, true)
            }
        } else {
            match child.get_node_body()? {
                // For exchanges, reset the flag.
                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
                // Otherwise, recursively visit the children.
                _ => rewrite_stream_node(state, child, insert_exchange_flag),
            }
        }
    };
    Ok(StreamNode {
        input: stream_node
            .input
            .into_iter()
            .map(f)
            .collect::<Result<_>>()?,
        ..stream_node
    })
}

/// Generate the fragment DAG from the input streaming plan by dependency.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
    // #4614
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}

/// Use the given `stream_node` to create a fragment and add it to the graph.
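///
/// Shared operators are memoized in `share_mapping`: the first visit builds the
/// fragment, and subsequent visits return the one already built, turning the
/// plan tree into a DAG of fragments.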
fn build_and_add_fragment(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<Rc<StreamFragment>> {
    let operator_id = stream_node.operator_id;
    match state.share_mapping.get(&operator_id) {
        None => {
            let mut fragment = state.new_stream_fragment();
            let node = build_fragment(state, &mut fragment, stream_node)?;

            // It's possible that the stream node is rewritten while building the fragment, for
            // example, an empty fragment is rewritten into a no-op fragment. So we get the
            // operator id again instead of using the original one.
            let operator_id = node.operator_id;

            assert!(fragment.node.is_none());
            fragment.node = Some(Box::new(node));
            let fragment_ref = Rc::new(fragment);

            state.fragment_graph.add_fragment(fragment_ref.clone());
            state
                .share_mapping
                .insert(operator_id, fragment_ref.fragment_id);
            Ok(fragment_ref)
        }
        Some(fragment_id) => Ok(state
            .fragment_graph
            .get_fragment(fragment_id)
            .unwrap()
            .clone()),
    }
}

/// Build a new fragment and link dependencies by visiting children recursively,
/// updating the `requires_singleton` and `fragment_type` properties of the
/// current fragment.
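///
/// The traversal goes through `recursive::Recurse`, which tracks recursion
/// depth so that deeply nested plans are handled safely.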
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    recursive::tracker!().recurse(|_t| {
        // Update current fragment based on the node we're visiting.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::EowcGapFill(node) => {
                let table = node.buffer_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
                let table = node.prev_row_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::GapFill(node) => {
                let table = node.state_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
                        state.has_any_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::UpstreamOnly => {}
                }
                // Memorize the table id for later use. Note that the table id
                // could refer to an upstream CDC source.
                state.dependent_table_ids.insert(node.table_id);

                // Add the state table if present.
                if let Some(state_table) = &node.state_table {
                    let table = state_table.clone();
                    state.tables.insert(table.id, table);
                }
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    // The backfill algorithm is not parallel-safe.
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // Memorize the upstream source id for later use.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.as_cdc_table_id());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                // Memorize the upstream source id for later use.
                let source_id = node.upstream_source_id;
                state
                    .dependent_table_ids
                    .insert(source_id.as_cdc_table_id());
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::Now(_) => {
                // TODO: Remove this and insert a `BarrierRecv` instead.
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            _ => {}
        };

        // Handle the join logic.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Usually we do not expect an exchange node to be visited here, as it should be handled
        // by the "visit children" logic below. If it does happen (for example, `Share` may be
        // transformed into an `Exchange`), it means we have an empty fragment, and we need to add
        // a no-op node to it so that the meta service can handle it correctly.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit plan children.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // An exchange node generated during rewrites may have zero inputs. In this
                    // case, we won't recursively visit its children.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // An exchange node indicates a new child fragment.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // An exchange node should have exactly one input.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                // Always use the exchange operator id as the link id.
                                link_id: child_node.operator_id.as_raw_id(),
                            },
                        );

                        // It's possible that there are multiple edges between two fragments, which
                        // the meta service and the compute node do not expect. In this case, we
                        // manually insert a fragment of `NoOp` between the two fragments.
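                        //
                        // Resulting topology (sketch):
                        //
                        //   child --NoShuffle--> NoOp --original strategy--> current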
                        if result.is_err() {
                            // Assign a new operator id for the `Exchange`, so we can distinguish it
                            // from duplicate edges and break the sharing.
                            child_node.operator_id = state.gen_operator_id();

                            // Take the upstream plan node as the reference for properties of `NoOp`.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id();

                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    // Take reference's properties.
                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the `NoShuffle` exchange strategy for the upstream edge.
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id.as_raw_id(),
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the original exchange strategy for the downstream edge.
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id.as_raw_id(),
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // For other children, visit recursively.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}