1use std::num::NonZeroU32;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use risingwave_pb::catalog::PbVectorIndexInfo;
20
21pub mod plan_node;
22
23use plan_node::StreamFilter;
24pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
25
26pub mod property;
27
28mod delta_join_solver;
29mod heuristic_optimizer;
30mod plan_rewriter;
31
32mod plan_visitor;
33
34#[cfg(feature = "datafusion")]
35pub use plan_visitor::DataFusionExecuteCheckerExt;
36pub use plan_visitor::{
37 ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
38};
39use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;
40
41pub mod backfill_order_strategy;
42mod logical_optimization;
43mod optimizer_context;
44pub mod plan_expr_rewriter;
45mod plan_expr_visitor;
46mod rule;
47
48use std::collections::{BTreeMap, HashMap};
49use std::marker::PhantomData;
50
51use educe::Educe;
52use fixedbitset::FixedBitSet;
53use itertools::Itertools as _;
54pub use logical_optimization::*;
55pub use optimizer_context::*;
56use plan_expr_rewriter::ConstEvalRewriter;
57use property::Order;
58use risingwave_common::bail;
59use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
60use risingwave_common::types::DataType;
61use risingwave_common::util::column_index_mapping::ColIndexMapping;
62use risingwave_common::util::iter_util::ZipEqDebug;
63use risingwave_connector::WithPropertiesExt;
64use risingwave_connector::sink::catalog::SinkFormatDesc;
65use risingwave_pb::stream_plan::StreamScanType;
66
67use self::heuristic_optimizer::ApplyOrder;
68use self::plan_node::generic::{self, PhysicalPlanRef};
69use self::plan_node::{
70 BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
71 StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
72 ToStreamContext, stream_enforce_eowc_requirement,
73};
74#[cfg(debug_assertions)]
75use self::plan_visitor::InputRefValidator;
76use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
77use self::property::{Cardinality, RequiredDist};
78use self::rule::*;
79use crate::TableCatalog;
80use crate::catalog::table_catalog::TableType;
81use crate::catalog::{DatabaseId, SchemaId};
82use crate::error::{ErrorCode, Result};
83use crate::expr::TimestamptzExprFinder;
84use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
85use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
86use crate::optimizer::plan_node::{
87 BackfillType, Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker,
88 PlanTreeNode, Stream, StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion,
89 StreamVectorIndexWrite, ToStream, VisitExprsRecursive,
90};
91use crate::optimizer::plan_visitor::{
92 LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
93};
94use crate::optimizer::property::Distribution;
95use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
96
/// The root of a plan tree, pairing the plan itself with the requirements on
/// its final output: required distribution and order, plus which of the plan's
/// schema columns are visible externally and under which names.
///
/// The `P: PlanPhase` marker tracks which optimization phase the root is in
/// (and thereby which plan convention `plan` uses) at the type level.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    /// The root node of the plan, in the convention of the current phase.
    pub plan: PlanRef<P::Convention>,
    /// Phase marker only; carries no runtime data.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    /// Distribution the output must satisfy.
    required_dist: RequiredDist,
    /// Order the output must satisfy.
    required_order: Order,
    /// Bitset over the plan's schema columns marking the visible output columns.
    out_fields: FixedBitSet,
    /// Output column names; one entry per set bit in `out_fields`.
    out_names: Vec<String>,
}
120
/// Marker trait tying an optimizer phase to the plan convention its plans are
/// expressed in. Implementations are generated per phase by `for_all_phase!`.
pub trait PlanPhase {
    type Convention: ConventionMarker;
}
129
/// For every `{ Phase, Convention }` pair, generates a marker struct
/// `PlanPhase<Phase>` implementing [`PlanPhase`] with the given convention,
/// plus a `<Phase>PlanRoot` alias for `PlanRoot<PlanPhase<Phase>>`.
/// Invoked with no arguments, it expands to the canonical list of phases.
macro_rules! for_all_phase {
    // Entry point: recurse with the canonical phase/convention list.
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    // Worker arm: emit the marker struct, trait impl, and root alias per phase.
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();
154
impl LogicalPlanRoot {
    /// Creates a `LogicalPlanRoot` from a logical plan and its output
    /// requirements. Panics (via `new_inner`'s assertions) if `out_fields`
    /// does not match the plan schema or `out_names` does not match the
    /// number of visible columns.
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}
166
impl BatchPlanRoot {
    /// Creates a `BatchPlanRoot` from an already-physical batch plan and its
    /// output requirements; mirrors `new_with_logical_plan`.
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}
178
179impl<P: PlanPhase> PlanRoot<P> {
180 fn new_inner(
181 plan: PlanRef<P::Convention>,
182 required_dist: RequiredDist,
183 required_order: Order,
184 out_fields: FixedBitSet,
185 out_names: Vec<String>,
186 ) -> Self {
187 let input_schema = plan.schema();
188 assert_eq!(input_schema.fields().len(), out_fields.len());
189 assert_eq!(out_fields.count_ones(..), out_names.len());
190
191 Self {
192 plan,
193 _phase: PhantomData,
194 required_dist,
195 required_order,
196 out_fields,
197 out_names,
198 }
199 }
200
201 fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
202 PlanRoot {
203 plan,
204 _phase: PhantomData,
205 required_dist: self.required_dist,
206 required_order: self.required_order,
207 out_fields: self.out_fields,
208 out_names: self.out_names,
209 }
210 }
211
212 pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
217 if out_names.len() != self.out_fields.count_ones(..) {
218 Err(ErrorCode::InvalidInputSyntax(
219 "number of column names does not match number of columns".to_owned(),
220 ))?
221 }
222 self.out_names = out_names;
223 Ok(())
224 }
225
226 pub fn schema(&self) -> Schema {
228 Schema {
231 fields: self
232 .out_fields
233 .ones()
234 .map(|i| self.plan.schema().fields()[i].clone())
235 .zip_eq_debug(&self.out_names)
236 .map(|(field, name)| Field {
237 name: name.clone(),
238 ..field
239 })
240 .collect(),
241 }
242 }
243}
244
impl LogicalPlanRoot {
    /// Converts this root into an unordered subplan, pruning invisible
    /// columns with a projection when necessary. The required order is
    /// intentionally ignored here.
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        // All columns visible: the plan can be used as-is.
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

