risingwave_frontend/optimizer/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::num::NonZeroU32;
15use std::ops::DerefMut;
16use std::sync::Arc;
17
18pub mod plan_node;
19
20use plan_node::StreamFilter;
21pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
22
23pub mod property;
24
25mod delta_join_solver;
26mod heuristic_optimizer;
27mod plan_rewriter;
28
29mod plan_visitor;
30
31pub use plan_visitor::{
32    ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
33    SysTableVisitor,
34};
35
36pub mod backfill_order_strategy;
37mod logical_optimization;
38mod optimizer_context;
39pub mod plan_expr_rewriter;
40mod plan_expr_visitor;
41mod rule;
42
43use std::collections::{BTreeMap, HashMap};
44use std::marker::PhantomData;
45
46use educe::Educe;
47use fixedbitset::FixedBitSet;
48use itertools::Itertools as _;
49pub use logical_optimization::*;
50pub use optimizer_context::*;
51use plan_expr_rewriter::ConstEvalRewriter;
52use property::Order;
53use risingwave_common::bail;
54use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
55use risingwave_common::types::DataType;
56use risingwave_common::util::column_index_mapping::ColIndexMapping;
57use risingwave_common::util::iter_util::ZipEqDebug;
58use risingwave_connector::WithPropertiesExt;
59use risingwave_connector::sink::catalog::SinkFormatDesc;
60use risingwave_pb::stream_plan::StreamScanType;
61
62use self::heuristic_optimizer::ApplyOrder;
63use self::plan_node::generic::{self, PhysicalPlanRef};
64use self::plan_node::{
65    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
66    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
67    ToStreamContext, stream_enforce_eowc_requirement,
68};
69#[cfg(debug_assertions)]
70use self::plan_visitor::InputRefValidator;
71use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
72use self::property::{Cardinality, RequiredDist};
73use self::rule::*;
74use crate::TableCatalog;
75use crate::catalog::table_catalog::TableType;
76use crate::catalog::{DatabaseId, SchemaId};
77use crate::error::{ErrorCode, Result};
78use crate::expr::TimestamptzExprFinder;
79use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
80use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
81use crate::optimizer::plan_node::{
82    Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
83    StreamExchange, StreamPlanRef, StreamUnion, ToStream, VisitExprsRecursive,
84};
85use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
86use crate::optimizer::property::Distribution;
87use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
88
89/// `PlanRoot` is used to describe a plan. planner will construct a `PlanRoot` with `LogicalNode`.
90/// and required distribution and order. And `PlanRoot` can generate corresponding streaming or
91/// batch plan with optimization. the required Order and Distribution columns might be more than the
92/// output columns. for example:
93/// ```sql
94///    select v1 from t order by id;
95/// ```
96/// the plan will return two columns (id, v1), and the required order column is id. the id
97/// column is required in optimization, but the final generated plan will remove the unnecessary
98/// column in the result.
99#[derive(Educe)]
100#[educe(Debug, Clone)]
101pub struct PlanRoot<P: PlanPhase> {
102    // The current plan node.
103    pub plan: PlanRef<P::Convention>,
104    // The phase of the plan.
105    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
106    _phase: PhantomData<P>,
107    required_dist: RequiredDist,
108    required_order: Order,
109    out_fields: FixedBitSet,
110    out_names: Vec<String>,
111}
112
113/// `PlanPhase` is used to track the phase of the `PlanRoot`.
114/// Usually, it begins from `Logical` and ends with `Batch` or `Stream`, unless we want to construct a `PlanRoot` from an intermediate phase.
115/// Typical phase transformation are:
116/// - `Logical` -> `OptimizedLogicalForBatch` -> `Batch`
117/// - `Logical` -> `OptimizedLogicalForStream` -> `Stream`
118pub trait PlanPhase {
119    type Convention: ConventionMarker;
120}
121
122macro_rules! for_all_phase {
123    () => {
124        for_all_phase! {
125            { Logical, $crate::optimizer::plan_node::Logical },
126            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
127            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
128            { Batch, $crate::optimizer::plan_node::Batch },
129            { Stream, $crate::optimizer::plan_node::Stream }
130        }
131    };
132    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
133        $(
134            paste::paste! {
135                pub struct [< PlanPhase$phase >];
136                impl PlanPhase for [< PlanPhase$phase >] {
137                    type Convention = $convention;
138                }
139                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
140            }
141        )+
142    }
143}
144
145for_all_phase!();
146
147impl LogicalPlanRoot {
148    pub fn new_with_logical_plan(
149        plan: LogicalPlanRef,
150        required_dist: RequiredDist,
151        required_order: Order,
152        out_fields: FixedBitSet,
153        out_names: Vec<String>,
154    ) -> Self {
155        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
156    }
157}
158
159impl BatchPlanRoot {
160    pub fn new_with_batch_plan(
161        plan: BatchPlanRef,
162        required_dist: RequiredDist,
163        required_order: Order,
164        out_fields: FixedBitSet,
165        out_names: Vec<String>,
166    ) -> Self {
167        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
168    }
169}
170
171impl<P: PlanPhase> PlanRoot<P> {
172    fn new_inner(
173        plan: PlanRef<P::Convention>,
174        required_dist: RequiredDist,
175        required_order: Order,
176        out_fields: FixedBitSet,
177        out_names: Vec<String>,
178    ) -> Self {
179        let input_schema = plan.schema();
180        assert_eq!(input_schema.fields().len(), out_fields.len());
181        assert_eq!(out_fields.count_ones(..), out_names.len());
182
183        Self {
184            plan,
185            _phase: PhantomData,
186            required_dist,
187            required_order,
188            out_fields,
189            out_names,
190        }
191    }
192
193    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
194        PlanRoot {
195            plan,
196            _phase: PhantomData,
197            required_dist: self.required_dist,
198            required_order: self.required_order,
199            out_fields: self.out_fields,
200            out_names: self.out_names,
201        }
202    }
203
204    /// Set customized names of the output fields, used for `CREATE [MATERIALIZED VIEW | SINK] r(a,
205    /// b, ..)`.
206    ///
207    /// If the number of names does not match the number of output fields, an error is returned.
208    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
209        if out_names.len() != self.out_fields.count_ones(..) {
210            Err(ErrorCode::InvalidInputSyntax(
211                "number of column names does not match number of columns".to_owned(),
212            ))?
213        }
214        self.out_names = out_names;
215        Ok(())
216    }
217
218    /// Get the plan root's schema, only including the fields to be output.
219    pub fn schema(&self) -> Schema {
220        // The schema can be derived from the `out_fields` and `out_names`, so we don't maintain it
221        // as a field and always construct one on demand here to keep it in sync.
222        Schema {
223            fields: self
224                .out_fields
225                .ones()
226                .map(|i| self.plan.schema().fields()[i].clone())
227                .zip_eq_debug(&self.out_names)
228                .map(|(field, name)| Field {
229                    name: name.clone(),
230                    ..field
231                })
232                .collect(),
233        }
234    }
235}
236
237impl LogicalPlanRoot {
238    /// Transform the [`PlanRoot`] back to a [`PlanRef`] suitable to be used as a subplan, for
239    /// example as insert source or subquery. This ignores Order but retains post-Order pruning
240    /// (`out_fields`).
241    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
242        if self.out_fields.count_ones(..) == self.out_fields.len() {
243            return self.plan;
244        }
245        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
246    }
247
248    /// Transform the [`PlanRoot`] wrapped in an array-construction subquery to a [`PlanRef`]
249    /// supported by `ARRAY_AGG`. Similar to the unordered version, this abstracts away internal
250    /// `self.plan` which is further modified by `self.required_order` then `self.out_fields`.
251    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
252        use generic::Agg;
253        use plan_node::PlanAggCall;
254        use risingwave_common::types::ListValue;
255        use risingwave_expr::aggregate::PbAggKind;
256
257        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
258        use crate::utils::{Condition, IndexSet};
259
260        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
261            bail!("subquery must return only one column");
262        };
263        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
264        let return_type = DataType::List(input_column_type.clone().into());
265        let agg = Agg::new(
266            vec![PlanAggCall {
267                agg_type: PbAggKind::ArrayAgg.into(),
268                return_type: return_type.clone(),
269                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
270                distinct: false,
271                order_by: self.required_order.column_orders,
272                filter: Condition::true_cond(),
273                direct_args: vec![],
274            }],
275            IndexSet::empty(),
276            self.plan,
277        );
278        Ok(LogicalProject::create(
279            agg.into(),
280            vec![
281                FunctionCall::new(
282                    ExprType::Coalesce,
283                    vec![
284                        InputRef::new(0, return_type).into(),
285                        ExprImpl::literal_list(
286                            ListValue::empty(&input_column_type),
287                            input_column_type,
288                        ),
289                    ],
290                )
291                .unwrap()
292                .into(),
293            ],
294        ))
295    }
296
297    /// Apply logical optimization to the plan for stream.
298    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
299        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
300        Ok(self)
301    }
302
303    /// Apply logical optimization to the plan for batch.
304    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
305        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
306        Ok(self.into_phase(plan))
307    }
308
309    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
310        self.gen_optimized_logical_plan_for_batch()?
311            .gen_batch_plan()
312    }
313}
314
315impl BatchOptimizedLogicalPlanRoot {
316    /// Optimize and generate a singleton batch physical plan without exchange nodes.
317    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
318        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
319            return Err(ErrorCode::NotSupported(
320                "do not support temporal join for batch queries".to_owned(),
321                "please use temporal join in streaming queries".to_owned(),
322            )
323            .into());
324        }
325
326        let ctx = self.plan.ctx();
327        // Inline session timezone mainly for rewriting now()
328        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;
329
330        // Const eval of exprs at the last minute, but before `to_batch` to make functional index selection happy.
331        plan = const_eval_exprs(plan)?;
332
333        if ctx.is_explain_trace() {
334            ctx.trace("Const eval exprs:");
335            ctx.trace(plan.explain_to_string());
336        }
337
338        // Convert to physical plan node
339        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
340        if ctx.is_explain_trace() {
341            ctx.trace("To Batch Plan:");
342            ctx.trace(plan.explain_to_string());
343        }
344
345        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
346            "Merge BatchProject",
347            vec![BatchProjectMergeRule::create()],
348            ApplyOrder::BottomUp,
349        ))?;
350
351        // Inline session timezone
352        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
353
354        if ctx.is_explain_trace() {
355            ctx.trace("Inline Session Timezone:");
356            ctx.trace(plan.explain_to_string());
357        }
358
359        #[cfg(debug_assertions)]
360        InputRefValidator.validate(plan.clone());
361        assert!(
362            *plan.distribution() == Distribution::Single,
363            "{}",
364            plan.explain_to_string()
365        );
366        assert!(
367            !has_batch_exchange(plan.clone()),
368            "{}",
369            plan.explain_to_string()
370        );
371
372        let ctx = plan.ctx();
373        if ctx.is_explain_trace() {
374            ctx.trace("To Batch Physical Plan:");
375            ctx.trace(plan.explain_to_string());
376        }
377
378        Ok(self.into_phase(plan))
379    }
380}
381
382impl BatchPlanRoot {
383    /// Optimize and generate a batch query plan for distributed execution.
384    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
385        self.required_dist = RequiredDist::single();
386        let mut plan = self.plan;
387
388        // Convert to distributed plan
389        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;
390
391        // Add Project if the any position of `self.out_fields` is set to zero.
392        if self.out_fields.count_ones(..) != self.out_fields.len() {
393            plan =
394                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
395        }
396
397        let ctx = plan.ctx();
398        if ctx.is_explain_trace() {
399            ctx.trace("To Batch Distributed Plan:");
400            ctx.trace(plan.explain_to_string());
401        }
402        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
403            plan =
404                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
405        }
406
407        // Both two phase limit and topn could generate limit on top of the scan, so we push limit here.
408        let plan = plan.optimize_by_rules(&OptimizationStage::new(
409            "Push Limit To Scan",
410            vec![BatchPushLimitToScanRule::create()],
411            ApplyOrder::BottomUp,
412        ))?;
413
414        let plan = plan.optimize_by_rules(&OptimizationStage::new(
415            "Iceberg Count Star",
416            vec![BatchIcebergCountStar::create()],
417            ApplyOrder::TopDown,
418        ))?;
419
420        // For iceberg scan, we do iceberg predicate pushdown
421        // BatchFilter -> BatchIcebergScan
422        let plan = plan.optimize_by_rules(&OptimizationStage::new(
423            "Iceberg Predicate Pushdown",
424            vec![BatchIcebergPredicatePushDownRule::create()],
425            ApplyOrder::BottomUp,
426        ))?;
427
428        Ok(plan)
429    }
430
431    /// Optimize and generate a batch query plan for local execution.
432    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
433        let mut plan = self.plan;
434
435        // Convert to local plan node
436        plan = plan.to_local_with_order_required(&self.required_order)?;
437
438        // We remark that since the `to_local_with_order_required` does not enforce single
439        // distribution, we enforce at the root if needed.
440        let insert_exchange = match plan.distribution() {
441            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
442            _ => true,
443        };
444        if insert_exchange {
445            plan =
446                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
447        }
448
449        // Add Project if the any position of `self.out_fields` is set to zero.
450        if self.out_fields.count_ones(..) != self.out_fields.len() {
451            plan =
452                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
453        }
454
455        let ctx = plan.ctx();
456        if ctx.is_explain_trace() {
457            ctx.trace("To Batch Local Plan:");
458            ctx.trace(plan.explain_to_string());
459        }
460
461        // Both two phase limit and topn could generate limit on top of the scan, so we push limit here.
462        let plan = plan.optimize_by_rules(&OptimizationStage::new(
463            "Push Limit To Scan",
464            vec![BatchPushLimitToScanRule::create()],
465            ApplyOrder::BottomUp,
466        ))?;
467
468        let plan = plan.optimize_by_rules(&OptimizationStage::new(
469            "Iceberg Count Star",
470            vec![BatchIcebergCountStar::create()],
471            ApplyOrder::TopDown,
472        ))?;
473        Ok(plan)
474    }
475}
476
477impl LogicalPlanRoot {
478    /// Generate optimized stream plan
479    fn gen_optimized_stream_plan(
480        self,
481        emit_on_window_close: bool,
482        allow_snapshot_backfill: bool,
483    ) -> Result<StreamOptimizedLogicalPlanRoot> {
484        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
485            StreamScanType::SnapshotBackfill
486        } else if self.should_use_arrangement_backfill() {
487            StreamScanType::ArrangementBackfill
488        } else {
489            StreamScanType::Backfill
490        };
491        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
492    }
493
494    fn gen_optimized_stream_plan_inner(
495        self,
496        emit_on_window_close: bool,
497        stream_scan_type: StreamScanType,
498    ) -> Result<StreamOptimizedLogicalPlanRoot> {
499        let ctx = self.plan.ctx();
500        let _explain_trace = ctx.is_explain_trace();
501
502        let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;
503
504        let mut plan = optimized_plan
505            .plan
506            .clone()
507            .optimize_by_rules(&OptimizationStage::new(
508                "Merge StreamProject",
509                vec![StreamProjectMergeRule::create()],
510                ApplyOrder::BottomUp,
511            ))?;
512
513        if ctx
514            .session_ctx()
515            .config()
516            .streaming_separate_consecutive_join()
517        {
518            plan = plan.optimize_by_rules(&OptimizationStage::new(
519                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
520                vec![SeparateConsecutiveJoinRule::create()],
521                ApplyOrder::BottomUp,
522            ))?;
523        }
524
525        // Add Logstore for Unaligned join
526        // Apply this BEFORE delta join rule, because delta join removes
527        // the join
528        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
529            plan = plan.optimize_by_rules(&OptimizationStage::new(
530                "Add Logstore for Unaligned join",
531                vec![AddLogstoreRule::create()],
532                ApplyOrder::BottomUp,
533            ))?;
534        }
535
536        if ctx.session_ctx().config().streaming_enable_delta_join() {
537            // TODO: make it a logical optimization.
538            // Rewrite joins with index to delta join
539            plan = plan.optimize_by_rules(&OptimizationStage::new(
540                "To IndexDeltaJoin",
541                vec![IndexDeltaJoinRule::create()],
542                ApplyOrder::BottomUp,
543            ))?;
544        }
545        // Inline session timezone
546        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
547
548        if ctx.is_explain_trace() {
549            ctx.trace("Inline session timezone:");
550            ctx.trace(plan.explain_to_string());
551        }
552
553        // Const eval of exprs at the last minute
554        plan = const_eval_exprs(plan)?;
555
556        if ctx.is_explain_trace() {
557            ctx.trace("Const eval exprs:");
558            ctx.trace(plan.explain_to_string());
559        }
560
561        #[cfg(debug_assertions)]
562        InputRefValidator.validate(plan.clone());
563
564        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
565            return Err(ErrorCode::NotSupported(
566                "exist dangling temporal scan".to_owned(),
567                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
568            ).into());
569        }
570
571        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
572            return Err(ErrorCode::NotSupported(
573                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
574                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
575            ).into());
576        }
577
578        Ok(optimized_plan.into_phase(plan))
579    }
580
581    /// Generate create index or create materialize view plan.
582    fn gen_stream_plan(
583        self,
584        emit_on_window_close: bool,
585        stream_scan_type: StreamScanType,
586    ) -> Result<StreamOptimizedLogicalPlanRoot> {
587        let ctx = self.plan.ctx();
588        let explain_trace = ctx.is_explain_trace();
589
590        let plan = {
591            {
592                if !ctx
593                    .session_ctx()
594                    .config()
595                    .streaming_allow_jsonb_in_stream_key()
596                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
597                {
598                    return Err(ErrorCode::NotSupported(
599                        err,
600                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
601                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
602                    ).into());
603                }
604                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
605                let (plan, out_col_change) = {
606                    let (plan, out_col_change) = optimized_plan
607                        .plan
608                        .logical_rewrite_for_stream(&mut Default::default())?;
609                    if out_col_change.is_injective() {
610                        (plan, out_col_change)
611                    } else {
612                        let mut output_indices = (0..plan.schema().len()).collect_vec();
613                        #[allow(unused_assignments)]
614                        let (mut map, mut target_size) = out_col_change.into_parts();
615
616                        // TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/7234
617                        // assert_eq!(target_size, output_indices.len());
618                        target_size = plan.schema().len();
619                        let mut tar_exists = vec![false; target_size];
620                        for i in map.iter_mut().flatten() {
621                            if tar_exists[*i] {
622                                output_indices.push(*i);
623                                *i = target_size;
624                                target_size += 1;
625                            } else {
626                                tar_exists[*i] = true;
627                            }
628                        }
629                        let plan =
630                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
631                        let out_col_change = ColIndexMapping::new(map, target_size);
632                        (plan.into(), out_col_change)
633                    }
634                };
635
636                if explain_trace {
637                    ctx.trace("Logical Rewrite For Stream:");
638                    ctx.trace(plan.explain_to_string());
639                }
640
641                optimized_plan.required_dist =
642                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
643                optimized_plan.required_order = out_col_change
644                    .rewrite_required_order(&optimized_plan.required_order)
645                    .unwrap();
646                optimized_plan.out_fields =
647                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
648                let mut plan = plan.to_stream_with_dist_required(
649                    &optimized_plan.required_dist,
650                    &mut ToStreamContext::new_with_stream_scan_type(
651                        emit_on_window_close,
652                        stream_scan_type,
653                    ),
654                )?;
655                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
656                optimized_plan.into_phase(plan)
657            }
658        };
659
660        if explain_trace {
661            ctx.trace("To Stream Plan:");
662            // TODO: can be `plan.plan.explain_to_string()`, but should explicitly specify the type due to some limitation of rust compiler
663            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
664        }
665        Ok(plan)
666    }
667
668    /// Visit the plan root and compute the cardinality.
669    ///
670    /// Panics if not called on a logical plan.
671    fn compute_cardinality(&self) -> Cardinality {
672        CardinalityVisitor.visit(self.plan.clone())
673    }
674
675    /// Optimize and generate a create table plan.
676    pub fn gen_table_plan(
677        self,
678        context: OptimizerContextRef,
679        table_name: String,
680        database_id: DatabaseId,
681        schema_id: SchemaId,
682        CreateTableInfo {
683            columns,
684            pk_column_ids,
685            row_id_index,
686            watermark_descs,
687            source_catalog,
688            version,
689        }: CreateTableInfo,
690        CreateTableProps {
691            definition,
692            append_only,
693            on_conflict,
694            with_version_column,
695            webhook_info,
696            engine,
697        }: CreateTableProps,
698    ) -> Result<StreamMaterialize> {
699        // Snapshot backfill is not allowed for create table
700        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
701
702        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
703
704        let pk_column_indices = {
705            let mut id_to_idx = HashMap::new();
706
707            columns.iter().enumerate().for_each(|(idx, c)| {
708                id_to_idx.insert(c.column_id(), idx);
709            });
710            pk_column_ids
711                .iter()
712                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
713                .collect_vec()
714        };
715
716        fn inject_project_for_generated_column_if_needed(
717            columns: &[ColumnCatalog],
718            node: StreamPlanRef,
719        ) -> Result<StreamPlanRef> {
720            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
721            if let Some(exprs) = exprs {
722                let logical_project = generic::Project::new(exprs, node);
723                return Ok(StreamProject::new(logical_project).into());
724            }
725            Ok(node)
726        }
727
728        #[derive(PartialEq, Debug, Copy, Clone)]
729        enum PrimaryKeyKind {
730            UserDefinedPrimaryKey,
731            NonAppendOnlyRowIdPk,
732            AppendOnlyRowIdPk,
733        }
734
735        fn inject_dml_node(
736            columns: &[ColumnCatalog],
737            append_only: bool,
738            stream_plan: StreamPlanRef,
739            pk_column_indices: &[usize],
740            kind: PrimaryKeyKind,
741            column_descs: Vec<ColumnDesc>,
742        ) -> Result<StreamPlanRef> {
743            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
744
745            // Add generated columns.
746            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
747
748            dml_node = match kind {
749                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
750                    RequiredDist::hash_shard(pk_column_indices)
751                        .streaming_enforce_if_not_satisfies(dml_node)?
752                }
753                PrimaryKeyKind::AppendOnlyRowIdPk => {
754                    StreamExchange::new_no_shuffle(dml_node).into()
755                }
756            };
757
758            Ok(dml_node)
759        }
760
761        let kind = if let Some(row_id_index) = row_id_index {
762            assert_eq!(
763                pk_column_indices.iter().exactly_one().copied().unwrap(),
764                row_id_index
765            );
766            if append_only {
767                PrimaryKeyKind::AppendOnlyRowIdPk
768            } else {
769                PrimaryKeyKind::NonAppendOnlyRowIdPk
770            }
771        } else {
772            PrimaryKeyKind::UserDefinedPrimaryKey
773        };
774
775        let column_descs: Vec<ColumnDesc> = columns
776            .iter()
777            .filter(|&c| c.can_dml())
778            .map(|c| c.column_desc.clone())
779            .collect();
780
781        let mut not_null_idxs = vec![];
782        for (idx, column) in column_descs.iter().enumerate() {
783            if !column.nullable {
784                not_null_idxs.push(idx);
785            }
786        }
787
788        let version_column_index = if let Some(version_column) = with_version_column {
789            find_version_column_index(&columns, version_column)?
790        } else {
791            None
792        };
793
794        let with_external_source = source_catalog.is_some();
795        let union_inputs = if with_external_source {
796            let mut external_source_node = stream_plan.plan;
797            external_source_node =
798                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
799            external_source_node = match kind {
800                PrimaryKeyKind::UserDefinedPrimaryKey => {
801                    RequiredDist::hash_shard(&pk_column_indices)
802                        .streaming_enforce_if_not_satisfies(external_source_node)?
803                }
804
805                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
806                    StreamExchange::new_no_shuffle(external_source_node).into()
807                }
808            };
809
810            let dummy_source_node = LogicalSource::new(
811                None,
812                columns.clone(),
813                row_id_index,
814                SourceNodeKind::CreateTable,
815                context.clone(),
816                None,
817            )
818            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
819
820            let dml_node = inject_dml_node(
821                &columns,
822                append_only,
823                dummy_source_node,
824                &pk_column_indices,
825                kind,
826                column_descs,
827            )?;
828
829            vec![external_source_node, dml_node]
830        } else {
831            let dml_node = inject_dml_node(
832                &columns,
833                append_only,
834                stream_plan.plan,
835                &pk_column_indices,
836                kind,
837                column_descs,
838            )?;
839
840            vec![dml_node]
841        };
842
843        let dists = union_inputs
844            .iter()
845            .map(|input| input.distribution())
846            .unique()
847            .collect_vec();
848
849        let dist = match &dists[..] {
850            &[Distribution::SomeShard, Distribution::HashShard(_)]
851            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
852            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
853                dist.clone()
854            }
855            _ => {
856                unreachable!()
857            }
858        };
859
860        let mut stream_plan = StreamUnion::new_with_dist(
861            Union {
862                all: true,
863                inputs: union_inputs,
864                source_col: None,
865            },
866            dist.clone(),
867        )
868        .into();
869
870        // Add WatermarkFilter node.
871        if !watermark_descs.is_empty() {
872            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
873        }
874
875        // Add RowIDGen node if needed.
876        if let Some(row_id_index) = row_id_index {
877            match kind {
878                PrimaryKeyKind::UserDefinedPrimaryKey => {
879                    unreachable!()
880                }
881                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
882                    stream_plan = StreamRowIdGen::new_with_dist(
883                        stream_plan,
884                        row_id_index,
885                        Distribution::HashShard(vec![row_id_index]),
886                    )
887                    .into();
888                }
889            }
890        }
891
892        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
893
894        if let ConflictBehavior::IgnoreConflict = conflict_behavior
895            && version_column_index.is_some()
896        {
897            Err(ErrorCode::InvalidParameterValue(
898                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
899            ))?
900        }
901
902        let retention_seconds = context.with_options().retention_seconds();
903
904        let table_required_dist = {
905            let mut bitset = FixedBitSet::with_capacity(columns.len());
906            for idx in &pk_column_indices {
907                bitset.insert(*idx);
908            }
909            RequiredDist::ShardByKey(bitset)
910        };
911
912        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
913
914        if !not_null_idxs.is_empty() {
915            stream_plan =
916                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
917        }
918
919        // Determine if the table should be refreshable based on the connector type
920        let refreshable = source_catalog
921            .as_ref()
922            .map(|catalog| catalog.with_properties.is_batch_connector())
923            .unwrap_or(false);
924
925        // Validate that refreshable tables have a user-defined primary key (i.e., does not have rowid)
926        if refreshable && row_id_index.is_some() {
927            return Err(crate::error::ErrorCode::BindError(
928                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
929                    .to_owned(),
930            )
931            .into());
932        }
933
934        StreamMaterialize::create_for_table(
935            stream_plan,
936            table_name,
937            database_id,
938            schema_id,
939            table_required_dist,
940            Order::any(),
941            columns,
942            definition,
943            conflict_behavior,
944            version_column_index,
945            pk_column_indices,
946            row_id_index,
947            version,
948            retention_seconds,
949            webhook_info,
950            engine,
951            refreshable,
952        )
953    }
954
955    /// Optimize and generate a create materialized view plan.
956    pub fn gen_materialize_plan(
957        self,
958        database_id: DatabaseId,
959        schema_id: SchemaId,
960        mv_name: String,
961        definition: String,
962        emit_on_window_close: bool,
963    ) -> Result<StreamMaterialize> {
964        let cardinality = self.compute_cardinality();
965        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
966        StreamMaterialize::create(
967            stream_plan,
968            mv_name,
969            database_id,
970            schema_id,
971            definition,
972            TableType::MaterializedView,
973            cardinality,
974            None,
975        )
976    }
977
978    /// Optimize and generate a create index plan.
979    pub fn gen_index_plan(
980        self,
981        index_name: String,
982        database_id: DatabaseId,
983        schema_id: SchemaId,
984        definition: String,
985        retention_seconds: Option<NonZeroU32>,
986    ) -> Result<StreamMaterialize> {
987        let cardinality = self.compute_cardinality();
988        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
989
990        StreamMaterialize::create(
991            stream_plan,
992            index_name,
993            database_id,
994            schema_id,
995            definition,
996            TableType::Index,
997            cardinality,
998            retention_seconds,
999        )
1000    }
1001
1002    /// Optimize and generate a create sink plan.
1003    #[allow(clippy::too_many_arguments)]
1004    pub fn gen_sink_plan(
1005        self,
1006        sink_name: String,
1007        definition: String,
1008        properties: WithOptionsSecResolved,
1009        emit_on_window_close: bool,
1010        db_name: String,
1011        sink_from_table_name: String,
1012        format_desc: Option<SinkFormatDesc>,
1013        without_backfill: bool,
1014        target_table: Option<Arc<TableCatalog>>,
1015        partition_info: Option<PartitionComputeInfo>,
1016        user_specified_columns: bool,
1017        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
1018    ) -> Result<StreamSink> {
1019        let stream_scan_type = if without_backfill {
1020            StreamScanType::UpstreamOnly
1021        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
1022            // Snapshot backfill on sink-into-table is not allowed
1023            StreamScanType::SnapshotBackfill
1024        } else if self.should_use_arrangement_backfill() {
1025            StreamScanType::ArrangementBackfill
1026        } else {
1027            StreamScanType::Backfill
1028        };
1029        if auto_refresh_schema_from_table.is_some()
1030            && stream_scan_type != StreamScanType::ArrangementBackfill
1031        {
1032            return Err(ErrorCode::InvalidInputSyntax(format!(
1033                "auto schema change only support for ArrangementBackfill, but got: {:?}",
1034                stream_scan_type
1035            ))
1036            .into());
1037        }
1038        let stream_plan =
1039            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
1040        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
1041            let columns = t.columns_without_rw_timestamp();
1042            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
1043        });
1044
1045        StreamSink::create(
1046            stream_plan,
1047            sink_name,
1048            db_name,
1049            sink_from_table_name,
1050            target_table,
1051            target_columns_to_plan_mapping,
1052            definition,
1053            properties,
1054            format_desc,
1055            partition_info,
1056            auto_refresh_schema_from_table,
1057        )
1058    }
1059}
1060
1061impl<P: PlanPhase> PlanRoot<P> {
1062    pub fn should_use_arrangement_backfill(&self) -> bool {
1063        let ctx = self.plan.ctx();
1064        let session_ctx = ctx.session_ctx();
1065        let arrangement_backfill_enabled = session_ctx
1066            .env()
1067            .streaming_config()
1068            .developer
1069            .enable_arrangement_backfill;
1070        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1071    }
1072
1073    pub fn should_use_snapshot_backfill(&self) -> bool {
1074        self.plan
1075            .ctx()
1076            .session_ctx()
1077            .config()
1078            .streaming_use_snapshot_backfill()
1079    }
1080
1081    /// used when the plan has a target relation such as DML and sink into table, return the mapping from table's columns to the plan's schema
1082    pub fn target_columns_to_plan_mapping(
1083        &self,
1084        tar_cols: &[ColumnCatalog],
1085        user_specified_columns: bool,
1086    ) -> Vec<Option<usize>> {
1087        #[allow(clippy::disallowed_methods)]
1088        let visible_cols: Vec<(usize, String)> = self
1089            .out_fields
1090            .ones()
1091            .zip_eq(self.out_names.iter().cloned())
1092            .collect_vec();
1093
1094        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1095        let visible_col_idxes_by_name = visible_cols
1096            .iter()
1097            .map(|(i, name)| (name.as_ref(), *i))
1098            .collect::<BTreeMap<_, _>>();
1099
1100        tar_cols
1101            .iter()
1102            .enumerate()
1103            .filter(|(_, tar_col)| tar_col.can_dml())
1104            .map(|(tar_i, tar_col)| {
1105                if user_specified_columns {
1106                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
1107                } else {
1108                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1109                }
1110            })
1111            .collect()
1112    }
1113}
1114
1115fn find_version_column_index(
1116    column_catalog: &Vec<ColumnCatalog>,
1117    version_column_name: String,
1118) -> Result<Option<usize>> {
1119    for (index, column) in column_catalog.iter().enumerate() {
1120        if column.column_desc.name == version_column_name {
1121            if let &DataType::Jsonb
1122            | &DataType::List(_)
1123            | &DataType::Struct(_)
1124            | &DataType::Bytea
1125            | &DataType::Boolean = column.data_type()
1126            {
1127                Err(ErrorCode::InvalidParameterValue(
1128                    "The specified version column data type is invalid.".to_owned(),
1129                ))?
1130            }
1131            return Ok(Some(index));
1132        }
1133    }
1134    Err(ErrorCode::InvalidParameterValue(
1135        "The specified version column name is not in the current columns.".to_owned(),
1136    ))?
1137}
1138
1139fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1140    let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1141
1142    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1143    if let Some(error) = const_eval_rewriter.error {
1144        return Err(error);
1145    }
1146    Ok(plan)
1147}
1148
1149fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1150    ctx: OptimizerContextRef,
1151    plan: PlanRef<C>,
1152) -> Result<PlanRef<C>> {
1153    let mut v = TimestamptzExprFinder::default();
1154    plan.visit_exprs_recursive(&mut v);
1155    if v.has() {
1156        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1157    } else {
1158        Ok(plan)
1159    }
1160}
1161
1162fn exist_and_no_exchange_before(
1163    plan: &BatchPlanRef,
1164    is_candidate: fn(&BatchPlanRef) -> bool,
1165) -> bool {
1166    if plan.node_type() == BatchPlanNodeType::BatchExchange {
1167        return false;
1168    }
1169    is_candidate(plan)
1170        || plan
1171            .inputs()
1172            .iter()
1173            .any(|input| exist_and_no_exchange_before(input, is_candidate))
1174}
1175
1176/// As we always run the root stage locally, for some plan in root stage which need to execute in
1177/// compute node we insert an additional exhchange before it to avoid to include it in the root
1178/// stage.
1179///
1180/// Returns `true` if we must insert an additional exchange to ensure this.
1181fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1182    fn is_user_table(plan: &BatchPlanRef) -> bool {
1183        plan.node_type() == BatchPlanNodeType::BatchSeqScan
1184    }
1185
1186    fn is_log_table(plan: &BatchPlanRef) -> bool {
1187        plan.node_type() == BatchPlanNodeType::BatchLogSeqScan
1188    }
1189
1190    fn is_source(plan: &BatchPlanRef) -> bool {
1191        plan.node_type() == BatchPlanNodeType::BatchSource
1192            || plan.node_type() == BatchPlanNodeType::BatchKafkaScan
1193            || plan.node_type() == BatchPlanNodeType::BatchIcebergScan
1194    }
1195
1196    fn is_insert(plan: &BatchPlanRef) -> bool {
1197        plan.node_type() == BatchPlanNodeType::BatchInsert
1198    }
1199
1200    fn is_update(plan: &BatchPlanRef) -> bool {
1201        plan.node_type() == BatchPlanNodeType::BatchUpdate
1202    }
1203
1204    fn is_delete(plan: &BatchPlanRef) -> bool {
1205        plan.node_type() == BatchPlanNodeType::BatchDelete
1206    }
1207
1208    assert_eq!(plan.distribution(), &Distribution::Single);
1209    exist_and_no_exchange_before(&plan, is_user_table)
1210        || exist_and_no_exchange_before(&plan, is_source)
1211        || exist_and_no_exchange_before(&plan, is_insert)
1212        || exist_and_no_exchange_before(&plan, is_update)
1213        || exist_and_no_exchange_before(&plan, is_delete)
1214        || exist_and_no_exchange_before(&plan, is_log_table)
1215}
1216
1217/// The purpose is same as `require_additional_exchange_on_root_in_distributed_mode`. We separate
1218/// them for the different requirement of plan node in different execute mode.
1219fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1220    fn is_user_table(plan: &BatchPlanRef) -> bool {
1221        plan.node_type() == BatchPlanNodeType::BatchSeqScan
1222    }
1223
1224    fn is_source(plan: &BatchPlanRef) -> bool {
1225        plan.node_type() == BatchPlanNodeType::BatchSource
1226            || plan.node_type() == BatchPlanNodeType::BatchKafkaScan
1227            || plan.node_type() == BatchPlanNodeType::BatchIcebergScan
1228    }
1229
1230    fn is_insert(plan: &BatchPlanRef) -> bool {
1231        plan.node_type() == BatchPlanNodeType::BatchInsert
1232    }
1233
1234    assert_eq!(plan.distribution(), &Distribution::Single);
1235    exist_and_no_exchange_before(&plan, is_user_table)
1236        || exist_and_no_exchange_before(&plan, is_source)
1237        || exist_and_no_exchange_before(&plan, is_insert)
1238}
1239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// `out_fields` keeps only the first of the two `LogicalValues` columns, so the unordered
    /// subplan must expose exactly that column (`v1: Int32`).
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let input_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values = LogicalValues::new(vec![], input_schema, ctx).into();

        // Select column 0 (`v1`) out of 2.
        let mut out_fields = FixedBitSet::with_capacity(2);
        out_fields.insert(0);
        let out_names = vec!["v1".into()];

        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();

        let expected = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(subplan.schema(), &expected);
    }
}