risingwave_frontend/optimizer/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZeroU32;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use risingwave_pb::catalog::PbVectorIndexInfo;
20
21pub mod plan_node;
22
23use plan_node::StreamFilter;
24pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
25
26pub mod property;
27
28mod delta_join_solver;
29mod heuristic_optimizer;
30mod plan_rewriter;
31
32mod plan_visitor;
33
34#[cfg(feature = "datafusion")]
35pub use plan_visitor::DataFusionExecuteCheckerExt;
36pub use plan_visitor::{
37    ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
38};
39use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;
40
41pub mod backfill_order_strategy;
42mod logical_optimization;
43mod optimizer_context;
44pub mod plan_expr_rewriter;
45mod plan_expr_visitor;
46mod rule;
47
48use std::collections::{BTreeMap, HashMap};
49use std::marker::PhantomData;
50
51use educe::Educe;
52use fixedbitset::FixedBitSet;
53use itertools::Itertools as _;
54pub use logical_optimization::*;
55pub use optimizer_context::*;
56use plan_expr_rewriter::ConstEvalRewriter;
57use property::Order;
58use risingwave_common::bail;
59use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
60use risingwave_common::types::DataType;
61use risingwave_common::util::column_index_mapping::ColIndexMapping;
62use risingwave_common::util::iter_util::ZipEqDebug;
63use risingwave_connector::WithPropertiesExt;
64use risingwave_connector::sink::catalog::SinkFormatDesc;
65use risingwave_pb::stream_plan::StreamScanType;
66
67use self::heuristic_optimizer::ApplyOrder;
68use self::plan_node::generic::{self, PhysicalPlanRef};
69use self::plan_node::{
70    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
71    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
72    ToStreamContext, stream_enforce_eowc_requirement,
73};
74#[cfg(debug_assertions)]
75use self::plan_visitor::InputRefValidator;
76use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
77use self::property::{Cardinality, RequiredDist};
78use self::rule::*;
79use crate::TableCatalog;
80use crate::catalog::table_catalog::TableType;
81use crate::catalog::{DatabaseId, SchemaId};
82use crate::error::{ErrorCode, Result};
83use crate::expr::TimestamptzExprFinder;
84use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
85use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
86use crate::optimizer::plan_node::{
87    BackfillType, Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker,
88    PlanTreeNode, Stream, StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion,
89    StreamVectorIndexWrite, ToStream, VisitExprsRecursive,
90};
91use crate::optimizer::plan_visitor::{
92    LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
93};
94use crate::optimizer::property::Distribution;
95use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
96
/// `PlanRoot` is used to describe a plan. planner will construct a `PlanRoot` with `LogicalNode`.
/// and required distribution and order. And `PlanRoot` can generate corresponding streaming or
/// batch plan with optimization. the required Order and Distribution columns might be more than the
/// output columns. for example:
/// ```sql
///    select v1 from t order by id;
/// ```
/// the plan will return two columns (id, v1), and the required order column is id. the id
/// column is required in optimization, but the final generated plan will remove the unnecessary
/// column in the result.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    // The current plan node.
    pub plan: PlanRef<P::Convention>,
    // The phase of the plan. Zero-sized marker tying this root to one `PlanPhase`.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    // Distribution the consumer requires of the root's output.
    required_dist: RequiredDist,
    // Order the consumer requires of the root's output.
    required_order: Order,
    // Bitset over the plan's schema; only the set positions are actually output.
    out_fields: FixedBitSet,
    // Output column names; exactly one per set bit of `out_fields`
    // (enforced by the assertions in `new_inner`).
    out_names: Vec<String>,
}
120
/// `PlanPhase` is used to track the phase of the `PlanRoot`.
/// Usually, it begins from `Logical` and ends with `Batch` or `Stream`, unless we want to construct a `PlanRoot` from an intermediate phase.
/// Typical phase transformation are:
/// - `Logical` -> `OptimizedLogicalForBatch` -> `Batch`
/// - `Logical` -> `OptimizedLogicalForStream` -> `Stream`
pub trait PlanPhase {
    /// The plan-node convention (`Logical`, `Batch` or `Stream`) used by plans in this phase.
    type Convention: ConventionMarker;
}
129
// For each `{ Phase, Convention }` pair listed in the zero-argument arm, this
// macro generates:
// - a zero-sized marker struct `PlanPhase<Phase>` implementing `PlanPhase`
//   with the given convention, and
// - a type alias `<Phase>PlanRoot = PlanRoot<PlanPhase<Phase>>`
// (e.g. `PlanPhaseLogical` and `LogicalPlanRoot`).
macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            // `paste` concatenates the phase name into the generated identifiers.
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();
154
impl LogicalPlanRoot {
    /// Construct a `PlanRoot` in the `Logical` phase from a logical plan and the
    /// consumer's required distribution, order and output pruning/naming.
    ///
    /// Panics (via `new_inner`) if `out_fields` does not cover the plan's schema
    /// or `out_names` does not match the number of selected fields.
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}
166
impl BatchPlanRoot {
    /// Construct a `PlanRoot` directly in the `Batch` phase from an already
    /// physical batch plan, bypassing the logical optimization phases.
    ///
    /// Panics (via `new_inner`) if `out_fields` does not cover the plan's schema
    /// or `out_names` does not match the number of selected fields.
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}
178
179impl<P: PlanPhase> PlanRoot<P> {
180    fn new_inner(
181        plan: PlanRef<P::Convention>,
182        required_dist: RequiredDist,
183        required_order: Order,
184        out_fields: FixedBitSet,
185        out_names: Vec<String>,
186    ) -> Self {
187        let input_schema = plan.schema();
188        assert_eq!(input_schema.fields().len(), out_fields.len());
189        assert_eq!(out_fields.count_ones(..), out_names.len());
190
191        Self {
192            plan,
193            _phase: PhantomData,
194            required_dist,
195            required_order,
196            out_fields,
197            out_names,
198        }
199    }
200
201    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
202        PlanRoot {
203            plan,
204            _phase: PhantomData,
205            required_dist: self.required_dist,
206            required_order: self.required_order,
207            out_fields: self.out_fields,
208            out_names: self.out_names,
209        }
210    }
211
212    /// Set customized names of the output fields, used for `CREATE [MATERIALIZED VIEW | SINK] r(a,
213    /// b, ..)`.
214    ///
215    /// If the number of names does not match the number of output fields, an error is returned.
216    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
217        if out_names.len() != self.out_fields.count_ones(..) {
218            Err(ErrorCode::InvalidInputSyntax(
219                "number of column names does not match number of columns".to_owned(),
220            ))?
221        }
222        self.out_names = out_names;
223        Ok(())
224    }
225
226    /// Get the plan root's schema, only including the fields to be output.
227    pub fn schema(&self) -> Schema {
228        // The schema can be derived from the `out_fields` and `out_names`, so we don't maintain it
229        // as a field and always construct one on demand here to keep it in sync.
230        Schema {
231            fields: self
232                .out_fields
233                .ones()
234                .map(|i| self.plan.schema().fields()[i].clone())
235                .zip_eq_debug(&self.out_names)
236                .map(|(field, name)| Field {
237                    name: name.clone(),
238                    ..field
239                })
240                .collect(),
241        }
242    }
243}
244
impl LogicalPlanRoot {
    /// Transform the [`PlanRoot`] back to a [`PlanRef`] suitable to be used as a subplan, for
    /// example as insert source or subquery. This ignores Order but retains post-Order pruning
    /// (`out_fields`).
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        // Fast path: every column is output, so no pruning projection is needed.
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

