1use std::num::NonZeroU32;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use risingwave_pb::catalog::PbVectorIndexInfo;
20
21pub mod plan_node;
22
23use plan_node::StreamFilter;
24pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
25
26pub mod property;
27
28mod delta_join_solver;
29mod heuristic_optimizer;
30mod plan_rewriter;
31
32mod plan_visitor;
33
34#[cfg(feature = "datafusion")]
35pub use plan_visitor::LogicalPlanToDataFusionExt;
36pub use plan_visitor::{
37 ExecutionModeDecider, LogicalIcebergScanExt, PlanVisitor, RelationCollectorVisitor,
38 SysTableVisitor,
39};
40use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;
41
42pub mod backfill_order_strategy;
43mod logical_optimization;
44mod optimizer_context;
45pub mod plan_expr_rewriter;
46mod plan_expr_visitor;
47mod rule;
48
49use std::collections::{BTreeMap, HashMap};
50use std::marker::PhantomData;
51
52use educe::Educe;
53use fixedbitset::FixedBitSet;
54use itertools::Itertools as _;
55pub use logical_optimization::*;
56pub use optimizer_context::*;
57use plan_expr_rewriter::ConstEvalRewriter;
58use property::Order;
59use risingwave_common::bail;
60use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
61use risingwave_common::types::DataType;
62use risingwave_common::util::column_index_mapping::ColIndexMapping;
63use risingwave_common::util::iter_util::ZipEqDebug;
64use risingwave_connector::WithPropertiesExt;
65use risingwave_connector::sink::catalog::SinkFormatDesc;
66use risingwave_pb::stream_plan::StreamScanType;
67
68use self::heuristic_optimizer::ApplyOrder;
69use self::plan_node::generic::{self, PhysicalPlanRef};
70use self::plan_node::{
71 BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
72 StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
73 ToStreamContext, stream_enforce_eowc_requirement,
74};
75#[cfg(debug_assertions)]
76use self::plan_visitor::InputRefValidator;
77use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
78use self::property::{Cardinality, RequiredDist};
79use self::rule::*;
80use crate::TableCatalog;
81use crate::catalog::table_catalog::TableType;
82use crate::catalog::{DatabaseId, SchemaId};
83use crate::error::{ErrorCode, Result};
84use crate::expr::TimestamptzExprFinder;
85use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
86use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
87use crate::optimizer::plan_node::{
88 Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker, PlanTreeNode, Stream,
89 StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion, StreamVectorIndexWrite,
90 ToStream, VisitExprsRecursive,
91};
92use crate::optimizer::plan_visitor::{
93 LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
94};
95use crate::optimizer::property::Distribution;
96use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
97
/// The root of a plan tree together with the output requirements imposed on it.
///
/// `P` is a phase marker (see [`PlanPhase`]); it fixes the convention of `plan`
/// (`PlanRef<P::Convention>`), so a root only moves between phases through the
/// phase-transition methods defined on the concrete `*PlanRoot` aliases.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    /// The plan tree itself.
    pub plan: PlanRef<P::Convention>,
    // Omitted from `Debug`. It is cloned through `PhantomData::clone`, presumably so that
    // `Clone` does not need `P: Clone` (the generated phase markers derive nothing).
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    /// Distribution the root output must satisfy.
    required_dist: RequiredDist,
    /// Ordering the root output must satisfy.
    required_order: Order,
    /// One bit per column of `plan`'s schema; set bits mark the columns exposed as output.
    out_fields: FixedBitSet,
    /// Output column names, one per set bit of `out_fields`, in ascending bit order.
    out_names: Vec<String>,
}
121
/// Marker trait for the phases a [`PlanRoot`] passes through during optimization.
///
/// The implementors are the unit structs generated by `for_all_phase!`.
pub trait PlanPhase {
    /// The plan-node convention (logical, batch or stream) of plans in this phase.
    type Convention: ConventionMarker;
}
130
/// For each `{ Phase, Convention }` pair, generates:
/// - a unit marker struct `PlanPhase<Phase>` implementing [`PlanPhase`] with that convention;
/// - a type alias `<Phase>PlanRoot` for `PlanRoot<PlanPhase<Phase>>`.
///
/// Invoked with no arguments, it expands to the canonical list of phases below.
macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            // Logical plan after the batch-oriented logical optimizer has run.
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            // Despite the name, a root in this phase already carries a stream-convention plan.
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

