risingwave_frontend/optimizer/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

use risingwave_pb::catalog::PbVectorIndexInfo;

pub mod plan_node;

use plan_node::StreamFilter;
pub use plan_node::{Explain, LogicalPlanRef, PlanRef};

pub mod property;

mod delta_join_solver;
mod heuristic_optimizer;
mod plan_rewriter;

mod plan_visitor;

pub use plan_visitor::{
    ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
};

pub mod backfill_order_strategy;
mod logical_optimization;
mod optimizer_context;
pub mod plan_expr_rewriter;
mod plan_expr_visitor;
mod rule;

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;

use educe::Educe;
use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
    ToStreamContext, stream_enforce_eowc_requirement,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::TableCatalog;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::TimestamptzExprFinder;
use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
use crate::optimizer::plan_node::{
    Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
    StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion, StreamVectorIndexWrite,
    ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::{
    LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};

/// `PlanRoot` is used to describe a plan. The planner constructs a `PlanRoot` with a `LogicalNode`
/// and the required distribution and order, and `PlanRoot` can then generate the corresponding
/// streaming or batch plan with optimization. The required order and distribution columns might be
/// more than the output columns. For example:
/// ```sql
///    select v1 from t order by id;
/// ```
/// The plan will return two columns (id, v1), and the required order column is id. The id
/// column is required during optimization, but the final generated plan will remove the
/// unnecessary column from the result.
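///
/// Conceptually, for the example above (a sketch, not the actual construction path):
/// ```ignore
/// // The plan node produces (id, v1); only v1 is visible in the output.
/// let mut out_fields = FixedBitSet::with_capacity(2);
/// out_fields.insert(1);
/// let out_names = vec!["v1".to_owned()];
/// ```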
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    // The current plan node.
    pub plan: PlanRef<P::Convention>,
    // The phase of the plan.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    required_dist: RequiredDist,
    required_order: Order,
    out_fields: FixedBitSet,
    out_names: Vec<String>,
}

/// `PlanPhase` is used to track the phase of the `PlanRoot`.
/// Usually, it begins from `Logical` and ends with `Batch` or `Stream`, unless we want to
/// construct a `PlanRoot` from an intermediate phase.
/// Typical phase transformations are:
/// - `Logical` -> `BatchOptimizedLogical` -> `Batch`
/// - `Logical` -> `StreamOptimizedLogical` -> `Stream`
pub trait PlanPhase {
    type Convention: ConventionMarker;
}

macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();
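
// For reference, the invocation above expands (roughly) to one marker struct and
// one alias per phase; e.g. for `Logical`:
//
//     pub struct PlanPhaseLogical;
//     impl PlanPhase for PlanPhaseLogical {
//         type Convention = crate::optimizer::plan_node::Logical;
//     }
//     pub type LogicalPlanRoot = PlanRoot<PlanPhaseLogical>;
//
// and similarly for `BatchOptimizedLogical`, `StreamOptimizedLogical`, `Batch`
// and `Stream`.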

impl LogicalPlanRoot {
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl BatchPlanRoot {
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    fn new_inner(
        plan: PlanRef<P::Convention>,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        let input_schema = plan.schema();
        assert_eq!(input_schema.fields().len(), out_fields.len());
        assert_eq!(out_fields.count_ones(..), out_names.len());

        Self {
            plan,
            _phase: PhantomData,
            required_dist,
            required_order,
            out_fields,
            out_names,
        }
    }

    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
        PlanRoot {
            plan,
            _phase: PhantomData,
            required_dist: self.required_dist,
            required_order: self.required_order,
            out_fields: self.out_fields,
            out_names: self.out_names,
        }
    }

    /// Set customized names of the output fields, used for `CREATE [MATERIALIZED VIEW | SINK] r(a,
    /// b, ..)`.
    ///
    /// If the number of names does not match the number of output fields, an error is returned.
    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
        if out_names.len() != self.out_fields.count_ones(..) {
            Err(ErrorCode::InvalidInputSyntax(
                "number of column names does not match number of columns".to_owned(),
            ))?
        }
        self.out_names = out_names;
        Ok(())
    }

    /// Get the plan root's schema, only including the fields to be output.
    pub fn schema(&self) -> Schema {
        // The schema can be derived from the `out_fields` and `out_names`, so we don't maintain it
        // as a field and always construct one on demand here to keep it in sync.
        Schema {
            fields: self
                .out_fields
                .ones()
                .map(|i| self.plan.schema().fields()[i].clone())
                .zip_eq_debug(&self.out_names)
                .map(|(field, name)| Field {
                    name: name.clone(),
                    ..field
                })
                .collect(),
        }
    }
}

impl LogicalPlanRoot {
    /// Transform the [`PlanRoot`] back to a [`PlanRef`] suitable to be used as a subplan, for
    /// example as insert source or subquery. This ignores Order but retains post-Order pruning
    /// (`out_fields`).
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

    /// Transform the [`PlanRoot`] wrapped in an array-construction subquery to a [`PlanRef`]
    /// supported by `ARRAY_AGG`. Similar to the unordered version, this abstracts away internal
    /// `self.plan` which is further modified by `self.required_order` then `self.out_fields`.
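    ///
    /// For example (a sketch of the resulting shape): a subquery `(SELECT v FROM t ORDER BY o)`
    /// used in an array-construction position becomes roughly
    /// `COALESCE(ARRAY_AGG(v ORDER BY o), empty array)` on top of the subplan, so an empty
    /// input yields an empty array rather than NULL.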
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::list(input_column_type.clone());
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    /// Apply logical optimization to the plan for stream.
    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

    /// Apply logical optimization to the plan for batch.
    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}

impl BatchOptimizedLogicalPlanRoot {
    /// Optimize and generate a singleton batch physical plan without exchange nodes.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        // Inline session timezone mainly for rewriting now()
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        // Const eval of exprs at the last minute, but before `to_batch` to make functional index selection happy.
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert to physical plan node
        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // Inline session timezone
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }
}

impl BatchPlanRoot {
    /// Optimize and generate a batch query plan for distributed execution.
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        // Convert to distributed plan
        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        // Add a Project node if any position of `self.out_fields` is unset.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        // Both two-phase limit and top-n can generate a limit on top of the scan, so we push the limit down here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        // For iceberg scan, we do iceberg predicate pushdown
        // BatchFilter -> BatchIcebergScan
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Predicate Pushdown",
            vec![BatchIcebergPredicatePushDownRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

    /// Optimize and generate a batch query plan for local execution.
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        // Convert to local plan node
        plan = plan.to_local_with_order_required(&self.required_order)?;

        // Note that since `to_local_with_order_required` does not enforce single distribution,
        // we enforce it at the root if needed.
        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        // Add a Project node if any position of `self.out_fields` is unset.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        // Both two-phase limit and top-n can generate a limit on top of the scan, so we push the limit down here.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;
        Ok(plan)
    }
}

impl LogicalPlanRoot {
    /// Generate optimized stream plan
    fn gen_optimized_stream_plan(
        self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
    }

    fn gen_optimized_stream_plan_inner(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;

        let mut plan = optimized_plan
            .plan
            .clone()
            .optimize_by_rules(&OptimizationStage::new(
                "Merge StreamProject",
                vec![StreamProjectMergeRule::create()],
                ApplyOrder::BottomUp,
            ))?;

        if ctx
            .session_ctx()
            .config()
            .streaming_separate_consecutive_join()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
                vec![SeparateConsecutiveJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        // Add a log store for unaligned joins.
        // Apply this BEFORE the delta join rule, because the delta join rule
        // removes the join node.
        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join()
            && ctx.session_ctx().config().enable_index_selection()
        {
            // TODO: make it a logical optimization.
            // Rewrite joins with index to delta join
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        // Inline session timezone
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        // Const eval of exprs at the last minute
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        if ctx.session_ctx().config().enable_locality_backfill()
            && LocalityProviderCounter::count(plan.clone()) > 5
        {
            risingwave_common::license::Feature::LocalityBackfill.check_available()?
        }

        Ok(optimized_plan.into_phase(plan))
    }

    /// Generate a create index or create materialized view plan.
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        // TODO(st1page): https://github.com/risingwavelabs/risingwave/issues/7234
                        // assert_eq!(target_size, output_indices.len());
                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
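                        // If two input columns map to the same target column, duplicate
                        // that column at the end of the projection and remap the second
                        // occurrence there, so the resulting mapping becomes injective.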
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            // TODO: can be `plan.plan.explain_to_string()`, but should explicitly specify the type due to some limitation of rust compiler
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }

    /// Visit the plan root and compute the cardinality.
    ///
    /// Panics if not called on a logical plan.
    fn compute_cardinality(&self) -> Cardinality {
        CardinalityVisitor.visit(self.plan.clone())
    }

    /// Optimize and generate a create table plan.
    pub fn gen_table_plan(
        self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_columns,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        // Snapshot backfill is not allowed for create table
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap()) // pk column id must exist in table columns.
                .collect_vec()
        };

        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: StreamPlanRef,
        ) -> Result<StreamPlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: StreamPlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<StreamPlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            // Add generated columns.
            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .streaming_enforce_if_not_satisfies(dml_node)?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_indices = if !with_version_columns.is_empty() {
            find_version_column_indices(&columns, with_version_columns)?
        } else {
            vec![]
        };

        let with_external_source = source_catalog.is_some();
        let (dml_source_node, external_source_node) = if with_external_source {
            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
            let mut external_source_node = stream_plan.plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .streaming_enforce_if_not_satisfies(external_source_node)?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };
            (dummy_source_node, Some(external_source_node))
        } else {
            (stream_plan.plan, None)
        };

        let dml_node = inject_dml_node(
            &columns,
            append_only,
            dml_source_node,
            &pk_column_indices,
            kind,
            column_descs,
        )?;

        let dists = external_source_node
            .iter()
            .map(|input| input.distribution())
            .chain([dml_node.distribution()])
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                unreachable!()
            }
        };

        let generated_column_exprs =
            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
        let upstream_sink_union = StreamUpstreamSinkUnion::new(
            context.clone(),
            dml_node.schema(),
            dml_node.stream_key(),
            dist.clone(), // should always be the same as dist of `Union`
            append_only,
            row_id_index.is_none(),
            generated_column_exprs,
        );

        let union_inputs = external_source_node
            .into_iter()
            .chain([dml_node, upstream_sink_union.into()])
            .collect_vec();

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist,
        )
        .into();

        // Add WatermarkFilter node.
        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        // Add RowIDGen node if needed.
        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && !version_column_indices.is_empty()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        // Determine if the table should be refreshable based on the connector type
        let refreshable = source_catalog
            .as_ref()
            .map(|catalog| catalog.with_properties.is_batch_connector())
            .unwrap_or(false);

        // Validate that refreshable tables have a user-defined primary key (i.e., they do not use the implicit row id).
        if refreshable && row_id_index.is_some() {
            return Err(crate::error::ErrorCode::BindError(
                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
                    .to_owned(),
            )
            .into());
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            pk_column_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
            refreshable,
        )
    }

    /// Optimize and generate a create materialized view plan.
    pub fn gen_materialize_plan(
        self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }

    /// Optimize and generate a create index plan.
    pub fn gen_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }

    pub fn gen_vector_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
        vector_index_info: PbVectorIndexInfo,
    ) -> Result<StreamVectorIndexWrite> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamVectorIndexWrite::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            cardinality,
            retention_seconds,
            vector_index_info,
        )
    }

    /// Optimize and generate a create sink plan.
    #[allow(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<StreamSink> {
        let stream_scan_type = if without_backfill {
            StreamScanType::UpstreamOnly
        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
            // Snapshot backfill on sink-into-table is not allowed
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        if auto_refresh_schema_from_table.is_some()
            && stream_scan_type != StreamScanType::ArrangementBackfill
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "auto schema change is only supported for ArrangementBackfill, but got: {:?}",
                stream_scan_type
            ))
            .into());
        }
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });

        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            definition,
            properties,
            format_desc,
            partition_info,
            auto_refresh_schema_from_table,
        )
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    pub fn should_use_arrangement_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let arrangement_backfill_enabled = session_ctx
            .env()
            .streaming_config()
            .developer
            .enable_arrangement_backfill;
        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
    }

    pub fn should_use_snapshot_backfill(&self) -> bool {
        self.plan
            .ctx()
            .session_ctx()
            .config()
            .streaming_use_snapshot_backfill()
    }

    /// Used when the plan has a target relation, such as DML or sink-into-table. Returns the
    /// mapping from the target table's columns to the plan's schema.
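    ///
    /// For example (a sketch): with visible output columns `a` (plan index 0) and `b` (plan
    /// index 1), and DML-able target columns `(b, a, c)`, by-name matching yields
    /// `[Some(1), Some(0), None]`, while positional matching yields `[Some(0), Some(1), None]`.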
    pub fn target_columns_to_plan_mapping(
        &self,
        tar_cols: &[ColumnCatalog],
        user_specified_columns: bool,
    ) -> Vec<Option<usize>> {
        #[allow(clippy::disallowed_methods)]
        let visible_cols: Vec<(usize, String)> = self
            .out_fields
            .ones()
            .zip_eq(self.out_names.iter().cloned())
            .collect_vec();

        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
        let visible_col_idxes_by_name = visible_cols
            .iter()
            .map(|(i, name)| (name.as_ref(), *i))
            .collect::<BTreeMap<_, _>>();

        tar_cols
            .iter()
            .enumerate()
            .filter(|(_, tar_col)| tar_col.can_dml())
            .map(|(tar_i, tar_col)| {
                if user_specified_columns {
                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
                } else {
                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
                }
            })
            .collect()
    }
}

fn find_version_column_indices(
    column_catalog: &[ColumnCatalog],
    version_column_names: Vec<String>,
) -> Result<Vec<usize>> {
    let mut indices = Vec::new();
    for version_column_name in version_column_names {
        let mut found = false;
        for (index, column) in column_catalog.iter().enumerate() {
            if column.column_desc.name == version_column_name {
                if let &DataType::Jsonb
                | &DataType::List(_)
                | &DataType::Struct(_)
                | &DataType::Bytea
                | &DataType::Boolean = column.data_type()
                {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "Version column {} must be of a comparable data type",
                        version_column_name
                    ))
                    .into());
                }
                indices.push(index);
                found = true;
                break;
            }
        }
        if !found {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Version column {} not found",
                version_column_name
            ))
            .into());
        }
    }
    Ok(indices)
}

fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
    let mut const_eval_rewriter = ConstEvalRewriter { error: None };

    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
    if let Some(error) = const_eval_rewriter.error {
        return Err(error);
    }
    Ok(plan)
}

fn inline_session_timezone_in_exprs<C: ConventionMarker>(
    ctx: OptimizerContextRef,
    plan: PlanRef<C>,
) -> Result<PlanRef<C>> {
    let mut v = TimestamptzExprFinder::default();
    plan.visit_exprs_recursive(&mut v);
    if v.has() {
        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
    } else {
        Ok(plan)
    }
}

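/// Returns `true` if a node matching `is_candidate` is reachable from `plan` without crossing a
/// `BatchExchange`, i.e. the candidate would otherwise be executed in the same (root) stage.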
fn exist_and_no_exchange_before(
    plan: &BatchPlanRef,
    is_candidate: fn(&BatchPlanRef) -> bool,
) -> bool {
    if plan.node_type() == BatchPlanNodeType::BatchExchange {
        return false;
    }
    is_candidate(plan)
        || plan
            .inputs()
            .iter()
            .any(|input| exist_and_no_exchange_before(input, is_candidate))
}

impl BatchPlanRef {
    fn is_user_table_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSeqScan
            || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
            || self.node_type() == BatchPlanNodeType::BatchVectorSearch
    }

    fn is_source_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSource
            || self.node_type() == BatchPlanNodeType::BatchKafkaScan
            || self.node_type() == BatchPlanNodeType::BatchIcebergScan
    }

    fn is_insert(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchInsert
    }

    fn is_update(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchUpdate
    }

    fn is_delete(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchDelete
    }
}

/// As we always run the root stage locally, for plan nodes in the root stage that need to execute
/// on a compute node, we insert an additional exchange before them to avoid including them in the
/// root stage.
///
/// Returns `true` if we must insert an additional exchange to ensure this.
fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan()
            || plan.is_source_scan()
            || plan.is_insert()
            || plan.is_update()
            || plan.is_delete()
    })
}

/// The purpose is the same as `require_additional_exchange_on_root_in_distributed_mode`. We keep
/// them separate because plan nodes have different requirements in different execution modes.
fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
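
    #[tokio::test]
    async fn test_set_out_names() {
        // A minimal sketch exercising `set_out_names`: renaming with the wrong number of
        // names must fail, while a matching rename must be reflected in the schema.
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        // Only the first of the two columns is visible in the output.
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let mut root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            vec!["v1".into()],
        );
        // One visible column, two names: rejected.
        assert!(root.set_out_names(vec!["a".into(), "b".into()]).is_err());
        // One visible column, one name: accepted and reflected in the schema.
        root.set_out_names(vec!["renamed".into()]).unwrap();
        assert_eq!(root.schema().fields()[0].name, "renamed");
    }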
}