risingwave_frontend/optimizer/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

use risingwave_pb::catalog::PbVectorIndexInfo;

pub mod plan_node;

use plan_node::StreamFilter;
pub use plan_node::{Explain, LogicalPlanRef, PlanRef};

pub mod property;

mod delta_join_solver;
mod heuristic_optimizer;
mod plan_rewriter;

mod plan_visitor;

pub use plan_visitor::{
    ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
    SysTableVisitor,
};

pub mod backfill_order_strategy;
mod logical_optimization;
mod optimizer_context;
pub mod plan_expr_rewriter;
mod plan_expr_visitor;
mod rule;

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;

use educe::Educe;
use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
    ToStreamContext, stream_enforce_eowc_requirement,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::TableCatalog;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::TimestamptzExprFinder;
use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
use crate::optimizer::plan_node::{
    Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
    StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion, StreamVectorIndexWrite,
    ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
/// `PlanRoot` is used to describe a plan. The planner constructs a `PlanRoot` with a
/// `LogicalPlanRef` and the required distribution and order, from which `PlanRoot` can generate
/// the corresponding optimized streaming or batch plan. The required order and distribution
/// columns may be a superset of the output columns. For example:
/// ```sql
///    select v1 from t order by id;
/// ```
/// The plan returns two columns (id, v1), and the required order column is id. The id column is
/// required during optimization, but the final generated plan will prune this unnecessary column
/// from the result.
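///
/// Continuing the example, an illustrative sketch of the root's state (not the exact rendering):
/// ```text
/// plan schema    : (id, v1)
/// out_fields     : {1}           -- only `v1` is visible in the output
/// out_names      : ["v1"]
/// required_order : [id ASC]      -- refers to a column that is later pruned
/// ```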
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    // The current plan node.
    pub plan: PlanRef<P::Convention>,
    // The phase of the plan.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    required_dist: RequiredDist,
    required_order: Order,
    out_fields: FixedBitSet,
    out_names: Vec<String>,
}

/// `PlanPhase` is used to track the phase of the `PlanRoot`.
/// Usually, it begins at `Logical` and ends with `Batch` or `Stream`, unless we construct a
/// `PlanRoot` from an intermediate phase.
/// Typical phase transformations are:
/// - `Logical` -> `BatchOptimizedLogical` -> `Batch`
/// - `Logical` -> `StreamOptimizedLogical` -> `Stream`
pub trait PlanPhase {
    type Convention: ConventionMarker;
}

macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();
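// For reference, each `{ Phase, Convention }` entry above expands (via `paste!`) to roughly
// the following; shown here for the `Logical` phase as a sketch of the generated items:
//
//     pub struct PlanPhaseLogical;
//     impl PlanPhase for PlanPhaseLogical {
//         type Convention = crate::optimizer::plan_node::Logical;
//     }
//     pub type LogicalPlanRoot = PlanRoot<PlanPhaseLogical>;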

impl LogicalPlanRoot {
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl BatchPlanRoot {
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    fn new_inner(
        plan: PlanRef<P::Convention>,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        let input_schema = plan.schema();
        assert_eq!(input_schema.fields().len(), out_fields.len());
        assert_eq!(out_fields.count_ones(..), out_names.len());

        Self {
            plan,
            _phase: PhantomData,
            required_dist,
            required_order,
            out_fields,
            out_names,
        }
    }

    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
        PlanRoot {
            plan,
            _phase: PhantomData,
            required_dist: self.required_dist,
            required_order: self.required_order,
            out_fields: self.out_fields,
            out_names: self.out_names,
        }
    }

    /// Set customized names of the output fields, used for `CREATE [MATERIALIZED VIEW | SINK] r(a,
    /// b, ..)`.
    ///
    /// If the number of names does not match the number of output fields, an error is returned.
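    ///
    /// A minimal usage sketch (illustrative; the `root` binding and names are assumed):
    /// ```ignore
    /// // CREATE MATERIALIZED VIEW r(a, b) AS SELECT v1, v2 FROM t;
    /// root.set_out_names(vec!["a".into(), "b".into()])?;
    /// ```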
    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
        if out_names.len() != self.out_fields.count_ones(..) {
            Err(ErrorCode::InvalidInputSyntax(
                "number of column names does not match number of columns".to_owned(),
            ))?
        }
        self.out_names = out_names;
        Ok(())
    }

    /// Get the plan root's schema, only including the fields to be output.
    pub fn schema(&self) -> Schema {
        // The schema can be derived from the `out_fields` and `out_names`, so we don't maintain it
        // as a field and always construct one on demand here to keep it in sync.
        Schema {
            fields: self
                .out_fields
                .ones()
                .map(|i| self.plan.schema().fields()[i].clone())
                .zip_eq_debug(&self.out_names)
                .map(|(field, name)| Field {
                    name: name.clone(),
                    ..field
                })
                .collect(),
        }
    }
}

impl LogicalPlanRoot {
    /// Transform the [`PlanRoot`] back to a [`PlanRef`] suitable to be used as a subplan, for
    /// example as an insert source or a subquery. This ignores the required `Order` but retains
    /// the post-`Order` pruning (`out_fields`).
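    ///
    /// An illustrative sketch of the result for `SELECT v1 FROM t ORDER BY id`
    /// (not exact explain output):
    /// ```text
    /// LogicalProject { exprs: [v1] }    -- keeps only `v1`; the ORDER BY is dropped
    /// └─ <original logical plan>
    /// ```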
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

    /// Transform the [`PlanRoot`] wrapped in an array-construction subquery to a [`PlanRef`]
    /// supported by `ARRAY_AGG`. Similar to the unordered version, this hides the internal
    /// `self.plan`, which is further modified by `self.required_order` and then `self.out_fields`.
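    ///
    /// Roughly, an illustrative sketch of the rewrite (not exact plan output):
    /// ```text
    /// ARRAY(SELECT v FROM t ORDER BY k)
    /// => Project(Coalesce(#0, '{}'))
    ///    └─ Agg(array_agg(v ORDER BY k))
    ///       └─ <subquery plan>
    /// ```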
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::list(input_column_type.clone());
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    /// Apply logical optimization to the plan for stream.
    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

    /// Apply logical optimization to the plan for batch.
    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}

impl BatchOptimizedLogicalPlanRoot {
    /// Optimize and generate a singleton batch physical plan without exchange nodes.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        // Inline the session timezone, mainly for rewriting `now()`.
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        // Const-eval exprs at the last minute, but before `to_batch`, to make functional index
        // selection happy.
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert to physical plan node
        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // Inline session timezone
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }
}

impl BatchPlanRoot {
    /// Optimize and generate a batch query plan for distributed execution.
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        // Convert to distributed plan
        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        // Add a Project if any position of `self.out_fields` is unset.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        // Both two-phase limit and topn can generate a limit on top of the scan, so we push the
        // limit down here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        // For Iceberg scans, push predicates down into the scan:
        // BatchFilter -> BatchIcebergScan
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Predicate Pushdown",
            vec![BatchIcebergPredicatePushDownRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

    /// Optimize and generate a batch query plan for local execution.
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        // Convert to local plan node
        plan = plan.to_local_with_order_required(&self.required_order)?;

        // Since `to_local_with_order_required` does not enforce single distribution,
        // we enforce it at the root if needed.
        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        // Add a Project if any position of `self.out_fields` is unset.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        // Both two-phase limit and topn can generate a limit on top of the scan, so we push the
        // limit down here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;
        Ok(plan)
    }
}

impl LogicalPlanRoot {
    /// Generate an optimized stream plan.
    fn gen_optimized_stream_plan(
        self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
    }

    fn gen_optimized_stream_plan_inner(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;

        let mut plan = optimized_plan
            .plan
            .clone()
            .optimize_by_rules(&OptimizationStage::new(
                "Merge StreamProject",
                vec![StreamProjectMergeRule::create()],
                ApplyOrder::BottomUp,
            ))?;

        if ctx
            .session_ctx()
            .config()
            .streaming_separate_consecutive_join()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
                vec![SeparateConsecutiveJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        // Add a log store for unaligned joins. Apply this BEFORE the delta join rule,
        // because the delta join rule removes the join node.
        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join()
            && ctx.session_ctx().config().enable_index_selection()
        {
            // TODO: make it a logical optimization.
            // Rewrite joins with indexes to delta joins.
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        // Inline session timezone
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        // Const-eval exprs at the last minute
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        Ok(optimized_plan.into_phase(plan))
    }

    /// Generate a create index or create materialized view plan.
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        // TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/7234
                        // assert_eq!(target_size, output_indices.len());
                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            // TODO: this could be `plan.plan.explain_to_string()`, but we must explicitly specify
            // the type due to a limitation of the Rust compiler.
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }

    /// Visit the plan root and compute the cardinality.
    ///
    /// Panics if not called on a logical plan.
    fn compute_cardinality(&self) -> Cardinality {
        CardinalityVisitor.visit(self.plan.clone())
    }

    /// Optimize and generate a create table plan.
    pub fn gen_table_plan(
        self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_columns,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        // Snapshot backfill is not allowed for create table
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
                .collect_vec()
        };

        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: StreamPlanRef,
        ) -> Result<StreamPlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: StreamPlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<StreamPlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            // Add generated columns.
            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .streaming_enforce_if_not_satisfies(dml_node)?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_indices = if !with_version_columns.is_empty() {
            find_version_column_indices(&columns, with_version_columns)?
        } else {
            vec![]
        };

        let with_external_source = source_catalog.is_some();
        let (dml_source_node, external_source_node) = if with_external_source {
            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
            let mut external_source_node = stream_plan.plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .streaming_enforce_if_not_satisfies(external_source_node)?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };
            (dummy_source_node, Some(external_source_node))
        } else {
            (stream_plan.plan, None)
        };

        let dml_node = inject_dml_node(
            &columns,
            append_only,
            dml_source_node,
            &pk_column_indices,
            kind,
            column_descs,
        )?;

        let dists = external_source_node
            .iter()
            .map(|input| input.distribution())
            .chain([dml_node.distribution()])
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                unreachable!()
            }
        };

        let generated_column_exprs =
            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
        let upstream_sink_union = StreamUpstreamSinkUnion::new(
            context.clone(),
            dml_node.schema(),
            dml_node.stream_key(),
            dist.clone(), // should always be the same as the dist of `Union`
            append_only,
            row_id_index.is_none(),
            generated_column_exprs,
        );

        let union_inputs = external_source_node
            .into_iter()
            .chain([dml_node, upstream_sink_union.into()])
            .collect_vec();

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist,
        )
        .into();

        // Add a WatermarkFilter node.
        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        // Add a RowIdGen node if needed.
        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && !version_column_indices.is_empty()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        // Determine whether the table should be refreshable based on the connector type.
        let refreshable = source_catalog
            .as_ref()
            .map(|catalog| catalog.with_properties.is_batch_connector())
            .unwrap_or(false);

        // Validate that refreshable tables have a user-defined primary key (i.e., no row ID column).
        if refreshable && row_id_index.is_some() {
            return Err(crate::error::ErrorCode::BindError(
                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
                    .to_owned(),
            )
            .into());
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            pk_column_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
            refreshable,
        )
    }

    /// Optimize and generate a create materialized view plan.
    pub fn gen_materialize_plan(
        self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }

    /// Optimize and generate a create index plan.
    pub fn gen_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }

    pub fn gen_vector_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
        vector_index_info: PbVectorIndexInfo,
    ) -> Result<StreamVectorIndexWrite> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamVectorIndexWrite::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            cardinality,
            retention_seconds,
            vector_index_info,
        )
    }

    /// Optimize and generate a create sink plan.
    #[allow(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<StreamSink> {
        let stream_scan_type = if without_backfill {
            StreamScanType::UpstreamOnly
        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
            // Snapshot backfill on sink-into-table is not allowed
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        if auto_refresh_schema_from_table.is_some()
            && stream_scan_type != StreamScanType::ArrangementBackfill
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "auto schema change only support for ArrangementBackfill, but got: {:?}",
                stream_scan_type
            ))
            .into());
        }
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });

        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            definition,
            properties,
            format_desc,
            partition_info,
            auto_refresh_schema_from_table,
        )
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    pub fn should_use_arrangement_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let arrangement_backfill_enabled = session_ctx
            .env()
            .streaming_config()
            .developer
            .enable_arrangement_backfill;
        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
    }

    pub fn should_use_snapshot_backfill(&self) -> bool {
        self.plan
            .ctx()
            .session_ctx()
            .config()
            .streaming_use_snapshot_backfill()
    }

    /// Used when the plan has a target relation, such as DML or sink-into-table. Returns the
    /// mapping from the target table's columns to the plan's schema.
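    ///
    /// An illustrative sketch with assumed column names (`None` marks an unmatched target column):
    /// ```text
    /// target columns      : [a, b, c]
    /// visible plan output : [c, a]
    /// mapping (by name)   : [Some(1), None, Some(0)]
    /// ```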
    pub fn target_columns_to_plan_mapping(
        &self,
        tar_cols: &[ColumnCatalog],
        user_specified_columns: bool,
    ) -> Vec<Option<usize>> {
        #[allow(clippy::disallowed_methods)]
        let visible_cols: Vec<(usize, String)> = self
            .out_fields
            .ones()
            .zip_eq(self.out_names.iter().cloned())
            .collect_vec();

        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
        let visible_col_idxes_by_name = visible_cols
            .iter()
            .map(|(i, name)| (name.as_ref(), *i))
            .collect::<BTreeMap<_, _>>();

        tar_cols
            .iter()
            .enumerate()
            .filter(|(_, tar_col)| tar_col.can_dml())
            .map(|(tar_i, tar_col)| {
                if user_specified_columns {
                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
                } else {
                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
                }
            })
            .collect()
    }
}

fn find_version_column_indices(
    column_catalog: &[ColumnCatalog],
    version_column_names: Vec<String>,
) -> Result<Vec<usize>> {
    let mut indices = Vec::new();
    for version_column_name in version_column_names {
        let mut found = false;
        for (index, column) in column_catalog.iter().enumerate() {
            if column.column_desc.name == version_column_name {
                if let &DataType::Jsonb
                | &DataType::List(_)
                | &DataType::Struct(_)
                | &DataType::Bytea
                | &DataType::Boolean = column.data_type()
                {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "Version column {} must be of a comparable data type",
                        version_column_name
                    ))
                    .into());
                }
                indices.push(index);
                found = true;
                break;
            }
        }
        if !found {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Version column {} not found",
                version_column_name
            ))
            .into());
        }
    }
    Ok(indices)
}

fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
    let mut const_eval_rewriter = ConstEvalRewriter { error: None };

    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
    if let Some(error) = const_eval_rewriter.error {
        return Err(error);
    }
    Ok(plan)
}

fn inline_session_timezone_in_exprs<C: ConventionMarker>(
    ctx: OptimizerContextRef,
    plan: PlanRef<C>,
) -> Result<PlanRef<C>> {
    let mut v = TimestamptzExprFinder::default();
    plan.visit_exprs_recursive(&mut v);
    if v.has() {
        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
    } else {
        Ok(plan)
    }
}

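/// Returns `true` if a node matching `is_candidate` is reachable from `plan` without
/// crossing a `BatchExchange`. An illustrative sketch of the intent (not real plans):
/// ```text
/// BatchProject -> BatchSeqScan                    => true  (scan in the same stage)
/// BatchProject -> BatchExchange -> BatchSeqScan   => false (the exchange cuts the search)
/// ```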
fn exist_and_no_exchange_before(
    plan: &BatchPlanRef,
    is_candidate: fn(&BatchPlanRef) -> bool,
) -> bool {
    if plan.node_type() == BatchPlanNodeType::BatchExchange {
        return false;
    }
    is_candidate(plan)
        || plan
            .inputs()
            .iter()
            .any(|input| exist_and_no_exchange_before(input, is_candidate))
}

impl BatchPlanRef {
    fn is_user_table_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSeqScan
            || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
            || self.node_type() == BatchPlanNodeType::BatchVectorSearch
    }

    fn is_source_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSource
            || self.node_type() == BatchPlanNodeType::BatchKafkaScan
            || self.node_type() == BatchPlanNodeType::BatchIcebergScan
    }

    fn is_insert(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchInsert
    }

    fn is_update(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchUpdate
    }

    fn is_delete(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchDelete
    }
}

/// As we always run the root stage locally, for any plan node in the root stage that needs to
/// execute on a compute node, we insert an additional exchange before it to avoid including it
/// in the root stage.
///
/// Returns `true` if we must insert an additional exchange to ensure this.
fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan()
            || plan.is_source_scan()
            || plan.is_insert()
            || plan.is_update()
            || plan.is_delete()
    })
}

/// The purpose is the same as `require_additional_exchange_on_root_in_distributed_mode`. We keep
/// them separate because plan nodes have different requirements in different execution modes.
fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}