1use std::num::NonZeroU32;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use risingwave_pb::catalog::PbVectorIndexInfo;
20
21pub mod plan_node;
22
23use plan_node::StreamFilter;
24pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
25
26pub mod property;
27
28mod delta_join_solver;
29mod heuristic_optimizer;
30mod plan_rewriter;
31
32mod plan_visitor;
33
34#[cfg(feature = "datafusion")]
35pub use plan_visitor::DataFusionExecuteCheckerExt;
36pub use plan_visitor::{
37 ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
38};
39use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;
40
41pub mod backfill_order_strategy;
42mod logical_optimization;
43mod optimizer_context;
44pub mod plan_expr_rewriter;
45mod plan_expr_visitor;
46mod rule;
47
48use std::collections::{BTreeMap, HashMap};
49use std::marker::PhantomData;
50
51use educe::Educe;
52use fixedbitset::FixedBitSet;
53use itertools::Itertools as _;
54pub use logical_optimization::*;
55pub use optimizer_context::*;
56use plan_expr_rewriter::ConstEvalRewriter;
57use property::Order;
58use risingwave_common::bail;
59use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
60use risingwave_common::types::DataType;
61use risingwave_common::util::column_index_mapping::ColIndexMapping;
62use risingwave_common::util::iter_util::ZipEqDebug;
63use risingwave_connector::WithPropertiesExt;
64use risingwave_connector::sink::catalog::SinkFormatDesc;
65use risingwave_pb::stream_plan::StreamScanType;
66
67use self::heuristic_optimizer::ApplyOrder;
68use self::plan_node::generic::{self, PhysicalPlanRef};
69use self::plan_node::{
70 BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
71 StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
72 ToStreamContext, stream_enforce_eowc_requirement,
73};
74#[cfg(debug_assertions)]
75use self::plan_visitor::InputRefValidator;
76use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
77use self::property::{Cardinality, RequiredDist};
78use self::rule::*;
79use crate::TableCatalog;
80use crate::catalog::table_catalog::TableType;
81use crate::catalog::{DatabaseId, SchemaId};
82use crate::error::{ErrorCode, Result};
83use crate::expr::TimestamptzExprFinder;
84use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
85use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
86use crate::optimizer::plan_node::{
87 BackfillType, Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker,
88 PlanTreeNode, Stream, StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion,
89 StreamVectorIndexWrite, ToStream, VisitExprsRecursive,
90};
91use crate::optimizer::plan_visitor::{
92 LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
93};
94use crate::optimizer::property::Distribution;
95use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};
96
/// The root of a query plan: the plan tree plus the result requirements
/// (distribution, order, visible output columns and their names).
///
/// The `P` type parameter tracks the optimization phase (see [`PlanPhase`]),
/// so invalid phase transitions are rejected at compile time.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    /// Root node of the plan tree, in the convention of the current phase.
    pub plan: PlanRef<P::Convention>,
    // Zero-sized phase marker; hidden from `Debug` and cloned via
    // `PhantomData::clone` so deriving `Clone` does not require `P: Clone`.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    /// Required distribution of the query result.
    required_dist: RequiredDist,
    /// Required ordering of the query result.
    required_order: Order,
    /// Bitset over the plan schema marking which columns are visible output.
    out_fields: FixedBitSet,
    /// Names of the visible output columns; one per set bit in `out_fields`.
    out_names: Vec<String>,
}
120
/// Compile-time marker describing which optimization phase a [`PlanRoot`] is
/// in. Each phase fixes the plan convention (logical / batch / stream) via
/// the `Convention` associated type.
pub trait PlanPhase {
    type Convention: ConventionMarker;
}
129
/// For each `{ Phase, Convention }` pair, generates a `PlanPhase<Phase>`
/// marker type implementing [`PlanPhase`] and a `<Phase>PlanRoot` type alias.
/// The zero-argument form expands to the canonical list of phases.
macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();
154
impl LogicalPlanRoot {
    /// Creates a `PlanRoot` in the `Logical` phase from a logical plan and
    /// the root requirements. See [`PlanRoot::new_inner`] for the invariants
    /// asserted on `out_fields` / `out_names`.
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}
166
impl BatchPlanRoot {
    /// Creates a `PlanRoot` in the `Batch` phase from a batch physical plan
    /// and the root requirements. See [`PlanRoot::new_inner`] for the
    /// invariants asserted on `out_fields` / `out_names`.
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}
178
179impl<P: PlanPhase> PlanRoot<P> {
180 fn new_inner(
181 plan: PlanRef<P::Convention>,
182 required_dist: RequiredDist,
183 required_order: Order,
184 out_fields: FixedBitSet,
185 out_names: Vec<String>,
186 ) -> Self {
187 let input_schema = plan.schema();
188 assert_eq!(input_schema.fields().len(), out_fields.len());
189 assert_eq!(out_fields.count_ones(..), out_names.len());
190
191 Self {
192 plan,
193 _phase: PhantomData,
194 required_dist,
195 required_order,
196 out_fields,
197 out_names,
198 }
199 }
200
201 fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
202 PlanRoot {
203 plan,
204 _phase: PhantomData,
205 required_dist: self.required_dist,
206 required_order: self.required_order,
207 out_fields: self.out_fields,
208 out_names: self.out_names,
209 }
210 }
211
212 pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
217 if out_names.len() != self.out_fields.count_ones(..) {
218 Err(ErrorCode::InvalidInputSyntax(
219 "number of column names does not match number of columns".to_owned(),
220 ))?
221 }
222 self.out_names = out_names;
223 Ok(())
224 }
225
226 pub fn schema(&self) -> Schema {
228 Schema {
231 fields: self
232 .out_fields
233 .ones()
234 .map(|i| self.plan.schema().fields()[i].clone())
235 .zip_eq_debug(&self.out_names)
236 .map(|(field, name)| Field {
237 name: name.clone(),
238 ..field
239 })
240 .collect(),
241 }
242 }
243}
244
impl LogicalPlanRoot {
    /// Converts the root into an unordered logical subplan, adding a pruning
    /// projection only when some columns are invisible.
    ///
    /// NOTE(review): the required order is deliberately dropped here; callers
    /// embedding this as a subplan must not rely on row order.
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        // Fast path: every field is visible, no projection needed.
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

