1use std::num::NonZeroU32;
15use std::ops::DerefMut;
16use std::sync::Arc;
17
18pub mod plan_node;
19
20use plan_node::StreamFilter;
21pub use plan_node::{Explain, PlanRef};
22
23pub mod property;
24
25mod delta_join_solver;
26mod heuristic_optimizer;
27mod plan_rewriter;
28
29pub use plan_rewriter::PlanRewriter;
30
31mod plan_visitor;
32
33pub use plan_visitor::{
34 ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
35 SysTableVisitor,
36};
37
38pub mod backfill_order_strategy;
39mod logical_optimization;
40mod optimizer_context;
41pub mod plan_expr_rewriter;
42mod plan_expr_visitor;
43mod rule;
44
45use std::assert_matches::assert_matches;
46use std::collections::{BTreeMap, HashMap};
47use std::marker::PhantomData;
48
49use educe::Educe;
50use fixedbitset::FixedBitSet;
51use itertools::Itertools as _;
52pub use logical_optimization::*;
53pub use optimizer_context::*;
54use plan_expr_rewriter::ConstEvalRewriter;
55use property::Order;
56use risingwave_common::bail;
57use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
58use risingwave_common::types::DataType;
59use risingwave_common::util::column_index_mapping::ColIndexMapping;
60use risingwave_common::util::iter_util::ZipEqDebug;
61use risingwave_connector::sink::catalog::SinkFormatDesc;
62use risingwave_pb::stream_plan::StreamScanType;
63
64use self::heuristic_optimizer::ApplyOrder;
65use self::plan_node::generic::{self, PhysicalPlanRef};
66use self::plan_node::{
67 BatchProject, Convention, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
68 StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
69 ToStreamContext, stream_enforce_eowc_requirement,
70};
71#[cfg(debug_assertions)]
72use self::plan_visitor::InputRefValidator;
73use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
74use self::property::{Cardinality, RequiredDist};
75use self::rule::*;
76use crate::TableCatalog;
77use crate::catalog::table_catalog::TableType;
78use crate::catalog::{DatabaseId, SchemaId};
79use crate::error::{ErrorCode, Result};
80use crate::expr::TimestamptzExprFinder;
81use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
82use crate::optimizer::plan_node::generic::{SourceNodeKind, Union};
83use crate::optimizer::plan_node::{
84 BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion,
85 ToStream, VisitExprsRecursive,
86};
87use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
88use crate::optimizer::property::Distribution;
89use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
90
/// The root of an optimizer plan tree, together with the requirements and the output-column
/// selection that apply to the query as a whole.
///
/// `P` is a type-state marker (one of the `PlanPhase*` types generated by `for_all_phase!`).
/// Phase-specific methods live in `impl` blocks on the per-phase aliases (e.g. `LogicalPlanRoot`,
/// `BatchPlanRoot`), so they can only be called on a root in the matching phase.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P> {
    /// The plan tree. Its convention is asserted at construction and around each phase
    /// transition (e.g. `Convention::Logical` for `LogicalPlanRoot`).
    pub plan: PlanRef,
    /// Phase marker. The marker types generated by `for_all_phase!` derive neither `Debug` nor
    /// `Clone`, so this field is skipped by `Debug` and cloned through `PhantomData::clone`.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    /// Distribution the output of `plan` is required to satisfy.
    required_dist: RequiredDist,
    /// Order the output of `plan` is required to satisfy.
    required_order: Order,
    /// One bit per column of `plan`'s schema; set bits mark the columns visible in the output.
    out_fields: FixedBitSet,
    /// Output column names, one per set bit of `out_fields`, in column order.
    out_names: Vec<String>,
}
114
/// Marker trait for the type-state types that tag a [`PlanRoot`] with its optimization phase.
/// All implementations are generated by `for_all_phase!`.
pub trait PlanPhase {}
121
/// For every listed phase `X`, generates:
/// - a unit marker struct `PlanPhaseX`,
/// - an implementation of [`PlanPhase`] for it, and
/// - a type alias `XPlanRoot = PlanRoot<PlanPhaseX>`.
///
/// Invoked without arguments, it expands to the canonical list of optimizer phases.
macro_rules! for_all_phase {
    // No arguments: recurse with the canonical list of phases.
    () => {
        for_all_phase! {
            Logical,
            BatchOptimizedLogical,
            StreamOptimizedLogical,
            Batch,
            Stream
        }
    };
    // One or more phase identifiers, optional trailing comma.
    ($($phase:ident),+ $(,)?) => {
        $(
            // `paste!` concatenates identifiers, e.g. `PlanPhase` + `Logical` -> `PlanPhaseLogical`
            // and `Logical` + `PlanRoot` -> `LogicalPlanRoot`.
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {}
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

// Defines `PlanPhaseLogical` / `LogicalPlanRoot` through `PlanPhaseStream` / `StreamPlanRoot`.
for_all_phase!();
144
145impl LogicalPlanRoot {
146 pub fn new_with_logical_plan(
147 plan: PlanRef,
148 required_dist: RequiredDist,
149 required_order: Order,
150 out_fields: FixedBitSet,
151 out_names: Vec<String>,
152 ) -> Self {
153 assert_eq!(plan.convention(), Convention::Logical);
154 Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
155 }
156}
157
158impl BatchPlanRoot {
159 pub fn new_with_batch_plan(
160 plan: PlanRef,
161 required_dist: RequiredDist,
162 required_order: Order,
163 out_fields: FixedBitSet,
164 out_names: Vec<String>,
165 ) -> Self {
166 assert_eq!(plan.convention(), Convention::Batch);
167 Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
168 }
169}
170
171impl<P: PlanPhase> PlanRoot<P> {
172 fn new_inner(
173 plan: PlanRef,
174 required_dist: RequiredDist,
175 required_order: Order,
176 out_fields: FixedBitSet,
177 out_names: Vec<String>,
178 ) -> Self {
179 let input_schema = plan.schema();
180 assert_eq!(input_schema.fields().len(), out_fields.len());
181 assert_eq!(out_fields.count_ones(..), out_names.len());
182
183 Self {
184 plan,
185 _phase: PhantomData,
186 required_dist,
187 required_order,
188 out_fields,
189 out_names,
190 }
191 }
192
193 fn into_phase<P2: PlanPhase>(self) -> PlanRoot<P2> {
194 PlanRoot {
195 plan: self.plan,
196 _phase: PhantomData,
197 required_dist: self.required_dist,
198 required_order: self.required_order,
199 out_fields: self.out_fields,
200 out_names: self.out_names,
201 }
202 }
203
204 pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
209 if out_names.len() != self.out_fields.count_ones(..) {
210 Err(ErrorCode::InvalidInputSyntax(
211 "number of column names does not match number of columns".to_owned(),
212 ))?
213 }
214 self.out_names = out_names;
215 Ok(())
216 }
217
218 pub fn schema(&self) -> Schema {
220 Schema {
223 fields: self
224 .out_fields
225 .ones()
226 .map(|i| self.plan.schema().fields()[i].clone())
227 .zip_eq_debug(&self.out_names)
228 .map(|(field, name)| Field {
229 name: name.clone(),
230 ..field
231 })
232 .collect(),
233 }
234 }
235}
236
237impl LogicalPlanRoot {
238 pub fn into_unordered_subplan(self) -> PlanRef {
242 assert_eq!(self.plan.convention(), Convention::Logical);
243 if self.out_fields.count_ones(..) == self.out_fields.len() {
244 return self.plan;
245 }
246 LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
247 }
248
249 pub fn into_array_agg(self) -> Result<PlanRef> {
253 use generic::Agg;
254 use plan_node::PlanAggCall;
255 use risingwave_common::types::ListValue;
256 use risingwave_expr::aggregate::PbAggKind;
257
258 use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
259 use crate::utils::{Condition, IndexSet};
260
261 assert_eq!(self.plan.convention(), Convention::Logical);
262 let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
263 bail!("subquery must return only one column");
264 };
265 let input_column_type = self.plan.schema().fields()[select_idx].data_type();
266 let return_type = DataType::List(input_column_type.clone().into());
267 let agg = Agg::new(
268 vec![PlanAggCall {
269 agg_type: PbAggKind::ArrayAgg.into(),
270 return_type: return_type.clone(),
271 inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
272 distinct: false,
273 order_by: self.required_order.column_orders,
274 filter: Condition::true_cond(),
275 direct_args: vec![],
276 }],
277 IndexSet::empty(),
278 self.plan,
279 );
280 Ok(LogicalProject::create(
281 agg.into(),
282 vec![
283 FunctionCall::new(
284 ExprType::Coalesce,
285 vec![
286 InputRef::new(0, return_type).into(),
287 ExprImpl::literal_list(
288 ListValue::empty(&input_column_type),
289 input_column_type,
290 ),
291 ],
292 )
293 .unwrap()
294 .into(),
295 ],
296 ))
297 }
298
299 pub fn gen_optimized_logical_plan_for_stream(
301 mut self,
302 ) -> Result<StreamOptimizedLogicalPlanRoot> {
303 assert_eq!(self.plan.convention(), Convention::Logical);
304 self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
305 assert_eq!(self.plan.convention(), Convention::Logical);
306 Ok(self.into_phase())
307 }
308
309 pub fn gen_optimized_logical_plan_for_batch(mut self) -> Result<BatchOptimizedLogicalPlanRoot> {
311 assert_eq!(self.plan.convention(), Convention::Logical);
312 self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
313 assert_eq!(self.plan.convention(), Convention::Logical);
314 Ok(self.into_phase())
315 }
316
317 pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
318 self.gen_optimized_logical_plan_for_batch()?
319 .gen_batch_plan()
320 }
321}
322
323impl BatchOptimizedLogicalPlanRoot {
324 pub fn gen_batch_plan(mut self) -> Result<BatchPlanRoot> {
326 assert_eq!(self.plan.convention(), Convention::Logical);
327
328 let mut plan = self.plan;
329
330 if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
331 return Err(ErrorCode::NotSupported(
332 "do not support temporal join for batch queries".to_owned(),
333 "please use temporal join in streaming queries".to_owned(),
334 )
335 .into());
336 }
337
338 let ctx = plan.ctx();
339 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
341
342 plan = const_eval_exprs(plan)?;
344
345 if ctx.is_explain_trace() {
346 ctx.trace("Const eval exprs:");
347 ctx.trace(plan.explain_to_string());
348 }
349
350 plan = plan.to_batch_with_order_required(&self.required_order)?;
352 if ctx.is_explain_trace() {
353 ctx.trace("To Batch Plan:");
354 ctx.trace(plan.explain_to_string());
355 }
356
357 plan = plan.optimize_by_rules(&OptimizationStage::new(
358 "Merge BatchProject",
359 vec![BatchProjectMergeRule::create()],
360 ApplyOrder::BottomUp,
361 ))?;
362
363 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
365
366 if ctx.is_explain_trace() {
367 ctx.trace("Inline Session Timezone:");
368 ctx.trace(plan.explain_to_string());
369 }
370
371 #[cfg(debug_assertions)]
372 InputRefValidator.validate(plan.clone());
373 assert!(
374 *plan.distribution() == Distribution::Single,
375 "{}",
376 plan.explain_to_string()
377 );
378 assert!(
379 !has_batch_exchange(plan.clone()),
380 "{}",
381 plan.explain_to_string()
382 );
383
384 let ctx = plan.ctx();
385 if ctx.is_explain_trace() {
386 ctx.trace("To Batch Physical Plan:");
387 ctx.trace(plan.explain_to_string());
388 }
389
390 self.plan = plan;
391 assert_eq!(self.plan.convention(), Convention::Batch);
392 Ok(self.into_phase())
393 }
394}
395
396impl BatchPlanRoot {
397 pub fn gen_batch_distributed_plan(mut self) -> Result<PlanRef> {
399 assert_eq!(self.plan.convention(), Convention::Batch);
400 self.required_dist = RequiredDist::single();
401 let mut plan = self.plan;
402
403 plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;
405
406 if self.out_fields.count_ones(..) != self.out_fields.len() {
408 plan =
409 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
410 }
411
412 let ctx = plan.ctx();
413 if ctx.is_explain_trace() {
414 ctx.trace("To Batch Distributed Plan:");
415 ctx.trace(plan.explain_to_string());
416 }
417 if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
418 plan =
419 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
420 }
421
422 let plan = plan.optimize_by_rules(&OptimizationStage::new(
424 "Push Limit To Scan",
425 vec![BatchPushLimitToScanRule::create()],
426 ApplyOrder::BottomUp,
427 ))?;
428
429 let plan = plan.optimize_by_rules(&OptimizationStage::new(
430 "Iceberg Count Star",
431 vec![BatchIcebergCountStar::create()],
432 ApplyOrder::TopDown,
433 ))?;
434
435 let plan = plan.optimize_by_rules(&OptimizationStage::new(
438 "Iceberg Predicate Pushdown",
439 vec![BatchIcebergPredicatePushDownRule::create()],
440 ApplyOrder::BottomUp,
441 ))?;
442
443 assert_eq!(plan.convention(), Convention::Batch);
444 Ok(plan)
445 }
446
447 pub fn gen_batch_local_plan(self) -> Result<PlanRef> {
449 assert_eq!(self.plan.convention(), Convention::Batch);
450 let mut plan = self.plan;
451
452 plan = plan.to_local_with_order_required(&self.required_order)?;
454
455 let insert_exchange = match plan.distribution() {
458 Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
459 _ => true,
460 };
461 if insert_exchange {
462 plan =
463 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
464 }
465
466 if self.out_fields.count_ones(..) != self.out_fields.len() {
468 plan =
469 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
470 }
471
472 let ctx = plan.ctx();
473 if ctx.is_explain_trace() {
474 ctx.trace("To Batch Local Plan:");
475 ctx.trace(plan.explain_to_string());
476 }
477
478 let plan = plan.optimize_by_rules(&OptimizationStage::new(
480 "Push Limit To Scan",
481 vec![BatchPushLimitToScanRule::create()],
482 ApplyOrder::BottomUp,
483 ))?;
484
485 let plan = plan.optimize_by_rules(&OptimizationStage::new(
486 "Iceberg Count Star",
487 vec![BatchIcebergCountStar::create()],
488 ApplyOrder::TopDown,
489 ))?;
490
491 assert_eq!(plan.convention(), Convention::Batch);
492 Ok(plan)
493 }
494}
495
496impl LogicalPlanRoot {
497 fn gen_optimized_stream_plan(
499 self,
500 emit_on_window_close: bool,
501 allow_snapshot_backfill: bool,
502 ) -> Result<StreamOptimizedLogicalPlanRoot> {
503 assert_eq!(self.plan.convention(), Convention::Logical);
504 let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
505 StreamScanType::SnapshotBackfill
506 } else if self.should_use_arrangement_backfill() {
507 StreamScanType::ArrangementBackfill
508 } else {
509 StreamScanType::Backfill
510 };
511 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
512 }
513
514 fn gen_optimized_stream_plan_inner(
515 self,
516 emit_on_window_close: bool,
517 stream_scan_type: StreamScanType,
518 ) -> Result<StreamOptimizedLogicalPlanRoot> {
519 assert_eq!(self.plan.convention(), Convention::Logical);
520 let ctx = self.plan.ctx();
521 let _explain_trace = ctx.is_explain_trace();
522
523 let mut optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;
524 let mut plan = optimized_plan.plan;
525
526 plan = plan.optimize_by_rules(&OptimizationStage::new(
527 "Merge StreamProject",
528 vec![StreamProjectMergeRule::create()],
529 ApplyOrder::BottomUp,
530 ))?;
531
532 if ctx
533 .session_ctx()
534 .config()
535 .streaming_separate_consecutive_join()
536 {
537 plan = plan.optimize_by_rules(&OptimizationStage::new(
538 "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
539 vec![SeparateConsecutiveJoinRule::create()],
540 ApplyOrder::BottomUp,
541 ))?;
542 }
543
544 if ctx.session_ctx().config().streaming_enable_unaligned_join() {
548 plan = plan.optimize_by_rules(&OptimizationStage::new(
549 "Add Logstore for Unaligned join",
550 vec![AddLogstoreRule::create()],
551 ApplyOrder::BottomUp,
552 ))?;
553 }
554
555 if ctx.session_ctx().config().streaming_enable_delta_join() {
556 plan = plan.optimize_by_rules(&OptimizationStage::new(
559 "To IndexDeltaJoin",
560 vec![IndexDeltaJoinRule::create()],
561 ApplyOrder::BottomUp,
562 ))?;
563 }
564 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
566
567 if ctx.is_explain_trace() {
568 ctx.trace("Inline session timezone:");
569 ctx.trace(plan.explain_to_string());
570 }
571
572 plan = const_eval_exprs(plan)?;
574
575 if ctx.is_explain_trace() {
576 ctx.trace("Const eval exprs:");
577 ctx.trace(plan.explain_to_string());
578 }
579
580 #[cfg(debug_assertions)]
581 InputRefValidator.validate(plan.clone());
582
583 if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
584 return Err(ErrorCode::NotSupported(
585 "exist dangling temporal scan".to_owned(),
586 "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
587 ).into());
588 }
589
590 if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
591 return Err(ErrorCode::NotSupported(
592 "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
593 "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
594 ).into());
595 }
596
597 optimized_plan.plan = plan;
598 assert_eq!(optimized_plan.plan.convention(), Convention::Stream);
599 Ok(optimized_plan.into_phase())
600 }
601
602 fn gen_stream_plan(
604 self,
605 emit_on_window_close: bool,
606 stream_scan_type: StreamScanType,
607 ) -> Result<StreamOptimizedLogicalPlanRoot> {
608 assert_eq!(self.plan.convention(), Convention::Logical);
609 let ctx = self.plan.ctx();
610 let explain_trace = ctx.is_explain_trace();
611
612 let plan = match self.plan.convention() {
613 Convention::Logical => {
614 if !ctx
615 .session_ctx()
616 .config()
617 .streaming_allow_jsonb_in_stream_key()
618 && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
619 {
620 return Err(ErrorCode::NotSupported(
621 err,
622 "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
623 If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
624 ).into());
625 }
626 let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
627 let mut plan = optimized_plan.plan;
628 let out_col_change;
629 (plan, out_col_change) = {
630 let (plan, out_col_change) =
631 plan.logical_rewrite_for_stream(&mut Default::default())?;
632 if out_col_change.is_injective() {
633 (plan, out_col_change)
634 } else {
635 let mut output_indices = (0..plan.schema().len()).collect_vec();
636 #[allow(unused_assignments)]
637 let (mut map, mut target_size) = out_col_change.into_parts();
638
639 target_size = plan.schema().len();
642 let mut tar_exists = vec![false; target_size];
643 for i in map.iter_mut().flatten() {
644 if tar_exists[*i] {
645 output_indices.push(*i);
646 *i = target_size;
647 target_size += 1;
648 } else {
649 tar_exists[*i] = true;
650 }
651 }
652 let plan =
653 LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
654 let out_col_change = ColIndexMapping::new(map, target_size);
655 (plan.into(), out_col_change)
656 }
657 };
658
659 if explain_trace {
660 ctx.trace("Logical Rewrite For Stream:");
661 ctx.trace(plan.explain_to_string());
662 }
663
664 optimized_plan.required_dist =
665 out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
666 optimized_plan.required_order = out_col_change
667 .rewrite_required_order(&optimized_plan.required_order)
668 .unwrap();
669 optimized_plan.out_fields =
670 out_col_change.rewrite_bitset(&optimized_plan.out_fields);
671 plan = plan.to_stream_with_dist_required(
672 &optimized_plan.required_dist,
673 &mut ToStreamContext::new_with_stream_scan_type(
674 emit_on_window_close,
675 stream_scan_type,
676 ),
677 )?;
678 plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
679 optimized_plan.plan = plan;
680 optimized_plan
681 }
682 _ => unreachable!(),
683 };
684
685 if explain_trace {
686 ctx.trace("To Stream Plan:");
687 ctx.trace(plan.plan.explain_to_string());
688 }
689 Ok(plan)
690 }
691
692 fn compute_cardinality(&self) -> Cardinality {
696 assert_matches!(self.plan.convention(), Convention::Logical);
697 CardinalityVisitor.visit(self.plan.clone())
698 }
699
700 pub fn gen_table_plan(
702 self,
703 context: OptimizerContextRef,
704 table_name: String,
705 database_id: DatabaseId,
706 schema_id: SchemaId,
707 CreateTableInfo {
708 columns,
709 pk_column_ids,
710 row_id_index,
711 watermark_descs,
712 source_catalog,
713 version,
714 }: CreateTableInfo,
715 CreateTableProps {
716 definition,
717 append_only,
718 on_conflict,
719 with_version_column,
720 webhook_info,
721 engine,
722 }: CreateTableProps,
723 ) -> Result<StreamMaterialize> {
724 assert_eq!(self.plan.convention(), Convention::Logical);
725 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
727 assert_eq!(stream_plan.plan.convention(), Convention::Stream);
728
729 assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
730
731 let pk_column_indices = {
732 let mut id_to_idx = HashMap::new();
733
734 columns.iter().enumerate().for_each(|(idx, c)| {
735 id_to_idx.insert(c.column_id(), idx);
736 });
737 pk_column_ids
738 .iter()
739 .map(|c| id_to_idx.get(c).copied().unwrap()) .collect_vec()
741 };
742
743 fn inject_project_for_generated_column_if_needed(
744 columns: &[ColumnCatalog],
745 node: PlanRef,
746 ) -> Result<PlanRef> {
747 let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
748 if let Some(exprs) = exprs {
749 let logical_project = generic::Project::new(exprs, node);
750 return Ok(StreamProject::new(logical_project).into());
751 }
752 Ok(node)
753 }
754
755 #[derive(PartialEq, Debug, Copy, Clone)]
756 enum PrimaryKeyKind {
757 UserDefinedPrimaryKey,
758 NonAppendOnlyRowIdPk,
759 AppendOnlyRowIdPk,
760 }
761
762 fn inject_dml_node(
763 columns: &[ColumnCatalog],
764 append_only: bool,
765 stream_plan: PlanRef,
766 pk_column_indices: &[usize],
767 kind: PrimaryKeyKind,
768 column_descs: Vec<ColumnDesc>,
769 ) -> Result<PlanRef> {
770 let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
771
772 dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
774
775 dml_node = match kind {
776 PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
777 RequiredDist::hash_shard(pk_column_indices)
778 .enforce_if_not_satisfies(dml_node, &Order::any())?
779 }
780 PrimaryKeyKind::AppendOnlyRowIdPk => {
781 StreamExchange::new_no_shuffle(dml_node).into()
782 }
783 };
784
785 Ok(dml_node)
786 }
787
788 let kind = if let Some(row_id_index) = row_id_index {
789 assert_eq!(
790 pk_column_indices.iter().exactly_one().copied().unwrap(),
791 row_id_index
792 );
793 if append_only {
794 PrimaryKeyKind::AppendOnlyRowIdPk
795 } else {
796 PrimaryKeyKind::NonAppendOnlyRowIdPk
797 }
798 } else {
799 PrimaryKeyKind::UserDefinedPrimaryKey
800 };
801
802 let column_descs: Vec<ColumnDesc> = columns
803 .iter()
804 .filter(|&c| c.can_dml())
805 .map(|c| c.column_desc.clone())
806 .collect();
807
808 let mut not_null_idxs = vec![];
809 for (idx, column) in column_descs.iter().enumerate() {
810 if !column.nullable {
811 not_null_idxs.push(idx);
812 }
813 }
814
815 let version_column_index = if let Some(version_column) = with_version_column {
816 find_version_column_index(&columns, version_column)?
817 } else {
818 None
819 };
820
821 let with_external_source = source_catalog.is_some();
822 let union_inputs = if with_external_source {
823 let mut external_source_node = stream_plan.plan;
824 external_source_node =
825 inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
826 external_source_node = match kind {
827 PrimaryKeyKind::UserDefinedPrimaryKey => {
828 RequiredDist::hash_shard(&pk_column_indices)
829 .enforce_if_not_satisfies(external_source_node, &Order::any())?
830 }
831
832 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
833 StreamExchange::new_no_shuffle(external_source_node).into()
834 }
835 };
836
837 let dummy_source_node = LogicalSource::new(
838 None,
839 columns.clone(),
840 row_id_index,
841 SourceNodeKind::CreateTable,
842 context.clone(),
843 None,
844 )
845 .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
846
847 let dml_node = inject_dml_node(
848 &columns,
849 append_only,
850 dummy_source_node,
851 &pk_column_indices,
852 kind,
853 column_descs,
854 )?;
855
856 vec![external_source_node, dml_node]
857 } else {
858 let dml_node = inject_dml_node(
859 &columns,
860 append_only,
861 stream_plan.plan,
862 &pk_column_indices,
863 kind,
864 column_descs,
865 )?;
866
867 vec![dml_node]
868 };
869
870 let dists = union_inputs
871 .iter()
872 .map(|input| input.distribution())
873 .unique()
874 .collect_vec();
875
876 let dist = match &dists[..] {
877 &[Distribution::SomeShard, Distribution::HashShard(_)]
878 | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
879 &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
880 dist.clone()
881 }
882 _ => {
883 unreachable!()
884 }
885 };
886
887 let mut stream_plan = StreamUnion::new_with_dist(
888 Union {
889 all: true,
890 inputs: union_inputs,
891 source_col: None,
892 },
893 dist.clone(),
894 )
895 .into();
896
897 if !watermark_descs.is_empty() {
899 stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
900 }
901
902 if let Some(row_id_index) = row_id_index {
904 match kind {
905 PrimaryKeyKind::UserDefinedPrimaryKey => {
906 unreachable!()
907 }
908 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
909 stream_plan = StreamRowIdGen::new_with_dist(
910 stream_plan,
911 row_id_index,
912 Distribution::HashShard(vec![row_id_index]),
913 )
914 .into();
915 }
916 }
917 }
918
919 let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
920
921 if let ConflictBehavior::IgnoreConflict = conflict_behavior
922 && version_column_index.is_some()
923 {
924 Err(ErrorCode::InvalidParameterValue(
925 "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
926 ))?
927 }
928
929 let retention_seconds = context.with_options().retention_seconds();
930
931 let table_required_dist = {
932 let mut bitset = FixedBitSet::with_capacity(columns.len());
933 for idx in &pk_column_indices {
934 bitset.insert(*idx);
935 }
936 RequiredDist::ShardByKey(bitset)
937 };
938
939 let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
940
941 if !not_null_idxs.is_empty() {
942 stream_plan =
943 StreamFilter::filter_out_any_null_rows(stream_plan.clone(), ¬_null_idxs);
944 }
945
946 StreamMaterialize::create_for_table(
947 stream_plan,
948 table_name,
949 database_id,
950 schema_id,
951 table_required_dist,
952 Order::any(),
953 columns,
954 definition,
955 conflict_behavior,
956 version_column_index,
957 pk_column_indices,
958 row_id_index,
959 version,
960 retention_seconds,
961 webhook_info,
962 engine,
963 )
964 }
965
966 pub fn gen_materialize_plan(
968 self,
969 database_id: DatabaseId,
970 schema_id: SchemaId,
971 mv_name: String,
972 definition: String,
973 emit_on_window_close: bool,
974 ) -> Result<StreamMaterialize> {
975 let cardinality = self.compute_cardinality();
976 assert_eq!(self.plan.convention(), Convention::Logical);
977 let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
978 assert_eq!(stream_plan.plan.convention(), Convention::Stream);
979 StreamMaterialize::create(
980 stream_plan,
981 mv_name,
982 database_id,
983 schema_id,
984 definition,
985 TableType::MaterializedView,
986 cardinality,
987 None,
988 )
989 }
990
991 pub fn gen_index_plan(
993 self,
994 index_name: String,
995 database_id: DatabaseId,
996 schema_id: SchemaId,
997 definition: String,
998 retention_seconds: Option<NonZeroU32>,
999 ) -> Result<StreamMaterialize> {
1000 let cardinality = self.compute_cardinality();
1001 assert_eq!(self.plan.convention(), Convention::Logical);
1002 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1003 assert_eq!(stream_plan.plan.convention(), Convention::Stream);
1004
1005 StreamMaterialize::create(
1006 stream_plan,
1007 index_name,
1008 database_id,
1009 schema_id,
1010 definition,
1011 TableType::Index,
1012 cardinality,
1013 retention_seconds,
1014 )
1015 }
1016
1017 #[allow(clippy::too_many_arguments)]
1019 pub fn gen_sink_plan(
1020 self,
1021 sink_name: String,
1022 definition: String,
1023 properties: WithOptionsSecResolved,
1024 emit_on_window_close: bool,
1025 db_name: String,
1026 sink_from_table_name: String,
1027 format_desc: Option<SinkFormatDesc>,
1028 without_backfill: bool,
1029 target_table: Option<Arc<TableCatalog>>,
1030 partition_info: Option<PartitionComputeInfo>,
1031 user_specified_columns: bool,
1032 ) -> Result<StreamSink> {
1033 let stream_scan_type = if without_backfill {
1034 StreamScanType::UpstreamOnly
1035 } else if target_table.is_none() && self.should_use_snapshot_backfill() {
1036 StreamScanType::SnapshotBackfill
1038 } else if self.should_use_arrangement_backfill() {
1039 StreamScanType::ArrangementBackfill
1040 } else {
1041 StreamScanType::Backfill
1042 };
1043 assert_eq!(self.plan.convention(), Convention::Logical);
1044 let stream_plan =
1045 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
1046 assert_eq!(stream_plan.plan.convention(), Convention::Stream);
1047 let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
1048 let columns = t.columns_without_rw_timestamp();
1049 stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
1050 });
1051 StreamSink::create(
1052 stream_plan,
1053 sink_name,
1054 db_name,
1055 sink_from_table_name,
1056 target_table,
1057 target_columns_to_plan_mapping,
1058 definition,
1059 properties,
1060 format_desc,
1061 partition_info,
1062 )
1063 }
1064}
1065
1066impl<P: PlanPhase> PlanRoot<P> {
1067 pub fn should_use_arrangement_backfill(&self) -> bool {
1068 let ctx = self.plan.ctx();
1069 let session_ctx = ctx.session_ctx();
1070 let arrangement_backfill_enabled = session_ctx
1071 .env()
1072 .streaming_config()
1073 .developer
1074 .enable_arrangement_backfill;
1075 arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1076 }
1077
1078 pub fn should_use_snapshot_backfill(&self) -> bool {
1079 self.plan
1080 .ctx()
1081 .session_ctx()
1082 .config()
1083 .streaming_use_snapshot_backfill()
1084 }
1085
1086 pub fn target_columns_to_plan_mapping(
1088 &self,
1089 tar_cols: &[ColumnCatalog],
1090 user_specified_columns: bool,
1091 ) -> Vec<Option<usize>> {
1092 #[allow(clippy::disallowed_methods)]
1093 let visible_cols: Vec<(usize, String)> = self
1094 .out_fields
1095 .ones()
1096 .zip_eq(self.out_names.iter().cloned())
1097 .collect_vec();
1098
1099 let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1100 let visible_col_idxes_by_name = visible_cols
1101 .iter()
1102 .map(|(i, name)| (name.as_ref(), *i))
1103 .collect::<BTreeMap<_, _>>();
1104
1105 tar_cols
1106 .iter()
1107 .enumerate()
1108 .filter(|(_, tar_col)| tar_col.can_dml())
1109 .map(|(tar_i, tar_col)| {
1110 if user_specified_columns {
1111 visible_col_idxes_by_name.get(tar_col.name()).cloned()
1112 } else {
1113 (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1114 }
1115 })
1116 .collect()
1117 }
1118}
1119
1120fn find_version_column_index(
1121 column_catalog: &Vec<ColumnCatalog>,
1122 version_column_name: String,
1123) -> Result<Option<usize>> {
1124 for (index, column) in column_catalog.iter().enumerate() {
1125 if column.column_desc.name == version_column_name {
1126 if let &DataType::Jsonb
1127 | &DataType::List(_)
1128 | &DataType::Struct(_)
1129 | &DataType::Bytea
1130 | &DataType::Boolean = column.data_type()
1131 {
1132 Err(ErrorCode::InvalidParameterValue(
1133 "The specified version column data type is invalid.".to_owned(),
1134 ))?
1135 }
1136 return Ok(Some(index));
1137 }
1138 }
1139 Err(ErrorCode::InvalidParameterValue(
1140 "The specified version column name is not in the current columns.".to_owned(),
1141 ))?
1142}
1143
1144fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> {
1145 let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1146
1147 let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1148 if let Some(error) = const_eval_rewriter.error {
1149 return Err(error);
1150 }
1151 Ok(plan)
1152}
1153
1154fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> {
1155 let mut v = TimestamptzExprFinder::default();
1156 plan.visit_exprs_recursive(&mut v);
1157 if v.has() {
1158 Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1159 } else {
1160 Ok(plan)
1161 }
1162}
1163
1164fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
1165 if plan.node_type() == PlanNodeType::BatchExchange {
1166 return false;
1167 }
1168 is_candidate(plan)
1169 || plan
1170 .inputs()
1171 .iter()
1172 .any(|input| exist_and_no_exchange_before(input, is_candidate))
1173}
1174
1175fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
1181 fn is_user_table(plan: &PlanRef) -> bool {
1182 plan.node_type() == PlanNodeType::BatchSeqScan
1183 }
1184
1185 fn is_log_table(plan: &PlanRef) -> bool {
1186 plan.node_type() == PlanNodeType::BatchLogSeqScan
1187 }
1188
1189 fn is_source(plan: &PlanRef) -> bool {
1190 plan.node_type() == PlanNodeType::BatchSource
1191 || plan.node_type() == PlanNodeType::BatchKafkaScan
1192 || plan.node_type() == PlanNodeType::BatchIcebergScan
1193 }
1194
1195 fn is_insert(plan: &PlanRef) -> bool {
1196 plan.node_type() == PlanNodeType::BatchInsert
1197 }
1198
1199 fn is_update(plan: &PlanRef) -> bool {
1200 plan.node_type() == PlanNodeType::BatchUpdate
1201 }
1202
1203 fn is_delete(plan: &PlanRef) -> bool {
1204 plan.node_type() == PlanNodeType::BatchDelete
1205 }
1206
1207 assert_eq!(plan.distribution(), &Distribution::Single);
1208 exist_and_no_exchange_before(&plan, is_user_table)
1209 || exist_and_no_exchange_before(&plan, is_source)
1210 || exist_and_no_exchange_before(&plan, is_insert)
1211 || exist_and_no_exchange_before(&plan, is_update)
1212 || exist_and_no_exchange_before(&plan, is_delete)
1213 || exist_and_no_exchange_before(&plan, is_log_table)
1214}
1215
1216fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
1219 fn is_user_table(plan: &PlanRef) -> bool {
1220 plan.node_type() == PlanNodeType::BatchSeqScan
1221 }
1222
1223 fn is_source(plan: &PlanRef) -> bool {
1224 plan.node_type() == PlanNodeType::BatchSource
1225 || plan.node_type() == PlanNodeType::BatchKafkaScan
1226 || plan.node_type() == PlanNodeType::BatchIcebergScan
1227 }
1228
1229 fn is_insert(plan: &PlanRef) -> bool {
1230 plan.node_type() == PlanNodeType::BatchInsert
1231 }
1232
1233 assert_eq!(plan.distribution(), &Distribution::Single);
1234 exist_and_no_exchange_before(&plan, is_user_table)
1235 || exist_and_no_exchange_before(&plan, is_source)
1236 || exist_and_no_exchange_before(&plan, is_insert)
1237}
1238
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// A root that hides a column must produce a subplan whose schema lists only the visible
    /// column.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let input_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values: PlanRef = LogicalValues::new(vec![], input_schema, ctx).into();

        // Block value 1 sets only bit 0: the first column is visible, the second is hidden.
        let visible = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let root = LogicalPlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            visible,
            vec!["v1".to_owned()],
        );

        let expected = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        let subplan = root.into_unordered_subplan();
        assert_eq!(subplan.schema(), &expected);
    }
}