risingwave_frontend/optimizer/plan_node/
logical_update.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::generic::GenericPlanRef;
16use super::utils::impl_distill_by_unit;
17use super::{
18    BatchUpdate, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
19    PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown, generic,
20};
21use crate::error::Result;
22use crate::expr::{ExprRewriter, ExprVisitor};
23use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
24use crate::optimizer::plan_node::{
25    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
26};
27use crate::utils::{ColIndexMapping, Condition};
28
29/// [`LogicalUpdate`] iterates on input relation, set some columns, and inject update records into
30/// specified table.
31///
32/// It corresponds to the `UPDATE` statements in SQL.
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct LogicalUpdate {
35    pub base: PlanBase<Logical>,
36    core: generic::Update<PlanRef>,
37}
38
39impl From<generic::Update<PlanRef>> for LogicalUpdate {
40    fn from(core: generic::Update<PlanRef>) -> Self {
41        let base = PlanBase::new_logical_with_core(&core);
42        Self { base, core }
43    }
44}
45
46impl PlanTreeNodeUnary for LogicalUpdate {
47    fn input(&self) -> PlanRef {
48        self.core.input.clone()
49    }
50
51    fn clone_with_input(&self, input: PlanRef) -> Self {
52        let mut core = self.core.clone();
53        core.input = input;
54        core.into()
55    }
56}
57
58impl_plan_tree_node_for_unary! { LogicalUpdate }
59impl_distill_by_unit!(LogicalUpdate, core, "LogicalUpdate");
60
61impl ExprRewritable for LogicalUpdate {
62    fn has_rewritable_expr(&self) -> bool {
63        true
64    }
65
66    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
67        let mut core = self.core.clone();
68        core.rewrite_exprs(r);
69        Self::from(core).into()
70    }
71}
72
73impl ExprVisitable for LogicalUpdate {
74    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
75        self.core.visit_exprs(v);
76    }
77}
78
79impl ColPrunable for LogicalUpdate {
80    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
81        let pruned_input = {
82            let input = &self.core.input;
83            let required_cols: Vec<_> = (0..input.schema().len()).collect();
84            input.prune_col(&required_cols, ctx)
85        };
86
87        // No pruning.
88        LogicalProject::with_out_col_idx(
89            self.clone_with_input(pruned_input).into(),
90            required_cols.iter().copied(),
91        )
92        .into()
93    }
94}
95
96impl PredicatePushdown for LogicalUpdate {
97    fn predicate_pushdown(
98        &self,
99        predicate: Condition,
100        ctx: &mut PredicatePushdownContext,
101    ) -> PlanRef {
102        gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
103    }
104}
105
106impl ToBatch for LogicalUpdate {
107    fn to_batch(&self) -> Result<PlanRef> {
108        let new_input = self.input().to_batch()?;
109        let mut new_logical = self.core.clone();
110        new_logical.input = new_input;
111        Ok(BatchUpdate::new(new_logical, self.schema().clone()).into())
112    }
113}
114
115impl ToStream for LogicalUpdate {
116    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
117        unreachable!("update should always be converted to batch plan");
118    }
119
120    fn logical_rewrite_for_stream(
121        &self,
122        _ctx: &mut RewriteStreamContext,
123    ) -> Result<(PlanRef, ColIndexMapping)> {
124        unreachable!("update should always be converted to batch plan");
125    }
126}