risingwave_frontend/optimizer/plan_node/
logical_insert.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_common::catalog::TableVersionId;
17
18use super::generic::GenericPlanRef;
19use super::utils::{Distill, childless_record};
20use super::{
21    BatchInsert, ColPrunable, ExprRewritable, Logical, LogicalPlanRef as PlanRef, LogicalProject,
22    PlanBase, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown,
23    generic,
24};
25use crate::catalog::TableId;
26use crate::error::Result;
27use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor};
28use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
29use crate::optimizer::plan_node::{
30    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
31};
32use crate::utils::{ColIndexMapping, Condition};
33
/// `LogicalInsert` iterates over its input relation and inserts the data into the specified
/// table.
///
/// It corresponds to the `INSERT` statements in SQL. In particular, for `INSERT ... VALUES`
/// statements, the input relation would be [`super::LogicalValues`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalInsert {
    /// Logical plan base; [`LogicalInsert::new`] derives it from `core`.
    pub base: PlanBase<Logical>,
    /// Generic insert core: holds the input plan together with the insert parameters read by
    /// the accessors of this type (table id, column indices, default columns, ...).
    core: generic::Insert<PlanRef>,
}
43
44impl LogicalInsert {
45    pub fn new(core: generic::Insert<PlanRef>) -> Self {
46        let base = PlanBase::new_logical_with_core(&core);
47        Self { base, core }
48    }
49
50    // Get the column indexes in which to insert to
51    #[must_use]
52    pub fn column_indices(&self) -> Vec<usize> {
53        self.core.column_indices.clone()
54    }
55
56    #[must_use]
57    pub fn default_columns(&self) -> Vec<(usize, ExprImpl)> {
58        self.core.default_columns.clone()
59    }
60
61    #[must_use]
62    pub fn table_id(&self) -> TableId {
63        self.core.table_id
64    }
65
66    #[must_use]
67    pub fn row_id_index(&self) -> Option<usize> {
68        self.core.row_id_index
69    }
70
71    pub fn has_returning(&self) -> bool {
72        self.core.returning
73    }
74
75    pub fn table_version_id(&self) -> TableVersionId {
76        self.core.table_version_id
77    }
78}
79
80impl PlanTreeNodeUnary<Logical> for LogicalInsert {
81    fn input(&self) -> PlanRef {
82        self.core.input.clone()
83    }
84
85    fn clone_with_input(&self, input: PlanRef) -> Self {
86        let mut core = self.core.clone();
87        core.input = input;
88        Self::new(core)
89    }
90}
91
92impl_plan_tree_node_for_unary! { Logical, LogicalInsert}
93
94impl Distill for LogicalInsert {
95    fn distill<'a>(&self) -> XmlNode<'a> {
96        let vec = self
97            .core
98            .fields_pretty(self.base.ctx().is_explain_verbose());
99        childless_record("LogicalInsert", vec)
100    }
101}
102
103impl ColPrunable for LogicalInsert {
104    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
105        let pruned_input = {
106            let input = &self.core.input;
107            let required_cols: Vec<_> = (0..input.schema().len()).collect();
108            input.prune_col(&required_cols, ctx)
109        };
110
111        // No pruning.
112        LogicalProject::with_out_col_idx(
113            self.clone_with_input(pruned_input).into(),
114            required_cols.iter().copied(),
115        )
116        .into()
117    }
118}
119
120impl ExprRewritable<Logical> for LogicalInsert {
121    fn has_rewritable_expr(&self) -> bool {
122        true
123    }
124
125    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
126        let mut new = self.clone();
127        new.core.default_columns = new
128            .core
129            .default_columns
130            .into_iter()
131            .map(|(c, e)| (c, r.rewrite_expr(e)))
132            .collect();
133        new.into()
134    }
135}
136
137impl ExprVisitable for LogicalInsert {
138    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
139        self.core
140            .default_columns
141            .iter()
142            .for_each(|(_, e)| v.visit_expr(e));
143    }
144}
145
146impl PredicatePushdown for LogicalInsert {
147    fn predicate_pushdown(
148        &self,
149        predicate: Condition,
150        ctx: &mut PredicatePushdownContext,
151    ) -> PlanRef {
152        gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
153    }
154}
155
156impl ToBatch for LogicalInsert {
157    fn to_batch(&self) -> Result<crate::optimizer::plan_node::BatchPlanRef> {
158        let new_input = self.input().to_batch()?;
159        let core = self.core.clone_with_input(new_input);
160        Ok(BatchInsert::new(core).into())
161    }
162}
163
164impl ToStream for LogicalInsert {
165    fn to_stream(
166        &self,
167        _ctx: &mut ToStreamContext,
168    ) -> Result<crate::optimizer::plan_node::StreamPlanRef> {
169        unreachable!("insert should always be converted to batch plan");
170    }
171
172    fn logical_rewrite_for_stream(
173        &self,
174        _ctx: &mut RewriteStreamContext,
175    ) -> Result<(PlanRef, ColIndexMapping)> {
176        unreachable!("insert should always be converted to batch plan");
177    }
178}