    /// Transform the [`PlanRoot`] wrapped in an array-construction subquery to a [`PlanRef`]
    /// supported by `ARRAY_AGG`. Similar to the unordered version, this abstracts away internal
    /// `self.plan` which is further modified by `self.required_order` then `self.out_fields`.
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        // The subquery must output exactly one column to aggregate into an array.
        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::list(input_column_type.clone());
        // Build `array_agg(col ORDER BY self.required_order)` with an empty
        // group key, i.e. a single-row aggregation over the whole subquery.
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        // Wrap in `coalesce(array_agg(...), <empty list literal>)` so an empty
        // subquery result yields an empty array rather than NULL.
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    /// Apply logical optimization to the plan for stream.
    /// The root stays in the `Logical` phase; only the plan is replaced.
    pub fn gen_optimized_stream_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

    /// Apply logical optimization to the plan for batch.
    /// Advances the root to the `BatchOptimizedLogical` phase.
    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

    /// Shorthand: run batch logical optimization, then generate the singleton
    /// batch physical plan.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}
322
impl BatchOptimizedLogicalPlanRoot {
    /// Optimize and generate a singleton batch physical plan without exchange nodes.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        // Temporal joins are a streaming-only feature; reject them up front.
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        // Inline session timezone mainly for rewriting now()
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        // Const eval of exprs at the last minute, but before `to_batch` to make functional index selection happy.
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert to physical plan node
        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // Inline session timezone
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        // At this stage the plan must still be a singleton without any exchange;
        // distributed/local planning happens later on `BatchPlanRoot`.
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }

