risingwave_frontend/optimizer/plan_node/generic/
insert.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::hash::Hash;
16
17use educe::Educe;
18use pretty_xmlish::{Pretty, StrAssocArr};
19use risingwave_common::catalog::{ColumnCatalog, Field, Schema, TableVersionId};
20use risingwave_common::types::DataType;
21
22use super::{GenericPlanNode, GenericPlanRef};
23use crate::OptimizerContextRef;
24use crate::catalog::TableId;
25use crate::expr::ExprImpl;
26use crate::optimizer::property::FunctionalDependencySet;
27
28#[derive(Debug, Clone, Educe)]
29#[educe(PartialEq, Eq, Hash)]
30pub struct Insert<PlanRef: Eq + Hash> {
31    #[educe(PartialEq(ignore))]
32    #[educe(Hash(ignore))]
33    pub table_name: String, // explain-only
34    pub table_id: TableId,
35    pub table_version_id: TableVersionId,
36    pub table_visible_columns: Vec<ColumnCatalog>,
37    pub input: PlanRef,
38    pub column_indices: Vec<usize>, // columns in which to insert
39    pub default_columns: Vec<(usize, ExprImpl)>, // columns to be set to default
40    pub row_id_index: Option<usize>,
41    pub returning: bool,
42}
43
44impl<PlanRef: GenericPlanRef> GenericPlanNode for Insert<PlanRef> {
45    fn functional_dependency(&self) -> FunctionalDependencySet {
46        FunctionalDependencySet::new(self.output_len())
47    }
48
49    fn schema(&self) -> Schema {
50        if self.returning {
51            // We cannot directly use `self.input.schema()` here since it may omit some columns that
52            // will be filled with default values.
53            Schema::new(
54                self.table_visible_columns
55                    .iter()
56                    .map(|c| Field::from(&c.column_desc))
57                    .collect(),
58            )
59        } else {
60            Schema::new(vec![Field::unnamed(DataType::Int64)])
61        }
62    }
63
64    fn stream_key(&self) -> Option<Vec<usize>> {
65        None
66    }
67
68    fn ctx(&self) -> OptimizerContextRef {
69        self.input.ctx()
70    }
71}
72
73impl<PlanRef: GenericPlanRef> Insert<PlanRef> {
74    pub fn output_len(&self) -> usize {
75        if self.returning {
76            self.table_visible_columns.len()
77        } else {
78            1
79        }
80    }
81
82    pub fn fields_pretty<'a>(&self, verbose: bool) -> StrAssocArr<'a> {
83        let mut capacity = 1;
84        if self.returning {
85            capacity += 1;
86        }
87        if verbose {
88            capacity += 1;
89            if !self.default_columns.is_empty() {
90                capacity += 1;
91            }
92        }
93        let mut vec = Vec::with_capacity(capacity);
94        vec.push(("table", Pretty::from(self.table_name.clone())));
95        if self.returning {
96            vec.push(("returning", Pretty::debug(&true)));
97        }
98        if verbose {
99            let collect = (self.column_indices.iter().enumerate())
100                .map(|(k, v)| Pretty::from(format!("{}:{}", k, v)))
101                .collect();
102            vec.push(("mapping", Pretty::Array(collect)));
103            if !self.default_columns.is_empty() {
104                let collect = self
105                    .default_columns
106                    .iter()
107                    .map(|(k, v)| Pretty::from(format!("{}<-{:?}", k, v)))
108                    .collect();
109                vec.push(("default", Pretty::Array(collect)));
110            }
111        }
112        vec
113    }
114}
115
116impl<PlanRef: Eq + Hash> Insert<PlanRef> {
117    /// Create a [`Insert`] node. Used internally by optimizer.
118    #[allow(clippy::too_many_arguments)]
119    pub fn new(
120        input: PlanRef,
121        table_name: String,
122        table_id: TableId,
123        table_version_id: TableVersionId,
124        table_visible_columns: Vec<ColumnCatalog>,
125        column_indices: Vec<usize>,
126        default_columns: Vec<(usize, ExprImpl)>,
127        row_id_index: Option<usize>,
128        returning: bool,
129    ) -> Self {
130        Self {
131            table_name,
132            table_id,
133            table_version_id,
134            table_visible_columns,
135            input,
136            column_indices,
137            default_columns,
138            row_id_index,
139            returning,
140        }
141    }
142}