risingwave_frontend/stream_fragmenter/mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod graph;
use anyhow::Context;
use graph::*;
use risingwave_common::util::recursive::{self, Recurse as _};
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::Table;
use risingwave_pb::stream_plan::stream_node::NodeBody;
mod parallelism;
mod rewrite;

use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::rc::Rc;

use educe::Educe;
use risingwave_common::catalog::{FragmentTypeFlag, TableId};
use risingwave_common::session_config::SessionConfig;
use risingwave_common::session_config::parallelism::{
    ConfigAdaptiveParallelismStrategy, ConfigParallelism,
};
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_pb::id::{LocalOperatorId, StreamNodeLocalOperatorId};
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::{
    BackfillOrder, DispatchStrategy, DispatcherType, ExchangeNode, NoOpNode,
    PbDispatchOutputMapping, StreamContext, StreamFragmentGraph as StreamFragmentGraphProto,
    StreamNode, StreamScanType,
};

use self::rewrite::build_delta_join_without_arrange;
use crate::catalog::FragmentId;
use crate::error::ErrorCode::NotSupported;
use crate::error::{Result, RwError};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{StreamPlanRef as PlanRef, reorganize_elements_id};
use crate::stream_fragmenter::parallelism::{derive_parallelism, derive_parallelism_strategy};

/// The mutable state when building fragment graph.
#[derive(Educe)]
#[educe(Default)]
pub struct BuildFragmentGraphState {
    /// The fragment graph, transformed from the input streaming plan.
    fragment_graph: StreamFragmentGraph,
    /// Next local fragment id to be allocated.
    next_local_fragment_id: FragmentId,

    /// Next local table id to be allocated. It equals the total number of table ids
    /// once the stream node traversal finishes.
    next_table_id: u32,

    /// Rewrites may produce new operators, so we track the next operator id here.
    /// It counts down from `u32::MAX - 1` so that generated ids stay distinct from
    /// the ids already assigned to existing operators.
    #[educe(Default(expression = u32::MAX - 1))]
    next_operator_id: u32,

    /// Dependent streaming job ids.
    dependent_table_ids: HashSet<TableId>,

    /// Operator id to `LocalFragmentId` mapping used by the share operator.
    share_mapping: HashMap<StreamNodeLocalOperatorId, LocalFragmentId>,
    /// Operator id to `StreamNode` mapping used by the share operator.
    share_stream_node_mapping: HashMap<StreamNodeLocalOperatorId, StreamNode>,

    has_source_backfill: bool,
    has_snapshot_backfill: bool,
    has_cross_db_snapshot_backfill: bool,
    has_any_backfill: bool,
    tables: HashMap<TableId, Table>,
}

impl BuildFragmentGraphState {
    /// Create a new stream fragment, generating a new local fragment id for it.
    fn new_stream_fragment(&mut self) -> StreamFragment {
        let fragment = StreamFragment::new(self.next_local_fragment_id);
        self.next_local_fragment_id += 1;
        fragment
    }

    /// Generate an operator id.
    fn gen_operator_id(&mut self) -> StreamNodeLocalOperatorId {
        self.next_operator_id -= 1;
        LocalOperatorId::new(self.next_operator_id).into()
    }

    /// Generate a table id.
    pub fn gen_table_id(&mut self) -> u32 {
        let ret = self.next_table_id;
        self.next_table_id += 1;
        ret
    }

    /// Generate a table id wrapped in [`TableId`].
    pub fn gen_table_id_wrapped(&mut self) -> TableId {
        TableId::new(self.gen_table_id())
    }

    pub fn add_share_stream_node(
        &mut self,
        operator_id: StreamNodeLocalOperatorId,
        stream_node: StreamNode,
    ) {
        self.share_stream_node_mapping
            .insert(operator_id, stream_node);
    }

    pub fn get_share_stream_node(
        &mut self,
        operator_id: StreamNodeLocalOperatorId,
    ) -> Option<&StreamNode> {
        self.share_stream_node_mapping.get(&operator_id)
    }

    /// Generate a new stream node with `NoOp` body and the given `input`. The properties of the
    /// stream node will also be copied from the `input` node.
    pub fn gen_no_op_stream_node(&mut self, input: StreamNode) -> StreamNode {
        StreamNode {
            operator_id: self.gen_operator_id(),
            identity: "StreamNoOp".into(),
            node_body: Some(NodeBody::NoOp(NoOpNode {})),

            // Take input's properties.
            stream_key: input.stream_key.clone(),
            stream_kind: input.stream_kind,
            fields: input.fields.clone(),

            input: vec![input],
        }
    }
}
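
// How the id counters behave, as a minimal illustrative sketch (not a test in
// this file): fragment and table ids count up from zero, while operator ids for
// rewrite-generated nodes count down from `u32::MAX - 1`:
//
//     let mut state = BuildFragmentGraphState::default();
//     assert_eq!(state.gen_table_id(), 0);
//     assert_eq!(state.gen_table_id(), 1);
//     let op = state.gen_operator_id(); // yields `u32::MAX - 2`, then counts down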

/// The type of streaming job. It is used to determine the parallelism of the job during `build_graph`.
pub enum GraphJobType {
    Table,
    MaterializedView,
    Source,
    Sink,
    Index,
}

impl GraphJobType {
    pub fn to_parallelism(&self, config: &SessionConfig) -> ConfigParallelism {
        match self {
            GraphJobType::Table => config.streaming_parallelism_for_table(),
            GraphJobType::MaterializedView => config.streaming_parallelism_for_materialized_view(),
            GraphJobType::Source => config.streaming_parallelism_for_source(),
            GraphJobType::Sink => config.streaming_parallelism_for_sink(),
            GraphJobType::Index => config.streaming_parallelism_for_index(),
        }
    }

    pub fn to_parallelism_strategy(
        &self,
        config: &SessionConfig,
    ) -> ConfigAdaptiveParallelismStrategy {
        match self {
            GraphJobType::Table => config.streaming_parallelism_strategy_for_table(),
            GraphJobType::MaterializedView => {
                config.streaming_parallelism_strategy_for_materialized_view()
            }
            GraphJobType::Source => config.streaming_parallelism_strategy_for_source(),
            GraphJobType::Sink => config.streaming_parallelism_strategy_for_sink(),
            GraphJobType::Index => config.streaming_parallelism_strategy_for_index(),
        }
    }
}
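
// How the per-job-type setting interacts with the generic one, as an illustrative
// sketch mirroring the logic in `build_graph_with_strategy` below (the assumed
// semantics: a non-default per-type setting takes precedence inside
// `derive_parallelism`, otherwise the generic `streaming_parallelism` applies):
//
//     let job_parallelism = GraphJobType::Sink.to_parallelism(config);
//     let parallelism = derive_parallelism(Some(job_parallelism), config.streaming_parallelism());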

pub fn build_graph(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
) -> Result<StreamFragmentGraphProto> {
    build_graph_with_strategy(plan_node, job_type, None)
}

pub fn build_graph_with_strategy(
    plan_node: PlanRef,
    job_type: Option<GraphJobType>,
    backfill_order: Option<BackfillOrder>,
) -> Result<StreamFragmentGraphProto> {
    let ctx = plan_node.plan_base().ctx();
    let plan_node = reorganize_elements_id(plan_node);

    let mut state = BuildFragmentGraphState::default();
    let stream_node = plan_node.to_stream_prost(&mut state)?;
    generate_fragment_graph(&mut state, stream_node)?;
    if state.has_source_backfill && state.has_snapshot_backfill {
        return Err(RwError::from(NotSupported(
            "Snapshot backfill with shared source backfill is not supported".to_owned(),
            "`SET streaming_use_shared_source = false` to disable shared source backfill, or \
                    `SET streaming_use_snapshot_backfill = false` to disable snapshot backfill"
                .to_owned(),
        )));
    }
    if state.has_cross_db_snapshot_backfill
        && let Some(ref backfill_order) = backfill_order
        && !backfill_order.order.is_empty()
    {
        return Err(RwError::from(NotSupported(
            "Backfill order control with cross-db snapshot backfill is not supported".to_owned(),
            "Please remove backfill order specification from your query".to_owned(),
        )));
    }

    let mut fragment_graph = state.fragment_graph.to_protobuf();

    // Set table ids.
    fragment_graph.dependent_table_ids = state.dependent_table_ids.into_iter().collect();
    fragment_graph.table_ids_cnt = state.next_table_id;

    // Set parallelism and vnode count.
    let parallelism_strategy = {
        let config = ctx.session_ctx().config();
        let streaming_parallelism = config.streaming_parallelism();
        let job_parallelism = job_type.as_ref().map(|t| t.to_parallelism(config.deref()));
        let normal_parallelism = derive_parallelism(job_parallelism, streaming_parallelism);
        let backfill_parallelism = if state.has_any_backfill {
            match config.streaming_parallelism_for_backfill() {
                ConfigParallelism::Default => None,
                override_parallelism => {
                    derive_parallelism(Some(override_parallelism), streaming_parallelism)
                        .or(normal_parallelism)
                }
            }
        } else {
            None
        };
        fragment_graph.parallelism = normal_parallelism;
        fragment_graph.backfill_parallelism = backfill_parallelism;
        fragment_graph.max_parallelism = config.streaming_max_parallelism() as _;

        let job_strategy = job_type
            .as_ref()
            .map(|t| t.to_parallelism_strategy(config.deref()));
        derive_parallelism_strategy(job_strategy, config.streaming_parallelism_strategy())
    };

    // Set context for this streaming job.
    let config_override = ctx
        .session_ctx()
        .config()
        .to_initial_streaming_config_override()
        .context("invalid initial streaming config override")?;
    let adaptive_parallelism_strategy = parallelism_strategy
        .as_ref()
        .map(AdaptiveParallelismStrategy::to_string)
        .unwrap_or_default();
    fragment_graph.ctx = Some(StreamContext {
        timezone: ctx.get_session_timezone(),
        config_override,
        adaptive_parallelism_strategy,
    });

    fragment_graph.backfill_order = backfill_order;

    Ok(fragment_graph)
}
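
// Usage sketch (illustrative; the actual call sites live in the frontend DDL
// handlers and are not shown here):
//
//     let graph: StreamFragmentGraphProto =
//         build_graph(stream_plan, Some(GraphJobType::MaterializedView))?;
//     // The proto carries fragments, edges, parallelism settings and the stream
//     // context, and is handed over to the meta service for scheduling.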

#[cfg(any())]
fn is_stateful_executor(stream_node: &StreamNode) -> bool {
    matches!(
        stream_node.get_node_body().unwrap(),
        NodeBody::HashAgg(_)
            | NodeBody::HashJoin(_)
            | NodeBody::DeltaIndexJoin(_)
            | NodeBody::StreamScan(_)
            | NodeBody::StreamCdcScan(_)
            | NodeBody::DynamicFilter(_)
    )
}
283
284/// Do some dirty rewrites before building the fragments.
285/// Currently, it will split the fragment with multiple stateful operators (those have high I/O
286/// throughput) into multiple fragments, which may help improve the I/O concurrency.
287/// Known as "no-shuffle exchange" or "1v1 exchange".
288#[cfg(any())]
289fn rewrite_stream_node(
290    state: &mut BuildFragmentGraphState,
291    stream_node: StreamNode,
292    insert_exchange_flag: bool,
293) -> Result<StreamNode> {
294    let f = |child| {
295        // For stateful operators, set `exchange_flag = true`. If it's already true,
296        // force add an exchange.
297        if is_stateful_executor(&child) {
298            if insert_exchange_flag {
299                let child_node = rewrite_stream_node(state, child, true)?;
300
301                let strategy = DispatchStrategy {
302                    r#type: DispatcherType::NoShuffle.into(),
303                    dist_key_indices: vec![], // TODO: use distribution key
304                    output_indices: (0..(child_node.fields.len() as u32)).collect(),
305                };
306                Ok(StreamNode {
307                    stream_key: child_node.stream_key.clone(),
308                    fields: child_node.fields.clone(),
309                    node_body: Some(NodeBody::Exchange(ExchangeNode {
310                        strategy: Some(strategy),
311                    })),
312                    operator_id: state.gen_operator_id(),
313                    append_only: child_node.append_only,
314                    input: vec![child_node],
315                    identity: "Exchange (NoShuffle)".to_string(),
316                })
317            } else {
318                rewrite_stream_node(state, child, true)
319            }
320        } else {
321            match child.get_node_body()? {
322                // For exchanges, reset the flag.
323                NodeBody::Exchange(_) => rewrite_stream_node(state, child, false),
324                // Otherwise, recursively visit the children.
325                _ => rewrite_stream_node(state, child, insert_exchange_flag),
326            }
327        }
328    };
329    Ok(StreamNode {
330        input: stream_node
331            .input
332            .into_iter()
333            .map(f)
334            .collect::<Result<_>>()?,
335        ..stream_node
336    })
337}

/// Generate the fragment DAG from the input streaming plan according to its dependencies.
fn generate_fragment_graph(
    state: &mut BuildFragmentGraphState,
    stream_node: StreamNode,
) -> Result<()> {
    // TODO: the 1v1 exchange is disabled for now, as it breaks the assumption of independent
    // scaling of fragments. We may introduce further optimization transparently to the fragmenter.
    // #4614
    #[cfg(any())]
    let stream_node = rewrite_stream_node(state, stream_node, is_stateful_executor(&stream_node))?;

    build_and_add_fragment(state, stream_node)?;
    Ok(())
}
353
354/// Use the given `stream_node` to create a fragment and add it to graph.
355fn build_and_add_fragment(
356    state: &mut BuildFragmentGraphState,
357    stream_node: StreamNode,
358) -> Result<Rc<StreamFragment>> {
359    let operator_id = stream_node.operator_id;
360    match state.share_mapping.get(&operator_id) {
361        None => {
362            let mut fragment = state.new_stream_fragment();
363            let node = build_fragment(state, &mut fragment, stream_node)?;
364
365            // It's possible that the stream node is rewritten while building the fragment, for
366            // example, empty fragment to no-op fragment. We get the operator id again instead of
367            // using the original one.
368            let operator_id = node.operator_id;
369
370            assert!(fragment.node.is_none());
371            fragment.node = Some(Box::new(node));
372            let fragment_ref = Rc::new(fragment);
373
374            state.fragment_graph.add_fragment(fragment_ref.clone());
375            state
376                .share_mapping
377                .insert(operator_id, fragment_ref.fragment_id);
378            Ok(fragment_ref)
379        }
380        Some(fragment_id) => Ok(state
381            .fragment_graph
382            .get_fragment(fragment_id)
383            .unwrap()
384            .clone()),
385    }
386}
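
// Sharing in action (illustrative): when a shared subplan feeds two downstream
// parents, the first call here builds the fragment and records it in
// `share_mapping` under its root operator id; the second call with the same
// operator id returns the cached fragment, so both parents reference a single
// shared fragment instead of duplicating it.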

/// Build a new fragment and link dependencies by visiting children recursively; update the
/// `requires_singleton` and `fragment_type` properties for the current fragment.
fn build_fragment(
    state: &mut BuildFragmentGraphState,
    current_fragment: &mut StreamFragment,
    mut stream_node: StreamNode,
) -> Result<StreamNode> {
    recursive::tracker!().recurse(|_t| {
        // Update current fragment based on the node we're visiting.
        match stream_node.get_node_body()? {
            NodeBody::BarrierRecv(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::BarrierRecv),

            NodeBody::Source(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Source);

                if let Some(source) = node.source_inner.as_ref()
                    && let Some(source_info) = source.info.as_ref()
                    && ((source_info.is_shared() && !source_info.is_distributed)
                        || source.with_properties.requires_singleton())
                {
                    current_fragment.requires_singleton = true;
                }
            }

            NodeBody::Dml(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Dml);
            }

            NodeBody::Materialize(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Mview);
            }

            NodeBody::Sink(_) => current_fragment
                .fragment_type_mask
                .add(FragmentTypeFlag::Sink),

            NodeBody::TopN(_) => current_fragment.requires_singleton = true,

            NodeBody::EowcGapFill(node) => {
                let table = node.buffer_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
                let table = node.prev_row_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::GapFill(node) => {
                let table = node.state_table.as_ref().unwrap().clone();
                state.tables.insert(table.id, table);
            }

            NodeBody::StreamScan(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::StreamScan);
                match node.stream_scan_type() {
                    StreamScanType::SnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::SnapshotBackfillStreamScan);
                        state.has_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Backfill | StreamScanType::ArrangementBackfill => {
                        state.has_any_backfill = true;
                    }
                    StreamScanType::CrossDbSnapshotBackfill => {
                        current_fragment
                            .fragment_type_mask
                            .add(FragmentTypeFlag::CrossDbSnapshotBackfillStreamScan);
                        state.has_cross_db_snapshot_backfill = true;
                        state.has_any_backfill = true;
                    }
                    StreamScanType::Unspecified
                    | StreamScanType::Chain
                    | StreamScanType::Rearrange
                    | StreamScanType::UpstreamOnly => {}
                }
                // Memorize the table id for later use. The table id could refer to an
                // upstream CDC source.
                state.dependent_table_ids.insert(node.table_id);

                // Add the state table if present.
                if let Some(state_table) = &node.state_table {
                    let table = state_table.clone();
                    state.tables.insert(table.id, table);
                }
            }

            NodeBody::StreamCdcScan(node) => {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamCdcScan);
                } else {
                    current_fragment
                        .fragment_type_mask
                        .add(FragmentTypeFlag::StreamScan);
                    // The backfill algorithm is not parallel-safe.
                    current_fragment.requires_singleton = true;
                }
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::CdcFilter(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::CdcFilter);
                // Memorize the upstream source id for later use.
                state
                    .dependent_table_ids
                    .insert(node.upstream_source_id.as_cdc_table_id());
            }
            NodeBody::SourceBackfill(node) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::SourceScan);
                // Memorize the upstream source id for later use.
                let source_id = node.upstream_source_id;
                state
                    .dependent_table_ids
                    .insert(source_id.as_cdc_table_id());
                state.has_source_backfill = true;
                state.has_any_backfill = true;
            }

            NodeBody::Now(_) => {
                // TODO: Remove this and insert a `BarrierRecv` instead.
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Now);
                current_fragment.requires_singleton = true;
            }

            NodeBody::Values(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::Values);
                current_fragment.requires_singleton = true;
            }

            NodeBody::StreamFsFetch(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::FsFetch);
            }

            NodeBody::VectorIndexWrite(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::VectorIndexWrite);
            }

            NodeBody::UpstreamSinkUnion(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::UpstreamSinkUnion);
            }

            NodeBody::LocalityProvider(_) => {
                current_fragment
                    .fragment_type_mask
                    .add(FragmentTypeFlag::LocalityProvider);
            }

            _ => {}
        };

        // Handle join logic.
        if let NodeBody::DeltaIndexJoin(delta_index_join) = stream_node.node_body.as_mut().unwrap()
        {
            if delta_index_join.get_join_type()? == JoinType::Inner
                && delta_index_join.condition.is_none()
            {
                return build_delta_join_without_arrange(state, current_fragment, stream_node);
            } else {
                panic!("only inner join without non-equal condition is supported for delta joins");
            }
        }

        // Usually we do not expect an exchange node to be visited here, as it should be handled
        // by the following "visit children" logic instead. If it does happen (for example, `Share`
        // may be transformed into an `Exchange`), it means we have an empty fragment, and we need
        // to add a no-op node to it so that the meta service can handle it correctly.
        if let NodeBody::Exchange(_) = stream_node.node_body.as_ref().unwrap() {
            stream_node = state.gen_no_op_stream_node(stream_node);
        }

        // Visit plan children.
        stream_node.input = stream_node
            .input
            .into_iter()
            .map(|mut child_node| {
                match child_node.get_node_body()? {
                    // When an exchange node is generated during rewrites, it may have zero
                    // inputs. In this case, we won't recursively visit its children.
                    NodeBody::Exchange(_) if child_node.input.is_empty() => Ok(child_node),
                    // An exchange node indicates a new child fragment.
                    NodeBody::Exchange(exchange_node) => {
                        let exchange_node_strategy = exchange_node.get_strategy()?.clone();

                        // An exchange node should have exactly one input.
                        let [input]: [_; 1] =
                            std::mem::take(&mut child_node.input).try_into().unwrap();
                        let child_fragment = build_and_add_fragment(state, input)?;

                        let result = state.fragment_graph.try_add_edge(
                            child_fragment.fragment_id,
                            current_fragment.fragment_id,
                            StreamFragmentEdge {
                                dispatch_strategy: exchange_node_strategy.clone(),
                                // Always use the exchange operator id as the link id.
                                link_id: child_node.operator_id.as_raw_id(),
                            },
                        );

                        // It's possible that there are multiple edges between two fragments, while
                        // the meta service and the compute node do not expect this. In this case,
                        // we manually insert a `NoOp` fragment between the two fragments.
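                        //
                        // For instance (illustrative), a self-join over a shared upstream can
                        // produce two exchanges between the same pair of fragments: the first
                        // edge is added normally, the second is rejected by `try_add_edge` and
                        // rewritten as
                        //
                        //     child ──(strategy)──▶ current                      (rejected)
                        //     child ──NoShuffle──▶ NoOp ──(strategy)──▶ current  (inserted)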
                        if result.is_err() {
                            // Assign a new operator id for the `Exchange`, so we can distinguish it
                            // from duplicate edges and break the sharing.
                            child_node.operator_id = state.gen_operator_id();

                            // Take the upstream plan node as the reference for properties of `NoOp`.
                            let ref_fragment_node = child_fragment.node.as_ref().unwrap();
                            let no_shuffle_strategy = DispatchStrategy {
                                r#type: DispatcherType::NoShuffle as i32,
                                dist_key_indices: vec![],
                                output_mapping: PbDispatchOutputMapping::identical(
                                    ref_fragment_node.fields.len(),
                                )
                                .into(),
                            };

                            let no_shuffle_exchange_operator_id = state.gen_operator_id();

                            let no_op_fragment = {
                                let node = state.gen_no_op_stream_node(StreamNode {
                                    operator_id: no_shuffle_exchange_operator_id,
                                    identity: "StreamNoShuffleExchange".into(),
                                    node_body: Some(NodeBody::Exchange(Box::new(ExchangeNode {
                                        strategy: Some(no_shuffle_strategy.clone()),
                                    }))),
                                    input: vec![],

                                    // Take reference's properties.
                                    stream_key: ref_fragment_node.stream_key.clone(),
                                    stream_kind: ref_fragment_node.stream_kind,
                                    fields: ref_fragment_node.fields.clone(),
                                });

                                let mut fragment = state.new_stream_fragment();
                                fragment.node = Some(node.into());
                                Rc::new(fragment)
                            };

                            state.fragment_graph.add_fragment(no_op_fragment.clone());

                            state.fragment_graph.add_edge(
                                child_fragment.fragment_id,
                                no_op_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the `NoShuffle` exchange strategy for the upstream edge.
                                    dispatch_strategy: no_shuffle_strategy,
                                    link_id: no_shuffle_exchange_operator_id.as_raw_id(),
                                },
                            );
                            state.fragment_graph.add_edge(
                                no_op_fragment.fragment_id,
                                current_fragment.fragment_id,
                                StreamFragmentEdge {
                                    // Use the original exchange strategy for the downstream edge.
                                    dispatch_strategy: exchange_node_strategy,
                                    link_id: child_node.operator_id.as_raw_id(),
                                },
                            );
                        }

                        Ok(child_node)
                    }

                    // For other children, visit recursively.
                    _ => build_fragment(state, current_fragment, child_node),
                }
            })
            .collect::<Result<_>>()?;
        Ok(stream_node)
    })
}
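
// Fragmentation at a glance (illustrative): every `Exchange` in the stream plan
// marks a fragment boundary, e.g.
//
//     Materialize                 <- fragment 0
//       └─ Exchange ──────────────────────────────
//            └─ HashAgg           <- fragment 1
//                 └─ Exchange ─────────────────────
//                      └─ Source  <- fragment 2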