risingwave_frontend/optimizer/plan_node/
logical_delete.rs1use risingwave_common::catalog::TableVersionId;
16
17use super::utils::impl_distill_by_unit;
18use super::{
19 BatchDelete, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
20 PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown, generic,
21};
22use crate::catalog::TableId;
23use crate::error::Result;
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
27};
28use crate::utils::{ColIndexMapping, Condition};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct LogicalDelete {
35 pub base: PlanBase<Logical>,
36 core: generic::Delete<PlanRef>,
37}
38
39impl From<generic::Delete<PlanRef>> for LogicalDelete {
40 fn from(core: generic::Delete<PlanRef>) -> Self {
41 let base = PlanBase::new_logical_with_core(&core);
42 Self { base, core }
43 }
44}
45
46impl LogicalDelete {
47 #[must_use]
48 pub fn table_id(&self) -> TableId {
49 self.core.table_id
50 }
51
52 pub fn has_returning(&self) -> bool {
53 self.core.returning
54 }
55
56 pub fn table_version_id(&self) -> TableVersionId {
57 self.core.table_version_id
58 }
59}
60
61impl PlanTreeNodeUnary for LogicalDelete {
62 fn input(&self) -> PlanRef {
63 self.core.input.clone()
64 }
65
66 fn clone_with_input(&self, input: PlanRef) -> Self {
67 let mut core = self.core.clone();
68 core.input = input;
69 core.into()
70 }
71}
72
73impl_plan_tree_node_for_unary! { LogicalDelete }
74impl_distill_by_unit!(LogicalDelete, core, "LogicalDelete");
75
76impl ColPrunable for LogicalDelete {
77 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
78 let pruned_input = {
79 let input = &self.core.input;
80 let required_cols: Vec<_> = (0..input.schema().len()).collect();
81 input.prune_col(&required_cols, ctx)
82 };
83
84 LogicalProject::with_out_col_idx(
86 self.clone_with_input(pruned_input).into(),
87 required_cols.iter().copied(),
88 )
89 .into()
90 }
91}
92
93impl ExprRewritable for LogicalDelete {}
94
95impl ExprVisitable for LogicalDelete {}
96
97impl PredicatePushdown for LogicalDelete {
98 fn predicate_pushdown(
99 &self,
100 predicate: Condition,
101 ctx: &mut PredicatePushdownContext,
102 ) -> PlanRef {
103 gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
104 }
105}
106
107impl ToBatch for LogicalDelete {
108 fn to_batch(&self) -> Result<PlanRef> {
109 let new_input = self.input().to_batch()?;
110 let mut core = self.core.clone();
111 core.input = new_input;
112 Ok(BatchDelete::new(core).into())
113 }
114}
115
116impl ToStream for LogicalDelete {
117 fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
118 unreachable!("delete should always be converted to batch plan");
119 }
120
121 fn logical_rewrite_for_stream(
122 &self,
123 _ctx: &mut RewriteStreamContext,
124 ) -> Result<(PlanRef, ColIndexMapping)> {
125 unreachable!("delete should always be converted to batch plan");
126 }
127}