risingwave_frontend/optimizer/plan_node/
logical_update.rs1use super::generic::GenericPlanRef;
16use super::utils::impl_distill_by_unit;
17use super::{
18 BatchUpdate, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject,
19 PlanBase, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown,
20 generic,
21};
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{
26 ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
27};
28use crate::utils::{ColIndexMapping, Condition};
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct LogicalUpdate {
36 pub base: PlanBase<Logical>,
37 core: generic::Update<PlanRef>,
38}
39
40impl From<generic::Update<PlanRef>> for LogicalUpdate {
41 fn from(core: generic::Update<PlanRef>) -> Self {
42 let base = PlanBase::new_logical_with_core(&core);
43 Self { base, core }
44 }
45}
46
47impl PlanTreeNodeUnary<Logical> for LogicalUpdate {
48 fn input(&self) -> PlanRef {
49 self.core.input.clone()
50 }
51
52 fn clone_with_input(&self, input: PlanRef) -> Self {
53 let mut core = self.core.clone();
54 core.input = input;
55 core.into()
56 }
57}
58
59impl_plan_tree_node_for_unary! { Logical, LogicalUpdate }
60impl_distill_by_unit!(LogicalUpdate, core, "LogicalUpdate");
61
62impl ExprRewritable<Logical> for LogicalUpdate {
63 fn has_rewritable_expr(&self) -> bool {
64 true
65 }
66
67 fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
68 let mut core = self.core.clone();
69 core.rewrite_exprs(r);
70 Self::from(core).into()
71 }
72}
73
74impl ExprVisitable for LogicalUpdate {
75 fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
76 self.core.visit_exprs(v);
77 }
78}
79
80impl ColPrunable for LogicalUpdate {
81 fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
82 let pruned_input = {
83 let input = &self.core.input;
84 let required_cols: Vec<_> = (0..input.schema().len()).collect();
85 input.prune_col(&required_cols, ctx)
86 };
87
88 LogicalProject::with_out_col_idx(
90 self.clone_with_input(pruned_input).into(),
91 required_cols.iter().copied(),
92 )
93 .into()
94 }
95}
96
97impl PredicatePushdown for LogicalUpdate {
98 fn predicate_pushdown(
99 &self,
100 predicate: Condition,
101 ctx: &mut PredicatePushdownContext,
102 ) -> PlanRef {
103 gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
104 }
105}
106
107impl ToBatch for LogicalUpdate {
108 fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
109 let new_input = self.input().to_batch()?;
110 let core = generic::Update {
111 table_name: self.core.table_name.clone(),
112 table_id: self.core.table_id,
113 table_version_id: self.core.table_version_id,
114 input: new_input,
115 old_exprs: self.core.old_exprs.clone(),
116 new_exprs: self.core.new_exprs.clone(),
117 returning: self.core.returning,
118 };
119 Ok(BatchUpdate::new(core, self.schema().clone()).into())
120 }
121}
122
123impl ToStream for LogicalUpdate {
124 fn to_stream(
125 &self,
126 _ctx: &mut ToStreamContext,
127 ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
128 unreachable!("update should always be converted to batch plan");
129 }
130
131 fn logical_rewrite_for_stream(
132 &self,
133 _ctx: &mut RewriteStreamContext,
134 ) -> Result<(PlanRef, ColIndexMapping)> {
135 unreachable!("update should always be converted to batch plan");
136 }
137}