1use std::num::NonZeroU32;
15use std::ops::DerefMut;
16use std::sync::Arc;
17
18pub mod plan_node;
19
20use plan_node::StreamFilter;
21pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
22
23pub mod property;
24
25mod delta_join_solver;
26mod heuristic_optimizer;
27mod plan_rewriter;
28
29mod plan_visitor;
30
31pub use plan_visitor::{
32 ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
33 SysTableVisitor,
34};
35
36pub mod backfill_order_strategy;
37mod logical_optimization;
38mod optimizer_context;
39pub mod plan_expr_rewriter;
40mod plan_expr_visitor;
41mod rule;
42
43use std::collections::{BTreeMap, HashMap};
44use std::marker::PhantomData;
45
46use educe::Educe;
47use fixedbitset::FixedBitSet;
48use itertools::Itertools as _;
49pub use logical_optimization::*;
50pub use optimizer_context::*;
51use plan_expr_rewriter::ConstEvalRewriter;
52use property::Order;
53use risingwave_common::bail;
54use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
55use risingwave_common::types::DataType;
56use risingwave_common::util::column_index_mapping::ColIndexMapping;
57use risingwave_common::util::iter_util::ZipEqDebug;
58use risingwave_connector::WithPropertiesExt;
59use risingwave_connector::sink::catalog::SinkFormatDesc;
60use risingwave_pb::stream_plan::StreamScanType;
61
62use self::heuristic_optimizer::ApplyOrder;
63use self::plan_node::generic::{self, PhysicalPlanRef};
64use self::plan_node::{
65 BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
66 StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
67 ToStreamContext, stream_enforce_eowc_requirement,
68};
69#[cfg(debug_assertions)]
70use self::plan_visitor::InputRefValidator;
71use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
72use self::property::{Cardinality, RequiredDist};
73use self::rule::*;
74use crate::TableCatalog;
75use crate::catalog::table_catalog::TableType;
76use crate::catalog::{DatabaseId, SchemaId};
77use crate::error::{ErrorCode, Result};
78use crate::expr::TimestamptzExprFinder;
79use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
80use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
81use crate::optimizer::plan_node::{
82 Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
83 StreamExchange, StreamPlanRef, StreamUnion, ToStream, VisitExprsRecursive,
84};
85use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
86use crate::optimizer::property::Distribution;
87use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
88
/// The root of a plan tree together with the requirements placed on its output.
///
/// `P` is a compile-time marker of the optimization phase. It selects the plan
/// convention through [`PlanPhase::Convention`] and is stored only as `PhantomData`.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    /// The plan node at the root of the tree.
    pub plan: PlanRef<P::Convention>,
    // Phase marker: skipped in `Debug` output and cloned via `PhantomData::clone`
    // (presumably so that `P` itself does not have to implement `Clone`).
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    /// Distribution requirement on the root's output.
    required_dist: RequiredDist,
    /// Ordering requirement on the root's output.
    required_order: Order,
    /// One bit per column of `plan`'s schema; set bits are the columns exposed as output.
    /// Its length must equal the number of schema fields (asserted on construction).
    out_fields: FixedBitSet,
    /// Output column names, one per set bit of `out_fields` (asserted on construction).
    out_names: Vec<String>,
}
112
/// Type-level marker for an optimization phase of a [`PlanRoot`].
///
/// Implementors are generated by the `for_all_phase!` macro in this module.
pub trait PlanPhase {
    /// The plan convention used by plan nodes in this phase.
    type Convention: ConventionMarker;
}
121
/// Declares every plan phase.
///
/// For each `{ Phase, Convention }` pair this generates a unit struct `PlanPhase<Phase>`,
/// an `impl PlanPhase` binding it to `Convention`, and a type alias
/// `<Phase>PlanRoot = PlanRoot<PlanPhase<Phase>>`.
macro_rules! for_all_phase {
    // Entry point: expands to the second arm with the full list of phases.
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    // Worker arm: one marker struct, one trait impl and one alias per listed phase.
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

// Instantiate all phases declared above.
for_all_phase!();
146
147impl LogicalPlanRoot {
148 pub fn new_with_logical_plan(
149 plan: LogicalPlanRef,
150 required_dist: RequiredDist,
151 required_order: Order,
152 out_fields: FixedBitSet,
153 out_names: Vec<String>,
154 ) -> Self {
155 Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
156 }
157}
158
159impl BatchPlanRoot {
160 pub fn new_with_batch_plan(
161 plan: BatchPlanRef,
162 required_dist: RequiredDist,
163 required_order: Order,
164 out_fields: FixedBitSet,
165 out_names: Vec<String>,
166 ) -> Self {
167 Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
168 }
169}
170
171impl<P: PlanPhase> PlanRoot<P> {
172 fn new_inner(
173 plan: PlanRef<P::Convention>,
174 required_dist: RequiredDist,
175 required_order: Order,
176 out_fields: FixedBitSet,
177 out_names: Vec<String>,
178 ) -> Self {
179 let input_schema = plan.schema();
180 assert_eq!(input_schema.fields().len(), out_fields.len());
181 assert_eq!(out_fields.count_ones(..), out_names.len());
182
183 Self {
184 plan,
185 _phase: PhantomData,
186 required_dist,
187 required_order,
188 out_fields,
189 out_names,
190 }
191 }
192
193 fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
194 PlanRoot {
195 plan,
196 _phase: PhantomData,
197 required_dist: self.required_dist,
198 required_order: self.required_order,
199 out_fields: self.out_fields,
200 out_names: self.out_names,
201 }
202 }
203
204 pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
209 if out_names.len() != self.out_fields.count_ones(..) {
210 Err(ErrorCode::InvalidInputSyntax(
211 "number of column names does not match number of columns".to_owned(),
212 ))?
213 }
214 self.out_names = out_names;
215 Ok(())
216 }
217
218 pub fn schema(&self) -> Schema {
220 Schema {
223 fields: self
224 .out_fields
225 .ones()
226 .map(|i| self.plan.schema().fields()[i].clone())
227 .zip_eq_debug(&self.out_names)
228 .map(|(field, name)| Field {
229 name: name.clone(),
230 ..field
231 })
232 .collect(),
233 }
234 }
235}
236
237impl LogicalPlanRoot {
238 pub fn into_unordered_subplan(self) -> LogicalPlanRef {
242 if self.out_fields.count_ones(..) == self.out_fields.len() {
243 return self.plan;
244 }
245 LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
246 }
247
248 pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
252 use generic::Agg;
253 use plan_node::PlanAggCall;
254 use risingwave_common::types::ListValue;
255 use risingwave_expr::aggregate::PbAggKind;
256
257 use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
258 use crate::utils::{Condition, IndexSet};
259
260 let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
261 bail!("subquery must return only one column");
262 };
263 let input_column_type = self.plan.schema().fields()[select_idx].data_type();
264 let return_type = DataType::List(input_column_type.clone().into());
265 let agg = Agg::new(
266 vec![PlanAggCall {
267 agg_type: PbAggKind::ArrayAgg.into(),
268 return_type: return_type.clone(),
269 inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
270 distinct: false,
271 order_by: self.required_order.column_orders,
272 filter: Condition::true_cond(),
273 direct_args: vec![],
274 }],
275 IndexSet::empty(),
276 self.plan,
277 );
278 Ok(LogicalProject::create(
279 agg.into(),
280 vec![
281 FunctionCall::new(
282 ExprType::Coalesce,
283 vec![
284 InputRef::new(0, return_type).into(),
285 ExprImpl::literal_list(
286 ListValue::empty(&input_column_type),
287 input_column_type,
288 ),
289 ],
290 )
291 .unwrap()
292 .into(),
293 ],
294 ))
295 }
296
297 pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
299 self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
300 Ok(self)
301 }
302
303 pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
305 let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
306 Ok(self.into_phase(plan))
307 }
308
309 pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
310 self.gen_optimized_logical_plan_for_batch()?
311 .gen_batch_plan()
312 }
313}
314
315impl BatchOptimizedLogicalPlanRoot {
316 pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
318 if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
319 return Err(ErrorCode::NotSupported(
320 "do not support temporal join for batch queries".to_owned(),
321 "please use temporal join in streaming queries".to_owned(),
322 )
323 .into());
324 }
325
326 let ctx = self.plan.ctx();
327 let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;
329
330 plan = const_eval_exprs(plan)?;
332
333 if ctx.is_explain_trace() {
334 ctx.trace("Const eval exprs:");
335 ctx.trace(plan.explain_to_string());
336 }
337
338 let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
340 if ctx.is_explain_trace() {
341 ctx.trace("To Batch Plan:");
342 ctx.trace(plan.explain_to_string());
343 }
344
345 plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
346 "Merge BatchProject",
347 vec![BatchProjectMergeRule::create()],
348 ApplyOrder::BottomUp,
349 ))?;
350
351 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
353
354 if ctx.is_explain_trace() {
355 ctx.trace("Inline Session Timezone:");
356 ctx.trace(plan.explain_to_string());
357 }
358
359 #[cfg(debug_assertions)]
360 InputRefValidator.validate(plan.clone());
361 assert!(
362 *plan.distribution() == Distribution::Single,
363 "{}",
364 plan.explain_to_string()
365 );
366 assert!(
367 !has_batch_exchange(plan.clone()),
368 "{}",
369 plan.explain_to_string()
370 );
371
372 let ctx = plan.ctx();
373 if ctx.is_explain_trace() {
374 ctx.trace("To Batch Physical Plan:");
375 ctx.trace(plan.explain_to_string());
376 }
377
378 Ok(self.into_phase(plan))
379 }
380}
381
382impl BatchPlanRoot {
383 pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
385 self.required_dist = RequiredDist::single();
386 let mut plan = self.plan;
387
388 plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;
390
391 if self.out_fields.count_ones(..) != self.out_fields.len() {
393 plan =
394 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
395 }
396
397 let ctx = plan.ctx();
398 if ctx.is_explain_trace() {
399 ctx.trace("To Batch Distributed Plan:");
400 ctx.trace(plan.explain_to_string());
401 }
402 if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
403 plan =
404 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
405 }
406
407 let plan = plan.optimize_by_rules(&OptimizationStage::new(
409 "Push Limit To Scan",
410 vec![BatchPushLimitToScanRule::create()],
411 ApplyOrder::BottomUp,
412 ))?;
413
414 let plan = plan.optimize_by_rules(&OptimizationStage::new(
415 "Iceberg Count Star",
416 vec![BatchIcebergCountStar::create()],
417 ApplyOrder::TopDown,
418 ))?;
419
420 let plan = plan.optimize_by_rules(&OptimizationStage::new(
423 "Iceberg Predicate Pushdown",
424 vec![BatchIcebergPredicatePushDownRule::create()],
425 ApplyOrder::BottomUp,
426 ))?;
427
428 Ok(plan)
429 }
430
431 pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
433 let mut plan = self.plan;
434
435 plan = plan.to_local_with_order_required(&self.required_order)?;
437
438 let insert_exchange = match plan.distribution() {
441 Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
442 _ => true,
443 };
444 if insert_exchange {
445 plan =
446 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
447 }
448
449 if self.out_fields.count_ones(..) != self.out_fields.len() {
451 plan =
452 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
453 }
454
455 let ctx = plan.ctx();
456 if ctx.is_explain_trace() {
457 ctx.trace("To Batch Local Plan:");
458 ctx.trace(plan.explain_to_string());
459 }
460
461 let plan = plan.optimize_by_rules(&OptimizationStage::new(
463 "Push Limit To Scan",
464 vec![BatchPushLimitToScanRule::create()],
465 ApplyOrder::BottomUp,
466 ))?;
467
468 let plan = plan.optimize_by_rules(&OptimizationStage::new(
469 "Iceberg Count Star",
470 vec![BatchIcebergCountStar::create()],
471 ApplyOrder::TopDown,
472 ))?;
473 Ok(plan)
474 }
475}
476
477impl LogicalPlanRoot {
478 fn gen_optimized_stream_plan(
480 self,
481 emit_on_window_close: bool,
482 allow_snapshot_backfill: bool,
483 ) -> Result<StreamOptimizedLogicalPlanRoot> {
484 let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
485 StreamScanType::SnapshotBackfill
486 } else if self.should_use_arrangement_backfill() {
487 StreamScanType::ArrangementBackfill
488 } else {
489 StreamScanType::Backfill
490 };
491 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
492 }
493
    /// Generates the stream plan via `gen_stream_plan` and then applies the
    /// stream-level rewrites and validations, in this order:
    ///
    /// 1. merge stream projects;
    /// 2. separate consecutive joins, if enabled in the session config;
    /// 3. add logstores for unaligned joins, if enabled in the session config;
    /// 4. convert to index delta joins, if enabled in the session config;
    /// 5. inline the session timezone into expressions;
    /// 6. const-evaluate expressions.
    ///
    /// # Errors
    ///
    /// Returns `NotSupported` if the final plan contains a dangling temporal scan or
    /// selects `_rw_timestamp`, and propagates any error from the steps above.
    fn gen_optimized_stream_plan_inner(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        // Grab the context before `self` is consumed by `gen_stream_plan`.
        let ctx = self.plan.ctx();
        // NOTE(review): this value is never read in this function.
        let _explain_trace = ctx.is_explain_trace();

        let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;

        let mut plan = optimized_plan
            .plan
            .clone()
            .optimize_by_rules(&OptimizationStage::new(
                "Merge StreamProject",
                vec![StreamProjectMergeRule::create()],
                ApplyOrder::BottomUp,
            ))?;

        if ctx
            .session_ctx()
            .config()
            .streaming_separate_consecutive_join()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
                vec![SeparateConsecutiveJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        // Expression-level rewrites on the stream plan.
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        // Input-reference validation runs in debug builds only.
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        // Keep the requirements of `optimized_plan`, swapping in the rewritten plan.
        Ok(optimized_plan.into_phase(plan))
    }
580
    /// Converts the logical plan into a stream plan.
    ///
    /// Steps: check stream keys (unless JSONB stream keys are allowed by the session
    /// config), run the logical optimizer for streaming, apply the logical rewrite for
    /// streaming, remap the root's distribution/order/output-field requirements through
    /// the resulting column mapping, convert to a stream plan with the required
    /// distribution, and enforce the emit-on-window-close requirement.
    ///
    /// # Errors
    ///
    /// Returns `NotSupported` when the stream key check reports a problem, and
    /// propagates any error from the optimization and conversion steps.
    ///
    /// # Panics
    ///
    /// Panics if the required order cannot be rewritten through the column mapping.
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                // The stream key check is skipped when the session explicitly allows
                // JSONB columns in stream keys.
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        // Several source columns map to the same target column. Make the
                        // mapping injective by projecting a duplicate of the target column
                        // for every repeated occurrence and pointing the mapping at it.
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        // The target size is recomputed from the rewritten plan's schema.
                        target_size = plan.schema().len();
                        // `tar_exists[i]` records whether target column `i` is already mapped to.
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                // Repeated target: append a copy of the column to the
                                // projection and redirect this entry to the new index.
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                // Translate the root's requirements into the column space of the
                // rewritten plan.
                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }
667
668 fn compute_cardinality(&self) -> Cardinality {
672 CardinalityVisitor.visit(self.plan.clone())
673 }
674
675 pub fn gen_table_plan(
677 self,
678 context: OptimizerContextRef,
679 table_name: String,
680 database_id: DatabaseId,
681 schema_id: SchemaId,
682 CreateTableInfo {
683 columns,
684 pk_column_ids,
685 row_id_index,
686 watermark_descs,
687 source_catalog,
688 version,
689 }: CreateTableInfo,
690 CreateTableProps {
691 definition,
692 append_only,
693 on_conflict,
694 with_version_column,
695 webhook_info,
696 engine,
697 }: CreateTableProps,
698 ) -> Result<StreamMaterialize> {
699 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
701
702 assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
703
704 let pk_column_indices = {
705 let mut id_to_idx = HashMap::new();
706
707 columns.iter().enumerate().for_each(|(idx, c)| {
708 id_to_idx.insert(c.column_id(), idx);
709 });
710 pk_column_ids
711 .iter()
712 .map(|c| id_to_idx.get(c).copied().unwrap()) .collect_vec()
714 };
715
716 fn inject_project_for_generated_column_if_needed(
717 columns: &[ColumnCatalog],
718 node: StreamPlanRef,
719 ) -> Result<StreamPlanRef> {
720 let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
721 if let Some(exprs) = exprs {
722 let logical_project = generic::Project::new(exprs, node);
723 return Ok(StreamProject::new(logical_project).into());
724 }
725 Ok(node)
726 }
727
728 #[derive(PartialEq, Debug, Copy, Clone)]
729 enum PrimaryKeyKind {
730 UserDefinedPrimaryKey,
731 NonAppendOnlyRowIdPk,
732 AppendOnlyRowIdPk,
733 }
734
735 fn inject_dml_node(
736 columns: &[ColumnCatalog],
737 append_only: bool,
738 stream_plan: StreamPlanRef,
739 pk_column_indices: &[usize],
740 kind: PrimaryKeyKind,
741 column_descs: Vec<ColumnDesc>,
742 ) -> Result<StreamPlanRef> {
743 let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
744
745 dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
747
748 dml_node = match kind {
749 PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
750 RequiredDist::hash_shard(pk_column_indices)
751 .streaming_enforce_if_not_satisfies(dml_node)?
752 }
753 PrimaryKeyKind::AppendOnlyRowIdPk => {
754 StreamExchange::new_no_shuffle(dml_node).into()
755 }
756 };
757
758 Ok(dml_node)
759 }
760
761 let kind = if let Some(row_id_index) = row_id_index {
762 assert_eq!(
763 pk_column_indices.iter().exactly_one().copied().unwrap(),
764 row_id_index
765 );
766 if append_only {
767 PrimaryKeyKind::AppendOnlyRowIdPk
768 } else {
769 PrimaryKeyKind::NonAppendOnlyRowIdPk
770 }
771 } else {
772 PrimaryKeyKind::UserDefinedPrimaryKey
773 };
774
775 let column_descs: Vec<ColumnDesc> = columns
776 .iter()
777 .filter(|&c| c.can_dml())
778 .map(|c| c.column_desc.clone())
779 .collect();
780
781 let mut not_null_idxs = vec![];
782 for (idx, column) in column_descs.iter().enumerate() {
783 if !column.nullable {
784 not_null_idxs.push(idx);
785 }
786 }
787
788 let version_column_index = if let Some(version_column) = with_version_column {
789 find_version_column_index(&columns, version_column)?
790 } else {
791 None
792 };
793
794 let with_external_source = source_catalog.is_some();
795 let union_inputs = if with_external_source {
796 let mut external_source_node = stream_plan.plan;
797 external_source_node =
798 inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
799 external_source_node = match kind {
800 PrimaryKeyKind::UserDefinedPrimaryKey => {
801 RequiredDist::hash_shard(&pk_column_indices)
802 .streaming_enforce_if_not_satisfies(external_source_node)?
803 }
804
805 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
806 StreamExchange::new_no_shuffle(external_source_node).into()
807 }
808 };
809
810 let dummy_source_node = LogicalSource::new(
811 None,
812 columns.clone(),
813 row_id_index,
814 SourceNodeKind::CreateTable,
815 context.clone(),
816 None,
817 )
818 .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
819
820 let dml_node = inject_dml_node(
821 &columns,
822 append_only,
823 dummy_source_node,
824 &pk_column_indices,
825 kind,
826 column_descs,
827 )?;
828
829 vec![external_source_node, dml_node]
830 } else {
831 let dml_node = inject_dml_node(
832 &columns,
833 append_only,
834 stream_plan.plan,
835 &pk_column_indices,
836 kind,
837 column_descs,
838 )?;
839
840 vec![dml_node]
841 };
842
843 let dists = union_inputs
844 .iter()
845 .map(|input| input.distribution())
846 .unique()
847 .collect_vec();
848
849 let dist = match &dists[..] {
850 &[Distribution::SomeShard, Distribution::HashShard(_)]
851 | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
852 &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
853 dist.clone()
854 }
855 _ => {
856 unreachable!()
857 }
858 };
859
860 let mut stream_plan = StreamUnion::new_with_dist(
861 Union {
862 all: true,
863 inputs: union_inputs,
864 source_col: None,
865 },
866 dist.clone(),
867 )
868 .into();
869
870 if !watermark_descs.is_empty() {
872 stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
873 }
874
875 if let Some(row_id_index) = row_id_index {
877 match kind {
878 PrimaryKeyKind::UserDefinedPrimaryKey => {
879 unreachable!()
880 }
881 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
882 stream_plan = StreamRowIdGen::new_with_dist(
883 stream_plan,
884 row_id_index,
885 Distribution::HashShard(vec![row_id_index]),
886 )
887 .into();
888 }
889 }
890 }
891
892 let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
893
894 if let ConflictBehavior::IgnoreConflict = conflict_behavior
895 && version_column_index.is_some()
896 {
897 Err(ErrorCode::InvalidParameterValue(
898 "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
899 ))?
900 }
901
902 let retention_seconds = context.with_options().retention_seconds();
903
904 let table_required_dist = {
905 let mut bitset = FixedBitSet::with_capacity(columns.len());
906 for idx in &pk_column_indices {
907 bitset.insert(*idx);
908 }
909 RequiredDist::ShardByKey(bitset)
910 };
911
912 let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
913
914 if !not_null_idxs.is_empty() {
915 stream_plan =
916 StreamFilter::filter_out_any_null_rows(stream_plan.clone(), ¬_null_idxs);
917 }
918
919 let refreshable = source_catalog
921 .as_ref()
922 .map(|catalog| catalog.with_properties.is_batch_connector())
923 .unwrap_or(false);
924
925 if refreshable && row_id_index.is_some() {
927 return Err(crate::error::ErrorCode::BindError(
928 "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
929 .to_owned(),
930 )
931 .into());
932 }
933
934 StreamMaterialize::create_for_table(
935 stream_plan,
936 table_name,
937 database_id,
938 schema_id,
939 table_required_dist,
940 Order::any(),
941 columns,
942 definition,
943 conflict_behavior,
944 version_column_index,
945 pk_column_indices,
946 row_id_index,
947 version,
948 retention_seconds,
949 webhook_info,
950 engine,
951 refreshable,
952 )
953 }
954
955 pub fn gen_materialize_plan(
957 self,
958 database_id: DatabaseId,
959 schema_id: SchemaId,
960 mv_name: String,
961 definition: String,
962 emit_on_window_close: bool,
963 ) -> Result<StreamMaterialize> {
964 let cardinality = self.compute_cardinality();
965 let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
966 StreamMaterialize::create(
967 stream_plan,
968 mv_name,
969 database_id,
970 schema_id,
971 definition,
972 TableType::MaterializedView,
973 cardinality,
974 None,
975 )
976 }
977
978 pub fn gen_index_plan(
980 self,
981 index_name: String,
982 database_id: DatabaseId,
983 schema_id: SchemaId,
984 definition: String,
985 retention_seconds: Option<NonZeroU32>,
986 ) -> Result<StreamMaterialize> {
987 let cardinality = self.compute_cardinality();
988 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
989
990 StreamMaterialize::create(
991 stream_plan,
992 index_name,
993 database_id,
994 schema_id,
995 definition,
996 TableType::Index,
997 cardinality,
998 retention_seconds,
999 )
1000 }
1001
1002 #[allow(clippy::too_many_arguments)]
1004 pub fn gen_sink_plan(
1005 self,
1006 sink_name: String,
1007 definition: String,
1008 properties: WithOptionsSecResolved,
1009 emit_on_window_close: bool,
1010 db_name: String,
1011 sink_from_table_name: String,
1012 format_desc: Option<SinkFormatDesc>,
1013 without_backfill: bool,
1014 target_table: Option<Arc<TableCatalog>>,
1015 partition_info: Option<PartitionComputeInfo>,
1016 user_specified_columns: bool,
1017 auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
1018 ) -> Result<StreamSink> {
1019 let stream_scan_type = if without_backfill {
1020 StreamScanType::UpstreamOnly
1021 } else if target_table.is_none() && self.should_use_snapshot_backfill() {
1022 StreamScanType::SnapshotBackfill
1024 } else if self.should_use_arrangement_backfill() {
1025 StreamScanType::ArrangementBackfill
1026 } else {
1027 StreamScanType::Backfill
1028 };
1029 if auto_refresh_schema_from_table.is_some()
1030 && stream_scan_type != StreamScanType::ArrangementBackfill
1031 {
1032 return Err(ErrorCode::InvalidInputSyntax(format!(
1033 "auto schema change only support for ArrangementBackfill, but got: {:?}",
1034 stream_scan_type
1035 ))
1036 .into());
1037 }
1038 let stream_plan =
1039 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
1040 let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
1041 let columns = t.columns_without_rw_timestamp();
1042 stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
1043 });
1044
1045 StreamSink::create(
1046 stream_plan,
1047 sink_name,
1048 db_name,
1049 sink_from_table_name,
1050 target_table,
1051 target_columns_to_plan_mapping,
1052 definition,
1053 properties,
1054 format_desc,
1055 partition_info,
1056 auto_refresh_schema_from_table,
1057 )
1058 }
1059}
1060
1061impl<P: PlanPhase> PlanRoot<P> {
1062 pub fn should_use_arrangement_backfill(&self) -> bool {
1063 let ctx = self.plan.ctx();
1064 let session_ctx = ctx.session_ctx();
1065 let arrangement_backfill_enabled = session_ctx
1066 .env()
1067 .streaming_config()
1068 .developer
1069 .enable_arrangement_backfill;
1070 arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1071 }
1072
1073 pub fn should_use_snapshot_backfill(&self) -> bool {
1074 self.plan
1075 .ctx()
1076 .session_ctx()
1077 .config()
1078 .streaming_use_snapshot_backfill()
1079 }
1080
1081 pub fn target_columns_to_plan_mapping(
1083 &self,
1084 tar_cols: &[ColumnCatalog],
1085 user_specified_columns: bool,
1086 ) -> Vec<Option<usize>> {
1087 #[allow(clippy::disallowed_methods)]
1088 let visible_cols: Vec<(usize, String)> = self
1089 .out_fields
1090 .ones()
1091 .zip_eq(self.out_names.iter().cloned())
1092 .collect_vec();
1093
1094 let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1095 let visible_col_idxes_by_name = visible_cols
1096 .iter()
1097 .map(|(i, name)| (name.as_ref(), *i))
1098 .collect::<BTreeMap<_, _>>();
1099
1100 tar_cols
1101 .iter()
1102 .enumerate()
1103 .filter(|(_, tar_col)| tar_col.can_dml())
1104 .map(|(tar_i, tar_col)| {
1105 if user_specified_columns {
1106 visible_col_idxes_by_name.get(tar_col.name()).cloned()
1107 } else {
1108 (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1109 }
1110 })
1111 .collect()
1112 }
1113}
1114
1115fn find_version_column_index(
1116 column_catalog: &Vec<ColumnCatalog>,
1117 version_column_name: String,
1118) -> Result<Option<usize>> {
1119 for (index, column) in column_catalog.iter().enumerate() {
1120 if column.column_desc.name == version_column_name {
1121 if let &DataType::Jsonb
1122 | &DataType::List(_)
1123 | &DataType::Struct(_)
1124 | &DataType::Bytea
1125 | &DataType::Boolean = column.data_type()
1126 {
1127 Err(ErrorCode::InvalidParameterValue(
1128 "The specified version column data type is invalid.".to_owned(),
1129 ))?
1130 }
1131 return Ok(Some(index));
1132 }
1133 }
1134 Err(ErrorCode::InvalidParameterValue(
1135 "The specified version column name is not in the current columns.".to_owned(),
1136 ))?
1137}
1138
1139fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1140 let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1141
1142 let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1143 if let Some(error) = const_eval_rewriter.error {
1144 return Err(error);
1145 }
1146 Ok(plan)
1147}
1148
1149fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1150 ctx: OptimizerContextRef,
1151 plan: PlanRef<C>,
1152) -> Result<PlanRef<C>> {
1153 let mut v = TimestamptzExprFinder::default();
1154 plan.visit_exprs_recursive(&mut v);
1155 if v.has() {
1156 Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1157 } else {
1158 Ok(plan)
1159 }
1160}
1161
1162fn exist_and_no_exchange_before(
1163 plan: &BatchPlanRef,
1164 is_candidate: fn(&BatchPlanRef) -> bool,
1165) -> bool {
1166 if plan.node_type() == BatchPlanNodeType::BatchExchange {
1167 return false;
1168 }
1169 is_candidate(plan)
1170 || plan
1171 .inputs()
1172 .iter()
1173 .any(|input| exist_and_no_exchange_before(input, is_candidate))
1174}
1175
1176fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1182 fn is_user_table(plan: &BatchPlanRef) -> bool {
1183 plan.node_type() == BatchPlanNodeType::BatchSeqScan
1184 }
1185
1186 fn is_log_table(plan: &BatchPlanRef) -> bool {
1187 plan.node_type() == BatchPlanNodeType::BatchLogSeqScan
1188 }
1189
1190 fn is_source(plan: &BatchPlanRef) -> bool {
1191 plan.node_type() == BatchPlanNodeType::BatchSource
1192 || plan.node_type() == BatchPlanNodeType::BatchKafkaScan
1193 || plan.node_type() == BatchPlanNodeType::BatchIcebergScan
1194 }
1195
1196 fn is_insert(plan: &BatchPlanRef) -> bool {
1197 plan.node_type() == BatchPlanNodeType::BatchInsert
1198 }
1199
1200 fn is_update(plan: &BatchPlanRef) -> bool {
1201 plan.node_type() == BatchPlanNodeType::BatchUpdate
1202 }
1203
1204 fn is_delete(plan: &BatchPlanRef) -> bool {
1205 plan.node_type() == BatchPlanNodeType::BatchDelete
1206 }
1207
1208 assert_eq!(plan.distribution(), &Distribution::Single);
1209 exist_and_no_exchange_before(&plan, is_user_table)
1210 || exist_and_no_exchange_before(&plan, is_source)
1211 || exist_and_no_exchange_before(&plan, is_insert)
1212 || exist_and_no_exchange_before(&plan, is_update)
1213 || exist_and_no_exchange_before(&plan, is_delete)
1214 || exist_and_no_exchange_before(&plan, is_log_table)
1215}
1216
1217fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1220 fn is_user_table(plan: &BatchPlanRef) -> bool {
1221 plan.node_type() == BatchPlanNodeType::BatchSeqScan
1222 }
1223
1224 fn is_source(plan: &BatchPlanRef) -> bool {
1225 plan.node_type() == BatchPlanNodeType::BatchSource
1226 || plan.node_type() == BatchPlanNodeType::BatchKafkaScan
1227 || plan.node_type() == BatchPlanNodeType::BatchIcebergScan
1228 }
1229
1230 fn is_insert(plan: &BatchPlanRef) -> bool {
1231 plan.node_type() == BatchPlanNodeType::BatchInsert
1232 }
1233
1234 assert_eq!(plan.distribution(), &Distribution::Single);
1235 exist_and_no_exchange_before(&plan, is_user_table)
1236 || exist_and_no_exchange_before(&plan, is_source)
1237 || exist_and_no_exchange_before(&plan, is_insert)
1238}
1239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// A root that selects only the first of two columns must yield a subplan whose
    /// schema contains just that column.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;

        let two_column_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values = LogicalValues::new(vec![], two_column_schema, ctx).into();

        // Two bits, only bit 0 set: select `v1` and drop `v2`.
        let selected_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let selected_names = vec!["v1".to_owned()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            selected_fields,
            selected_names,
        );

        let subplan = root.into_unordered_subplan();
        let expected_schema = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(subplan.schema(), &expected_schema);
    }
}