// Instantiate the phase markers and the `*PlanRoot` aliases used throughout this module.
for_all_phase!();
155
156impl LogicalPlanRoot {
157 pub fn new_with_logical_plan(
158 plan: LogicalPlanRef,
159 required_dist: RequiredDist,
160 required_order: Order,
161 out_fields: FixedBitSet,
162 out_names: Vec<String>,
163 ) -> Self {
164 Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
165 }
166}
167
168impl BatchPlanRoot {
169 pub fn new_with_batch_plan(
170 plan: BatchPlanRef,
171 required_dist: RequiredDist,
172 required_order: Order,
173 out_fields: FixedBitSet,
174 out_names: Vec<String>,
175 ) -> Self {
176 Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
177 }
178}
179
180impl<P: PlanPhase> PlanRoot<P> {
181 fn new_inner(
182 plan: PlanRef<P::Convention>,
183 required_dist: RequiredDist,
184 required_order: Order,
185 out_fields: FixedBitSet,
186 out_names: Vec<String>,
187 ) -> Self {
188 let input_schema = plan.schema();
189 assert_eq!(input_schema.fields().len(), out_fields.len());
190 assert_eq!(out_fields.count_ones(..), out_names.len());
191
192 Self {
193 plan,
194 _phase: PhantomData,
195 required_dist,
196 required_order,
197 out_fields,
198 out_names,
199 }
200 }
201
202 fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
203 PlanRoot {
204 plan,
205 _phase: PhantomData,
206 required_dist: self.required_dist,
207 required_order: self.required_order,
208 out_fields: self.out_fields,
209 out_names: self.out_names,
210 }
211 }
212
213 pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
218 if out_names.len() != self.out_fields.count_ones(..) {
219 Err(ErrorCode::InvalidInputSyntax(
220 "number of column names does not match number of columns".to_owned(),
221 ))?
222 }
223 self.out_names = out_names;
224 Ok(())
225 }
226
227 pub fn schema(&self) -> Schema {
229 Schema {
232 fields: self
233 .out_fields
234 .ones()
235 .map(|i| self.plan.schema().fields()[i].clone())
236 .zip_eq_debug(&self.out_names)
237 .map(|(field, name)| Field {
238 name: name.clone(),
239 ..field
240 })
241 .collect(),
242 }
243 }
244}
245
impl LogicalPlanRoot {
    /// Converts this root into a plain logical subplan containing only the visible columns.
    ///
    /// The required order and distribution are discarded, so the result is unordered, and
    /// `out_names` are not applied. When every column is visible the plan is returned as-is;
    /// otherwise a `LogicalProject` over `out_fields` is added on top.
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

    /// Aggregates the single visible column of this root into one list value.
    ///
    /// Builds an ungrouped `array_agg` over the column, ordered by the root's required order,
    /// and wraps its result in `coalesce(<agg>, <empty list>)`. A NULL aggregate result
    /// (presumably what an empty input produces) therefore becomes an empty list.
    ///
    /// # Errors
    /// Returns an error if the root does not expose exactly one column.
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::list(input_column_type.clone());
        // No group key (`IndexSet::empty()`), so the aggregate produces a single row.
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                // NOTE(review): assumed infallible because both arguments are built here with
                // matching list types; confirm against `FunctionCall::new`'s type inference.
                .unwrap()
                .into(),
            ],
        ))
    }

    /// Runs the stream-oriented logical optimizer on the plan. The root stays in the logical
    /// phase.
    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

    /// Runs the batch-oriented logical optimizer and moves the root into the
    /// batch-optimized logical phase.
    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

    /// Shorthand for batch logical optimization followed by batch physical plan generation.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}