    /// Wraps a single-visible-column subplan into
    /// `coalesce(array_agg(col ORDER BY <required order>), '{}')`, so an
    /// empty input yields an empty list instead of NULL.
    /// Errors when the subplan exposes more than one visible column.
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        // The subquery must return exactly one visible column.
        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::list(input_column_type.clone());
        // array_agg over the selected column; the root's required order is
        // preserved inside the aggregate call's ORDER BY.
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        // coalesce(agg_result, empty list) on top of the aggregation.
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    /// Applies the logical optimization passes for streaming, in place.
    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

    /// Applies the logical optimization passes for batch, advancing the phase
    /// to `BatchOptimizedLogical`.
    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

    /// Shorthand: batch logical optimization followed by batch physical
    /// planning.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}
322
impl BatchOptimizedLogicalPlanRoot {
    /// Lowers the batch-optimized logical plan to a single-node batch
    /// physical plan, advancing the phase to `Batch`.
    ///
    /// Pipeline: reject dangling temporal scans → inline session timezone →
    /// const-eval exprs → `to_batch` honoring the required order → merge
    /// adjacent `BatchProject`s → inline timezone again for exprs introduced
    /// by physical rewrites. The result must be `Distribution::Single` with
    /// no exchanges yet (they are inserted by the distributed/local planners).
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        // Temporal joins are a streaming-only feature.
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert the logical plan to a batch physical plan, pushing the
        // required order down where possible.
        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // Physical rewrites may have introduced new timezone-dependent exprs.
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        // Invariants at this stage: single-node distribution and no batch
        // exchanges in the tree yet.
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }

