use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

pub mod plan_node;

use plan_node::StreamFilter;
pub use plan_node::{Explain, PlanRef};

pub mod property;

mod delta_join_solver;
mod heuristic_optimizer;
mod plan_rewriter;

pub use plan_rewriter::PlanRewriter;

mod plan_visitor;

pub use plan_visitor::{
    ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
    SysTableVisitor,
};

pub mod backfill_order_strategy;
mod logical_optimization;
mod optimizer_context;
pub mod plan_expr_rewriter;
mod plan_expr_visitor;
mod rule;

use std::assert_matches::assert_matches;
use std::collections::{BTreeMap, HashMap};

use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
    BatchProject, Convention, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
    ToStreamContext, stream_enforce_eowc_requirement,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::TableCatalog;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::TimestamptzExprFinder;
use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
use crate::optimizer::plan_node::generic::{SourceNodeKind, Union};
use crate::optimizer::plan_node::{
    BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
    ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};

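/// The root of a query plan. Besides the plan itself, it tracks the required
/// distribution and order of the query result, as well as which output fields
/// are visible to the user and under which names.
///
/// A minimal usage sketch, mirroring the test at the bottom of this file:
///
/// ```ignore
/// let root = PlanRoot::new_with_logical_plan(
///     values,            // a logical plan, e.g. `LogicalValues`
///     RequiredDist::Any, // no distribution requirement
///     Order::any(),      // no order requirement
///     out_fields,        // bitset marking the visible columns
///     out_names,         // one name per visible column
/// );
/// let subplan = root.into_unordered_subplan();
/// ```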
#[derive(Debug, Clone)]
pub struct PlanRoot {
    plan: PlanRef,
    phase: PlanPhase,
    required_dist: RequiredDist,
    required_order: Order,
    out_fields: FixedBitSet,
    out_names: Vec<String>,
}

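/// The current phase of the plan held by [`PlanRoot`]. A plan starts in the
/// `Logical` phase, may be logically optimized for batch or stream, and
/// finally becomes a physical `Batch` or `Stream` plan.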
#[derive(Debug, Clone, PartialEq)]
pub enum PlanPhase {
    Logical,
    OptimizedLogicalForBatch,
    OptimizedLogicalForStream,
    Batch,
    Stream,
}

impl PlanRoot {
    pub fn new_with_logical_plan(
        plan: PlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        assert_eq!(plan.convention(), Convention::Logical);
        Self::new_inner(
            plan,
            PlanPhase::Logical,
            required_dist,
            required_order,
            out_fields,
            out_names,
        )
    }

    pub fn new_with_batch_plan(
        plan: PlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        assert_eq!(plan.convention(), Convention::Batch);
        Self::new_inner(
            plan,
            PlanPhase::Batch,
            required_dist,
            required_order,
            out_fields,
            out_names,
        )
    }

    fn new_inner(
        plan: PlanRef,
        phase: PlanPhase,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        let input_schema = plan.schema();
        assert_eq!(input_schema.fields().len(), out_fields.len());
        assert_eq!(out_fields.count_ones(..), out_names.len());

        Self {
            plan,
            phase,
            required_dist,
            required_order,
            out_fields,
            out_names,
        }
    }

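    /// Set customized output field names. Errors if the number of names does
    /// not match the number of visible output columns.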
    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
        if out_names.len() != self.out_fields.count_ones(..) {
            Err(ErrorCode::InvalidInputSyntax(
                "number of column names does not match number of columns".to_owned(),
            ))?
        }
        self.out_names = out_names;
        Ok(())
    }

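    /// Get the output schema of the plan: the visible fields only, renamed
    /// according to `out_names`.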
    pub fn schema(&self) -> Schema {
        Schema {
            fields: self
                .out_fields
                .ones()
                .map(|i| self.plan.schema().fields()[i].clone())
                .zip_eq_debug(&self.out_names)
                .map(|(field, name)| Field {
                    name: name.clone(),
                    ..field
                })
                .collect(),
        }
    }

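    /// Turn the [`PlanRoot`] back into a [`PlanRef`] suitable for use as a
    /// subplan, pruning invisible columns with a `LogicalProject` if needed.
    /// The `required_order` is *not* enforced, hence "unordered".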
    pub fn into_unordered_subplan(self) -> PlanRef {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

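    /// Turn the subplan into one that evaluates to a single array, by wrapping
    /// it in an `array_agg` call (ordered by `required_order`) and coalescing a
    /// `NULL` result to an empty list. The subplan must output exactly one
    /// visible column.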
    pub fn into_array_agg(self) -> Result<PlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::List(input_column_type.clone().into());
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

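    /// Apply logical optimization to the plan for generating a stream plan.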
    pub fn gen_optimized_logical_plan_for_stream(&mut self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        self.phase = PlanPhase::OptimizedLogicalForStream;
        assert_eq!(self.plan.convention(), Convention::Logical);
        Ok(self.plan.clone())
    }

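    /// Apply logical optimization to the plan for generating a batch plan.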
    pub fn gen_optimized_logical_plan_for_batch(&mut self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        self.phase = PlanPhase::OptimizedLogicalForBatch;
        assert_eq!(self.plan.convention(), Convention::Logical);
        Ok(self.plan.clone())
    }

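    /// Optimize and generate a singleton batch physical plan without exchange
    /// nodes.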
    pub fn gen_batch_plan(&mut self) -> Result<PlanRef> {
        assert_eq!(self.plan.convention(), Convention::Logical);
        let mut plan = match self.phase {
            PlanPhase::Logical => self.gen_optimized_logical_plan_for_batch()?,
            PlanPhase::OptimizedLogicalForBatch => self.plan.clone(),
            PlanPhase::Batch | PlanPhase::OptimizedLogicalForStream | PlanPhase::Stream => {
                panic!("unexpected phase")
            }
        };

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = plan.ctx();
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert!(
            *plan.distribution() == Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        self.plan = plan;
        self.phase = PlanPhase::Batch;
        assert_eq!(self.plan.convention(), Convention::Batch);
        Ok(self.plan.clone())
    }

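    /// Optimize and generate a batch query plan for distributed execution.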
    pub fn gen_batch_distributed_plan(mut self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Batch);
        assert_eq!(self.plan.convention(), Convention::Batch);
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Predicate Pushdown",
            vec![BatchIcebergPredicatePushDownRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        assert_eq!(plan.convention(), Convention::Batch);
        Ok(plan)
    }

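    /// Optimize and generate a batch query plan for local execution.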
    pub fn gen_batch_local_plan(self) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Batch);
        assert_eq!(self.plan.convention(), Convention::Batch);
        let mut plan = self.plan;

        plan = plan.to_local_with_order_required(&self.required_order)?;

        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        assert_eq!(plan.convention(), Convention::Batch);
        Ok(plan)
    }

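    /// Generate an optimized stream plan, choosing the stream scan (backfill)
    /// type from the session configuration: snapshot backfill if allowed and
    /// enabled, otherwise arrangement backfill if enabled, otherwise plain
    /// backfill.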
    fn gen_optimized_stream_plan(
        &mut self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
    }

    fn gen_optimized_stream_plan_inner(
        &mut self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let mut plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;

        plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Merge StreamProject",
            vec![StreamProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        self.plan = plan;
        self.phase = PlanPhase::Stream;
        assert_eq!(self.plan.convention(), Convention::Stream);
        Ok(self.plan.clone())
    }

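    /// Rewrite the logical plan for stream and convert it to a stream physical
    /// plan, enforcing the required distribution and the emit-on-window-close
    /// requirement.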
    fn gen_stream_plan(
        &mut self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<PlanRef> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = match self.plan.convention() {
            Convention::Logical => {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) =
                        plan.logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                self.required_dist =
                    out_col_change.rewrite_required_distribution(&self.required_dist);
                self.required_order = out_col_change
                    .rewrite_required_order(&self.required_order)
                    .unwrap();
                self.out_fields = out_col_change.rewrite_bitset(&self.out_fields);
                let plan = plan.to_stream_with_dist_required(
                    &self.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)
            }
            _ => unreachable!(),
        }?;

        if explain_trace {
            ctx.trace("To Stream Plan:");
            ctx.trace(plan.explain_to_string());
        }
        Ok(plan)
    }

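    /// Compute the cardinality of the plan. Must be called on a logical plan.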
    fn compute_cardinality(&self) -> Cardinality {
        assert_matches!(self.plan.convention(), Convention::Logical);
        CardinalityVisitor.visit(self.plan.clone())
    }

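    /// Generate the plan for `CREATE TABLE`: a `StreamMaterialize` over a union
    /// of the DML input and, if an external source is attached, the source
    /// scan, with row-id generation, watermark filters, and `NOT NULL` filters
    /// injected as needed.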
    pub fn gen_table_plan(
        mut self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_column,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);

        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap())
                .collect_vec()
        };

        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: PlanRef,
        ) -> Result<PlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: PlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<PlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .enforce_if_not_satisfies(dml_node, &Order::any())?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_index = if let Some(version_column) = with_version_column {
            find_version_column_index(&columns, version_column)?
        } else {
            None
        };

        let with_external_source = source_catalog.is_some();
        let union_inputs = if with_external_source {
            let mut external_source_node = stream_plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .enforce_if_not_satisfies(external_source_node, &Order::any())?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };

            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;

            let dml_node = inject_dml_node(
                &columns,
                append_only,
                dummy_source_node,
                &pk_column_indices,
                kind,
                column_descs,
            )?;

            vec![external_source_node, dml_node]
        } else {
            let dml_node = inject_dml_node(
                &columns,
                append_only,
                stream_plan,
                &pk_column_indices,
                kind,
                column_descs,
            )?;

            vec![dml_node]
        };

        let dists = union_inputs
            .iter()
            .map(|input| input.distribution())
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                unreachable!()
            }
        };

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist.clone(),
        )
        .into();

        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && version_column_index.is_some()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_index,
            pk_column_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
        )
    }

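    /// Optimize and generate a `CREATE MATERIALIZED VIEW` plan.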
    pub fn gen_materialize_plan(
        mut self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            self.required_dist.clone(),
            self.required_order.clone(),
            self.out_fields.clone(),
            self.out_names.clone(),
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }

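    /// Optimize and generate a `CREATE INDEX` plan.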
    pub fn gen_index_plan(
        mut self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            self.required_dist.clone(),
            self.required_order.clone(),
            self.out_fields.clone(),
            self.out_names.clone(),
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }

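    /// Optimize and generate a `CREATE SINK` plan, choosing the stream scan
    /// type in the same way as for materialized views (snapshot backfill is
    /// only considered when there is no target table).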
    #[allow(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        &mut self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
    ) -> Result<StreamSink> {
        let stream_scan_type = if without_backfill {
            StreamScanType::UpstreamOnly
        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        assert_eq!(self.phase, PlanPhase::Logical);
        assert_eq!(self.plan.convention(), Convention::Logical);
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
        assert_eq!(self.phase, PlanPhase::Stream);
        assert_eq!(stream_plan.convention(), Convention::Stream);
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            self.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });
        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            self.required_dist.clone(),
            self.required_order.clone(),
            self.out_fields.clone(),
            self.out_names.clone(),
            definition,
            properties,
            format_desc,
            partition_info,
        )
    }

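    /// Whether arrangement backfill should be used: both the global streaming
    /// config and the session variable must enable it.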
    pub fn should_use_arrangement_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let arrangement_backfill_enabled = session_ctx
            .env()
            .streaming_config()
            .developer
            .enable_arrangement_backfill;
        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
    }

    pub fn should_use_snapshot_backfill(&self) -> bool {
        self.plan
            .ctx()
            .session_ctx()
            .config()
            .streaming_use_snapshot_backfill()
    }

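    /// Map each DML-writable column of the target table to a column of the
    /// plan's output: by name if the user specified a column list, otherwise
    /// positionally. Columns with no counterpart map to `None`.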
    pub fn target_columns_to_plan_mapping(
        &self,
        tar_cols: &[ColumnCatalog],
        user_specified_columns: bool,
    ) -> Vec<Option<usize>> {
        #[allow(clippy::disallowed_methods)]
        let visible_cols: Vec<(usize, String)> = self
            .out_fields
            .ones()
            .zip_eq(self.out_names.iter().cloned())
            .collect_vec();

        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
        let visible_col_idxes_by_name = visible_cols
            .iter()
            .map(|(i, name)| (name.as_ref(), *i))
            .collect::<BTreeMap<_, _>>();

        tar_cols
            .iter()
            .enumerate()
            .filter(|(_, tar_col)| tar_col.can_dml())
            .map(|(tar_i, tar_col)| {
                if user_specified_columns {
                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
                } else {
                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
                }
            })
            .collect()
    }
}

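/// Find the index of the version column among the table columns, rejecting
/// data types that are not accepted as version columns (`jsonb`, list, struct,
/// `bytea`, `boolean`).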
fn find_version_column_index(
    column_catalog: &Vec<ColumnCatalog>,
    version_column_name: String,
) -> Result<Option<usize>> {
    for (index, column) in column_catalog.iter().enumerate() {
        if column.column_desc.name == version_column_name {
            if let &DataType::Jsonb
            | &DataType::List(_)
            | &DataType::Struct(_)
            | &DataType::Bytea
            | &DataType::Boolean = column.data_type()
            {
                Err(ErrorCode::InvalidParameterValue(
                    "The specified version column data type is invalid.".to_owned(),
                ))?
            }
            return Ok(Some(index));
        }
    }
    Err(ErrorCode::InvalidParameterValue(
        "The specified version column name is not in the current columns.".to_owned(),
    ))?
}

fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> {
    let mut const_eval_rewriter = ConstEvalRewriter { error: None };

    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
    if let Some(error) = const_eval_rewriter.error {
        return Err(error);
    }
    Ok(plan)
}

fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> {
    let mut v = TimestamptzExprFinder::default();
    plan.visit_exprs_recursive(&mut v);
    if v.has() {
        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
    } else {
        Ok(plan)
    }
}

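/// Returns true if the plan contains a node satisfying `is_candidate` that is
/// reachable from the root without crossing a `BatchExchange`.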
fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
    if plan.node_type() == PlanNodeType::BatchExchange {
        return false;
    }
    is_candidate(plan)
        || plan
            .inputs()
            .iter()
            .any(|input| exist_and_no_exchange_before(input, is_candidate))
}

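/// Whether an additional `BatchExchange` is required at the root of a
/// distributed batch plan: true if a user-table scan, log scan, source scan,
/// insert, update, or delete is reachable from the root without crossing a
/// `BatchExchange`.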
fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
    fn is_user_table(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSeqScan
    }

    fn is_log_table(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchLogSeqScan
    }

    fn is_source(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSource
            || plan.node_type() == PlanNodeType::BatchKafkaScan
            || plan.node_type() == PlanNodeType::BatchIcebergScan
    }

    fn is_insert(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchInsert
    }

    fn is_update(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchUpdate
    }

    fn is_delete(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchDelete
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, is_user_table)
        || exist_and_no_exchange_before(&plan, is_source)
        || exist_and_no_exchange_before(&plan, is_insert)
        || exist_and_no_exchange_before(&plan, is_update)
        || exist_and_no_exchange_before(&plan, is_delete)
        || exist_and_no_exchange_before(&plan, is_log_table)
}

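/// The local-mode counterpart of
/// `require_additional_exchange_on_root_in_distributed_mode`: only user-table
/// scans, source scans, and inserts require the extra root exchange here.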
fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
    fn is_user_table(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSeqScan
    }

    fn is_source(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchSource
            || plan.node_type() == PlanNodeType::BatchKafkaScan
            || plan.node_type() == PlanNodeType::BatchIcebergScan
    }

    fn is_insert(plan: &PlanRef) -> bool {
        plan.node_type() == PlanNodeType::BatchInsert
    }

    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, is_user_table)
        || exist_and_no_exchange_before(&plan, is_source)
        || exist_and_no_exchange_before(&plan, is_insert)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}