risingwave_frontend/optimizer/plan_node/generic/
insert.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::Hash;
16
17use educe::Educe;
18use pretty_xmlish::{Pretty, StrAssocArr};
19use risingwave_common::catalog::{ColumnCatalog, Field, Schema, TableVersionId};
20use risingwave_common::types::DataType;
21
22use super::{GenericPlanNode, GenericPlanRef};
23use crate::OptimizerContextRef;
24use crate::catalog::TableId;
25use crate::expr::ExprImpl;
26use crate::optimizer::property::FunctionalDependencySet;
27
28#[derive(Debug, Clone, Educe)]
29#[educe(PartialEq, Eq, Hash)]
30pub struct Insert<PlanRef: Eq + Hash> {
31    #[educe(PartialEq(ignore))]
32    #[educe(Hash(ignore))]
33    pub table_name: String, // explain-only
34    pub table_id: TableId,
35    pub table_version_id: TableVersionId,
36    pub table_visible_columns: Vec<ColumnCatalog>,
37    pub input: PlanRef,
38    pub column_indices: Vec<usize>, // columns in which to insert
39    pub default_columns: Vec<(usize, ExprImpl)>, // columns to be set to default
40    pub row_id_index: Option<usize>,
41    pub returning: bool,
42}
43
44impl<PlanRef: GenericPlanRef> GenericPlanNode for Insert<PlanRef> {
45    fn functional_dependency(&self) -> FunctionalDependencySet {
46        FunctionalDependencySet::new(self.output_len())
47    }
48
49    fn schema(&self) -> Schema {
50        if self.returning {
51            // We cannot directly use `self.input.schema()` here since it may omit some columns that
52            // will be filled with default values.
53            Schema::new(
54                self.table_visible_columns
55                    .iter()
56                    .map(|c| Field::from(&c.column_desc))
57                    .collect(),
58            )
59        } else {
60            Schema::new(vec![Field::unnamed(DataType::Int64)])
61        }
62    }
63
64    fn stream_key(&self) -> Option<Vec<usize>> {
65        None
66    }
67
68    fn ctx(&self) -> OptimizerContextRef {
69        self.input.ctx()
70    }
71}
72
73impl<PlanRef: GenericPlanRef> Insert<PlanRef> {
74    pub fn clone_with_input<OtherPlanRef: Eq + Hash>(
75        &self,
76        input: OtherPlanRef,
77    ) -> Insert<OtherPlanRef> {
78        Insert {
79            table_name: self.table_name.clone(),
80            table_id: self.table_id,
81            table_version_id: self.table_version_id,
82            table_visible_columns: self.table_visible_columns.clone(),
83            input,
84            column_indices: self.column_indices.clone(),
85            default_columns: self.default_columns.clone(),
86            row_id_index: self.row_id_index,
87            returning: self.returning,
88        }
89    }
90
91    pub fn output_len(&self) -> usize {
92        if self.returning {
93            self.table_visible_columns.len()
94        } else {
95            1
96        }
97    }
98
99    pub fn fields_pretty<'a>(&self, verbose: bool) -> StrAssocArr<'a> {
100        let mut capacity = 1;
101        if self.returning {
102            capacity += 1;
103        }
104        if verbose {
105            capacity += 1;
106            if !self.default_columns.is_empty() {
107                capacity += 1;
108            }
109        }
110        let mut vec = Vec::with_capacity(capacity);
111        vec.push(("table", Pretty::from(self.table_name.clone())));
112        if self.returning {
113            vec.push(("returning", Pretty::debug(&true)));
114        }
115        if verbose {
116            let collect = (self.column_indices.iter().enumerate())
117                .map(|(k, v)| Pretty::from(format!("{}:{}", k, v)))
118                .collect();
119            vec.push(("mapping", Pretty::Array(collect)));
120            if !self.default_columns.is_empty() {
121                let collect = self
122                    .default_columns
123                    .iter()
124                    .map(|(k, v)| Pretty::from(format!("{}<-{:?}", k, v)))
125                    .collect();
126                vec.push(("default", Pretty::Array(collect)));
127            }
128        }
129        vec
130    }
131}
132
133impl<PlanRef: Eq + Hash> Insert<PlanRef> {
134    /// Create a [`Insert`] node. Used internally by optimizer.
135    #[allow(clippy::too_many_arguments)]
136    pub fn new(
137        input: PlanRef,
138        table_name: String,
139        table_id: TableId,
140        table_version_id: TableVersionId,
141        table_visible_columns: Vec<ColumnCatalog>,
142        column_indices: Vec<usize>,
143        default_columns: Vec<(usize, ExprImpl)>,
144        row_id_index: Option<usize>,
145        returning: bool,
146    ) -> Self {
147        Self {
148            table_name,
149            table_id,
150            table_version_id,
151            table_visible_columns,
152            input,
153            column_indices,
154            default_columns,
155            row_id_index,
156            returning,
157        }
158    }
159}