    /// Converts the optimized logical plan to a DataFusion logical plan,
    /// appending a `Sort` for the required order and a pruning/renaming
    /// `Projection` when some output columns are invisible.
    #[cfg(feature = "datafusion")]
    pub fn gen_datafusion_logical_plan(
        &self,
    ) -> Result<Arc<datafusion::logical_expr::LogicalPlan>> {
        use datafusion::logical_expr::{Expr as DFExpr, LogicalPlan, Projection, Sort};
        use datafusion_common::Column;
        use plan_visitor::LogicalPlanToDataFusionExt;

        use crate::datafusion::{InputColumns, convert_column_order};

        tracing::debug!(
            "Converting RisingWave logical plan to DataFusion plan:\nRisingWave Plan: {:?}",
            self.plan
        );

        let ctx = self.plan.ctx();
        // Same pre-lowering as `gen_batch_plan`: timezone inlining followed
        // by constant folding.
        let mut plan = inline_session_timezone_in_exprs(ctx, self.plan.clone())?;
        plan = const_eval_exprs(plan)?;

        let mut df_plan = plan.to_datafusion_logical_plan()?;

        // Materialize the required order as an explicit DataFusion Sort.
        if !self.required_order.is_any() {
            let input_columns = InputColumns::new(df_plan.schema().as_ref(), plan.schema());
            let expr = self
                .required_order
                .column_orders
                .iter()
                .map(|column_order| convert_column_order(column_order, &input_columns))
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Sort(Sort {
                expr,
                input: df_plan,
                fetch: None,
            }));
        }

        // Prune invisible columns and apply the output names via aliases.
        if self.out_names.len() < df_plan.schema().fields().len() {
            let df_schema = df_plan.schema().as_ref();
            let projection_exprs = self
                .out_fields
                .ones()
                .zip_eq_debug(self.out_names.iter())
                .map(|(i, name)| {
                    DFExpr::Column(Column::from(df_schema.qualified_field(i))).alias(name)
                })
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Projection(Projection::try_new(
                projection_exprs,
                df_plan,
            )?));
        }

        tracing::debug!("Converted DataFusion plan:\nDataFusion Plan: {:?}", df_plan);

        Ok(df_plan)
    }
}
448
impl BatchPlanRoot {
    /// Optimizes and generates a batch plan for distributed execution:
    /// inserts exchanges for the required order/distribution, gathers to a
    /// single node at the root, prunes invisible output columns, then pushes
    /// limits into scans.
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        // The distributed root always collects results on a single node.
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        // Some roots still need an extra gathering exchange in distributed
        // mode even when the distribution is already Single.
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        // Project away invisible output columns.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        // Limit pushdown runs after the exchange layout is final.
        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

    /// Optimizes and generates a batch plan for local (frontend) execution.
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        plan = plan.to_local_with_order_required(&self.required_order)?;

        // Even in local mode parts of the plan may run remotely, so the root
        // may still need a gathering exchange; any non-Single distribution
        // always needs one.
        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        // Project away invisible output columns.
        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }
}
524
525impl LogicalPlanRoot {
    /// Optimizes the logical plan into a stream plan, choosing the backfill
    /// strategy from session/cluster config: snapshot backfill (when allowed
    /// by the caller and enabled), otherwise arrangement backfill, otherwise
    /// plain backfill.
    fn gen_optimized_stream_plan(
        self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let backfill_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            BackfillType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            BackfillType::ArrangementBackfill
        } else {
            BackfillType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, backfill_type)
    }
