risingwave_frontend/optimizer/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::num::NonZeroU32;
15use std::ops::DerefMut;
16use std::sync::Arc;
17
18pub mod plan_node;
19
20use plan_node::StreamFilter;
21pub use plan_node::{Explain, PlanRef};
22
23pub mod property;
24
25mod delta_join_solver;
26mod heuristic_optimizer;
27mod plan_rewriter;
28
29pub use plan_rewriter::PlanRewriter;
30
31mod plan_visitor;
32
33pub use plan_visitor::{
34    ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
35    SysTableVisitor,
36};
37
38pub mod backfill_order_strategy;
39mod logical_optimization;
40mod optimizer_context;
41pub mod plan_expr_rewriter;
42mod plan_expr_visitor;
43mod rule;
44
45use std::assert_matches::assert_matches;
46use std::collections::{BTreeMap, HashMap};
47use std::marker::PhantomData;
48
49use educe::Educe;
50use fixedbitset::FixedBitSet;
51use itertools::Itertools as _;
52pub use logical_optimization::*;
53pub use optimizer_context::*;
54use plan_expr_rewriter::ConstEvalRewriter;
55use property::Order;
56use risingwave_common::bail;
57use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
58use risingwave_common::types::DataType;
59use risingwave_common::util::column_index_mapping::ColIndexMapping;
60use risingwave_common::util::iter_util::ZipEqDebug;
61use risingwave_connector::sink::catalog::SinkFormatDesc;
62use risingwave_pb::stream_plan::StreamScanType;
63
64use self::heuristic_optimizer::ApplyOrder;
65use self::plan_node::generic::{self, PhysicalPlanRef};
66use self::plan_node::{
67    BatchProject, Convention, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
68    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
69    ToStreamContext, stream_enforce_eowc_requirement,
70};
71#[cfg(debug_assertions)]
72use self::plan_visitor::InputRefValidator;
73use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
74use self::property::{Cardinality, RequiredDist};
75use self::rule::*;
76use crate::TableCatalog;
77use crate::catalog::table_catalog::TableType;
78use crate::catalog::{DatabaseId, SchemaId};
79use crate::error::{ErrorCode, Result};
80use crate::expr::TimestamptzExprFinder;
81use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
82use crate::optimizer::plan_node::generic::{SourceNodeKind, Union};
83use crate::optimizer::plan_node::{
84    BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
85    ToStream, VisitExprsRecursive,
86};
87use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
88use crate::optimizer::property::Distribution;
89use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
90
/// `PlanRoot` describes a plan together with the requirements on its output.
///
/// The planner constructs a `PlanRoot` around a logical plan node, along with the required
/// distribution and order. From there, `PlanRoot` can generate the corresponding streaming or
/// batch plan with optimization. The columns referenced by the required order and distribution
/// may be more than the output columns. For example:
/// ```sql
///    select v1 from t order by id;
/// ```
/// Here the plan returns two columns (id, v1) and the required order column is id. The id
/// column is needed during optimization, but the final generated plan removes the unnecessary
/// column from the result.
///
/// The type parameter `P` is a marker (a [`PlanPhase`]) recording which phase the plan is in.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P> {
    // The current plan node.
    pub plan: PlanRef,
    // The phase of the plan. This is only a type-level marker and is skipped by `Debug`.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    // Distribution required on the plan's output.
    required_dist: RequiredDist,
    // Order required on the plan's output; may reference columns not selected by `out_fields`.
    required_order: Order,
    // One bit per column of `plan`'s schema; set bits mark the columns that are emitted.
    out_fields: FixedBitSet,
    // Names of the emitted columns, one per set bit of `out_fields`.
    out_names: Vec<String>,
}
114
/// `PlanPhase` is a marker trait used to track the phase of a [`PlanRoot`] at the type level.
///
/// Usually, a `PlanRoot` begins in the `Logical` phase and ends in `Batch` or `Stream`, unless
/// we want to construct a `PlanRoot` from an intermediate phase.
/// Typical phase transformations are:
/// - `Logical` -> `BatchOptimizedLogical` -> `Batch`
/// - `Logical` -> `StreamOptimizedLogical` -> `Stream`
pub trait PlanPhase {}
121
/// For each listed phase `X`, declares a unit marker struct `PlanPhaseX`, implements
/// [`PlanPhase`] for it, and adds the alias `XPlanRoot = PlanRoot<PlanPhaseX>`.
///
/// Invoked with no arguments, it expands to the full list of phases below. For example, the
/// `Logical` entry generates `PlanPhaseLogical` and `LogicalPlanRoot`.
macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            Logical,
            BatchOptimizedLogical,
            StreamOptimizedLogical,
            Batch,
            Stream
        }
    };
    ($($phase:ident),+ $(,)?) => {
        $(
            // `paste` concatenates identifiers, e.g. `PlanPhase` + `Logical` -> `PlanPhaseLogical`.
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {}
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

// Instantiate the marker types and `*PlanRoot` aliases for every phase.
for_all_phase!();
144
145impl LogicalPlanRoot {
146    pub fn new_with_logical_plan(
147        plan: PlanRef,
148        required_dist: RequiredDist,
149        required_order: Order,
150        out_fields: FixedBitSet,
151        out_names: Vec<String>,
152    ) -> Self {
153        assert_eq!(plan.convention(), Convention::Logical);
154        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
155    }
156}
157
158impl BatchPlanRoot {
159    pub fn new_with_batch_plan(
160        plan: PlanRef,
161        required_dist: RequiredDist,
162        required_order: Order,
163        out_fields: FixedBitSet,
164        out_names: Vec<String>,
165    ) -> Self {
166        assert_eq!(plan.convention(), Convention::Batch);
167        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
168    }
169}
170
171impl<P: PlanPhase> PlanRoot<P> {
172    fn new_inner(
173        plan: PlanRef,
174        required_dist: RequiredDist,
175        required_order: Order,
176        out_fields: FixedBitSet,
177        out_names: Vec<String>,
178    ) -> Self {
179        let input_schema = plan.schema();
180        assert_eq!(input_schema.fields().len(), out_fields.len());
181        assert_eq!(out_fields.count_ones(..), out_names.len());
182
183        Self {
184            plan,
185            _phase: PhantomData,
186            required_dist,
187            required_order,
188            out_fields,
189            out_names,
190        }
191    }
192
193    fn into_phase<P2: PlanPhase>(self) -> PlanRoot<P2> {
194        PlanRoot {
195            plan: self.plan,
196            _phase: PhantomData,
197            required_dist: self.required_dist,
198            required_order: self.required_order,
199            out_fields: self.out_fields,
200            out_names: self.out_names,
201        }
202    }
203
204    /// Set customized names of the output fields, used for `CREATE [MATERIALIZED VIEW | SINK] r(a,
205    /// b, ..)`.
206    ///
207    /// If the number of names does not match the number of output fields, an error is returned.
208    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
209        if out_names.len() != self.out_fields.count_ones(..) {
210            Err(ErrorCode::InvalidInputSyntax(
211                "number of column names does not match number of columns".to_owned(),
212            ))?
213        }
214        self.out_names = out_names;
215        Ok(())
216    }
217
218    /// Get the plan root's schema, only including the fields to be output.
219    pub fn schema(&self) -> Schema {
220        // The schema can be derived from the `out_fields` and `out_names`, so we don't maintain it
221        // as a field and always construct one on demand here to keep it in sync.
222        Schema {
223            fields: self
224                .out_fields
225                .ones()
226                .map(|i| self.plan.schema().fields()[i].clone())
227                .zip_eq_debug(&self.out_names)
228                .map(|(field, name)| Field {
229                    name: name.clone(),
230                    ..field
231                })
232                .collect(),
233        }
234    }
235}
236
237impl LogicalPlanRoot {
238    /// Transform the [`PlanRoot`] back to a [`PlanRef`] suitable to be used as a subplan, for
239    /// example as insert source or subquery. This ignores Order but retains post-Order pruning
240    /// (`out_fields`).
241    pub fn into_unordered_subplan(self) -> PlanRef {
242        assert_eq!(self.plan.convention(), Convention::Logical);
243        if self.out_fields.count_ones(..) == self.out_fields.len() {
244            return self.plan;
245        }
246        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
247    }
248
249    /// Transform the [`PlanRoot`] wrapped in an array-construction subquery to a [`PlanRef`]
250    /// supported by `ARRAY_AGG`. Similar to the unordered version, this abstracts away internal
251    /// `self.plan` which is further modified by `self.required_order` then `self.out_fields`.
252    pub fn into_array_agg(self) -> Result<PlanRef> {
253        use generic::Agg;
254        use plan_node::PlanAggCall;
255        use risingwave_common::types::ListValue;
256        use risingwave_expr::aggregate::PbAggKind;
257
258        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
259        use crate::utils::{Condition, IndexSet};
260
261        assert_eq!(self.plan.convention(), Convention::Logical);
262        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
263            bail!("subquery must return only one column");
264        };
265        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
266        let return_type = DataType::List(input_column_type.clone().into());
267        let agg = Agg::new(
268            vec![PlanAggCall {
269                agg_type: PbAggKind::ArrayAgg.into(),
270                return_type: return_type.clone(),
271                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
272                distinct: false,
273                order_by: self.required_order.column_orders,
274                filter: Condition::true_cond(),
275                direct_args: vec![],
276            }],
277            IndexSet::empty(),
278            self.plan,
279        );
280        Ok(LogicalProject::create(
281            agg.into(),
282            vec![
283                FunctionCall::new(
284                    ExprType::Coalesce,
285                    vec![
286                        InputRef::new(0, return_type).into(),
287                        ExprImpl::literal_list(
288                            ListValue::empty(&input_column_type),
289                            input_column_type,
290                        ),
291                    ],
292                )
293                .unwrap()
294                .into(),
295            ],
296        ))
297    }
298
299    /// Apply logical optimization to the plan for stream.
300    pub fn gen_optimized_logical_plan_for_stream(
301        mut self,
302    ) -> Result<StreamOptimizedLogicalPlanRoot> {
303        assert_eq!(self.plan.convention(), Convention::Logical);
304        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
305        assert_eq!(self.plan.convention(), Convention::Logical);
306        Ok(self.into_phase())
307    }
308
309    /// Apply logical optimization to the plan for batch.
310    pub fn gen_optimized_logical_plan_for_batch(mut self) -> Result<BatchOptimizedLogicalPlanRoot> {
311        assert_eq!(self.plan.convention(), Convention::Logical);
312        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
313        assert_eq!(self.plan.convention(), Convention::Logical);
314        Ok(self.into_phase())
315    }
316
317    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
318        self.gen_optimized_logical_plan_for_batch()?
319            .gen_batch_plan()
320    }
321}
322
323impl BatchOptimizedLogicalPlanRoot {
324    /// Optimize and generate a singleton batch physical plan without exchange nodes.
325    pub fn gen_batch_plan(mut self) -> Result<BatchPlanRoot> {
326        assert_eq!(self.plan.convention(), Convention::Logical);
327
328        let mut plan = self.plan;
329
330        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
331            return Err(ErrorCode::NotSupported(
332                "do not support temporal join for batch queries".to_owned(),
333                "please use temporal join in streaming queries".to_owned(),
334            )
335            .into());
336        }
337
338        let ctx = plan.ctx();
339        // Inline session timezone mainly for rewriting now()
340        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
341
342        // Const eval of exprs at the last minute, but before `to_batch` to make functional index selection happy.
343        plan = const_eval_exprs(plan)?;
344
345        if ctx.is_explain_trace() {
346            ctx.trace("Const eval exprs:");
347            ctx.trace(plan.explain_to_string());
348        }
349
350        // Convert to physical plan node
351        plan = plan.to_batch_with_order_required(&self.required_order)?;
352        if ctx.is_explain_trace() {
353            ctx.trace("To Batch Plan:");
354            ctx.trace(plan.explain_to_string());
355        }
356
357        plan = plan.optimize_by_rules(&OptimizationStage::new(
358            "Merge BatchProject",
359            vec![BatchProjectMergeRule::create()],
360            ApplyOrder::BottomUp,
361        ))?;
362
363        // Inline session timezone
364        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
365
366        if ctx.is_explain_trace() {
367            ctx.trace("Inline Session Timezone:");
368            ctx.trace(plan.explain_to_string());
369        }
370
371        #[cfg(debug_assertions)]
372        InputRefValidator.validate(plan.clone());
373        assert!(
374            *plan.distribution() == Distribution::Single,
375            "{}",
376            plan.explain_to_string()
377        );
378        assert!(
379            !has_batch_exchange(plan.clone()),
380            "{}",
381            plan.explain_to_string()
382        );
383
384        let ctx = plan.ctx();
385        if ctx.is_explain_trace() {
386            ctx.trace("To Batch Physical Plan:");
387            ctx.trace(plan.explain_to_string());
388        }
389
390        self.plan = plan;
391        assert_eq!(self.plan.convention(), Convention::Batch);
392        Ok(self.into_phase())
393    }
394}
395
396impl BatchPlanRoot {
397    /// Optimize and generate a batch query plan for distributed execution.
398    pub fn gen_batch_distributed_plan(mut self) -> Result<PlanRef> {
399        assert_eq!(self.plan.convention(), Convention::Batch);
400        self.required_dist = RequiredDist::single();
401        let mut plan = self.plan;
402
403        // Convert to distributed plan
404        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;
405
406        // Add Project if the any position of `self.out_fields` is set to zero.
407        if self.out_fields.count_ones(..) != self.out_fields.len() {
408            plan =
409                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
410        }
411
412        let ctx = plan.ctx();
413        if ctx.is_explain_trace() {
414            ctx.trace("To Batch Distributed Plan:");
415            ctx.trace(plan.explain_to_string());
416        }
417        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
418            plan =
419                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
420        }
421
422        // Both two phase limit and topn could generate limit on top of the scan, so we push limit here.
423        let plan = plan.optimize_by_rules(&OptimizationStage::new(
424            "Push Limit To Scan",
425            vec![BatchPushLimitToScanRule::create()],
426            ApplyOrder::BottomUp,
427        ))?;
428
429        let plan = plan.optimize_by_rules(&OptimizationStage::new(
430            "Iceberg Count Star",
431            vec![BatchIcebergCountStar::create()],
432            ApplyOrder::TopDown,
433        ))?;
434
435        // For iceberg scan, we do iceberg predicate pushdown
436        // BatchFilter -> BatchIcebergScan
437        let plan = plan.optimize_by_rules(&OptimizationStage::new(
438            "Iceberg Predicate Pushdown",
439            vec![BatchIcebergPredicatePushDownRule::create()],
440            ApplyOrder::BottomUp,
441        ))?;
442
443        assert_eq!(plan.convention(), Convention::Batch);
444        Ok(plan)
445    }
446
447    /// Optimize and generate a batch query plan for local execution.
448    pub fn gen_batch_local_plan(self) -> Result<PlanRef> {
449        assert_eq!(self.plan.convention(), Convention::Batch);
450        let mut plan = self.plan;
451
452        // Convert to local plan node
453        plan = plan.to_local_with_order_required(&self.required_order)?;
454
455        // We remark that since the `to_local_with_order_required` does not enforce single
456        // distribution, we enforce at the root if needed.
457        let insert_exchange = match plan.distribution() {
458            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
459            _ => true,
460        };
461        if insert_exchange {
462            plan =
463                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
464        }
465
466        // Add Project if the any position of `self.out_fields` is set to zero.
467        if self.out_fields.count_ones(..) != self.out_fields.len() {
468            plan =
469                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
470        }
471
472        let ctx = plan.ctx();
473        if ctx.is_explain_trace() {
474            ctx.trace("To Batch Local Plan:");
475            ctx.trace(plan.explain_to_string());
476        }
477
478        // Both two phase limit and topn could generate limit on top of the scan, so we push limit here.
479        let plan = plan.optimize_by_rules(&OptimizationStage::new(
480            "Push Limit To Scan",
481            vec![BatchPushLimitToScanRule::create()],
482            ApplyOrder::BottomUp,
483        ))?;
484
485        let plan = plan.optimize_by_rules(&OptimizationStage::new(
486            "Iceberg Count Star",
487            vec![BatchIcebergCountStar::create()],
488            ApplyOrder::TopDown,
489        ))?;
490
491        assert_eq!(plan.convention(), Convention::Batch);
492        Ok(plan)
493    }
494}
495
496impl LogicalPlanRoot {
497    /// Generate optimized stream plan
498    fn gen_optimized_stream_plan(
499        self,
500        emit_on_window_close: bool,
501        allow_snapshot_backfill: bool,
502    ) -> Result<StreamOptimizedLogicalPlanRoot> {
503        assert_eq!(self.plan.convention(), Convention::Logical);
504        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
505            StreamScanType::SnapshotBackfill
506        } else if self.should_use_arrangement_backfill() {
507            StreamScanType::ArrangementBackfill
508        } else {
509            StreamScanType::Backfill
510        };
511        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
512    }
513
514    fn gen_optimized_stream_plan_inner(
515        self,
516        emit_on_window_close: bool,
517        stream_scan_type: StreamScanType,
518    ) -> Result<StreamOptimizedLogicalPlanRoot> {
519        assert_eq!(self.plan.convention(), Convention::Logical);
520        let ctx = self.plan.ctx();
521        let _explain_trace = ctx.is_explain_trace();
522
523        let mut optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;
524        let mut plan = optimized_plan.plan;
525
526        plan = plan.optimize_by_rules(&OptimizationStage::new(
527            "Merge StreamProject",
528            vec![StreamProjectMergeRule::create()],
529            ApplyOrder::BottomUp,
530        ))?;
531
532        if ctx
533            .session_ctx()
534            .config()
535            .streaming_separate_consecutive_join()
536        {
537            plan = plan.optimize_by_rules(&OptimizationStage::new(
538                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
539                vec![SeparateConsecutiveJoinRule::create()],
540                ApplyOrder::BottomUp,
541            ))?;
542        }
543
544        // Add Logstore for Unaligned join
545        // Apply this BEFORE delta join rule, because delta join removes
546        // the join
547        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
548            plan = plan.optimize_by_rules(&OptimizationStage::new(
549                "Add Logstore for Unaligned join",
550                vec![AddLogstoreRule::create()],
551                ApplyOrder::BottomUp,
552            ))?;
553        }
554
555        if ctx.session_ctx().config().streaming_enable_delta_join() {
556            // TODO: make it a logical optimization.
557            // Rewrite joins with index to delta join
558            plan = plan.optimize_by_rules(&OptimizationStage::new(
559                "To IndexDeltaJoin",
560                vec![IndexDeltaJoinRule::create()],
561                ApplyOrder::BottomUp,
562            ))?;
563        }
564        // Inline session timezone
565        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
566
567        if ctx.is_explain_trace() {
568            ctx.trace("Inline session timezone:");
569            ctx.trace(plan.explain_to_string());
570        }
571
572        // Const eval of exprs at the last minute
573        plan = const_eval_exprs(plan)?;
574
575        if ctx.is_explain_trace() {
576            ctx.trace("Const eval exprs:");
577            ctx.trace(plan.explain_to_string());
578        }
579
580        #[cfg(debug_assertions)]
581        InputRefValidator.validate(plan.clone());
582
583        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
584            return Err(ErrorCode::NotSupported(
585                "exist dangling temporal scan".to_owned(),
586                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
587            ).into());
588        }
589
590        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
591            return Err(ErrorCode::NotSupported(
592                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
593                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
594            ).into());
595        }
596
597        optimized_plan.plan = plan;
598        assert_eq!(optimized_plan.plan.convention(), Convention::Stream);
599        Ok(optimized_plan.into_phase())
600    }
601
602    /// Generate create index or create materialize view plan.
603    fn gen_stream_plan(
604        self,
605        emit_on_window_close: bool,
606        stream_scan_type: StreamScanType,
607    ) -> Result<StreamOptimizedLogicalPlanRoot> {
608        assert_eq!(self.plan.convention(), Convention::Logical);
609        let ctx = self.plan.ctx();
610        let explain_trace = ctx.is_explain_trace();
611
612        let plan = match self.plan.convention() {
613            Convention::Logical => {
614                if !ctx
615                    .session_ctx()
616                    .config()
617                    .streaming_allow_jsonb_in_stream_key()
618                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
619                {
620                    return Err(ErrorCode::NotSupported(
621                        err,
622                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
623                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
624                    ).into());
625                }
626                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
627                let mut plan = optimized_plan.plan;
628                let out_col_change;
629                (plan, out_col_change) = {
630                    let (plan, out_col_change) =
631                        plan.logical_rewrite_for_stream(&mut Default::default())?;
632                    if out_col_change.is_injective() {
633                        (plan, out_col_change)
634                    } else {
635                        let mut output_indices = (0..plan.schema().len()).collect_vec();
636                        #[allow(unused_assignments)]
637                        let (mut map, mut target_size) = out_col_change.into_parts();
638
639                        // TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/7234
640                        // assert_eq!(target_size, output_indices.len());
641                        target_size = plan.schema().len();
642                        let mut tar_exists = vec![false; target_size];
643                        for i in map.iter_mut().flatten() {
644                            if tar_exists[*i] {
645                                output_indices.push(*i);
646                                *i = target_size;
647                                target_size += 1;
648                            } else {
649                                tar_exists[*i] = true;
650                            }
651                        }
652                        let plan =
653                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
654                        let out_col_change = ColIndexMapping::new(map, target_size);
655                        (plan.into(), out_col_change)
656                    }
657                };
658
659                if explain_trace {
660                    ctx.trace("Logical Rewrite For Stream:");
661                    ctx.trace(plan.explain_to_string());
662                }
663
664                optimized_plan.required_dist =
665                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
666                optimized_plan.required_order = out_col_change
667                    .rewrite_required_order(&optimized_plan.required_order)
668                    .unwrap();
669                optimized_plan.out_fields =
670                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
671                plan = plan.to_stream_with_dist_required(
672                    &optimized_plan.required_dist,
673                    &mut ToStreamContext::new_with_stream_scan_type(
674                        emit_on_window_close,
675                        stream_scan_type,
676                    ),
677                )?;
678                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
679                optimized_plan.plan = plan;
680                optimized_plan
681            }
682            _ => unreachable!(),
683        };
684
685        if explain_trace {
686            ctx.trace("To Stream Plan:");
687            ctx.trace(plan.plan.explain_to_string());
688        }
689        Ok(plan)
690    }
691
692    /// Visit the plan root and compute the cardinality.
693    ///
694    /// Panics if not called on a logical plan.
695    fn compute_cardinality(&self) -> Cardinality {
696        assert_matches!(self.plan.convention(), Convention::Logical);
697        CardinalityVisitor.visit(self.plan.clone())
698    }
699
700    /// Optimize and generate a create table plan.
701    pub fn gen_table_plan(
702        self,
703        context: OptimizerContextRef,
704        table_name: String,
705        database_id: DatabaseId,
706        schema_id: SchemaId,
707        CreateTableInfo {
708            columns,
709            pk_column_ids,
710            row_id_index,
711            watermark_descs,
712            source_catalog,
713            version,
714        }: CreateTableInfo,
715        CreateTableProps {
716            definition,
717            append_only,
718            on_conflict,
719            with_version_column,
720            webhook_info,
721            engine,
722        }: CreateTableProps,
723    ) -> Result<StreamMaterialize> {
724        assert_eq!(self.plan.convention(), Convention::Logical);
725        // Snapshot backfill is not allowed for create table
726        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
727        assert_eq!(stream_plan.plan.convention(), Convention::Stream);
728
729        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
730
731        let pk_column_indices = {
732            let mut id_to_idx = HashMap::new();
733
734            columns.iter().enumerate().for_each(|(idx, c)| {
735                id_to_idx.insert(c.column_id(), idx);
736            });
737            pk_column_ids
738                .iter()
739                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
740                .collect_vec()
741        };
742
743        fn inject_project_for_generated_column_if_needed(
744            columns: &[ColumnCatalog],
745            node: PlanRef,
746        ) -> Result<PlanRef> {
747            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
748            if let Some(exprs) = exprs {
749                let logical_project = generic::Project::new(exprs, node);
750                return Ok(StreamProject::new(logical_project).into());
751            }
752            Ok(node)
753        }
754
755        #[derive(PartialEq, Debug, Copy, Clone)]
756        enum PrimaryKeyKind {
757            UserDefinedPrimaryKey,
758            NonAppendOnlyRowIdPk,
759            AppendOnlyRowIdPk,
760        }
761
762        fn inject_dml_node(
763            columns: &[ColumnCatalog],
764            append_only: bool,
765            stream_plan: PlanRef,
766            pk_column_indices: &[usize],
767            kind: PrimaryKeyKind,
768            column_descs: Vec<ColumnDesc>,
769        ) -> Result<PlanRef> {
770            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
771
772            // Add generated columns.
773            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
774
775            dml_node = match kind {
776                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
777                    RequiredDist::hash_shard(pk_column_indices)
778                        .enforce_if_not_satisfies(dml_node, &Order::any())?
779                }
780                PrimaryKeyKind::AppendOnlyRowIdPk => {
781                    StreamExchange::new_no_shuffle(dml_node).into()
782                }
783            };
784
785            Ok(dml_node)
786        }
787
788        let kind = if let Some(row_id_index) = row_id_index {
789            assert_eq!(
790                pk_column_indices.iter().exactly_one().copied().unwrap(),
791                row_id_index
792            );
793            if append_only {
794                PrimaryKeyKind::AppendOnlyRowIdPk
795            } else {
796                PrimaryKeyKind::NonAppendOnlyRowIdPk
797            }
798        } else {
799            PrimaryKeyKind::UserDefinedPrimaryKey
800        };
801
802        let column_descs: Vec<ColumnDesc> = columns
803            .iter()
804            .filter(|&c| c.can_dml())
805            .map(|c| c.column_desc.clone())
806            .collect();
807
808        let mut not_null_idxs = vec![];
809        for (idx, column) in column_descs.iter().enumerate() {
810            if !column.nullable {
811                not_null_idxs.push(idx);
812            }
813        }
814
815        let version_column_index = if let Some(version_column) = with_version_column {
816            find_version_column_index(&columns, version_column)?
817        } else {
818            None
819        };
820
821        let with_external_source = source_catalog.is_some();
822        let union_inputs = if with_external_source {
823            let mut external_source_node = stream_plan.plan;
824            external_source_node =
825                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
826            external_source_node = match kind {
827                PrimaryKeyKind::UserDefinedPrimaryKey => {
828                    RequiredDist::hash_shard(&pk_column_indices)
829                        .enforce_if_not_satisfies(external_source_node, &Order::any())?
830                }
831
832                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
833                    StreamExchange::new_no_shuffle(external_source_node).into()
834                }
835            };
836
837            let dummy_source_node = LogicalSource::new(
838                None,
839                columns.clone(),
840                row_id_index,
841                SourceNodeKind::CreateTable,
842                context.clone(),
843                None,
844            )
845            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
846
847            let dml_node = inject_dml_node(
848                &columns,
849                append_only,
850                dummy_source_node,
851                &pk_column_indices,
852                kind,
853                column_descs,
854            )?;
855
856            vec![external_source_node, dml_node]
857        } else {
858            let dml_node = inject_dml_node(
859                &columns,
860                append_only,
861                stream_plan.plan,
862                &pk_column_indices,
863                kind,
864                column_descs,
865            )?;
866
867            vec![dml_node]
868        };
869
870        let dists = union_inputs
871            .iter()
872            .map(|input| input.distribution())
873            .unique()
874            .collect_vec();
875
876        let dist = match &dists[..] {
877            &[Distribution::SomeShard, Distribution::HashShard(_)]
878            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
879            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
880                dist.clone()
881            }
882            _ => {
883                unreachable!()
884            }
885        };
886
887        let mut stream_plan = StreamUnion::new_with_dist(
888            Union {
889                all: true,
890                inputs: union_inputs,
891                source_col: None,
892            },
893            dist.clone(),
894        )
895        .into();
896
897        // Add WatermarkFilter node.
898        if !watermark_descs.is_empty() {
899            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
900        }
901
902        // Add RowIDGen node if needed.
903        if let Some(row_id_index) = row_id_index {
904            match kind {
905                PrimaryKeyKind::UserDefinedPrimaryKey => {
906                    unreachable!()
907                }
908                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
909                    stream_plan = StreamRowIdGen::new_with_dist(
910                        stream_plan,
911                        row_id_index,
912                        Distribution::HashShard(vec![row_id_index]),
913                    )
914                    .into();
915                }
916            }
917        }
918
919        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
920
921        if let ConflictBehavior::IgnoreConflict = conflict_behavior
922            && version_column_index.is_some()
923        {
924            Err(ErrorCode::InvalidParameterValue(
925                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
926            ))?
927        }
928
929        let retention_seconds = context.with_options().retention_seconds();
930
931        let table_required_dist = {
932            let mut bitset = FixedBitSet::with_capacity(columns.len());
933            for idx in &pk_column_indices {
934                bitset.insert(*idx);
935            }
936            RequiredDist::ShardByKey(bitset)
937        };
938
939        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
940
941        if !not_null_idxs.is_empty() {
942            stream_plan =
943                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
944        }
945
946        StreamMaterialize::create_for_table(
947            stream_plan,
948            table_name,
949            database_id,
950            schema_id,
951            table_required_dist,
952            Order::any(),
953            columns,
954            definition,
955            conflict_behavior,
956            version_column_index,
957            pk_column_indices,
958            row_id_index,
959            version,
960            retention_seconds,
961            webhook_info,
962            engine,
963        )
964    }
965
966    /// Optimize and generate a create materialized view plan.
967    pub fn gen_materialize_plan(
968        self,
969        database_id: DatabaseId,
970        schema_id: SchemaId,
971        mv_name: String,
972        definition: String,
973        emit_on_window_close: bool,
974    ) -> Result<StreamMaterialize> {
975        let cardinality = self.compute_cardinality();
976        assert_eq!(self.plan.convention(), Convention::Logical);
977        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
978        assert_eq!(stream_plan.plan.convention(), Convention::Stream);
979        StreamMaterialize::create(
980            stream_plan,
981            mv_name,
982            database_id,
983            schema_id,
984            definition,
985            TableType::MaterializedView,
986            cardinality,
987            None,
988        )
989    }
990
991    /// Optimize and generate a create index plan.
992    pub fn gen_index_plan(
993        self,
994        index_name: String,
995        database_id: DatabaseId,
996        schema_id: SchemaId,
997        definition: String,
998        retention_seconds: Option<NonZeroU32>,
999    ) -> Result<StreamMaterialize> {
1000        let cardinality = self.compute_cardinality();
1001        assert_eq!(self.plan.convention(), Convention::Logical);
1002        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1003        assert_eq!(stream_plan.plan.convention(), Convention::Stream);
1004
1005        StreamMaterialize::create(
1006            stream_plan,
1007            index_name,
1008            database_id,
1009            schema_id,
1010            definition,
1011            TableType::Index,
1012            cardinality,
1013            retention_seconds,
1014        )
1015    }
1016
1017    /// Optimize and generate a create sink plan.
1018    #[allow(clippy::too_many_arguments)]
1019    pub fn gen_sink_plan(
1020        self,
1021        sink_name: String,
1022        definition: String,
1023        properties: WithOptionsSecResolved,
1024        emit_on_window_close: bool,
1025        db_name: String,
1026        sink_from_table_name: String,
1027        format_desc: Option<SinkFormatDesc>,
1028        without_backfill: bool,
1029        target_table: Option<Arc<TableCatalog>>,
1030        partition_info: Option<PartitionComputeInfo>,
1031        user_specified_columns: bool,
1032    ) -> Result<StreamSink> {
1033        let stream_scan_type = if without_backfill {
1034            StreamScanType::UpstreamOnly
1035        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
1036            // Snapshot backfill on sink-into-table is not allowed
1037            StreamScanType::SnapshotBackfill
1038        } else if self.should_use_arrangement_backfill() {
1039            StreamScanType::ArrangementBackfill
1040        } else {
1041            StreamScanType::Backfill
1042        };
1043        assert_eq!(self.plan.convention(), Convention::Logical);
1044        let stream_plan =
1045            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
1046        assert_eq!(stream_plan.plan.convention(), Convention::Stream);
1047        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
1048            let columns = t.columns_without_rw_timestamp();
1049            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
1050        });
1051        StreamSink::create(
1052            stream_plan,
1053            sink_name,
1054            db_name,
1055            sink_from_table_name,
1056            target_table,
1057            target_columns_to_plan_mapping,
1058            definition,
1059            properties,
1060            format_desc,
1061            partition_info,
1062        )
1063    }
1064}
1065
1066impl<P: PlanPhase> PlanRoot<P> {
1067    pub fn should_use_arrangement_backfill(&self) -> bool {
1068        let ctx = self.plan.ctx();
1069        let session_ctx = ctx.session_ctx();
1070        let arrangement_backfill_enabled = session_ctx
1071            .env()
1072            .streaming_config()
1073            .developer
1074            .enable_arrangement_backfill;
1075        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1076    }
1077
1078    pub fn should_use_snapshot_backfill(&self) -> bool {
1079        self.plan
1080            .ctx()
1081            .session_ctx()
1082            .config()
1083            .streaming_use_snapshot_backfill()
1084    }
1085
1086    /// used when the plan has a target relation such as DML and sink into table, return the mapping from table's columns to the plan's schema
1087    pub fn target_columns_to_plan_mapping(
1088        &self,
1089        tar_cols: &[ColumnCatalog],
1090        user_specified_columns: bool,
1091    ) -> Vec<Option<usize>> {
1092        #[allow(clippy::disallowed_methods)]
1093        let visible_cols: Vec<(usize, String)> = self
1094            .out_fields
1095            .ones()
1096            .zip_eq(self.out_names.iter().cloned())
1097            .collect_vec();
1098
1099        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1100        let visible_col_idxes_by_name = visible_cols
1101            .iter()
1102            .map(|(i, name)| (name.as_ref(), *i))
1103            .collect::<BTreeMap<_, _>>();
1104
1105        tar_cols
1106            .iter()
1107            .enumerate()
1108            .filter(|(_, tar_col)| tar_col.can_dml())
1109            .map(|(tar_i, tar_col)| {
1110                if user_specified_columns {
1111                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
1112                } else {
1113                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1114                }
1115            })
1116            .collect()
1117    }
1118}
1119
1120fn find_version_column_index(
1121    column_catalog: &Vec<ColumnCatalog>,
1122    version_column_name: String,
1123) -> Result<Option<usize>> {
1124    for (index, column) in column_catalog.iter().enumerate() {
1125        if column.column_desc.name == version_column_name {
1126            if let &DataType::Jsonb
1127            | &DataType::List(_)
1128            | &DataType::Struct(_)
1129            | &DataType::Bytea
1130            | &DataType::Boolean = column.data_type()
1131            {
1132                Err(ErrorCode::InvalidParameterValue(
1133                    "The specified version column data type is invalid.".to_owned(),
1134                ))?
1135            }
1136            return Ok(Some(index));
1137        }
1138    }
1139    Err(ErrorCode::InvalidParameterValue(
1140        "The specified version column name is not in the current columns.".to_owned(),
1141    ))?
1142}
1143
1144fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> {
1145    let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1146
1147    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1148    if let Some(error) = const_eval_rewriter.error {
1149        return Err(error);
1150    }
1151    Ok(plan)
1152}
1153
1154fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> {
1155    let mut v = TimestamptzExprFinder::default();
1156    plan.visit_exprs_recursive(&mut v);
1157    if v.has() {
1158        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1159    } else {
1160        Ok(plan)
1161    }
1162}
1163
1164fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
1165    if plan.node_type() == PlanNodeType::BatchExchange {
1166        return false;
1167    }
1168    is_candidate(plan)
1169        || plan
1170            .inputs()
1171            .iter()
1172            .any(|input| exist_and_no_exchange_before(input, is_candidate))
1173}
1174
1175/// As we always run the root stage locally, for some plan in root stage which need to execute in
1176/// compute node we insert an additional exhchange before it to avoid to include it in the root
1177/// stage.
1178///
1179/// Returns `true` if we must insert an additional exchange to ensure this.
1180fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
1181    fn is_user_table(plan: &PlanRef) -> bool {
1182        plan.node_type() == PlanNodeType::BatchSeqScan
1183    }
1184
1185    fn is_log_table(plan: &PlanRef) -> bool {
1186        plan.node_type() == PlanNodeType::BatchLogSeqScan
1187    }
1188
1189    fn is_source(plan: &PlanRef) -> bool {
1190        plan.node_type() == PlanNodeType::BatchSource
1191            || plan.node_type() == PlanNodeType::BatchKafkaScan
1192            || plan.node_type() == PlanNodeType::BatchIcebergScan
1193    }
1194
1195    fn is_insert(plan: &PlanRef) -> bool {
1196        plan.node_type() == PlanNodeType::BatchInsert
1197    }
1198
1199    fn is_update(plan: &PlanRef) -> bool {
1200        plan.node_type() == PlanNodeType::BatchUpdate
1201    }
1202
1203    fn is_delete(plan: &PlanRef) -> bool {
1204        plan.node_type() == PlanNodeType::BatchDelete
1205    }
1206
1207    assert_eq!(plan.distribution(), &Distribution::Single);
1208    exist_and_no_exchange_before(&plan, is_user_table)
1209        || exist_and_no_exchange_before(&plan, is_source)
1210        || exist_and_no_exchange_before(&plan, is_insert)
1211        || exist_and_no_exchange_before(&plan, is_update)
1212        || exist_and_no_exchange_before(&plan, is_delete)
1213        || exist_and_no_exchange_before(&plan, is_log_table)
1214}
1215
1216/// The purpose is same as `require_additional_exchange_on_root_in_distributed_mode`. We separate
1217/// them for the different requirement of plan node in different execute mode.
1218fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
1219    fn is_user_table(plan: &PlanRef) -> bool {
1220        plan.node_type() == PlanNodeType::BatchSeqScan
1221    }
1222
1223    fn is_source(plan: &PlanRef) -> bool {
1224        plan.node_type() == PlanNodeType::BatchSource
1225            || plan.node_type() == PlanNodeType::BatchKafkaScan
1226            || plan.node_type() == PlanNodeType::BatchIcebergScan
1227    }
1228
1229    fn is_insert(plan: &PlanRef) -> bool {
1230        plan.node_type() == PlanNodeType::BatchInsert
1231    }
1232
1233    assert_eq!(plan.distribution(), &Distribution::Single);
1234    exist_and_no_exchange_before(&plan, is_user_table)
1235        || exist_and_no_exchange_before(&plan, is_source)
1236        || exist_and_no_exchange_before(&plan, is_insert)
1237}
1238
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// A root that exposes only `v1` of a two-column input must produce an unordered subplan
    /// whose schema contains just `v1`.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let input_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values = LogicalValues::new(vec![], input_schema, ctx).into();

        // Two-bit mask with only bit 0 set: keep `v1`, hide `v2`.
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];

        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();

        let expected = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(subplan.schema(), &expected);
    }
}