1use std::num::NonZeroU32;
15use std::ops::DerefMut;
16use std::sync::Arc;
17
18pub mod plan_node;
19
20use plan_node::StreamFilter;
21pub use plan_node::{Explain, PlanRef};
22
23pub mod property;
24
25mod delta_join_solver;
26mod heuristic_optimizer;
27mod plan_rewriter;
28
29pub use plan_rewriter::PlanRewriter;
30
31mod plan_visitor;
32
33pub use plan_visitor::{
34 ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
35 SysTableVisitor,
36};
37
38mod logical_optimization;
39mod optimizer_context;
40pub mod plan_expr_rewriter;
41mod plan_expr_visitor;
42mod rule;
43
44use std::assert_matches::assert_matches;
45use std::collections::{BTreeMap, HashMap};
46
47use fixedbitset::FixedBitSet;
48use itertools::Itertools as _;
49pub use logical_optimization::*;
50pub use optimizer_context::*;
51use plan_expr_rewriter::ConstEvalRewriter;
52use property::Order;
53use risingwave_common::bail;
54use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
55use risingwave_common::types::DataType;
56use risingwave_common::util::column_index_mapping::ColIndexMapping;
57use risingwave_common::util::iter_util::ZipEqDebug;
58use risingwave_connector::sink::catalog::SinkFormatDesc;
59use risingwave_pb::stream_plan::StreamScanType;
60
61use self::heuristic_optimizer::ApplyOrder;
62use self::plan_node::generic::{self, PhysicalPlanRef};
63use self::plan_node::{
64 BatchProject, Convention, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
65 StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
66 ToStreamContext, stream_enforce_eowc_requirement,
67};
68#[cfg(debug_assertions)]
69use self::plan_visitor::InputRefValidator;
70use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
71use self::property::{Cardinality, RequiredDist};
72use self::rule::*;
73use crate::TableCatalog;
74use crate::catalog::table_catalog::TableType;
75use crate::catalog::{DatabaseId, SchemaId};
76use crate::error::{ErrorCode, Result};
77use crate::expr::TimestamptzExprFinder;
78use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
79use crate::optimizer::plan_node::generic::{SourceNodeKind, Union};
80use crate::optimizer::plan_node::{
81 BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
82 ToStream, VisitExprsRecursive,
83};
84use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
85use crate::optimizer::property::Distribution;
86use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
87
88#[derive(Debug, Clone)]
99pub struct PlanRoot {
100 plan: PlanRef,
102 phase: PlanPhase,
104 required_dist: RequiredDist,
105 required_order: Order,
106 out_fields: FixedBitSet,
107 out_names: Vec<String>,
108}
109
/// Planning stage that a [`PlanRoot`] is currently in.
///
/// The enum is fieldless, so it derives `Copy` and `Eq` in addition to `PartialEq`; this lets
/// callers compare and pass phases by value without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PlanPhase {
    /// A logical plan that has not been optimized yet.
    Logical,
    /// A logical plan after logical optimization targeting batch execution.
    OptimizedLogicalForBatch,
    /// A logical plan after logical optimization targeting stream execution.
    OptimizedLogicalForStream,
    /// A batch plan.
    Batch,
    /// A stream plan.
    Stream,
}
123
124impl PlanRoot {
125 pub fn new_with_logical_plan(
126 plan: PlanRef,
127 required_dist: RequiredDist,
128 required_order: Order,
129 out_fields: FixedBitSet,
130 out_names: Vec<String>,
131 ) -> Self {
132 assert_eq!(plan.convention(), Convention::Logical);
133 Self::new_inner(
134 plan,
135 PlanPhase::Logical,
136 required_dist,
137 required_order,
138 out_fields,
139 out_names,
140 )
141 }
142
143 pub fn new_with_batch_plan(
144 plan: PlanRef,
145 required_dist: RequiredDist,
146 required_order: Order,
147 out_fields: FixedBitSet,
148 out_names: Vec<String>,
149 ) -> Self {
150 assert_eq!(plan.convention(), Convention::Batch);
151 Self::new_inner(
152 plan,
153 PlanPhase::Batch,
154 required_dist,
155 required_order,
156 out_fields,
157 out_names,
158 )
159 }
160
161 fn new_inner(
162 plan: PlanRef,
163 phase: PlanPhase,
164 required_dist: RequiredDist,
165 required_order: Order,
166 out_fields: FixedBitSet,
167 out_names: Vec<String>,
168 ) -> Self {
169 let input_schema = plan.schema();
170 assert_eq!(input_schema.fields().len(), out_fields.len());
171 assert_eq!(out_fields.count_ones(..), out_names.len());
172
173 Self {
174 plan,
175 phase,
176 required_dist,
177 required_order,
178 out_fields,
179 out_names,
180 }
181 }
182
183 pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
188 if out_names.len() != self.out_fields.count_ones(..) {
189 Err(ErrorCode::InvalidInputSyntax(
190 "number of column names does not match number of columns".to_owned(),
191 ))?
192 }
193 self.out_names = out_names;
194 Ok(())
195 }
196
197 pub fn set_req_dist_as_same_as_req_order(&mut self) {
198 if self.required_order.len() != 0 {
199 let dist = self
200 .required_order
201 .column_orders
202 .iter()
203 .map(|o| o.column_index)
204 .collect_vec();
205 self.required_dist = RequiredDist::hash_shard(&dist)
206 }
207 }
208
209 pub fn schema(&self) -> Schema {
211 Schema {
214 fields: self
215 .out_fields
216 .ones()
217 .map(|i| self.plan.schema().fields()[i].clone())
218 .zip_eq_debug(&self.out_names)
219 .map(|(field, name)| Field {
220 name: name.clone(),
221 ..field
222 })
223 .collect(),
224 }
225 }
226
227 pub fn into_unordered_subplan(self) -> PlanRef {
231 assert_eq!(self.phase, PlanPhase::Logical);
232 assert_eq!(self.plan.convention(), Convention::Logical);
233 if self.out_fields.count_ones(..) == self.out_fields.len() {
234 return self.plan;
235 }
236 LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
237 }
238
239 pub fn into_array_agg(self) -> Result<PlanRef> {
243 use generic::Agg;
244 use plan_node::PlanAggCall;
245 use risingwave_common::types::ListValue;
246 use risingwave_expr::aggregate::PbAggKind;
247
248 use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
249 use crate::utils::{Condition, IndexSet};
250
251 assert_eq!(self.phase, PlanPhase::Logical);
252 assert_eq!(self.plan.convention(), Convention::Logical);
253 let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
254 bail!("subquery must return only one column");
255 };
256 let input_column_type = self.plan.schema().fields()[select_idx].data_type();
257 let return_type = DataType::List(input_column_type.clone().into());
258 let agg = Agg::new(
259 vec![PlanAggCall {
260 agg_type: PbAggKind::ArrayAgg.into(),
261 return_type: return_type.clone(),
262 inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
263 distinct: false,
264 order_by: self.required_order.column_orders,
265 filter: Condition::true_cond(),
266 direct_args: vec![],
267 }],
268 IndexSet::empty(),
269 self.plan,
270 );
271 Ok(LogicalProject::create(
272 agg.into(),
273 vec![
274 FunctionCall::new(
275 ExprType::Coalesce,
276 vec![
277 InputRef::new(0, return_type).into(),
278 ExprImpl::literal_list(
279 ListValue::empty(&input_column_type),
280 input_column_type,
281 ),
282 ],
283 )
284 .unwrap()
285 .into(),
286 ],
287 ))
288 }
289
290 pub fn gen_optimized_logical_plan_for_stream(&mut self) -> Result<PlanRef> {
292 assert_eq!(self.phase, PlanPhase::Logical);
293 assert_eq!(self.plan.convention(), Convention::Logical);
294 self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
295 self.phase = PlanPhase::OptimizedLogicalForStream;
296 assert_eq!(self.plan.convention(), Convention::Logical);
297 Ok(self.plan.clone())
298 }
299
300 pub fn gen_optimized_logical_plan_for_batch(&mut self) -> Result<PlanRef> {
302 assert_eq!(self.phase, PlanPhase::Logical);
303 assert_eq!(self.plan.convention(), Convention::Logical);
304 self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
305 self.phase = PlanPhase::OptimizedLogicalForBatch;
306 assert_eq!(self.plan.convention(), Convention::Logical);
307 Ok(self.plan.clone())
308 }
309
310 pub fn gen_batch_plan(&mut self) -> Result<PlanRef> {
312 assert_eq!(self.plan.convention(), Convention::Logical);
313 let mut plan = match self.phase {
314 PlanPhase::Logical => {
315 self.gen_optimized_logical_plan_for_batch()?
317 }
318 PlanPhase::OptimizedLogicalForBatch => self.plan.clone(),
319 PlanPhase::Batch | PlanPhase::OptimizedLogicalForStream | PlanPhase::Stream => {
320 panic!("unexpected phase")
321 }
322 };
323
324 if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
325 return Err(ErrorCode::NotSupported(
326 "do not support temporal join for batch queries".to_owned(),
327 "please use temporal join in streaming queries".to_owned(),
328 )
329 .into());
330 }
331
332 let ctx = plan.ctx();
333 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
335
336 plan = const_eval_exprs(plan)?;
338
339 if ctx.is_explain_trace() {
340 ctx.trace("Const eval exprs:");
341 ctx.trace(plan.explain_to_string());
342 }
343
344 plan = plan.to_batch_with_order_required(&self.required_order)?;
346 if ctx.is_explain_trace() {
347 ctx.trace("To Batch Plan:");
348 ctx.trace(plan.explain_to_string());
349 }
350
351 plan = plan.optimize_by_rules(&OptimizationStage::new(
352 "Merge BatchProject",
353 vec![BatchProjectMergeRule::create()],
354 ApplyOrder::BottomUp,
355 ))?;
356
357 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
359
360 if ctx.is_explain_trace() {
361 ctx.trace("Inline Session Timezone:");
362 ctx.trace(plan.explain_to_string());
363 }
364
365 #[cfg(debug_assertions)]
366 InputRefValidator.validate(plan.clone());
367 assert!(
368 *plan.distribution() == Distribution::Single,
369 "{}",
370 plan.explain_to_string()
371 );
372 assert!(
373 !has_batch_exchange(plan.clone()),
374 "{}",
375 plan.explain_to_string()
376 );
377
378 let ctx = plan.ctx();
379 if ctx.is_explain_trace() {
380 ctx.trace("To Batch Physical Plan:");
381 ctx.trace(plan.explain_to_string());
382 }
383
384 self.plan = plan;
385 self.phase = PlanPhase::Batch;
386 assert_eq!(self.plan.convention(), Convention::Batch);
387 Ok(self.plan.clone())
388 }
389
390 pub fn gen_batch_distributed_plan(mut self) -> Result<PlanRef> {
392 assert_eq!(self.phase, PlanPhase::Batch);
393 assert_eq!(self.plan.convention(), Convention::Batch);
394 self.required_dist = RequiredDist::single();
395 let mut plan = self.plan;
396
397 plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;
399
400 if self.out_fields.count_ones(..) != self.out_fields.len() {
402 plan =
403 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
404 }
405
406 let ctx = plan.ctx();
407 if ctx.is_explain_trace() {
408 ctx.trace("To Batch Distributed Plan:");
409 ctx.trace(plan.explain_to_string());
410 }
411 if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
412 plan =
413 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
414 }
415
416 let plan = plan.optimize_by_rules(&OptimizationStage::new(
418 "Push Limit To Scan",
419 vec![BatchPushLimitToScanRule::create()],
420 ApplyOrder::BottomUp,
421 ))?;
422
423 let plan = plan.optimize_by_rules(&OptimizationStage::new(
424 "Iceberg Count Star",
425 vec![BatchIcebergCountStar::create()],
426 ApplyOrder::TopDown,
427 ))?;
428
429 let plan = plan.optimize_by_rules(&OptimizationStage::new(
432 "Iceberg Predicate Pushdown",
433 vec![BatchIcebergPredicatePushDownRule::create()],
434 ApplyOrder::BottomUp,
435 ))?;
436
437 assert_eq!(plan.convention(), Convention::Batch);
438 Ok(plan)
439 }
440
441 pub fn gen_batch_local_plan(self) -> Result<PlanRef> {
443 assert_eq!(self.phase, PlanPhase::Batch);
444 assert_eq!(self.plan.convention(), Convention::Batch);
445 let mut plan = self.plan;
446
447 plan = plan.to_local_with_order_required(&self.required_order)?;
449
450 let insert_exchange = match plan.distribution() {
453 Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
454 _ => true,
455 };
456 if insert_exchange {
457 plan =
458 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
459 }
460
461 if self.out_fields.count_ones(..) != self.out_fields.len() {
463 plan =
464 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
465 }
466
467 let ctx = plan.ctx();
468 if ctx.is_explain_trace() {
469 ctx.trace("To Batch Local Plan:");
470 ctx.trace(plan.explain_to_string());
471 }
472
473 let plan = plan.optimize_by_rules(&OptimizationStage::new(
475 "Push Limit To Scan",
476 vec![BatchPushLimitToScanRule::create()],
477 ApplyOrder::BottomUp,
478 ))?;
479
480 let plan = plan.optimize_by_rules(&OptimizationStage::new(
481 "Iceberg Count Star",
482 vec![BatchIcebergCountStar::create()],
483 ApplyOrder::TopDown,
484 ))?;
485
486 assert_eq!(plan.convention(), Convention::Batch);
487 Ok(plan)
488 }
489
490 fn gen_optimized_stream_plan(
492 &mut self,
493 emit_on_window_close: bool,
494 allow_snapshot_backfill: bool,
495 ) -> Result<PlanRef> {
496 assert_eq!(self.phase, PlanPhase::Logical);
497 assert_eq!(self.plan.convention(), Convention::Logical);
498 let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
499 StreamScanType::SnapshotBackfill
500 } else if self.should_use_arrangement_backfill() {
501 StreamScanType::ArrangementBackfill
502 } else {
503 StreamScanType::Backfill
504 };
505 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
506 }
507
508 fn gen_optimized_stream_plan_inner(
509 &mut self,
510 emit_on_window_close: bool,
511 stream_scan_type: StreamScanType,
512 ) -> Result<PlanRef> {
513 assert_eq!(self.phase, PlanPhase::Logical);
514 assert_eq!(self.plan.convention(), Convention::Logical);
515 let ctx = self.plan.ctx();
516 let _explain_trace = ctx.is_explain_trace();
517
518 let mut plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;
519
520 plan = plan.optimize_by_rules(&OptimizationStage::new(
521 "Merge StreamProject",
522 vec![StreamProjectMergeRule::create()],
523 ApplyOrder::BottomUp,
524 ))?;
525
526 if ctx.session_ctx().config().streaming_enable_unaligned_join() {
530 plan = plan.optimize_by_rules(&OptimizationStage::new(
531 "Add Logstore for Unaligned join",
532 vec![AddLogstoreRule::create()],
533 ApplyOrder::BottomUp,
534 ))?;
535 }
536
537 if ctx.session_ctx().config().streaming_enable_delta_join() {
538 plan = plan.optimize_by_rules(&OptimizationStage::new(
541 "To IndexDeltaJoin",
542 vec![IndexDeltaJoinRule::create()],
543 ApplyOrder::BottomUp,
544 ))?;
545 }
546 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
548
549 if ctx.is_explain_trace() {
550 ctx.trace("Inline session timezone:");
551 ctx.trace(plan.explain_to_string());
552 }
553
554 plan = const_eval_exprs(plan)?;
556
557 if ctx.is_explain_trace() {
558 ctx.trace("Const eval exprs:");
559 ctx.trace(plan.explain_to_string());
560 }
561
562 #[cfg(debug_assertions)]
563 InputRefValidator.validate(plan.clone());
564
565 if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
566 return Err(ErrorCode::NotSupported(
567 "exist dangling temporal scan".to_owned(),
568 "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
569 ).into());
570 }
571
572 if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
573 return Err(ErrorCode::NotSupported(
574 "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
575 "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
576 ).into());
577 }
578
579 self.plan = plan;
580 self.phase = PlanPhase::Stream;
581 assert_eq!(self.plan.convention(), Convention::Stream);
582 Ok(self.plan.clone())
583 }
584
585 fn gen_stream_plan(
587 &mut self,
588 emit_on_window_close: bool,
589 stream_scan_type: StreamScanType,
590 ) -> Result<PlanRef> {
591 assert_eq!(self.phase, PlanPhase::Logical);
592 assert_eq!(self.plan.convention(), Convention::Logical);
593 let ctx = self.plan.ctx();
594 let explain_trace = ctx.is_explain_trace();
595
596 let plan = match self.plan.convention() {
597 Convention::Logical => {
598 if !ctx
599 .session_ctx()
600 .config()
601 .streaming_allow_jsonb_in_stream_key()
602 && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
603 {
604 return Err(ErrorCode::NotSupported(
605 err,
606 "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
607 If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
608 ).into());
609 }
610 let plan = self.gen_optimized_logical_plan_for_stream()?;
611 let (plan, out_col_change) = {
612 let (plan, out_col_change) =
613 plan.logical_rewrite_for_stream(&mut Default::default())?;
614 if out_col_change.is_injective() {
615 (plan, out_col_change)
616 } else {
617 let mut output_indices = (0..plan.schema().len()).collect_vec();
618 #[allow(unused_assignments)]
619 let (mut map, mut target_size) = out_col_change.into_parts();
620
621 target_size = plan.schema().len();
624 let mut tar_exists = vec![false; target_size];
625 for i in map.iter_mut().flatten() {
626 if tar_exists[*i] {
627 output_indices.push(*i);
628 *i = target_size;
629 target_size += 1;
630 } else {
631 tar_exists[*i] = true;
632 }
633 }
634 let plan =
635 LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
636 let out_col_change = ColIndexMapping::new(map, target_size);
637 (plan.into(), out_col_change)
638 }
639 };
640
641 if explain_trace {
642 ctx.trace("Logical Rewrite For Stream:");
643 ctx.trace(plan.explain_to_string());
644 }
645
646 self.required_dist =
647 out_col_change.rewrite_required_distribution(&self.required_dist);
648 self.required_order = out_col_change
649 .rewrite_required_order(&self.required_order)
650 .unwrap();
651 self.out_fields = out_col_change.rewrite_bitset(&self.out_fields);
652 let plan = plan.to_stream_with_dist_required(
653 &self.required_dist,
654 &mut ToStreamContext::new_with_stream_scan_type(
655 emit_on_window_close,
656 stream_scan_type,
657 ),
658 )?;
659 stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)
660 }
661 _ => unreachable!(),
662 }?;
663
664 if explain_trace {
665 ctx.trace("To Stream Plan:");
666 ctx.trace(plan.explain_to_string());
667 }
668 Ok(plan)
669 }
670
671 fn compute_cardinality(&self) -> Cardinality {
675 assert_matches!(self.plan.convention(), Convention::Logical);
676 CardinalityVisitor.visit(self.plan.clone())
677 }
678
679 pub fn gen_table_plan(
681 mut self,
682 context: OptimizerContextRef,
683 table_name: String,
684 database_id: DatabaseId,
685 schema_id: SchemaId,
686 CreateTableInfo {
687 columns,
688 pk_column_ids,
689 row_id_index,
690 watermark_descs,
691 source_catalog,
692 version,
693 }: CreateTableInfo,
694 CreateTableProps {
695 definition,
696 append_only,
697 on_conflict,
698 with_version_column,
699 webhook_info,
700 engine,
701 }: CreateTableProps,
702 ) -> Result<StreamMaterialize> {
703 assert_eq!(self.phase, PlanPhase::Logical);
704 assert_eq!(self.plan.convention(), Convention::Logical);
705 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
707 assert_eq!(self.phase, PlanPhase::Stream);
708 assert_eq!(stream_plan.convention(), Convention::Stream);
709
710 assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
711
712 let pk_column_indices = {
713 let mut id_to_idx = HashMap::new();
714
715 columns.iter().enumerate().for_each(|(idx, c)| {
716 id_to_idx.insert(c.column_id(), idx);
717 });
718 pk_column_ids
719 .iter()
720 .map(|c| id_to_idx.get(c).copied().unwrap()) .collect_vec()
722 };
723
724 fn inject_project_for_generated_column_if_needed(
725 columns: &[ColumnCatalog],
726 node: PlanRef,
727 ) -> Result<PlanRef> {
728 let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
729 if let Some(exprs) = exprs {
730 let logical_project = generic::Project::new(exprs, node);
731 return Ok(StreamProject::new(logical_project).into());
732 }
733 Ok(node)
734 }
735
736 #[derive(PartialEq, Debug, Copy, Clone)]
737 enum PrimaryKeyKind {
738 UserDefinedPrimaryKey,
739 NonAppendOnlyRowIdPk,
740 AppendOnlyRowIdPk,
741 }
742
743 fn inject_dml_node(
744 columns: &[ColumnCatalog],
745 append_only: bool,
746 stream_plan: PlanRef,
747 pk_column_indices: &[usize],
748 kind: PrimaryKeyKind,
749 column_descs: Vec<ColumnDesc>,
750 ) -> Result<PlanRef> {
751 let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
752
753 dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
755
756 dml_node = match kind {
757 PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
758 RequiredDist::hash_shard(pk_column_indices)
759 .enforce_if_not_satisfies(dml_node, &Order::any())?
760 }
761 PrimaryKeyKind::AppendOnlyRowIdPk => {
762 StreamExchange::new_no_shuffle(dml_node).into()
763 }
764 };
765
766 Ok(dml_node)
767 }
768
769 let kind = if let Some(row_id_index) = row_id_index {
770 assert_eq!(
771 pk_column_indices.iter().exactly_one().copied().unwrap(),
772 row_id_index
773 );
774 if append_only {
775 PrimaryKeyKind::AppendOnlyRowIdPk
776 } else {
777 PrimaryKeyKind::NonAppendOnlyRowIdPk
778 }
779 } else {
780 PrimaryKeyKind::UserDefinedPrimaryKey
781 };
782
783 let column_descs: Vec<ColumnDesc> = columns
784 .iter()
785 .filter(|&c| c.can_dml())
786 .map(|c| c.column_desc.clone())
787 .collect();
788
789 let mut not_null_idxs = vec![];
790 for (idx, column) in column_descs.iter().enumerate() {
791 if !column.nullable {
792 not_null_idxs.push(idx);
793 }
794 }
795
796 let version_column_index = if let Some(version_column) = with_version_column {
797 find_version_column_index(&columns, version_column)?
798 } else {
799 None
800 };
801
802 let with_external_source = source_catalog.is_some();
803 let union_inputs = if with_external_source {
804 let mut external_source_node = stream_plan;
805 external_source_node =
806 inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
807 external_source_node = match kind {
808 PrimaryKeyKind::UserDefinedPrimaryKey => {
809 RequiredDist::hash_shard(&pk_column_indices)
810 .enforce_if_not_satisfies(external_source_node, &Order::any())?
811 }
812
813 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
814 StreamExchange::new_no_shuffle(external_source_node).into()
815 }
816 };
817
818 let dummy_source_node = LogicalSource::new(
819 None,
820 columns.clone(),
821 row_id_index,
822 SourceNodeKind::CreateTable,
823 context.clone(),
824 None,
825 )
826 .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
827
828 let dml_node = inject_dml_node(
829 &columns,
830 append_only,
831 dummy_source_node,
832 &pk_column_indices,
833 kind,
834 column_descs,
835 )?;
836
837 vec![external_source_node, dml_node]
838 } else {
839 let dml_node = inject_dml_node(
840 &columns,
841 append_only,
842 stream_plan,
843 &pk_column_indices,
844 kind,
845 column_descs,
846 )?;
847
848 vec![dml_node]
849 };
850
851 let dists = union_inputs
852 .iter()
853 .map(|input| input.distribution())
854 .unique()
855 .collect_vec();
856
857 let dist = match &dists[..] {
858 &[Distribution::SomeShard, Distribution::HashShard(_)]
859 | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
860 &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
861 dist.clone()
862 }
863 _ => {
864 unreachable!()
865 }
866 };
867
868 let mut stream_plan = StreamUnion::new_with_dist(
869 Union {
870 all: true,
871 inputs: union_inputs,
872 source_col: None,
873 },
874 dist.clone(),
875 )
876 .into();
877
878 if !watermark_descs.is_empty() {
880 stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
881 }
882
883 if let Some(row_id_index) = row_id_index {
885 match kind {
886 PrimaryKeyKind::UserDefinedPrimaryKey => {
887 unreachable!()
888 }
889 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
890 stream_plan = StreamRowIdGen::new_with_dist(
891 stream_plan,
892 row_id_index,
893 Distribution::HashShard(vec![row_id_index]),
894 )
895 .into();
896 }
897 }
898 }
899
900 let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
901
902 if let ConflictBehavior::IgnoreConflict = conflict_behavior
903 && version_column_index.is_some()
904 {
905 Err(ErrorCode::InvalidParameterValue(
906 "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
907 ))?
908 }
909
910 let retention_seconds = context.with_options().retention_seconds();
911
912 let table_required_dist = {
913 let mut bitset = FixedBitSet::with_capacity(columns.len());
914 for idx in &pk_column_indices {
915 bitset.insert(*idx);
916 }
917 RequiredDist::ShardByKey(bitset)
918 };
919
920 let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
921
922 if !not_null_idxs.is_empty() {
923 stream_plan =
924 StreamFilter::filter_out_any_null_rows(stream_plan.clone(), ¬_null_idxs);
925 }
926
927 StreamMaterialize::create_for_table(
928 stream_plan,
929 table_name,
930 database_id,
931 schema_id,
932 table_required_dist,
933 Order::any(),
934 columns,
935 definition,
936 conflict_behavior,
937 version_column_index,
938 pk_column_indices,
939 row_id_index,
940 version,
941 retention_seconds,
942 webhook_info,
943 engine,
944 )
945 }
946
947 pub fn gen_materialize_plan(
949 mut self,
950 database_id: DatabaseId,
951 schema_id: SchemaId,
952 mv_name: String,
953 definition: String,
954 emit_on_window_close: bool,
955 ) -> Result<StreamMaterialize> {
956 let cardinality = self.compute_cardinality();
957 assert_eq!(self.phase, PlanPhase::Logical);
958 assert_eq!(self.plan.convention(), Convention::Logical);
959 let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
960 assert_eq!(self.phase, PlanPhase::Stream);
961 assert_eq!(stream_plan.convention(), Convention::Stream);
962 StreamMaterialize::create(
963 stream_plan,
964 mv_name,
965 database_id,
966 schema_id,
967 self.required_dist.clone(),
968 self.required_order.clone(),
969 self.out_fields.clone(),
970 self.out_names.clone(),
971 definition,
972 TableType::MaterializedView,
973 cardinality,
974 None,
975 )
976 }
977
978 pub fn gen_index_plan(
980 mut self,
981 index_name: String,
982 database_id: DatabaseId,
983 schema_id: SchemaId,
984 definition: String,
985 retention_seconds: Option<NonZeroU32>,
986 ) -> Result<StreamMaterialize> {
987 let cardinality = self.compute_cardinality();
988 assert_eq!(self.phase, PlanPhase::Logical);
989 assert_eq!(self.plan.convention(), Convention::Logical);
990 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
991 assert_eq!(self.phase, PlanPhase::Stream);
992 assert_eq!(stream_plan.convention(), Convention::Stream);
993
994 StreamMaterialize::create(
995 stream_plan,
996 index_name,
997 database_id,
998 schema_id,
999 self.required_dist.clone(),
1000 self.required_order.clone(),
1001 self.out_fields.clone(),
1002 self.out_names.clone(),
1003 definition,
1004 TableType::Index,
1005 cardinality,
1006 retention_seconds,
1007 )
1008 }
1009
1010 #[allow(clippy::too_many_arguments)]
1012 pub fn gen_sink_plan(
1013 &mut self,
1014 sink_name: String,
1015 definition: String,
1016 properties: WithOptionsSecResolved,
1017 emit_on_window_close: bool,
1018 db_name: String,
1019 sink_from_table_name: String,
1020 format_desc: Option<SinkFormatDesc>,
1021 without_backfill: bool,
1022 target_table: Option<Arc<TableCatalog>>,
1023 partition_info: Option<PartitionComputeInfo>,
1024 user_specified_columns: bool,
1025 ) -> Result<StreamSink> {
1026 let stream_scan_type = if without_backfill {
1027 StreamScanType::UpstreamOnly
1028 } else if target_table.is_none() && self.should_use_snapshot_backfill() {
1029 StreamScanType::SnapshotBackfill
1031 } else if self.should_use_arrangement_backfill() {
1032 StreamScanType::ArrangementBackfill
1033 } else {
1034 StreamScanType::Backfill
1035 };
1036 assert_eq!(self.phase, PlanPhase::Logical);
1037 assert_eq!(self.plan.convention(), Convention::Logical);
1038 let stream_plan =
1039 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
1040 assert_eq!(self.phase, PlanPhase::Stream);
1041 assert_eq!(stream_plan.convention(), Convention::Stream);
1042 let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
1043 let columns = t.columns_without_rw_timestamp();
1044 self.target_columns_to_plan_mapping(&columns, user_specified_columns)
1045 });
1046 StreamSink::create(
1047 stream_plan,
1048 sink_name,
1049 db_name,
1050 sink_from_table_name,
1051 target_table,
1052 target_columns_to_plan_mapping,
1053 self.required_dist.clone(),
1054 self.required_order.clone(),
1055 self.out_fields.clone(),
1056 self.out_names.clone(),
1057 definition,
1058 properties,
1059 format_desc,
1060 partition_info,
1061 )
1062 }
1063
1064 pub fn should_use_arrangement_backfill(&self) -> bool {
1065 let ctx = self.plan.ctx();
1066 let session_ctx = ctx.session_ctx();
1067 let arrangement_backfill_enabled = session_ctx
1068 .env()
1069 .streaming_config()
1070 .developer
1071 .enable_arrangement_backfill;
1072 arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1073 }
1074
1075 pub fn should_use_snapshot_backfill(&self) -> bool {
1076 self.plan
1077 .ctx()
1078 .session_ctx()
1079 .config()
1080 .streaming_use_snapshot_backfill()
1081 }
1082
1083 pub fn target_columns_to_plan_mapping(
1085 &self,
1086 tar_cols: &[ColumnCatalog],
1087 user_specified_columns: bool,
1088 ) -> Vec<Option<usize>> {
1089 #[allow(clippy::disallowed_methods)]
1090 let visible_cols: Vec<(usize, String)> = self
1091 .out_fields
1092 .ones()
1093 .zip_eq(self.out_names.iter().cloned())
1094 .collect_vec();
1095
1096 let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1097 let visible_col_idxes_by_name = visible_cols
1098 .iter()
1099 .map(|(i, name)| (name.as_ref(), *i))
1100 .collect::<BTreeMap<_, _>>();
1101
1102 tar_cols
1103 .iter()
1104 .enumerate()
1105 .filter(|(_, tar_col)| tar_col.can_dml())
1106 .map(|(tar_i, tar_col)| {
1107 if user_specified_columns {
1108 visible_col_idxes_by_name.get(tar_col.name()).cloned()
1109 } else {
1110 (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1111 }
1112 })
1113 .collect()
1114 }
1115}
1116
1117fn find_version_column_index(
1118 column_catalog: &Vec<ColumnCatalog>,
1119 version_column_name: String,
1120) -> Result<Option<usize>> {
1121 for (index, column) in column_catalog.iter().enumerate() {
1122 if column.column_desc.name == version_column_name {
1123 if let &DataType::Jsonb
1124 | &DataType::List(_)
1125 | &DataType::Struct(_)
1126 | &DataType::Bytea
1127 | &DataType::Boolean = column.data_type()
1128 {
1129 Err(ErrorCode::InvalidParameterValue(
1130 "The specified version column data type is invalid.".to_owned(),
1131 ))?
1132 }
1133 return Ok(Some(index));
1134 }
1135 }
1136 Err(ErrorCode::InvalidParameterValue(
1137 "The specified version column name is not in the current columns.".to_owned(),
1138 ))?
1139}
1140
1141fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> {
1142 let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1143
1144 let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1145 if let Some(error) = const_eval_rewriter.error {
1146 return Err(error);
1147 }
1148 Ok(plan)
1149}
1150
1151fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> {
1152 let mut v = TimestamptzExprFinder::default();
1153 plan.visit_exprs_recursive(&mut v);
1154 if v.has() {
1155 Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1156 } else {
1157 Ok(plan)
1158 }
1159}
1160
1161fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
1162 if plan.node_type() == PlanNodeType::BatchExchange {
1163 return false;
1164 }
1165 is_candidate(plan)
1166 || plan
1167 .inputs()
1168 .iter()
1169 .any(|input| exist_and_no_exchange_before(input, is_candidate))
1170}
1171
1172fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
1178 fn is_user_table(plan: &PlanRef) -> bool {
1179 plan.node_type() == PlanNodeType::BatchSeqScan
1180 }
1181
1182 fn is_log_table(plan: &PlanRef) -> bool {
1183 plan.node_type() == PlanNodeType::BatchLogSeqScan
1184 }
1185
1186 fn is_source(plan: &PlanRef) -> bool {
1187 plan.node_type() == PlanNodeType::BatchSource
1188 || plan.node_type() == PlanNodeType::BatchKafkaScan
1189 || plan.node_type() == PlanNodeType::BatchIcebergScan
1190 }
1191
1192 fn is_insert(plan: &PlanRef) -> bool {
1193 plan.node_type() == PlanNodeType::BatchInsert
1194 }
1195
1196 fn is_update(plan: &PlanRef) -> bool {
1197 plan.node_type() == PlanNodeType::BatchUpdate
1198 }
1199
1200 fn is_delete(plan: &PlanRef) -> bool {
1201 plan.node_type() == PlanNodeType::BatchDelete
1202 }
1203
1204 assert_eq!(plan.distribution(), &Distribution::Single);
1205 exist_and_no_exchange_before(&plan, is_user_table)
1206 || exist_and_no_exchange_before(&plan, is_source)
1207 || exist_and_no_exchange_before(&plan, is_insert)
1208 || exist_and_no_exchange_before(&plan, is_update)
1209 || exist_and_no_exchange_before(&plan, is_delete)
1210 || exist_and_no_exchange_before(&plan, is_log_table)
1211}
1212
1213fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
1216 fn is_user_table(plan: &PlanRef) -> bool {
1217 plan.node_type() == PlanNodeType::BatchSeqScan
1218 }
1219
1220 fn is_source(plan: &PlanRef) -> bool {
1221 plan.node_type() == PlanNodeType::BatchSource
1222 || plan.node_type() == PlanNodeType::BatchKafkaScan
1223 || plan.node_type() == PlanNodeType::BatchIcebergScan
1224 }
1225
1226 fn is_insert(plan: &PlanRef) -> bool {
1227 plan.node_type() == PlanNodeType::BatchInsert
1228 }
1229
1230 assert_eq!(plan.distribution(), &Distribution::Single);
1231 exist_and_no_exchange_before(&plan, is_user_table)
1232 || exist_and_no_exchange_before(&plan, is_source)
1233 || exist_and_no_exchange_before(&plan, is_insert)
1234}
1235
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// A root exposing only the first of two columns yields a subplan with just that column.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let input_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values: PlanRef = LogicalValues::new(vec![], input_schema, ctx).into();

        // Two fields, only the first one selected.
        let mut out_fields = FixedBitSet::with_capacity(2);
        out_fields.insert(0);

        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            vec!["v1".into()],
        );

        let subplan = root.into_unordered_subplan();
        let expected = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(subplan.schema(), &expected);
    }
}