541
    /// Runs physical stream planning (`gen_stream_plan`) followed by
    /// stream-level optimization passes (project merging, optional join
    /// separation / logstore insertion / index-delta-join rewrite, timezone
    /// inlining, const-eval) and final validity checks (dangling temporal
    /// scans, `_rw_timestamp` selection, locality-backfill licensing).
    fn gen_optimized_stream_plan_inner(
        self,
        emit_on_window_close: bool,
        backfill_type: BackfillType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        // NOTE(review): bound but never read in this function — confirm
        // whether it can be removed.
        let _explain_trace = ctx.is_explain_trace();

        let optimized_plan = self.gen_stream_plan(emit_on_window_close, backfill_type)?;

        // Merge adjacent StreamProject nodes bottom-up.
        let mut plan = optimized_plan
            .plan
            .clone()
            .optimize_by_rules(&OptimizationStage::new(
                "Merge StreamProject",
                vec![StreamProjectMergeRule::create()],
                ApplyOrder::BottomUp,
            ))?;

        if ctx
            .session_ctx()
            .config()
            .streaming_separate_consecutive_join()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
                vec![SeparateConsecutiveJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        // Unaligned joins decouple join inputs via a logstore.
        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        // Rewrite eligible joins against indexes into delta joins.
        if ctx.session_ctx().config().streaming_enable_delta_join()
            && ctx.session_ctx().config().enable_index_selection()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        // A temporal scan not enclosed by a temporal join is invalid.
        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        // NOTE(review): magic threshold — more than 5 LocalityProvider nodes
        // triggers the licensed-feature check; confirm the constant is
        // intended rather than configurable.
        if LocalityProviderCounter::count(plan.clone()) > 5 {
            // LocalityProviders should only appear when the config is on.
            assert!(ctx.session_ctx().config().enable_locality_backfill());
            risingwave_common::license::Feature::LocalityBackfill.check_available()?;
        }

        // Nudge users toward locality backfill when the optimizer recorded
        // missed opportunities while the feature was disabled.
        if ctx.missed_locality_providers() > 1
            && risingwave_common::license::Feature::LocalityBackfill
                .check_available()
                .is_ok()
        {
            // Opportunities can only be "missed" while the config is off.
            assert!(!ctx.session_ctx().config().enable_locality_backfill());
            ctx.warn_to_user(format!(
                "This streaming job has {} operators that could benefit from locality backfill. \
                Consider enabling it with `SET enable_locality_backfill = true` for potentially \
                faster backfill performance, when existing data volume in upstream(s) is large.",
                ctx.missed_locality_providers()
            ));
        }

        Ok(optimized_plan.into_phase(plan))
    }
651
    /// Converts the logical plan to a physical stream plan:
    /// 1. rejects JSONB columns in stream keys (unless explicitly allowed);
    /// 2. runs logical optimization for streaming;
    /// 3. applies `logical_rewrite_for_stream`, patching the output column
    ///    mapping to be injective by projecting duplicated target columns
    ///    into fresh slots;
    /// 4. rewrites the root requirements through that mapping;
    /// 5. converts to stream with the required distribution and enforces the
    ///    EOWC requirement.
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        backfill_type: BackfillType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        // Several old columns map to the same new column.
                        // Append an extra projected copy for each duplicate
                        // target so every source gets its own slot, restoring
                        // injectivity.
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        // The original target size is discarded; targets are
                        // re-counted from the rewritten plan's schema.
                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                // Target already claimed: project a fresh
                                // copy of column `*i` and retarget here.
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                // Rewrite the root requirements through the (now injective)
                // output column mapping.
                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_backfill_type(
                        emit_on_window_close,
                        backfill_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }
738
    /// Computes the (estimated) cardinality of the current logical plan via
    /// [`CardinalityVisitor`].
    fn compute_cardinality(&self) -> Cardinality {
        CardinalityVisitor.visit(self.plan.clone())
    }
745
    /// Generates the [`StreamMaterialize`] plan for `CREATE TABLE`, stitching
    /// together the DML path, the optional external-source path and the
    /// upstream-sink union, then applying row-id generation, watermark
    /// filtering, NOT NULL filtering and conflict handling.
    pub fn gen_table_plan(
        self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_columns,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        // Tables never use EOWC and never snapshot-backfill.
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        // A table has either a user-defined pk or a generated row id.
        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        // Map pk column ids to their positional indices in `columns`.
        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap()) .collect_vec()
        };

        // Appends a StreamProject evaluating generated-column expressions if
        // the table has any generated columns; otherwise returns the node
        // unchanged.
        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: StreamPlanRef,
        ) -> Result<StreamPlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        // How the table is keyed; decides distribution enforcement below.
        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        // Builds the DML branch: StreamDml -> generated-column project ->
        // exchange enforcing the distribution required by the pk kind.
        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: StreamPlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<StreamPlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    // Rows must land on the shard owning their pk.
                    RequiredDist::hash_shard(pk_column_indices)
                        .streaming_enforce_if_not_satisfies(dml_node)?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            // With a generated row id, it must be the only pk column.
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        // Only DML-able columns are insertable through the DML node.
        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        // Indices (into `column_descs`) of NOT NULL columns; enforced by a
        // filter near the end of the plan.
        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_indices = if !with_version_columns.is_empty() {
            find_version_column_indices(&columns, with_version_columns)?
        } else {
            vec![]
        };

        let with_external_source = source_catalog.is_some();
        let (dml_source_node, external_source_node) = if with_external_source {
            // With a connector, the DML branch reads from a dummy source
            // while the connector data flows through a separate branch.
            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
            let mut external_source_node = stream_plan.plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .streaming_enforce_if_not_satisfies(external_source_node)?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };
            (dummy_source_node, Some(external_source_node))
        } else {
            (stream_plan.plan, None)
        };

        let dml_node = inject_dml_node(
            &columns,
            append_only,
            dml_source_node,
            &pk_column_indices,
            kind,
            column_descs,
        )?;

        // Distinct distributions of all union inputs; they must reconcile to
        // a single distribution below.
        let dists = external_source_node
            .iter()
            .map(|input| input.distribution())
            .chain([dml_node.distribution()])
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                // Only the combinations above can be produced by the
                // enforcement logic earlier in this function.
                unreachable!()
            }
        };

        let generated_column_exprs =
            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
        // Extra union input that receives data from sinks created with
        // `CREATE SINK ... INTO <this table>`.
        let upstream_sink_union = StreamUpstreamSinkUnion::new(
            context.clone(),
            dml_node.schema(),
            dml_node.stream_key(),
            dist.clone(), append_only,
            row_id_index.is_none(),
            generated_column_exprs,
        );

        let union_inputs = external_source_node
            .into_iter()
            .chain([dml_node, upstream_sink_union.into()])
            .collect_vec();

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist,
        )
        .into();

        // Watermark columns with TTL semantics, recorded in the table catalog.
        let ttl_watermark_indices = watermark_descs
            .iter()
            .filter(|d| d.with_ttl)
            .map(|d| d.watermark_idx as usize)
            .collect_vec();

        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    // `kind` was derived from `row_id_index` above.
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    // Generate row ids, hash-distributed by the row-id column.
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        // Version columns are meaningless if conflicts are ignored outright.
        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && !version_column_indices.is_empty()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        // Enforce NOT NULL constraints by filtering violating rows.
        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        // A table is refreshable if backed by a batch connector or explicitly
        // configured with the full-reload refresh mode.
        let refreshable = source_catalog
            .as_ref()
            .map(|catalog| {
                catalog.with_properties.is_batch_connector() || {
                    matches!(
                        catalog
                            .refresh_mode
                            .as_ref()
                            .map(|refresh_mode| refresh_mode.refresh_mode),
                        Some(Some(RefreshMode::FullReload(_)))
                    )
                }
            })
            .unwrap_or(false);

        if refreshable && row_id_index.is_some() {
            return Err(crate::error::ErrorCode::BindError(
                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
                    .to_owned(),
            )
            .into());
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            pk_column_indices,
            ttl_watermark_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
            refreshable,
        )
    }
