risingwave_frontend/optimizer/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZeroU32;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use risingwave_pb::catalog::PbVectorIndexInfo;
20
21pub mod plan_node;
22
23use plan_node::StreamFilter;
24pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
25
26pub mod property;
27
28mod delta_join_solver;
29mod heuristic_optimizer;
30mod plan_rewriter;
31
32mod plan_visitor;
33
34#[cfg(feature = "datafusion")]
35pub use plan_visitor::LogicalPlanToDataFusionExt;
36pub use plan_visitor::{
37    ExecutionModeDecider, LogicalIcebergScanExt, PlanVisitor, RelationCollectorVisitor,
38    SysTableVisitor,
39};
40use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;
41
42pub mod backfill_order_strategy;
43mod logical_optimization;
44mod optimizer_context;
45pub mod plan_expr_rewriter;
46mod plan_expr_visitor;
47mod rule;
48
49use std::collections::{BTreeMap, HashMap};
50use std::marker::PhantomData;
51
52use educe::Educe;
53use fixedbitset::FixedBitSet;
54use itertools::Itertools as _;
55pub use logical_optimization::*;
56pub use optimizer_context::*;
57use plan_expr_rewriter::ConstEvalRewriter;
58use property::Order;
59use risingwave_common::bail;
60use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
61use risingwave_common::types::DataType;
62use risingwave_common::util::column_index_mapping::ColIndexMapping;
63use risingwave_common::util::iter_util::ZipEqDebug;
64use risingwave_connector::WithPropertiesExt;
65use risingwave_connector::sink::catalog::SinkFormatDesc;
66use risingwave_pb::stream_plan::StreamScanType;
67
68use self::heuristic_optimizer::ApplyOrder;
69use self::plan_node::generic::{self, PhysicalPlanRef};
70use self::plan_node::{
71    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
72    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
73    ToStreamContext, stream_enforce_eowc_requirement,
74};
75#[cfg(debug_assertions)]
76use self::plan_visitor::InputRefValidator;
77use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
78use self::property::{Cardinality, RequiredDist};
79use self::rule::*;
80use crate::TableCatalog;
81use crate::catalog::table_catalog::TableType;
82use crate::catalog::{DatabaseId, SchemaId};
83use crate::error::{ErrorCode, Result};
84use crate::expr::TimestamptzExprFinder;
85use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
86use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
87use crate::optimizer::plan_node::{
88    Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
89    StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion, StreamVectorIndexWrite,
90    ToStream, VisitExprsRecursive,
91};
92use crate::optimizer::plan_visitor::{
93    LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
94};
95use crate::optimizer::property::Distribution;
96use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
97
/// `PlanRoot` is used to describe a plan. The planner will construct a `PlanRoot` with a
/// `LogicalNode` plus the required distribution and order, and the `PlanRoot` can then generate
/// the corresponding streaming or batch plan with optimization.
///
/// The required Order and Distribution columns might be more than the output columns. For
/// example:
/// ```sql
///    select v1 from t order by id;
/// ```
/// The plan will return two columns (id, v1), and the required order column is id. The id
/// column is required in optimization, but the final generated plan will remove the unnecessary
/// column in the result.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    // The current plan node.
    pub plan: PlanRef<P::Convention>,
    // The phase of the plan.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    // The distribution required of the plan's output.
    required_dist: RequiredDist,
    // The order required of the plan's output.
    required_order: Order,
    // One bit per column of `plan`'s schema; a set bit marks a column that is output.
    out_fields: FixedBitSet,
    // Names of the output columns, one per set bit of `out_fields`.
    out_names: Vec<String>,
}
121
/// `PlanPhase` is used to track the phase of the `PlanRoot`.
/// Usually, it begins from `Logical` and ends with `Batch` or `Stream`, unless we want to construct a `PlanRoot` from an intermediate phase.
/// Typical phase transformations are:
/// - `Logical` -> `BatchOptimizedLogical` -> `Batch`
/// - `Logical` -> `StreamOptimizedLogical` -> `Stream`
pub trait PlanPhase {
    /// The plan-node convention that plans in this phase are expressed in.
    type Convention: ConventionMarker;
}
130
// Declares every plan phase.
//
// The first arm lists the `{ phase, convention }` pairs and forwards them to the second arm.
// Note that `StreamOptimizedLogical` is paired with the `Stream` convention, not `Logical`.
//
// For each pair, the second arm generates (taking `Logical` as the example phase):
// - a marker struct `PlanPhaseLogical`,
// - an `impl PlanPhase for PlanPhaseLogical` whose `Convention` is the given convention type,
// - a type alias `LogicalPlanRoot = PlanRoot<PlanPhaseLogical>`.
macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