    /// Convert the batch-optimized logical plan into a DataFusion logical plan,
    /// appending a `Sort` for `required_order` and a pruning/aliasing
    /// `Projection` for `out_fields` / `out_names` when needed.
    #[cfg(feature = "datafusion")]
    pub fn gen_datafusion_logical_plan(
        &self,
    ) -> Result<Arc<datafusion::logical_expr::LogicalPlan>> {
        use datafusion::logical_expr::{Expr as DFExpr, LogicalPlan, Projection, Sort};
        use datafusion_common::Column;
        use plan_visitor::LogicalPlanToDataFusionExt;

        use crate::datafusion::{InputColumns, convert_column_order};

        tracing::debug!(
            "Converting RisingWave logical plan to DataFusion plan:\nRisingWave Plan: {:?}",
            self.plan
        );

        let ctx = self.plan.ctx();
        // Inline session timezone mainly for rewriting now()
        let mut plan = inline_session_timezone_in_exprs(ctx, self.plan.clone())?;
        plan = const_eval_exprs(plan)?;

        let mut df_plan = plan.to_datafusion_logical_plan()?;

        // Enforce the required order with a DataFusion `Sort` node.
        if !self.required_order.is_any() {
            let input_columns = InputColumns::new(df_plan.schema().as_ref(), plan.schema());
            let expr = self
                .required_order
                .column_orders
                .iter()
                .map(|column_order| convert_column_order(column_order, &input_columns))
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Sort(Sort {
                expr,
                input: df_plan,
                fetch: None,
            }));
        }

        // Prune to `out_fields` (aliased with `out_names`) when the output has
        // fewer columns than the plan produces.
        if self.out_names.len() < df_plan.schema().fields().len() {
            let df_schema = df_plan.schema().as_ref();
            let projection_exprs = self
                .out_fields
                .ones()
                .zip_eq_debug(self.out_names.iter())
                .map(|(i, name)| {
                    DFExpr::Column(Column::from(df_schema.qualified_field(i))).alias(name)
                })
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Projection(Projection::try_new(
                projection_exprs,
                df_plan,
            )?));
        }

        tracing::debug!("Converted DataFusion plan:\nDataFusion Plan: {:?}", df_plan);

        Ok(df_plan)
    }
}
448
impl BatchPlanRoot {
    /// Optimize and generate a batch query plan for distributed execution.
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        // The root of a distributed query always gathers results to one node.
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        // Convert to distributed plan
        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        // Add Project if the any position of `self.out_fields` is set to zero.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        // Both two phase limit and topn could generate limit on top of the scan, so we push limit here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

    /// Optimize and generate a batch query plan for local execution.
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        // Convert to local plan node
        plan = plan.to_local_with_order_required(&self.required_order)?;