    /// Wraps the (single-column) subplan in an `array_agg` so the whole result
    /// set becomes one list value, preserving the required order inside the
    /// aggregate. A trailing `coalesce(..., '{}')` maps the zero-row case to
    /// an empty list rather than NULL.
    ///
    /// Errors if the subplan does not output exactly one visible column.
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::list(input_column_type.clone());
        // Simple (no group key) array_agg over the selected column, ordered by
        // the root's required order.
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                // coalesce(array_agg(...), empty list): array_agg yields NULL
                // on empty input, but callers expect an empty list.
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    /// Applies the streaming logical optimization passes in place, staying in
    /// the `Logical` phase.
    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

    /// Applies the batch logical optimization passes, transitioning the root
    /// to the `BatchOptimizedLogical` phase.
    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

    /// Convenience: batch logical optimization followed by physical batch plan
    /// generation.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}
322
impl BatchOptimizedLogicalPlanRoot {
    /// Lowers the optimized logical plan to a physical batch plan:
    /// rejects dangling temporal scans, inlines the session timezone,
    /// const-evals expressions, converts to batch with the required order,
    /// merges adjacent `BatchProject`s, and re-inlines the timezone for any
    /// expressions introduced by the conversion. Emits `EXPLAIN` traces at
    /// each stage when tracing is enabled.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        // Temporal joins are a streaming-only feature.
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        // Logical -> batch physical conversion, honoring the required order.
        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // The conversion may introduce new expressions needing the timezone.
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        // Sanity checks (debug-only input-ref validation; single distribution
        // and no exchange expected before the distributed/local pass runs).
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }

