risingwave_frontend/optimizer/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

pub mod plan_node;

use plan_node::StreamFilter;
pub use plan_node::{Explain, PlanRef};

pub mod property;

mod delta_join_solver;
mod heuristic_optimizer;
mod plan_rewriter;

pub use plan_rewriter::PlanRewriter;

mod plan_visitor;

pub use plan_visitor::{
    ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
    SysTableVisitor,
};

pub mod backfill_order_strategy;
mod logical_optimization;
mod optimizer_context;
pub mod plan_expr_rewriter;
mod plan_expr_visitor;
mod rule;

use std::assert_matches::assert_matches;
use std::collections::{BTreeMap, HashMap};

use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
    BatchProject, Convention, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
    ToStreamContext, stream_enforce_eowc_requirement,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::TableCatalog;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::TimestamptzExprFinder;
use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
use crate::optimizer::plan_node::generic::{SourceNodeKind, Union};
use crate::optimizer::plan_node::{
    BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
    ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
/// `PlanRoot` describes a plan. The planner constructs a `PlanRoot` with a logical plan node and
/// the required distribution and order, and `PlanRoot` can then generate the corresponding
/// streaming or batch plan with optimization. The required order and distribution columns may be
/// a superset of the output columns. For example:
/// ```sql
///    select v1 from t order by id;
/// ```
/// The plan returns two columns `(id, v1)`, and the required order column is `id`. The `id`
/// column is required during optimization, but the final generated plan will remove the
/// unnecessary column from the result.
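///
/// A minimal sketch of typical usage, assuming a `logical_plan` already built by the planner
/// (not a doctest; construction of the input plan is elided):
/// ```rust,ignore
/// // Keep both columns in the plan, but only output the second one as `v1`.
/// let mut out_fields = FixedBitSet::with_capacity(2);
/// out_fields.insert(1);
/// let mut root = PlanRoot::new_with_logical_plan(
///     logical_plan,
///     RequiredDist::Any,
///     Order::any(),
///     out_fields,
///     vec!["v1".to_owned()],
/// );
/// let batch_plan = root.gen_batch_plan()?;
/// ```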
#[derive(Debug, Clone)]
pub struct PlanRoot {
    // The current plan node.
    plan: PlanRef,
    // The phase of the plan.
    phase: PlanPhase,
    required_dist: RequiredDist,
    required_order: Order,
    out_fields: FixedBitSet,
    out_names: Vec<String>,
}

/// `PlanPhase` is used to track the phase of the `PlanRoot`.
/// Usually, it begins from `Logical` and ends with `Batch` or `Stream`, unless we want to
/// construct a `PlanRoot` from an intermediate phase.
/// Typical phase transitions are:
/// - `Logical` -> `OptimizedLogicalForBatch` -> `Batch`
/// - `Logical` -> `OptimizedLogicalForStream` -> `Stream`
#[derive(Debug, Clone, PartialEq)]
pub enum PlanPhase {
    Logical,
    OptimizedLogicalForBatch,
    OptimizedLogicalForStream,
    Batch,
    Stream,
}

impl PlanRoot {
    pub fn new_with_logical_plan(
        plan: PlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        assert_eq!(plan.convention(), Convention::Logical);
        Self::new_inner(
            plan,
            PlanPhase::Logical,
            required_dist,
            required_order,
            out_fields,
            out_names,
        )
    }

    pub fn new_with_batch_plan(
        plan: PlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        assert_eq!(plan.convention(), Convention::Batch);
        Self::new_inner(
            plan,
            PlanPhase::Batch,
            required_dist,
            required_order,
            out_fields,
            out_names,
        )
    }

    fn new_inner(
        plan: PlanRef,
        phase: PlanPhase,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        let input_schema = plan.schema();
        assert_eq!(input_schema.fields().len(), out_fields.len());
        assert_eq!(out_fields.count_ones(..), out_names.len());

        Self {
            plan,
            phase,
            required_dist,
            required_order,
            out_fields,
            out_names,
        }
    }

    /// Set customized names of the output fields, used for `CREATE [MATERIALIZED VIEW | SINK] r(a,
    /// b, ..)`.
    ///
    /// If the number of names does not match the number of output fields, an error is returned.
    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
        if out_names.len() != self.out_fields.count_ones(..) {
            Err(ErrorCode::InvalidInputSyntax(
                "number of column names does not match number of columns".to_owned(),
            ))?
        }
        self.out_names = out_names;
        Ok(())
    }

    /// Get the plan root's schema, only including the fields to be output.
    pub fn schema(&self) -> Schema {
        // The schema can be derived from the `out_fields` and `out_names`, so we don't maintain it
        // as a field and always construct one on demand here to keep it in sync.
        Schema {
            fields: self
                .out_fields
                .ones()
                .map(|i| self.plan.schema().fields()[i].clone())
                .zip_eq_debug(&self.out_names)
                .map(|(field, name)| Field {
                    name: name.clone(),
                    ..field
                })
                .collect(),
        }
    }

    /// Transform the [`PlanRoot`] back to a [`PlanRef`] suitable to be used as a subplan, for
    /// example as an insert source or a subquery. This ignores the required order but retains
    /// the post-order column pruning (`out_fields`).
    pub fn into_unordered_subplan(self) -> PlanRef {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

    /// Transform the [`PlanRoot`] wrapped in an array-construction subquery to a [`PlanRef`]
    /// backed by `ARRAY_AGG`. Similar to the unordered version, this abstracts away the internal
    /// `self.plan`, which is further modified by `self.required_order` and then `self.out_fields`.
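    ///
    /// For intuition, the rewrite corresponds roughly to the following SQL shape (a sketch, not
    /// the exact plan):
    /// ```sql
    ///    -- ARRAY(SELECT v FROM t ORDER BY o) becomes
    ///    SELECT COALESCE(ARRAY_AGG(v ORDER BY o), '{}') FROM t;
    /// ```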
    pub fn into_array_agg(self) -> Result<PlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::List(input_column_type.clone().into());
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    /// Apply logical optimization to the plan for stream.
    pub fn gen_optimized_logical_plan_for_stream(&mut self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        self.phase = PlanPhase::OptimizedLogicalForStream;
        assert_eq!(self.plan.convention(), Convention::Logical);
        Ok(self.plan.clone())
    }

    /// Apply logical optimization to the plan for batch.
    pub fn gen_optimized_logical_plan_for_batch(&mut self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        self.phase = PlanPhase::OptimizedLogicalForBatch;
        assert_eq!(self.plan.convention(), Convention::Logical);
        Ok(self.plan.clone())
    }

    /// Optimize and generate a singleton batch physical plan without exchange nodes.
    pub fn gen_batch_plan(&mut self) -> Result<PlanRef> {
        assert_eq!(self.plan.convention(), Convention::Logical);
        let mut plan = match self.phase {
            PlanPhase::Logical => {
                // Logical optimization
                self.gen_optimized_logical_plan_for_batch()?
            }
            PlanPhase::OptimizedLogicalForBatch => self.plan.clone(),
            PlanPhase::Batch | PlanPhase::OptimizedLogicalForStream | PlanPhase::Stream => {
                panic!("unexpected phase")
            }
        };

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = plan.ctx();
        // Inline session timezone mainly for rewriting now()
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        // Const eval of exprs at the last minute, but before `to_batch` to make functional index selection happy.
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert to physical plan node
        plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // Inline session timezone
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert!(
            *plan.distribution() == Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        self.plan = plan;
        self.phase = PlanPhase::Batch;
        assert_eq!(self.plan.convention(), Convention::Batch);
        Ok(self.plan.clone())
    }

    /// Optimize and generate a batch query plan for distributed execution.
    pub fn gen_batch_distributed_plan(mut self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Batch);
        assert_eq!(self.plan.convention(), Convention::Batch);
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        // Convert to distributed plan
        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        // Add a Project if any position of `self.out_fields` is unset.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        // Both two-phase limit and top-n can generate a limit on top of the scan, so we push the
        // limit down here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        // For Iceberg scans, push down Iceberg predicates:
        // BatchFilter -> BatchIcebergScan
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Predicate Pushdown",
            vec![BatchIcebergPredicatePushDownRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        assert_eq!(plan.convention(), Convention::Batch);
        Ok(plan)
    }

    /// Optimize and generate a batch query plan for local execution.
    pub fn gen_batch_local_plan(self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Batch);
        assert_eq!(self.plan.convention(), Convention::Batch);
        let mut plan = self.plan;

        // Convert to local plan node
        plan = plan.to_local_with_order_required(&self.required_order)?;

        // Since `to_local_with_order_required` does not enforce single distribution, we enforce it
        // at the root if needed.
        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        // Add a Project if any position of `self.out_fields` is unset.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        // Both two-phase limit and top-n can generate a limit on top of the scan, so we push the
        // limit down here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        assert_eq!(plan.convention(), Convention::Batch);
        Ok(plan)
    }

    /// Generate an optimized stream plan.
    fn gen_optimized_stream_plan(
        &mut self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
    }

    fn gen_optimized_stream_plan_inner(
        &mut self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let mut plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;

        plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Merge StreamProject",
            vec![StreamProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // Add a log store for unaligned joins.
        // Apply this BEFORE the delta join rule, because delta join removes the join.
        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join() {
            // TODO: make it a logical optimization.
            // Rewrite joins with indexes to delta joins.
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        // Inline session timezone
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        // Const eval of exprs at the last minute
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        self.plan = plan;
        self.phase = PlanPhase::Stream;
        assert_eq!(self.plan.convention(), Convention::Stream);
        Ok(self.plan.clone())
    }

    /// Generate a create index or create materialized view plan.
    fn gen_stream_plan(
        &mut self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = match self.plan.convention() {
            Convention::Logical => {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) =
                        plan.logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        // TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/7234
                        // assert_eq!(target_size, output_indices.len());
                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                self.required_dist =
                    out_col_change.rewrite_required_distribution(&self.required_dist);
                self.required_order = out_col_change
                    .rewrite_required_order(&self.required_order)
                    .unwrap();
                self.out_fields = out_col_change.rewrite_bitset(&self.out_fields);
                let plan = plan.to_stream_with_dist_required(
                    &self.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)
            }
            _ => unreachable!(),
        }?;

        if explain_trace {
            ctx.trace("To Stream Plan:");
            ctx.trace(plan.explain_to_string());
        }
        Ok(plan)
    }

    /// Visit the plan root and compute the cardinality.
    ///
    /// Panics if not called on a logical plan.
    fn compute_cardinality(&self) -> Cardinality {
        assert_matches!(self.plan.convention(), Convention::Logical);
        CardinalityVisitor.visit(self.plan.clone())
    }

    /// Optimize and generate a create table plan.
    pub fn gen_table_plan(
        mut self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_column,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        // Snapshot backfill is not allowed for create table
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);

        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
                .collect_vec()
        };

        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: PlanRef,
        ) -> Result<PlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

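        /// How the table's primary key is provided: defined by the user, or an implicit
        /// row-id key on a non-append-only or append-only table.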
        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: PlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<PlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            // Add generated columns.
            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .enforce_if_not_satisfies(dml_node, &Order::any())?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_index = if let Some(version_column) = with_version_column {
            find_version_column_index(&columns, version_column)?
        } else {
            None
        };

        let with_external_source = source_catalog.is_some();
        let union_inputs = if with_external_source {
            let mut external_source_node = stream_plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .enforce_if_not_satisfies(external_source_node, &Order::any())?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };

            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;

            let dml_node = inject_dml_node(
                &columns,
                append_only,
                dummy_source_node,
                &pk_column_indices,
                kind,
                column_descs,
            )?;

            vec![external_source_node, dml_node]
        } else {
            let dml_node = inject_dml_node(
                &columns,
                append_only,
                stream_plan,
                &pk_column_indices,
                kind,
                column_descs,
            )?;

            vec![dml_node]
        };

        let dists = union_inputs
            .iter()
            .map(|input| input.distribution())
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                unreachable!()
            }
        };

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist.clone(),
        )
        .into();

        // Add WatermarkFilter node.
        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        // Add RowIDGen node if needed.
        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && version_column_index.is_some()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_index,
            pk_column_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
        )
    }

    /// Optimize and generate a create materialized view plan.
    pub fn gen_materialize_plan(
        mut self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            self.required_dist.clone(),
            self.required_order.clone(),
            self.out_fields.clone(),
            self.out_names.clone(),
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }

    /// Optimize and generate a create index plan.
    pub fn gen_index_plan(
        mut self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            self.required_dist.clone(),
            self.required_order.clone(),
            self.out_fields.clone(),
            self.out_names.clone(),
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }

    /// Optimize and generate a create sink plan.
    #[allow(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        &mut self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
    ) -> Result<StreamSink> {
        let stream_scan_type = if without_backfill {
            StreamScanType::UpstreamOnly
        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
            // Snapshot backfill on sink-into-table is not allowed
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            self.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });
        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            self.required_dist.clone(),
            self.required_order.clone(),
            self.out_fields.clone(),
            self.out_names.clone(),
            definition,
            properties,
            format_desc,
            partition_info,
        )
    }

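    /// Whether arrangement backfill should be used for stream scans: it must be enabled in both
    /// the streaming developer config and the session config.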
    pub fn should_use_arrangement_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let arrangement_backfill_enabled = session_ctx
            .env()
            .streaming_config()
            .developer
            .enable_arrangement_backfill;
        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
    }

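    /// Whether snapshot backfill should be used for stream scans, per the session config.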
    pub fn should_use_snapshot_backfill(&self) -> bool {
        self.plan
            .ctx()
            .session_ctx()
            .config()
            .streaming_use_snapshot_backfill()
    }

    /// Used when the plan has a target relation, e.g. DML or sink-into-table. Returns the mapping
    /// from the target table's columns to the plan's schema.
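    ///
    /// For example (a sketch with hypothetical columns): given target columns `(a, b, c)` and a
    /// plan whose visible output columns are named `(b, a)`, matching by name
    /// (`user_specified_columns = true`) yields `[Some(idx("a")), Some(idx("b")), None]`, while
    /// positional matching maps the first two target columns to the first two visible columns and
    /// yields `None` for `c`.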
    pub fn target_columns_to_plan_mapping(
        &self,
        tar_cols: &[ColumnCatalog],
        user_specified_columns: bool,
    ) -> Vec<Option<usize>> {
        #[allow(clippy::disallowed_methods)]
        let visible_cols: Vec<(usize, String)> = self
            .out_fields
            .ones()
            .zip_eq(self.out_names.iter().cloned())
            .collect_vec();

        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
        let visible_col_idxes_by_name = visible_cols
            .iter()
            .map(|(i, name)| (name.as_ref(), *i))
            .collect::<BTreeMap<_, _>>();

        tar_cols
            .iter()
            .enumerate()
            .filter(|(_, tar_col)| tar_col.can_dml())
            .map(|(tar_i, tar_col)| {
                if user_specified_columns {
                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
                } else {
                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
                }
            })
            .collect()
    }
}

fn find_version_column_index(
    column_catalog: &[ColumnCatalog],
    version_column_name: String,
) -> Result<Option<usize>> {
    for (index, column) in column_catalog.iter().enumerate() {
        if column.column_desc.name == version_column_name {
            if let &DataType::Jsonb
            | &DataType::List(_)
            | &DataType::Struct(_)
            | &DataType::Bytea
            | &DataType::Boolean = column.data_type()
            {
                Err(ErrorCode::InvalidParameterValue(
                    "The specified version column data type is invalid.".to_owned(),
                ))?
            }
            return Ok(Some(index));
        }
    }
    Err(ErrorCode::InvalidParameterValue(
        "The specified version column name is not in the current columns.".to_owned(),
    ))?
}

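/// Const-evaluate expressions in the plan, propagating any evaluation error recorded by the
/// rewriter.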
fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> {
    let mut const_eval_rewriter = ConstEvalRewriter { error: None };

    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
    if let Some(error) = const_eval_rewriter.error {
        return Err(error);
    }
    Ok(plan)
}

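/// Inline the session timezone into expressions, but only rewrite the plan if it actually
/// contains `timestamptz`-related expressions; otherwise return it unchanged.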
fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> {
    let mut v = TimestamptzExprFinder::default();
    plan.visit_exprs_recursive(&mut v);
    if v.has() {
        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
    } else {
        Ok(plan)
    }
}

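/// Returns `true` if a node satisfying `is_candidate` is reachable from `plan` without crossing
/// a `BatchExchange`.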
fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
    if plan.node_type() == PlanNodeType::BatchExchange {
        return false;
    }
    is_candidate(plan)
        || plan
            .inputs()
            .iter()
            .any(|input| exist_and_no_exchange_before(input, is_candidate))
}

/// As we always run the root stage locally, for plan nodes in the root stage that need to execute
/// on a compute node, we insert an additional exchange before them to avoid including them in the
/// root stage.
///
/// Returns `true` if we must insert an additional exchange to ensure this.
fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
    fn is_user_table(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSeqScan
    }

    fn is_log_table(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchLogSeqScan
    }

    fn is_source(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSource
            || plan.node_type() == PlanNodeType::BatchKafkaScan
            || plan.node_type() == PlanNodeType::BatchIcebergScan
    }

    fn is_insert(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchInsert
    }

    fn is_update(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchUpdate
    }

    fn is_delete(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchDelete
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, is_user_table)
        || exist_and_no_exchange_before(&plan, is_source)
        || exist_and_no_exchange_before(&plan, is_insert)
        || exist_and_no_exchange_before(&plan, is_update)
        || exist_and_no_exchange_before(&plan, is_delete)
        || exist_and_no_exchange_before(&plan, is_log_table)
}

/// Same purpose as `require_additional_exchange_on_root_in_distributed_mode`. They are kept
/// separate because plan nodes have different requirements in different execution modes.
fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
    fn is_user_table(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSeqScan
    }

    fn is_source(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSource
            || plan.node_type() == PlanNodeType::BatchKafkaScan
            || plan.node_type() == PlanNodeType::BatchIcebergScan
    }

    fn is_insert(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchInsert
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, is_user_table)
        || exist_and_no_exchange_before(&plan, is_source)
        || exist_and_no_exchange_before(&plan, is_insert)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}
1257}