use std::num::NonZeroU32;
use std::ops::DerefMut;
use std::sync::Arc;

use risingwave_pb::catalog::PbVectorIndexInfo;

pub mod plan_node;

use plan_node::StreamFilter;
pub use plan_node::{Explain, LogicalPlanRef, PlanRef};

pub mod property;

mod delta_join_solver;
mod heuristic_optimizer;
mod plan_rewriter;

mod plan_visitor;

#[cfg(feature = "datafusion")]
pub use plan_visitor::DatafusionExecuteCheckerExt;
pub use plan_visitor::{
    ExecutionModeDecider, PlanVisitor, RelationCollectorVisitor, SysTableVisitor,
};
use risingwave_pb::plan_common::source_refresh_mode::RefreshMode;

pub mod backfill_order_strategy;
mod logical_optimization;
mod optimizer_context;
pub mod plan_expr_rewriter;
mod plan_expr_visitor;
mod rule;

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;

use educe::Educe;
use fixedbitset::FixedBitSet;
use itertools::Itertools as _;
pub use logical_optimization::*;
pub use optimizer_context::*;
use plan_expr_rewriter::ConstEvalRewriter;
use property::Order;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ConflictBehavior, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_connector::WithPropertiesExt;
use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::stream_plan::StreamScanType;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
    BatchProject, LogicalProject, LogicalSource, PartitionComputeInfo, StreamDml,
    StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
    ToStreamContext, stream_enforce_eowc_requirement,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
use self::plan_visitor::{CardinalityVisitor, StreamKeyChecker, has_batch_exchange};
use self::property::{Cardinality, RequiredDist};
use self::rule::*;
use crate::TableCatalog;
use crate::catalog::table_catalog::TableType;
use crate::catalog::{DatabaseId, SchemaId};
use crate::error::{ErrorCode, Result};
use crate::expr::TimestamptzExprFinder;
use crate::handler::create_table::{CreateTableInfo, CreateTableProps};
use crate::optimizer::plan_node::generic::{GenericPlanRef, SourceNodeKind, Union};
use crate::optimizer::plan_node::{
    BackfillType, Batch, BatchExchange, BatchPlanNodeType, BatchPlanRef, ConventionMarker,
    PlanTreeNode, Stream, StreamExchange, StreamPlanRef, StreamUnion, StreamUpstreamSinkUnion,
    StreamVectorIndexWrite, ToStream, VisitExprsRecursive,
};
use crate::optimizer::plan_visitor::{
    LocalityProviderCounter, RwTimestampValidator, TemporalJoinValidator,
};
use crate::optimizer::property::Distribution;
use crate::utils::{ColIndexMappingRewriteExt, WithOptionsSecResolved};

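/// The root of an optimizer plan tree: the plan itself, plus the distribution and order
/// required at the root, and the output column selection (`out_fields`) together with the
/// user-facing output names (`out_names`).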
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct PlanRoot<P: PlanPhase> {
    pub plan: PlanRef<P::Convention>,
    #[educe(Debug(ignore), Clone(method(PhantomData::clone)))]
    _phase: PhantomData<P>,
    required_dist: RequiredDist,
    required_order: Order,
    out_fields: FixedBitSet,
    out_names: Vec<String>,
}

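/// Marker trait for the phase a [`PlanRoot`] is in. Each phase pins the plan convention
/// (logical, batch or stream) of the root via [`PlanPhase::Convention`].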
pub trait PlanPhase {
    type Convention: ConventionMarker;
}

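/// For each `{ Phase, Convention }` pair listed in the zero-argument arm, generates a
/// `PlanPhase<Phase>` marker struct implementing [`PlanPhase`] and a `<Phase>PlanRoot` type
/// alias, e.g. `PlanPhaseLogical` and `LogicalPlanRoot`.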
macro_rules! for_all_phase {
    () => {
        for_all_phase! {
            { Logical, $crate::optimizer::plan_node::Logical },
            { BatchOptimizedLogical, $crate::optimizer::plan_node::Logical },
            { StreamOptimizedLogical, $crate::optimizer::plan_node::Stream },
            { Batch, $crate::optimizer::plan_node::Batch },
            { Stream, $crate::optimizer::plan_node::Stream }
        }
    };
    ($({$phase:ident, $convention:ty}),+ $(,)?) => {
        $(
            paste::paste! {
                pub struct [< PlanPhase$phase >];
                impl PlanPhase for [< PlanPhase$phase >] {
                    type Convention = $convention;
                }
                pub type [< $phase PlanRoot >] = PlanRoot<[< PlanPhase$phase >]>;
            }
        )+
    }
}

for_all_phase!();

impl LogicalPlanRoot {
    pub fn new_with_logical_plan(
        plan: LogicalPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl BatchPlanRoot {
    pub fn new_with_batch_plan(
        plan: BatchPlanRef,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        Self::new_inner(plan, required_dist, required_order, out_fields, out_names)
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    fn new_inner(
        plan: PlanRef<P::Convention>,
        required_dist: RequiredDist,
        required_order: Order,
        out_fields: FixedBitSet,
        out_names: Vec<String>,
    ) -> Self {
        let input_schema = plan.schema();
        assert_eq!(input_schema.fields().len(), out_fields.len());
        assert_eq!(out_fields.count_ones(..), out_names.len());

        Self {
            plan,
            _phase: PhantomData,
            required_dist,
            required_order,
            out_fields,
            out_names,
        }
    }

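    /// Move this root into another phase, swapping in `plan` while keeping the required
    /// distribution/order and the output column selection and names.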
    fn into_phase<P2: PlanPhase>(self, plan: PlanRef<P2::Convention>) -> PlanRoot<P2> {
        PlanRoot {
            plan,
            _phase: PhantomData,
            required_dist: self.required_dist,
            required_order: self.required_order,
            out_fields: self.out_fields,
            out_names: self.out_names,
        }
    }

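    /// Set customized output field names. Returns an error if the number of names does not
    /// match the number of output columns.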
    pub fn set_out_names(&mut self, out_names: Vec<String>) -> Result<()> {
        if out_names.len() != self.out_fields.count_ones(..) {
            Err(ErrorCode::InvalidInputSyntax(
                "number of column names does not match number of columns".to_owned(),
            ))?
        }
        self.out_names = out_names;
        Ok(())
    }

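    /// The output schema of the plan root: the fields selected by `out_fields`, renamed with
    /// `out_names`.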
    pub fn schema(&self) -> Schema {
        Schema {
            fields: self
                .out_fields
                .ones()
                .map(|i| self.plan.schema().fields()[i].clone())
                .zip_eq_debug(&self.out_names)
                .map(|(field, name)| Field {
                    name: name.clone(),
                    ..field
                })
                .collect(),
        }
    }
}

impl LogicalPlanRoot {
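    /// Turn the plan root back into a plan node, applying the output column pruning with a
    /// [`LogicalProject`] when needed. The required order is not enforced, hence "unordered".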
    pub fn into_unordered_subplan(self) -> LogicalPlanRef {
        if self.out_fields.count_ones(..) == self.out_fields.len() {
            return self.plan;
        }
        LogicalProject::with_out_fields(self.plan, &self.out_fields).into()
    }

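    /// Turn the plan root into a plan that `array_agg`s its single output column (ordered by
    /// the required order) and coalesces the `NULL` result of an empty input into an empty
    /// list. Used for subqueries that must be collapsed into a single array value.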
    pub fn into_array_agg(self) -> Result<LogicalPlanRef> {
        use generic::Agg;
        use plan_node::PlanAggCall;
        use risingwave_common::types::ListValue;
        use risingwave_expr::aggregate::PbAggKind;

        use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
        use crate::utils::{Condition, IndexSet};

        let Ok(select_idx) = self.out_fields.ones().exactly_one() else {
            bail!("subquery must return only one column");
        };
        let input_column_type = self.plan.schema().fields()[select_idx].data_type();
        let return_type = DataType::list(input_column_type.clone());
        let agg = Agg::new(
            vec![PlanAggCall {
                agg_type: PbAggKind::ArrayAgg.into(),
                return_type: return_type.clone(),
                inputs: vec![InputRef::new(select_idx, input_column_type.clone())],
                distinct: false,
                order_by: self.required_order.column_orders,
                filter: Condition::true_cond(),
                direct_args: vec![],
            }],
            IndexSet::empty(),
            self.plan,
        );
        Ok(LogicalProject::create(
            agg.into(),
            vec![
                FunctionCall::new(
                    ExprType::Coalesce,
                    vec![
                        InputRef::new(0, return_type).into(),
                        ExprImpl::literal_list(
                            ListValue::empty(&input_column_type),
                            input_column_type,
                        ),
                    ],
                )
                .unwrap()
                .into(),
            ],
        ))
    }

    pub fn gen_optimized_logical_plan_for_stream(mut self) -> Result<LogicalPlanRoot> {
        self.plan = LogicalOptimizer::gen_optimized_logical_plan_for_stream(self.plan.clone())?;
        Ok(self)
    }

    pub fn gen_optimized_logical_plan_for_batch(self) -> Result<BatchOptimizedLogicalPlanRoot> {
        let plan = LogicalOptimizer::gen_optimized_logical_plan_for_batch(self.plan.clone())?;
        Ok(self.into_phase(plan))
    }

    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        self.gen_optimized_logical_plan_for_batch()?
            .gen_batch_plan()
    }
}

impl BatchOptimizedLogicalPlanRoot {
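    /// Lower the batch-optimized logical plan to a single-node batch physical plan (no
    /// exchanges inserted yet): reject dangling temporal scans, inline the session timezone,
    /// const-eval expressions, convert to batch with the required order, and merge adjacent
    /// `BatchProject`s.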
    pub fn gen_batch_plan(self) -> Result<BatchPlanRoot> {
        if TemporalJoinValidator::exist_dangling_temporal_scan(self.plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "do not support temporal join for batch queries".to_owned(),
                "please use temporal join in streaming queries".to_owned(),
            )
            .into());
        }

        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx.clone(), self.plan.clone())?;

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        let mut plan = plan.to_batch_with_order_required(&self.required_order)?;
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Plan:");
            ctx.trace(plan.explain_to_string());
        }

        plan = plan.optimize_by_rules(&OptimizationStage::<Batch>::new(
            "Merge BatchProject",
            vec![BatchProjectMergeRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline Session Timezone:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());
        assert_eq!(
            *plan.distribution(),
            Distribution::Single,
            "{}",
            plan.explain_to_string()
        );
        assert!(
            !has_batch_exchange(plan.clone()),
            "{}",
            plan.explain_to_string()
        );

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Physical Plan:");
            ctx.trace(plan.explain_to_string());
        }

        Ok(self.into_phase(plan))
    }

    #[cfg(feature = "datafusion")]
    pub fn gen_datafusion_logical_plan(self) -> Result<Arc<datafusion::logical_expr::LogicalPlan>> {
        use datafusion::logical_expr::{Expr as DFExpr, LogicalPlan, Projection, Sort};
        use datafusion_common::Column;
        use plan_visitor::LogicalPlanToDataFusionExt;

        use crate::datafusion::{InputColumns, convert_column_order};

        tracing::debug!(
            "Converting RisingWave logical plan to DataFusion plan:\nRisingWave Plan: {:?}",
            self.plan
        );

        let ctx = self.plan.ctx();
        let mut plan = inline_session_timezone_in_exprs(ctx, self.plan.clone())?;
        plan = const_eval_exprs(plan)?;

        let mut df_plan = plan.to_datafusion_logical_plan()?;

        if !self.required_order.is_any() {
            let input_columns = InputColumns::new(df_plan.schema().as_ref(), plan.schema());
            let expr = self
                .required_order
                .column_orders
                .iter()
                .map(|column_order| convert_column_order(column_order, &input_columns))
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Sort(Sort {
                expr,
                input: df_plan,
                fetch: None,
            }));
        }

        if self.out_names.len() < df_plan.schema().fields().len() {
            let df_schema = df_plan.schema().as_ref();
            let projection_exprs = self
                .out_fields
                .ones()
                .zip_eq_debug(self.out_names.iter())
                .map(|(i, name)| {
                    DFExpr::Column(Column::from(df_schema.qualified_field(i))).alias(name)
                })
                .collect_vec();
            df_plan = Arc::new(LogicalPlan::Projection(Projection::try_new(
                projection_exprs,
                df_plan,
            )?));
        }

        tracing::debug!("Converted DataFusion plan:\nDataFusion Plan: {:?}", df_plan);

        Ok(df_plan)
    }
}

impl BatchPlanRoot {
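    /// Generate the distributed batch execution plan: require a `Single` distribution at the
    /// root, add an extra root exchange when scans or DML would otherwise run in the root
    /// fragment, prune output columns, and apply batch-only rewrite rules (limit pushdown,
    /// Iceberg count-star and predicate pushdown).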
    pub fn gen_batch_distributed_plan(mut self) -> Result<BatchPlanRef> {
        self.required_dist = RequiredDist::single();
        let mut plan = self.plan;

        plan = plan.to_distributed_with_required(&self.required_order, &self.required_dist)?;

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Distributed Plan:");
            ctx.trace(plan.explain_to_string());
        }
        if require_additional_exchange_on_root_in_distributed_mode(plan.clone()) {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into();
        }

        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Predicate Pushdown",
            vec![BatchIcebergPredicatePushDownRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        Ok(plan)
    }

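    /// Generate the batch plan for local execution, inserting a root exchange when the plan
    /// is not already `Single`-distributed or when scans/DML would otherwise run in the root
    /// fragment.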
    pub fn gen_batch_local_plan(self) -> Result<BatchPlanRef> {
        let mut plan = self.plan;

        plan = plan.to_local_with_order_required(&self.required_order)?;

        let insert_exchange = match plan.distribution() {
            Distribution::Single => require_additional_exchange_on_root_in_local_mode(plan.clone()),
            _ => true,
        };
        if insert_exchange {
            plan =
                BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
        }

        if self.out_fields.count_ones(..) != self.out_fields.len() {
            plan =
                BatchProject::new(generic::Project::with_out_fields(plan, &self.out_fields)).into();
        }

        let ctx = plan.ctx();
        if ctx.is_explain_trace() {
            ctx.trace("To Batch Local Plan:");
            ctx.trace(plan.explain_to_string());
        }

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Push Limit To Scan",
            vec![BatchPushLimitToScanRule::create()],
            ApplyOrder::BottomUp,
        ))?;

        let plan = plan.optimize_by_rules(&OptimizationStage::new(
            "Iceberg Count Star",
            vec![BatchIcebergCountStar::create()],
            ApplyOrder::TopDown,
        ))?;
        Ok(plan)
    }
}

impl LogicalPlanRoot {
    /// Choose the backfill type from the session/streaming config, then run the stream
    /// optimization pipeline.
    fn gen_optimized_stream_plan(
        self,
        emit_on_window_close: bool,
        allow_snapshot_backfill: bool,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let backfill_type = if allow_snapshot_backfill && self.should_use_snapshot_backfill() {
            BackfillType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            BackfillType::ArrangementBackfill
        } else {
            BackfillType::Backfill
        };
        self.gen_optimized_stream_plan_inner(emit_on_window_close, backfill_type)
    }

    fn gen_optimized_stream_plan_inner(
        self,
        emit_on_window_close: bool,
        backfill_type: BackfillType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let _explain_trace = ctx.is_explain_trace();

        let optimized_plan = self.gen_stream_plan(emit_on_window_close, backfill_type)?;

        let mut plan = optimized_plan
            .plan
            .clone()
            .optimize_by_rules(&OptimizationStage::new(
                "Merge StreamProject",
                vec![StreamProjectMergeRule::create()],
                ApplyOrder::BottomUp,
            ))?;

        if ctx
            .session_ctx()
            .config()
            .streaming_separate_consecutive_join()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Separate consecutive StreamHashJoin by no-shuffle StreamExchange",
                vec![SeparateConsecutiveJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_unaligned_join() {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "Add Logstore for Unaligned join",
                vec![AddLogstoreRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }

        if ctx.session_ctx().config().streaming_enable_delta_join()
            && ctx.session_ctx().config().enable_index_selection()
        {
            plan = plan.optimize_by_rules(&OptimizationStage::new(
                "To IndexDeltaJoin",
                vec![IndexDeltaJoinRule::create()],
                ApplyOrder::BottomUp,
            ))?;
        }
        plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Inline session timezone:");
            ctx.trace(plan.explain_to_string());
        }

        plan = const_eval_exprs(plan)?;

        if ctx.is_explain_trace() {
            ctx.trace("Const eval exprs:");
            ctx.trace(plan.explain_to_string());
        }

        #[cfg(debug_assertions)]
        InputRefValidator.validate(plan.clone());

        if TemporalJoinValidator::exist_dangling_temporal_scan(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "exist dangling temporal scan".to_owned(),
                "please check your temporal join syntax e.g. consider removing the right outer join if it is being used.".to_owned(),
            ).into());
        }

        if RwTimestampValidator::select_rw_timestamp_in_stream_query(plan.clone()) {
            return Err(ErrorCode::NotSupported(
                "selecting `_rw_timestamp` in a streaming query is not allowed".to_owned(),
                "please run the sql in batch mode or remove the column `_rw_timestamp` from the streaming query".to_owned(),
            ).into());
        }

        if ctx.session_ctx().config().enable_locality_backfill()
            && LocalityProviderCounter::count(plan.clone()) > 5
        {
            risingwave_common::license::Feature::LocalityBackfill.check_available()?
        }

        Ok(optimized_plan.into_phase(plan))
    }

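    /// Lower the logical plan to a stream plan: check the JSONB-in-stream-key restriction,
    /// run logical optimization, rewrite the plan for streaming (fixing up non-injective
    /// output column mappings with an extra projection), convert to stream with the required
    /// distribution, and enforce the emit-on-window-close requirement if requested.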
    fn gen_stream_plan(
        self,
        emit_on_window_close: bool,
        backfill_type: BackfillType,
    ) -> Result<StreamOptimizedLogicalPlanRoot> {
        let ctx = self.plan.ctx();
        let explain_trace = ctx.is_explain_trace();

        let plan = {
            {
                if !ctx
                    .session_ctx()
                    .config()
                    .streaming_allow_jsonb_in_stream_key()
                    && let Some(err) = StreamKeyChecker.visit(self.plan.clone())
                {
                    return Err(ErrorCode::NotSupported(
                        err,
                        "Using JSONB columns as part of the join or aggregation keys can severely impair performance. \
                        If you intend to proceed, force to enable it with: `set rw_streaming_allow_jsonb_in_stream_key to true`".to_owned(),
                    ).into());
                }
                let mut optimized_plan = self.gen_optimized_logical_plan_for_stream()?;
                let (plan, out_col_change) = {
                    let (plan, out_col_change) = optimized_plan
                        .plan
                        .logical_rewrite_for_stream(&mut Default::default())?;
                    if out_col_change.is_injective() {
                        (plan, out_col_change)
                    } else {
                        let mut output_indices = (0..plan.schema().len()).collect_vec();
                        #[allow(unused_assignments)]
                        let (mut map, mut target_size) = out_col_change.into_parts();

                        target_size = plan.schema().len();
                        let mut tar_exists = vec![false; target_size];
                        for i in map.iter_mut().flatten() {
                            if tar_exists[*i] {
                                output_indices.push(*i);
                                *i = target_size;
                                target_size += 1;
                            } else {
                                tar_exists[*i] = true;
                            }
                        }
                        let plan =
                            LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
                        let out_col_change = ColIndexMapping::new(map, target_size);
                        (plan.into(), out_col_change)
                    }
                };

                if explain_trace {
                    ctx.trace("Logical Rewrite For Stream:");
                    ctx.trace(plan.explain_to_string());
                }

                optimized_plan.required_dist =
                    out_col_change.rewrite_required_distribution(&optimized_plan.required_dist);
                optimized_plan.required_order = out_col_change
                    .rewrite_required_order(&optimized_plan.required_order)
                    .unwrap();
                optimized_plan.out_fields =
                    out_col_change.rewrite_bitset(&optimized_plan.out_fields);
                let mut plan = plan.to_stream_with_dist_required(
                    &optimized_plan.required_dist,
                    &mut ToStreamContext::new_with_backfill_type(
                        emit_on_window_close,
                        backfill_type,
                    ),
                )?;
                plan = stream_enforce_eowc_requirement(ctx.clone(), plan, emit_on_window_close)?;
                optimized_plan.into_phase(plan)
            }
        };

        if explain_trace {
            ctx.trace("To Stream Plan:");
            ctx.trace(<PlanRef<Stream> as Explain>::explain_to_string(&plan.plan));
        }
        Ok(plan)
    }

    /// Compute the cardinality of the query result via [`CardinalityVisitor`].
    fn compute_cardinality(&self) -> Cardinality {
        CardinalityVisitor.visit(self.plan.clone())
    }

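    /// Generate the streaming plan for `CREATE TABLE`: build the DML path (and the external
    /// source path if the table has a connector), union them with the upstream-sink union,
    /// then apply watermark filtering, row-id generation and NOT NULL filtering before
    /// wrapping everything in a [`StreamMaterialize`].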
    pub fn gen_table_plan(
        self,
        context: OptimizerContextRef,
        table_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        CreateTableInfo {
            columns,
            pk_column_ids,
            row_id_index,
            watermark_descs,
            source_catalog,
            version,
        }: CreateTableInfo,
        CreateTableProps {
            definition,
            append_only,
            on_conflict,
            with_version_columns,
            webhook_info,
            engine,
        }: CreateTableProps,
    ) -> Result<StreamMaterialize> {
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        assert!(!pk_column_ids.is_empty() || row_id_index.is_some());

        let pk_column_indices = {
            let mut id_to_idx = HashMap::new();

            columns.iter().enumerate().for_each(|(idx, c)| {
                id_to_idx.insert(c.column_id(), idx);
            });
            pk_column_ids
                .iter()
                .map(|c| id_to_idx.get(c).copied().unwrap())
                .collect_vec()
        };

        fn inject_project_for_generated_column_if_needed(
            columns: &[ColumnCatalog],
            node: StreamPlanRef,
        ) -> Result<StreamPlanRef> {
            let exprs = LogicalSource::derive_output_exprs_from_generated_columns(columns)?;
            if let Some(exprs) = exprs {
                let logical_project = generic::Project::new(exprs, node);
                return Ok(StreamProject::new(logical_project).into());
            }
            Ok(node)
        }

        #[derive(PartialEq, Debug, Copy, Clone)]
        enum PrimaryKeyKind {
            UserDefinedPrimaryKey,
            NonAppendOnlyRowIdPk,
            AppendOnlyRowIdPk,
        }

        fn inject_dml_node(
            columns: &[ColumnCatalog],
            append_only: bool,
            stream_plan: StreamPlanRef,
            pk_column_indices: &[usize],
            kind: PrimaryKeyKind,
            column_descs: Vec<ColumnDesc>,
        ) -> Result<StreamPlanRef> {
            let mut dml_node = StreamDml::new(stream_plan, append_only, column_descs).into();

            dml_node = inject_project_for_generated_column_if_needed(columns, dml_node)?;

            dml_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::NonAppendOnlyRowIdPk => {
                    RequiredDist::hash_shard(pk_column_indices)
                        .streaming_enforce_if_not_satisfies(dml_node)?
                }
                PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(dml_node).into()
                }
            };

            Ok(dml_node)
        }

        let kind = if let Some(row_id_index) = row_id_index {
            assert_eq!(
                pk_column_indices.iter().exactly_one().copied().unwrap(),
                row_id_index
            );
            if append_only {
                PrimaryKeyKind::AppendOnlyRowIdPk
            } else {
                PrimaryKeyKind::NonAppendOnlyRowIdPk
            }
        } else {
            PrimaryKeyKind::UserDefinedPrimaryKey
        };

        let column_descs: Vec<ColumnDesc> = columns
            .iter()
            .filter(|&c| c.can_dml())
            .map(|c| c.column_desc.clone())
            .collect();

        let mut not_null_idxs = vec![];
        for (idx, column) in column_descs.iter().enumerate() {
            if !column.nullable {
                not_null_idxs.push(idx);
            }
        }

        let version_column_indices = if !with_version_columns.is_empty() {
            find_version_column_indices(&columns, with_version_columns)?
        } else {
            vec![]
        };

        let with_external_source = source_catalog.is_some();
        let (dml_source_node, external_source_node) = if with_external_source {
            let dummy_source_node = LogicalSource::new(
                None,
                columns.clone(),
                row_id_index,
                SourceNodeKind::CreateTable,
                context.clone(),
                None,
            )
            .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;
            let mut external_source_node = stream_plan.plan;
            external_source_node =
                inject_project_for_generated_column_if_needed(&columns, external_source_node)?;
            external_source_node = match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    RequiredDist::hash_shard(&pk_column_indices)
                        .streaming_enforce_if_not_satisfies(external_source_node)?
                }

                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    StreamExchange::new_no_shuffle(external_source_node).into()
                }
            };
            (dummy_source_node, Some(external_source_node))
        } else {
            (stream_plan.plan, None)
        };

        let dml_node = inject_dml_node(
            &columns,
            append_only,
            dml_source_node,
            &pk_column_indices,
            kind,
            column_descs,
        )?;

        let dists = external_source_node
            .iter()
            .map(|input| input.distribution())
            .chain([dml_node.distribution()])
            .unique()
            .collect_vec();

        let dist = match &dists[..] {
            &[Distribution::SomeShard, Distribution::HashShard(_)]
            | &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
            &[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
                dist.clone()
            }
            _ => {
                unreachable!()
            }
        };

        let generated_column_exprs =
            LogicalSource::derive_output_exprs_from_generated_columns(&columns)?;
        let upstream_sink_union = StreamUpstreamSinkUnion::new(
            context.clone(),
            dml_node.schema(),
            dml_node.stream_key(),
            dist.clone(),
            append_only,
            row_id_index.is_none(),
            generated_column_exprs,
        );

        let union_inputs = external_source_node
            .into_iter()
            .chain([dml_node, upstream_sink_union.into()])
            .collect_vec();

        let mut stream_plan = StreamUnion::new_with_dist(
            Union {
                all: true,
                inputs: union_inputs,
                source_col: None,
            },
            dist,
        )
        .into();

        let ttl_watermark_indices = watermark_descs
            .iter()
            .filter(|d| d.with_ttl)
            .map(|d| d.watermark_idx as usize)
            .collect_vec();

        if !watermark_descs.is_empty() {
            stream_plan = StreamWatermarkFilter::new(stream_plan, watermark_descs).into();
        }

        if let Some(row_id_index) = row_id_index {
            match kind {
                PrimaryKeyKind::UserDefinedPrimaryKey => {
                    unreachable!()
                }
                PrimaryKeyKind::NonAppendOnlyRowIdPk | PrimaryKeyKind::AppendOnlyRowIdPk => {
                    stream_plan = StreamRowIdGen::new_with_dist(
                        stream_plan,
                        row_id_index,
                        Distribution::HashShard(vec![row_id_index]),
                    )
                    .into();
                }
            }
        }

        let conflict_behavior = on_conflict.to_behavior(append_only, row_id_index.is_some())?;

        if let ConflictBehavior::IgnoreConflict = conflict_behavior
            && !version_column_indices.is_empty()
        {
            Err(ErrorCode::InvalidParameterValue(
                "The with version column syntax cannot be used with the ignore behavior of on conflict".to_owned(),
            ))?
        }

        let retention_seconds = context.with_options().retention_seconds();

        let table_required_dist = {
            let mut bitset = FixedBitSet::with_capacity(columns.len());
            for idx in &pk_column_indices {
                bitset.insert(*idx);
            }
            RequiredDist::ShardByKey(bitset)
        };

        let mut stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;

        if !not_null_idxs.is_empty() {
            stream_plan =
                StreamFilter::filter_out_any_null_rows(stream_plan.clone(), &not_null_idxs);
        }

        let refreshable = source_catalog
            .as_ref()
            .map(|catalog| {
                catalog.with_properties.is_batch_connector() || {
                    matches!(
                        catalog
                            .refresh_mode
                            .as_ref()
                            .map(|refresh_mode| refresh_mode.refresh_mode),
                        Some(Some(RefreshMode::FullReload(_)))
                    )
                }
            })
            .unwrap_or(false);

        if refreshable && row_id_index.is_some() {
            return Err(crate::error::ErrorCode::BindError(
                "Refreshable tables must have a PRIMARY KEY. Please define a primary key for the table."
                    .to_owned(),
            )
            .into());
        }

        StreamMaterialize::create_for_table(
            stream_plan,
            table_name,
            database_id,
            schema_id,
            table_required_dist,
            Order::any(),
            columns,
            definition,
            conflict_behavior,
            version_column_indices,
            pk_column_indices,
            ttl_watermark_indices,
            row_id_index,
            version,
            retention_seconds,
            webhook_info,
            engine,
            refreshable,
        )
    }

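    /// Generate the streaming plan for `CREATE MATERIALIZED VIEW`: optimize into a stream
    /// plan (snapshot backfill allowed) and wrap it in a [`StreamMaterialize`].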
    pub fn gen_materialize_plan(
        self,
        database_id: DatabaseId,
        schema_id: SchemaId,
        mv_name: String,
        definition: String,
        emit_on_window_close: bool,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close, true)?;
        StreamMaterialize::create(
            stream_plan,
            mv_name,
            database_id,
            schema_id,
            definition,
            TableType::MaterializedView,
            cardinality,
            None,
        )
    }

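    /// Generate the streaming plan for `CREATE INDEX`: optimize into a stream plan and wrap
    /// it in a [`StreamMaterialize`] with [`TableType::Index`].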
    pub fn gen_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
    ) -> Result<StreamMaterialize> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamMaterialize::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            TableType::Index,
            cardinality,
            retention_seconds,
        )
    }

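    /// Like [`Self::gen_index_plan`], but for a vector index: the stream plan is wrapped in a
    /// [`StreamVectorIndexWrite`] carrying `vector_index_info`.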
    pub fn gen_vector_index_plan(
        self,
        index_name: String,
        database_id: DatabaseId,
        schema_id: SchemaId,
        definition: String,
        retention_seconds: Option<NonZeroU32>,
        vector_index_info: PbVectorIndexInfo,
    ) -> Result<StreamVectorIndexWrite> {
        let cardinality = self.compute_cardinality();
        let stream_plan = self.gen_optimized_stream_plan(false, false)?;

        StreamVectorIndexWrite::create(
            stream_plan,
            index_name,
            database_id,
            schema_id,
            definition,
            cardinality,
            retention_seconds,
            vector_index_info,
        )
    }

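    /// Generate the streaming plan for `CREATE SINK`: pick the backfill type (upstream-only,
    /// snapshot, arrangement or plain backfill), validate the auto-schema-change restriction,
    /// optimize into a stream plan and wrap it in a [`StreamSink`].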
    #[allow(clippy::too_many_arguments)]
    pub fn gen_sink_plan(
        self,
        sink_name: String,
        definition: String,
        properties: WithOptionsSecResolved,
        emit_on_window_close: bool,
        db_name: String,
        sink_from_table_name: String,
        format_desc: Option<SinkFormatDesc>,
        without_backfill: bool,
        target_table: Option<Arc<TableCatalog>>,
        partition_info: Option<PartitionComputeInfo>,
        user_specified_columns: bool,
        auto_refresh_schema_from_table: Option<Arc<TableCatalog>>,
    ) -> Result<StreamSink> {
        let backfill_type = if without_backfill {
            BackfillType::UpstreamOnly
        } else if target_table.is_none() && self.should_use_snapshot_backfill() {
            BackfillType::SnapshotBackfill
        } else if self.should_use_arrangement_backfill() {
            BackfillType::ArrangementBackfill
        } else {
            BackfillType::Backfill
        };
        if auto_refresh_schema_from_table.is_some()
            && backfill_type != BackfillType::ArrangementBackfill
        {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "auto schema change is only supported for ArrangementBackfill, but got: {:?}",
                backfill_type
            ))
            .into());
        }
        let stream_plan =
            self.gen_optimized_stream_plan_inner(emit_on_window_close, backfill_type)?;
        let target_columns_to_plan_mapping = target_table.as_ref().map(|t| {
            let columns = t.columns_without_rw_timestamp();
            stream_plan.target_columns_to_plan_mapping(&columns, user_specified_columns)
        });

        StreamSink::create(
            stream_plan,
            sink_name,
            db_name,
            sink_from_table_name,
            target_table,
            target_columns_to_plan_mapping,
            definition,
            properties,
            format_desc,
            partition_info,
            auto_refresh_schema_from_table,
        )
    }
}

impl<P: PlanPhase> PlanRoot<P> {
    pub fn should_use_arrangement_backfill(&self) -> bool {
        let ctx = self.plan.ctx();
        let session_ctx = ctx.session_ctx();
        let arrangement_backfill_enabled = session_ctx
            .env()
            .streaming_config()
            .developer
            .enable_arrangement_backfill;
        arrangement_backfill_enabled && session_ctx.config().streaming_use_arrangement_backfill()
    }

    pub fn should_use_snapshot_backfill(&self) -> bool {
        self.plan
            .ctx()
            .session_ctx()
            .config()
            .streaming_use_snapshot_backfill()
    }

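    /// For each DML-able column of the target table, find the index of the matching column in
    /// the plan's visible output: matched by name when the user specified the column list,
    /// otherwise by position. Entries are `None` for target columns without a counterpart.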
    pub fn target_columns_to_plan_mapping(
        &self,
        tar_cols: &[ColumnCatalog],
        user_specified_columns: bool,
    ) -> Vec<Option<usize>> {
        #[allow(clippy::disallowed_methods)]
        let visible_cols: Vec<(usize, String)> = self
            .out_fields
            .ones()
            .zip_eq(self.out_names.iter().cloned())
            .collect_vec();

        let visible_col_idxes = visible_cols.iter().map(|(i, _)| *i).collect_vec();
        let visible_col_idxes_by_name = visible_cols
            .iter()
            .map(|(i, name)| (name.as_ref(), *i))
            .collect::<BTreeMap<_, _>>();

        tar_cols
            .iter()
            .enumerate()
            .filter(|(_, tar_col)| tar_col.can_dml())
            .map(|(tar_i, tar_col)| {
                if user_specified_columns {
                    visible_col_idxes_by_name.get(tar_col.name()).cloned()
                } else {
                    (tar_i < visible_col_idxes.len()).then(|| visible_cols[tar_i].0)
                }
            })
            .collect()
    }
}

fn find_version_column_indices(
    column_catalog: &Vec<ColumnCatalog>,
    version_column_names: Vec<String>,
) -> Result<Vec<usize>> {
    let mut indices = Vec::new();
    for version_column_name in version_column_names {
        let mut found = false;
        for (index, column) in column_catalog.iter().enumerate() {
            if column.column_desc.name == version_column_name {
                if let &DataType::Jsonb
                | &DataType::List(_)
                | &DataType::Struct(_)
                | &DataType::Bytea
                | &DataType::Boolean = column.data_type()
                {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "Version column {} must be of a comparable data type",
                        version_column_name
                    ))
                    .into());
                }
                indices.push(index);
                found = true;
                break;
            }
        }
        if !found {
            return Err(ErrorCode::InvalidInputSyntax(format!(
                "Version column {} not found",
                version_column_name
            ))
            .into());
        }
    }
    Ok(indices)
}

fn const_eval_exprs<C: ConventionMarker>(plan: PlanRef<C>) -> Result<PlanRef<C>> {
    let mut const_eval_rewriter = ConstEvalRewriter { error: None };

    let plan = plan.rewrite_exprs_recursive(&mut const_eval_rewriter);
    if let Some(error) = const_eval_rewriter.error {
        return Err(error);
    }
    Ok(plan)
}

fn inline_session_timezone_in_exprs<C: ConventionMarker>(
    ctx: OptimizerContextRef,
    plan: PlanRef<C>,
) -> Result<PlanRef<C>> {
    let mut v = TimestamptzExprFinder::default();
    plan.visit_exprs_recursive(&mut v);
    if v.has() {
        Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()))
    } else {
        Ok(plan)
    }
}

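/// Returns `true` if a node matching `is_candidate` is reachable from `plan` without passing
/// through a `BatchExchange` on the way down.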
fn exist_and_no_exchange_before(
    plan: &BatchPlanRef,
    is_candidate: fn(&BatchPlanRef) -> bool,
) -> bool {
    if plan.node_type() == BatchPlanNodeType::BatchExchange {
        return false;
    }
    is_candidate(plan)
        || plan
            .inputs()
            .iter()
            .any(|input| exist_and_no_exchange_before(input, is_candidate))
}

impl BatchPlanRef {
    fn is_user_table_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSeqScan
            || self.node_type() == BatchPlanNodeType::BatchLogSeqScan
            || self.node_type() == BatchPlanNodeType::BatchVectorSearch
    }

    fn is_source_scan(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchSource
            || self.node_type() == BatchPlanNodeType::BatchKafkaScan
            || self.node_type() == BatchPlanNodeType::BatchIcebergScan
    }

    fn is_insert(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchInsert
    }

    fn is_update(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchUpdate
    }

    fn is_delete(&self) -> bool {
        self.node_type() == BatchPlanNodeType::BatchDelete
    }
}

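/// The root of a distributed batch plan must be `Single`-distributed; an extra root
/// `BatchExchange` is required when a user table scan, source scan, insert, update or delete
/// is reachable from the root without crossing an exchange, so that these nodes do not end up
/// executing in the root fragment.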
fn require_additional_exchange_on_root_in_distributed_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan()
            || plan.is_source_scan()
            || plan.is_insert()
            || plan.is_update()
            || plan.is_delete()
    })
}

/// Like the distributed-mode check above, but for local execution mode, where only table
/// scans, source scans and inserts force an additional root exchange.
fn require_additional_exchange_on_root_in_local_mode(plan: BatchPlanRef) -> bool {
    assert_eq!(plan.distribution(), &Distribution::Single);
    exist_and_no_exchange_before(&plan, |plan| {
        plan.is_user_table_scan() || plan.is_source_scan() || plan.is_insert()
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimizer::plan_node::LogicalValues;

    #[tokio::test]
    async fn test_as_subplan() {
        let ctx = OptimizerContext::mock().await;
        let values = LogicalValues::new(
            vec![],
            Schema::new(vec![
                Field::with_name(DataType::Int32, "v1"),
                Field::with_name(DataType::Varchar, "v2"),
            ]),
            ctx,
        )
        .into();
        let out_fields = FixedBitSet::with_capacity_and_blocks(2, [1]);
        let out_names = vec!["v1".into()];
        let root = PlanRoot::new_with_logical_plan(
            values,
            RequiredDist::Any,
            Order::any(),
            out_fields,
            out_names,
        );
        let subplan = root.into_unordered_subplan();
        assert_eq!(
            subplan.schema(),
            &Schema::new(vec![Field::with_name(DataType::Int32, "v1")])
        );
    }
}