    /// Converts the optimized logical plan into a DataFusion logical plan,
    /// appending a DataFusion `Sort` for the required order and a `Projection`
    /// (with output aliases) when some columns are invisible.
    #[cfg(feature = "datafusion")]
    pub fn gen_datafusion_logical_plan(
        &self,
    ) -> Result<Arc<datafusion::logical_expr::LogicalPlan>> {
        use datafusion::logical_expr::{Expr as DFExpr, LogicalPlan, Projection, Sort};
        use datafusion_common::Column;
        use plan_visitor::LogicalPlanToDataFusionExt;

        use crate::datafusion::{InputColumns, convert_column_order};

        tracing::debug!(
            "Converting RisingWave logical plan to DataFusion plan:\nRisingWave Plan: {:?}",
            self.plan
        );

        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx, self.plan.clone())?;
        plan = const_eval_exprs(plan)?;

        let mut df_plan = plan.to_datafusion_logical_plan()?;

        // Required order becomes an explicit DataFusion Sort node.
        if !self.required_order.is_any() {
            let input_columns = InputColumns::new(df_plan.schema().as_ref(), plan.schema());
            let expr = self
                .required_order
                .column_orders
                .iter()
                .map(|column_order| convert_column_order(column_order, &input_columns))
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Sort(Sort {
                expr,
                input: df_plan,
                fetch: None,
            }));
        }

        // Prune invisible columns and apply output names via aliases.
        if self.out_names.len() < df_plan.schema().fields().len() {
            let df_schema = df_plan.schema().as_ref();
            let projection_exprs = self
                .out_fields
                .ones()
                .zip_eq_debug(self.out_names.iter())
                .map(|(i, name)| {
                    DFExpr::Column(Column::from(df_schema.qualified_field(i))).alias(name)
                })
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Projection(Projection::try_new(
                projection_exprs,
                df_plan,
            )?));
        }

        tracing::debug!("Converted DataFusion plan:\nDataFusion Plan: {:?}", df_plan);

        Ok(df_plan)
    }
}
448
impl BatchPlanRoot {
    /// Generates the distributed-execution batch plan: converts to the
    /// distributed form under a `Single` root distribution, adds a root
    /// exchange when required, prunes invisible columns, and pushes limits
    /// into scans.
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        // The root of a distributed query always gathers to a single node.
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        // Project away invisible columns, if any.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

    /// Generates the local-execution batch plan: converts to the local form,
    /// inserts a root exchange when the plan is not already single/safe to run
    /// on the frontend, prunes invisible columns, and pushes limits into scans.
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        plan = plan.to_local_with_order_required(&self.required_order)?;