        // We remark that since the `to_local_with_order_required` does not enforce single
        // distribution, we enforce at the root if needed.
        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        // Add Project if the any position of `self.out_fields` is set to zero.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        // Both two phase limit and topn could generate limit on top of the scan, so we push limit here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }
}
524
525impl LogicalPlanRoot {
526    /// Generate optimized stream plan
527    fn gen_optimized_stream_plan(
528        self,
529        emit_on_window_close: bool,
530        allow_snapshot_backfill: bool,
531    ) -> Result<StreamOptimizedLogicalPlanRoot> {
532        let backfill_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
533            BackfillType::SnapshotBackfill
534        } else if self.should_use_arrangement_backfill() {
535            BackfillType::ArrangementBackfill
536        } else {
537            BackfillType::Backfill
538        };
539        self.gen_optimized_stream_plan_inner(emit_on_window_close, backfill_type)
540    }
541
542    fn gen_optimized_stream_plan_inner(
543        self,
544        emit_on_window_close: bool,
545        backfill_type: BackfillType,
546    ) -> Result<StreamOptimizedLogicalPlanRoot> {
547        let ctx = self.plan.ctx();
548        let _explain_trace = ctx.is_explain_trace();
549
550        let optimized_plan = self.gen_stream_plan(emit_on_window_close, backfill_type)?;
551
552        let mut plan = optimized_plan
553            .plan
554            .clone()
555            .optimize_by_rules(&OptimizationStage::new(
556                "Merge StreamProject",
557                vec![StreamProjectMergeRule::create()],
558                ApplyOrder::BottomUp,
559            ))?;
560
561        if ctx
562            .session_ctx()
563            .config()
564            .streaming_separate_consecutive_join()
565        {
566            plan = plan.optimize_by_rules(&OptimizationStage::new(
567                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
568                vec![SeparateConsecutiveJoinRule::create()],
569                ApplyOrder::BottomUp,
570            ))?;
571        }
572
573        // Add Logstore for Unaligned join
574        // Apply this BEFORE delta join rule, because delta join removes
575        // the join
576        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
577            plan = plan.optimize_by_rules(&OptimizationStage::new(
578                "Add Logstore for Unaligned join",
579                vec![AddLogstoreRule::create()],
580                ApplyOrder::BottomUp,
581            ))?;
582        }
583
584        if ctx.session_ctx().config().streaming_enable_delta_join()
585            && ctx.session_ctx().config().enable_index_selection()
586        {
587            // TODO: make it a logical optimization.
588            // Rewrite joins with index to delta join
589            plan = plan.optimize_by_rules(&OptimizationStage::new(
590                "To IndexDeltaJoin",
591                vec![IndexDeltaJoinRule::create()],
592                ApplyOrder::BottomUp,
593            ))?;
594        }
595        // Inline session timezone
596        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
597
598        if ctx.is_explain_trace() {
599            ctx.trace("Inline session timezone:");
600            ctx.trace(plan.explain_to_string());
601        }
602
603        // Const eval of exprs at the last minute
604        plan = const_eval_exprs(plan)?;
605
606        if ctx.is_explain_trace() {
607            ctx.trace("Const eval exprs:");
608            ctx.trace(plan.explain_to_string());
609        }
610
611        #[cfg(debug_assertions)]
612        InputRefValidator.validate(plan.clone());
613
614        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
615            return Err(ErrorCode::NotSupported(
616                "exist dangling temporal scan".to_owned(),
617                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
618            ).into());
619        }
620
621        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
622            return Err(ErrorCode::NotSupported(
623                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
624                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
625            ).into());
626        }
627
628        if LocalityProviderCounter::count(plan.clone()) > 5 {
629            // LocalityProviderCounter is non-zero only when locality backfill is enabled.
630            assert!(ctx.session_ctx().config().enable_locality_backfill());
631            risingwave_common::license::Feature::LocalityBackfill.check_available()?;
632        }
633
634        if ctx.missed_locality_providers() > 1
635            && risingwave_common::license::Feature::LocalityBackfill
636                .check_available()
637                .is_ok()
638        {
639            // missed_locality_providers can only be non-zero when locality backfill is disabled.
640            assert!(!ctx.session_ctx().config().enable_locality_backfill());
641            ctx.warn_to_user(format!(
642                "This streaming job has {} operators that could benefit from locality backfill. \
643                Consider enabling it with `SET enable_locality_backfill = true` for potentially \
644                faster backfill performance, when existing data volume in upstream(s) is large.",
645                ctx.missed_locality_providers()
646            ));
647        }
648
649        Ok(optimized_plan.into_phase(plan))
650    }
651
    /// Generate create index or create materialize view plan.
    ///
    /// Runs logical optimization for stream, repairs the column mapping from
    /// `logical_rewrite_for_stream` so that it is injective, rewrites the
    /// root's requirements through that mapping, and finally converts the plan
    /// to the stream convention with the chosen backfill strategy.
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        backfill_type: BackfillType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                // Reject JSONB columns in stream keys unless the session explicitly allows them.
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        // The rewrite mapped several old columns onto the same new
                        // column. Append duplicate output columns via a projection
                        // so the mapping becomes injective again.
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        // TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/7234
                        // assert_eq!(target_size, output_indices.len());
                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                // Target column already claimed by another source
                                // column: emit a copy of it at a fresh appended
                                // position and remap this source column there.
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                // Rewrite the root's requirements through the column mapping so
                // they refer to the rewritten plan's column indices.
                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_backfill_type(
                        emit_on_window_close,
                        backfill_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            // TODO: can be `plan.plan.explain_to_string()`, but should explicitly specify the type due to some limitation of rust compiler
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }
738
    /// Visit the plan root and compute the cardinality.
    ///
    /// Panics if not called on a logical plan.
    fn compute_cardinality(&self) -> Cardinality {
        // Thin delegation to `CardinalityVisitor` over the current plan.
        CardinalityVisitor.visit(self.plan.clone())
    }