1049
    /// Optimizes the plan and creates a `StreamMaterialize` node for
    /// `CREATE MATERIALIZED VIEW`.
    ///
    /// Cardinality is computed on the logical plan *before* stream
    /// optimization rewrites it. MVs are allowed to use snapshot backfill.
    pub fn gen_materialize_plan(
        self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }
1072
    /// Optimizes the plan and creates a `StreamMaterialize` node for
    /// `CREATE INDEX`. Indexes use neither EOWC nor snapshot backfill.
    pub fn gen_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }
1096
    /// Optimizes the plan and creates a `StreamVectorIndexWrite` node for
    /// `CREATE INDEX` on a vector column. Like regular indexes, vector
    /// indexes use neither EOWC nor snapshot backfill.
    pub fn gen_vector_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
        vector_index_info: PbVectorIndexInfo,
    ) -> Result<StreamVectorIndexWrite> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamVectorIndexWrite::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            cardinality,
            retention_seconds,
            vector_index_info,
        )
    }
1120
    /// Optimizes the plan and creates a `StreamSink` node for `CREATE SINK`.
    ///
    /// Backfill selection: `UpstreamOnly` when `without_backfill`; otherwise
    /// snapshot backfill when allowed and enabled (downgraded with a user
    /// notice when auto schema change is requested, and asserted incompatible
    /// with sink-into-table); otherwise arrangement backfill or plain
    /// backfill.
    #[expect(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamSink> {
        let backfill_type = if without_backfill {
            BackfillType::UpstreamOnly
        } else if allow_snapshot_backfill
            && self.should_use_snapshot_backfill()
            && {
                // Auto schema change is incompatible with snapshot backfill:
                // notify the user and fall through to arrangement backfill.
                if auto_refresh_schema_from_table.is_some() {
                    self.plan.ctx().session_ctx().notice_to_user("Auto schema change only support for ArrangementBackfill. Switched to use ArrangementBackfill");
                    false
                } else {
                    true
                }
            }
        {
            assert!(
                target_table.is_none(),
                "should not allow snapshot backfill for sink-into-table"
            );
            BackfillType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            BackfillType::ArrangementBackfill
        } else {
            BackfillType::Backfill
        };
        if auto_refresh_schema_from_table.is_some()
            && backfill_type != BackfillType::ArrangementBackfill
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "auto schema change only support for ArrangementBackfill, but got: {:?}",
                backfill_type
            ))
            .into());
        }
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, backfill_type)?;
        // For sink-into-table, map the target table's DML-able columns onto
        // the sink plan's visible output columns (by name or by position).
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });

        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            definition,
            properties,
            format_desc,
            partition_info,
            auto_refresh_schema_from_table,
        )
    }
