risingwave_frontend/optimizer/plan_node/
logical_update.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::generic::GenericPlanRef;
16use super::utils::impl_distill_by_unit;
17use super::{
18    BatchUpdate, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject,
19    PlanBase, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown,
20    generic,
21};
22use crate::error::Result;
23use crate::expr::{ExprRewriter, ExprVisitor};
24use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
25use crate::optimizer::plan_node::{
26    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
27};
28use crate::utils::{ColIndexMapping, Condition};
29
30/// [`LogicalUpdate`] iterates on input relation, set some columns, and inject update records into
31/// specified table.
32///
33/// It corresponds to the `UPDATE` statements in SQL.
34#[derive(Debug, Clone, PartialEq, Eq, Hash)]
35pub struct LogicalUpdate {
36    pub base: PlanBase<Logical>,
37    core: generic::Update<PlanRef>,
38}
39
40impl From<generic::Update<PlanRef>> for LogicalUpdate {
41    fn from(core: generic::Update<PlanRef>) -> Self {
42        let base = PlanBase::new_logical_with_core(&core);
43        Self { base, core }
44    }
45}
46
47impl PlanTreeNodeUnary<Logical> for LogicalUpdate {
48    fn input(&self) -> PlanRef {
49        self.core.input.clone()
50    }
51
52    fn clone_with_input(&self, input: PlanRef) -> Self {
53        let mut core = self.core.clone();
54        core.input = input;
55        core.into()
56    }
57}
58
59impl_plan_tree_node_for_unary! { Logical, LogicalUpdate }
60impl_distill_by_unit!(LogicalUpdate, core, "LogicalUpdate");
61
62impl ExprRewritable<Logical> for LogicalUpdate {
63    fn has_rewritable_expr(&self) -> bool {
64        true
65    }
66
67    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
68        let mut core = self.core.clone();
69        core.rewrite_exprs(r);
70        Self::from(core).into()
71    }
72}
73
74impl ExprVisitable for LogicalUpdate {
75    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
76        self.core.visit_exprs(v);
77    }
78}
79
80impl ColPrunable for LogicalUpdate {
81    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
82        let pruned_input = {
83            let input = &self.core.input;
84            let required_cols: Vec<_> = (0..input.schema().len()).collect();
85            input.prune_col(&required_cols, ctx)
86        };
87
88        // No pruning.
89        LogicalProject::with_out_col_idx(
90            self.clone_with_input(pruned_input).into(),
91            required_cols.iter().copied(),
92        )
93        .into()
94    }
95}
96
97impl PredicatePushdown for LogicalUpdate {
98    fn predicate_pushdown(
99        &self,
100        predicate: Condition,
101        ctx: &mut PredicatePushdownContext,
102    ) -> PlanRef {
103        gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
104    }
105}
106
107impl ToBatch for LogicalUpdate {
108    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
109        let new_input = self.input().to_batch()?;
110        let core = generic::Update {
111            table_name: self.core.table_name.clone(),
112            table_id: self.core.table_id,
113            table_version_id: self.core.table_version_id,
114            input: new_input,
115            old_exprs: self.core.old_exprs.clone(),
116            new_exprs: self.core.new_exprs.clone(),
117            returning: self.core.returning,
118        };
119        Ok(BatchUpdate::new(core, self.schema().clone()).into())
120    }
121}
122
123impl ToStream for LogicalUpdate {
124    fn to_stream(
125        &self,
126        _ctx: &mut ToStreamContext,
127    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
128        unreachable!("update should always be converted to batch plan");
129    }
130
131    fn logical_rewrite_for_stream(
132        &self,
133        _ctx: &mut RewriteStreamContext,
134    ) -> Result<(PlanRef, ColIndexMapping)> {
135        unreachable!("update should always be converted to batch plan");
136    }
137}