risingwave_frontend/optimizer/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZeroU32;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use risingwave_pb::catalog::PbVectorIndexInfo;
20
21pub mod plan_node;
22
23use plan_node::StreamFilter;
24pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
25
26pub mod property;
27
28mod delta_join_solver;
29mod heuristic_optimizer;
30mod plan_rewriter;
31
32mod plan_visitor;
33
34pub use plan_visitor::{
35    ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
36};
37use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;
38
39pub mod backfill_order_strategy;
40mod logical_optimization;
41mod optimizer_context;
42pub mod plan_expr_rewriter;
43mod plan_expr_visitor;
44mod rule;
45
46use std::collections::{BTreeMap, HashMap};
47use std::marker::PhantomData;
48
49use educe::Educe;
50use fixedbitset::FixedBitSet;
51use itertools::Itertools as _;
52pub use logical_optimization::*;
53pub use optimizer_context::*;
54use plan_expr_rewriter::ConstEvalRewriter;
55use property::Order;
56use risingwave_common::bail;
57use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
58use risingwave_common::types::DataType;
59use risingwave_common::util::column_index_mapping::ColIndexMapping;
60use risingwave_common::util::iter_util::ZipEqDebug;
61use risingwave_connector::WithPropertiesExt;
62use risingwave_connector::sink::catalog::SinkFormatDesc;
63use risingwave_pb::stream_plan::StreamScanType;
64
65use self::heuristic_optimizer::ApplyOrder;
66use self::plan_node::generic::{self, PhysicalPlanRef};
67use self::plan_node::{
68    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
69    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
70    ToStreamContext, stream_enforce_eowc_requirement,
71};
72#[cfg(debug_assertions)]
73use self::plan_visitor::InputRefValidator;
74use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
75use self::property::{Cardinality, RequiredDist};
76use self::rule::*;
77use crate::TableCatalog;
78use crate::catalog::table_catalog::TableType;
79use crate::catalog::{DatabaseId, SchemaId};
80use crate::error::{ErrorCode, Result};
81use crate::expr::TimestamptzExprFinder;
82use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
83use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
84use crate::optimizer::plan_node::{
85    Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
86    StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion, StreamVectorIndexWrite,
87    ToStream, VisitExprsRecursive,
88};
89use crate::optimizer::plan_visitor::{
90    LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
91};
92use crate::optimizer::property::Distribution;
93use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
94
/// `PlanRoot` describes a complete plan together with the requirements placed on its output.
///
/// The planner constructs a `PlanRoot` from a logical plan plus the required distribution and
/// order. From there the `PlanRoot` can be optimized and lowered into a streaming or batch plan.
///
/// The columns needed by the required order and distribution may be a superset of the columns
/// that are actually returned. For example:
/// ```sql
///    select v1 from t order by id;
/// ```
/// Here the plan produces two columns (`id`, `v1`) and the required order column is `id`. The
/// `id` column is needed during optimization, but the final generated plan removes it from the
/// result because it is not part of the output.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    // The current plan node. Its convention is determined by the phase `P`.
    pub plan: PlanRef<P::Convention>,
    // The phase of the plan. Zero-sized marker, skipped in `Debug` output.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    // Distribution required of the plan's output.
    required_dist: RequiredDist,
    // Order required of the plan's output.
    required_order: Order,
    // One bit per column of `plan`'s schema; a set bit marks a column that is part of the output.
    out_fields: FixedBitSet,
    // Output column names, one per set bit of `out_fields`, in ascending column order.
    out_names: Vec<String>,
}
118
/// `PlanPhase` is a type-level marker that tracks which phase a [`PlanRoot`] is in.
///
/// Usually a `PlanRoot` begins in `Logical` and ends in `Batch` or `Stream`, unless it is
/// constructed directly from an intermediate phase.
///
/// Typical phase transformations are:
/// - `Logical` -> `OptimizedLogicalForBatch` -> `Batch`
/// - `Logical` -> `OptimizedLogicalForStream` -> `Stream`
pub trait PlanPhase {
    /// The plan-node convention used by the plan held in a `PlanRoot` of this phase.
    type Convention: ConventionMarker;
}
127
/// Declares every plan phase.
///
/// Invoked without arguments, the macro re-invokes itself with the built-in list of
/// `{ phase, convention }` pairs. For each pair it generates:
/// - a marker struct `PlanPhase<phase>`,
/// - an `impl PlanPhase` for that struct with `Convention = <convention>`,
/// - a type alias `<phase>PlanRoot = PlanRoot<PlanPhase<phase>>`.
macro_rules! for_all_phase {
    // Entry point: expand with the fixed list of phases and their conventions.
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    // Worker arm: emit the marker struct, the trait impl and the alias for each pair.
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

// Instantiate all phase markers and `*PlanRoot` aliases used throughout this module.
for_all_phase!();
152
impl LogicalPlanRoot {
    /// Create a plan root in the logical phase from a logical plan and its output requirements.
    ///
    /// Forwards to `new_inner`, which asserts that `out_fields` has one bit per column of the
    /// plan's schema and that `out_names` has one entry per set bit of `out_fields`.
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}
164
impl BatchPlanRoot {
    /// Create a plan root directly in the batch phase from an existing batch plan and its
    /// output requirements.
    ///
    /// Forwards to `new_inner`, which asserts that `out_fields` has one bit per column of the
    /// plan's schema and that `out_names` has one entry per set bit of `out_fields`.
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}
176
177impl<P: PlanPhase> PlanRoot<P> {
178    fn new_inner(
179        plan: PlanRef<P::Convention>,
180        required_dist: RequiredDist,
181        required_order: Order,
182        out_fields: FixedBitSet,
183        out_names: Vec<String>,
184    ) -> Self {
185        let input_schema = plan.schema();
186        assert_eq!(input_schema.fields().len(), out_fields.len());
187        assert_eq!(out_fields.count_ones(..), out_names.len());
188
189        Self {
190            plan,
191            _phase: PhantomData,
192            required_dist,
193            required_order,
194            out_fields,
195            out_names,
196        }
197    }
198
199    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
200        PlanRoot {
201            plan,
202            _phase: PhantomData,
203            required_dist: self.required_dist,
204            required_order: self.required_order,
205            out_fields: self.out_fields,
206            out_names: self.out_names,
207        }
208    }
209
210    /// Set customized names of the output fields, used for `CREATE [MATERIALIZED VIEW | SINK] r(a,
211    /// b, ..)`.
212    ///
213    /// If the number of names does not match the number of output fields, an error is returned.
214    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
215        if out_names.len() != self.out_fields.count_ones(..) {
216            Err(ErrorCode::InvalidInputSyntax(
217                "number of column names does not match number of columns".to_owned(),
218            ))?
219        }
220        self.out_names = out_names;
221        Ok(())
222    }
223
224    /// Get the plan root's schema, only including the fields to be output.
225    pub fn schema(&self) -> Schema {
226        // The schema can be derived from the `out_fields` and `out_names`, so we don't maintain it
227        // as a field and always construct one on demand here to keep it in sync.
228        Schema {
229            fields: self
230                .out_fields
231                .ones()
232                .map(|i| self.plan.schema().fields()[i].clone())
233                .zip_eq_debug(&self.out_names)
234                .map(|(field, name)| Field {
235                    name: name.clone(),
236                    ..field
237                })
238                .collect(),
239        }
240    }
241}
242
243impl LogicalPlanRoot {
244    /// Transform the [`PlanRoot`] back to a [`PlanRef`] suitable to be used as a subplan, for
245    /// example as insert source or subquery. This ignores Order but retains post-Order pruning
246    /// (`out_fields`).
247    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
248        if self.out_fields.count_ones(..) == self.out_fields.len() {
249            return self.plan;
250        }
251        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
252    }
253
254    /// Transform the [`PlanRoot`] wrapped in an array-construction subquery to a [`PlanRef`]
255    /// supported by `ARRAY_AGG`. Similar to the unordered version, this abstracts away internal
256    /// `self.plan` which is further modified by `self.required_order` then `self.out_fields`.
257    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
258        use generic::Agg;
259        use plan_node::PlanAggCall;
260        use risingwave_common::types::ListValue;
261        use risingwave_expr::aggregate::PbAggKind;
262
263        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
264        use crate::utils::{Condition, IndexSet};
265
266        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
267            bail!("subquery must return only one column");
268        };
269        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
270        let return_type = DataType::list(input_column_type.clone());
271        let agg = Agg::new(
272            vec![PlanAggCall {
273                agg_type: PbAggKind::ArrayAgg.into(),
274                return_type: return_type.clone(),
275                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
276                distinct: false,
277                order_by: self.required_order.column_orders,
278                filter: Condition::true_cond(),
279                direct_args: vec![],
280            }],
281            IndexSet::empty(),
282            self.plan,
283        );
284        Ok(LogicalProject::create(
285            agg.into(),
286            vec![
287                FunctionCall::new(
288                    ExprType::Coalesce,
289                    vec![
290                        InputRef::new(0, return_type).into(),
291                        ExprImpl::literal_list(
292                            ListValue::empty(&input_column_type),
293                            input_column_type,
294                        ),
295                    ],
296                )
297                .unwrap()
298                .into(),
299            ],
300        ))
301    }
302
303    /// Apply logical optimization to the plan for stream.
304    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
305        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
306        Ok(self)
307    }
308
309    /// Apply logical optimization to the plan for batch.
310    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
311        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
312        Ok(self.into_phase(plan))
313    }
314
315    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
316        self.gen_optimized_logical_plan_for_batch()?
317            .gen_batch_plan()
318    }
319}
320
321impl BatchOptimizedLogicalPlanRoot {
322    /// Optimize and generate a singleton batch physical plan without exchange nodes.
323    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
324        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
325            return Err(ErrorCode::NotSupported(
326                "do not support temporal join for batch queries".to_owned(),
327                "please use temporal join in streaming queries".to_owned(),
328            )
329            .into());
330        }
331
332        let ctx = self.plan.ctx();
333        // Inline session timezone mainly for rewriting now()
334        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;
335
336        // Const eval of exprs at the last minute, but before `to_batch` to make functional index selection happy.
337        plan = const_eval_exprs(plan)?;
338
339        if ctx.is_explain_trace() {
340            ctx.trace("Const eval exprs:");
341            ctx.trace(plan.explain_to_string());
342        }
343
344        // Convert to physical plan node
345        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
346        if ctx.is_explain_trace() {
347            ctx.trace("To Batch Plan:");
348            ctx.trace(plan.explain_to_string());
349        }
350
351        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
352            "Merge BatchProject",
353            vec![BatchProjectMergeRule::create()],
354            ApplyOrder::BottomUp,
355        ))?;
356
357        // Inline session timezone
358        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
359
360        if ctx.is_explain_trace() {
361            ctx.trace("Inline Session Timezone:");
362            ctx.trace(plan.explain_to_string());
363        }
364
365        #[cfg(debug_assertions)]
366        InputRefValidator.validate(plan.clone());
367        assert_eq!(
368            *plan.distribution(),
369            Distribution::Single,
370            "{}",
371            plan.explain_to_string()
372        );
373        assert!(
374            !has_batch_exchange(plan.clone()),
375            "{}",
376            plan.explain_to_string()
377        );
378
379        let ctx = plan.ctx();
380        if ctx.is_explain_trace() {
381            ctx.trace("To Batch Physical Plan:");
382            ctx.trace(plan.explain_to_string());
383        }
384
385        Ok(self.into_phase(plan))
386    }
387}
388
389impl BatchPlanRoot {
390    /// Optimize and generate a batch query plan for distributed execution.
391    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
392        self.required_dist = RequiredDist::single();
393        let mut plan = self.plan;
394
395        // Convert to distributed plan
396        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;
397
398        let ctx = plan.ctx();
399        if ctx.is_explain_trace() {
400            ctx.trace("To Batch Distributed Plan:");
401            ctx.trace(plan.explain_to_string());
402        }
403        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
404            plan =
405                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
406        }
407
408        // Add Project if the any position of `self.out_fields` is set to zero.
409        if self.out_fields.count_ones(..) != self.out_fields.len() {
410            plan =
411                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
412        }
413
414        // Both two phase limit and topn could generate limit on top of the scan, so we push limit here.
415        let plan = plan.optimize_by_rules(&OptimizationStage::new(
416            "Push Limit To Scan",
417            vec![BatchPushLimitToScanRule::create()],
418            ApplyOrder::BottomUp,
419        ))?;
420
421        let plan = plan.optimize_by_rules(&OptimizationStage::new(
422            "Iceberg Count Star",
423            vec![BatchIcebergCountStar::create()],
424            ApplyOrder::TopDown,
425        ))?;
426
427        // For iceberg scan, we do iceberg predicate pushdown
428        // BatchFilter -> BatchIcebergScan
429        let plan = plan.optimize_by_rules(&OptimizationStage::new(
430            "Iceberg Predicate Pushdown",
431            vec![BatchIcebergPredicatePushDownRule::create()],
432            ApplyOrder::BottomUp,
433        ))?;
434
435        Ok(plan)
436    }
437
438    /// Optimize and generate a batch query plan for local execution.
439    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
440        let mut plan = self.plan;
441
442        // Convert to local plan node
443        plan = plan.to_local_with_order_required(&self.required_order)?;
444
445        // We remark that since the `to_local_with_order_required` does not enforce single
446        // distribution, we enforce at the root if needed.
447        let insert_exchange = match plan.distribution() {
448            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
449            _ => true,
450        };
451        if insert_exchange {
452            plan =
453                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
454        }
455
456        // Add Project if the any position of `self.out_fields` is set to zero.
457        if self.out_fields.count_ones(..) != self.out_fields.len() {
458            plan =
459                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
460        }
461
462        let ctx = plan.ctx();
463        if ctx.is_explain_trace() {
464            ctx.trace("To Batch Local Plan:");
465            ctx.trace(plan.explain_to_string());
466        }
467
468        // Both two phase limit and topn could generate limit on top of the scan, so we push limit here.
469        let plan = plan.optimize_by_rules(&OptimizationStage::new(
470            "Push Limit To Scan",
471            vec![BatchPushLimitToScanRule::create()],
472            ApplyOrder::BottomUp,
473        ))?;
474
475        let plan = plan.optimize_by_rules(&OptimizationStage::new(
476            "Iceberg Count Star",
477            vec![BatchIcebergCountStar::create()],
478            ApplyOrder::TopDown,
479        ))?;
480        Ok(plan)
481    }
482}
483
484impl LogicalPlanRoot {
485    /// Generate optimized stream plan
486    fn gen_optimized_stream_plan(
487        self,
488        emit_on_window_close: bool,
489        allow_snapshot_backfill: bool,
490    ) -> Result<StreamOptimizedLogicalPlanRoot> {
491        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
492            StreamScanType::SnapshotBackfill
493        } else if self.should_use_arrangement_backfill() {
494            StreamScanType::ArrangementBackfill
495        } else {
496            StreamScanType::Backfill
497        };
498        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
499    }
500
501    fn gen_optimized_stream_plan_inner(
502        self,
503        emit_on_window_close: bool,
504        stream_scan_type: StreamScanType,
505    ) -> Result<StreamOptimizedLogicalPlanRoot> {
506        let ctx = self.plan.ctx();
507        let _explain_trace = ctx.is_explain_trace();
508
509        let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;
510
511        let mut plan = optimized_plan
512            .plan
513            .clone()
514            .optimize_by_rules(&OptimizationStage::new(
515                "Merge StreamProject",
516                vec![StreamProjectMergeRule::create()],
517                ApplyOrder::BottomUp,
518            ))?;
519
520        if ctx
521            .session_ctx()
522            .config()
523            .streaming_separate_consecutive_join()
524        {
525            plan = plan.optimize_by_rules(&OptimizationStage::new(
526                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
527                vec![SeparateConsecutiveJoinRule::create()],
528                ApplyOrder::BottomUp,
529            ))?;
530        }
531
532        // Add Logstore for Unaligned join
533        // Apply this BEFORE delta join rule, because delta join removes
534        // the join
535        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
536            plan = plan.optimize_by_rules(&OptimizationStage::new(
537                "Add Logstore for Unaligned join",
538                vec![AddLogstoreRule::create()],
539                ApplyOrder::BottomUp,
540            ))?;
541        }
542
543        if ctx.session_ctx().config().streaming_enable_delta_join()
544            && ctx.session_ctx().config().enable_index_selection()
545        {
546            // TODO: make it a logical optimization.
547            // Rewrite joins with index to delta join
548            plan = plan.optimize_by_rules(&OptimizationStage::new(
549                "To IndexDeltaJoin",
550                vec![IndexDeltaJoinRule::create()],
551                ApplyOrder::BottomUp,
552            ))?;
553        }
554        // Inline session timezone
555        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
556
557        if ctx.is_explain_trace() {
558            ctx.trace("Inline session timezone:");
559            ctx.trace(plan.explain_to_string());
560        }
561
562        // Const eval of exprs at the last minute
563        plan = const_eval_exprs(plan)?;
564
565        if ctx.is_explain_trace() {
566            ctx.trace("Const eval exprs:");
567            ctx.trace(plan.explain_to_string());
568        }
569
570        #[cfg(debug_assertions)]
571        InputRefValidator.validate(plan.clone());
572
573        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
574            return Err(ErrorCode::NotSupported(
575                "exist dangling temporal scan".to_owned(),
576                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
577            ).into());
578        }
579
580        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
581            return Err(ErrorCode::NotSupported(
582                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
583                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
584            ).into());
585        }
586
587        if ctx.session_ctx().config().enable_locality_backfill()
588            && LocalityProviderCounter::count(plan.clone()) > 5
589        {
590            risingwave_common::license::Feature::LocalityBackfill.check_available()?
591        }
592
593        Ok(optimized_plan.into_phase(plan))
594    }
595
    /// Generate create index or create materialize view plan.
    ///
    /// Steps, in order:
    /// 1. unless the session config allows it, reject plans for which `StreamKeyChecker`
    ///    reports an error,
    /// 2. run the logical optimization for stream,
    /// 3. apply `logical_rewrite_for_stream` and make the resulting output column mapping
    ///    injective,
    /// 4. rewrite the required distribution, required order and `out_fields` through that
    ///    mapping,
    /// 5. convert to a stream plan with the required distribution and enforce the
    ///    emit-on-window-close requirement.
    ///
    /// # Panics
    ///
    /// Panics if the required order cannot be rewritten through the output column mapping.
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                // Bail out early when the stream key check fails and the session has not
                // opted in to JSONB stream keys.
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        // Several source columns map to the same target column. Repair the
                        // mapping by projecting a duplicate of each already-claimed target
                        // column to a new trailing position and redirecting the later source
                        // column to that copy.
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        // TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/7234
                        // assert_eq!(target_size, output_indices.len());
                        // The target size reported by the mapping is discarded in favour of the
                        // rewritten plan's schema length.
                        target_size = plan.schema().len();
                        // `tar_exists[t]` is true once some source column has claimed target `t`.
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                // Target already claimed: emit the column once more and point
                                // this source column at the newly appended position.
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                // Translate the root's requirements into the rewritten plan's column indices.
                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            // TODO: can be `plan.plan.explain_to_string()`, but should explicitly specify the type due to some limitation of rust compiler
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }
682
    /// Visit the plan root with [`CardinalityVisitor`] and return the computed cardinality.
    ///
    /// This method is only defined on logical plan roots, so the visited plan is always a
    /// logical plan.
    fn compute_cardinality(&self) -> Cardinality {
        CardinalityVisitor.visit(self.plan.clone())
    }
