risingwave_frontend/optimizer/plan_node/generic/
insert.rs1use std::hash::Hash;
16
17use educe::Educe;
18use pretty_xmlish::{Pretty, StrAssocArr};
19use risingwave_common::catalog::{ColumnCatalog, Field, Schema, TableVersionId};
20use risingwave_common::types::DataType;
21
22use super::{GenericPlanNode, GenericPlanRef};
23use crate::OptimizerContextRef;
24use crate::catalog::TableId;
25use crate::expr::ExprImpl;
26use crate::optimizer::property::FunctionalDependencySet;
27
28#[derive(Debug, Clone, Educe)]
29#[educe(PartialEq, Eq, Hash)]
30pub struct Insert<PlanRef: Eq + Hash> {
31 #[educe(PartialEq(ignore))]
32 #[educe(Hash(ignore))]
33 pub table_name: String, pub table_id: TableId,
35 pub table_version_id: TableVersionId,
36 pub table_visible_columns: Vec<ColumnCatalog>,
37 pub input: PlanRef,
38 pub column_indices: Vec<usize>, pub default_columns: Vec<(usize, ExprImpl)>, pub row_id_index: Option<usize>,
41 pub returning: bool,
42}
43
44impl<PlanRef: GenericPlanRef> GenericPlanNode for Insert<PlanRef> {
45 fn functional_dependency(&self) -> FunctionalDependencySet {
46 FunctionalDependencySet::new(self.output_len())
47 }
48
49 fn schema(&self) -> Schema {
50 if self.returning {
51 Schema::new(
54 self.table_visible_columns
55 .iter()
56 .map(|c| Field::from(&c.column_desc))
57 .collect(),
58 )
59 } else {
60 Schema::new(vec![Field::unnamed(DataType::Int64)])
61 }
62 }
63
64 fn stream_key(&self) -> Option<Vec<usize>> {
65 None
66 }
67
68 fn ctx(&self) -> OptimizerContextRef {
69 self.input.ctx()
70 }
71}
72
73impl<PlanRef: GenericPlanRef> Insert<PlanRef> {
74 pub fn clone_with_input<OtherPlanRef: Eq + Hash>(
75 &self,
76 input: OtherPlanRef,
77 ) -> Insert<OtherPlanRef> {
78 Insert {
79 table_name: self.table_name.clone(),
80 table_id: self.table_id,
81 table_version_id: self.table_version_id,
82 table_visible_columns: self.table_visible_columns.clone(),
83 input,
84 column_indices: self.column_indices.clone(),
85 default_columns: self.default_columns.clone(),
86 row_id_index: self.row_id_index,
87 returning: self.returning,
88 }
89 }
90
91 pub fn output_len(&self) -> usize {
92 if self.returning {
93 self.table_visible_columns.len()
94 } else {
95 1
96 }
97 }
98
99 pub fn fields_pretty<'a>(&self, verbose: bool) -> StrAssocArr<'a> {
100 let mut capacity = 1;
101 if self.returning {
102 capacity += 1;
103 }
104 if verbose {
105 capacity += 1;
106 if !self.default_columns.is_empty() {
107 capacity += 1;
108 }
109 }
110 let mut vec = Vec::with_capacity(capacity);
111 vec.push(("table", Pretty::from(self.table_name.clone())));
112 if self.returning {
113 vec.push(("returning", Pretty::debug(&true)));
114 }
115 if verbose {
116 let collect = (self.column_indices.iter().enumerate())
117 .map(|(k, v)| Pretty::from(format!("{}:{}", k, v)))
118 .collect();
119 vec.push(("mapping", Pretty::Array(collect)));
120 if !self.default_columns.is_empty() {
121 let collect = self
122 .default_columns
123 .iter()
124 .map(|(k, v)| Pretty::from(format!("{}<-{:?}", k, v)))
125 .collect();
126 vec.push(("default", Pretty::Array(collect)));
127 }
128 }
129 vec
130 }
131}
132
133impl<PlanRef: Eq + Hash> Insert<PlanRef> {
134 #[allow(clippy::too_many_arguments)]
136 pub fn new(
137 input: PlanRef,
138 table_name: String,
139 table_id: TableId,
140 table_version_id: TableVersionId,
141 table_visible_columns: Vec<ColumnCatalog>,
142 column_indices: Vec<usize>,
143 default_columns: Vec<(usize, ExprImpl)>,
144 row_id_index: Option<usize>,
145 returning: bool,
146 ) -> Self {
147 Self {
148 table_name,
149 table_id,
150 table_version_id,
151 table_visible_columns,
152 input,
153 column_indices,
154 default_columns,
155 row_id_index,
156 returning,
157 }
158 }
159}