323
impl BatchOptimizedLogicalPlanRoot {
    /// Converts the batch-optimized logical plan into a single-distribution batch physical plan.
    ///
    /// Steps, in order:
    /// 1. Inline the session timezone and constant-fold expressions on the logical plan.
    /// 2. Convert to batch with the required order.
    /// 3. Merge adjacent `BatchProject`s.
    /// 4. Inline the session timezone once more on the physical plan.
    ///
    /// # Errors
    /// Returns `NotSupported` if `TemporalJoinValidator` reports a dangling temporal scan.
    /// Errors from the conversion and rewrite steps are propagated.
    ///
    /// # Panics
    /// Panics if the resulting plan is not `Distribution::Single` or contains a `BatchExchange`.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // NOTE(review): the timezone is inlined a second time here, presumably because the
        // batch conversion or project merge can introduce new timestamptz expressions; confirm.
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        // The phase contract: this plan is single-distributed and has no exchanges yet;
        // exchanges are added later by the distributed/local plan generation.
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }
}
391
impl BatchPlanRoot {
    /// Produces the plan for distributed batch execution.
    ///
    /// The root is forced to a single distribution and the plan is converted to distributed
    /// form. A root `BatchExchange` to `Single` is added when a table scan, source scan,
    /// insert, update or delete is reachable from the root without crossing an exchange.
    /// Hidden columns are then projected away. Finally, limit-to-scan pushdown, the iceberg
    /// count-star rule and iceberg predicate pushdown are applied.
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        // Project away columns that are not part of the user-visible output.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        // These rule stages run after the exchange and projection above are in place.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        // Unlike the local plan, the distributed plan also pushes predicates into iceberg scans.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Predicate Pushdown",
            vec![BatchIcebergPredicatePushDownRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

    /// Produces the plan for local batch execution.
    ///
    /// The plan is converted to local form. A root `BatchExchange` to `Single` is added when
    /// the plan is not single-distributed, or when a table scan, source scan or insert is
    /// reachable from the root without crossing an exchange. Hidden columns are then projected
    /// away, and limit-to-scan pushdown and the iceberg count-star rule are applied.
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        plan = plan.to_local_with_order_required(&self.required_order)?;

        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        // Project away columns that are not part of the user-visible output.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;
        Ok(plan)
    }
}
486
487impl LogicalPlanRoot {
488 fn gen_optimized_stream_plan(
490 self,
491 emit_on_window_close: bool,
492 allow_snapshot_backfill: bool,
493 ) -> Result<StreamOptimizedLogicalPlanRoot> {
494 let stream_scan_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
495 StreamScanType::SnapshotBackfill
496 } else if self.should_use_arrangement_backfill() {
497 StreamScanType::ArrangementBackfill
498 } else {
499 StreamScanType::Backfill
500 };
501 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)
502 }
503
504 fn gen_optimized_stream_plan_inner(
505 self,
506 emit_on_window_close: bool,
507 stream_scan_type: StreamScanType,
508 ) -> Result<StreamOptimizedLogicalPlanRoot> {
509 let ctx = self.plan.ctx();
510 let _explain_trace = ctx.is_explain_trace();
511
512 let optimized_plan = self.gen_stream_plan(emit_on_window_close, stream_scan_type)?;
513
514 let mut plan = optimized_plan
515 .plan
516 .clone()
517 .optimize_by_rules(&OptimizationStage::new(
518 "Merge StreamProject",
519 vec![StreamProjectMergeRule::create()],
520 ApplyOrder::BottomUp,
521 ))?;
522
523 if ctx
524 .session_ctx()
525 .config()
526 .streaming_separate_consecutive_join()
527 {
528 plan = plan.optimize_by_rules(&OptimizationStage::new(
529 "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
530 vec![SeparateConsecutiveJoinRule::create()],
531 ApplyOrder::BottomUp,
532 ))?;
533 }
534
535 if ctx.session_ctx().config().streaming_enable_unaligned_join() {
539 plan = plan.optimize_by_rules(&OptimizationStage::new(
540 "Add Logstore for Unaligned join",
541 vec![AddLogstoreRule::create()],
542 ApplyOrder::BottomUp,
543 ))?;
544 }
545
546 if ctx.session_ctx().config().streaming_enable_delta_join()
547 && ctx.session_ctx().config().enable_index_selection()
548 {
549 plan = plan.optimize_by_rules(&OptimizationStage::new(
552 "To IndexDeltaJoin",
553 vec![IndexDeltaJoinRule::create()],
554 ApplyOrder::BottomUp,
555 ))?;
556 }
557 plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
559
560 if ctx.is_explain_trace() {
561 ctx.trace("Inline session timezone:");
562 ctx.trace(plan.explain_to_string());
563 }
564
565 plan = const_eval_exprs(plan)?;
567
568 if ctx.is_explain_trace() {
569 ctx.trace("Const eval exprs:");
570 ctx.trace(plan.explain_to_string());
571 }
572
573 #[cfg(debug_assertions)]
574 InputRefValidator.validate(plan.clone());
575
576 if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
577 return Err(ErrorCode::NotSupported(
578 "exist dangling temporal scan".to_owned(),
579 "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
580 ).into());
581 }
582
583 if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
584 return Err(ErrorCode::NotSupported(
585 "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
586 "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
587 ).into());
588 }
589
590 if ctx.session_ctx().config().enable_locality_backfill()
591 && LocalityProviderCounter::count(plan.clone()) > 5
592 {
593 risingwave_common::license::Feature::LocalityBackfill.check_available()?
594 }
595
596 Ok(optimized_plan.into_phase(plan))
597 }
598
599 fn gen_stream_plan(
601 self,
602 emit_on_window_close: bool,
603 stream_scan_type: StreamScanType,
604 ) -> Result<StreamOptimizedLogicalPlanRoot> {
605 let ctx = self.plan.ctx();
606 let explain_trace = ctx.is_explain_trace();
607
608 let plan = {
609 {
610 if !ctx
611 .session_ctx()
612 .config()
613 .streaming_allow_jsonb_in_stream_key()
614 && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
615 {
616 return Err(ErrorCode::NotSupported(
617 err,
618 "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
619 If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
620 ).into());
621 }
622 let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
623 let (plan, out_col_change) = {
624 let (plan, out_col_change) = optimized_plan
625 .plan
626 .logical_rewrite_for_stream(&mut Default::default())?;
627 if out_col_change.is_injective() {
628 (plan, out_col_change)
629 } else {
630 let mut output_indices = (0..plan.schema().len()).collect_vec();
631 #[allow(unused_assignments)]
632 let (mut map, mut target_size) = out_col_change.into_parts();
633
634 target_size = plan.schema().len();
637 let mut tar_exists = vec![false; target_size];
638 for i in map.iter_mut().flatten() {
639 if tar_exists[*i] {
640 output_indices.push(*i);
641 *i = target_size;
642 target_size += 1;
643 } else {
644 tar_exists[*i] = true;
645 }
646 }
647 let plan =
648 LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
649 let out_col_change = ColIndexMapping::new(map, target_size);
650 (plan.into(), out_col_change)
651 }
652 };
653
654 if explain_trace {
655 ctx.trace("Logical Rewrite For Stream:");
656 ctx.trace(plan.explain_to_string());
657 }
658
659 optimized_plan.required_dist =
660 out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
661 optimized_plan.required_order = out_col_change
662 .rewrite_required_order(&optimized_plan.required_order)
663 .unwrap();
664 optimized_plan.out_fields =
665 out_col_change.rewrite_bitset(&optimized_plan.out_fields);
666 let mut plan = plan.to_stream_with_dist_required(
667 &optimized_plan.required_dist,
668 &mut ToStreamContext::new_with_stream_scan_type(
669 emit_on_window_close,
670 stream_scan_type,
671 ),
672 )?;
673 plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
674 optimized_plan.into_phase(plan)
675 }
676 };
677
678 if explain_trace {
679 ctx.trace("To Stream Plan:");
680 ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
682 }
683 Ok(plan)
684 }
685
686 fn compute_cardinality(&self) -> Cardinality {
690 CardinalityVisitor.visit(self.plan.clone())
691 }
692
693 pub fn gen_table_plan(
695 self,
696 context: OptimizerContextRef,
697 table_name: String,
698 database_id: DatabaseId,
699 schema_id: SchemaId,
700 CreateTableInfo {
701 columns,
702 pk_column_ids,
703 row_id_index,
704 watermark_descs,
705 source_catalog,
706 version,
707 }: CreateTableInfo,
708 CreateTableProps {
709 definition,
710 append_only,
711 on_conflict,
712 with_version_columns,
713 webhook_info,
714 engine,
715 }: CreateTableProps,
716 ) -> Result<StreamMaterialize> {
717 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
719
720 assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
721
722 let pk_column_indices = {
723 let mut id_to_idx = HashMap::new();
724
725 columns.iter().enumerate().for_each(|(idx, c)| {
726 id_to_idx.insert(c.column_id(), idx);
727 });
728 pk_column_ids
729 .iter()
730 .map(|c| id_to_idx.get(c).copied().unwrap()) .collect_vec()
732 };
733
734 fn inject_project_for_generated_column_if_needed(
735 columns: &[ColumnCatalog],
736 node: StreamPlanRef,
737 ) -> Result<StreamPlanRef> {
738 let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
739 if let Some(exprs) = exprs {
740 let logical_project = generic::Project::new(exprs, node);
741 return Ok(StreamProject::new(logical_project).into());
742 }
743 Ok(node)
744 }
745
746 #[derive(PartialEq, Debug, Copy, Clone)]
747 enum PrimaryKeyKind {
748 UserDefinedPrimaryKey,
749 NonAppendOnlyRowIdPk,
750 AppendOnlyRowIdPk,
751 }
752
753 fn inject_dml_node(
754 columns: &[ColumnCatalog],
755 append_only: bool,
756 stream_plan: StreamPlanRef,
757 pk_column_indices: &[usize],
758 kind: PrimaryKeyKind,
759 column_descs: Vec<ColumnDesc>,
760 ) -> Result<StreamPlanRef> {
761 let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
762
763 dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
765
766 dml_node = match kind {
767 PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
768 RequiredDist::hash_shard(pk_column_indices)
769 .streaming_enforce_if_not_satisfies(dml_node)?
770 }
771 PrimaryKeyKind::AppendOnlyRowIdPk => {
772 StreamExchange::new_no_shuffle(dml_node).into()
773 }
774 };
775
776 Ok(dml_node)
777 }
778
779 let kind = if let Some(row_id_index) = row_id_index {
780 assert_eq!(
781 pk_column_indices.iter().exactly_one().copied().unwrap(),
782 row_id_index
783 );
784 if append_only {
785 PrimaryKeyKind::AppendOnlyRowIdPk
786 } else {
787 PrimaryKeyKind::NonAppendOnlyRowIdPk
788 }
789 } else {
790 PrimaryKeyKind::UserDefinedPrimaryKey
791 };
792
793 let column_descs: Vec<ColumnDesc> = columns
794 .iter()
795 .filter(|&c| c.can_dml())
796 .map(|c| c.column_desc.clone())
797 .collect();
798
799 let mut not_null_idxs = vec![];
800 for (idx, column) in column_descs.iter().enumerate() {
801 if !column.nullable {
802 not_null_idxs.push(idx);
803 }
804 }
805
806 let version_column_indices = if !with_version_columns.is_empty() {
807 find_version_column_indices(&columns, with_version_columns)?
808 } else {
809 vec![]
810 };
811
812 let with_external_source = source_catalog.is_some();
813 let (dml_source_node, external_source_node) = if with_external_source {
814 let dummy_source_node = LogicalSource::new(
815 None,
816 columns.clone(),
817 row_id_index,
818 SourceNodeKind::CreateTable,
819 context.clone(),
820 None,
821 )
822 .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
823 let mut external_source_node = stream_plan.plan;
824 external_source_node =
825 inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
826 external_source_node = match kind {
827 PrimaryKeyKind::UserDefinedPrimaryKey => {
828 RequiredDist::hash_shard(&pk_column_indices)
829 .streaming_enforce_if_not_satisfies(external_source_node)?
830 }
831
832 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
833 StreamExchange::new_no_shuffle(external_source_node).into()
834 }
835 };
836 (dummy_source_node, Some(external_source_node))
837 } else {
838 (stream_plan.plan, None)
839 };
840
841 let dml_node = inject_dml_node(
842 &columns,
843 append_only,
844 dml_source_node,
845 &pk_column_indices,
846 kind,
847 column_descs,
848 )?;
849
850 let dists = external_source_node
851 .iter()
852 .map(|input| input.distribution())
853 .chain([dml_node.distribution()])
854 .unique()
855 .collect_vec();
856
857 let dist = match &dists[..] {
858 &[Distribution::SomeShard, Distribution::HashShard(_)]
859 | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
860 &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
861 dist.clone()
862 }
863 _ => {
864 unreachable!()
865 }
866 };
867
868 let generated_column_exprs =
869 LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
870 let upstream_sink_union = StreamUpstreamSinkUnion::new(
871 context.clone(),
872 dml_node.schema(),
873 dml_node.stream_key(),
874 dist.clone(), append_only,
876 row_id_index.is_none(),
877 generated_column_exprs,
878 );
879
880 let union_inputs = external_source_node
881 .into_iter()
882 .chain([dml_node, upstream_sink_union.into()])
883 .collect_vec();
884
885 let mut stream_plan = StreamUnion::new_with_dist(
886 Union {
887 all: true,
888 inputs: union_inputs,
889 source_col: None,
890 },
891 dist,
892 )
893 .into();
894
895 if !watermark_descs.is_empty() {
897 stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
898 }
899
900 if let Some(row_id_index) = row_id_index {
902 match kind {
903 PrimaryKeyKind::UserDefinedPrimaryKey => {
904 unreachable!()
905 }
906 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
907 stream_plan = StreamRowIdGen::new_with_dist(
908 stream_plan,
909 row_id_index,
910 Distribution::HashShard(vec![row_id_index]),
911 )
912 .into();
913 }
914 }
915 }
916
917 let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
918
919 if let ConflictBehavior::IgnoreConflict = conflict_behavior
920 && !version_column_indices.is_empty()
921 {
922 Err(ErrorCode::InvalidParameterValue(
923 "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
924 ))?
925 }
926
927 let retention_seconds = context.with_options().retention_seconds();
928
929 let table_required_dist = {
930 let mut bitset = FixedBitSet::with_capacity(columns.len());
931 for idx in &pk_column_indices {
932 bitset.insert(*idx);
933 }
934 RequiredDist::ShardByKey(bitset)
935 };
936
937 let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
938
939 if !not_null_idxs.is_empty() {
940 stream_plan =
941 StreamFilter::filter_out_any_null_rows(stream_plan.clone(), ¬_null_idxs);
942 }
943
944 let refreshable = source_catalog
946 .as_ref()
947 .map(|catalog| {
948 catalog.with_properties.is_batch_connector() || {
949 matches!(
950 catalog
951 .refresh_mode
952 .as_ref()
953 .map(|refresh_mode| refresh_mode.refresh_mode),
954 Some(Some(RefreshMode::FullReload(_)))
955 )
956 }
957 })
958 .unwrap_or(false);
959
960 if refreshable && row_id_index.is_some() {
962 return Err(crate::error::ErrorCode::BindError(
963 "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
964 .to_owned(),
965 )
966 .into());
967 }
968
969 StreamMaterialize::create_for_table(
970 stream_plan,
971 table_name,
972 database_id,
973 schema_id,
974 table_required_dist,
975 Order::any(),
976 columns,
977 definition,
978 conflict_behavior,
979 version_column_indices,
980 pk_column_indices,
981 row_id_index,
982 version,
983 retention_seconds,
984 webhook_info,
985 engine,
986 refreshable,
987 )
988 }
989
990 pub fn gen_materialize_plan(
992 self,
993 database_id: DatabaseId,
994 schema_id: SchemaId,
995 mv_name: String,
996 definition: String,
997 emit_on_window_close: bool,
998 ) -> Result<StreamMaterialize> {
999 let cardinality = self.compute_cardinality();
1000 let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
1001 StreamMaterialize::create(
1002 stream_plan,
1003 mv_name,
1004 database_id,
1005 schema_id,
1006 definition,
1007 TableType::MaterializedView,
1008 cardinality,
1009 None,
1010 )
1011 }
1012
1013 pub fn gen_index_plan(
1015 self,
1016 index_name: String,
1017 database_id: DatabaseId,
1018 schema_id: SchemaId,
1019 definition: String,
1020 retention_seconds: Option<NonZeroU32>,
1021 ) -> Result<StreamMaterialize> {
1022 let cardinality = self.compute_cardinality();
1023 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1024
1025 StreamMaterialize::create(
1026 stream_plan,
1027 index_name,
1028 database_id,
1029 schema_id,
1030 definition,
1031 TableType::Index,
1032 cardinality,
1033 retention_seconds,
1034 )
1035 }
1036
1037 pub fn gen_vector_index_plan(
1038 self,
1039 index_name: String,
1040 database_id: DatabaseId,
1041 schema_id: SchemaId,
1042 definition: String,
1043 retention_seconds: Option<NonZeroU32>,
1044 vector_index_info: PbVectorIndexInfo,
1045 ) -> Result<StreamVectorIndexWrite> {
1046 let cardinality = self.compute_cardinality();
1047 let stream_plan = self.gen_optimized_stream_plan(false, false)?;
1048
1049 StreamVectorIndexWrite::create(
1050 stream_plan,
1051 index_name,
1052 database_id,
1053 schema_id,
1054 definition,
1055 cardinality,
1056 retention_seconds,
1057 vector_index_info,
1058 )
1059 }
1060
1061 #[allow(clippy::too_many_arguments)]
1063 pub fn gen_sink_plan(
1064 self,
1065 sink_name: String,
1066 definition: String,
1067 properties: WithOptionsSecResolved,
1068 emit_on_window_close: bool,
1069 db_name: String,
1070 sink_from_table_name: String,
1071 format_desc: Option<SinkFormatDesc>,
1072 without_backfill: bool,
1073 target_table: Option<Arc<TableCatalog>>,
1074 partition_info: Option<PartitionComputeInfo>,
1075 user_specified_columns: bool,
1076 auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
1077 ) -> Result<StreamSink> {
1078 let stream_scan_type = if without_backfill {
1079 StreamScanType::UpstreamOnly
1080 } else if target_table.is_none() && self.should_use_snapshot_backfill() {
1081 StreamScanType::SnapshotBackfill
1083 } else if self.should_use_arrangement_backfill() {
1084 StreamScanType::ArrangementBackfill
1085 } else {
1086 StreamScanType::Backfill
1087 };
1088 if auto_refresh_schema_from_table.is_some()
1089 && stream_scan_type != StreamScanType::ArrangementBackfill
1090 {
1091 return Err(ErrorCode::InvalidInputSyntax(format!(
1092 "auto schema change only support for ArrangementBackfill, but got: {:?}",
1093 stream_scan_type
1094 ))
1095 .into());
1096 }
1097 let stream_plan =
1098 self.gen_optimized_stream_plan_inner(emit_on_window_close, stream_scan_type)?;
1099 let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
1100 let columns = t.columns_without_rw_timestamp();
1101 stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
1102 });
1103
1104 StreamSink::create(
1105 stream_plan,
1106 sink_name,
1107 db_name,
1108 sink_from_table_name,
1109 target_table,
1110 target_columns_to_plan_mapping,
1111 definition,
1112 properties,
1113 format_desc,
1114 partition_info,
1115 auto_refresh_schema_from_table,
1116 )
1117 }
1118}
1119
1120impl<P: PlanPhase> PlanRoot<P> {
1121 pub fn should_use_arrangement_backfill(&self) -> bool {
1122 let ctx = self.plan.ctx();
1123 let session_ctx = ctx.session_ctx();
1124 let arrangement_backfill_enabled = session_ctx
1125 .env()
1126 .streaming_config()
1127 .developer
1128 .enable_arrangement_backfill;
1129 arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1130 }
1131
1132 pub fn should_use_snapshot_backfill(&self) -> bool {
1133 self.plan
1134 .ctx()
1135 .session_ctx()
1136 .config()
1137 .streaming_use_snapshot_backfill()
1138 }
1139
1140 pub fn target_columns_to_plan_mapping(
1142 &self,
1143 tar_cols: &[ColumnCatalog],
1144 user_specified_columns: bool,
1145 ) -> Vec<Option<usize>> {
1146 #[allow(clippy::disallowed_methods)]
1147 let visible_cols: Vec<(usize, String)> = self
1148 .out_fields
1149 .ones()
1150 .zip_eq(self.out_names.iter().cloned())
1151 .collect_vec();
1152
1153 let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1154 let visible_col_idxes_by_name = visible_cols
1155 .iter()
1156 .map(|(i, name)| (name.as_ref(), *i))
1157 .collect::<BTreeMap<_, _>>();
1158
1159 tar_cols
1160 .iter()
1161 .enumerate()
1162 .filter(|(_, tar_col)| tar_col.can_dml())
1163 .map(|(tar_i, tar_col)| {
1164 if user_specified_columns {
1165 visible_col_idxes_by_name.get(tar_col.name()).cloned()
1166 } else {
1167 (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1168 }
1169 })
1170 .collect()
1171 }
1172}
1173
1174fn find_version_column_indices(
1175 column_catalog: &Vec<ColumnCatalog>,
1176 version_column_names: Vec<String>,
1177) -> Result<Vec<usize>> {
1178 let mut indices = Vec::new();
1179 for version_column_name in version_column_names {
1180 let mut found = false;
1181 for (index, column) in column_catalog.iter().enumerate() {
1182 if column.column_desc.name == version_column_name {
1183 if let &DataType::Jsonb
1184 | &DataType::List(_)
1185 | &DataType::Struct(_)
1186 | &DataType::Bytea
1187 | &DataType::Boolean = column.data_type()
1188 {
1189 return Err(ErrorCode::InvalidInputSyntax(format!(
1190 "Version column {} must be of a comparable data type",
1191 version_column_name
1192 ))
1193 .into());
1194 }
1195 indices.push(index);
1196 found = true;
1197 break;
1198 }
1199 }
1200 if !found {
1201 return Err(ErrorCode::InvalidInputSyntax(format!(
1202 "Version column {} not found",
1203 version_column_name
1204 ))
1205 .into());
1206 }
1207 }
1208 Ok(indices)
1209}
1210
1211fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1212 let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1213
1214 let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1215 if let Some(error) = const_eval_rewriter.error {
1216 return Err(error);
1217 }
1218 Ok(plan)
1219}
1220
1221fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1222 ctx: OptimizerContextRef,
1223 plan: PlanRef<C>,
1224) -> Result<PlanRef<C>> {
1225 let mut v = TimestamptzExprFinder::default();
1226 plan.visit_exprs_recursive(&mut v);
1227 if v.has() {
1228 Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1229 } else {
1230 Ok(plan)
1231 }
1232}
1233
1234fn exist_and_no_exchange_before(
1235 plan: &BatchPlanRef,
1236 is_candidate: fn(&BatchPlanRef) -> bool,
1237) -> bool {
1238 if plan.node_type() == BatchPlanNodeType::BatchExchange {
1239 return false;
1240 }
1241 is_candidate(plan)
1242 || plan
1243 .inputs()
1244 .iter()
1245 .any(|input| exist_and_no_exchange_before(input, is_candidate))
1246}
1247
1248impl BatchPlanRef {
1249 fn is_user_table_scan(&self) -> bool {
1250 self.node_type() == BatchPlanNodeType::BatchSeqScan
1251 || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
1252 || self.node_type() == BatchPlanNodeType::BatchVectorSearch
1253 }
1254
1255 fn is_source_scan(&self) -> bool {
1256 self.node_type() == BatchPlanNodeType::BatchSource
1257 || self.node_type() == BatchPlanNodeType::BatchKafkaScan
1258 || self.node_type() == BatchPlanNodeType::BatchIcebergScan
1259 }
1260
1261 fn is_insert(&self) -> bool {
1262 self.node_type() == BatchPlanNodeType::BatchInsert
1263 }
1264
1265 fn is_update(&self) -> bool {
1266 self.node_type() == BatchPlanNodeType::BatchUpdate
1267 }
1268
1269 fn is_delete(&self) -> bool {
1270 self.node_type() == BatchPlanNodeType::BatchDelete
1271 }
1272}
1273
1274fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1280 assert_eq!(plan.distribution(), &Distribution::Single);
1281 exist_and_no_exchange_before(&plan, |plan| {
1282 plan.is_user_table_scan()
1283 || plan.is_source_scan()
1284 || plan.is_insert()
1285 || plan.is_update()
1286 || plan.is_delete()
1287 })
1288}
1289
1290fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1293 assert_eq!(plan.distribution(), &Distribution::Single);
1294 exist_and_no_exchange_before(&plan, |plan| {
1295 plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
1296 })
1297}
1298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// `into_unordered_subplan` keeps only the visible column when some columns are hidden.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let input_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values: LogicalPlanRef = LogicalValues::new(vec![], input_schema, ctx).into();

        // Expose only the first of the two columns.
        let mut out_fields = FixedBitSet::with_capacity(2);
        out_fields.insert(0);

        let root = LogicalPlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            vec!["v1".into()],
        );
        let expected = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(root.into_unordered_subplan().schema(), &expected);
    }
}
1331}