risingwave_frontend/optimizer/plan_node/
logical_insert.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pretty_xmlish::XmlNode;
16use risingwave_common::catalog::TableVersionId;
17
18use super::generic::GenericPlanRef;
19use super::utils::{Distill, childless_record};
20use super::{
21    BatchInsert, ColPrunable, ExprRewritable, Logical, LogicalProject, PlanBase, PlanRef,
22    PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, gen_filter_and_pushdown, generic,
23};
24use crate::catalog::TableId;
25use crate::error::Result;
26use crate::expr::{ExprImpl, ExprRewriter, ExprVisitor};
27use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
28use crate::optimizer::plan_node::{
29    ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
30};
31use crate::utils::{ColIndexMapping, Condition};
32
33/// `LogicalInsert` iterates on input relation and insert the data into specified table.
34///
35/// It corresponds to the `INSERT` statements in SQL. Especially, for `INSERT ... VALUES`
36/// statements, the input relation would be [`super::LogicalValues`].
37#[derive(Debug, Clone, PartialEq, Eq, Hash)]
38pub struct LogicalInsert {
39    pub base: PlanBase<Logical>,
40    core: generic::Insert<PlanRef>,
41}
42
43impl LogicalInsert {
44    pub fn new(core: generic::Insert<PlanRef>) -> Self {
45        let base = PlanBase::new_logical_with_core(&core);
46        Self { base, core }
47    }
48
49    // Get the column indexes in which to insert to
50    #[must_use]
51    pub fn column_indices(&self) -> Vec<usize> {
52        self.core.column_indices.clone()
53    }
54
55    #[must_use]
56    pub fn default_columns(&self) -> Vec<(usize, ExprImpl)> {
57        self.core.default_columns.clone()
58    }
59
60    #[must_use]
61    pub fn table_id(&self) -> TableId {
62        self.core.table_id
63    }
64
65    #[must_use]
66    pub fn row_id_index(&self) -> Option<usize> {
67        self.core.row_id_index
68    }
69
70    pub fn has_returning(&self) -> bool {
71        self.core.returning
72    }
73
74    pub fn table_version_id(&self) -> TableVersionId {
75        self.core.table_version_id
76    }
77}
78
79impl PlanTreeNodeUnary for LogicalInsert {
80    fn input(&self) -> PlanRef {
81        self.core.input.clone()
82    }
83
84    fn clone_with_input(&self, input: PlanRef) -> Self {
85        let mut core = self.core.clone();
86        core.input = input;
87        Self::new(core)
88    }
89}
90
91impl_plan_tree_node_for_unary! {LogicalInsert}
92
93impl Distill for LogicalInsert {
94    fn distill<'a>(&self) -> XmlNode<'a> {
95        let vec = self
96            .core
97            .fields_pretty(self.base.ctx().is_explain_verbose());
98        childless_record("LogicalInsert", vec)
99    }
100}
101
102impl ColPrunable for LogicalInsert {
103    fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef {
104        let pruned_input = {
105            let input = &self.core.input;
106            let required_cols: Vec<_> = (0..input.schema().len()).collect();
107            input.prune_col(&required_cols, ctx)
108        };
109
110        // No pruning.
111        LogicalProject::with_out_col_idx(
112            self.clone_with_input(pruned_input).into(),
113            required_cols.iter().copied(),
114        )
115        .into()
116    }
117}
118
119impl ExprRewritable for LogicalInsert {
120    fn has_rewritable_expr(&self) -> bool {
121        true
122    }
123
124    fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
125        let mut new = self.clone();
126        new.core.default_columns = new
127            .core
128            .default_columns
129            .into_iter()
130            .map(|(c, e)| (c, r.rewrite_expr(e)))
131            .collect();
132        new.into()
133    }
134}
135
136impl ExprVisitable for LogicalInsert {
137    fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
138        self.core
139            .default_columns
140            .iter()
141            .for_each(|(_, e)| v.visit_expr(e));
142    }
143}
144
145impl PredicatePushdown for LogicalInsert {
146    fn predicate_pushdown(
147        &self,
148        predicate: Condition,
149        ctx: &mut PredicatePushdownContext,
150    ) -> PlanRef {
151        gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx)
152    }
153}
154
155impl ToBatch for LogicalInsert {
156    fn to_batch(&self) -> Result<PlanRef> {
157        let new_input = self.input().to_batch()?;
158        let mut core = self.core.clone();
159        core.input = new_input;
160        Ok(BatchInsert::new(core).into())
161    }
162}
163
164impl ToStream for LogicalInsert {
165    fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
166        unreachable!("insert should always be converted to batch plan");
167    }
168
169    fn logical_rewrite_for_stream(
170        &self,
171        _ctx: &mut RewriteStreamContext,
172    ) -> Result<(PlanRef, ColIndexMapping)> {
173        unreachable!("insert should always be converted to batch plan");
174    }
175}