        // A non-Single distribution always needs a gathering exchange; even a
        // Single plan may need one (e.g. to hand off work to compute nodes).
        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        // Project away invisible columns, if any.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }
}
524
525impl LogicalPlanRoot {
526 fn gen_optimized_stream_plan(
528 self,
529 emit_on_window_close: bool,
530 allow_snapshot_backfill: bool,
531 ) -> Result<StreamOptimizedLogicalPlanRoot> {
532 let backfill_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
533 BackfillType::SnapshotBackfill
534 } else if self.should_use_arrangement_backfill() {
535 BackfillType::ArrangementBackfill
536 } else {
537 BackfillType::Backfill
538 };
539 self.gen_optimized_stream_plan_inner(emit_on_window_close, backfill_type)
540 }
541
    /// Runs the physical stream optimization pipeline on top of
    /// [`Self::gen_stream_plan`]: project merging, optional session-gated
    /// rewrites (join separation, logstore for unaligned joins, index delta
    /// join), timezone inlining, const-eval, and final validity checks.
    fn gen_optimized_stream_plan_inner(
        self,
        emit_on_window_close: bool,
        backfill_type: BackfillType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let optimized_plan = self.gen_stream_plan(emit_on_window_close, backfill_type)?;

        let mut plan = optimized_plan
            .plan
            .clone()
            .optimize_by_rules(&OptimizationStage::new(
                "Merge StreamProject",
                vec![StreamProjectMergeRule::create()],
                ApplyOrder::BottomUp,
            ))?;

        // Session option: break up chains of consecutive hash joins with
        // no-shuffle exchanges.
        if ctx
            .session_ctx()
            .config()
            .streaming_separate_consecutive_join()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
                vec![SeparateConsecutiveJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        // Session option: add a logstore so joins can run unaligned.
        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        // Delta join rewriting additionally requires index selection to be on.
        if ctx.session_ctx().config().streaming_enable_delta_join()
            && ctx.session_ctx().config().enable_index_selection()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        // A temporal scan must be paired with a temporal join; reject plans
        // where one is left dangling.
        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        // `_rw_timestamp` is only meaningful for batch reads.
        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        // NOTE(review): license check kicks in only above 5 locality
        // providers — confirm the intended threshold.
        if ctx.session_ctx().config().enable_locality_backfill()
            && LocalityProviderCounter::count(plan.clone()) > 5
        {
            risingwave_common::license::Feature::LocalityBackfill.check_available()?
        }

        Ok(optimized_plan.into_phase(plan))
    }
636
    /// Generates the (unoptimized-physical) stream plan: checks the JSONB
    /// stream-key restriction, runs the streaming logical optimization,
    /// applies the logical-rewrite-for-stream pass (fixing up the output
    /// column mapping when the rewrite is not injective), and converts to the
    /// stream convention with the required distribution.
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        backfill_type: BackfillType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                // JSONB in the stream key is rejected unless explicitly allowed
                // by the session, since it can severely hurt performance.
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        // The rewrite mapped several output columns onto the
                        // same plan column. Duplicate those columns via an
                        // extra projection so the mapping becomes injective:
                        // each repeated target gets a fresh appended column.
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                // Already claimed: append a duplicate column
                                // and redirect this mapping entry to it.
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                // The rewrite changed column indices; translate every output
                // requirement through the mapping.
                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_backfill_type(
                        emit_on_window_close,
                        backfill_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }
723
    /// Computes the cardinality of the plan's output by visiting the plan tree
    /// with [`CardinalityVisitor`].
    fn compute_cardinality(&self) -> Cardinality {
        CardinalityVisitor.visit(self.plan.clone())
    }
730
    /// Generates the streaming plan materializing a `CREATE TABLE`.
    ///
    /// The resulting plan unions three kinds of inputs — the optional external
    /// source, the DML channel, and the upstream sink-into-table union — then
    /// layers on (in order): watermark filtering, row-id generation,
    /// timezone inlining, and NOT NULL filtering, before wrapping everything
    /// in the table's `StreamMaterialize`.
    pub fn gen_table_plan(
        self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_columns,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        // Snapshot backfill is irrelevant when creating a fresh table.
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        // A table has either a user-defined pk or a generated row id.
        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        // Translate pk column ids into positional indices.
        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap())
                .collect_vec()
        };

        // Adds a projection evaluating generated columns, when the table has any.
        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: StreamPlanRef,
        ) -> Result<StreamPlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        // How the table's primary key is defined; drives exchange placement.
        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        // Builds the DML branch: StreamDml + generated-column projection +
        // the appropriate exchange for the pk kind.
        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: StreamPlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<StreamPlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .streaming_enforce_if_not_satisfies(dml_node)?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            // With a generated row id, it must be the sole pk column.
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        // Only DML-writable columns flow through the DML channel.
        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_indices = if !with_version_columns.is_empty() {
            find_version_column_indices(&columns, with_version_columns)?
        } else {
            vec![]
        };

        let with_external_source = source_catalog.is_some();
        let (dml_source_node, external_source_node) = if with_external_source {
            // With a connector source, the DML branch hangs off a dummy source
            // node while the optimized plan becomes the external source branch.
            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
            let mut external_source_node = stream_plan.plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .streaming_enforce_if_not_satisfies(external_source_node)?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };
            (dummy_source_node, Some(external_source_node))
        } else {
            (stream_plan.plan, None)
        };

        let dml_node = inject_dml_node(
            &columns,
            append_only,
            dml_source_node,
            &pk_column_indices,
            kind,
            column_descs,
        )?;

        // The union distribution must reconcile the branch distributions.
        let dists = external_source_node
            .iter()
            .map(|input| input.distribution())
            .chain([dml_node.distribution()])
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                // Branch construction above only yields the combinations handled.
                unreachable!()
            }
        };

        let generated_column_exprs =
            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
        // Union leg that receives data from sinks created INTO this table.
        let upstream_sink_union = StreamUpstreamSinkUnion::new(
            context.clone(),
            dml_node.schema(),
            dml_node.stream_key(),
            dist.clone(),
            append_only,
            row_id_index.is_none(),
            generated_column_exprs,
        );

        let union_inputs = external_source_node
            .into_iter()
            .chain([dml_node, upstream_sink_union.into()])
            .collect_vec();

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist,
        )
        .into();

        let ttl_watermark_indices = watermark_descs
            .iter()
            .filter(|d| d.with_ttl)
            .map(|d| d.watermark_idx as usize)
            .collect_vec();

        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        // Generate row ids after the union so all branches are covered.
        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        // Version columns decide which conflicting row wins, which is
        // incompatible with simply ignoring conflicts.
        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && !version_column_indices.is_empty()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        // Enforce NOT NULL constraints by filtering violating rows.
        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        // A table is refreshable when backed by a batch connector or a source
        // with full-reload refresh mode.
        let refreshable = source_catalog
            .as_ref()
            .map(|catalog| {
                catalog.with_properties.is_batch_connector() || {
                    matches!(
                        catalog
                            .refresh_mode
                            .as_ref()
                            .map(|refresh_mode| refresh_mode.refresh_mode),
                        Some(Some(RefreshMode::FullReload(_)))
                    )
                }
            })
            .unwrap_or(false);

        // Refreshable tables need a user-defined pk (no generated row id).
        if refreshable && row_id_index.is_some() {
            return Err(crate::error::ErrorCode::BindError(
                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
                    .to_owned(),
            )
            .into());
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            pk_column_indices,
            ttl_watermark_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
            refreshable,
        )
    }
