1use std::num::NonZeroU32;
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use risingwave_pb::catalog::PbVectorIndexInfo;
20
21pub mod plan_node;
22
23use plan_node::StreamFilter;
24pub use plan_node::{Explain, LogicalPlanRef, PlanRef};
25
26pub mod property;
27
28mod delta_join_solver;
29mod heuristic_optimizer;
30mod plan_rewriter;
31
32mod plan_visitor;
33
34#[cfg(feature = "datafusion")]
35pub use plan_visitor::DataFusionExecuteCheckerExt;
36pub use plan_visitor::{
37 ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
38};
39use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;
40
41pub mod backfill_order_strategy;
42mod logical_optimization;
43mod optimizer_context;
44pub mod plan_expr_rewriter;
45mod plan_expr_visitor;
46mod rule;
47
48use std::collections::{BTreeMap, HashMap};
49use std::marker::PhantomData;
50
51use educe::Educe;
52use fixedbitset::FixedBitSet;
53use itertools::Itertools as _;
54pub use logical_optimization::*;
55pub use optimizer_context::*;
56use plan_expr_rewriter::ConstEvalRewriter;
57use property::Order;
58use risingwave_common::bail;
59use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
60use risingwave_common::types::DataType;
61use risingwave_common::util::column_index_mapping::ColIndexMapping;
62use risingwave_common::util::iter_util::ZipEqDebug;
63use risingwave_connector::WithPropertiesExt;
64use risingwave_connector::sink::catalog::SinkFormatDesc;
65use risingwave_pb::stream_plan::StreamScanType;
66
67use self::heuristic_optimizer::ApplyOrder;
68use self::plan_node::generic::{self, PhysicalPlanRef};
69use self::plan_node::{
70 BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
71 StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
72 ToStreamContext, stream_enforce_eowc_requirement,
73};
74#[cfg(debug_assertions)]
75use self::plan_visitor::InputRefValidator;
76use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
77use self::property::{Cardinality, RequiredDist};
78use self::rule::*;
79use crate::TableCatalog;
80use crate::catalog::table_catalog::TableType;
81use crate::catalog::{DatabaseId, SchemaId};
82use crate::error::{ErrorCode, Result};
83use crate::expr::TimestamptzExprFinder;
84use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
85use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
86use crate::optimizer::plan_node::{
87 BackfillType, Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker,
88 PlanTreeNode, RewriteStreamContext, Stream, StreamExchange, StreamPlanRef, StreamUnion,
89 StreamUpstreamSinkUnion, StreamVectorIndexWrite, ToStream, VisitExprsRecursive,
90};
91use crate::optimizer::plan_visitor::{
92 LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
93};
94use crate::optimizer::property::Distribution;
95use crate::utils::{
96 ColIndexMappingRewriteExt, MV_REFRESH_INTERVAL_SEC_KEY, WithOptionsSecResolved,
97};
98
/// Root of a query plan, parameterized by the optimization phase `P`.
///
/// Besides the plan itself, the root records the distribution and order required of the
/// result, which fields of the plan's schema are exposed, and the names they are exposed under.
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    /// The plan tree; its convention is chosen by the phase `P`.
    pub plan: PlanRef<P::Convention>,
    /// Zero-sized marker tying this root to its phase; skipped in the `Debug` output.
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    /// Distribution required of the plan's output.
    required_dist: RequiredDist,
    /// Order required of the plan's output.
    required_order: Order,
    /// One bit per field of `plan`'s schema; a set bit marks the field as part of the output.
    out_fields: FixedBitSet,
    /// Output column names, one per set bit of `out_fields`, in ascending bit order.
    out_names: Vec<String>,
}
122
/// Marker trait for an optimization phase that a [`PlanRoot`] can be in.
pub trait PlanPhase {
    /// Plan-node convention used by the plans of this phase.
    type Convention: ConventionMarker;
}
131
// Declares every optimization phase.
//
// For each `{ Phase, Convention }` pair the macro generates:
// - a marker struct `PlanPhase<Phase>`,
// - its `PlanPhase` impl with `Convention` as the associated type,
// - a type alias `<Phase>PlanRoot = PlanRoot<PlanPhase<Phase>>`.
macro_rules! for_all_phase {
    // Entry point: expands to the full list of phases known to the optimizer.
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    // Worker arm: emits the marker struct, trait impl and alias for each listed phase.
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();
156
impl LogicalPlanRoot {
    /// Creates a plan root in the logical phase.
    ///
    /// Delegates to `new_inner`, which asserts that `out_fields` has one bit per field of
    /// the plan's schema and that `out_names` has one name per set bit.
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}
168
impl BatchPlanRoot {
    /// Creates a plan root in the batch phase.
    ///
    /// Delegates to `new_inner`, which asserts that `out_fields` has one bit per field of
    /// the plan's schema and that `out_names` has one name per set bit.
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}
180
181impl<P: PlanPhase> PlanRoot<P> {
182 fn new_inner(
183 plan: PlanRef<P::Convention>,
184 required_dist: RequiredDist,
185 required_order: Order,
186 out_fields: FixedBitSet,
187 out_names: Vec<String>,
188 ) -> Self {
189 let input_schema = plan.schema();
190 assert_eq!(input_schema.fields().len(), out_fields.len());
191 assert_eq!(out_fields.count_ones(..), out_names.len());
192
193 Self {
194 plan,
195 _phase: PhantomData,
196 required_dist,
197 required_order,
198 out_fields,
199 out_names,
200 }
201 }
202
203 fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
204 PlanRoot {
205 plan,
206 _phase: PhantomData,
207 required_dist: self.required_dist,
208 required_order: self.required_order,
209 out_fields: self.out_fields,
210 out_names: self.out_names,
211 }
212 }
213
214 pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
219 if out_names.len() != self.out_fields.count_ones(..) {
220 Err(ErrorCode::InvalidInputSyntax(
221 "number of column names does not match number of columns".to_owned(),
222 ))?
223 }
224 self.out_names = out_names;
225 Ok(())
226 }
227
228 pub fn schema(&self) -> Schema {
230 Schema {
233 fields: self
234 .out_fields
235 .ones()
236 .map(|i| self.plan.schema().fields()[i].clone())
237 .zip_eq_debug(&self.out_names)
238 .map(|(field, name)| Field {
239 name: name.clone(),
240 ..field
241 })
242 .collect(),
243 }
244 }
245}
246
247impl LogicalPlanRoot {
248 pub fn into_unordered_subplan(self) -> LogicalPlanRef {
252 if self.out_fields.count_ones(..) == self.out_fields.len() {
253 return self.plan;
254 }
255 LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
256 }
257
258 pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
262 use generic::Agg;
263 use plan_node::PlanAggCall;
264 use risingwave_common::types::ListValue;
265 use risingwave_expr::aggregate::PbAggKind;
266
267 use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
268 use crate::utils::{Condition, IndexSet};
269
270 let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
271 bail!("subquery must return only one column");
272 };
273 let input_column_type = self.plan.schema().fields()[select_idx].data_type();
274 let return_type = DataType::list(input_column_type.clone());
275 let agg = Agg::new(
276 vec![PlanAggCall {
277 agg_type: PbAggKind::ArrayAgg.into(),
278 return_type: return_type.clone(),
279 inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
280 distinct: false,
281 order_by: self.required_order.column_orders,
282 filter: Condition::true_cond(),
283 direct_args: vec![],
284 }],
285 IndexSet::empty(),
286 self.plan,
287 );
288 Ok(LogicalProject::create(
289 agg.into(),
290 vec![
291 FunctionCall::new(
292 ExprType::Coalesce,
293 vec![
294 InputRef::new(0, return_type).into(),
295 ExprImpl::literal_list(
296 ListValue::empty(&input_column_type),
297 input_column_type,
298 ),
299 ],
300 )
301 .unwrap()
302 .into(),
303 ],
304 ))
305 }
306
307 pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
309 self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
310 Ok(self)
311 }
312
313 pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
315 let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
316 Ok(self.into_phase(plan))
317 }
318
319 pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
320 self.gen_optimized_logical_plan_for_batch()?
321 .gen_batch_plan()
322 }
323}
324
impl BatchOptimizedLogicalPlanRoot {
    /// Lowers the batch-optimized logical plan to a batch plan.
    ///
    /// # Errors
    /// Returns `NotSupported` when the plan still contains a dangling temporal scan, and
    /// propagates any error from the rewrite steps.
    ///
    /// # Panics
    /// Panics if the resulting plan is not `Distribution::Single` or contains a batch
    /// exchange.
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        // Temporal joins are rejected up front for batch queries.
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        // First pass of session-timezone inlining, on the logical plan.
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        // Constant-evaluate expressions before converting to batch nodes.
        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        // Convert to batch plan nodes, honoring the required order. This shadows the
        // logical `plan` with the batch one.
        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        // Second pass of session-timezone inlining, on the batch plan.
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        // Debug builds only: validate input references in the final plan.
        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        // At this stage the plan must be single-distributed and free of exchanges; the
        // explained plan is used as the panic message otherwise.
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }

    /// Converts the plan to a DataFusion logical plan (only with the `datafusion` feature).
    ///
    /// After converting the plan itself, a `Sort` is added when a specific order is
    /// required, and a `Projection` aliasing the output columns to `out_names` is added
    /// when there are fewer output names than fields in the DataFusion schema.
    #[cfg(feature = "datafusion")]
    pub fn gen_datafusion_logical_plan(
        &self,
    ) -> Result<Arc<datafusion::logical_expr::LogicalPlan>> {
        use datafusion::logical_expr::{Expr as DFExpr, LogicalPlan, Projection, Sort};
        use datafusion_common::Column;
        use plan_visitor::LogicalPlanToDataFusionExt;

        use crate::datafusion::{InputColumns, convert_column_order};

        tracing::debug!(
            "Converting RisingWave logical plan to DataFusion plan:\nRisingWave Plan: {:?}",
            self.plan
        );

        // Same expression preparation as the batch path: inline the session timezone,
        // then constant-evaluate.
        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx, self.plan.clone())?;
        plan = const_eval_exprs(plan)?;

        let mut df_plan = plan.to_datafusion_logical_plan()?;

        // Enforce the required order with an explicit sort on top.
        if !self.required_order.is_any() {
            let input_columns = InputColumns::new(df_plan.schema().as_ref(), plan.schema());
            let expr = self
                .required_order
                .column_orders
                .iter()
                .map(|column_order| convert_column_order(column_order, &input_columns))
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Sort(Sort {
                expr,
                input: df_plan,
                fetch: None,
            }));
        }

        // Project away hidden columns, aliasing each kept column to its output name.
        if self.out_names.len() < df_plan.schema().fields().len() {
            let df_schema = df_plan.schema().as_ref();
            let projection_exprs = self
                .out_fields
                .ones()
                .zip_eq_debug(self.out_names.iter())
                .map(|(i, name)| {
                    DFExpr::Column(Column::from(df_schema.qualified_field(i))).alias(name)
                })
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Projection(Projection::try_new(
                projection_exprs,
                df_plan,
            )?));
        }

        tracing::debug!("Converted DataFusion plan:\nDataFusion Plan: {:?}", df_plan);

        Ok(df_plan)
    }
}
450
451impl BatchPlanRoot {
452 pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
454 self.required_dist = RequiredDist::single();
455 let mut plan = self.plan;
456
457 plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;
459
460 let ctx = plan.ctx();
461 if ctx.is_explain_trace() {
462 ctx.trace("To Batch Distributed Plan:");
463 ctx.trace(plan.explain_to_string());
464 }
465 if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
466 plan =
467 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
468 }
469
470 if self.out_fields.count_ones(..) != self.out_fields.len() {
472 plan =
473 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
474 }
475
476 let plan = plan.optimize_by_rules(&OptimizationStage::new(
478 "Push Limit To Scan",
479 vec![BatchPushLimitToScanRule::create()],
480 ApplyOrder::BottomUp,
481 ))?;
482
483 Ok(plan)
484 }
485
486 pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
488 let mut plan = self.plan;
489
490 plan = plan.to_local_with_order_required(&self.required_order)?;
492
493 let insert_exchange = match plan.distribution() {
496 Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
497 _ => true,
498 };
499 if insert_exchange {
500 plan =
501 BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
502 }
503
504 if self.out_fields.count_ones(..) != self.out_fields.len() {
506 plan =
507 BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
508 }
509
510 let ctx = plan.ctx();
511 if ctx.is_explain_trace() {
512 ctx.trace("To Batch Local Plan:");
513 ctx.trace(plan.explain_to_string());
514 }
515
516 let plan = plan.optimize_by_rules(&OptimizationStage::new(
518 "Push Limit To Scan",
519 vec![BatchPushLimitToScanRule::create()],
520 ApplyOrder::BottomUp,
521 ))?;
522
523 Ok(plan)
524 }
525}
526
527impl LogicalPlanRoot {
528 pub(crate) fn derive_backfill_type(&self, allow_snapshot_backfill: bool) -> BackfillType {
530 if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
531 BackfillType::SnapshotBackfill
532 } else if self.should_use_arrangement_backfill() {
533 BackfillType::ArrangementBackfill
534 } else {
535 BackfillType::Backfill
536 }
537 }
538
/// Generates the stream plan and applies the stream-level rewrites and validations.
///
/// Steps, in order: generate the stream plan, merge stream projects, apply the
/// session-config-gated rules (consecutive join separation, logstore for unaligned joins,
/// index delta join), inline the session timezone, constant-evaluate expressions, then
/// validate the result.
///
/// # Errors
/// Returns `NotSupported` for a dangling temporal scan or a `_rw_timestamp` selection in
/// the stream plan, a license error when the locality-backfill check fails, and any error
/// propagated from the rewrite steps.
fn gen_optimized_stream_plan(
    self,
    emit_on_window_close: bool,
    backfill_type: BackfillType,
) -> Result<StreamOptimizedLogicalPlanRoot> {
    let ctx = self.plan.ctx();
    // Currently unused; the trace checks below query `ctx` directly.
    let _explain_trace = ctx.is_explain_trace();

    let optimized_plan = self.gen_stream_plan(emit_on_window_close, backfill_type)?;

    let mut plan = optimized_plan
        .plan
        .clone()
        .optimize_by_rules(&OptimizationStage::new(
            "Merge StreamProject",
            vec![StreamProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

    if ctx
        .session_ctx()
        .config()
        .streaming_separate_consecutive_join()
    {
        plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
            vec![SeparateConsecutiveJoinRule::create()],
            ApplyOrder::BottomUp,
        ))?;
    }

    // Runs before the delta-join rewrite below.
    if ctx.session_ctx().config().streaming_enable_unaligned_join() {
        plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Add Logstore for Unaligned join",
            vec![AddLogstoreRule::create()],
            ApplyOrder::BottomUp,
        ))?;
    }

    // The delta-join rewrite needs both delta join and index selection to be enabled.
    if ctx.session_ctx().config().streaming_enable_delta_join()
        && ctx.session_ctx().config().enable_index_selection()
    {
        plan = plan.optimize_by_rules(&OptimizationStage::new(
            "To IndexDeltaJoin",
            vec![IndexDeltaJoinRule::create()],
            ApplyOrder::BottomUp,
        ))?;
    }
    plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

    if ctx.is_explain_trace() {
        ctx.trace("Inline session timezone:");
        ctx.trace(plan.explain_to_string());
    }

    plan = const_eval_exprs(plan)?;

    if ctx.is_explain_trace() {
        ctx.trace("Const eval exprs:");
        ctx.trace(plan.explain_to_string());
    }

    // Debug builds only: validate input references in the final plan.
    #[cfg(debug_assertions)]
    InputRefValidator.validate(plan.clone());

    if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
        return Err(ErrorCode::NotSupported(
            "exist dangling temporal scan".to_owned(),
            "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
        ).into());
    }

    if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
        return Err(ErrorCode::NotSupported(
            "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
            "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
        ).into());
    }

    // The license is checked only when the plan has more than 5 locality providers.
    // NOTE(review): the thresholds (`> 5` here, `> 1` below) are not explained by anything
    // in this function — confirm they are intentional.
    if LocalityProviderCounter::count(plan.clone()) > 5 {
        assert!(ctx.session_ctx().config().enable_locality_backfill());
        risingwave_common::license::Feature::LocalityBackfill.check_available()?;
    }

    // Suggest enabling locality backfill when operators could have used it and the
    // feature is available under the current license.
    if ctx.missed_locality_providers() > 1
        && risingwave_common::license::Feature::LocalityBackfill
            .check_available()
            .is_ok()
    {
        assert!(!ctx.session_ctx().config().enable_locality_backfill());
        ctx.warn_to_user(format!(
            "This streaming job has {} operators that could benefit from locality backfill. \
            Consider enabling it with `SET enable_locality_backfill = true` for potentially \
            faster backfill performance, when existing data volume in upstream(s) is large.",
            ctx.missed_locality_providers()
        ));
    }

    Ok(optimized_plan.into_phase(plan))
}
648
649 pub(crate) fn require_snapshot_backfill_for_batch_refresh(&self) -> Result<()> {
650 let ctx = self.plan.ctx();
651 let session_ctx = ctx.session_ctx();
652 let snapshot_backfill_enabled = session_ctx
653 .env()
654 .streaming_config()
655 .developer
656 .enable_snapshot_backfill
657 && session_ctx.config().streaming_use_snapshot_backfill();
658 if !snapshot_backfill_enabled {
659 return Err(ErrorCode::NotSupported(
660 "Batch refresh materialized view requires snapshot backfill".to_owned(),
661 format!(
662 "Please enable snapshot backfill or remove `{}` from the WITH clause.",
663 MV_REFRESH_INTERVAL_SEC_KEY
664 ),
665 )
666 .into());
667 }
668 if let Some(reason) = self.plan.forbid_snapshot_backfill() {
669 return Err(ErrorCode::NotSupported(
670 format!("Batch refresh materialized view requires snapshot backfill, but {reason}"),
671 "Please rewrite the query to avoid operators that forbid snapshot backfill."
672 .to_owned(),
673 )
674 .into());
675 }
676 Ok(())
677 }
678
/// Optimizes the logical plan for streaming and converts it to a stream plan.
///
/// The required distribution, required order and output fields of the root are rewritten
/// through the column mapping produced by the logical rewrite, so they keep referring to
/// the same columns in the new plan.
///
/// # Errors
/// Returns `NotSupported` when `StreamKeyChecker` reports a problem and the session does
/// not allow JSONB in stream keys; propagates errors from the optimizer and conversions.
fn gen_stream_plan(
    self,
    emit_on_window_close: bool,
    backfill_type: BackfillType,
) -> Result<StreamOptimizedLogicalPlanRoot> {
    let ctx = self.plan.ctx();
    let explain_trace = ctx.is_explain_trace();

    let plan = {
        {
            // The stream-key check runs only when JSONB stream keys are not allowed.
            if !ctx
                .session_ctx()
                .config()
                .streaming_allow_jsonb_in_stream_key()
                && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
            {
                return Err(ErrorCode::NotSupported(
                    err,
                    "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                    If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                ).into());
            }
            let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
            let (plan, out_col_change) = {
                let (plan, out_col_change) = optimized_plan.plan.logical_rewrite_for_stream(
                    &mut RewriteStreamContext::new_with_backfill_type(backfill_type),
                )?;
                if out_col_change.is_injective() {
                    (plan, out_col_change)
                } else {
                    // Several source columns map to the same target column. Make the
                    // mapping injective by appending a duplicate output column for every
                    // repeated target and pointing the repeated source at the copy.
                    let mut output_indices = (0..plan.schema().len()).collect_vec();
                    #[expect(unused_assignments)]
                    let (mut map, mut target_size) = out_col_change.into_parts();

                    // The target size from the mapping is discarded in favor of the
                    // plan's actual schema length.
                    target_size = plan.schema().len();
                    let mut tar_exists = vec![false; target_size];
                    for i in map.iter_mut().flatten() {
                        if tar_exists[*i] {
                            // Target already taken: emit the column again at the end
                            // and redirect this source to the new position.
                            output_indices.push(*i);
                            *i = target_size;
                            target_size += 1;
                        } else {
                            tar_exists[*i] = true;
                        }
                    }
                    let plan =
                        LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                    let out_col_change = ColIndexMapping::new(map, target_size);
                    (plan.into(), out_col_change)
                }
            };
            if explain_trace {
                ctx.trace("Logical Rewrite For Stream:");
                ctx.trace(plan.explain_to_string());
            }

            // Re-express the root's requirements in terms of the rewritten columns.
            optimized_plan.required_dist =
                out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
            optimized_plan.required_order = out_col_change
                .rewrite_required_order(&optimized_plan.required_order)
                .unwrap();
            optimized_plan.out_fields =
                out_col_change.rewrite_bitset(&optimized_plan.out_fields);
            let mut plan = plan.to_stream_with_dist_required(
                &optimized_plan.required_dist,
                &mut ToStreamContext::new_with_backfill_type(
                    emit_on_window_close,
                    backfill_type,
                ),
            )?;
            plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
            optimized_plan.into_phase(plan)
        }
    };

    if explain_trace {
        ctx.trace("To Stream Plan:");
        ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
    }
    Ok(plan)
}
764
/// Computes the cardinality of the plan by running `CardinalityVisitor` over it.
fn compute_cardinality(&self) -> Cardinality {
    CardinalityVisitor.visit(self.plan.clone())
}
771
772 pub fn gen_table_plan(
774 self,
775 context: OptimizerContextRef,
776 table_name: String,
777 database_id: DatabaseId,
778 schema_id: SchemaId,
779 CreateTableInfo {
780 columns,
781 pk_column_ids,
782 row_id_index,
783 watermark_descs,
784 source_catalog,
785 version,
786 }: CreateTableInfo,
787 CreateTableProps {
788 definition,
789 append_only,
790 on_conflict,
791 with_version_columns,
792 webhook_info,
793 engine,
794 }: CreateTableProps,
795 ) -> Result<StreamMaterialize> {
796 let backfill_type = self.derive_backfill_type(false);
797 let stream_plan = self.gen_optimized_stream_plan(false, backfill_type)?;
799
800 assert!(!pk_column_ids.is_empty() || row_id_index.is_some());
801
802 let pk_column_indices = {
803 let mut id_to_idx = HashMap::new();
804
805 columns.iter().enumerate().for_each(|(idx, c)| {
806 id_to_idx.insert(c.column_id(), idx);
807 });
808 pk_column_ids
809 .iter()
810 .map(|c| id_to_idx.get(c).copied().unwrap()) .collect_vec()
812 };
813
814 fn inject_project_for_generated_column_if_needed(
815 columns: &[ColumnCatalog],
816 node: StreamPlanRef,
817 ) -> Result<StreamPlanRef> {
818 let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
819 if let Some(exprs) = exprs {
820 let logical_project = generic::Project::new(exprs, node);
821 return Ok(StreamProject::new(logical_project).into());
822 }
823 Ok(node)
824 }
825
826 #[derive(PartialEq, Debug, Copy, Clone)]
827 enum PrimaryKeyKind {
828 UserDefinedPrimaryKey,
829 NonAppendOnlyRowIdPk,
830 AppendOnlyRowIdPk,
831 }
832
833 fn inject_dml_node(
834 columns: &[ColumnCatalog],
835 append_only: bool,
836 stream_plan: StreamPlanRef,
837 pk_column_indices: &[usize],
838 kind: PrimaryKeyKind,
839 column_descs: Vec<ColumnDesc>,
840 ) -> Result<StreamPlanRef> {
841 let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();
842
843 dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;
845
846 dml_node = match kind {
847 PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
848 RequiredDist::hash_shard(pk_column_indices)
849 .streaming_enforce_if_not_satisfies(dml_node)?
850 }
851 PrimaryKeyKind::AppendOnlyRowIdPk => {
852 StreamExchange::new_no_shuffle(dml_node).into()
853 }
854 };
855
856 Ok(dml_node)
857 }
858
859 let kind = if let Some(row_id_index) = row_id_index {
860 assert_eq!(
861 pk_column_indices.iter().exactly_one().copied().unwrap(),
862 row_id_index
863 );
864 if append_only {
865 PrimaryKeyKind::AppendOnlyRowIdPk
866 } else {
867 PrimaryKeyKind::NonAppendOnlyRowIdPk
868 }
869 } else {
870 PrimaryKeyKind::UserDefinedPrimaryKey
871 };
872
873 let column_descs: Vec<ColumnDesc> = columns
874 .iter()
875 .filter(|&c| c.can_dml())
876 .map(|c| c.column_desc.clone())
877 .collect();
878
879 let mut not_null_idxs = vec![];
880 for (idx, column) in column_descs.iter().enumerate() {
881 if !column.nullable {
882 not_null_idxs.push(idx);
883 }
884 }
885
886 let version_column_indices = if !with_version_columns.is_empty() {
887 find_version_column_indices(&columns, with_version_columns)?
888 } else {
889 vec![]
890 };
891
892 let with_external_source = source_catalog.is_some();
893 let (dml_source_node, external_source_node) = if with_external_source {
894 let dummy_source_node = LogicalSource::new(
895 None,
896 columns.clone(),
897 row_id_index,
898 SourceNodeKind::CreateTable,
899 context.clone(),
900 None,
901 )
902 .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
903 let mut external_source_node = stream_plan.plan;
904 external_source_node =
905 inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
906 external_source_node = match kind {
907 PrimaryKeyKind::UserDefinedPrimaryKey => {
908 RequiredDist::hash_shard(&pk_column_indices)
909 .streaming_enforce_if_not_satisfies(external_source_node)?
910 }
911
912 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
913 StreamExchange::new_no_shuffle(external_source_node).into()
914 }
915 };
916 (dummy_source_node, Some(external_source_node))
917 } else {
918 (stream_plan.plan, None)
919 };
920
921 let dml_node = inject_dml_node(
922 &columns,
923 append_only,
924 dml_source_node,
925 &pk_column_indices,
926 kind,
927 column_descs,
928 )?;
929
930 let dists = external_source_node
931 .iter()
932 .map(|input| input.distribution())
933 .chain([dml_node.distribution()])
934 .unique()
935 .collect_vec();
936
937 let dist = match &dists[..] {
938 &[Distribution::SomeShard, Distribution::HashShard(_)]
939 | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
940 &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
941 dist.clone()
942 }
943 _ => {
944 unreachable!()
945 }
946 };
947
948 let generated_column_exprs =
949 LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
950 let upstream_sink_union = StreamUpstreamSinkUnion::new(
951 context.clone(),
952 dml_node.schema(),
953 dml_node.stream_key(),
954 dist.clone(), append_only,
956 row_id_index.is_none(),
957 generated_column_exprs,
958 );
959
960 let union_inputs = external_source_node
961 .into_iter()
962 .chain([dml_node, upstream_sink_union.into()])
963 .collect_vec();
964
965 let mut stream_plan = StreamUnion::new_with_dist(
966 Union {
967 all: true,
968 inputs: union_inputs,
969 source_col: None,
970 },
971 dist,
972 )
973 .into();
974
975 let ttl_watermark_indices = watermark_descs
976 .iter()
977 .filter(|d| d.with_ttl)
978 .map(|d| d.watermark_idx as usize)
979 .collect_vec();
980
981 if !watermark_descs.is_empty() {
983 stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
984 }
985
986 if let Some(row_id_index) = row_id_index {
988 match kind {
989 PrimaryKeyKind::UserDefinedPrimaryKey => {
990 unreachable!()
991 }
992 PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
993 stream_plan = StreamRowIdGen::new_with_dist(
994 stream_plan,
995 row_id_index,
996 Distribution::HashShard(vec![row_id_index]),
997 )
998 .into();
999 }
1000 }
1001 }
1002
1003 let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;
1004
1005 if let ConflictBehavior::IgnoreConflict = conflict_behavior
1006 && !version_column_indices.is_empty()
1007 {
1008 Err(ErrorCode::InvalidParameterValue(
1009 "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
1010 ))?
1011 }
1012
1013 let retention_seconds = context.with_options().retention_seconds();
1014
1015 let table_required_dist = {
1016 let mut bitset = FixedBitSet::with_capacity(columns.len());
1017 for idx in &pk_column_indices {
1018 bitset.insert(*idx);
1019 }
1020 RequiredDist::ShardByKey(bitset)
1021 };
1022
1023 let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
1024
1025 if !not_null_idxs.is_empty() {
1026 stream_plan =
1027 StreamFilter::filter_out_any_null_rows(stream_plan.clone(), ¬_null_idxs);
1028 }
1029
1030 let refreshable = source_catalog
1032 .as_ref()
1033 .map(|catalog| {
1034 catalog.with_properties.is_batch_connector() || {
1035 matches!(
1036 catalog
1037 .refresh_mode
1038 .as_ref()
1039 .map(|refresh_mode| refresh_mode.refresh_mode),
1040 Some(Some(RefreshMode::FullReload(_)))
1041 )
1042 }
1043 })
1044 .unwrap_or(false);
1045
1046 if refreshable && row_id_index.is_some() {
1048 return Err(crate::error::ErrorCode::BindError(
1049 "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
1050 .to_owned(),
1051 )
1052 .into());
1053 }
1054
1055 StreamMaterialize::create_for_table(
1056 stream_plan,
1057 table_name,
1058 database_id,
1059 schema_id,
1060 table_required_dist,
1061 Order::any(),
1062 columns,
1063 definition,
1064 conflict_behavior,
1065 version_column_indices,
1066 pk_column_indices,
1067 ttl_watermark_indices,
1068 row_id_index,
1069 version,
1070 retention_seconds,
1071 webhook_info,
1072 engine,
1073 refreshable,
1074 )
1075 }
1076
/// Generates the plan of a materialized view: the optimized stream plan wrapped in a
/// [`StreamMaterialize`] of table type `MaterializedView`, with no retention seconds.
///
/// The cardinality is computed from the logical plan before it is consumed by the
/// stream-plan generation.
pub fn gen_materialize_plan(
    self,
    database_id: DatabaseId,
    schema_id: SchemaId,
    mv_name: String,
    definition: String,
    emit_on_window_close: bool,
    backfill_type: BackfillType,
) -> Result<StreamMaterialize> {
    let cardinality = self.compute_cardinality();
    let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, backfill_type)?;
    StreamMaterialize::create(
        stream_plan,
        mv_name,
        database_id,
        schema_id,
        definition,
        TableType::MaterializedView,
        cardinality,
        None,
    )
}
1100
/// Generates the plan of an index: the optimized stream plan wrapped in a
/// [`StreamMaterialize`] of table type `Index`.
///
/// Emit-on-window-close is off, and the backfill type is derived with snapshot backfill
/// disallowed.
pub fn gen_index_plan(
    self,
    index_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    definition: String,
    retention_seconds: Option<NonZeroU32>,
) -> Result<StreamMaterialize> {
    let cardinality = self.compute_cardinality();
    let backfill_type = self.derive_backfill_type(false);
    let stream_plan = self.gen_optimized_stream_plan(false, backfill_type)?;

    StreamMaterialize::create(
        stream_plan,
        index_name,
        database_id,
        schema_id,
        definition,
        TableType::Index,
        cardinality,
        retention_seconds,
    )
}
1125
/// Generates the plan of a vector index: the optimized stream plan wrapped in a
/// [`StreamVectorIndexWrite`] configured by `vector_index_info`.
///
/// Emit-on-window-close is off, and the backfill type is derived with snapshot backfill
/// disallowed.
pub fn gen_vector_index_plan(
    self,
    index_name: String,
    database_id: DatabaseId,
    schema_id: SchemaId,
    definition: String,
    retention_seconds: Option<NonZeroU32>,
    vector_index_info: PbVectorIndexInfo,
) -> Result<StreamVectorIndexWrite> {
    let cardinality = self.compute_cardinality();
    let backfill_type = self.derive_backfill_type(false);
    let stream_plan = self.gen_optimized_stream_plan(false, backfill_type)?;

    StreamVectorIndexWrite::create(
        stream_plan,
        index_name,
        database_id,
        schema_id,
        definition,
        cardinality,
        retention_seconds,
        vector_index_info,
    )
}
1150
/// Generates the plan of a sink: the optimized stream plan wrapped in a [`StreamSink`].
///
/// Backfill type selection, in priority order:
/// 1. `UpstreamOnly` when `without_backfill` is set;
/// 2. `SnapshotBackfill` when allowed by the caller, supported by the plan/session, and
///    no auto schema refresh is requested;
/// 3. `ArrangementBackfill` when `should_use_arrangement_backfill` agrees;
/// 4. `Backfill` otherwise.
///
/// # Errors
/// Returns `InvalidInputSyntax` when auto schema refresh is requested but the selected
/// backfill type is not `ArrangementBackfill`; propagates planning errors.
///
/// # Panics
/// Panics if snapshot backfill is selected while `target_table` is set.
#[expect(clippy::too_many_arguments)]
pub fn gen_sink_plan(
    self,
    sink_name: String,
    definition: String,
    properties: WithOptionsSecResolved,
    emit_on_window_close: bool,
    db_name: String,
    sink_from_table_name: String,
    format_desc: Option<SinkFormatDesc>,
    without_backfill: bool,
    target_table: Option<Arc<TableCatalog>>,
    partition_info: Option<PartitionComputeInfo>,
    user_specified_columns: bool,
    auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    allow_snapshot_backfill: bool,
) -> Result<StreamSink> {
    let backfill_type = if without_backfill {
        BackfillType::UpstreamOnly
    } else if allow_snapshot_backfill
        && self.should_use_snapshot_backfill()
        && {
            // Reached only when snapshot backfill would otherwise be chosen: auto schema
            // refresh vetoes it and the user is told about the switch.
            if auto_refresh_schema_from_table.is_some() {
                self.plan.ctx().session_ctx().notice_to_user("Auto schema change only support for ArrangementBackfill. Switched to use ArrangementBackfill");
                false
            } else {
                true
            }
        }
    {
        assert!(
            target_table.is_none(),
            "should not allow snapshot backfill for sink-into-table"
        );
        BackfillType::SnapshotBackfill
    } else if self.should_use_arrangement_backfill() {
        BackfillType::ArrangementBackfill
    } else {
        BackfillType::Backfill
    };
    // Auto schema refresh works with arrangement backfill only; reject anything else.
    if auto_refresh_schema_from_table.is_some()
        && backfill_type != BackfillType::ArrangementBackfill
    {
        return Err(ErrorCode::InvalidInputSyntax(format!(
            "auto schema change only support for ArrangementBackfill, but got: {:?}",
            backfill_type
        ))
        .into());
    }
    let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, backfill_type)?;
    // For sink-into-table, map the target table's columns (without `_rw_timestamp`) onto
    // the plan's columns.
    let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
        let columns = t.columns_without_rw_timestamp();
        stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
    });

    StreamSink::create(
        stream_plan,
        sink_name,
        db_name,
        sink_from_table_name,
        target_table,
        target_columns_to_plan_mapping,
        definition,
        properties,
        format_desc,
        partition_info,
        auto_refresh_schema_from_table,
    )
}
1222
1223 pub fn should_use_arrangement_backfill(&self) -> bool {
1224 let ctx = self.plan.ctx();
1225 let session_ctx = ctx.session_ctx();
1226 let arrangement_backfill_enabled = session_ctx
1227 .env()
1228 .streaming_config()
1229 .developer
1230 .enable_arrangement_backfill;
1231 arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
1232 }
1233
1234 pub fn should_use_snapshot_backfill(&self) -> bool {
1235 let ctx = self.plan.ctx();
1236 let session_ctx = ctx.session_ctx();
1237 let use_snapshot_backfill = session_ctx
1238 .env()
1239 .streaming_config()
1240 .developer
1241 .enable_snapshot_backfill
1242 && session_ctx.config().streaming_use_snapshot_backfill();
1243 if use_snapshot_backfill {
1244 if let Some(warning_msg) = self.plan.forbid_snapshot_backfill() {
1245 self.plan.ctx().session_ctx().notice_to_user(warning_msg);
1246 false
1247 } else {
1248 true
1249 }
1250 } else {
1251 false
1252 }
1253 }
1254}
1255
1256impl<P: PlanPhase> PlanRoot<P> {
1257 pub fn target_columns_to_plan_mapping(
1259 &self,
1260 tar_cols: &[ColumnCatalog],
1261 user_specified_columns: bool,
1262 ) -> Vec<Option<usize>> {
1263 #[expect(clippy::disallowed_methods)]
1264 let visible_cols: Vec<(usize, String)> = self
1265 .out_fields
1266 .ones()
1267 .zip_eq(self.out_names.iter().cloned())
1268 .collect_vec();
1269
1270 let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
1271 let visible_col_idxes_by_name = visible_cols
1272 .iter()
1273 .map(|(i, name)| (name.as_ref(), *i))
1274 .collect::<BTreeMap<_, _>>();
1275
1276 tar_cols
1277 .iter()
1278 .enumerate()
1279 .filter(|(_, tar_col)| tar_col.can_dml())
1280 .map(|(tar_i, tar_col)| {
1281 if user_specified_columns {
1282 visible_col_idxes_by_name.get(tar_col.name()).cloned()
1283 } else {
1284 (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
1285 }
1286 })
1287 .collect()
1288 }
1289}
1290
1291fn find_version_column_indices(
1292 column_catalog: &Vec<ColumnCatalog>,
1293 version_column_names: Vec<String>,
1294) -> Result<Vec<usize>> {
1295 let mut indices = Vec::new();
1296 for version_column_name in version_column_names {
1297 let mut found = false;
1298 for (index, column) in column_catalog.iter().enumerate() {
1299 if column.column_desc.name == version_column_name {
1300 if let &DataType::Jsonb
1301 | &DataType::List(_)
1302 | &DataType::Struct(_)
1303 | &DataType::Bytea
1304 | &DataType::Boolean = column.data_type()
1305 {
1306 return Err(ErrorCode::InvalidInputSyntax(format!(
1307 "Version column {} must be of a comparable data type",
1308 version_column_name
1309 ))
1310 .into());
1311 }
1312 indices.push(index);
1313 found = true;
1314 break;
1315 }
1316 }
1317 if !found {
1318 return Err(ErrorCode::InvalidInputSyntax(format!(
1319 "Version column {} not found",
1320 version_column_name
1321 ))
1322 .into());
1323 }
1324 }
1325 Ok(indices)
1326}
1327
1328fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
1329 let mut const_eval_rewriter = ConstEvalRewriter { error: None };
1330
1331 let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
1332 if let Some(error) = const_eval_rewriter.error {
1333 return Err(error);
1334 }
1335 Ok(plan)
1336}
1337
1338fn inline_session_timezone_in_exprs<C: ConventionMarker>(
1339 ctx: OptimizerContextRef,
1340 plan: PlanRef<C>,
1341) -> Result<PlanRef<C>> {
1342 let mut v = TimestamptzExprFinder::default();
1343 plan.visit_exprs_recursive(&mut v);
1344 if v.has() {
1345 Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
1346 } else {
1347 Ok(plan)
1348 }
1349}
1350
1351fn exist_and_no_exchange_before(
1352 plan: &BatchPlanRef,
1353 is_candidate: fn(&BatchPlanRef) -> bool,
1354) -> bool {
1355 if plan.node_type() == BatchPlanNodeType::BatchExchange {
1356 return false;
1357 }
1358 is_candidate(plan)
1359 || plan
1360 .inputs()
1361 .iter()
1362 .any(|input| exist_and_no_exchange_before(input, is_candidate))
1363}
1364
1365impl BatchPlanRef {
1366 fn is_user_table_scan(&self) -> bool {
1367 self.node_type() == BatchPlanNodeType::BatchSeqScan
1368 || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
1369 || self.node_type() == BatchPlanNodeType::BatchVectorSearch
1370 }
1371
1372 fn is_source_scan(&self) -> bool {
1373 self.node_type() == BatchPlanNodeType::BatchSource
1374 || self.node_type() == BatchPlanNodeType::BatchKafkaScan
1375 || self.node_type() == BatchPlanNodeType::BatchIcebergScan
1376 }
1377
1378 fn is_insert(&self) -> bool {
1379 self.node_type() == BatchPlanNodeType::BatchInsert
1380 }
1381
1382 fn is_update(&self) -> bool {
1383 self.node_type() == BatchPlanNodeType::BatchUpdate
1384 }
1385
1386 fn is_delete(&self) -> bool {
1387 self.node_type() == BatchPlanNodeType::BatchDelete
1388 }
1389}
1390
1391fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
1397 assert_eq!(plan.distribution(), &Distribution::Single);
1398 exist_and_no_exchange_before(&plan, |plan| {
1399 plan.is_user_table_scan()
1400 || plan.is_source_scan()
1401 || plan.is_insert()
1402 || plan.is_update()
1403 || plan.is_delete()
1404 })
1405}
1406
1407fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
1410 assert_eq!(plan.distribution(), &Distribution::Single);
1411 exist_and_no_exchange_before(&plan, |plan| {
1412 plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
1413 })
1414}
1415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    /// Turning a plan root into an unordered subplan keeps only the output
    /// columns selected by `out_fields`.
    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock();

        // An empty two-column VALUES node: (v1 INT, v2 VARCHAR).
        let input_schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Varchar, "v2"),
        ]);
        let values_plan = LogicalValues::new(vec![], input_schema, ctx).into();

        // Two bits with only bit 0 set: expose `v1` and hide `v2`.
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let root = PlanRoot::new_with_logical_plan(
            values_plan,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            vec!["v1".into()],
        );

        let subplan = root.into_unordered_subplan();
        let expected_schema = Schema::new(vec![Field::with_name(DataType::Int32, "v1")]);
        assert_eq!(subplan.schema(), &expected_schema);
    }
}