risingwave_frontend/optimizer/plan_node/
logical_delete.rs1use risingwave_common::catalog::TableVersionId;
16
17use super::utils::impl_distill_by_unit;
18use super::{
19 BatchDelete, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject,
20 PlanBase, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown,
21 generic,
22};
23use crate::catalog::TableId;
24use crate::error::Result;
25use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
26use crate::optimizer::plan_node::{
27 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
28};
29use crate::utils::{ColIndexMapping, Condition};
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct LogicalDelete {
36 pub base: PlanBase<Logical>,
37 core: generic::Delete<PlanRef>,
38}
39
40impl From<generic::Delete<PlanRef>> for LogicalDelete {
41 fn from(core: generic::Delete<PlanRef>) -> Self {
42 let base = PlanBase::new_logical_with_core(&core);
43 Self { base, core }
44 }
45}
46
47impl LogicalDelete {
48 #[must_use]
49 pub fn table_id(&self) -> TableId {
50 self.core.table_id
51 }
52
53 pub fn has_returning(&self) -> bool {
54 self.core.returning
55 }
56
57 pub fn table_version_id(&self) -> TableVersionId {
58 self.core.table_version_id
59 }
60}
61
62impl PlanTreeNodeUnary<Logical> for LogicalDelete {
63 fn input(&self) -> PlanRef {
64 self.core.input.clone()
65 }
66
67 fn clone_with_input(&self, input: PlanRef) -> Self {
68 let mut core = self.core.clone();
69 core.input = input;
70 core.into()
71 }
72}
73
74impl_plan_tree_node_for_unary! { Logical, LogicalDelete }
75impl_distill_by_unit!(LogicalDelete, core, "LogicalDelete");
76
77impl ColPrunable for LogicalDelete {
78 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
79 let pruned_input = {
80 let input = &self.core.input;
81 let required_cols: Vec<_> = (0..input.schema().len()).collect();
82 input.prune_col(&required_cols, ctx)
83 };
84
85 LogicalProject::with_out_col_idx(
87 self.clone_with_input(pruned_input).into(),
88 required_cols.iter().copied(),
89 )
90 .into()
91 }
92}
93
94impl ExprRewritable<Logical> for LogicalDelete {}
95
96impl ExprVisitable for LogicalDelete {}
97
98impl PredicatePushdown for LogicalDelete {
99 fn predicate_pushdown(
100 &self,
101 predicate: Condition,
102 ctx: &mut PredicatePushdownContext,
103 ) -> PlanRef {
104 gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
105 }
106}
107
108impl ToBatch for LogicalDelete {
109 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
110 let new_input = self.input().to_batch()?;
111 let core = self.core.clone_with_input(new_input);
112 Ok(BatchDelete::new(core).into())
113 }
114}
115
116impl ToStream for LogicalDelete {
117 fn to_stream(
118 &self,
119 _ctx: &mut ToStreamContext,
120 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
121 unreachable!("delete should always be converted to batch plan");
122 }
123
124 fn logical_rewrite_for_stream(
125 &self,
126 _ctx: &mut RewriteStreamContext,
127 ) -> Result<(PlanRef, ColIndexMapping)> {
128 unreachable!("delete should always be converted to batch plan");
129 }
130}