1034
    /// Generates the streaming plan for a `CREATE MATERIALIZED VIEW`,
    /// allowing snapshot backfill and propagating the computed cardinality.
    pub fn gen_materialize_plan(
        self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        // Cardinality must be derived from the logical plan before `self` is
        // consumed by stream-plan generation.
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }
1057
    /// Generates the streaming plan for a `CREATE INDEX` (materialized as an
    /// index table). EOWC and snapshot backfill are disabled for indexes.
    pub fn gen_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        // Computed before `self` is consumed below.
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }
1081
    /// Generates the streaming plan for a vector index, ending in a
    /// `StreamVectorIndexWrite` instead of a materialize node.
    pub fn gen_vector_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
        vector_index_info: PbVectorIndexInfo,
    ) -> Result<StreamVectorIndexWrite> {
        // Computed before `self` is consumed below.
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamVectorIndexWrite::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            cardinality,
            retention_seconds,
            vector_index_info,
        )
    }
1105
    /// Generates the streaming plan for a `CREATE SINK`.
    ///
    /// Backfill selection: `without_backfill` forces upstream-only; otherwise
    /// snapshot backfill is chosen when allowed/applicable (but never together
    /// with auto schema change or sink-into-table), falling back to
    /// arrangement backfill and then plain backfill.
    #[expect(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamSink> {
        let backfill_type = if without_backfill {
            BackfillType::UpstreamOnly
        } else if allow_snapshot_backfill
            && self.should_use_snapshot_backfill()
            && {
                // Auto schema change requires arrangement backfill; notify the
                // user and fall through when both are requested.
                if auto_refresh_schema_from_table.is_some() {
                    self.plan.ctx().session_ctx().notice_to_user("Auto schema change only support for ArrangementBackfill. Switched to use ArrangementBackfill");
                    false
                } else {
                    true
                }
            }
        {
            assert!(
                target_table.is_none(),
                "should not allow snapshot backfill for sink-into-table"
            );
            BackfillType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            BackfillType::ArrangementBackfill
        } else {
            BackfillType::Backfill
        };
        // Defensive re-check of the constraint enforced by the notice above.
        if auto_refresh_schema_from_table.is_some()
            && backfill_type != BackfillType::ArrangementBackfill
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "auto schema change only support for ArrangementBackfill, but got: {:?}",
                backfill_type
            ))
            .into());
        }
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, backfill_type)?;
        // For sink-into-table, map each writable target column to a plan
        // output column (by name or by position).
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });

        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            definition,
            properties,
            format_desc,
            partition_info,
            auto_refresh_schema_from_table,
        )
    }
