1use std::num::NonZeroU32;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use risingwave_pb::catalog::PbVectorIndexInfo;
20
21pub mod plan_node;
22
23use plan_node::StreamFilter;
24pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
25
26pub mod property;
27
28mod delta_join_solver;
29mod heuristic_optimizer;
30mod plan_rewriter;
31
32mod plan_visitor;
33
34pub use plan_visitor::{
35 ExecutionModeDecider, PlanVisitor, ReadStorageTableVisitor, RelationCollectorVisitor,
36 SysTableVisitor,
37};
38
39pub mod backfill_order_strategy;
40mod logical_optimization;
41mod optimizer_context;
42pub mod plan_expr_rewriter;
43mod plan_expr_visitor;
44mod rule;
45
46use std::collections::{BTreeMap, HashMap};
47use std::marker::PhantomData;
48
49use educe::Educe;
50use fixedbitset::FixedBitSet;
51use itertools::Itertools as _;
52pub use logical_optimization::*;
53pub use optimizer_context::*;
54use plan_expr_rewriter::ConstEvalRewriter;
55use property::Order;
56use risingwave_common::bail;
57use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
58use risingwave_common::types::DataType;
59use risingwave_common::util::column_index_mapping::ColIndexMapping;
60use risingwave_common::util::iter_util::ZipEqDebug;
61use risingwave_connector::WithPropertiesExt;
62use risingwave_connector::sink::catalog::SinkFormatDesc;
63use risingwave_pb::stream_plan::StreamScanType;
64
65use self::heuristic_optimizer::ApplyOrder;
66use self::plan_node::generic::{self, PhysicalPlanRef};
67use self::plan_node::{
68 BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
69 StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
70 ToStreamContext, stream_enforce_eowc_requirement,
71};
72#[cfg(debug_assertions)]
73use self::plan_visitor::InputRefValidator;
74use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
75use self::property::{Cardinality, RequiredDist};
76use self::rule::*;
77use crate::TableCatalog;
78use crate::catalog::table_catalog::TableType;
79use crate::catalog::{DatabaseId, SchemaId};
80use crate::error::{ErrorCode, Result};
81use crate::expr::TimestamptzExprFinder;
82use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
83use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
84use crate::optimizer::plan_node::{
85 Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
86 StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion, StreamVectorIndexWrite,
87 ToStream, VisitExprsRecursive,
88};
89use crate::optimizer::plan_visitor::{RwTimestampValidator, TemporalJoinValidator};
90use crate::optimizer::property::Distribution;
91use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
92
/// The root of a plan tree together with the requirements on its final output.
///
/// `P` is the optimization phase the root is in; it fixes the convention of `plan`
/// (see [`PlanPhase`]). `out_fields` has one bit per column of `plan`'s schema, marking which
/// columns are visible in the output, and `out_names` holds one name per visible column.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    /// The plan tree, in the convention required by phase `P`.
    pub plan: PlanRef<P::Convention>,
    // Cloned via `PhantomData::clone` so that cloning a root does not require `P: Clone`
    // (the phase marker structs generated by `for_all_phase!` derive nothing).
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    /// Distribution the output of `plan` is required to satisfy.
    required_dist: RequiredDist,
    /// Order the output of `plan` is required to satisfy.
    required_order: Order,
    /// Visibility bitset over the columns of `plan`'s schema.
    out_fields: FixedBitSet,
    /// Names of the visible columns, one per set bit of `out_fields`.
    out_names: Vec<String>,
}
116
/// Marker trait for the optimization phase a [`PlanRoot`] is in.
///
/// Each phase fixes the plan-node convention that the root's plan must use; the concrete
/// phases are declared by `for_all_phase!`.
pub trait PlanPhase {
    /// The plan-node convention (e.g. logical, batch or stream) of plans in this phase.
    type Convention: ConventionMarker;
}
125
/// Declares the optimizer's plan phases.
///
/// For every `{ Phase, Convention }` pair this generates a unit marker struct
/// `PlanPhase<Phase>` implementing [`PlanPhase`] with that convention, plus a type alias
/// `<Phase>PlanRoot = PlanRoot<PlanPhase<Phase>>` (for example `LogicalPlanRoot`).
macro_rules! for_all_phase {
    // No arguments: expand to the full list of phases used by the optimizer.
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            // The stream-optimized root already holds a `Stream`-convention plan.
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    // One or more `{ Phase, Convention }` pairs: emit a marker type and an alias for each.
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

// Instantiate every phase marker together with its `*PlanRoot` alias.
for_all_phase!();
150
151impl LogicalPlanRoot {
152 pub fn new_with_logical_plan(
153 plan: LogicalPlanRef,
154 required_dist: RequiredDist,
155 required_order: Order,
156 out_fields: FixedBitSet,
157 out_names: Vec<String>,
158 ) -> Self {
159 Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
160 }
161}
162
163impl BatchPlanRoot {
164 pub fn new_with_batch_plan(
165 plan: BatchPlanRef,
166 required_dist: RequiredDist,
167 required_order: Order,
168 out_fields: FixedBitSet,
169 out_names: Vec<String>,
170 ) -> Self {
171 Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
172 }
173}
174
175impl<P: PlanPhase> PlanRoot<P> {
176 fn new_inner(
177 plan: PlanRef<P::Convention>,
178 required_dist: RequiredDist,
179 required_order: Order,
180 out_fields: FixedBitSet,
181 out_names: Vec<String>,
182 ) -> Self {
183 let input_schema = plan.schema();
184 assert_eq!(input_schema.fields().len(), out_fields.len());
185 assert_eq!(out_fields.count_ones(..), out_names.len());
186
187 Self {
188 plan,
189 _phase: PhantomData,
190 required_dist,
191 required_order,
192 out_fields,
193 out_names,
194 }
195 }
196
197 fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
198 PlanRoot {
199 plan,
200 _phase: PhantomData,
201 required_dist: self.required_dist,
202 required_order: self.required_order,
203 out_fields: self.out_fields,
204 out_names: self.out_names,
205 }
206 }
207
208 pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
213 if out_names.len() != self.out_fields.count_ones(..) {
214 Err(ErrorCode::InvalidInputSyntax(
215 "number of column names does not match number of columns".to_owned(),
216 ))?
217 }
218 self.out_names = out_names;
219 Ok(())
220 }
221
222 pub fn schema(&self) -> Schema {
224 Schema {
227 fields: self
228 .out_fields
229 .ones()
230 .map(|i| self.plan.schema().fields()[i].clone())
231 .zip_eq_debug(&self.out_names)
232 .map(|(field, name)| Field {
233 name: name.clone(),
234 ..field
235 })
236 .collect(),
237 }
238 }
239}
240
241impl LogicalPlanRoot {
242 pub fn into_unordered_subplan(self) -> LogicalPlanRef {
246 if self.out_fields.count_ones(..) == self.out_fields.len() {
247 return self.plan;
248 }
249 LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
250 }
251
252 pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
256 use generic::Agg;
257 use plan_node::PlanAggCall;
258 use risingwave_common::types::ListValue;
259 use risingwave_expr::aggregate::PbAggKind;
260
261 use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
262 use crate::utils::{Condition, IndexSet};
263
264 let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
265 bail!("subquery must return only one column");
266 };
267 let input_column_type = self.plan.schema().fields()[select_idx].data_type();
268 let return_type = DataType::List(input_column_type.clone().into());
269 let agg = Agg::new(
270 vec![PlanAggCall {
271 agg_type: PbAggKind::ArrayAgg.into(),
272 return_type: return_type.clone(),
273 inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
274 distinct: false,
275 order_by: self.required_order.column_orders,
276 filter: Condition::true_cond(),
277 direct_args: vec![],
278 }],
279 IndexSet::empty(),
280 self.plan,
281 );
282 Ok(LogicalProject::create(
283 agg.into(),
284 vec![
285 FunctionCall::new(
286 ExprType::Coalesce,
287 vec![
288 InputRef::new(0, return_type).into(),
289 ExprImpl::literal_list(
290 ListValue::empty(&input_column_type),
291 input_column_type,
292 ),
293 ],
294 )
295 .unwrap()
296 .into(),
297 ],
298 ))
299 }
300
301 pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
303 self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
304 Ok(self)
305 }
306
307 pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
309 let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
310 Ok(self.into_phase(plan))
311 }
312
313 pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
314 self.gen_optimized_logical_plan_for_batch()?
315 .gen_batch_plan()
316 }
317}
318
319impl BatchOptimizedLogicalPlanRoot {
320 pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
322 if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
323 return Err(ErrorCode::NotSupported(
324 "do not support temporal join for batch queries".to_owned(),
325 "please use temporal join in streaming queries".to_owned(),
326 )
327 .into());
328 }
329
330 let ctx = self.plan.ctx();
331 let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;
333
334 plan = const_eval_exprs(plan)?;
336
337 if ctx.is_explain_trace() {
338 ctx.trace("Const eval exprs:");
339 ctx.trace(plan.explain_to_string());
340 }
341
342 let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
344 if ctx.is_explain_trace() {
345 ctx.trace("To Batch Plan:");
346 ctx.trace(plan.explain_to_string());
347 }
348
349 plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
350 "Merge BatchProject",
351 vec![BatchProjectMergeRule::create()],
352 ApplyOrder::BottomUp,
353 ))?;
354
355 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
357
358 if ctx.is_explain_trace() {
359 ctx.trace("Inline Session Timezone:");
360 ctx.trace(plan.explain_to_string());
361 }
362
363 #[cfg(debug_assertions)]
364 InputRefValidator.validate(plan.clone());
365 assert!(
366 *plan.distribution() == Distribution::Single,
367 "{}",
368 plan.explain_to_string()
369 );
370 assert!(
371 !has_batch_exchange(plan.clone()),
372 "{}",
373 plan.explain_to_string()
374 );
375
376 let ctx = plan.ctx();
377 if ctx.is_explain_trace() {
378 ctx.trace("To Batch Physical Plan:");
379 ctx.trace(plan.explain_to_string());
380 }
381
382 Ok(self.into_phase(plan))
383 }
384}
385
386impl BatchPlanRoot {
387 pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
389 self.required_dist = RequiredDist::single();
390 let mut plan = self.plan;
391
392 plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;
394
395 if self.out_fields.count_ones(..) != self.out_fields.len() {
397 plan =
398 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
399 }
400
401 let ctx = plan.ctx();
402 if ctx.is_explain_trace() {
403 ctx.trace("To Batch Distributed Plan:");
404 ctx.trace(plan.explain_to_string());
405 }
406 if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
407 plan =
408 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
409 }
410
411 let plan = plan.optimize_by_rules(&OptimizationStage::new(
413 "Push Limit To Scan",
414 vec![BatchPushLimitToScanRule::create()],
415 ApplyOrder::BottomUp,
416 ))?;
417
418 let plan = plan.optimize_by_rules(&OptimizationStage::new(
419 "Iceberg Count Star",
420 vec![BatchIcebergCountStar::create()],
421 ApplyOrder::TopDown,
422 ))?;
423
424 let plan = plan.optimize_by_rules(&OptimizationStage::new(
427 "Iceberg Predicate Pushdown",
428 vec![BatchIcebergPredicatePushDownRule::create()],
429 ApplyOrder::BottomUp,
430 ))?;
431
432 Ok(plan)
433 }
434
435 pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
437 let mut plan = self.plan;
438
439 plan = plan.to_local_with_order_required(&self.required_order)?;
441
442 let insert_exchange = match plan.distribution() {
445 Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
446 _ => true,
447 };
448 if insert_exchange {
449 plan =
450 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
451 }
452
453 if self.out_fields.count_ones(..) != self.out_fields.len() {
455 plan =
456 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
457 }
458
459 let ctx = plan.ctx();
460 if ctx.is_explain_trace() {
461 ctx.trace("To Batch Local Plan:");
462 ctx.trace(plan.explain_to_string());
463 }
464
465 let plan = plan.optimize_by_rules(&OptimizationStage::new(
467 "Push Limit To Scan",
468 vec![BatchPushLimitToScanRule::create()],
469 ApplyOrder::BottomUp,
470 ))?;
471
472 let plan = plan.optimize_by_rules(&OptimizationStage::new(
473 "Iceberg Count Star",
474 vec![BatchIcebergCountStar::create()],
475 ApplyOrder::TopDown,
476 ))?;
477 Ok(plan)
478 }
479}
480
481impl LogicalPlanRoot {
482 fn gen_optimized_stream_plan(
484 self,
485 emit_on_window_close: bool,
486 allow_snapshot_backfill: bool,
487 ) -> Result<StreamOptimizedLogicalPlanRoot> {
488 let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
489 StreamScanType::SnapshotBackfill
490 } else if self.should_use_arrangement_backfill() {
491 StreamScanType::ArrangementBackfill
492 } else {
493 StreamScanType::Backfill
494 };
495 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
496 }
497
498 fn gen_optimized_stream_plan_inner(
499 self,
500 emit_on_window_close: bool,
501 stream_scan_type: StreamScanType,
502 ) -> Result<StreamOptimizedLogicalPlanRoot> {
503 let ctx = self.plan.ctx();
504 let _explain_trace = ctx.is_explain_trace();
505
506 let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;
507
508 let mut plan = optimized_plan
509 .plan
510 .clone()
511 .optimize_by_rules(&OptimizationStage::new(
512 "Merge StreamProject",
513 vec![StreamProjectMergeRule::create()],
514 ApplyOrder::BottomUp,
515 ))?;
516
517 if ctx
518 .session_ctx()
519 .config()
520 .streaming_separate_consecutive_join()
521 {
522 plan = plan.optimize_by_rules(&OptimizationStage::new(
523 "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
524 vec![SeparateConsecutiveJoinRule::create()],
525 ApplyOrder::BottomUp,
526 ))?;
527 }
528
529 if ctx.session_ctx().config().streaming_enable_unaligned_join() {
533 plan = plan.optimize_by_rules(&OptimizationStage::new(
534 "Add Logstore for Unaligned join",
535 vec![AddLogstoreRule::create()],
536 ApplyOrder::BottomUp,
537 ))?;
538 }
539
540 if ctx.session_ctx().config().streaming_enable_delta_join()
541 && ctx.session_ctx().config().enable_index_selection()
542 {
543 plan = plan.optimize_by_rules(&OptimizationStage::new(
546 "To IndexDeltaJoin",
547 vec![IndexDeltaJoinRule::create()],
548 ApplyOrder::BottomUp,
549 ))?;
550 }
551 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
553
554 if ctx.is_explain_trace() {
555 ctx.trace("Inline session timezone:");
556 ctx.trace(plan.explain_to_string());
557 }
558
559 plan = const_eval_exprs(plan)?;
561
562 if ctx.is_explain_trace() {
563 ctx.trace("Const eval exprs:");
564 ctx.trace(plan.explain_to_string());
565 }
566
567 #[cfg(debug_assertions)]
568 InputRefValidator.validate(plan.clone());
569
570 if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
571 return Err(ErrorCode::NotSupported(
572 "exist dangling temporal scan".to_owned(),
573 "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
574 ).into());
575 }
576
577 if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
578 return Err(ErrorCode::NotSupported(
579 "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
580 "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
581 ).into());
582 }
583
584 Ok(optimized_plan.into_phase(plan))
585 }
586
587 fn gen_stream_plan(
589 self,
590 emit_on_window_close: bool,
591 stream_scan_type: StreamScanType,
592 ) -> Result<StreamOptimizedLogicalPlanRoot> {
593 let ctx = self.plan.ctx();
594 let explain_trace = ctx.is_explain_trace();
595
596 let plan = {
597 {
598 if !ctx
599 .session_ctx()
600 .config()
601 .streaming_allow_jsonb_in_stream_key()
602 && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
603 {
604 return Err(ErrorCode::NotSupported(
605 err,
606 "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
607 If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
608 ).into());
609 }
610 let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
611 let (plan, out_col_change) = {
612 let (plan, out_col_change) = optimized_plan
613 .plan
614 .logical_rewrite_for_stream(&mut Default::default())?;
615 if out_col_change.is_injective() {
616 (plan, out_col_change)
617 } else {
618 let mut output_indices = (0..plan.schema().len()).collect_vec();
619 #[allow(unused_assignments)]
620 let (mut map, mut target_size) = out_col_change.into_parts();
621
622 target_size = plan.schema().len();
625 let mut tar_exists = vec![false; target_size];
626 for i in map.iter_mut().flatten() {
627 if tar_exists[*i] {
628 output_indices.push(*i);
629 *i = target_size;
630 target_size += 1;
631 } else {
632 tar_exists[*i] = true;
633 }
634 }
635 let plan =
636 LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
637 let out_col_change = ColIndexMapping::new(map, target_size);
638 (plan.into(), out_col_change)
639 }
640 };
641
642 if explain_trace {
643 ctx.trace("Logical Rewrite For Stream:");
644 ctx.trace(plan.explain_to_string());
645 }
646
647 optimized_plan.required_dist =
648 out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
649 optimized_plan.required_order = out_col_change
650 .rewrite_required_order(&optimized_plan.required_order)
651 .unwrap();
652 optimized_plan.out_fields =
653 out_col_change.rewrite_bitset(&optimized_plan.out_fields);
654 let mut plan = plan.to_stream_with_dist_required(
655 &optimized_plan.required_dist,
656 &mut ToStreamContext::new_with_stream_scan_type(
657 emit_on_window_close,
658 stream_scan_type,
659 ),
660 )?;
661 plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
662 optimized_plan.into_phase(plan)
663 }
664 };
665
666 if explain_trace {
667 ctx.trace("To Stream Plan:");
668 ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
670 }
671 Ok(plan)
672 }
673
674 fn compute_cardinality(&self) -> Cardinality {
678 CardinalityVisitor.visit(self.plan.clone())
679 }
680
681 pub fn gen_table_plan(
683 self,
684 context: OptimizerContextRef,
685 table_name: String,
686 database_id: DatabaseId,
687 schema_id: SchemaId,
688 CreateTableInfo {
689 columns,
690 pk_column_ids,
691 row_id_index,
692 watermark_descs,
693 source_catalog,
694 version,
695 }: CreateTableInfo,
696 CreateTableProps {
697 definition,
698 append_only,
699 on_conflict,
700 with_version_columns,
701 webhook_info,
702 engine,
703 }: CreateTableProps,
704 ) -> Result<StreamMaterialize> {
705 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
707
708 assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
709
710 let pk_column_indices = {
711 let mut id_to_idx = HashMap::new();
712
713 columns.iter().enumerate().for_each(|(idx, c)| {
714 id_to_idx.insert(c.column_id(), idx);
715 });
716 pk_column_ids
717 .iter()
718 .map(|c| id_to_idx.get(c).copied().unwrap()) .collect_vec()
720 };
721
722 fn inject_project_for_generated_column_if_needed(
723 columns: &[ColumnCatalog],
724 node: StreamPlanRef,
725 ) -> Result<StreamPlanRef> {
726 let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
727 if let Some(exprs) = exprs {
728 let logical_project = generic::Project::new(exprs, node);
729 return Ok(StreamProject::new(logical_project).into());
730 }
731 Ok(node)
732 }
733
734 #[derive(PartialEq, Debug, Copy, Clone)]
735 enum PrimaryKeyKind {
736 UserDefinedPrimaryKey,
737 NonAppendOnlyRowIdPk,
738 AppendOnlyRowIdPk,
739 }
740
741 fn inject_dml_node(
742 columns: &[ColumnCatalog],
743 append_only: bool,
744 stream_plan: StreamPlanRef,
745 pk_column_indices: &[usize],
746 kind: PrimaryKeyKind,
747 column_descs: Vec<ColumnDesc>,
748 ) -> Result<StreamPlanRef> {
749 let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
750
751 dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
753
754 dml_node = match kind {
755 PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
756 RequiredDist::hash_shard(pk_column_indices)
757 .streaming_enforce_if_not_satisfies(dml_node)?
758 }
759 PrimaryKeyKind::AppendOnlyRowIdPk => {
760 StreamExchange::new_no_shuffle(dml_node).into()
761 }
762 };
763
764 Ok(dml_node)
765 }
766
767 let kind = if let Some(row_id_index) = row_id_index {
768 assert_eq!(
769 pk_column_indices.iter().exactly_one().copied().unwrap(),
770 row_id_index
771 );
772 if append_only {
773 PrimaryKeyKind::AppendOnlyRowIdPk
774 } else {
775 PrimaryKeyKind::NonAppendOnlyRowIdPk
776 }
777 } else {
778 PrimaryKeyKind::UserDefinedPrimaryKey
779 };
780
781 let column_descs: Vec<ColumnDesc> = columns
782 .iter()
783 .filter(|&c| c.can_dml())
784 .map(|c| c.column_desc.clone())
785 .collect();
786
787 let mut not_null_idxs = vec![];
788 for (idx, column) in column_descs.iter().enumerate() {
789 if !column.nullable {
790 not_null_idxs.push(idx);
791 }
792 }
793
794 let version_column_indices = if !with_version_columns.is_empty() {
795 find_version_column_indices(&columns, with_version_columns)?
796 } else {
797 vec![]
798 };
799
800 let with_external_source = source_catalog.is_some();
801 let (dml_source_node, external_source_node) = if with_external_source {
802 let dummy_source_node = LogicalSource::new(
803 None,
804 columns.clone(),
805 row_id_index,
806 SourceNodeKind::CreateTable,
807 context.clone(),
808 None,
809 )
810 .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
811 let mut external_source_node = stream_plan.plan;
812 external_source_node =
813 inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
814 external_source_node = match kind {
815 PrimaryKeyKind::UserDefinedPrimaryKey => {
816 RequiredDist::hash_shard(&pk_column_indices)
817 .streaming_enforce_if_not_satisfies(external_source_node)?
818 }
819
820 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
821 StreamExchange::new_no_shuffle(external_source_node).into()
822 }
823 };
824 (dummy_source_node, Some(external_source_node))
825 } else {
826 (stream_plan.plan, None)
827 };
828
829 let dml_node = inject_dml_node(
830 &columns,
831 append_only,
832 dml_source_node,
833 &pk_column_indices,
834 kind,
835 column_descs,
836 )?;
837
838 let dists = external_source_node
839 .iter()
840 .map(|input| input.distribution())
841 .chain([dml_node.distribution()])
842 .unique()
843 .collect_vec();
844
845 let dist = match &dists[..] {
846 &[Distribution::SomeShard, Distribution::HashShard(_)]
847 | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
848 &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
849 dist.clone()
850 }
851 _ => {
852 unreachable!()
853 }
854 };
855
856 let generated_column_exprs =
857 LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
858 let upstream_sink_union = StreamUpstreamSinkUnion::new(
859 context.clone(),
860 dml_node.schema(),
861 dml_node.stream_key(),
862 dist.clone(), append_only,
864 row_id_index.is_none(),
865 generated_column_exprs,
866 );
867
868 let union_inputs = external_source_node
869 .into_iter()
870 .chain([dml_node, upstream_sink_union.into()])
871 .collect_vec();
872
873 let mut stream_plan = StreamUnion::new_with_dist(
874 Union {
875 all: true,
876 inputs: union_inputs,
877 source_col: None,
878 },
879 dist,
880 )
881 .into();
882
883 if !watermark_descs.is_empty() {
885 stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
886 }
887
888 if let Some(row_id_index) = row_id_index {
890 match kind {
891 PrimaryKeyKind::UserDefinedPrimaryKey => {
892 unreachable!()
893 }
894 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
895 stream_plan = StreamRowIdGen::new_with_dist(
896 stream_plan,
897 row_id_index,
898 Distribution::HashShard(vec![row_id_index]),
899 )
900 .into();
901 }
902 }
903 }
904
905 let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
906
907 if let ConflictBehavior::IgnoreConflict = conflict_behavior
908 && !version_column_indices.is_empty()
909 {
910 Err(ErrorCode::InvalidParameterValue(
911 "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
912 ))?
913 }
914
915 let retention_seconds = context.with_options().retention_seconds();
916
917 let table_required_dist = {
918 let mut bitset = FixedBitSet::with_capacity(columns.len());
919 for idx in &pk_column_indices {
920 bitset.insert(*idx);
921 }
922 RequiredDist::ShardByKey(bitset)
923 };
924
925 let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
926
927 if !not_null_idxs.is_empty() {
928 stream_plan =
929 StreamFilter::filter_out_any_null_rows(stream_plan.clone(), ¬_null_idxs);
930 }
931
932 let refreshable = source_catalog
934 .as_ref()
935 .map(|catalog| catalog.with_properties.is_batch_connector())
936 .unwrap_or(false);
937
938 if refreshable && row_id_index.is_some() {
940 return Err(crate::error::ErrorCode::BindError(
941 "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
942 .to_owned(),
943 )
944 .into());
945 }
946
947 StreamMaterialize::create_for_table(
948 stream_plan,
949 table_name,
950 database_id,
951 schema_id,
952 table_required_dist,
953 Order::any(),
954 columns,
955 definition,
956 conflict_behavior,
957 version_column_indices,
958 pk_column_indices,
959 row_id_index,
960 version,
961 retention_seconds,
962 webhook_info,
963 engine,
964 refreshable,
965 )
966 }
967
968 pub fn gen_materialize_plan(
970 self,
971 database_id: DatabaseId,
972 schema_id: SchemaId,
973 mv_name: String,
974 definition: String,
975 emit_on_window_close: bool,
976 ) -> Result<StreamMaterialize> {
977 let cardinality = self.compute_cardinality();
978 let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
979 StreamMaterialize::create(
980 stream_plan,
981 mv_name,
982 database_id,
983 schema_id,
984 definition,
985 TableType::MaterializedView,
986 cardinality,
987 None,
988 )
989 }
990
991 pub fn gen_index_plan(
993 self,
994 index_name: String,
995 database_id: DatabaseId,
996 schema_id: SchemaId,
997 definition: String,
998 retention_seconds: Option<NonZeroU32>,
999 ) -> Result<StreamMaterialize> {
1000 let cardinality = self.compute_cardinality();
1001 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1002
1003 StreamMaterialize::create(
1004 stream_plan,
1005 index_name,
1006 database_id,
1007 schema_id,
1008 definition,
1009 TableType::Index,
1010 cardinality,
1011 retention_seconds,
1012 )
1013 }
1014
1015 pub fn gen_vector_index_plan(
1016 self,
1017 index_name: String,
1018 database_id: DatabaseId,
1019 schema_id: SchemaId,
1020 definition: String,
1021 retention_seconds: Option<NonZeroU32>,
1022 vector_index_info: PbVectorIndexInfo,
1023 ) -> Result<StreamVectorIndexWrite> {
1024 let cardinality = self.compute_cardinality();
1025 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1026
1027 StreamVectorIndexWrite::create(
1028 stream_plan,
1029 index_name,
1030 database_id,
1031 schema_id,
1032 definition,
1033 cardinality,
1034 retention_seconds,
1035 vector_index_info,
1036 )
1037 }
1038
1039 #[allow(clippy::too_many_arguments)]
1041 pub fn gen_sink_plan(
1042 self,
1043 sink_name: String,
1044 definition: String,
1045 properties: WithOptionsSecResolved,
1046 emit_on_window_close: bool,
1047 db_name: String,
1048 sink_from_table_name: String,
1049 format_desc: Option<SinkFormatDesc>,
1050 without_backfill: bool,
1051 target_table: Option<Arc<TableCatalog>>,
1052 partition_info: Option<PartitionComputeInfo>,
1053 user_specified_columns: bool,
1054 auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
1055 ) -> Result<StreamSink> {
1056 let stream_scan_type = if without_backfill {
1057 StreamScanType::UpstreamOnly
1058 } else if target_table.is_none() && self.should_use_snapshot_backfill() {
1059 StreamScanType::SnapshotBackfill
1061 } else if self.should_use_arrangement_backfill() {
1062 StreamScanType::ArrangementBackfill
1063 } else {
1064 StreamScanType::Backfill
1065 };
1066 if auto_refresh_schema_from_table.is_some()
1067 && stream_scan_type != StreamScanType::ArrangementBackfill
1068 {
1069 return Err(ErrorCode::InvalidInputSyntax(format!(
1070 "auto schema change only support for ArrangementBackfill, but got: {:?}",
1071 stream_scan_type
1072 ))
1073 .into());
1074 }
1075 let stream_plan =
1076 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
1077 let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
1078 let columns = t.columns_without_rw_timestamp();
1079 stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
1080 });
1081
1082 StreamSink::create(
1083 stream_plan,
1084 sink_name,
1085 db_name,
1086 sink_from_table_name,
1087 target_table,
1088 target_columns_to_plan_mapping,
1089 definition,
1090 properties,
1091 format_desc,
1092 partition_info,
1093 auto_refresh_schema_from_table,
1094 )
1095 }
1096}
1097
1098impl<P: PlanPhase> PlanRoot<P> {
1099 pub fn should_use_arrangement_backfill(&self) -> bool {
1100 let ctx = self.plan.ctx();
1101 let session_ctx = ctx.session_ctx();
1102 let arrangement_backfill_enabled = session_ctx
1103 .env()
1104 .streaming_config()
1105 .developer
1106 .enable_arrangement_backfill;
1107 arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1108 }
1109
1110 pub fn should_use_snapshot_backfill(&self) -> bool {
1111 self.plan
1112 .ctx()
1113 .session_ctx()
1114 .config()
1115 .streaming_use_snapshot_backfill()
1116 }
1117
1118 pub fn target_columns_to_plan_mapping(
1120 &self,
1121 tar_cols: &[ColumnCatalog],
1122 user_specified_columns: bool,
1123 ) -> Vec<Option<usize>> {
1124 #[allow(clippy::disallowed_methods)]
1125 let visible_cols: Vec<(usize, String)> = self
1126 .out_fields
1127 .ones()
1128 .zip_eq(self.out_names.iter().cloned())
1129 .collect_vec();
1130
1131 let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1132 let visible_col_idxes_by_name = visible_cols
1133 .iter()
1134 .map(|(i, name)| (name.as_ref(), *i))
1135 .collect::<BTreeMap<_, _>>();
1136
1137 tar_cols
1138 .iter()
1139 .enumerate()
1140 .filter(|(_, tar_col)| tar_col.can_dml())
1141 .map(|(tar_i, tar_col)| {
1142 if user_specified_columns {
1143 visible_col_idxes_by_name.get(tar_col.name()).cloned()
1144 } else {
1145 (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1146 }
1147 })
1148 .collect()
1149 }
1150}
1151
1152fn find_version_column_indices(
1153 column_catalog: &Vec<ColumnCatalog>,
1154 version_column_names: Vec<String>,
1155) -> Result<Vec<usize>> {
1156 let mut indices = Vec::new();
1157 for version_column_name in version_column_names {
1158 let mut found = false;
1159 for (index, column) in column_catalog.iter().enumerate() {
1160 if column.column_desc.name == version_column_name {
1161 if let &DataType::Jsonb
1162 | &DataType::List(_)
1163 | &DataType::Struct(_)
1164 | &DataType::Bytea
1165 | &DataType::Boolean = column.data_type()
1166 {
1167 return Err(ErrorCode::InvalidInputSyntax(format!(
1168 "Version column {} must be of a comparable data type",
1169 version_column_name
1170 ))
1171 .into());
1172 }
1173 indices.push(index);
1174 found = true;
1175 break;
1176 }
1177 }
1178 if !found {
1179 return Err(ErrorCode::InvalidInputSyntax(format!(
1180 "Version column {} not found",
1181 version_column_name
1182 ))
1183 .into());
1184 }
1185 }
1186 Ok(indices)
1187}
1188
1189fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1190 let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1191
1192 let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1193 if let Some(error) = const_eval_rewriter.error {
1194 return Err(error);
1195 }
1196 Ok(plan)
1197}
1198
1199fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1200 ctx: OptimizerContextRef,
1201 plan: PlanRef<C>,
1202) -> Result<PlanRef<C>> {
1203 let mut v = TimestamptzExprFinder::default();
1204 plan.visit_exprs_recursive(&mut v);
1205 if v.has() {
1206 Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1207 } else {
1208 Ok(plan)
1209 }
1210}
1211
1212fn exist_and_no_exchange_before(
1213 plan: &BatchPlanRef,
1214 is_candidate: fn(&BatchPlanRef) -> bool,
1215) -> bool {
1216 if plan.node_type() == BatchPlanNodeType::BatchExchange {
1217 return false;
1218 }
1219 is_candidate(plan)
1220 || plan
1221 .inputs()
1222 .iter()
1223 .any(|input| exist_and_no_exchange_before(input, is_candidate))
1224}
1225
1226fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1232 fn is_user_table(plan: &BatchPlanRef) -> bool {
1233 plan.node_type() == BatchPlanNodeType::BatchSeqScan
1234 }
1235
1236 fn is_log_table(plan: &BatchPlanRef) -> bool {
1237 plan.node_type() == BatchPlanNodeType::BatchLogSeqScan
1238 }
1239
1240 fn is_source(plan: &BatchPlanRef) -> bool {
1241 plan.node_type() == BatchPlanNodeType::BatchSource
1242 || plan.node_type() == BatchPlanNodeType::BatchKafkaScan
1243 || plan.node_type() == BatchPlanNodeType::BatchIcebergScan
1244 }
1245
1246 fn is_insert(plan: &BatchPlanRef) -> bool {
1247 plan.node_type() == BatchPlanNodeType::BatchInsert
1248 }
1249
1250 fn is_update(plan: &BatchPlanRef) -> bool {
1251 plan.node_type() == BatchPlanNodeType::BatchUpdate
1252 }
1253
1254 fn is_delete(plan: &BatchPlanRef) -> bool {
1255 plan.node_type() == BatchPlanNodeType::BatchDelete
1256 }
1257
1258 assert_eq!(plan.distribution(), &Distribution::Single);
1259 exist_and_no_exchange_before(&plan, is_user_table)
1260 || exist_and_no_exchange_before(&plan, is_source)
1261 || exist_and_no_exchange_before(&plan, is_insert)
1262 || exist_and_no_exchange_before(&plan, is_update)
1263 || exist_and_no_exchange_before(&plan, is_delete)
1264 || exist_and_no_exchange_before(&plan, is_log_table)
1265}
1266
1267fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1270 fn is_user_table(plan: &BatchPlanRef) -> bool {
1271 plan.node_type() == BatchPlanNodeType::BatchSeqScan
1272 || plan.node_type() == BatchPlanNodeType::BatchVectorSearch
1273 }
1274
1275 fn is_source(plan: &BatchPlanRef) -> bool {
1276 plan.node_type() == BatchPlanNodeType::BatchSource
1277 || plan.node_type() == BatchPlanNodeType::BatchKafkaScan
1278 || plan.node_type() == BatchPlanNodeType::BatchIcebergScan
1279 }
1280
1281 fn is_insert(plan: &BatchPlanRef) -> bool {
1282 plan.node_type() == BatchPlanNodeType::BatchInsert
1283 }
1284
1285 assert_eq!(plan.distribution(), &Distribution::Single);
1286 exist_and_no_exchange_before(&plan, is_user_table)
1287 || exist_and_no_exchange_before(&plan, is_source)
1288 || exist_and_no_exchange_before(&plan, is_insert)
1289}
1290
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// `into_unordered_subplan` must project away hidden columns, leaving only the visible
    /// ones.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let input_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values: LogicalPlanRef = LogicalValues::new(vec![], input_schema, ctx).into();

        // Block value 1 sets only bit 0: `v1` is visible, `v2` is hidden.
        let visible = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            visible,
            vec!["v1".into()],
        );

        let subplan = root.into_unordered_subplan();
        let expected = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(subplan.schema(), &expected);
    }
}
1323}