745
    /// Optimize and generate a create table plan.
    ///
    /// Builds the streaming plan for `CREATE TABLE`: the optimized source plan (if any)
    /// is unioned with a DML input and an upstream-sink union input, then watermark
    /// filtering, row-id generation, NOT NULL filtering and session-timezone inlining
    /// are layered on top, and the result is handed to [`StreamMaterialize`].
    pub fn gen_table_plan(
        self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_columns,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        // Snapshot backfill is not allowed for create table
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        // Either a user-defined primary key or a generated row id must exist.
        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        // Resolve each pk column id to its positional index within `columns`.
        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
                .collect_vec()
        };

        // If the table has generated columns, append a `StreamProject` computing them;
        // otherwise return `node` unchanged.
        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: StreamPlanRef,
        ) -> Result<StreamPlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        // How the table's primary key is defined. This drives both the distribution
        // enforcement below and whether a `StreamRowIdGen` node is added.
        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        // Wrap the input with a `StreamDml` node, re-derive generated columns on top of
        // it, then enforce the distribution required by the primary-key kind.
        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: StreamPlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<StreamPlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            // Add generated columns.
            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    // Enforce hash distribution on the pk columns.
                    RequiredDist::hash_shard(pk_column_indices)
                        .streaming_enforce_if_not_satisfies(dml_node)?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            // With a generated row id, it must be the one and only pk column.
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        // Only columns writable via DML are exposed to the DML node.
        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        // Indices (within `column_descs`) of NOT NULL columns, enforced by a filter below.
        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_indices = if !with_version_columns.is_empty() {
            find_version_column_indices(&columns, with_version_columns)?
        } else {
            vec![]
        };

        let with_external_source = source_catalog.is_some();
        let (dml_source_node, external_source_node) = if with_external_source {
            // With an external source, a dummy (catalog-less) source node feeds the DML
            // side, while the real optimized source plan becomes a separate union input.
            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
            let mut external_source_node = stream_plan.plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .streaming_enforce_if_not_satisfies(external_source_node)?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };
            (dummy_source_node, Some(external_source_node))
        } else {
            (stream_plan.plan, None)
        };

        let dml_node = inject_dml_node(
            &columns,
            append_only,
            dml_source_node,
            &pk_column_indices,
            kind,
            column_descs,
        )?;

        // Distinct distributions among the prospective union inputs.
        let dists = external_source_node
            .iter()
            .map(|input| input.distribution())
            .chain([dml_node.distribution()])
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            // Mixed distributions degrade to `SomeShard`.
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                // The enforcement above only yields the two variants handled here.
                unreachable!()
            }
        };

        let generated_column_exprs =
            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
        // Additional union input for upstream sinks (sink-into-table), matching the
        // DML node's schema and stream key.
        let upstream_sink_union = StreamUpstreamSinkUnion::new(
            context.clone(),
            dml_node.schema(),
            dml_node.stream_key(),
            dist.clone(), // should always be the same as dist of `Union`
            append_only,
            row_id_index.is_none(),
            generated_column_exprs,
        );

        let union_inputs = external_source_node
            .into_iter()
            .chain([dml_node, upstream_sink_union.into()])
            .collect_vec();

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist,
        )
        .into();

        // Indices of watermark columns declared with TTL.
        let ttl_watermark_indices = watermark_descs
            .iter()
            .filter(|d| d.with_ttl)
            .map(|d| d.watermark_idx as usize)
            .collect_vec();

        // Add WatermarkFilter node.
        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        // Add RowIDGen node if needed.
        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    // `kind` was derived from `row_id_index` above, so this cannot happen.
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        // A version column decides which row wins on conflict, which is meaningless
        // when conflicts are ignored outright.
        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && !version_column_indices.is_empty()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        // The materialized table is sharded by its primary-key columns.
        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        // Enforce NOT NULL constraints by filtering out violating rows.
        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        // Determine if the table should be refreshable based on the connector type
        let refreshable = source_catalog
            .as_ref()
            .map(|catalog| {
                catalog.with_properties.is_batch_connector() || {
                    matches!(
                        catalog
                            .refresh_mode
                            .as_ref()
                            .map(|refresh_mode| refresh_mode.refresh_mode),
                        Some(Some(RefreshMode::FullReload(_)))
                    )
                }
            })
            .unwrap_or(false);

        // Validate that refreshable tables have a user-defined primary key (i.e., does not have rowid)
        if refreshable && row_id_index.is_some() {
            return Err(crate::error::ErrorCode::BindError(
                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
                    .to_owned(),
            )
            .into());
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            pk_column_indices,
            ttl_watermark_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
            refreshable,
        )
    }
