1use std::num::NonZeroU32;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use risingwave_pb::catalog::PbVectorIndexInfo;
20
21pub mod plan_node;
22
23use plan_node::StreamFilter;
24pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
25
26pub mod property;
27
28mod delta_join_solver;
29mod heuristic_optimizer;
30mod plan_rewriter;
31
32mod plan_visitor;
33
34pub use plan_visitor::{
35 ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
36 SysTableVisitor,
37};
38
39pub mod backfill_order_strategy;
40mod logical_optimization;
41mod optimizer_context;
42pub mod plan_expr_rewriter;
43mod plan_expr_visitor;
44mod rule;
45
46use std::collections::{BTreeMap, HashMap};
47use std::marker::PhantomData;
48
49use educe::Educe;
50use fixedbitset::FixedBitSet;
51use itertools::Itertools as _;
52pub use logical_optimization::*;
53pub use optimizer_context::*;
54use plan_expr_rewriter::ConstEvalRewriter;
55use property::Order;
56use risingwave_common::bail;
57use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
58use risingwave_common::types::DataType;
59use risingwave_common::util::column_index_mapping::ColIndexMapping;
60use risingwave_common::util::iter_util::ZipEqDebug;
61use risingwave_connector::WithPropertiesExt;
62use risingwave_connector::sink::catalog::SinkFormatDesc;
63use risingwave_pb::stream_plan::StreamScanType;
64
65use self::heuristic_optimizer::ApplyOrder;
66use self::plan_node::generic::{self, PhysicalPlanRef};
67use self::plan_node::{
68 BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
69 StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
70 ToStreamContext, stream_enforce_eowc_requirement,
71};
72#[cfg(debug_assertions)]
73use self::plan_visitor::InputRefValidator;
74use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
75use self::property::{Cardinality, RequiredDist};
76use self::rule::*;
77use crate::TableCatalog;
78use crate::catalog::table_catalog::TableType;
79use crate::catalog::{DatabaseId, SchemaId};
80use crate::error::{ErrorCode, Result};
81use crate::expr::TimestamptzExprFinder;
82use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
83use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
84use crate::optimizer::plan_node::{
85 Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
86 StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion, StreamVectorIndexWrite,
87 ToStream, VisitExprsRecursive,
88};
89use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
90use crate::optimizer::property::Distribution;
91use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
92
/// The root of a query plan, together with the output requirements that the plan tree
/// itself does not carry.
///
/// `P` is a [`PlanPhase`] marker recording which optimization phase the plan is in; through
/// `P::Convention` it also fixes which kind of plan nodes `plan` contains. Moving between
/// phases goes through `into_phase`, which carries the requirements over unchanged.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    /// The plan tree, in the convention of phase `P`.
    pub plan: PlanRef<P::Convention>,
    // Zero-sized phase marker: ignored by `Debug` and cloned via `PhantomData::clone`,
    // presumably so the generated phase marker structs (which derive nothing) need no
    // `Debug`/`Clone` impls.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    /// Distribution required at the root of `plan`.
    required_dist: RequiredDist,
    /// Order required at the root of `plan`.
    required_order: Order,
    /// One bit per column of `plan`'s schema; set bits mark the columns exposed as output.
    out_fields: FixedBitSet,
    /// Output column names, one per set bit of `out_fields`, in column order.
    out_names: Vec<String>,
}
116
/// Marker trait for the optimization phases a [`PlanRoot`] can be in.
///
/// Each phase fixes the plan-node convention that the root's `plan` uses. The
/// implementations are generated by `for_all_phase!`.
pub trait PlanPhase {
    /// Plan-node convention of plans in this phase.
    type Convention: ConventionMarker;
}
125
/// Declares one marker type per optimization phase, implements [`PlanPhase`] for it, and
/// declares a `<Phase>PlanRoot` alias for `PlanRoot` in that phase.
///
/// Invoked without arguments, it expands to the full phase list below. Note that
/// `StreamOptimizedLogical` already uses the `Stream` convention.
macro_rules! for_all_phase {
    () => {
        // Each entry pairs a phase name with the plan-node convention used in that phase.
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                // For phase `Logical` this yields `PlanPhaseLogical` and `LogicalPlanRoot`.
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}
148
// Instantiate the phase markers and the `*PlanRoot` aliases (`LogicalPlanRoot`,
// `BatchPlanRoot`, ...) used throughout this module.
for_all_phase!();
150
151impl LogicalPlanRoot {
152 pub fn new_with_logical_plan(
153 plan: LogicalPlanRef,
154 required_dist: RequiredDist,
155 required_order: Order,
156 out_fields: FixedBitSet,
157 out_names: Vec<String>,
158 ) -> Self {
159 Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
160 }
161}
162
163impl BatchPlanRoot {
164 pub fn new_with_batch_plan(
165 plan: BatchPlanRef,
166 required_dist: RequiredDist,
167 required_order: Order,
168 out_fields: FixedBitSet,
169 out_names: Vec<String>,
170 ) -> Self {
171 Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
172 }
173}
174
175impl<P: PlanPhase> PlanRoot<P> {
176 fn new_inner(
177 plan: PlanRef<P::Convention>,
178 required_dist: RequiredDist,
179 required_order: Order,
180 out_fields: FixedBitSet,
181 out_names: Vec<String>,
182 ) -> Self {
183 let input_schema = plan.schema();
184 assert_eq!(input_schema.fields().len(), out_fields.len());
185 assert_eq!(out_fields.count_ones(..), out_names.len());
186
187 Self {
188 plan,
189 _phase: PhantomData,
190 required_dist,
191 required_order,
192 out_fields,
193 out_names,
194 }
195 }
196
197 fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
198 PlanRoot {
199 plan,
200 _phase: PhantomData,
201 required_dist: self.required_dist,
202 required_order: self.required_order,
203 out_fields: self.out_fields,
204 out_names: self.out_names,
205 }
206 }
207
208 pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
213 if out_names.len() != self.out_fields.count_ones(..) {
214 Err(ErrorCode::InvalidInputSyntax(
215 "number of column names does not match number of columns".to_owned(),
216 ))?
217 }
218 self.out_names = out_names;
219 Ok(())
220 }
221
222 pub fn schema(&self) -> Schema {
224 Schema {
227 fields: self
228 .out_fields
229 .ones()
230 .map(|i| self.plan.schema().fields()[i].clone())
231 .zip_eq_debug(&self.out_names)
232 .map(|(field, name)| Field {
233 name: name.clone(),
234 ..field
235 })
236 .collect(),
237 }
238 }
239}
240
241impl LogicalPlanRoot {
242 pub fn into_unordered_subplan(self) -> LogicalPlanRef {
246 if self.out_fields.count_ones(..) == self.out_fields.len() {
247 return self.plan;
248 }
249 LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
250 }
251
252 pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
256 use generic::Agg;
257 use plan_node::PlanAggCall;
258 use risingwave_common::types::ListValue;
259 use risingwave_expr::aggregate::PbAggKind;
260
261 use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
262 use crate::utils::{Condition, IndexSet};
263
264 let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
265 bail!("subquery must return only one column");
266 };
267 let input_column_type = self.plan.schema().fields()[select_idx].data_type();
268 let return_type = DataType::list(input_column_type.clone());
269 let agg = Agg::new(
270 vec![PlanAggCall {
271 agg_type: PbAggKind::ArrayAgg.into(),
272 return_type: return_type.clone(),
273 inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
274 distinct: false,
275 order_by: self.required_order.column_orders,
276 filter: Condition::true_cond(),
277 direct_args: vec![],
278 }],
279 IndexSet::empty(),
280 self.plan,
281 );
282 Ok(LogicalProject::create(
283 agg.into(),
284 vec![
285 FunctionCall::new(
286 ExprType::Coalesce,
287 vec![
288 InputRef::new(0, return_type).into(),
289 ExprImpl::literal_list(
290 ListValue::empty(&input_column_type),
291 input_column_type,
292 ),
293 ],
294 )
295 .unwrap()
296 .into(),
297 ],
298 ))
299 }
300
301 pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
303 self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
304 Ok(self)
305 }
306
307 pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
309 let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
310 Ok(self.into_phase(plan))
311 }
312
313 pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
314 self.gen_optimized_logical_plan_for_batch()?
315 .gen_batch_plan()
316 }
317}
318
319impl BatchOptimizedLogicalPlanRoot {
320 pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
322 if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
323 return Err(ErrorCode::NotSupported(
324 "do not support temporal join for batch queries".to_owned(),
325 "please use temporal join in streaming queries".to_owned(),
326 )
327 .into());
328 }
329
330 let ctx = self.plan.ctx();
331 let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;
333
334 plan = const_eval_exprs(plan)?;
336
337 if ctx.is_explain_trace() {
338 ctx.trace("Const eval exprs:");
339 ctx.trace(plan.explain_to_string());
340 }
341
342 let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
344 if ctx.is_explain_trace() {
345 ctx.trace("To Batch Plan:");
346 ctx.trace(plan.explain_to_string());
347 }
348
349 plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
350 "Merge BatchProject",
351 vec![BatchProjectMergeRule::create()],
352 ApplyOrder::BottomUp,
353 ))?;
354
355 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
357
358 if ctx.is_explain_trace() {
359 ctx.trace("Inline Session Timezone:");
360 ctx.trace(plan.explain_to_string());
361 }
362
363 #[cfg(debug_assertions)]
364 InputRefValidator.validate(plan.clone());
365 assert_eq!(
366 *plan.distribution(),
367 Distribution::Single,
368 "{}",
369 plan.explain_to_string()
370 );
371 assert!(
372 !has_batch_exchange(plan.clone()),
373 "{}",
374 plan.explain_to_string()
375 );
376
377 let ctx = plan.ctx();
378 if ctx.is_explain_trace() {
379 ctx.trace("To Batch Physical Plan:");
380 ctx.trace(plan.explain_to_string());
381 }
382
383 Ok(self.into_phase(plan))
384 }
385}
386
387impl BatchPlanRoot {
388 pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
390 self.required_dist = RequiredDist::single();
391 let mut plan = self.plan;
392
393 plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;
395
396 let ctx = plan.ctx();
397 if ctx.is_explain_trace() {
398 ctx.trace("To Batch Distributed Plan:");
399 ctx.trace(plan.explain_to_string());
400 }
401 if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
402 plan =
403 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
404 }
405
406 if self.out_fields.count_ones(..) != self.out_fields.len() {
408 plan =
409 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
410 }
411
412 let plan = plan.optimize_by_rules(&OptimizationStage::new(
414 "Push Limit To Scan",
415 vec![BatchPushLimitToScanRule::create()],
416 ApplyOrder::BottomUp,
417 ))?;
418
419 let plan = plan.optimize_by_rules(&OptimizationStage::new(
420 "Iceberg Count Star",
421 vec![BatchIcebergCountStar::create()],
422 ApplyOrder::TopDown,
423 ))?;
424
425 let plan = plan.optimize_by_rules(&OptimizationStage::new(
428 "Iceberg Predicate Pushdown",
429 vec![BatchIcebergPredicatePushDownRule::create()],
430 ApplyOrder::BottomUp,
431 ))?;
432
433 Ok(plan)
434 }
435
436 pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
438 let mut plan = self.plan;
439
440 plan = plan.to_local_with_order_required(&self.required_order)?;
442
443 let insert_exchange = match plan.distribution() {
446 Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
447 _ => true,
448 };
449 if insert_exchange {
450 plan =
451 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
452 }
453
454 if self.out_fields.count_ones(..) != self.out_fields.len() {
456 plan =
457 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
458 }
459
460 let ctx = plan.ctx();
461 if ctx.is_explain_trace() {
462 ctx.trace("To Batch Local Plan:");
463 ctx.trace(plan.explain_to_string());
464 }
465
466 let plan = plan.optimize_by_rules(&OptimizationStage::new(
468 "Push Limit To Scan",
469 vec![BatchPushLimitToScanRule::create()],
470 ApplyOrder::BottomUp,
471 ))?;
472
473 let plan = plan.optimize_by_rules(&OptimizationStage::new(
474 "Iceberg Count Star",
475 vec![BatchIcebergCountStar::create()],
476 ApplyOrder::TopDown,
477 ))?;
478 Ok(plan)
479 }
480}
481
482impl LogicalPlanRoot {
483 fn gen_optimized_stream_plan(
485 self,
486 emit_on_window_close: bool,
487 allow_snapshot_backfill: bool,
488 ) -> Result<StreamOptimizedLogicalPlanRoot> {
489 let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
490 StreamScanType::SnapshotBackfill
491 } else if self.should_use_arrangement_backfill() {
492 StreamScanType::ArrangementBackfill
493 } else {
494 StreamScanType::Backfill
495 };
496 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
497 }
498
499 fn gen_optimized_stream_plan_inner(
500 self,
501 emit_on_window_close: bool,
502 stream_scan_type: StreamScanType,
503 ) -> Result<StreamOptimizedLogicalPlanRoot> {
504 let ctx = self.plan.ctx();
505 let _explain_trace = ctx.is_explain_trace();
506
507 let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;
508
509 let mut plan = optimized_plan
510 .plan
511 .clone()
512 .optimize_by_rules(&OptimizationStage::new(
513 "Merge StreamProject",
514 vec![StreamProjectMergeRule::create()],
515 ApplyOrder::BottomUp,
516 ))?;
517
518 if ctx
519 .session_ctx()
520 .config()
521 .streaming_separate_consecutive_join()
522 {
523 plan = plan.optimize_by_rules(&OptimizationStage::new(
524 "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
525 vec![SeparateConsecutiveJoinRule::create()],
526 ApplyOrder::BottomUp,
527 ))?;
528 }
529
530 if ctx.session_ctx().config().streaming_enable_unaligned_join() {
534 plan = plan.optimize_by_rules(&OptimizationStage::new(
535 "Add Logstore for Unaligned join",
536 vec![AddLogstoreRule::create()],
537 ApplyOrder::BottomUp,
538 ))?;
539 }
540
541 if ctx.session_ctx().config().streaming_enable_delta_join()
542 && ctx.session_ctx().config().enable_index_selection()
543 {
544 plan = plan.optimize_by_rules(&OptimizationStage::new(
547 "To IndexDeltaJoin",
548 vec![IndexDeltaJoinRule::create()],
549 ApplyOrder::BottomUp,
550 ))?;
551 }
552 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
554
555 if ctx.is_explain_trace() {
556 ctx.trace("Inline session timezone:");
557 ctx.trace(plan.explain_to_string());
558 }
559
560 plan = const_eval_exprs(plan)?;
562
563 if ctx.is_explain_trace() {
564 ctx.trace("Const eval exprs:");
565 ctx.trace(plan.explain_to_string());
566 }
567
568 #[cfg(debug_assertions)]
569 InputRefValidator.validate(plan.clone());
570
571 if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
572 return Err(ErrorCode::NotSupported(
573 "exist dangling temporal scan".to_owned(),
574 "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
575 ).into());
576 }
577
578 if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
579 return Err(ErrorCode::NotSupported(
580 "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
581 "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
582 ).into());
583 }
584
585 Ok(optimized_plan.into_phase(plan))
586 }
587
588 fn gen_stream_plan(
590 self,
591 emit_on_window_close: bool,
592 stream_scan_type: StreamScanType,
593 ) -> Result<StreamOptimizedLogicalPlanRoot> {
594 let ctx = self.plan.ctx();
595 let explain_trace = ctx.is_explain_trace();
596
597 let plan = {
598 {
599 if !ctx
600 .session_ctx()
601 .config()
602 .streaming_allow_jsonb_in_stream_key()
603 && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
604 {
605 return Err(ErrorCode::NotSupported(
606 err,
607 "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
608 If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
609 ).into());
610 }
611 let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
612 let (plan, out_col_change) = {
613 let (plan, out_col_change) = optimized_plan
614 .plan
615 .logical_rewrite_for_stream(&mut Default::default())?;
616 if out_col_change.is_injective() {
617 (plan, out_col_change)
618 } else {
619 let mut output_indices = (0..plan.schema().len()).collect_vec();
620 #[allow(unused_assignments)]
621 let (mut map, mut target_size) = out_col_change.into_parts();
622
623 target_size = plan.schema().len();
626 let mut tar_exists = vec![false; target_size];
627 for i in map.iter_mut().flatten() {
628 if tar_exists[*i] {
629 output_indices.push(*i);
630 *i = target_size;
631 target_size += 1;
632 } else {
633 tar_exists[*i] = true;
634 }
635 }
636 let plan =
637 LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
638 let out_col_change = ColIndexMapping::new(map, target_size);
639 (plan.into(), out_col_change)
640 }
641 };
642
643 if explain_trace {
644 ctx.trace("Logical Rewrite For Stream:");
645 ctx.trace(plan.explain_to_string());
646 }
647
648 optimized_plan.required_dist =
649 out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
650 optimized_plan.required_order = out_col_change
651 .rewrite_required_order(&optimized_plan.required_order)
652 .unwrap();
653 optimized_plan.out_fields =
654 out_col_change.rewrite_bitset(&optimized_plan.out_fields);
655 let mut plan = plan.to_stream_with_dist_required(
656 &optimized_plan.required_dist,
657 &mut ToStreamContext::new_with_stream_scan_type(
658 emit_on_window_close,
659 stream_scan_type,
660 ),
661 )?;
662 plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
663 optimized_plan.into_phase(plan)
664 }
665 };
666
667 if explain_trace {
668 ctx.trace("To Stream Plan:");
669 ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
671 }
672 Ok(plan)
673 }
674
675 fn compute_cardinality(&self) -> Cardinality {
679 CardinalityVisitor.visit(self.plan.clone())
680 }
681
682 pub fn gen_table_plan(
684 self,
685 context: OptimizerContextRef,
686 table_name: String,
687 database_id: DatabaseId,
688 schema_id: SchemaId,
689 CreateTableInfo {
690 columns,
691 pk_column_ids,
692 row_id_index,
693 watermark_descs,
694 source_catalog,
695 version,
696 }: CreateTableInfo,
697 CreateTableProps {
698 definition,
699 append_only,
700 on_conflict,
701 with_version_columns,
702 webhook_info,
703 engine,
704 }: CreateTableProps,
705 ) -> Result<StreamMaterialize> {
706 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
708
709 assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
710
711 let pk_column_indices = {
712 let mut id_to_idx = HashMap::new();
713
714 columns.iter().enumerate().for_each(|(idx, c)| {
715 id_to_idx.insert(c.column_id(), idx);
716 });
717 pk_column_ids
718 .iter()
719 .map(|c| id_to_idx.get(c).copied().unwrap()) .collect_vec()
721 };
722
723 fn inject_project_for_generated_column_if_needed(
724 columns: &[ColumnCatalog],
725 node: StreamPlanRef,
726 ) -> Result<StreamPlanRef> {
727 let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
728 if let Some(exprs) = exprs {
729 let logical_project = generic::Project::new(exprs, node);
730 return Ok(StreamProject::new(logical_project).into());
731 }
732 Ok(node)
733 }
734
735 #[derive(PartialEq, Debug, Copy, Clone)]
736 enum PrimaryKeyKind {
737 UserDefinedPrimaryKey,
738 NonAppendOnlyRowIdPk,
739 AppendOnlyRowIdPk,
740 }
741
742 fn inject_dml_node(
743 columns: &[ColumnCatalog],
744 append_only: bool,
745 stream_plan: StreamPlanRef,
746 pk_column_indices: &[usize],
747 kind: PrimaryKeyKind,
748 column_descs: Vec<ColumnDesc>,
749 ) -> Result<StreamPlanRef> {
750 let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
751
752 dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
754
755 dml_node = match kind {
756 PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
757 RequiredDist::hash_shard(pk_column_indices)
758 .streaming_enforce_if_not_satisfies(dml_node)?
759 }
760 PrimaryKeyKind::AppendOnlyRowIdPk => {
761 StreamExchange::new_no_shuffle(dml_node).into()
762 }
763 };
764
765 Ok(dml_node)
766 }
767
768 let kind = if let Some(row_id_index) = row_id_index {
769 assert_eq!(
770 pk_column_indices.iter().exactly_one().copied().unwrap(),
771 row_id_index
772 );
773 if append_only {
774 PrimaryKeyKind::AppendOnlyRowIdPk
775 } else {
776 PrimaryKeyKind::NonAppendOnlyRowIdPk
777 }
778 } else {
779 PrimaryKeyKind::UserDefinedPrimaryKey
780 };
781
782 let column_descs: Vec<ColumnDesc> = columns
783 .iter()
784 .filter(|&c| c.can_dml())
785 .map(|c| c.column_desc.clone())
786 .collect();
787
788 let mut not_null_idxs = vec![];
789 for (idx, column) in column_descs.iter().enumerate() {
790 if !column.nullable {
791 not_null_idxs.push(idx);
792 }
793 }
794
795 let version_column_indices = if !with_version_columns.is_empty() {
796 find_version_column_indices(&columns, with_version_columns)?
797 } else {
798 vec![]
799 };
800
801 let with_external_source = source_catalog.is_some();
802 let (dml_source_node, external_source_node) = if with_external_source {
803 let dummy_source_node = LogicalSource::new(
804 None,
805 columns.clone(),
806 row_id_index,
807 SourceNodeKind::CreateTable,
808 context.clone(),
809 None,
810 )
811 .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
812 let mut external_source_node = stream_plan.plan;
813 external_source_node =
814 inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
815 external_source_node = match kind {
816 PrimaryKeyKind::UserDefinedPrimaryKey => {
817 RequiredDist::hash_shard(&pk_column_indices)
818 .streaming_enforce_if_not_satisfies(external_source_node)?
819 }
820
821 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
822 StreamExchange::new_no_shuffle(external_source_node).into()
823 }
824 };
825 (dummy_source_node, Some(external_source_node))
826 } else {
827 (stream_plan.plan, None)
828 };
829
830 let dml_node = inject_dml_node(
831 &columns,
832 append_only,
833 dml_source_node,
834 &pk_column_indices,
835 kind,
836 column_descs,
837 )?;
838
839 let dists = external_source_node
840 .iter()
841 .map(|input| input.distribution())
842 .chain([dml_node.distribution()])
843 .unique()
844 .collect_vec();
845
846 let dist = match &dists[..] {
847 &[Distribution::SomeShard, Distribution::HashShard(_)]
848 | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
849 &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
850 dist.clone()
851 }
852 _ => {
853 unreachable!()
854 }
855 };
856
857 let generated_column_exprs =
858 LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
859 let upstream_sink_union = StreamUpstreamSinkUnion::new(
860 context.clone(),
861 dml_node.schema(),
862 dml_node.stream_key(),
863 dist.clone(), append_only,
865 row_id_index.is_none(),
866 generated_column_exprs,
867 );
868
869 let union_inputs = external_source_node
870 .into_iter()
871 .chain([dml_node, upstream_sink_union.into()])
872 .collect_vec();
873
874 let mut stream_plan = StreamUnion::new_with_dist(
875 Union {
876 all: true,
877 inputs: union_inputs,
878 source_col: None,
879 },
880 dist,
881 )
882 .into();
883
884 if !watermark_descs.is_empty() {
886 stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
887 }
888
889 if let Some(row_id_index) = row_id_index {
891 match kind {
892 PrimaryKeyKind::UserDefinedPrimaryKey => {
893 unreachable!()
894 }
895 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
896 stream_plan = StreamRowIdGen::new_with_dist(
897 stream_plan,
898 row_id_index,
899 Distribution::HashShard(vec![row_id_index]),
900 )
901 .into();
902 }
903 }
904 }
905
906 let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
907
908 if let ConflictBehavior::IgnoreConflict = conflict_behavior
909 && !version_column_indices.is_empty()
910 {
911 Err(ErrorCode::InvalidParameterValue(
912 "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
913 ))?
914 }
915
916 let retention_seconds = context.with_options().retention_seconds();
917
918 let table_required_dist = {
919 let mut bitset = FixedBitSet::with_capacity(columns.len());
920 for idx in &pk_column_indices {
921 bitset.insert(*idx);
922 }
923 RequiredDist::ShardByKey(bitset)
924 };
925
926 let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
927
928 if !not_null_idxs.is_empty() {
929 stream_plan =
930 StreamFilter::filter_out_any_null_rows(stream_plan.clone(), ¬_null_idxs);
931 }
932
933 let refreshable = source_catalog
935 .as_ref()
936 .map(|catalog| catalog.with_properties.is_batch_connector())
937 .unwrap_or(false);
938
939 if refreshable && row_id_index.is_some() {
941 return Err(crate::error::ErrorCode::BindError(
942 "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
943 .to_owned(),
944 )
945 .into());
946 }
947
948 StreamMaterialize::create_for_table(
949 stream_plan,
950 table_name,
951 database_id,
952 schema_id,
953 table_required_dist,
954 Order::any(),
955 columns,
956 definition,
957 conflict_behavior,
958 version_column_indices,
959 pk_column_indices,
960 row_id_index,
961 version,
962 retention_seconds,
963 webhook_info,
964 engine,
965 refreshable,
966 )
967 }
968
969 pub fn gen_materialize_plan(
971 self,
972 database_id: DatabaseId,
973 schema_id: SchemaId,
974 mv_name: String,
975 definition: String,
976 emit_on_window_close: bool,
977 ) -> Result<StreamMaterialize> {
978 let cardinality = self.compute_cardinality();
979 let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
980 StreamMaterialize::create(
981 stream_plan,
982 mv_name,
983 database_id,
984 schema_id,
985 definition,
986 TableType::MaterializedView,
987 cardinality,
988 None,
989 )
990 }
991
992 pub fn gen_index_plan(
994 self,
995 index_name: String,
996 database_id: DatabaseId,
997 schema_id: SchemaId,
998 definition: String,
999 retention_seconds: Option<NonZeroU32>,
1000 ) -> Result<StreamMaterialize> {
1001 let cardinality = self.compute_cardinality();
1002 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1003
1004 StreamMaterialize::create(
1005 stream_plan,
1006 index_name,
1007 database_id,
1008 schema_id,
1009 definition,
1010 TableType::Index,
1011 cardinality,
1012 retention_seconds,
1013 )
1014 }
1015
1016 pub fn gen_vector_index_plan(
1017 self,
1018 index_name: String,
1019 database_id: DatabaseId,
1020 schema_id: SchemaId,
1021 definition: String,
1022 retention_seconds: Option<NonZeroU32>,
1023 vector_index_info: PbVectorIndexInfo,
1024 ) -> Result<StreamVectorIndexWrite> {
1025 let cardinality = self.compute_cardinality();
1026 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1027
1028 StreamVectorIndexWrite::create(
1029 stream_plan,
1030 index_name,
1031 database_id,
1032 schema_id,
1033 definition,
1034 cardinality,
1035 retention_seconds,
1036 vector_index_info,
1037 )
1038 }
1039
1040 #[allow(clippy::too_many_arguments)]
1042 pub fn gen_sink_plan(
1043 self,
1044 sink_name: String,
1045 definition: String,
1046 properties: WithOptionsSecResolved,
1047 emit_on_window_close: bool,
1048 db_name: String,
1049 sink_from_table_name: String,
1050 format_desc: Option<SinkFormatDesc>,
1051 without_backfill: bool,
1052 target_table: Option<Arc<TableCatalog>>,
1053 partition_info: Option<PartitionComputeInfo>,
1054 user_specified_columns: bool,
1055 auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
1056 ) -> Result<StreamSink> {
1057 let stream_scan_type = if without_backfill {
1058 StreamScanType::UpstreamOnly
1059 } else if target_table.is_none() && self.should_use_snapshot_backfill() {
1060 StreamScanType::SnapshotBackfill
1062 } else if self.should_use_arrangement_backfill() {
1063 StreamScanType::ArrangementBackfill
1064 } else {
1065 StreamScanType::Backfill
1066 };
1067 if auto_refresh_schema_from_table.is_some()
1068 && stream_scan_type != StreamScanType::ArrangementBackfill
1069 {
1070 return Err(ErrorCode::InvalidInputSyntax(format!(
1071 "auto schema change only support for ArrangementBackfill, but got: {:?}",
1072 stream_scan_type
1073 ))
1074 .into());
1075 }
1076 let stream_plan =
1077 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
1078 let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
1079 let columns = t.columns_without_rw_timestamp();
1080 stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
1081 });
1082
1083 StreamSink::create(
1084 stream_plan,
1085 sink_name,
1086 db_name,
1087 sink_from_table_name,
1088 target_table,
1089 target_columns_to_plan_mapping,
1090 definition,
1091 properties,
1092 format_desc,
1093 partition_info,
1094 auto_refresh_schema_from_table,
1095 )
1096 }
1097}
1098
1099impl<P: PlanPhase> PlanRoot<P> {
1100 pub fn should_use_arrangement_backfill(&self) -> bool {
1101 let ctx = self.plan.ctx();
1102 let session_ctx = ctx.session_ctx();
1103 let arrangement_backfill_enabled = session_ctx
1104 .env()
1105 .streaming_config()
1106 .developer
1107 .enable_arrangement_backfill;
1108 arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1109 }
1110
1111 pub fn should_use_snapshot_backfill(&self) -> bool {
1112 self.plan
1113 .ctx()
1114 .session_ctx()
1115 .config()
1116 .streaming_use_snapshot_backfill()
1117 }
1118
1119 pub fn target_columns_to_plan_mapping(
1121 &self,
1122 tar_cols: &[ColumnCatalog],
1123 user_specified_columns: bool,
1124 ) -> Vec<Option<usize>> {
1125 #[allow(clippy::disallowed_methods)]
1126 let visible_cols: Vec<(usize, String)> = self
1127 .out_fields
1128 .ones()
1129 .zip_eq(self.out_names.iter().cloned())
1130 .collect_vec();
1131
1132 let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1133 let visible_col_idxes_by_name = visible_cols
1134 .iter()
1135 .map(|(i, name)| (name.as_ref(), *i))
1136 .collect::<BTreeMap<_, _>>();
1137
1138 tar_cols
1139 .iter()
1140 .enumerate()
1141 .filter(|(_, tar_col)| tar_col.can_dml())
1142 .map(|(tar_i, tar_col)| {
1143 if user_specified_columns {
1144 visible_col_idxes_by_name.get(tar_col.name()).cloned()
1145 } else {
1146 (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1147 }
1148 })
1149 .collect()
1150 }
1151}
1152
1153fn find_version_column_indices(
1154 column_catalog: &Vec<ColumnCatalog>,
1155 version_column_names: Vec<String>,
1156) -> Result<Vec<usize>> {
1157 let mut indices = Vec::new();
1158 for version_column_name in version_column_names {
1159 let mut found = false;
1160 for (index, column) in column_catalog.iter().enumerate() {
1161 if column.column_desc.name == version_column_name {
1162 if let &DataType::Jsonb
1163 | &DataType::List(_)
1164 | &DataType::Struct(_)
1165 | &DataType::Bytea
1166 | &DataType::Boolean = column.data_type()
1167 {
1168 return Err(ErrorCode::InvalidInputSyntax(format!(
1169 "Version column {} must be of a comparable data type",
1170 version_column_name
1171 ))
1172 .into());
1173 }
1174 indices.push(index);
1175 found = true;
1176 break;
1177 }
1178 }
1179 if !found {
1180 return Err(ErrorCode::InvalidInputSyntax(format!(
1181 "Version column {} not found",
1182 version_column_name
1183 ))
1184 .into());
1185 }
1186 }
1187 Ok(indices)
1188}
1189
1190fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1191 let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1192
1193 let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1194 if let Some(error) = const_eval_rewriter.error {
1195 return Err(error);
1196 }
1197 Ok(plan)
1198}
1199
1200fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1201 ctx: OptimizerContextRef,
1202 plan: PlanRef<C>,
1203) -> Result<PlanRef<C>> {
1204 let mut v = TimestamptzExprFinder::default();
1205 plan.visit_exprs_recursive(&mut v);
1206 if v.has() {
1207 Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1208 } else {
1209 Ok(plan)
1210 }
1211}
1212
1213fn exist_and_no_exchange_before(
1214 plan: &BatchPlanRef,
1215 is_candidate: fn(&BatchPlanRef) -> bool,
1216) -> bool {
1217 if plan.node_type() == BatchPlanNodeType::BatchExchange {
1218 return false;
1219 }
1220 is_candidate(plan)
1221 || plan
1222 .inputs()
1223 .iter()
1224 .any(|input| exist_and_no_exchange_before(input, is_candidate))
1225}
1226
1227impl BatchPlanRef {
1228 fn is_user_table_scan(&self) -> bool {
1229 self.node_type() == BatchPlanNodeType::BatchSeqScan
1230 || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
1231 || self.node_type() == BatchPlanNodeType::BatchVectorSearch
1232 }
1233
1234 fn is_source_scan(&self) -> bool {
1235 self.node_type() == BatchPlanNodeType::BatchSource
1236 || self.node_type() == BatchPlanNodeType::BatchKafkaScan
1237 || self.node_type() == BatchPlanNodeType::BatchIcebergScan
1238 }
1239
1240 fn is_insert(&self) -> bool {
1241 self.node_type() == BatchPlanNodeType::BatchInsert
1242 }
1243
1244 fn is_update(&self) -> bool {
1245 self.node_type() == BatchPlanNodeType::BatchUpdate
1246 }
1247
1248 fn is_delete(&self) -> bool {
1249 self.node_type() == BatchPlanNodeType::BatchDelete
1250 }
1251}
1252
1253fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1259 assert_eq!(plan.distribution(), &Distribution::Single);
1260 exist_and_no_exchange_before(&plan, |plan| {
1261 plan.is_user_table_scan()
1262 || plan.is_source_scan()
1263 || plan.is_insert()
1264 || plan.is_update()
1265 || plan.is_delete()
1266 })
1267}
1268
1269fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1272 assert_eq!(plan.distribution(), &Distribution::Single);
1273 exist_and_no_exchange_before(&plan, |plan| {
1274 plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
1275 })
1276}
1277
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// A root hiding some columns must turn into a subplan exposing only the visible ones.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let input_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values: LogicalPlanRef = LogicalValues::new(vec![], input_schema, ctx).into();

        // Block value 1 sets only bit 0: the first of the two columns is visible.
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let root = LogicalPlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            vec!["v1".into()],
        );

        let subplan = root.into_unordered_subplan();
        let expected = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(subplan.schema(), &expected);
    }
}
1310}