risingwave_frontend/optimizer/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

pub mod plan_node;

use plan_node::StreamFilter;
pub use plan_node::{Explain, PlanRef};

pub mod property;

mod delta_join_solver;
mod heuristic_optimizer;
mod plan_rewriter;

pub use plan_rewriter::PlanRewriter;

mod plan_visitor;

pub use plan_visitor::{
    ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
    SysTableVisitor,
};

mod logical_optimization;
mod optimizer_context;
pub mod plan_expr_rewriter;
mod plan_expr_visitor;
mod rule;

use std::assert_matches::assert_matches;
use std::collections::{BTreeMap, HashMap};

use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
    BatchProject, Convention, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
    ToStreamContext, stream_enforce_eowc_requirement,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::TableCatalog;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::TimestamptzExprFinder;
use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
use crate::optimizer::plan_node::generic::{SourceNodeKind, Union};
use crate::optimizer::plan_node::{
    BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
    ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};

/// `PlanRoot` is used to describe a plan. The planner constructs a `PlanRoot` with a `LogicalNode`
/// and the required distribution and order, and `PlanRoot` can then generate the corresponding
/// streaming or batch plan with optimization. The required order and distribution columns may be
/// more than the output columns. For example:
/// ```sql
///    select v1 from t order by id;
/// ```
/// The plan will return two columns (id, v1), and the required order column is id. The id
/// column is required during optimization, but the final generated plan will remove the
/// unnecessary column from the result.
#[derive(Debug, Clone)]
pub struct PlanRoot {
    // The current plan node.
    plan: PlanRef,
    // The phase of the plan.
    phase: PlanPhase,
    required_dist: RequiredDist,
    required_order: Order,
    out_fields: FixedBitSet,
    out_names: Vec<String>,
}

/// `PlanPhase` is used to track the phase of the `PlanRoot`.
/// Usually, it begins at `Logical` and ends at `Batch` or `Stream`, unless we construct a
/// `PlanRoot` from an intermediate phase.
/// Typical phase transformations are:
/// - `Logical` -> `OptimizedLogicalForBatch` -> `Batch`
/// - `Logical` -> `OptimizedLogicalForStream` -> `Stream`
#[derive(Debug, Clone, PartialEq)]
pub enum PlanPhase {
    Logical,
    OptimizedLogicalForBatch,
    OptimizedLogicalForStream,
    Batch,
    Stream,
}
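
// A minimal sketch of how these phases are typically driven, using only methods defined in this
// file (arguments elided; error handling via `?` assumes a `Result` context):
//
//     let mut root = PlanRoot::new_with_logical_plan(plan, dist, order, out_fields, out_names);
//     let _ = root.gen_optimized_logical_plan_for_batch()?; // Logical -> OptimizedLogicalForBatch
//     let _ = root.gen_batch_plan()?;                       // -> Batch
//     let local_plan = root.gen_batch_local_plan()?;        // consumes the root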

impl PlanRoot {
    pub fn new_with_logical_plan(
        plan: PlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        assert_eq!(plan.convention(), Convention::Logical);
        Self::new_inner(
            plan,
            PlanPhase::Logical,
            required_dist,
            required_order,
            out_fields,
            out_names,
        )
    }

    pub fn new_with_batch_plan(
        plan: PlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        assert_eq!(plan.convention(), Convention::Batch);
        Self::new_inner(
            plan,
            PlanPhase::Batch,
            required_dist,
            required_order,
            out_fields,
            out_names,
        )
    }

    fn new_inner(
        plan: PlanRef,
        phase: PlanPhase,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        let input_schema = plan.schema();
        assert_eq!(input_schema.fields().len(), out_fields.len());
        assert_eq!(out_fields.count_ones(..), out_names.len());

        Self {
            plan,
            phase,
            required_dist,
            required_order,
            out_fields,
            out_names,
        }
    }

    /// Set customized names of the output fields, used for `CREATE [MATERIALIZED VIEW | SINK] r(a,
    /// b, ..)`.
    ///
    /// If the number of names does not match the number of output fields, an error is returned.
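    ///
    /// For example (illustrative):
    /// ```sql
    ///    create materialized view mv(a, b) as select v1, v2 from t;
    /// ```
    /// renames the two output fields to `a` and `b`.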
    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
        if out_names.len() != self.out_fields.count_ones(..) {
            Err(ErrorCode::InvalidInputSyntax(
                "number of column names does not match number of columns".to_owned(),
            ))?
        }
        self.out_names = out_names;
        Ok(())
    }

    pub fn set_req_dist_as_same_as_req_order(&mut self) {
        if self.required_order.len() != 0 {
            let dist = self
                .required_order
                .column_orders
                .iter()
                .map(|o| o.column_index)
                .collect_vec();
            self.required_dist = RequiredDist::hash_shard(&dist)
        }
    }

    /// Get the plan root's schema, only including the fields to be output.
    pub fn schema(&self) -> Schema {
        // The schema can be derived from the `out_fields` and `out_names`, so we don't maintain it
        // as a field and always construct one on demand here to keep it in sync.
        Schema {
            fields: self
                .out_fields
                .ones()
                .map(|i| self.plan.schema().fields()[i].clone())
                .zip_eq_debug(&self.out_names)
                .map(|(field, name)| Field {
                    name: name.clone(),
                    ..field
                })
                .collect(),
        }
    }

    /// Transform the [`PlanRoot`] back to a [`PlanRef`] suitable to be used as a subplan, for
    /// example as insert source or subquery. This ignores Order but retains post-Order pruning
    /// (`out_fields`).
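    ///
    /// For example (illustrative), the source clause of
    /// ```sql
    ///    insert into t2 select v1 from t1 order by v2;
    /// ```
    /// can go through this method: the `order by v2` is meaningless for the insert and is
    /// dropped, while the pruning down to `v1` is kept.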
    pub fn into_unordered_subplan(self) -> PlanRef {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

    /// Transform the [`PlanRoot`] wrapped in an array-construction subquery to a [`PlanRef`]
    /// supported by `ARRAY_AGG`. Similar to the unordered version, this abstracts away the
    /// internal `self.plan`, which is further modified by `self.required_order` and then
    /// `self.out_fields`.
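    ///
    /// A sketch of the rewrite this enables, in SQL terms:
    /// ```sql
    ///    select array(select v1 from t order by v2);
    /// ```
    /// is evaluated as an `array_agg(v1 order by v2)` over the subquery, wrapped in a
    /// `coalesce(..., '{}')` so that an empty input yields an empty array rather than NULL.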
    pub fn into_array_agg(self) -> Result<PlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::List(input_column_type.clone().into());
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    /// Apply logical optimization to the plan for stream.
    pub fn gen_optimized_logical_plan_for_stream(&mut self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        self.phase = PlanPhase::OptimizedLogicalForStream;
        assert_eq!(self.plan.convention(), Convention::Logical);
        Ok(self.plan.clone())
    }

    /// Apply logical optimization to the plan for batch.
    pub fn gen_optimized_logical_plan_for_batch(&mut self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        self.phase = PlanPhase::OptimizedLogicalForBatch;
        assert_eq!(self.plan.convention(), Convention::Logical);
        Ok(self.plan.clone())
    }

    /// Optimize and generate a singleton batch physical plan without exchange nodes.
    pub fn gen_batch_plan(&mut self) -> Result<PlanRef> {
        assert_eq!(self.plan.convention(), Convention::Logical);
        let mut plan = match self.phase {
            PlanPhase::Logical => {
                // Logical optimization
                self.gen_optimized_logical_plan_for_batch()?
            }
            PlanPhase::OptimizedLogicalForBatch => self.plan.clone(),
            PlanPhase::Batch | PlanPhase::OptimizedLogicalForStream | PlanPhase::Stream => {
                panic!("unexpected phase")
            }
        };

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = plan.ctx();
        // Inline session timezone mainly for rewriting now()
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        // Const eval of exprs at the last minute, but before `to_batch` to make functional index
        // selection happy.
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert to physical plan node
        plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // Inline session timezone
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert!(
            *plan.distribution() == Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        self.plan = plan;
        self.phase = PlanPhase::Batch;
        assert_eq!(self.plan.convention(), Convention::Batch);
        Ok(self.plan.clone())
    }

    /// Optimize and generate a batch query plan for distributed execution.
    pub fn gen_batch_distributed_plan(mut self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Batch);
        assert_eq!(self.plan.convention(), Convention::Batch);
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        // Convert to distributed plan
        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        // Add a Project if any position of `self.out_fields` is unset, i.e. some columns must be
        // pruned from the output.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        // Both two-phase limit and top-n can generate a limit on top of the scan, so we push the
        // limit down to the scan here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        // For Iceberg scans, push predicates down into the scan:
        // BatchFilter -> BatchIcebergScan
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Predicate Pushdown",
            vec![BatchIcebergPredicatePushDownRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        assert_eq!(plan.convention(), Convention::Batch);
        Ok(plan)
    }

    /// Optimize and generate a batch query plan for local execution.
    pub fn gen_batch_local_plan(self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Batch);
        assert_eq!(self.plan.convention(), Convention::Batch);
        let mut plan = self.plan;

        // Convert to local plan node
        plan = plan.to_local_with_order_required(&self.required_order)?;

        // Since `to_local_with_order_required` does not enforce single distribution, we enforce it
        // at the root if needed.
        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        // Add a Project if any position of `self.out_fields` is unset, i.e. some columns must be
        // pruned from the output.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        // Both two-phase limit and top-n can generate a limit on top of the scan, so we push the
        // limit down to the scan here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        assert_eq!(plan.convention(), Convention::Batch);
        Ok(plan)
    }

    /// Generate an optimized stream plan.
    fn gen_optimized_stream_plan(
        &mut self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
    }

    fn gen_optimized_stream_plan_inner(
        &mut self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let mut plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;

        plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Merge StreamProject",
            vec![StreamProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // Add a logstore for unaligned joins. Apply this BEFORE the delta join rule, because that
        // rule removes the join node.
        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join() {
            // TODO: make it a logical optimization.
            // Rewrite joins with index to delta join
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        // Inline session timezone
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        // Const eval of exprs at the last minute
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        self.plan = plan;
        self.phase = PlanPhase::Stream;
        assert_eq!(self.plan.convention(), Convention::Stream);
        Ok(self.plan.clone())
    }

    /// Generate a create index or create materialized view plan.
    fn gen_stream_plan(
        &mut self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = match self.plan.convention() {
            Convention::Logical => {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) =
                        plan.logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        // The mapping from the old schema to the new plan's columns is not
                        // injective: multiple old columns map to the same new column. Add a
                        // Project that duplicates the shared columns so that each old column
                        // gets its own copy and the mapping becomes injective again.
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        // TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/7234
                        // assert_eq!(target_size, output_indices.len());
                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                // Target column `*i` is already claimed: emit a duplicate of it
                                // at the end and remap this entry to the copy.
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                self.required_dist =
                    out_col_change.rewrite_required_distribution(&self.required_dist);
                self.required_order = out_col_change
                    .rewrite_required_order(&self.required_order)
                    .unwrap();
                self.out_fields = out_col_change.rewrite_bitset(&self.out_fields);
                let plan = plan.to_stream_with_dist_required(
                    &self.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)
            }
            _ => unreachable!(),
        }?;

        if explain_trace {
            ctx.trace("To Stream Plan:");
            ctx.trace(plan.explain_to_string());
        }
        Ok(plan)
    }

    /// Visit the plan root and compute the cardinality.
    ///
    /// Panics if not called on a logical plan.
    fn compute_cardinality(&self) -> Cardinality {
        assert_matches!(self.plan.convention(), Convention::Logical);
        CardinalityVisitor.visit(self.plan.clone())
    }

    /// Optimize and generate a create table plan.
    pub fn gen_table_plan(
        mut self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_column,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        // Snapshot backfill is not allowed for create table
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);

        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
                .collect_vec()
        };

        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: PlanRef,
        ) -> Result<PlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: PlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<PlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            // Add generated columns.
            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .enforce_if_not_satisfies(dml_node, &Order::any())?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_index = if let Some(version_column) = with_version_column {
            find_version_column_index(&columns, version_column)?
        } else {
            None
        };

        let with_external_source = source_catalog.is_some();
        let union_inputs = if with_external_source {
            let mut external_source_node = stream_plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .enforce_if_not_satisfies(external_source_node, &Order::any())?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };

            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;

            let dml_node = inject_dml_node(
                &columns,
                append_only,
                dummy_source_node,
                &pk_column_indices,
                kind,
                column_descs,
            )?;

            vec![external_source_node, dml_node]
        } else {
            let dml_node = inject_dml_node(
                &columns,
                append_only,
                stream_plan,
                &pk_column_indices,
                kind,
                column_descs,
            )?;

            vec![dml_node]
        };

        let dists = union_inputs
            .iter()
            .map(|input| input.distribution())
            .unique()
            .collect_vec();

        // The union inputs may be hash-distributed by pk (DML / user-defined pk) or arbitrarily
        // sharded (external source); fall back to `SomeShard` whenever they disagree.
        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                unreachable!()
            }
        };

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist.clone(),
        )
        .into();

        // Add WatermarkFilter node.
        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        // Add RowIDGen node if needed.
        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && version_column_index.is_some()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The `WITH VERSION COLUMN` syntax cannot be used with the `ON CONFLICT` ignore behavior".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_index,
            pk_column_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
        )
    }
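
    // The table plan assembled above has, roughly, the following shape (a sketch; optional nodes
    // appear only under the conditions noted):
    //
    //     StreamMaterialize
    //       └─ StreamFilter              (if any NOT NULL columns)
    //            └─ StreamRowIdGen       (if the pk is a row id)
    //                 └─ StreamWatermarkFilter   (if watermarks are declared)
    //                      └─ StreamUnion
    //                           ├─ external source branch   (if a source is attached)
    //                           └─ Exchange ─ StreamProject (generated columns) ─ StreamDml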

    /// Optimize and generate a create materialized view plan.
    pub fn gen_materialize_plan(
        mut self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            self.required_dist.clone(),
            self.required_order.clone(),
            self.out_fields.clone(),
            self.out_names.clone(),
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }

    /// Optimize and generate a create index plan.
    pub fn gen_index_plan(
        mut self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            self.required_dist.clone(),
            self.required_order.clone(),
            self.out_fields.clone(),
            self.out_names.clone(),
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }

    /// Optimize and generate a create sink plan.
    #[allow(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        &mut self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
    ) -> Result<StreamSink> {
        let stream_scan_type = if without_backfill {
            StreamScanType::UpstreamOnly
        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
            // Snapshot backfill on sink-into-table is not allowed
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            self.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });
        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            self.required_dist.clone(),
            self.required_order.clone(),
            self.out_fields.clone(),
            self.out_names.clone(),
            definition,
            properties,
            format_desc,
            partition_info,
        )
    }

    pub fn should_use_arrangement_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let arrangement_backfill_enabled = session_ctx
            .env()
            .streaming_config()
            .developer
            .enable_arrangement_backfill;
        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
    }

    pub fn should_use_snapshot_backfill(&self) -> bool {
        self.plan
            .ctx()
            .session_ctx()
            .config()
            .streaming_use_snapshot_backfill()
    }

    /// Used when the plan has a target relation, such as DML and sink-into-table. Returns the
    /// mapping from the target table's columns to the plan's schema.
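    ///
    /// For example (illustrative): with a target table `t(a, b, c)` (all DML-able) and a plan
    /// whose visible output columns are `b, a` (plan indices 0 and 1), matching by name
    /// (`user_specified_columns = true`) yields `[Some(1), Some(0), None]`, while matching by
    /// position yields `[Some(0), Some(1), None]`.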
    pub fn target_columns_to_plan_mapping(
        &self,
        tar_cols: &[ColumnCatalog],
        user_specified_columns: bool,
    ) -> Vec<Option<usize>> {
        #[allow(clippy::disallowed_methods)]
        let visible_cols: Vec<(usize, String)> = self
            .out_fields
            .ones()
            .zip_eq(self.out_names.iter().cloned())
            .collect_vec();

        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
        let visible_col_idxes_by_name = visible_cols
            .iter()
            .map(|(i, name)| (name.as_ref(), *i))
            .collect::<BTreeMap<_, _>>();

        tar_cols
            .iter()
            .enumerate()
            .filter(|(_, tar_col)| tar_col.can_dml())
            .map(|(tar_i, tar_col)| {
                if user_specified_columns {
                    // Match by column name, e.g. `INSERT INTO t (b, a) ...`.
                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
                } else {
                    // Match by position; trailing target columns without a counterpart get `None`.
                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
                }
            })
            .collect()
    }
}

fn find_version_column_index(
    column_catalog: &Vec<ColumnCatalog>,
    version_column_name: String,
) -> Result<Option<usize>> {
    for (index, column) in column_catalog.iter().enumerate() {
        if column.column_desc.name == version_column_name {
            if let &DataType::Jsonb
            | &DataType::List(_)
            | &DataType::Struct(_)
            | &DataType::Bytea
            | &DataType::Boolean = column.data_type()
            {
                Err(ErrorCode::InvalidParameterValue(
                    "The specified version column data type is invalid.".to_owned(),
                ))?
            }
            return Ok(Some(index));
        }
    }
    Err(ErrorCode::InvalidParameterValue(
        "The specified version column name is not in the current columns.".to_owned(),
    ))?
}

fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> {
    let mut const_eval_rewriter = ConstEvalRewriter { error: None };

    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
    if let Some(error) = const_eval_rewriter.error {
        return Err(error);
    }
    Ok(plan)
}

fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> {
    let mut v = TimestamptzExprFinder::default();
    plan.visit_exprs_recursive(&mut v);
    if v.has() {
        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
    } else {
        Ok(plan)
    }
}
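
// For example (sketch): with `set timezone = 'US/Pacific'`, a timezone-dependent expression such
// as `now()` or a cast involving `timestamptz` is rewritten by the session-timezone rewriter so
// that the session timezone is baked into the expression before execution. Plans without any
// timestamptz expression (as detected by `TimestamptzExprFinder`) are returned unchanged.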

/// Returns `true` if some node matching `is_candidate` is reachable from `plan` without crossing
/// a `BatchExchange`.
fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
    if plan.node_type() == PlanNodeType::BatchExchange {
        return false;
    }
    is_candidate(plan)
        || plan
            .inputs()
            .iter()
            .any(|input| exist_and_no_exchange_before(input, is_candidate))
}

/// As we always run the root stage locally, for any plan node in the root stage that needs to
/// execute on a compute node, we insert an additional exchange before it to keep it out of the
/// root stage.
///
/// Returns `true` if we must insert an additional exchange to ensure this.
fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
    fn is_user_table(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSeqScan
    }

    fn is_log_table(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchLogSeqScan
    }

    fn is_source(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSource
            || plan.node_type() == PlanNodeType::BatchKafkaScan
            || plan.node_type() == PlanNodeType::BatchIcebergScan
    }

    fn is_insert(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchInsert
    }

    fn is_update(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchUpdate
    }

    fn is_delete(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchDelete
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, is_user_table)
        || exist_and_no_exchange_before(&plan, is_source)
        || exist_and_no_exchange_before(&plan, is_insert)
        || exist_and_no_exchange_before(&plan, is_update)
        || exist_and_no_exchange_before(&plan, is_delete)
        || exist_and_no_exchange_before(&plan, is_log_table)
}

/// The purpose is the same as `require_additional_exchange_on_root_in_distributed_mode`. We keep
/// them separate because different execution modes impose different requirements on the plan
/// nodes allowed in the root stage.
fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
    fn is_user_table(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSeqScan
    }

    fn is_source(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSource
            || plan.node_type() == PlanNodeType::BatchKafkaScan
            || plan.node_type() == PlanNodeType::BatchIcebergScan
    }

    fn is_insert(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchInsert
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, is_user_table)
        || exist_and_no_exchange_before(&plan, is_source)
        || exist_and_no_exchange_before(&plan, is_insert)
}
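
// For example (sketch): in local mode, a bare `select * from t` would otherwise place the
// `BatchSeqScan` directly in the locally-executed root stage; the check above causes a
// `BatchExchange` to be inserted on top so the scan runs on compute nodes instead.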

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        // Only the first bit is set, so only `v1` is kept in the output.
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}