689
690    /// Optimize and generate a create table plan.
691    pub fn gen_table_plan(
692        self,
693        context: OptimizerContextRef,
694        table_name: String,
695        database_id: DatabaseId,
696        schema_id: SchemaId,
697        CreateTableInfo {
698            columns,
699            pk_column_ids,
700            row_id_index,
701            watermark_descs,
702            source_catalog,
703            version,
704        }: CreateTableInfo,
705        CreateTableProps {
706            definition,
707            append_only,
708            on_conflict,
709            with_version_columns,
710            webhook_info,
711            engine,
712        }: CreateTableProps,
713    ) -> Result<StreamMaterialize> {
714        // Snapshot backfill is not allowed for create table
715        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
716
717        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
718
719        let pk_column_indices = {
720            let mut id_to_idx = HashMap::new();
721
722            columns.iter().enumerate().for_each(|(idx, c)| {
723                id_to_idx.insert(c.column_id(), idx);
724            });
725            pk_column_ids
726                .iter()
727                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
728                .collect_vec()
729        };
730
731        fn inject_project_for_generated_column_if_needed(
732            columns: &[ColumnCatalog],
733            node: StreamPlanRef,
734        ) -> Result<StreamPlanRef> {
735            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
736            if let Some(exprs) = exprs {
737                let logical_project = generic::Project::new(exprs, node);
738                return Ok(StreamProject::new(logical_project).into());
739            }
740            Ok(node)
741        }
742
743        #[derive(PartialEq, Debug, Copy, Clone)]
744        enum PrimaryKeyKind {
745            UserDefinedPrimaryKey,
746            NonAppendOnlyRowIdPk,
747            AppendOnlyRowIdPk,
748        }
749
750        fn inject_dml_node(
751            columns: &[ColumnCatalog],
752            append_only: bool,
753            stream_plan: StreamPlanRef,
754            pk_column_indices: &[usize],
755            kind: PrimaryKeyKind,
756            column_descs: Vec<ColumnDesc>,
757        ) -> Result<StreamPlanRef> {
758            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
759
760            // Add generated columns.
761            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
762
763            dml_node = match kind {
764                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
765                    RequiredDist::hash_shard(pk_column_indices)
766                        .streaming_enforce_if_not_satisfies(dml_node)?
767                }
768                PrimaryKeyKind::AppendOnlyRowIdPk => {
769                    StreamExchange::new_no_shuffle(dml_node).into()
770                }
771            };
772
773            Ok(dml_node)
774        }
775
776        let kind = if let Some(row_id_index) = row_id_index {
777            assert_eq!(
778                pk_column_indices.iter().exactly_one().copied().unwrap(),
779                row_id_index
780            );
781            if append_only {
782                PrimaryKeyKind::AppendOnlyRowIdPk
783            } else {
784                PrimaryKeyKind::NonAppendOnlyRowIdPk
785            }
786        } else {
787            PrimaryKeyKind::UserDefinedPrimaryKey
788        };
789
790        let column_descs: Vec<ColumnDesc> = columns
791            .iter()
792            .filter(|&c| c.can_dml())
793            .map(|c| c.column_desc.clone())
794            .collect();
795
796        let mut not_null_idxs = vec![];
797        for (idx, column) in column_descs.iter().enumerate() {
798            if !column.nullable {
799                not_null_idxs.push(idx);
800            }
801        }
802
803        let version_column_indices = if !with_version_columns.is_empty() {
804            find_version_column_indices(&columns, with_version_columns)?
805        } else {
806            vec![]
807        };
808
809        let with_external_source = source_catalog.is_some();
810        let (dml_source_node, external_source_node) = if with_external_source {
811            let dummy_source_node = LogicalSource::new(
812                None,
813                columns.clone(),
814                row_id_index,
815                SourceNodeKind::CreateTable,
816                context.clone(),
817                None,
818            )
819            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
820            let mut external_source_node = stream_plan.plan;
821            external_source_node =
822                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
823            external_source_node = match kind {
824                PrimaryKeyKind::UserDefinedPrimaryKey => {
825                    RequiredDist::hash_shard(&pk_column_indices)
826                        .streaming_enforce_if_not_satisfies(external_source_node)?
827                }
828
829                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
830                    StreamExchange::new_no_shuffle(external_source_node).into()
831                }
832            };
833            (dummy_source_node, Some(external_source_node))
834        } else {
835            (stream_plan.plan, None)
836        };
837
838        let dml_node = inject_dml_node(
839            &columns,
840            append_only,
841            dml_source_node,
842            &pk_column_indices,
843            kind,
844            column_descs,
845        )?;
846
847        let dists = external_source_node
848            .iter()
849            .map(|input| input.distribution())
850            .chain([dml_node.distribution()])
851            .unique()
852            .collect_vec();
853
854        let dist = match &dists[..] {
855            &[Distribution::SomeShard, Distribution::HashShard(_)]
856            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
857            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
858                dist.clone()
859            }
860            _ => {
861                unreachable!()
862            }
863        };
864
865        let generated_column_exprs =
866            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
867        let upstream_sink_union = StreamUpstreamSinkUnion::new(
868            context.clone(),
869            dml_node.schema(),
870            dml_node.stream_key(),
871            dist.clone(), // should always be the same as dist of `Union`
872            append_only,
873            row_id_index.is_none(),
874            generated_column_exprs,
875        );
876
877        let union_inputs = external_source_node
878            .into_iter()
879            .chain([dml_node, upstream_sink_union.into()])
880            .collect_vec();
881
882        let mut stream_plan = StreamUnion::new_with_dist(
883            Union {
884                all: true,
885                inputs: union_inputs,
886                source_col: None,
887            },
888            dist,
889        )
890        .into();
891
892        // Add WatermarkFilter node.
893        if !watermark_descs.is_empty() {
894            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
895        }
896
897        // Add RowIDGen node if needed.
898        if let Some(row_id_index) = row_id_index {
899            match kind {
900                PrimaryKeyKind::UserDefinedPrimaryKey => {
901                    unreachable!()
902                }
903                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
904                    stream_plan = StreamRowIdGen::new_with_dist(
905                        stream_plan,
906                        row_id_index,
907                        Distribution::HashShard(vec![row_id_index]),
908                    )
909                    .into();
910                }
911            }
912        }
913
914        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
915
916        if let ConflictBehavior::IgnoreConflict = conflict_behavior
917            && !version_column_indices.is_empty()
918        {
919            Err(ErrorCode::InvalidParameterValue(
920                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
921            ))?
922        }
923
924        let retention_seconds = context.with_options().retention_seconds();
925
926        let table_required_dist = {
927            let mut bitset = FixedBitSet::with_capacity(columns.len());
928            for idx in &pk_column_indices {
929                bitset.insert(*idx);
930            }
931            RequiredDist::ShardByKey(bitset)
932        };
933
934        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
935
936        if !not_null_idxs.is_empty() {
937            stream_plan =
938                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
939        }
940
941        // Determine if the table should be refreshable based on the connector type
942        let refreshable = source_catalog
943            .as_ref()
944            .map(|catalog| {
945                catalog.with_properties.is_batch_connector() || {
946                    matches!(
947                        catalog
948                            .refresh_mode
949                            .as_ref()
950                            .map(|refresh_mode| refresh_mode.refresh_mode),
951                        Some(Some(RefreshMode::FullRecompute(_)))
952                    )
953                }
954            })
955            .unwrap_or(false);
956
957        // Validate that refreshable tables have a user-defined primary key (i.e., does not have rowid)
958        if refreshable && row_id_index.is_some() {
959            return Err(crate::error::ErrorCode::BindError(
960                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
961                    .to_owned(),
962            )
963            .into());
964        }
965
966        StreamMaterialize::create_for_table(
967            stream_plan,
968            table_name,
969            database_id,
970            schema_id,
971            table_required_dist,
972            Order::any(),
973            columns,
974            definition,
975            conflict_behavior,
976            version_column_indices,
977            pk_column_indices,
978            row_id_index,
979            version,
980            retention_seconds,
981            webhook_info,
982            engine,
983            refreshable,
984        )
985    }
986
987    /// Optimize and generate a create materialized view plan.
988    pub fn gen_materialize_plan(
989        self,
990        database_id: DatabaseId,
991        schema_id: SchemaId,
992        mv_name: String,
993        definition: String,
994        emit_on_window_close: bool,
995    ) -> Result<StreamMaterialize> {
996        let cardinality = self.compute_cardinality();
997        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
998        StreamMaterialize::create(
999            stream_plan,
1000            mv_name,
1001            database_id,
1002            schema_id,
1003            definition,
1004            TableType::MaterializedView,
1005            cardinality,
1006            None,
1007        )
1008    }
1009
1010    /// Optimize and generate a create index plan.
1011    pub fn gen_index_plan(
1012        self,
1013        index_name: String,
1014        database_id: DatabaseId,
1015        schema_id: SchemaId,
1016        definition: String,
1017        retention_seconds: Option<NonZeroU32>,
1018    ) -> Result<StreamMaterialize> {
1019        let cardinality = self.compute_cardinality();
1020        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1021
1022        StreamMaterialize::create(
1023            stream_plan,
1024            index_name,
1025            database_id,
1026            schema_id,
1027            definition,
1028            TableType::Index,
1029            cardinality,
1030            retention_seconds,
1031        )
1032    }
1033
1034    pub fn gen_vector_index_plan(
1035        self,
1036        index_name: String,
1037        database_id: DatabaseId,
1038        schema_id: SchemaId,
1039        definition: String,
1040        retention_seconds: Option<NonZeroU32>,
1041        vector_index_info: PbVectorIndexInfo,
1042    ) -> Result<StreamVectorIndexWrite> {
1043        let cardinality = self.compute_cardinality();
1044        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1045
1046        StreamVectorIndexWrite::create(
1047            stream_plan,
1048            index_name,
1049            database_id,
1050            schema_id,
1051            definition,
1052            cardinality,
1053            retention_seconds,
1054            vector_index_info,
1055        )
1056    }
1057
1058    /// Optimize and generate a create sink plan.
1059    #[allow(clippy::too_many_arguments)]
1060    pub fn gen_sink_plan(
1061        self,
1062        sink_name: String,
1063        definition: String,
1064        properties: WithOptionsSecResolved,
1065        emit_on_window_close: bool,
1066        db_name: String,
1067        sink_from_table_name: String,
1068        format_desc: Option<SinkFormatDesc>,
1069        without_backfill: bool,
1070        target_table: Option<Arc<TableCatalog>>,
1071        partition_info: Option<PartitionComputeInfo>,
1072        user_specified_columns: bool,
1073        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
1074    ) -> Result<StreamSink> {
1075        let stream_scan_type = if without_backfill {
1076            StreamScanType::UpstreamOnly
1077        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
1078            // Snapshot backfill on sink-into-table is not allowed
1079            StreamScanType::SnapshotBackfill
1080        } else if self.should_use_arrangement_backfill() {
1081            StreamScanType::ArrangementBackfill
1082        } else {
1083            StreamScanType::Backfill
1084        };
1085        if auto_refresh_schema_from_table.is_some()
1086            && stream_scan_type != StreamScanType::ArrangementBackfill
1087        {
1088            return Err(ErrorCode::InvalidInputSyntax(format!(
1089                "auto schema change only support for ArrangementBackfill, but got: {:?}",
1090                stream_scan_type
1091            ))
1092            .into());
1093        }
1094        let stream_plan =
1095            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
1096        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
1097            let columns = t.columns_without_rw_timestamp();
1098            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
1099        });
1100
1101        StreamSink::create(
1102            stream_plan,
1103            sink_name,
1104            db_name,
1105            sink_from_table_name,
1106            target_table,
1107            target_columns_to_plan_mapping,
1108            definition,
1109            properties,
1110            format_desc,
1111            partition_info,
1112            auto_refresh_schema_from_table,
1113        )
1114    }
1115}
1116
1117impl<P: PlanPhase> PlanRoot<P> {
1118    pub fn should_use_arrangement_backfill(&self) -> bool {
1119        let ctx = self.plan.ctx();
1120        let session_ctx = ctx.session_ctx();
1121        let arrangement_backfill_enabled = session_ctx
1122            .env()
1123            .streaming_config()
1124            .developer
1125            .enable_arrangement_backfill;
1126        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1127    }
1128
1129    pub fn should_use_snapshot_backfill(&self) -> bool {
1130        self.plan
1131            .ctx()
1132            .session_ctx()
1133            .config()
1134            .streaming_use_snapshot_backfill()
1135    }
1136
1137    /// used when the plan has a target relation such as DML and sink into table, return the mapping from table's columns to the plan's schema
1138    pub fn target_columns_to_plan_mapping(
1139        &self,
1140        tar_cols: &[ColumnCatalog],
1141        user_specified_columns: bool,
1142    ) -> Vec<Option<usize>> {
1143        #[allow(clippy::disallowed_methods)]
1144        let visible_cols: Vec<(usize, String)> = self
1145            .out_fields
1146            .ones()
1147            .zip_eq(self.out_names.iter().cloned())
1148            .collect_vec();
1149
1150        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1151        let visible_col_idxes_by_name = visible_cols
1152            .iter()
1153            .map(|(i, name)| (name.as_ref(), *i))
1154            .collect::<BTreeMap<_, _>>();
1155
1156        tar_cols
1157            .iter()
1158            .enumerate()
1159            .filter(|(_, tar_col)| tar_col.can_dml())
1160            .map(|(tar_i, tar_col)| {
1161                if user_specified_columns {
1162                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
1163                } else {
1164                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1165                }
1166            })
1167            .collect()
1168    }
1169}
1170
1171fn find_version_column_indices(
1172    column_catalog: &Vec<ColumnCatalog>,
1173    version_column_names: Vec<String>,
1174) -> Result<Vec<usize>> {
1175    let mut indices = Vec::new();
1176    for version_column_name in version_column_names {
1177        let mut found = false;
1178        for (index, column) in column_catalog.iter().enumerate() {
1179            if column.column_desc.name == version_column_name {
1180                if let &DataType::Jsonb
1181                | &DataType::List(_)
1182                | &DataType::Struct(_)
1183                | &DataType::Bytea
1184                | &DataType::Boolean = column.data_type()
1185                {
1186                    return Err(ErrorCode::InvalidInputSyntax(format!(
1187                        "Version column {} must be of a comparable data type",
1188                        version_column_name
1189                    ))
1190                    .into());
1191                }
1192                indices.push(index);
1193                found = true;
1194                break;
1195            }
1196        }
1197        if !found {
1198            return Err(ErrorCode::InvalidInputSyntax(format!(
1199                "Version column {} not found",
1200                version_column_name
1201            ))
1202            .into());
1203        }
1204    }
1205    Ok(indices)
1206}
1207
1208fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1209    let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1210
1211    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1212    if let Some(error) = const_eval_rewriter.error {
1213        return Err(error);
1214    }
1215    Ok(plan)
1216}
1217
1218fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1219    ctx: OptimizerContextRef,
1220    plan: PlanRef<C>,
1221) -> Result<PlanRef<C>> {
1222    let mut v = TimestamptzExprFinder::default();
1223    plan.visit_exprs_recursive(&mut v);
1224    if v.has() {
1225        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1226    } else {
1227        Ok(plan)
1228    }
1229}
1230
1231fn exist_and_no_exchange_before(
1232    plan: &BatchPlanRef,
1233    is_candidate: fn(&BatchPlanRef) -> bool,
1234) -> bool {
1235    if plan.node_type() == BatchPlanNodeType::BatchExchange {
1236        return false;
1237    }
1238    is_candidate(plan)
1239        || plan
1240            .inputs()
1241            .iter()
1242            .any(|input| exist_and_no_exchange_before(input, is_candidate))
1243}
1244
1245impl BatchPlanRef {
1246    fn is_user_table_scan(&self) -> bool {
1247        self.node_type() == BatchPlanNodeType::BatchSeqScan
1248            || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
1249            || self.node_type() == BatchPlanNodeType::BatchVectorSearch
1250    }
1251
1252    fn is_source_scan(&self) -> bool {
1253        self.node_type() == BatchPlanNodeType::BatchSource
1254            || self.node_type() == BatchPlanNodeType::BatchKafkaScan
1255            || self.node_type() == BatchPlanNodeType::BatchIcebergScan
1256    }
1257
1258    fn is_insert(&self) -> bool {
1259        self.node_type() == BatchPlanNodeType::BatchInsert
1260    }
1261
1262    fn is_update(&self) -> bool {
1263        self.node_type() == BatchPlanNodeType::BatchUpdate
1264    }
1265
1266    fn is_delete(&self) -> bool {
1267        self.node_type() == BatchPlanNodeType::BatchDelete
1268    }
1269}
1270
1271/// As we always run the root stage locally, for some plan in root stage which need to execute in
1272/// compute node we insert an additional exhchange before it to avoid to include it in the root
1273/// stage.
1274///
1275/// Returns `true` if we must insert an additional exchange to ensure this.
1276fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1277    assert_eq!(plan.distribution(), &Distribution::Single);
1278    exist_and_no_exchange_before(&plan, |plan| {
1279        plan.is_user_table_scan()
1280            || plan.is_source_scan()
1281            || plan.is_insert()
1282            || plan.is_update()
1283            || plan.is_delete()
1284    })
1285}
1286
1287/// The purpose is same as `require_additional_exchange_on_root_in_distributed_mode`. We separate
1288/// them for the different requirement of plan node in different execute mode.
1289fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1290    assert_eq!(plan.distribution(), &Distribution::Single);
1291    exist_and_no_exchange_before(&plan, |plan| {
1292        plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
1293    })
1294}
1295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// Turning a root that only exposes `v1` (out of `v1`, `v2`) into an unordered
    /// subplan must yield a plan whose schema contains just `v1`.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;

        let input_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values = LogicalValues::new(vec![], input_schema, ctx).into();

        // Two columns, with only the first bit set: `v1` is the single output field.
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];

        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();

        let expected_schema = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(subplan.schema(), &expected_schema);
    }
}