1049
1050    /// Optimize and generate a create materialized view plan.
1051    pub fn gen_materialize_plan(
1052        self,
1053        database_id: DatabaseId,
1054        schema_id: SchemaId,
1055        mv_name: String,
1056        definition: String,
1057        emit_on_window_close: bool,
1058    ) -> Result<StreamMaterialize> {
1059        let cardinality = self.compute_cardinality();
1060        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
1061        StreamMaterialize::create(
1062            stream_plan,
1063            mv_name,
1064            database_id,
1065            schema_id,
1066            definition,
1067            TableType::MaterializedView,
1068            cardinality,
1069            None,
1070        )
1071    }
1072
1073    /// Optimize and generate a create index plan.
1074    pub fn gen_index_plan(
1075        self,
1076        index_name: String,
1077        database_id: DatabaseId,
1078        schema_id: SchemaId,
1079        definition: String,
1080        retention_seconds: Option<NonZeroU32>,
1081    ) -> Result<StreamMaterialize> {
1082        let cardinality = self.compute_cardinality();
1083        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1084
1085        StreamMaterialize::create(
1086            stream_plan,
1087            index_name,
1088            database_id,
1089            schema_id,
1090            definition,
1091            TableType::Index,
1092            cardinality,
1093            retention_seconds,
1094        )
1095    }
1096
1097    pub fn gen_vector_index_plan(
1098        self,
1099        index_name: String,
1100        database_id: DatabaseId,
1101        schema_id: SchemaId,
1102        definition: String,
1103        retention_seconds: Option<NonZeroU32>,
1104        vector_index_info: PbVectorIndexInfo,
1105    ) -> Result<StreamVectorIndexWrite> {
1106        let cardinality = self.compute_cardinality();
1107        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1108
1109        StreamVectorIndexWrite::create(
1110            stream_plan,
1111            index_name,
1112            database_id,
1113            schema_id,
1114            definition,
1115            cardinality,
1116            retention_seconds,
1117            vector_index_info,
1118        )
1119    }
1120
    /// Optimize and generate a create sink plan.
    ///
    /// Selects a backfill strategy (`UpstreamOnly`, `SnapshotBackfill`,
    /// `ArrangementBackfill`, or plain `Backfill`), optimizes the stream plan
    /// accordingly, and creates the sink node. When `target_table` is set
    /// (sink-into-table), also computes the mapping from the target table's
    /// columns to the plan's output columns.
    #[expect(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamSink> {
        let backfill_type = if without_backfill {
            BackfillType::UpstreamOnly
        } else if allow_snapshot_backfill
            && self.should_use_snapshot_backfill()
            && {
                // Auto schema change cannot use snapshot backfill: notify the user and
                // fall through to the arrangement-backfill branch instead.
                if auto_refresh_schema_from_table.is_some() {
                    self.plan.ctx().session_ctx().notice_to_user("Auto schema change only support for ArrangementBackfill. Switched to use ArrangementBackfill");
                    false
                } else {
                    true
                }
            }
        {
            assert!(
                target_table.is_none(),
                "should not allow snapshot backfill for sink-into-table"
            );
            // Snapshot backfill on sink-into-table is not allowed
            BackfillType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            BackfillType::ArrangementBackfill
        } else {
            BackfillType::Backfill
        };
        // Re-check after selection: other flags (e.g. `without_backfill` or disabled
        // arrangement backfill) may have forced an incompatible backfill type.
        if auto_refresh_schema_from_table.is_some()
            && backfill_type != BackfillType::ArrangementBackfill
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "auto schema change only support for ArrangementBackfill, but got: {:?}",
                backfill_type
            ))
            .into());
        }
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, backfill_type)?;
        // For sink-into-table, map the target table's columns to the plan's schema.
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });

        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            definition,
            properties,
            format_desc,
            partition_info,
            auto_refresh_schema_from_table,
        )
    }
