risingwave_frontend/optimizer/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

use risingwave_pb::catalog::PbVectorIndexInfo;

pub mod plan_node;

use plan_node::StreamFilter;
pub use plan_node::{Explain, LogicalPlanRef, PlanRef};

pub mod property;

mod delta_join_solver;
mod heuristic_optimizer;
mod plan_rewriter;

mod plan_visitor;

pub use plan_visitor::{
    ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
    SysTableVisitor,
};

pub mod backfill_order_strategy;
mod logical_optimization;
mod optimizer_context;
pub mod plan_expr_rewriter;
mod plan_expr_visitor;
mod rule;

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;

use educe::Educe;
use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
    ToStreamContext, stream_enforce_eowc_requirement,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::TableCatalog;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::TimestamptzExprFinder;
use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
use crate::optimizer::plan_node::{
    Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
    StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion, StreamVectorIndexWrite,
    ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};

/// `PlanRoot` describes a plan: the planner constructs a `PlanRoot` with a logical plan node
/// (`LogicalPlanRef`) and the required distribution and order. From it, `PlanRoot` can generate
/// the corresponding optimized streaming or batch plan. The required order and distribution
/// columns may be a superset of the output columns. For example:
/// ```sql
///    select v1 from t order by id;
/// ```
/// The plan will return two columns (id, v1), and the required order column is id. The id
/// column is required during optimization, but the final generated plan will remove this
/// unnecessary column from the result.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    // The current plan node.
    pub plan: PlanRef<P::Convention>,
    // The phase of the plan.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    required_dist: RequiredDist,
    required_order: Order,
    out_fields: FixedBitSet,
    out_names: Vec<String>,
}

/// `PlanPhase` is used to track the phase of the `PlanRoot`.
/// Usually, it begins from `Logical` and ends with `Batch` or `Stream`, unless we want to construct a `PlanRoot` from an intermediate phase.
/// Typical phase transformations are:
/// - `Logical` -> `BatchOptimizedLogical` -> `Batch`
/// - `Logical` -> `StreamOptimizedLogical` -> `Stream`
pub trait PlanPhase {
    type Convention: ConventionMarker;
}

macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();
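// For illustration, the invocation above expands, for the `Logical` phase (the other phases
// are analogous), to roughly:
//
//     pub struct PlanPhaseLogical;
//     impl PlanPhase for PlanPhaseLogical {
//         type Convention = crate::optimizer::plan_node::Logical;
//     }
//     pub type LogicalPlanRoot = PlanRoot<PlanPhaseLogical>;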

impl LogicalPlanRoot {
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl BatchPlanRoot {
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    fn new_inner(
        plan: PlanRef<P::Convention>,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        let input_schema = plan.schema();
        assert_eq!(input_schema.fields().len(), out_fields.len());
        assert_eq!(out_fields.count_ones(..), out_names.len());

        Self {
            plan,
            _phase: PhantomData,
            required_dist,
            required_order,
            out_fields,
            out_names,
        }
    }

    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
        PlanRoot {
            plan,
            _phase: PhantomData,
            required_dist: self.required_dist,
            required_order: self.required_order,
            out_fields: self.out_fields,
            out_names: self.out_names,
        }
    }

    /// Set customized names of the output fields, used for `CREATE [MATERIALIZED VIEW | SINK] r(a,
    /// b, ..)`.
    ///
    /// If the number of names does not match the number of output fields, an error is returned.
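    ///
    /// For example, `CREATE MATERIALIZED VIEW mv (a, b) AS SELECT v1, v2 FROM t` renames the
    /// two output fields `v1` and `v2` to `a` and `b`.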
    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
        if out_names.len() != self.out_fields.count_ones(..) {
            Err(ErrorCode::InvalidInputSyntax(
                "number of column names does not match number of columns".to_owned(),
            ))?
        }
        self.out_names = out_names;
        Ok(())
    }

    /// Get the plan root's schema, only including the fields to be output.
    pub fn schema(&self) -> Schema {
        // The schema can be derived from the `out_fields` and `out_names`, so we don't maintain it
        // as a field and always construct one on demand here to keep it in sync.
        Schema {
            fields: self
                .out_fields
                .ones()
                .map(|i| self.plan.schema().fields()[i].clone())
                .zip_eq_debug(&self.out_names)
                .map(|(field, name)| Field {
                    name: name.clone(),
                    ..field
                })
                .collect(),
        }
    }
}

impl LogicalPlanRoot {
    /// Transform the [`PlanRoot`] back to a [`PlanRef`] suitable to be used as a subplan, for
    /// example as an insert source or a subquery. This ignores the required `Order` but retains
    /// the pruning applied after it (`out_fields`).
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

    /// Transform the [`PlanRoot`], wrapped in an array-construction subquery, to a [`PlanRef`]
    /// supported by `ARRAY_AGG`. Similar to the unordered version, this abstracts away the
    /// internal `self.plan`, which is further modified by `self.required_order` and then
    /// `self.out_fields`.
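    ///
    /// Conceptually, the result computes `COALESCE(ARRAY_AGG(col ORDER BY ...), '{}')` over the
    /// plan's single output column, so that an empty input yields an empty array rather than
    /// `NULL`.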
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::List(input_column_type.clone().into());
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    /// Apply logical optimization to the plan for stream.
    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

    /// Apply logical optimization to the plan for batch.
    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}

impl BatchOptimizedLogicalPlanRoot {
    /// Optimize and generate a singleton batch physical plan without exchange nodes.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        // Inline the session timezone, mainly for rewriting `now()`.
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        // Const eval of exprs at the last minute, but before `to_batch`, to make functional index selection happy.
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert to a physical plan node.
        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // Inline the session timezone.
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert!(
            *plan.distribution() == Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }
}

impl BatchPlanRoot {
    /// Optimize and generate a batch query plan for distributed execution.
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        // Convert to a distributed plan.
        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        // Add a Project node if any position of `self.out_fields` is unset.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        // Both two-phase limit and top-n can generate a limit on top of the scan, so we push the limit down here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        // For Iceberg scans, push predicates down from `BatchFilter` into `BatchIcebergScan`.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Predicate Pushdown",
            vec![BatchIcebergPredicatePushDownRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

    /// Optimize and generate a batch query plan for local execution.
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        // Convert to a local plan node.
        plan = plan.to_local_with_order_required(&self.required_order)?;

        // Note that `to_local_with_order_required` does not enforce single distribution, so we
        // enforce it at the root if needed.
        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        // Add a Project node if any position of `self.out_fields` is unset.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        // Both two-phase limit and top-n can generate a limit on top of the scan, so we push the limit down here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;
        Ok(plan)
    }
}

impl LogicalPlanRoot {
    /// Generate an optimized stream plan.
    fn gen_optimized_stream_plan(
        self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
    }

    fn gen_optimized_stream_plan_inner(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;

        let mut plan = optimized_plan
            .plan
            .clone()
            .optimize_by_rules(&OptimizationStage::new(
                "Merge StreamProject",
                vec![StreamProjectMergeRule::create()],
                ApplyOrder::BottomUp,
            ))?;

        if ctx
            .session_ctx()
            .config()
            .streaming_separate_consecutive_join()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
                vec![SeparateConsecutiveJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        // Add a log store for unaligned joins. Apply this BEFORE the delta join rule, because
        // the delta join rule removes the join node.
        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join()
            && ctx.session_ctx().config().enable_index_selection()
        {
            // TODO: make it a logical optimization.
            // Rewrite joins with indexes to delta joins.
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        // Inline the session timezone.
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        // Const eval of exprs at the last minute.
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        Ok(optimized_plan.into_phase(plan))
    }

    /// Generate a create index or create materialized view plan.
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        // TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/7234
                        // assert_eq!(target_size, output_indices.len());
                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
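                        // If several source columns map to the same target column, duplicate
                        // that column at the end of the output and remap the extra entries to
                        // fresh indices, so the resulting mapping becomes injective.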
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            // TODO: this could be `plan.plan.explain_to_string()`, but we have to explicitly specify the type due to a limitation of the Rust compiler.
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }

    /// Visit the plan root and compute the cardinality.
    ///
    /// Panics if not called on a logical plan.
    fn compute_cardinality(&self) -> Cardinality {
        CardinalityVisitor.visit(self.plan.clone())
    }

    /// Optimize and generate a create table plan.
    pub fn gen_table_plan(
        self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_columns,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        // Snapshot backfill is not allowed for create table.
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
                .collect_vec()
        };

        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: StreamPlanRef,
        ) -> Result<StreamPlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

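        /// How the table's primary key is defined, which determines the required distribution
        /// of the DML and external source inputs below.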
        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: StreamPlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<StreamPlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            // Add generated columns.
            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .streaming_enforce_if_not_satisfies(dml_node)?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_indices = if !with_version_columns.is_empty() {
            find_version_column_indices(&columns, with_version_columns)?
        } else {
            vec![]
        };

        let with_external_source = source_catalog.is_some();
        let (dml_source_node, external_source_node) = if with_external_source {
            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
            let mut external_source_node = stream_plan.plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .streaming_enforce_if_not_satisfies(external_source_node)?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };
            (dummy_source_node, Some(external_source_node))
        } else {
            (stream_plan.plan, None)
        };

        let dml_node = inject_dml_node(
            &columns,
            append_only,
            dml_source_node,
            &pk_column_indices,
            kind,
            column_descs,
        )?;

        let dists = external_source_node
            .iter()
            .map(|input| input.distribution())
            .chain([dml_node.distribution()])
            .unique()
            .collect_vec();

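        // Each union input is either `SomeShard` or hash-distributed on the pk; if both kinds
        // appear, the union as a whole can only guarantee `SomeShard`.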
        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                unreachable!()
            }
        };

        let generated_column_exprs =
            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
        let upstream_sink_union = StreamUpstreamSinkUnion::new(
            context.clone(),
            dml_node.schema(),
            dml_node.stream_key(),
            dist.clone(), // should always be the same as dist of `Union`
            append_only,
            row_id_index.is_none(),
            generated_column_exprs,
        );

        let union_inputs = external_source_node
            .into_iter()
            .chain([dml_node, upstream_sink_union.into()])
            .collect_vec();

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist,
        )
        .into();

        // Add a WatermarkFilter node.
        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        // Add a RowIdGen node if needed.
        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && !version_column_indices.is_empty()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        // Determine whether the table should be refreshable based on the connector type.
        let refreshable = source_catalog
            .as_ref()
            .map(|catalog| catalog.with_properties.is_batch_connector())
            .unwrap_or(false);

        // Validate that refreshable tables have a user-defined primary key (i.e., no row id column).
        if refreshable && row_id_index.is_some() {
            return Err(crate::error::ErrorCode::BindError(
                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
                    .to_owned(),
            )
            .into());
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            pk_column_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
            refreshable,
        )
    }

    /// Optimize and generate a create materialized view plan.
    pub fn gen_materialize_plan(
        self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }

    /// Optimize and generate a create index plan.
    pub fn gen_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }

    pub fn gen_vector_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
        vector_index_info: PbVectorIndexInfo,
    ) -> Result<StreamVectorIndexWrite> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamVectorIndexWrite::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            cardinality,
            retention_seconds,
            vector_index_info,
        )
    }

    /// Optimize and generate a create sink plan.
    #[allow(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<StreamSink> {
        let stream_scan_type = if without_backfill {
            StreamScanType::UpstreamOnly
        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
            // Snapshot backfill on sink-into-table is not allowed.
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        if auto_refresh_schema_from_table.is_some()
            && stream_scan_type != StreamScanType::ArrangementBackfill
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "auto schema change is only supported for ArrangementBackfill, but got: {:?}",
                stream_scan_type
            ))
            .into());
        }
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });

        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            definition,
            properties,
            format_desc,
            partition_info,
            auto_refresh_schema_from_table,
        )
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    pub fn should_use_arrangement_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let arrangement_backfill_enabled = session_ctx
            .env()
            .streaming_config()
            .developer
            .enable_arrangement_backfill;
        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
    }

    pub fn should_use_snapshot_backfill(&self) -> bool {
        self.plan
            .ctx()
            .session_ctx()
            .config()
            .streaming_use_snapshot_backfill()
    }

    /// Used when the plan has a target relation, such as DML or sink-into-table. Returns the
    /// mapping from the target table's columns to the plan's output schema.
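    ///
    /// When `user_specified_columns` is false, the mapping is positional: each writable target
    /// column maps to the visible output column at the same position, or `None` if the plan has
    /// fewer visible columns. Otherwise, target columns are matched to output columns by name.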
    pub fn target_columns_to_plan_mapping(
        &self,
        tar_cols: &[ColumnCatalog],
        user_specified_columns: bool,
    ) -> Vec<Option<usize>> {
        #[allow(clippy::disallowed_methods)]
        let visible_cols: Vec<(usize, String)> = self
            .out_fields
            .ones()
            .zip_eq(self.out_names.iter().cloned())
            .collect_vec();

        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
        let visible_col_idxes_by_name = visible_cols
            .iter()
            .map(|(i, name)| (name.as_ref(), *i))
            .collect::<BTreeMap<_, _>>();

        tar_cols
            .iter()
            .enumerate()
            .filter(|(_, tar_col)| tar_col.can_dml())
            .map(|(tar_i, tar_col)| {
                if user_specified_columns {
                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
                } else {
                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
                }
            })
            .collect()
    }
}

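/// Resolves each version column name to its index in the column catalog, rejecting data types
/// without a meaningful ordering (`jsonb`, list, struct, `bytea`, `boolean`).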
fn find_version_column_indices(
    column_catalog: &[ColumnCatalog],
    version_column_names: Vec<String>,
) -> Result<Vec<usize>> {
    let mut indices = Vec::new();
    for version_column_name in version_column_names {
        let mut found = false;
        for (index, column) in column_catalog.iter().enumerate() {
            if column.column_desc.name == version_column_name {
                if let &DataType::Jsonb
                | &DataType::List(_)
                | &DataType::Struct(_)
                | &DataType::Bytea
                | &DataType::Boolean = column.data_type()
                {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "Version column {} must be of a comparable data type",
                        version_column_name
                    ))
                    .into());
                }
                indices.push(index);
                found = true;
                break;
            }
        }
        if !found {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Version column {} not found",
                version_column_name
            ))
            .into());
        }
    }
    Ok(indices)
}

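/// Evaluates constant expressions throughout the plan, surfacing the first evaluation error,
/// if any.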
fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
    let mut const_eval_rewriter = ConstEvalRewriter { error: None };

    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
    if let Some(error) = const_eval_rewriter.error {
        return Err(error);
    }
    Ok(plan)
}

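/// Inlines the session time zone into `timestamptz`-related expressions (e.g. those produced
/// by `now()`), skipping the rewrite entirely if the plan contains no such expressions.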
fn inline_session_timezone_in_exprs<C: ConventionMarker>(
    ctx: OptimizerContextRef,
    plan: PlanRef<C>,
) -> Result<PlanRef<C>> {
    let mut v = TimestamptzExprFinder::default();
    plan.visit_exprs_recursive(&mut v);
    if v.has() {
        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
    } else {
        Ok(plan)
    }
}

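/// Returns whether a node satisfying `is_candidate` is reachable from `plan` without crossing
/// a `BatchExchange`, i.e. whether such a node would end up in the same stage as the root.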
fn exist_and_no_exchange_before(
    plan: &BatchPlanRef,
    is_candidate: fn(&BatchPlanRef) -> bool,
) -> bool {
    if plan.node_type() == BatchPlanNodeType::BatchExchange {
        return false;
    }
    is_candidate(plan)
        || plan
            .inputs()
            .iter()
            .any(|input| exist_and_no_exchange_before(input, is_candidate))
}

/// As we always run the root stage locally, we insert an additional exchange before any plan
/// node in the root stage that must execute on a compute node, so that it is not included in
/// the root stage.
///
/// Returns `true` if we must insert an additional exchange to ensure this.
fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
    fn is_user_table(plan: &BatchPlanRef) -> bool {
        plan.node_type() == BatchPlanNodeType::BatchSeqScan
    }

    fn is_log_table(plan: &BatchPlanRef) -> bool {
        plan.node_type() == BatchPlanNodeType::BatchLogSeqScan
    }

    fn is_source(plan: &BatchPlanRef) -> bool {
        plan.node_type() == BatchPlanNodeType::BatchSource
            || plan.node_type() == BatchPlanNodeType::BatchKafkaScan
            || plan.node_type() == BatchPlanNodeType::BatchIcebergScan
    }

    fn is_insert(plan: &BatchPlanRef) -> bool {
        plan.node_type() == BatchPlanNodeType::BatchInsert
    }

    fn is_update(plan: &BatchPlanRef) -> bool {
        plan.node_type() == BatchPlanNodeType::BatchUpdate
    }

    fn is_delete(plan: &BatchPlanRef) -> bool {
        plan.node_type() == BatchPlanNodeType::BatchDelete
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, is_user_table)
        || exist_and_no_exchange_before(&plan, is_source)
        || exist_and_no_exchange_before(&plan, is_insert)
        || exist_and_no_exchange_before(&plan, is_update)
        || exist_and_no_exchange_before(&plan, is_delete)
        || exist_and_no_exchange_before(&plan, is_log_table)
}

/// The purpose is the same as `require_additional_exchange_on_root_in_distributed_mode`. They
/// are kept separate because plan nodes have different requirements in different execution
/// modes.
fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
    fn is_user_table(plan: &BatchPlanRef) -> bool {
        plan.node_type() == BatchPlanNodeType::BatchSeqScan
            || plan.node_type() == BatchPlanNodeType::BatchVectorSearch
    }

    fn is_source(plan: &BatchPlanRef) -> bool {
        plan.node_type() == BatchPlanNodeType::BatchSource
            || plan.node_type() == BatchPlanNodeType::BatchKafkaScan
            || plan.node_type() == BatchPlanNodeType::BatchIcebergScan
    }

    fn is_insert(plan: &BatchPlanRef) -> bool {
        plan.node_type() == BatchPlanNodeType::BatchInsert
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, is_user_table)
        || exist_and_no_exchange_before(&plan, is_source)
        || exist_and_no_exchange_before(&plan, is_insert)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}
1323}