use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

use risingwave_pb::catalog::PbVectorIndexInfo;

pub mod plan_node;

use plan_node::StreamFilter;
pub use plan_node::{Explain, LogicalPlanRef, PlanRef};

pub mod property;

mod delta_join_solver;
mod heuristic_optimizer;
mod plan_rewriter;

mod plan_visitor;

pub use plan_visitor::{
    ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
};
use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;

pub mod backfill_order_strategy;
mod logical_optimization;
mod optimizer_context;
pub mod plan_expr_rewriter;
mod plan_expr_visitor;
mod rule;

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;

use educe::Educe;
use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
    ToStreamContext, stream_enforce_eowc_requirement,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::TableCatalog;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::TimestamptzExprFinder;
use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
use crate::optimizer::plan_node::{
    Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
    StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion, StreamVectorIndexWrite,
    ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::{
    LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};

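/// The root of a query plan tree, together with everything needed to interpret its output:
/// the required distribution and order, plus `out_fields` / `out_names`, which prune and
/// rename the plan's schema into the user-visible result.
///
/// `PlanRoot` is generic over [`PlanPhase`], so the type system tracks which stage of
/// optimization (logical, batch, stream, ...) the wrapped plan is currently in.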
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    pub plan: PlanRef<P::Convention>,
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    required_dist: RequiredDist,
    required_order: Order,
    out_fields: FixedBitSet,
    out_names: Vec<String>,
}

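/// A marker for the phase a [`PlanRoot`] is in; each phase is tied to the plan convention
/// (`Logical`, `Batch` or `Stream`) its plan nodes must use.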
pub trait PlanPhase {
    type Convention: ConventionMarker;
}

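/// For every `{ phase, convention }` pair listed below, generates a `PlanPhase*` marker type
/// implementing [`PlanPhase`] and a `*PlanRoot` type alias. As a rough sketch, the `Logical`
/// entry expands to:
///
/// ```ignore
/// pub struct PlanPhaseLogical;
/// impl PlanPhase for PlanPhaseLogical {
///     type Convention = Logical;
/// }
/// pub type LogicalPlanRoot = PlanRoot<PlanPhaseLogical>;
/// ```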
macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();

impl LogicalPlanRoot {
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl BatchPlanRoot {
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    fn new_inner(
        plan: PlanRef<P::Convention>,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        let input_schema = plan.schema();
        assert_eq!(input_schema.fields().len(), out_fields.len());
        assert_eq!(out_fields.count_ones(..), out_names.len());

        Self {
            plan,
            _phase: PhantomData,
            required_dist,
            required_order,
            out_fields,
            out_names,
        }
    }

    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
        PlanRoot {
            plan,
            _phase: PhantomData,
            required_dist: self.required_dist,
            required_order: self.required_order,
            out_fields: self.out_fields,
            out_names: self.out_names,
        }
    }

    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
        if out_names.len() != self.out_fields.count_ones(..) {
            Err(ErrorCode::InvalidInputSyntax(
                "number of column names does not match number of columns".to_owned(),
            ))?
        }
        self.out_names = out_names;
        Ok(())
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self
                .out_fields
                .ones()
                .map(|i| self.plan.schema().fields()[i].clone())
                .zip_eq_debug(&self.out_names)
                .map(|(field, name)| Field {
                    name: name.clone(),
                    ..field
                })
                .collect(),
        }
    }
}

impl LogicalPlanRoot {
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

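    /// Wraps the (single-column) subplan in an `array_agg` aggregation so the whole subquery
    /// collapses into one array value. The aggregate respects `required_order`, and the result
    /// is coalesced with an empty list so that zero input rows yield `[]` rather than `NULL`.
    /// Errors if the subplan exposes more than one output column.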
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::list(input_column_type.clone());
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

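    /// Convenience wrapper: runs batch-oriented logical optimization and then lowers the
    /// result to a batch physical plan in one call.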
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}

impl BatchOptimizedLogicalPlanRoot {
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }
}

impl BatchPlanRoot {
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Predicate Pushdown",
            vec![BatchIcebergPredicatePushDownRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

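    /// Lowers the batch physical plan for local execution mode, inserting a root
    /// `BatchExchange` when the plan is not yet single-distribution or when a scan, source, or
    /// insert node would otherwise sit directly under the root without an exchange.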
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        plan = plan.to_local_with_order_required(&self.required_order)?;

        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;
        Ok(plan)
    }
}

impl LogicalPlanRoot {
    fn gen_optimized_stream_plan(
        self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
    }

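    /// Generates the stream plan for the given scan type and then applies the physical-level
    /// stream optimizations (project merging, optional join separation / logstore / delta-join
    /// rewrites, session-timezone inlining and constant folding), rejecting plans that still
    /// contain dangling temporal scans or `_rw_timestamp` references.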
    fn gen_optimized_stream_plan_inner(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;

        let mut plan = optimized_plan
            .plan
            .clone()
            .optimize_by_rules(&OptimizationStage::new(
                "Merge StreamProject",
                vec![StreamProjectMergeRule::create()],
                ApplyOrder::BottomUp,
            ))?;

        if ctx
            .session_ctx()
            .config()
            .streaming_separate_consecutive_join()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
                vec![SeparateConsecutiveJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join()
            && ctx.session_ctx().config().enable_index_selection()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        if ctx.session_ctx().config().enable_locality_backfill()
            && LocalityProviderCounter::count(plan.clone()) > 5
        {
            risingwave_common::license::Feature::LocalityBackfill.check_available()?
        }

        Ok(optimized_plan.into_phase(plan))
    }

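    /// Lowers the logical plan to a stream plan: checks the stream-key restrictions, runs the
    /// logical optimization for streaming, rewrites the plan (and the required distribution,
    /// order and output fields) for streaming semantics, and finally converts it with the
    /// requested distribution and the EMIT ON WINDOW CLOSE requirement.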
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        stream_scan_type: StreamScanType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_stream_scan_type(
                        emit_on_window_close,
                        stream_scan_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }

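    /// Estimates the cardinality bound of the current plan; the result is attached to the
    /// materialized views and indexes created from this plan root.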
    fn compute_cardinality(&self) -> Cardinality {
        CardinalityVisitor.visit(self.plan.clone())
    }

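    /// Builds the streaming plan for `CREATE TABLE`. Roughly (the bracketed branches are
    /// optional, depending on whether the table has a connector source, watermarks, a row-id
    /// primary key, or NOT NULL columns), the generated fragment looks like:
    ///
    /// ```text
    ///   [Source -> Project -> Exchange]   // only when the table has a connector source
    ///   Dml -> Project -> Exchange        // the DML (insert/update/delete) branch
    ///   UpstreamSinkUnion                 // sinks created with `CREATE SINK ... INTO`
    ///            \       |       /
    ///                  Union
    ///                    |
    ///   [WatermarkFilter] -> [RowIdGen] -> [NOT NULL Filter] -> Materialize
    /// ```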
    pub fn gen_table_plan(
        self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_columns,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap())
                .collect_vec()
        };

        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: StreamPlanRef,
        ) -> Result<StreamPlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: StreamPlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<StreamPlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .streaming_enforce_if_not_satisfies(dml_node)?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_indices = if !with_version_columns.is_empty() {
            find_version_column_indices(&columns, with_version_columns)?
        } else {
            vec![]
        };

        let with_external_source = source_catalog.is_some();
        let (dml_source_node, external_source_node) = if with_external_source {
            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
            let mut external_source_node = stream_plan.plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .streaming_enforce_if_not_satisfies(external_source_node)?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };
            (dummy_source_node, Some(external_source_node))
        } else {
            (stream_plan.plan, None)
        };

        let dml_node = inject_dml_node(
            &columns,
            append_only,
            dml_source_node,
            &pk_column_indices,
            kind,
            column_descs,
        )?;

        let dists = external_source_node
            .iter()
            .map(|input| input.distribution())
            .chain([dml_node.distribution()])
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                unreachable!()
            }
        };

        let generated_column_exprs =
            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
        let upstream_sink_union = StreamUpstreamSinkUnion::new(
            context.clone(),
            dml_node.schema(),
            dml_node.stream_key(),
            dist.clone(),
            append_only,
            row_id_index.is_none(),
            generated_column_exprs,
        );

        let union_inputs = external_source_node
            .into_iter()
            .chain([dml_node, upstream_sink_union.into()])
            .collect_vec();

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist,
        )
        .into();

        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && !version_column_indices.is_empty()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        let refreshable = source_catalog
            .as_ref()
            .map(|catalog| {
                catalog.with_properties.is_batch_connector() || {
                    matches!(
                        catalog
                            .refresh_mode
                            .as_ref()
                            .map(|refresh_mode| refresh_mode.refresh_mode),
                        Some(Some(RefreshMode::FullRecompute(_)))
                    )
                }
            })
            .unwrap_or(false);

        if refreshable && row_id_index.is_some() {
            return Err(crate::error::ErrorCode::BindError(
                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
                    .to_owned(),
            )
            .into());
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            pk_column_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
            refreshable,
        )
    }

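    /// Generates the streaming plan for `CREATE MATERIALIZED VIEW`, allowing snapshot backfill
    /// and honoring the `EMIT ON WINDOW CLOSE` flag.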
    pub fn gen_materialize_plan(
        self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }

    pub fn gen_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }

    pub fn gen_vector_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
        vector_index_info: PbVectorIndexInfo,
    ) -> Result<StreamVectorIndexWrite> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamVectorIndexWrite::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            cardinality,
            retention_seconds,
            vector_index_info,
        )
    }

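    /// Generates the streaming plan for `CREATE SINK`. The backfill strategy is picked from the
    /// arguments and session config (upstream-only, snapshot, arrangement or plain backfill),
    /// and for sinks into a table the plan's output columns are mapped onto the target table's
    /// columns before the `StreamSink` node is created.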
    #[allow(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<StreamSink> {
        let stream_scan_type = if without_backfill {
            StreamScanType::UpstreamOnly
        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
            StreamScanType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            StreamScanType::ArrangementBackfill
        } else {
            StreamScanType::Backfill
        };
        if auto_refresh_schema_from_table.is_some()
            && stream_scan_type != StreamScanType::ArrangementBackfill
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "auto schema change only support for ArrangementBackfill, but got: {:?}",
                stream_scan_type
            ))
            .into());
        }
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });

        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            definition,
            properties,
            format_desc,
            partition_info,
            auto_refresh_schema_from_table,
        )
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    pub fn should_use_arrangement_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let arrangement_backfill_enabled = session_ctx
            .env()
            .streaming_config()
            .developer
            .enable_arrangement_backfill;
        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
    }

    pub fn should_use_snapshot_backfill(&self) -> bool {
        self.plan
            .ctx()
            .session_ctx()
            .config()
            .streaming_use_snapshot_backfill()
    }

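    /// For a sink into a table, maps each DML-able target column to the index of the plan's
    /// visible output column that feeds it: by name when the user listed columns explicitly,
    /// otherwise by position. Columns without a match map to `None`.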
    pub fn target_columns_to_plan_mapping(
        &self,
        tar_cols: &[ColumnCatalog],
        user_specified_columns: bool,
    ) -> Vec<Option<usize>> {
        #[allow(clippy::disallowed_methods)]
        let visible_cols: Vec<(usize, String)> = self
            .out_fields
            .ones()
            .zip_eq(self.out_names.iter().cloned())
            .collect_vec();

        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
        let visible_col_idxes_by_name = visible_cols
            .iter()
            .map(|(i, name)| (name.as_ref(), *i))
            .collect::<BTreeMap<_, _>>();

        tar_cols
            .iter()
            .enumerate()
            .filter(|(_, tar_col)| tar_col.can_dml())
            .map(|(tar_i, tar_col)| {
                if user_specified_columns {
                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
                } else {
                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
                }
            })
            .collect()
    }
}

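/// Resolves the `WITH VERSION COLUMN` names to column indices, rejecting names that are not
/// found and columns whose data type is not comparable (jsonb, list, struct, bytea, boolean).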
fn find_version_column_indices(
    column_catalog: &Vec<ColumnCatalog>,
    version_column_names: Vec<String>,
) -> Result<Vec<usize>> {
    let mut indices = Vec::new();
    for version_column_name in version_column_names {
        let mut found = false;
        for (index, column) in column_catalog.iter().enumerate() {
            if column.column_desc.name == version_column_name {
                if let &DataType::Jsonb
                | &DataType::List(_)
                | &DataType::Struct(_)
                | &DataType::Bytea
                | &DataType::Boolean = column.data_type()
                {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "Version column {} must be of a comparable data type",
                        version_column_name
                    ))
                    .into());
                }
                indices.push(index);
                found = true;
                break;
            }
        }
        if !found {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Version column {} not found",
                version_column_name
            ))
            .into());
        }
    }
    Ok(indices)
}

fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
    let mut const_eval_rewriter = ConstEvalRewriter { error: None };

    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
    if let Some(error) = const_eval_rewriter.error {
        return Err(error);
    }
    Ok(plan)
}

fn inline_session_timezone_in_exprs<C: ConventionMarker>(
    ctx: OptimizerContextRef,
    plan: PlanRef<C>,
) -> Result<PlanRef<C>> {
    let mut v = TimestamptzExprFinder::default();
    plan.visit_exprs_recursive(&mut v);
    if v.has() {
        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
    } else {
        Ok(plan)
    }
}

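/// Returns whether a node matching `is_candidate` is reachable from `plan` without crossing a
/// `BatchExchange`, i.e. the candidate would otherwise run in the same stage as the root.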
fn exist_and_no_exchange_before(
    plan: &BatchPlanRef,
    is_candidate: fn(&BatchPlanRef) -> bool,
) -> bool {
    if plan.node_type() == BatchPlanNodeType::BatchExchange {
        return false;
    }
    is_candidate(plan)
        || plan
            .inputs()
            .iter()
            .any(|input| exist_and_no_exchange_before(input, is_candidate))
}

impl BatchPlanRef {
    fn is_user_table_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSeqScan
            || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
            || self.node_type() == BatchPlanNodeType::BatchVectorSearch
    }

    fn is_source_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSource
            || self.node_type() == BatchPlanNodeType::BatchKafkaScan
            || self.node_type() == BatchPlanNodeType::BatchIcebergScan
    }

    fn is_insert(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchInsert
    }

    fn is_update(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchUpdate
    }

    fn is_delete(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchDelete
    }
}

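/// Decides whether an extra root `BatchExchange` must be added in distributed mode: it is
/// required when a table/source scan or a DML node (insert/update/delete) would otherwise sit
/// in the root stage without any exchange below the root.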
fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan()
            || plan.is_source_scan()
            || plan.is_insert()
            || plan.is_update()
            || plan.is_delete()
    })
}

fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}