// Instantiate the phase markers and `*PlanRoot` aliases used throughout this module.
for_all_phase!();
155
156impl LogicalPlanRoot {
157    pub fn new_with_logical_plan(
158        plan: LogicalPlanRef,
159        required_dist: RequiredDist,
160        required_order: Order,
161        out_fields: FixedBitSet,
162        out_names: Vec<String>,
163    ) -> Self {
164        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
165    }
166}
167
168impl BatchPlanRoot {
169    pub fn new_with_batch_plan(
170        plan: BatchPlanRef,
171        required_dist: RequiredDist,
172        required_order: Order,
173        out_fields: FixedBitSet,
174        out_names: Vec<String>,
175    ) -> Self {
176        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
177    }
178}
179
180impl<P: PlanPhase> PlanRoot<P> {
181    fn new_inner(
182        plan: PlanRef<P::Convention>,
183        required_dist: RequiredDist,
184        required_order: Order,
185        out_fields: FixedBitSet,
186        out_names: Vec<String>,
187    ) -> Self {
188        let input_schema = plan.schema();
189        assert_eq!(input_schema.fields().len(), out_fields.len());
190        assert_eq!(out_fields.count_ones(..), out_names.len());
191
192        Self {
193            plan,
194            _phase: PhantomData,
195            required_dist,
196            required_order,
197            out_fields,
198            out_names,
199        }
200    }
201
202    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
203        PlanRoot {
204            plan,
205            _phase: PhantomData,
206            required_dist: self.required_dist,
207            required_order: self.required_order,
208            out_fields: self.out_fields,
209            out_names: self.out_names,
210        }
211    }
212
213    /// Set customized names of the output fields, used for `CREATE [MATERIALIZED VIEW | SINK] r(a,
214    /// b, ..)`.
215    ///
216    /// If the number of names does not match the number of output fields, an error is returned.
217    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
218        if out_names.len() != self.out_fields.count_ones(..) {
219            Err(ErrorCode::InvalidInputSyntax(
220                "number of column names does not match number of columns".to_owned(),
221            ))?
222        }
223        self.out_names = out_names;
224        Ok(())
225    }
226
227    /// Get the plan root's schema, only including the fields to be output.
228    pub fn schema(&self) -> Schema {
229        // The schema can be derived from the `out_fields` and `out_names`, so we don't maintain it
230        // as a field and always construct one on demand here to keep it in sync.
231        Schema {
232            fields: self
233                .out_fields
234                .ones()
235                .map(|i| self.plan.schema().fields()[i].clone())
236                .zip_eq_debug(&self.out_names)
237                .map(|(field, name)| Field {
238                    name: name.clone(),
239                    ..field
240                })
241                .collect(),
242        }
243    }
244}
245
246impl LogicalPlanRoot {
247    /// Transform the [`PlanRoot`] back to a [`PlanRef`] suitable to be used as a subplan, for
248    /// example as insert source or subquery. This ignores Order but retains post-Order pruning
249    /// (`out_fields`).
250    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
251        if self.out_fields.count_ones(..) == self.out_fields.len() {
252            return self.plan;
253        }
254        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
255    }
256
257    /// Transform the [`PlanRoot`] wrapped in an array-construction subquery to a [`PlanRef`]
258    /// supported by `ARRAY_AGG`. Similar to the unordered version, this abstracts away internal
259    /// `self.plan` which is further modified by `self.required_order` then `self.out_fields`.
260    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
261        use generic::Agg;
262        use plan_node::PlanAggCall;
263        use risingwave_common::types::ListValue;
264        use risingwave_expr::aggregate::PbAggKind;
265
266        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
267        use crate::utils::{Condition, IndexSet};
268
269        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
270            bail!("subquery must return only one column");
271        };
272        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
273        let return_type = DataType::list(input_column_type.clone());
274        let agg = Agg::new(
275            vec![PlanAggCall {
276                agg_type: PbAggKind::ArrayAgg.into(),
277                return_type: return_type.clone(),
278                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
279                distinct: false,
280                order_by: self.required_order.column_orders,
281                filter: Condition::true_cond(),
282                direct_args: vec![],
283            }],
284            IndexSet::empty(),
285            self.plan,
286        );
287        Ok(LogicalProject::create(
288            agg.into(),
289            vec![
290                FunctionCall::new(
291                    ExprType::Coalesce,
292                    vec![
293                        InputRef::new(0, return_type).into(),
294                        ExprImpl::literal_list(
295                            ListValue::empty(&input_column_type),
296                            input_column_type,
297                        ),
298                    ],
299                )
300                .unwrap()
301                .into(),
302            ],
303        ))
304    }
305
306    /// Apply logical optimization to the plan for stream.
307    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
308        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
309        Ok(self)
310    }
311
312    /// Apply logical optimization to the plan for batch.
313    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
314        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
315        Ok(self.into_phase(plan))
316    }
317
318    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
319        self.gen_optimized_logical_plan_for_batch()?
320            .gen_batch_plan()
321    }
322}
323
324impl BatchOptimizedLogicalPlanRoot {
325    /// Optimize and generate a singleton batch physical plan without exchange nodes.
326    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
327        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
328            return Err(ErrorCode::NotSupported(
329                "do not support temporal join for batch queries".to_owned(),
330                "please use temporal join in streaming queries".to_owned(),
331            )
332            .into());
333        }
334
335        let ctx = self.plan.ctx();
336        // Inline session timezone mainly for rewriting now()
337        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;
338
339        // Const eval of exprs at the last minute, but before `to_batch` to make functional index selection happy.
340        plan = const_eval_exprs(plan)?;
341
342        if ctx.is_explain_trace() {
343            ctx.trace("Const eval exprs:");
344            ctx.trace(plan.explain_to_string());
345        }
346
347        // Convert to physical plan node
348        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
349        if ctx.is_explain_trace() {
350            ctx.trace("To Batch Plan:");
351            ctx.trace(plan.explain_to_string());
352        }
353
354        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
355            "Merge BatchProject",
356            vec![BatchProjectMergeRule::create()],
357            ApplyOrder::BottomUp,
358        ))?;
359
360        // Inline session timezone
361        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
362
363        if ctx.is_explain_trace() {
364            ctx.trace("Inline Session Timezone:");
365            ctx.trace(plan.explain_to_string());
366        }
367
368        #[cfg(debug_assertions)]
369        InputRefValidator.validate(plan.clone());
370        assert_eq!(
371            *plan.distribution(),
372            Distribution::Single,
373            "{}",
374            plan.explain_to_string()
375        );
376        assert!(
377            !has_batch_exchange(plan.clone()),
378            "{}",
379            plan.explain_to_string()
380        );
381
382        let ctx = plan.ctx();
383        if ctx.is_explain_trace() {
384            ctx.trace("To Batch Physical Plan:");
385            ctx.trace(plan.explain_to_string());
386        }
387
388        Ok(self.into_phase(plan))
389    }
390}
391
392impl BatchPlanRoot {
393    /// Optimize and generate a batch query plan for distributed execution.
394    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
395        self.required_dist = RequiredDist::single();
396        let mut plan = self.plan;
397
398        // Convert to distributed plan
399        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;
400
401        let ctx = plan.ctx();
402        if ctx.is_explain_trace() {
403            ctx.trace("To Batch Distributed Plan:");
404            ctx.trace(plan.explain_to_string());
405        }
406        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
407            plan =
408                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
409        }
410
411        // Add Project if the any position of `self.out_fields` is set to zero.
412        if self.out_fields.count_ones(..) != self.out_fields.len() {
413            plan =
414                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
415        }
416
417        // Both two phase limit and topn could generate limit on top of the scan, so we push limit here.
418        let plan = plan.optimize_by_rules(&OptimizationStage::new(
419            "Push Limit To Scan",
420            vec![BatchPushLimitToScanRule::create()],
421            ApplyOrder::BottomUp,
422        ))?;
423
424        let plan = plan.optimize_by_rules(&OptimizationStage::new(
425            "Iceberg Count Star",
426            vec![BatchIcebergCountStar::create()],
427            ApplyOrder::TopDown,
428        ))?;
429
430        // For iceberg scan, we do iceberg predicate pushdown
431        // BatchFilter -> BatchIcebergScan
432        let plan = plan.optimize_by_rules(&OptimizationStage::new(
433            "Iceberg Predicate Pushdown",
434            vec![BatchIcebergPredicatePushDownRule::create()],
435            ApplyOrder::BottomUp,
436        ))?;
437
438        Ok(plan)
439    }
440
441    /// Optimize and generate a batch query plan for local execution.
442    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
443        let mut plan = self.plan;
444
445        // Convert to local plan node
446        plan = plan.to_local_with_order_required(&self.required_order)?;
447
448        // We remark that since the `to_local_with_order_required` does not enforce single
449        // distribution, we enforce at the root if needed.
450        let insert_exchange = match plan.distribution() {
451            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
452            _ => true,
453        };
454        if insert_exchange {
455            plan =
456                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
457        }
458
459        // Add Project if the any position of `self.out_fields` is set to zero.
460        if self.out_fields.count_ones(..) != self.out_fields.len() {
461            plan =
462                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
463        }
464
465        let ctx = plan.ctx();
466        if ctx.is_explain_trace() {
467            ctx.trace("To Batch Local Plan:");
468            ctx.trace(plan.explain_to_string());
469        }
470
471        // Both two phase limit and topn could generate limit on top of the scan, so we push limit here.
472        let plan = plan.optimize_by_rules(&OptimizationStage::new(
473            "Push Limit To Scan",
474            vec![BatchPushLimitToScanRule::create()],
475            ApplyOrder::BottomUp,
476        ))?;
477
478        let plan = plan.optimize_by_rules(&OptimizationStage::new(
479            "Iceberg Count Star",
480            vec![BatchIcebergCountStar::create()],
481            ApplyOrder::TopDown,
482        ))?;
483        Ok(plan)
484    }
485}
486
487impl LogicalPlanRoot {
488    /// Generate optimized stream plan
489    fn gen_optimized_stream_plan(
490        self,
491        emit_on_window_close: bool,
492        allow_snapshot_backfill: bool,
493    ) -> Result<StreamOptimizedLogicalPlanRoot> {
494        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
495            StreamScanType::SnapshotBackfill
496        } else if self.should_use_arrangement_backfill() {
497            StreamScanType::ArrangementBackfill
498        } else {
499            StreamScanType::Backfill
500        };
501        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
502    }
503
504    fn gen_optimized_stream_plan_inner(
505        self,
506        emit_on_window_close: bool,
507        stream_scan_type: StreamScanType,
508    ) -> Result<StreamOptimizedLogicalPlanRoot> {
509        let ctx = self.plan.ctx();
510        let _explain_trace = ctx.is_explain_trace();
511
512        let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;
513
514        let mut plan = optimized_plan
515            .plan
516            .clone()
517            .optimize_by_rules(&OptimizationStage::new(
518                "Merge StreamProject",
519                vec![StreamProjectMergeRule::create()],
520                ApplyOrder::BottomUp,
521            ))?;
522
523        if ctx
524            .session_ctx()
525            .config()
526            .streaming_separate_consecutive_join()
527        {
528            plan = plan.optimize_by_rules(&OptimizationStage::new(
529                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
530                vec![SeparateConsecutiveJoinRule::create()],
531                ApplyOrder::BottomUp,
532            ))?;
533        }
534
535        // Add Logstore for Unaligned join
536        // Apply this BEFORE delta join rule, because delta join removes
537        // the join
538        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
539            plan = plan.optimize_by_rules(&OptimizationStage::new(
540                "Add Logstore for Unaligned join",
541                vec![AddLogstoreRule::create()],
542                ApplyOrder::BottomUp,
543            ))?;
544        }
545
546        if ctx.session_ctx().config().streaming_enable_delta_join()
547            && ctx.session_ctx().config().enable_index_selection()
548        {
549            // TODO: make it a logical optimization.
550            // Rewrite joins with index to delta join
551            plan = plan.optimize_by_rules(&OptimizationStage::new(
552                "To IndexDeltaJoin",
553                vec![IndexDeltaJoinRule::create()],
554                ApplyOrder::BottomUp,
555            ))?;
556        }
557        // Inline session timezone
558        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
559
560        if ctx.is_explain_trace() {
561            ctx.trace("Inline session timezone:");
562            ctx.trace(plan.explain_to_string());
563        }
564
565        // Const eval of exprs at the last minute
566        plan = const_eval_exprs(plan)?;
567
568        if ctx.is_explain_trace() {
569            ctx.trace("Const eval exprs:");
570            ctx.trace(plan.explain_to_string());
571        }
572
573        #[cfg(debug_assertions)]
574        InputRefValidator.validate(plan.clone());
575
576        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
577            return Err(ErrorCode::NotSupported(
578                "exist dangling temporal scan".to_owned(),
579                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
580            ).into());
581        }
582
583        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
584            return Err(ErrorCode::NotSupported(
585                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
586                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
587            ).into());
588        }
589
590        if ctx.session_ctx().config().enable_locality_backfill()
591            && LocalityProviderCounter::count(plan.clone()) > 5
592        {
593            risingwave_common::license::Feature::LocalityBackfill.check_available()?
594        }
595
596        Ok(optimized_plan.into_phase(plan))
597    }
598
    /// Generate create index or create materialize view plan.
    ///
    /// Steps, in order: reject JSONB stream keys unless the session allows them, run the logical
    /// optimization for stream, apply the logical rewrite for stream, remap the root's required
    /// distribution, required order and output fields through the resulting column mapping,
    /// convert to a stream plan with the required distribution, and enforce the
    /// emit-on-window-close requirement.
    ///
    /// # Errors
    ///
    /// Returns `NotSupported` when `StreamKeyChecker` reports a problem and
    /// `streaming_allow_jsonb_in_stream_key` is off; propagates errors from every step.
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                // The stream key check is skipped entirely when the session config allows JSONB
                // in stream keys.
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        // The mapping sends several old columns to the same new column. Make it
                        // injective by projecting the shared column once more for every extra
                        // old column that points at it.
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        // TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/7234
                        // assert_eq!(target_size, output_indices.len());
                        target_size = plan.schema().len();
                        // `tar_exists[i]` records whether target column `i` is already claimed.
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                // Target already claimed: append a duplicate of that column to
                                // the projection and redirect this entry to the new position.
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                // Re-express the root's requirements in terms of the rewritten plan's columns.
                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            // TODO: can be `plan.plan.explain_to_string()`, but should explicitly specify the type due to some limitation of rust compiler
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }
685
686    /// Visit the plan root and compute the cardinality.
687    ///
688    /// Panics if not called on a logical plan.
689    fn compute_cardinality(&self) -> Cardinality {
690        CardinalityVisitor.visit(self.plan.clone())
691    }
692
693    /// Optimize and generate a create table plan.
694    pub fn gen_table_plan(
695        self,
696        context: OptimizerContextRef,
697        table_name: String,
698        database_id: DatabaseId,
699        schema_id: SchemaId,
700        CreateTableInfo {
701            columns,
702            pk_column_ids,
703            row_id_index,
704            watermark_descs,
705            source_catalog,
706            version,
707        }: CreateTableInfo,
708        CreateTableProps {
709            definition,
710            append_only,
711            on_conflict,
712            with_version_columns,
713            webhook_info,
714            engine,
715        }: CreateTableProps,
716    ) -> Result<StreamMaterialize> {
717        // Snapshot backfill is not allowed for create table
718        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
719
720        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
721
722        let pk_column_indices = {
723            let mut id_to_idx = HashMap::new();
724
725            columns.iter().enumerate().for_each(|(idx, c)| {
726                id_to_idx.insert(c.column_id(), idx);
727            });
728            pk_column_ids
729                .iter()
730                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
731                .collect_vec()
732        };
733
734        fn inject_project_for_generated_column_if_needed(
735            columns: &[ColumnCatalog],
736            node: StreamPlanRef,
737        ) -> Result<StreamPlanRef> {
738            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
739            if let Some(exprs) = exprs {
740                let logical_project = generic::Project::new(exprs, node);
741                return Ok(StreamProject::new(logical_project).into());
742            }
743            Ok(node)
744        }
745
746        #[derive(PartialEq, Debug, Copy, Clone)]
747        enum PrimaryKeyKind {
748            UserDefinedPrimaryKey,
749            NonAppendOnlyRowIdPk,
750            AppendOnlyRowIdPk,
751        }
752
753        fn inject_dml_node(
754            columns: &[ColumnCatalog],
755            append_only: bool,
756            stream_plan: StreamPlanRef,
757            pk_column_indices: &[usize],
758            kind: PrimaryKeyKind,
759            column_descs: Vec<ColumnDesc>,
760        ) -> Result<StreamPlanRef> {
761            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
762
763            // Add generated columns.
764            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
765
766            dml_node = match kind {
767                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
768                    RequiredDist::hash_shard(pk_column_indices)
769                        .streaming_enforce_if_not_satisfies(dml_node)?
770                }
771                PrimaryKeyKind::AppendOnlyRowIdPk => {
772                    StreamExchange::new_no_shuffle(dml_node).into()
773                }
774            };
775
776            Ok(dml_node)
777        }
778
779        let kind = if let Some(row_id_index) = row_id_index {
780            assert_eq!(
781                pk_column_indices.iter().exactly_one().copied().unwrap(),
782                row_id_index
783            );
784            if append_only {
785                PrimaryKeyKind::AppendOnlyRowIdPk
786            } else {
787                PrimaryKeyKind::NonAppendOnlyRowIdPk
788            }
789        } else {
790            PrimaryKeyKind::UserDefinedPrimaryKey
791        };
792
793        let column_descs: Vec<ColumnDesc> = columns
794            .iter()
795            .filter(|&c| c.can_dml())
796            .map(|c| c.column_desc.clone())
797            .collect();
798
799        let mut not_null_idxs = vec![];
800        for (idx, column) in column_descs.iter().enumerate() {
801            if !column.nullable {
802                not_null_idxs.push(idx);
803            }
804        }
805
806        let version_column_indices = if !with_version_columns.is_empty() {
807            find_version_column_indices(&columns, with_version_columns)?
808        } else {
809            vec![]
810        };
811
812        let with_external_source = source_catalog.is_some();
813        let (dml_source_node, external_source_node) = if with_external_source {
814            let dummy_source_node = LogicalSource::new(
815                None,
816                columns.clone(),
817                row_id_index,
818                SourceNodeKind::CreateTable,
819                context.clone(),
820                None,
821            )
822            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
823            let mut external_source_node = stream_plan.plan;
824            external_source_node =
825                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
826            external_source_node = match kind {
827                PrimaryKeyKind::UserDefinedPrimaryKey => {
828                    RequiredDist::hash_shard(&pk_column_indices)
829                        .streaming_enforce_if_not_satisfies(external_source_node)?
830                }
831
832                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
833                    StreamExchange::new_no_shuffle(external_source_node).into()
834                }
835            };
836            (dummy_source_node, Some(external_source_node))
837        } else {
838            (stream_plan.plan, None)
839        };
840
841        let dml_node = inject_dml_node(
842            &columns,
843            append_only,
844            dml_source_node,
845            &pk_column_indices,
846            kind,
847            column_descs,
848        )?;
849
850        let dists = external_source_node
851            .iter()
852            .map(|input| input.distribution())
853            .chain([dml_node.distribution()])
854            .unique()
855            .collect_vec();
856
857        let dist = match &dists[..] {
858            &[Distribution::SomeShard, Distribution::HashShard(_)]
859            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
860            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
861                dist.clone()
862            }
863            _ => {
864                unreachable!()
865            }
866        };
867
868        let generated_column_exprs =
869            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
870        let upstream_sink_union = StreamUpstreamSinkUnion::new(
871            context.clone(),
872            dml_node.schema(),
873            dml_node.stream_key(),
874            dist.clone(), // should always be the same as dist of `Union`
875            append_only,
876            row_id_index.is_none(),
877            generated_column_exprs,
878        );
879
880        let union_inputs = external_source_node
881            .into_iter()
882            .chain([dml_node, upstream_sink_union.into()])
883            .collect_vec();
884
885        let mut stream_plan = StreamUnion::new_with_dist(
886            Union {
887                all: true,
888                inputs: union_inputs,
889                source_col: None,
890            },
891            dist,
892        )
893        .into();
894
895        // Add WatermarkFilter node.
896        if !watermark_descs.is_empty() {
897            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
898        }
899
900        // Add RowIDGen node if needed.
901        if let Some(row_id_index) = row_id_index {
902            match kind {
903                PrimaryKeyKind::UserDefinedPrimaryKey => {
904                    unreachable!()
905                }
906                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
907                    stream_plan = StreamRowIdGen::new_with_dist(
908                        stream_plan,
909                        row_id_index,
910                        Distribution::HashShard(vec![row_id_index]),
911                    )
912                    .into();
913                }
914            }
915        }
916
917        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
918
919        if let ConflictBehavior::IgnoreConflict = conflict_behavior
920            && !version_column_indices.is_empty()
921        {
922            Err(ErrorCode::InvalidParameterValue(
923                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
924            ))?
925        }
926
927        let retention_seconds = context.with_options().retention_seconds();
928
929        let table_required_dist = {
930            let mut bitset = FixedBitSet::with_capacity(columns.len());
931            for idx in &pk_column_indices {
932                bitset.insert(*idx);
933            }
934            RequiredDist::ShardByKey(bitset)
935        };
936
937        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
938
939        if !not_null_idxs.is_empty() {
940            stream_plan =
941                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
942        }
943
944        // Determine if the table should be refreshable based on the connector type
945        let refreshable = source_catalog
946            .as_ref()
947            .map(|catalog| {
948                catalog.with_properties.is_batch_connector() || {
949                    matches!(
950                        catalog
951                            .refresh_mode
952                            .as_ref()
953                            .map(|refresh_mode| refresh_mode.refresh_mode),
954                        Some(Some(RefreshMode::FullReload(_)))
955                    )
956                }
957            })
958            .unwrap_or(false);
959
960        // Validate that refreshable tables have a user-defined primary key (i.e., does not have rowid)
961        if refreshable && row_id_index.is_some() {
962            return Err(crate::error::ErrorCode::BindError(
963                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
964                    .to_owned(),
965            )
966            .into());
967        }
968
969        StreamMaterialize::create_for_table(
970            stream_plan,
971            table_name,
972            database_id,
973            schema_id,
974            table_required_dist,
975            Order::any(),
976            columns,
977            definition,
978            conflict_behavior,
979            version_column_indices,
980            pk_column_indices,
981            row_id_index,
982            version,
983            retention_seconds,
984            webhook_info,
985            engine,
986            refreshable,
987        )
988    }
989
990    /// Optimize and generate a create materialized view plan.
991    pub fn gen_materialize_plan(
992        self,
993        database_id: DatabaseId,
994        schema_id: SchemaId,
995        mv_name: String,
996        definition: String,
997        emit_on_window_close: bool,
998    ) -> Result<StreamMaterialize> {
999        let cardinality = self.compute_cardinality();
1000        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
1001        StreamMaterialize::create(
1002            stream_plan,
1003            mv_name,
1004            database_id,
1005            schema_id,
1006            definition,
1007            TableType::MaterializedView,
1008            cardinality,
1009            None,
1010        )
1011    }
1012
1013    /// Optimize and generate a create index plan.
1014    pub fn gen_index_plan(
1015        self,
1016        index_name: String,
1017        database_id: DatabaseId,
1018        schema_id: SchemaId,
1019        definition: String,
1020        retention_seconds: Option<NonZeroU32>,
1021    ) -> Result<StreamMaterialize> {
1022        let cardinality = self.compute_cardinality();
1023        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1024
1025        StreamMaterialize::create(
1026            stream_plan,
1027            index_name,
1028            database_id,
1029            schema_id,
1030            definition,
1031            TableType::Index,
1032            cardinality,
1033            retention_seconds,
1034        )
1035    }
1036
1037    pub fn gen_vector_index_plan(
1038        self,
1039        index_name: String,
1040        database_id: DatabaseId,
1041        schema_id: SchemaId,
1042        definition: String,
1043        retention_seconds: Option<NonZeroU32>,
1044        vector_index_info: PbVectorIndexInfo,
1045    ) -> Result<StreamVectorIndexWrite> {
1046        let cardinality = self.compute_cardinality();
1047        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1048
1049        StreamVectorIndexWrite::create(
1050            stream_plan,
1051            index_name,
1052            database_id,
1053            schema_id,
1054            definition,
1055            cardinality,
1056            retention_seconds,
1057            vector_index_info,
1058        )
1059    }
1060
1061    /// Optimize and generate a create sink plan.
1062    #[allow(clippy::too_many_arguments)]
1063    pub fn gen_sink_plan(
1064        self,
1065        sink_name: String,
1066        definition: String,
1067        properties: WithOptionsSecResolved,
1068        emit_on_window_close: bool,
1069        db_name: String,
1070        sink_from_table_name: String,
1071        format_desc: Option<SinkFormatDesc>,
1072        without_backfill: bool,
1073        target_table: Option<Arc<TableCatalog>>,
1074        partition_info: Option<PartitionComputeInfo>,
1075        user_specified_columns: bool,
1076        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
1077    ) -> Result<StreamSink> {
1078        let stream_scan_type = if without_backfill {
1079            StreamScanType::UpstreamOnly
1080        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
1081            // Snapshot backfill on sink-into-table is not allowed
1082            StreamScanType::SnapshotBackfill
1083        } else if self.should_use_arrangement_backfill() {
1084            StreamScanType::ArrangementBackfill
1085        } else {
1086            StreamScanType::Backfill
1087        };
1088        if auto_refresh_schema_from_table.is_some()
1089            && stream_scan_type != StreamScanType::ArrangementBackfill
1090        {
1091            return Err(ErrorCode::InvalidInputSyntax(format!(
1092                "auto schema change only support for ArrangementBackfill, but got: {:?}",
1093                stream_scan_type
1094            ))
1095            .into());
1096        }
1097        let stream_plan =
1098            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
1099        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
1100            let columns = t.columns_without_rw_timestamp();
1101            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
1102        });
1103
1104        StreamSink::create(
1105            stream_plan,
1106            sink_name,
1107            db_name,
1108            sink_from_table_name,
1109            target_table,
1110            target_columns_to_plan_mapping,
1111            definition,
1112            properties,
1113            format_desc,
1114            partition_info,
1115            auto_refresh_schema_from_table,
1116        )
1117    }
1118}
1119
1120impl<P: PlanPhase> PlanRoot<P> {
1121    pub fn should_use_arrangement_backfill(&self) -> bool {
1122        let ctx = self.plan.ctx();
1123        let session_ctx = ctx.session_ctx();
1124        let arrangement_backfill_enabled = session_ctx
1125            .env()
1126            .streaming_config()
1127            .developer
1128            .enable_arrangement_backfill;
1129        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1130    }
1131
1132    pub fn should_use_snapshot_backfill(&self) -> bool {
1133        self.plan
1134            .ctx()
1135            .session_ctx()
1136            .config()
1137            .streaming_use_snapshot_backfill()
1138    }
1139
1140    /// used when the plan has a target relation such as DML and sink into table, return the mapping from table's columns to the plan's schema
1141    pub fn target_columns_to_plan_mapping(
1142        &self,
1143        tar_cols: &[ColumnCatalog],
1144        user_specified_columns: bool,
1145    ) -> Vec<Option<usize>> {
1146        #[allow(clippy::disallowed_methods)]
1147        let visible_cols: Vec<(usize, String)> = self
1148            .out_fields
1149            .ones()
1150            .zip_eq(self.out_names.iter().cloned())
1151            .collect_vec();
1152
1153        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1154        let visible_col_idxes_by_name = visible_cols
1155            .iter()
1156            .map(|(i, name)| (name.as_ref(), *i))
1157            .collect::<BTreeMap<_, _>>();
1158
1159        tar_cols
1160            .iter()
1161            .enumerate()
1162            .filter(|(_, tar_col)| tar_col.can_dml())
1163            .map(|(tar_i, tar_col)| {
1164                if user_specified_columns {
1165                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
1166                } else {
1167                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1168                }
1169            })
1170            .collect()
1171    }
1172}
1173
1174fn find_version_column_indices(
1175    column_catalog: &Vec<ColumnCatalog>,
1176    version_column_names: Vec<String>,
1177) -> Result<Vec<usize>> {
1178    let mut indices = Vec::new();
1179    for version_column_name in version_column_names {
1180        let mut found = false;
1181        for (index, column) in column_catalog.iter().enumerate() {
1182            if column.column_desc.name == version_column_name {
1183                if let &DataType::Jsonb
1184                | &DataType::List(_)
1185                | &DataType::Struct(_)
1186                | &DataType::Bytea
1187                | &DataType::Boolean = column.data_type()
1188                {
1189                    return Err(ErrorCode::InvalidInputSyntax(format!(
1190                        "Version column {} must be of a comparable data type",
1191                        version_column_name
1192                    ))
1193                    .into());
1194                }
1195                indices.push(index);
1196                found = true;
1197                break;
1198            }
1199        }
1200        if !found {
1201            return Err(ErrorCode::InvalidInputSyntax(format!(
1202                "Version column {} not found",
1203                version_column_name
1204            ))
1205            .into());
1206        }
1207    }
1208    Ok(indices)
1209}
1210
1211fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1212    let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1213
1214    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1215    if let Some(error) = const_eval_rewriter.error {
1216        return Err(error);
1217    }
1218    Ok(plan)
1219}
1220
1221fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1222    ctx: OptimizerContextRef,
1223    plan: PlanRef<C>,
1224) -> Result<PlanRef<C>> {
1225    let mut v = TimestamptzExprFinder::default();
1226    plan.visit_exprs_recursive(&mut v);
1227    if v.has() {
1228        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1229    } else {
1230        Ok(plan)
1231    }
1232}
1233
1234fn exist_and_no_exchange_before(
1235    plan: &BatchPlanRef,
1236    is_candidate: fn(&BatchPlanRef) -> bool,
1237) -> bool {
1238    if plan.node_type() == BatchPlanNodeType::BatchExchange {
1239        return false;
1240    }
1241    is_candidate(plan)
1242        || plan
1243            .inputs()
1244            .iter()
1245            .any(|input| exist_and_no_exchange_before(input, is_candidate))
1246}
1247
1248impl BatchPlanRef {
1249    fn is_user_table_scan(&self) -> bool {
1250        self.node_type() == BatchPlanNodeType::BatchSeqScan
1251            || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
1252            || self.node_type() == BatchPlanNodeType::BatchVectorSearch
1253    }
1254
1255    fn is_source_scan(&self) -> bool {
1256        self.node_type() == BatchPlanNodeType::BatchSource
1257            || self.node_type() == BatchPlanNodeType::BatchKafkaScan
1258            || self.node_type() == BatchPlanNodeType::BatchIcebergScan
1259    }
1260
1261    fn is_insert(&self) -> bool {
1262        self.node_type() == BatchPlanNodeType::BatchInsert
1263    }
1264
1265    fn is_update(&self) -> bool {
1266        self.node_type() == BatchPlanNodeType::BatchUpdate
1267    }
1268
1269    fn is_delete(&self) -> bool {
1270        self.node_type() == BatchPlanNodeType::BatchDelete
1271    }
1272}
1273
1274/// As we always run the root stage locally, for some plan in root stage which need to execute in
1275/// compute node we insert an additional exhchange before it to avoid to include it in the root
1276/// stage.
1277///
1278/// Returns `true` if we must insert an additional exchange to ensure this.
1279fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1280    assert_eq!(plan.distribution(), &Distribution::Single);
1281    exist_and_no_exchange_before(&plan, |plan| {
1282        plan.is_user_table_scan()
1283            || plan.is_source_scan()
1284            || plan.is_insert()
1285            || plan.is_update()
1286            || plan.is_delete()
1287    })
1288}
1289
1290/// The purpose is same as `require_additional_exchange_on_root_in_distributed_mode`. We separate
1291/// them for the different requirement of plan node in different execute mode.
1292fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1293    assert_eq!(plan.distribution(), &Distribution::Single);
1294    exist_and_no_exchange_before(&plan, |plan| {
1295        plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
1296    })
1297}
1298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// Turning a plan root into an unordered subplan keeps only the output columns
    /// selected by `out_fields`.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;

        // Two input columns, `v1` and `v2`.
        let input_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values = LogicalValues::new(vec![], input_schema, ctx).into();

        // The single block `1` sets bit 0 only, so just `v1` is an output column.
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];

        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();

        let expected_schema = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(subplan.schema(), &expected_schema);
    }
}