use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

use risingwave_pb::catalog::PbVectorIndexInfo;

pub mod plan_node;

use plan_node::StreamFilter;
pub use plan_node::{Explain, LogicalPlanRef, PlanRef};

pub mod property;

mod delta_join_solver;
mod heuristic_optimizer;
mod plan_rewriter;

mod plan_visitor;

#[cfg(feature = "datafusion")]
pub use plan_visitor::DataFusionExecuteCheckerExt;
pub use plan_visitor::{
    ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
};
use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;

pub mod backfill_order_strategy;
mod logical_optimization;
mod optimizer_context;
pub mod plan_expr_rewriter;
mod plan_expr_visitor;
mod rule;

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;

use educe::Educe;
use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
    ToStreamContext, stream_enforce_eowc_requirement,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::TableCatalog;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::TimestamptzExprFinder;
use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
use crate::optimizer::plan_node::{
    BackfillType, Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker,
    PlanTreeNode, Stream, StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion,
    StreamVectorIndexWrite, ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::{
    LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};

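/// The root of a plan tree, parameterized by the optimization phase `P`.
///
/// Besides the plan itself, it carries the distribution and order required at the root, plus the
/// output-field bitset and output names used to prune and rename the final columns. The required
/// order or distribution may refer to columns that are not part of the output (e.g.
/// `SELECT v1 FROM t ORDER BY id` keeps `id` around during optimization and drops it from the
/// result schema afterwards).
///
/// A minimal usage sketch, mirroring the test at the bottom of this module (marked `ignore`
/// because it needs a mocked optimizer context):
///
/// ```ignore
/// let root = PlanRoot::new_with_logical_plan(
///     logical_plan,      // any LogicalPlanRef
///     RequiredDist::Any,
///     Order::any(),
///     out_fields,        // FixedBitSet selecting the visible columns
///     out_names,         // one name per selected column
/// );
/// let subplan = root.into_unordered_subplan();
/// ```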
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    pub plan: PlanRef<P::Convention>,
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    required_dist: RequiredDist,
    required_order: Order,
    out_fields: FixedBitSet,
    out_names: Vec<String>,
}

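/// Marker trait for the optimization phases generated by `for_all_phase!` below; each phase is
/// tied to the plan convention (`Logical`, `Batch` or `Stream`) its plan nodes use.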
pub trait PlanPhase {
    type Convention: ConventionMarker;
}

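/// For each `{ Phase, Convention }` pair listed in the zero-argument invocation, this macro
/// generates a marker struct (e.g. `PlanPhaseLogical`), implements `PlanPhase` for it, and
/// defines a `*PlanRoot` type alias (e.g. `LogicalPlanRoot = PlanRoot<PlanPhaseLogical>`).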
macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();

impl LogicalPlanRoot {
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl BatchPlanRoot {
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    fn new_inner(
        plan: PlanRef<P::Convention>,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        let input_schema = plan.schema();
        assert_eq!(input_schema.fields().len(), out_fields.len());
        assert_eq!(out_fields.count_ones(..), out_names.len());

        Self {
            plan,
            _phase: PhantomData,
            required_dist,
            required_order,
            out_fields,
            out_names,
        }
    }

    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
        PlanRoot {
            plan,
            _phase: PhantomData,
            required_dist: self.required_dist,
            required_order: self.required_order,
            out_fields: self.out_fields,
            out_names: self.out_names,
        }
    }

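    /// Replace the output column names, keeping everything else. Returns an error if the number
    /// of names does not match the number of output columns.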
    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
        if out_names.len() != self.out_fields.count_ones(..) {
            Err(ErrorCode::InvalidInputSyntax(
                "number of column names does not match number of columns".to_owned(),
            ))?
        }
        self.out_names = out_names;
        Ok(())
    }

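    /// Get the plan root's schema, i.e. only the output fields, renamed with `out_names`.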
    pub fn schema(&self) -> Schema {
        Schema {
            fields: self
                .out_fields
                .ones()
                .map(|i| self.plan.schema().fields()[i].clone())
                .zip_eq_debug(&self.out_names)
                .map(|(field, name)| Field {
                    name: name.clone(),
                    ..field
                })
                .collect(),
        }
    }
}

impl LogicalPlanRoot {
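    /// Turn the plan root back into a plain `LogicalPlanRef` usable as a subplan, projecting away
    /// hidden columns if needed. The required order is *not* enforced here.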
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

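    /// Wrap the single output column in an `array_agg` (ordered by the required order) and
    /// coalesce a `NULL` result to an empty list, so the subplan can be used where an array
    /// value is expected. Errors if the plan outputs more than one column.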
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::list(input_column_type.clone());
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

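    /// Apply the stream-oriented logical optimization passes to the wrapped plan.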
    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

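    /// Apply the batch-oriented logical optimization passes, moving to the batch-optimized phase.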
    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}

impl BatchOptimizedLogicalPlanRoot {
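    /// Lower the optimized logical plan to a batch physical plan on a single node (no exchange
    /// operators yet), running session-timezone inlining, const-eval and a few batch-only rules.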
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }

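    /// Convert the optimized logical plan into a DataFusion `LogicalPlan`, appending a `Sort`
    /// for the required order and a renaming `Projection` for the output columns when needed.
    /// Only available with the `datafusion` feature.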
    #[cfg(feature = "datafusion")]
    pub fn gen_datafusion_logical_plan(self) -> Result<Arc<datafusion::logical_expr::LogicalPlan>> {
        use datafusion::logical_expr::{Expr as DFExpr, LogicalPlan, Projection, Sort};
        use datafusion_common::Column;
        use plan_visitor::LogicalPlanToDataFusionExt;

        use crate::datafusion::{InputColumns, convert_column_order};

        tracing::debug!(
            "Converting RisingWave logical plan to DataFusion plan:\nRisingWave Plan: {:?}",
            self.plan
        );

        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx, self.plan.clone())?;
        plan = const_eval_exprs(plan)?;

        let mut df_plan = plan.to_datafusion_logical_plan()?;

        if !self.required_order.is_any() {
            let input_columns = InputColumns::new(df_plan.schema().as_ref(), plan.schema());
            let expr = self
                .required_order
                .column_orders
                .iter()
                .map(|column_order| convert_column_order(column_order, &input_columns))
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Sort(Sort {
                expr,
                input: df_plan,
                fetch: None,
            }));
        }

        if self.out_names.len() < df_plan.schema().fields().len() {
            let df_schema = df_plan.schema().as_ref();
            let projection_exprs = self
                .out_fields
                .ones()
                .zip_eq_debug(self.out_names.iter())
                .map(|(i, name)| {
                    DFExpr::Column(Column::from(df_schema.qualified_field(i))).alias(name)
                })
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Projection(Projection::try_new(
                projection_exprs,
                df_plan,
            )?));
        }

        tracing::debug!("Converted DataFusion plan:\nDataFusion Plan: {:?}", df_plan);

        Ok(df_plan)
    }
}

impl BatchPlanRoot {
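    /// Optimize and generate a batch query plan for distributed execution, enforcing a singleton
    /// distribution at the root and adding a root `BatchExchange` when required.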
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

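    /// Optimize and generate a batch query plan for local execution, adding a root
    /// `BatchExchange` when the plan is not a singleton or still has scan/insert nodes exposed.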
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        plan = plan.to_local_with_order_required(&self.required_order)?;

        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }
}

impl LogicalPlanRoot {
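    /// Generate the optimized stream plan, selecting the backfill type (snapshot, arrangement or
    /// plain backfill) based on the session configuration.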
    fn gen_optimized_stream_plan(
        self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let backfill_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            BackfillType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            BackfillType::ArrangementBackfill
        } else {
            BackfillType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, backfill_type)
    }

    fn gen_optimized_stream_plan_inner(
        self,
        emit_on_window_close: bool,
        backfill_type: BackfillType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let optimized_plan = self.gen_stream_plan(emit_on_window_close, backfill_type)?;

        let mut plan = optimized_plan
            .plan
            .clone()
            .optimize_by_rules(&OptimizationStage::new(
                "Merge StreamProject",
                vec![StreamProjectMergeRule::create()],
                ApplyOrder::BottomUp,
            ))?;

        if ctx
            .session_ctx()
            .config()
            .streaming_separate_consecutive_join()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
                vec![SeparateConsecutiveJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join()
            && ctx.session_ctx().config().enable_index_selection()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "dangling temporal scan detected".to_owned(),
                "please check your temporal join syntax, e.g. consider removing the right outer join if it is being used".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        if ctx.session_ctx().config().enable_locality_backfill()
            && LocalityProviderCounter::count(plan.clone()) > 5
        {
            risingwave_common::license::Feature::LocalityBackfill.check_available()?
        }

        Ok(optimized_plan.into_phase(plan))
    }

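    /// Rewrite the logical plan for streaming, lower it to the stream convention with the
    /// required distribution, and enforce the emit-on-window-close requirement.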
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        backfill_type: BackfillType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, explicitly enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_backfill_type(
                        emit_on_window_close,
                        backfill_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }

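    /// Compute the cardinality estimate of the plan via `CardinalityVisitor`.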
    fn compute_cardinality(&self) -> Cardinality {
        CardinalityVisitor.visit(self.plan.clone())
    }

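    /// Generate the stream plan for `CREATE TABLE`: it unions the DML input, the optional
    /// connector source and the upstream-sink union, adds row-id generation, watermark and
    /// NOT NULL filters as needed, and materializes the result.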
    pub fn gen_table_plan(
        self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_columns,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap())
                .collect_vec()
        };

        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: StreamPlanRef,
        ) -> Result<StreamPlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: StreamPlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<StreamPlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .streaming_enforce_if_not_satisfies(dml_node)?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_indices = if !with_version_columns.is_empty() {
            find_version_column_indices(&columns, with_version_columns)?
        } else {
            vec![]
        };

        let with_external_source = source_catalog.is_some();
        let (dml_source_node, external_source_node) = if with_external_source {
            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
            let mut external_source_node = stream_plan.plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .streaming_enforce_if_not_satisfies(external_source_node)?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };
            (dummy_source_node, Some(external_source_node))
        } else {
            (stream_plan.plan, None)
        };

        let dml_node = inject_dml_node(
            &columns,
            append_only,
            dml_source_node,
            &pk_column_indices,
            kind,
            column_descs,
        )?;

        let dists = external_source_node
            .iter()
            .map(|input| input.distribution())
            .chain([dml_node.distribution()])
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                unreachable!()
            }
        };

        let generated_column_exprs =
            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
        let upstream_sink_union = StreamUpstreamSinkUnion::new(
            context.clone(),
            dml_node.schema(),
            dml_node.stream_key(),
            dist.clone(),
            append_only,
            row_id_index.is_none(),
            generated_column_exprs,
        );

        let union_inputs = external_source_node
            .into_iter()
            .chain([dml_node, upstream_sink_union.into()])
            .collect_vec();

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist,
        )
        .into();

        let ttl_watermark_indices = watermark_descs
            .iter()
            .filter(|d| d.with_ttl)
            .map(|d| d.watermark_idx as usize)
            .collect_vec();

        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && !version_column_indices.is_empty()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        let refreshable = source_catalog
            .as_ref()
            .map(|catalog| {
                catalog.with_properties.is_batch_connector()
                    || matches!(
                        catalog
                            .refresh_mode
                            .as_ref()
                            .map(|refresh_mode| refresh_mode.refresh_mode),
                        Some(Some(RefreshMode::FullReload(_)))
                    )
            })
            .unwrap_or(false);

        if refreshable && row_id_index.is_some() {
            return Err(crate::error::ErrorCode::BindError(
                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
                    .to_owned(),
            )
            .into());
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            pk_column_indices,
            ttl_watermark_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
            refreshable,
        )
    }

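    /// Generate the optimized stream plan and wrap it in a `StreamMaterialize` for a
    /// `CREATE MATERIALIZED VIEW` statement.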
    pub fn gen_materialize_plan(
        self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }

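    /// Generate the stream plan for `CREATE INDEX`, materialized with `TableType::Index`.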
    pub fn gen_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }

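    /// Generate the stream plan for creating a vector index, writing through a
    /// `StreamVectorIndexWrite` node configured with `vector_index_info`.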
    pub fn gen_vector_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
        vector_index_info: PbVectorIndexInfo,
    ) -> Result<StreamVectorIndexWrite> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamVectorIndexWrite::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            cardinality,
            retention_seconds,
            vector_index_info,
        )
    }

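    /// Generate the stream plan for a `CREATE SINK` statement (optionally a sink into a table).
    /// The backfill type is derived from `without_backfill`, the snapshot-backfill setting and
    /// the session configuration.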
    #[expect(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamSink> {
        let backfill_type = if without_backfill {
            BackfillType::UpstreamOnly
        } else if allow_snapshot_backfill
            && self.should_use_snapshot_backfill()
            && {
                if auto_refresh_schema_from_table.is_some() {
                    self.plan.ctx().session_ctx().notice_to_user("Auto schema change is only supported with ArrangementBackfill; switching to ArrangementBackfill.");
                    false
                } else {
                    true
                }
            }
        {
            assert!(
                target_table.is_none(),
                "should not allow snapshot backfill for sink-into-table"
            );
            BackfillType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            BackfillType::ArrangementBackfill
        } else {
            BackfillType::Backfill
        };
        if auto_refresh_schema_from_table.is_some()
            && backfill_type != BackfillType::ArrangementBackfill
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "auto schema change is only supported with ArrangementBackfill, but got: {:?}",
                backfill_type
            ))
            .into());
        }
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, backfill_type)?;
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });

        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            definition,
            properties,
            format_desc,
            partition_info,
            auto_refresh_schema_from_table,
        )
    }

    pub fn should_use_arrangement_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let arrangement_backfill_enabled = session_ctx
            .env()
            .streaming_config()
            .developer
            .enable_arrangement_backfill;
        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
    }

    pub fn should_use_snapshot_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let use_snapshot_backfill = session_ctx.config().streaming_use_snapshot_backfill();
        if use_snapshot_backfill {
            if let Some(warning_msg) = self.plan.forbid_snapshot_backfill() {
                self.plan.ctx().session_ctx().notice_to_user(warning_msg);
                false
            } else {
                true
            }
        } else {
            false
        }
    }
}

impl<P: PlanPhase> PlanRoot<P> {
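    /// Map each DML-able column of the target table to the index of the plan's output column it
    /// should be fed from: by name when columns were specified explicitly, otherwise by position.
    /// Columns without a match map to `None`.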
    pub fn target_columns_to_plan_mapping(
        &self,
        tar_cols: &[ColumnCatalog],
        user_specified_columns: bool,
    ) -> Vec<Option<usize>> {
        #[allow(clippy::disallowed_methods)]
        let visible_cols: Vec<(usize, String)> = self
            .out_fields
            .ones()
            .zip_eq(self.out_names.iter().cloned())
            .collect_vec();

        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
        let visible_col_idxes_by_name = visible_cols
            .iter()
            .map(|(i, name)| (name.as_ref(), *i))
            .collect::<BTreeMap<_, _>>();

        tar_cols
            .iter()
            .enumerate()
            .filter(|(_, tar_col)| tar_col.can_dml())
            .map(|(tar_i, tar_col)| {
                if user_specified_columns {
                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
                } else {
                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
                }
            })
            .collect()
    }
}

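/// Resolve version column names to their indices in the column catalog, rejecting data types
/// that are not meaningfully comparable (`jsonb`, list, struct, `bytea`, `boolean`).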
fn find_version_column_indices(
    column_catalog: &[ColumnCatalog],
    version_column_names: Vec<String>,
) -> Result<Vec<usize>> {
    let mut indices = Vec::new();
    for version_column_name in version_column_names {
        let mut found = false;
        for (index, column) in column_catalog.iter().enumerate() {
            if column.column_desc.name == version_column_name {
                if let &DataType::Jsonb
                | &DataType::List(_)
                | &DataType::Struct(_)
                | &DataType::Bytea
                | &DataType::Boolean = column.data_type()
                {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "Version column {} must be of a comparable data type",
                        version_column_name
                    ))
                    .into());
                }
                indices.push(index);
                found = true;
                break;
            }
        }
        if !found {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Version column {} not found",
                version_column_name
            ))
            .into());
        }
    }
    Ok(indices)
}

fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
    let mut const_eval_rewriter = ConstEvalRewriter { error: None };

    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
    if let Some(error) = const_eval_rewriter.error {
        return Err(error);
    }
    Ok(plan)
}

fn inline_session_timezone_in_exprs<C: ConventionMarker>(
    ctx: OptimizerContextRef,
    plan: PlanRef<C>,
) -> Result<PlanRef<C>> {
    let mut v = TimestamptzExprFinder::default();
    plan.visit_exprs_recursive(&mut v);
    if v.has() {
        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
    } else {
        Ok(plan)
    }
}

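/// Returns `true` if a node matching `is_candidate` is reachable from `plan` without crossing a
/// `BatchExchange` on the way down.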
fn exist_and_no_exchange_before(
    plan: &BatchPlanRef,
    is_candidate: fn(&BatchPlanRef) -> bool,
) -> bool {
    if plan.node_type() == BatchPlanNodeType::BatchExchange {
        return false;
    }
    is_candidate(plan)
        || plan
            .inputs()
            .iter()
            .any(|input| exist_and_no_exchange_before(input, is_candidate))
}

impl BatchPlanRef {
    fn is_user_table_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSeqScan
            || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
            || self.node_type() == BatchPlanNodeType::BatchVectorSearch
    }

    fn is_source_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSource
            || self.node_type() == BatchPlanNodeType::BatchKafkaScan
            || self.node_type() == BatchPlanNodeType::BatchIcebergScan
    }

    fn is_insert(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchInsert
    }

    fn is_update(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchUpdate
    }

    fn is_delete(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchDelete
    }
}

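/// Whether an extra root `BatchExchange` is needed in distributed mode: returns `true` if a
/// table/source scan or DML node is reachable from the root without passing an exchange.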
fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan()
            || plan.is_source_scan()
            || plan.is_insert()
            || plan.is_update()
            || plan.is_delete()
    })
}

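/// Local-mode variant of the check above; only table/source scans and inserts are considered.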
fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}