1178
1179 pub fn should_use_arrangement_backfill(&self) -> bool {
1180 let ctx = self.plan.ctx();
1181 let session_ctx = ctx.session_ctx();
1182 let arrangement_backfill_enabled = session_ctx
1183 .env()
1184 .streaming_config()
1185 .developer
1186 .enable_arrangement_backfill;
1187 arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1188 }
1189
1190 pub fn should_use_snapshot_backfill(&self) -> bool {
1191 let ctx = self.plan.ctx();
1192 let session_ctx = ctx.session_ctx();
1193 let use_snapshot_backfill = session_ctx
1194 .env()
1195 .streaming_config()
1196 .developer
1197 .enable_snapshot_backfill
1198 && session_ctx.config().streaming_use_snapshot_backfill();
1199 if use_snapshot_backfill {
1200 if let Some(warning_msg) = self.plan.forbid_snapshot_backfill() {
1201 self.plan.ctx().session_ctx().notice_to_user(warning_msg);
1202 false
1203 } else {
1204 true
1205 }
1206 } else {
1207 false
1208 }
1209 }
1210}
1211
1212impl<P: PlanPhase> PlanRoot<P> {
1213 pub fn target_columns_to_plan_mapping(
1215 &self,
1216 tar_cols: &[ColumnCatalog],
1217 user_specified_columns: bool,
1218 ) -> Vec<Option<usize>> {
1219 #[allow(clippy::disallowed_methods)]
1220 let visible_cols: Vec<(usize, String)> = self
1221 .out_fields
1222 .ones()
1223 .zip_eq(self.out_names.iter().cloned())
1224 .collect_vec();
1225
1226 let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1227 let visible_col_idxes_by_name = visible_cols
1228 .iter()
1229 .map(|(i, name)| (name.as_ref(), *i))
1230 .collect::<BTreeMap<_, _>>();
1231
1232 tar_cols
1233 .iter()
1234 .enumerate()
1235 .filter(|(_, tar_col)| tar_col.can_dml())
1236 .map(|(tar_i, tar_col)| {
1237 if user_specified_columns {
1238 visible_col_idxes_by_name.get(tar_col.name()).cloned()
1239 } else {
1240 (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1241 }
1242 })
1243 .collect()
1244 }
1245}
1246
1247fn find_version_column_indices(
1248 column_catalog: &Vec<ColumnCatalog>,
1249 version_column_names: Vec<String>,
1250) -> Result<Vec<usize>> {
1251 let mut indices = Vec::new();
1252 for version_column_name in version_column_names {
1253 let mut found = false;
1254 for (index, column) in column_catalog.iter().enumerate() {
1255 if column.column_desc.name == version_column_name {
1256 if let &DataType::Jsonb
1257 | &DataType::List(_)
1258 | &DataType::Struct(_)
1259 | &DataType::Bytea
1260 | &DataType::Boolean = column.data_type()
1261 {
1262 return Err(ErrorCode::InvalidInputSyntax(format!(
1263 "Version column {} must be of a comparable data type",
1264 version_column_name
1265 ))
1266 .into());
1267 }
1268 indices.push(index);
1269 found = true;
1270 break;
1271 }
1272 }
1273 if !found {
1274 return Err(ErrorCode::InvalidInputSyntax(format!(
1275 "Version column {} not found",
1276 version_column_name
1277 ))
1278 .into());
1279 }
1280 }
1281 Ok(indices)
1282}
1283
1284fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1285 let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1286
1287 let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1288 if let Some(error) = const_eval_rewriter.error {
1289 return Err(error);
1290 }
1291 Ok(plan)
1292}
1293
1294fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1295 ctx: OptimizerContextRef,
1296 plan: PlanRef<C>,
1297) -> Result<PlanRef<C>> {
1298 let mut v = TimestamptzExprFinder::default();
1299 plan.visit_exprs_recursive(&mut v);
1300 if v.has() {
1301 Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1302 } else {
1303 Ok(plan)
1304 }
1305}
1306
1307fn exist_and_no_exchange_before(
1308 plan: &BatchPlanRef,
1309 is_candidate: fn(&BatchPlanRef) -> bool,
1310) -> bool {
1311 if plan.node_type() == BatchPlanNodeType::BatchExchange {
1312 return false;
1313 }
1314 is_candidate(plan)
1315 || plan
1316 .inputs()
1317 .iter()
1318 .any(|input| exist_and_no_exchange_before(input, is_candidate))
1319}
1320
1321impl BatchPlanRef {
1322 fn is_user_table_scan(&self) -> bool {
1323 self.node_type() == BatchPlanNodeType::BatchSeqScan
1324 || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
1325 || self.node_type() == BatchPlanNodeType::BatchVectorSearch
1326 }
1327
1328 fn is_source_scan(&self) -> bool {
1329 self.node_type() == BatchPlanNodeType::BatchSource
1330 || self.node_type() == BatchPlanNodeType::BatchKafkaScan
1331 || self.node_type() == BatchPlanNodeType::BatchIcebergScan
1332 }
1333
1334 fn is_insert(&self) -> bool {
1335 self.node_type() == BatchPlanNodeType::BatchInsert
1336 }
1337
1338 fn is_update(&self) -> bool {
1339 self.node_type() == BatchPlanNodeType::BatchUpdate
1340 }
1341
1342 fn is_delete(&self) -> bool {
1343 self.node_type() == BatchPlanNodeType::BatchDelete
1344 }
1345}
1346
1347fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1353 assert_eq!(plan.distribution(), &Distribution::Single);
1354 exist_and_no_exchange_before(&plan, |plan| {
1355 plan.is_user_table_scan()
1356 || plan.is_source_scan()
1357 || plan.is_insert()
1358 || plan.is_update()
1359 || plan.is_delete()
1360 })
1361}
1362
1363fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1366 assert_eq!(plan.distribution(), &Distribution::Single);
1367 exist_and_no_exchange_before(&plan, |plan| {
1368 plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
1369 })
1370}
1371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// Converting a plan root into an unordered subplan must prune the schema
    /// down to the visible output columns.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        // Empty VALUES plan with two columns: v1 (int) and v2 (varchar).
        let values_plan = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        // Bitset 0b01: only the first output column (`v1`) stays visible.
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let root = PlanRoot::new_with_logical_plan(
            values_plan,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            vec!["v1".into()],
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}