1193
1194 pub fn should_use_arrangement_backfill(&self) -> bool {
1195 let ctx = self.plan.ctx();
1196 let session_ctx = ctx.session_ctx();
1197 let arrangement_backfill_enabled = session_ctx
1198 .env()
1199 .streaming_config()
1200 .developer
1201 .enable_arrangement_backfill;
1202 arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1203 }
1204
1205 pub fn should_use_snapshot_backfill(&self) -> bool {
1206 let ctx = self.plan.ctx();
1207 let session_ctx = ctx.session_ctx();
1208 let use_snapshot_backfill = session_ctx
1209 .env()
1210 .streaming_config()
1211 .developer
1212 .enable_snapshot_backfill
1213 && session_ctx.config().streaming_use_snapshot_backfill();
1214 if use_snapshot_backfill {
1215 if let Some(warning_msg) = self.plan.forbid_snapshot_backfill() {
1216 self.plan.ctx().session_ctx().notice_to_user(warning_msg);
1217 false
1218 } else {
1219 true
1220 }
1221 } else {
1222 false
1223 }
1224 }
1225}
1226
1227impl<P: PlanPhase> PlanRoot<P> {
1228 pub fn target_columns_to_plan_mapping(
1230 &self,
1231 tar_cols: &[ColumnCatalog],
1232 user_specified_columns: bool,
1233 ) -> Vec<Option<usize>> {
1234 #[allow(clippy::disallowed_methods)]
1235 let visible_cols: Vec<(usize, String)> = self
1236 .out_fields
1237 .ones()
1238 .zip_eq(self.out_names.iter().cloned())
1239 .collect_vec();
1240
1241 let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1242 let visible_col_idxes_by_name = visible_cols
1243 .iter()
1244 .map(|(i, name)| (name.as_ref(), *i))
1245 .collect::<BTreeMap<_, _>>();
1246
1247 tar_cols
1248 .iter()
1249 .enumerate()
1250 .filter(|(_, tar_col)| tar_col.can_dml())
1251 .map(|(tar_i, tar_col)| {
1252 if user_specified_columns {
1253 visible_col_idxes_by_name.get(tar_col.name()).cloned()
1254 } else {
1255 (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1256 }
1257 })
1258 .collect()
1259 }
1260}
1261
1262fn find_version_column_indices(
1263 column_catalog: &Vec<ColumnCatalog>,
1264 version_column_names: Vec<String>,
1265) -> Result<Vec<usize>> {
1266 let mut indices = Vec::new();
1267 for version_column_name in version_column_names {
1268 let mut found = false;
1269 for (index, column) in column_catalog.iter().enumerate() {
1270 if column.column_desc.name == version_column_name {
1271 if let &DataType::Jsonb
1272 | &DataType::List(_)
1273 | &DataType::Struct(_)
1274 | &DataType::Bytea
1275 | &DataType::Boolean = column.data_type()
1276 {
1277 return Err(ErrorCode::InvalidInputSyntax(format!(
1278 "Version column {} must be of a comparable data type",
1279 version_column_name
1280 ))
1281 .into());
1282 }
1283 indices.push(index);
1284 found = true;
1285 break;
1286 }
1287 }
1288 if !found {
1289 return Err(ErrorCode::InvalidInputSyntax(format!(
1290 "Version column {} not found",
1291 version_column_name
1292 ))
1293 .into());
1294 }
1295 }
1296 Ok(indices)
1297}
1298
1299fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1300 let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1301
1302 let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1303 if let Some(error) = const_eval_rewriter.error {
1304 return Err(error);
1305 }
1306 Ok(plan)
1307}
1308
1309fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1310 ctx: OptimizerContextRef,
1311 plan: PlanRef<C>,
1312) -> Result<PlanRef<C>> {
1313 let mut v = TimestamptzExprFinder::default();
1314 plan.visit_exprs_recursive(&mut v);
1315 if v.has() {
1316 Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1317 } else {
1318 Ok(plan)
1319 }
1320}
1321
1322fn exist_and_no_exchange_before(
1323 plan: &BatchPlanRef,
1324 is_candidate: fn(&BatchPlanRef) -> bool,
1325) -> bool {
1326 if plan.node_type() == BatchPlanNodeType::BatchExchange {
1327 return false;
1328 }
1329 is_candidate(plan)
1330 || plan
1331 .inputs()
1332 .iter()
1333 .any(|input| exist_and_no_exchange_before(input, is_candidate))
1334}
1335
1336impl BatchPlanRef {
1337 fn is_user_table_scan(&self) -> bool {
1338 self.node_type() == BatchPlanNodeType::BatchSeqScan
1339 || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
1340 || self.node_type() == BatchPlanNodeType::BatchVectorSearch
1341 }
1342
1343 fn is_source_scan(&self) -> bool {
1344 self.node_type() == BatchPlanNodeType::BatchSource
1345 || self.node_type() == BatchPlanNodeType::BatchKafkaScan
1346 || self.node_type() == BatchPlanNodeType::BatchIcebergScan
1347 }
1348
1349 fn is_insert(&self) -> bool {
1350 self.node_type() == BatchPlanNodeType::BatchInsert
1351 }
1352
1353 fn is_update(&self) -> bool {
1354 self.node_type() == BatchPlanNodeType::BatchUpdate
1355 }
1356
1357 fn is_delete(&self) -> bool {
1358 self.node_type() == BatchPlanNodeType::BatchDelete
1359 }
1360}
1361
1362fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1368 assert_eq!(plan.distribution(), &Distribution::Single);
1369 exist_and_no_exchange_before(&plan, |plan| {
1370 plan.is_user_table_scan()
1371 || plan.is_source_scan()
1372 || plan.is_insert()
1373 || plan.is_update()
1374 || plan.is_delete()
1375 })
1376}
1377
1378fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1381 assert_eq!(plan.distribution(), &Distribution::Single);
1382 exist_and_no_exchange_before(&plan, |plan| {
1383 plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
1384 })
1385}
1386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// Projecting a two-column `Values` down to one visible column must yield
    /// a subplan whose schema contains only that column.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values = LogicalValues::new(vec![], schema, ctx).into();
        // Bitset `[1]` marks only the first output column as visible.
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            vec!["v1".into()],
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}