use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

use risingwave_pb::catalog::PbVectorIndexInfo;

pub mod plan_node;

use plan_node::StreamFilter;
pub use plan_node::{Explain, LogicalPlanRef, PlanRef};

pub mod property;

mod delta_join_solver;
mod heuristic_optimizer;
mod plan_rewriter;

mod plan_visitor;

pub use plan_visitor::{
    ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
};

pub mod backfill_order_strategy;
mod logical_optimization;
mod optimizer_context;
pub mod plan_expr_rewriter;
mod plan_expr_visitor;
mod rule;

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;

use educe::Educe;
use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
    ToStreamContext, stream_enforce_eowc_requirement,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::TableCatalog;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::TimestamptzExprFinder;
use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
use crate::optimizer::plan_node::{
    Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
    StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion, StreamVectorIndexWrite,
    ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::{
    LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};

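/// The root of a plan under optimization. Besides the plan itself, it records the
/// required distribution and order of the query result, which output columns are
/// visible (`out_fields`) and how they are named (`out_names`). The type parameter
/// `P` tracks the optimization phase and thus the convention (logical, batch or
/// stream) of the contained plan.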
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    pub plan: PlanRef<P::Convention>,
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    required_dist: RequiredDist,
    required_order: Order,
    out_fields: FixedBitSet,
    out_names: Vec<String>,
}

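/// Marker trait for the phase a [`PlanRoot`] is in. Each phase fixes the plan
/// convention ([`ConventionMarker`]) its plan is expressed in.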
pub trait PlanPhase {
    type Convention: ConventionMarker;
}

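/// Generates a `PlanPhase*` marker type and a `*PlanRoot` type alias for every phase
/// listed in the no-argument invocation below, e.g. `PlanPhaseLogical` and
/// `LogicalPlanRoot = PlanRoot<PlanPhaseLogical>`.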
macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();

impl LogicalPlanRoot {
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl BatchPlanRoot {
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    fn new_inner(
        plan: PlanRef<P::Convention>,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        let input_schema = plan.schema();
        assert_eq!(input_schema.fields().len(), out_fields.len());
        assert_eq!(out_fields.count_ones(..), out_names.len());

        Self {
            plan,
            _phase: PhantomData,
            required_dist,
            required_order,
            out_fields,
            out_names,
        }
    }

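    /// Moves this root into another phase `P2`, replacing the plan while keeping the
    /// required distribution, order and output columns.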
    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
        PlanRoot {
            plan,
            _phase: PhantomData,
            required_dist: self.required_dist,
            required_order: self.required_order,
            out_fields: self.out_fields,
            out_names: self.out_names,
        }
    }

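    /// Sets customized names of the output fields; the number of names must match the
    /// number of visible output columns.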
    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
        if out_names.len() != self.out_fields.count_ones(..) {
            Err(ErrorCode::InvalidInputSyntax(
                "number of column names does not match number of columns".to_owned(),
            ))?
        }
        self.out_names = out_names;
        Ok(())
    }

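    /// Gets the effective output schema: only the fields visible in `out_fields`,
    /// renamed according to `out_names`.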
    pub fn schema(&self) -> Schema {
        Schema {
            fields: self
                .out_fields
                .ones()
                .map(|i| self.plan.schema().fields()[i].clone())
                .zip_eq_debug(&self.out_names)
                .map(|(field, name)| Field {
                    name: name.clone(),
                    ..field
                })
                .collect(),
        }
    }
}

impl LogicalPlanRoot {
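    /// Turns the root back into an unordered logical subplan: if some columns are
    /// hidden, a [`LogicalProject`] is added to keep only the visible output fields.
    /// The required order is intentionally dropped.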
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

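    /// Aggregates the single visible output column with `array_agg` (ordered by the
    /// required order) and coalesces a `NULL` result to an empty list, so the subplan
    /// yields exactly one array value.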
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::list(input_column_type.clone());
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}

impl BatchOptimizedLogicalPlanRoot {
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }
}

impl BatchPlanRoot {
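    /// Optimizes the batch plan further and generates the plan for distributed
    /// execution, enforcing a `Single` distribution at the root.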
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Predicate Pushdown",
            vec![BatchIcebergPredicatePushDownRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

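    /// Generates the batch plan for local execution mode, inserting an additional root
    /// exchange when the plan still needs to touch compute nodes.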
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        plan = plan.to_local_with_order_required(&self.required_order)?;

        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;
        Ok(plan)
    }
}

impl LogicalPlanRoot {
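    /// Optimizes the logical plan and converts it to a stream plan, choosing snapshot,
    /// arrangement or plain backfill according to the session configuration.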
    fn gen_optimized_stream_plan(
        self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
    }

    fn gen_optimized_stream_plan_inner(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;

        let mut plan = optimized_plan
            .plan
            .clone()
            .optimize_by_rules(&OptimizationStage::new(
                "Merge StreamProject",
                vec![StreamProjectMergeRule::create()],
                ApplyOrder::BottomUp,
            ))?;

        if ctx
            .session_ctx()
            .config()
            .streaming_separate_consecutive_join()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
                vec![SeparateConsecutiveJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join()
            && ctx.session_ctx().config().enable_index_selection()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        if ctx.session_ctx().config().enable_locality_backfill()
            && LocalityProviderCounter::count(plan.clone()) > 5
        {
            risingwave_common::license::Feature::LocalityBackfill.check_available()?
        }

        Ok(optimized_plan.into_phase(plan))
    }

    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }

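    /// Computes the cardinality of the plan via [`CardinalityVisitor`], used when
    /// creating materialized views and indexes.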
    fn compute_cardinality(&self) -> Cardinality {
        CardinalityVisitor.visit(self.plan.clone())
    }

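    /// Generates the streaming plan for `CREATE TABLE`: unions the DML input, the
    /// optional external source and upstream sinks, adds watermark filtering, row-id
    /// generation and NOT NULL checks as needed, and wraps the result in a
    /// [`StreamMaterialize`].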
    pub fn gen_table_plan(
        self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_columns,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap())
                .collect_vec()
        };

        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: StreamPlanRef,
        ) -> Result<StreamPlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: StreamPlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<StreamPlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .streaming_enforce_if_not_satisfies(dml_node)?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_indices = if !with_version_columns.is_empty() {
            find_version_column_indices(&columns, with_version_columns)?
        } else {
            vec![]
        };

        let with_external_source = source_catalog.is_some();
        let (dml_source_node, external_source_node) = if with_external_source {
            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
            let mut external_source_node = stream_plan.plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .streaming_enforce_if_not_satisfies(external_source_node)?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };
            (dummy_source_node, Some(external_source_node))
        } else {
            (stream_plan.plan, None)
        };

        let dml_node = inject_dml_node(
            &columns,
            append_only,
            dml_source_node,
            &pk_column_indices,
            kind,
            column_descs,
        )?;

        let dists = external_source_node
            .iter()
            .map(|input| input.distribution())
            .chain([dml_node.distribution()])
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                unreachable!()
            }
        };

        let generated_column_exprs =
            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
        let upstream_sink_union = StreamUpstreamSinkUnion::new(
            context.clone(),
            dml_node.schema(),
            dml_node.stream_key(),
            dist.clone(),
            append_only,
            row_id_index.is_none(),
            generated_column_exprs,
        );

        let union_inputs = external_source_node
            .into_iter()
            .chain([dml_node, upstream_sink_union.into()])
            .collect_vec();

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist,
        )
        .into();

        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && !version_column_indices.is_empty()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        let refreshable = source_catalog
            .as_ref()
            .map(|catalog| catalog.with_properties.is_batch_connector())
            .unwrap_or(false);

        if refreshable && row_id_index.is_some() {
            return Err(crate::error::ErrorCode::BindError(
                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
                    .to_owned(),
            )
            .into());
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            pk_column_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
            refreshable,
        )
    }

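    /// Optimizes the plan and wraps it in a [`StreamMaterialize`] for
    /// `CREATE MATERIALIZED VIEW`.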
    pub fn gen_materialize_plan(
        self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }

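    /// Optimizes the plan and wraps it in a [`StreamMaterialize`] for `CREATE INDEX`.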
    pub fn gen_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }

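    /// Optimizes the plan and wraps it in a [`StreamVectorIndexWrite`] for creating a
    /// vector index.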
    pub fn gen_vector_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
        vector_index_info: PbVectorIndexInfo,
    ) -> Result<StreamVectorIndexWrite> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamVectorIndexWrite::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            cardinality,
            retention_seconds,
            vector_index_info,
        )
    }

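    /// Generates a [`StreamSink`] plan for `CREATE SINK`, selecting the stream scan
    /// (backfill) type from the arguments and session configuration, and mapping
    /// target-table columns when the sink writes into a table.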
    #[allow(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<StreamSink> {
        let stream_scan_type = if without_backfill {
            StreamScanType::UpstreamOnly
        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        if auto_refresh_schema_from_table.is_some()
            && stream_scan_type != StreamScanType::ArrangementBackfill
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "auto schema change only support for ArrangementBackfill, but got: {:?}",
                stream_scan_type
            ))
            .into());
        }
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });

        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            definition,
            properties,
            format_desc,
            partition_info,
            auto_refresh_schema_from_table,
        )
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    pub fn should_use_arrangement_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let arrangement_backfill_enabled = session_ctx
            .env()
            .streaming_config()
            .developer
            .enable_arrangement_backfill;
        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
    }

    pub fn should_use_snapshot_backfill(&self) -> bool {
        self.plan
            .ctx()
            .session_ctx()
            .config()
            .streaming_use_snapshot_backfill()
    }

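    /// Maps each DML-able column of the sink's target table to the index of the
    /// corresponding visible output column of this plan: by name when the user
    /// specified the column list, otherwise by position. Columns without a match
    /// map to `None`.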
    pub fn target_columns_to_plan_mapping(
        &self,
        tar_cols: &[ColumnCatalog],
        user_specified_columns: bool,
    ) -> Vec<Option<usize>> {
        #[allow(clippy::disallowed_methods)]
        let visible_cols: Vec<(usize, String)> = self
            .out_fields
            .ones()
            .zip_eq(self.out_names.iter().cloned())
            .collect_vec();

        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
        let visible_col_idxes_by_name = visible_cols
            .iter()
            .map(|(i, name)| (name.as_ref(), *i))
            .collect::<BTreeMap<_, _>>();

        tar_cols
            .iter()
            .enumerate()
            .filter(|(_, tar_col)| tar_col.can_dml())
            .map(|(tar_i, tar_col)| {
                if user_specified_columns {
                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
                } else {
                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
                }
            })
            .collect()
    }
}

fn find_version_column_indices(
    column_catalog: &Vec<ColumnCatalog>,
    version_column_names: Vec<String>,
) -> Result<Vec<usize>> {
    let mut indices = Vec::new();
    for version_column_name in version_column_names {
        let mut found = false;
        for (index, column) in column_catalog.iter().enumerate() {
            if column.column_desc.name == version_column_name {
                if let &DataType::Jsonb
                | &DataType::List(_)
                | &DataType::Struct(_)
                | &DataType::Bytea
                | &DataType::Boolean = column.data_type()
                {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "Version column {} must be of a comparable data type",
                        version_column_name
                    ))
                    .into());
                }
                indices.push(index);
                found = true;
                break;
            }
        }
        if !found {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Version column {} not found",
                version_column_name
            ))
            .into());
        }
    }
    Ok(indices)
}

fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
    let mut const_eval_rewriter = ConstEvalRewriter { error: None };

    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
    if let Some(error) = const_eval_rewriter.error {
        return Err(error);
    }
    Ok(plan)
}

fn inline_session_timezone_in_exprs<C: ConventionMarker>(
    ctx: OptimizerContextRef,
    plan: PlanRef<C>,
) -> Result<PlanRef<C>> {
    let mut v = TimestamptzExprFinder::default();
    plan.visit_exprs_recursive(&mut v);
    if v.has() {
        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
    } else {
        Ok(plan)
    }
}

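/// Returns `true` if a node satisfying `is_candidate` is reachable from `plan` without
/// crossing a `BatchExchange`.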
fn exist_and_no_exchange_before(
    plan: &BatchPlanRef,
    is_candidate: fn(&BatchPlanRef) -> bool,
) -> bool {
    if plan.node_type() == BatchPlanNodeType::BatchExchange {
        return false;
    }
    is_candidate(plan)
        || plan
            .inputs()
            .iter()
            .any(|input| exist_and_no_exchange_before(input, is_candidate))
}

impl BatchPlanRef {
    fn is_user_table_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSeqScan
            || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
            || self.node_type() == BatchPlanNodeType::BatchVectorSearch
    }

    fn is_source_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSource
            || self.node_type() == BatchPlanNodeType::BatchKafkaScan
            || self.node_type() == BatchPlanNodeType::BatchIcebergScan
    }

    fn is_insert(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchInsert
    }

    fn is_update(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchUpdate
    }

    fn is_delete(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchDelete
    }
}

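/// Checks whether the single-distribution root fragment still contains a table/source
/// scan or DML node with no `BatchExchange` above it; in that case the caller inserts
/// an additional root exchange for distributed execution.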
fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan()
            || plan.is_source_scan()
            || plan.is_insert()
            || plan.is_update()
            || plan.is_delete()
    })
}

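/// Same check as above for local execution mode, where only scans and inserts require
/// an additional root exchange.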
fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}