1193
1194    pub fn should_use_arrangement_backfill(&self) -> bool {
1195        let ctx = self.plan.ctx();
1196        let session_ctx = ctx.session_ctx();
1197        let arrangement_backfill_enabled = session_ctx
1198            .env()
1199            .streaming_config()
1200            .developer
1201            .enable_arrangement_backfill;
1202        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1203    }
1204
1205    pub fn should_use_snapshot_backfill(&self) -> bool {
1206        let ctx = self.plan.ctx();
1207        let session_ctx = ctx.session_ctx();
1208        let use_snapshot_backfill = session_ctx
1209            .env()
1210            .streaming_config()
1211            .developer
1212            .enable_snapshot_backfill
1213            && session_ctx.config().streaming_use_snapshot_backfill();
1214        if use_snapshot_backfill {
1215            if let Some(warning_msg) = self.plan.forbid_snapshot_backfill() {
1216                self.plan.ctx().session_ctx().notice_to_user(warning_msg);
1217                false
1218            } else {
1219                true
1220            }
1221        } else {
1222            false
1223        }
1224    }
1225}
1226
1227impl<P: PlanPhase> PlanRoot<P> {
1228    /// used when the plan has a target relation such as DML and sink into table, return the mapping from table's columns to the plan's schema
1229    pub fn target_columns_to_plan_mapping(
1230        &self,
1231        tar_cols: &[ColumnCatalog],
1232        user_specified_columns: bool,
1233    ) -> Vec<Option<usize>> {
1234        #[allow(clippy::disallowed_methods)]
1235        let visible_cols: Vec<(usize, String)> = self
1236            .out_fields
1237            .ones()
1238            .zip_eq(self.out_names.iter().cloned())
1239            .collect_vec();
1240
1241        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1242        let visible_col_idxes_by_name = visible_cols
1243            .iter()
1244            .map(|(i, name)| (name.as_ref(), *i))
1245            .collect::<BTreeMap<_, _>>();
1246
1247        tar_cols
1248            .iter()
1249            .enumerate()
1250            .filter(|(_, tar_col)| tar_col.can_dml())
1251            .map(|(tar_i, tar_col)| {
1252                if user_specified_columns {
1253                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
1254                } else {
1255                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1256                }
1257            })
1258            .collect()
1259    }
1260}
1261
1262fn find_version_column_indices(
1263    column_catalog: &Vec<ColumnCatalog>,
1264    version_column_names: Vec<String>,
1265) -> Result<Vec<usize>> {
1266    let mut indices = Vec::new();
1267    for version_column_name in version_column_names {
1268        let mut found = false;
1269        for (index, column) in column_catalog.iter().enumerate() {
1270            if column.column_desc.name == version_column_name {
1271                if let &DataType::Jsonb
1272                | &DataType::List(_)
1273                | &DataType::Struct(_)
1274                | &DataType::Bytea
1275                | &DataType::Boolean = column.data_type()
1276                {
1277                    return Err(ErrorCode::InvalidInputSyntax(format!(
1278                        "Version column {} must be of a comparable data type",
1279                        version_column_name
1280                    ))
1281                    .into());
1282                }
1283                indices.push(index);
1284                found = true;
1285                break;
1286            }
1287        }
1288        if !found {
1289            return Err(ErrorCode::InvalidInputSyntax(format!(
1290                "Version column {} not found",
1291                version_column_name
1292            ))
1293            .into());
1294        }
1295    }
1296    Ok(indices)
1297}
1298
1299fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1300    let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1301
1302    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1303    if let Some(error) = const_eval_rewriter.error {
1304        return Err(error);
1305    }
1306    Ok(plan)
1307}
1308
1309fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1310    ctx: OptimizerContextRef,
1311    plan: PlanRef<C>,
1312) -> Result<PlanRef<C>> {
1313    let mut v = TimestamptzExprFinder::default();
1314    plan.visit_exprs_recursive(&mut v);
1315    if v.has() {
1316        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1317    } else {
1318        Ok(plan)
1319    }
1320}
1321
1322fn exist_and_no_exchange_before(
1323    plan: &BatchPlanRef,
1324    is_candidate: fn(&BatchPlanRef) -> bool,
1325) -> bool {
1326    if plan.node_type() == BatchPlanNodeType::BatchExchange {
1327        return false;
1328    }
1329    is_candidate(plan)
1330        || plan
1331            .inputs()
1332            .iter()
1333            .any(|input| exist_and_no_exchange_before(input, is_candidate))
1334}
1335
1336impl BatchPlanRef {
1337    fn is_user_table_scan(&self) -> bool {
1338        self.node_type() == BatchPlanNodeType::BatchSeqScan
1339            || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
1340            || self.node_type() == BatchPlanNodeType::BatchVectorSearch
1341    }
1342
1343    fn is_source_scan(&self) -> bool {
1344        self.node_type() == BatchPlanNodeType::BatchSource
1345            || self.node_type() == BatchPlanNodeType::BatchKafkaScan
1346            || self.node_type() == BatchPlanNodeType::BatchIcebergScan
1347    }
1348
1349    fn is_insert(&self) -> bool {
1350        self.node_type() == BatchPlanNodeType::BatchInsert
1351    }
1352
1353    fn is_update(&self) -> bool {
1354        self.node_type() == BatchPlanNodeType::BatchUpdate
1355    }
1356
1357    fn is_delete(&self) -> bool {
1358        self.node_type() == BatchPlanNodeType::BatchDelete
1359    }
1360}
1361
1362/// As we always run the root stage locally, for some plan in root stage which need to execute in
1363/// compute node we insert an additional exhchange before it to avoid to include it in the root
1364/// stage.
1365///
1366/// Returns `true` if we must insert an additional exchange to ensure this.
1367fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1368    assert_eq!(plan.distribution(), &Distribution::Single);
1369    exist_and_no_exchange_before(&plan, |plan| {
1370        plan.is_user_table_scan()
1371            || plan.is_source_scan()
1372            || plan.is_insert()
1373            || plan.is_update()
1374            || plan.is_delete()
1375    })
1376}
1377
1378/// The purpose is same as `require_additional_exchange_on_root_in_distributed_mode`. We separate
1379/// them for the different requirement of plan node in different execute mode.
1380fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1381    assert_eq!(plan.distribution(), &Distribution::Single);
1382    exist_and_no_exchange_before(&plan, |plan| {
1383        plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
1384    })
1385}
1386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// A subplan derived from a `PlanRoot` should expose only the visible output
    /// columns in its schema.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values = LogicalValues::new(vec![], schema, ctx).into();
        // Only the first column (`v1`) is marked visible.
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        let expected = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(subplan.schema(), &expected);
    }
}