risingwave_frontend/optimizer/plan_node/generic/
insert.rs1use std::hash::Hash;
16
17use educe::Educe;
18use pretty_xmlish::{Pretty, StrAssocArr};
19use risingwave_common::catalog::{ColumnCatalog, Field, Schema, TableVersionId};
20use risingwave_common::types::DataType;
21
22use super::{GenericPlanNode, GenericPlanRef};
23use crate::OptimizerContextRef;
24use crate::catalog::TableId;
25use crate::expr::ExprImpl;
26use crate::optimizer::property::FunctionalDependencySet;
27
28#[derive(Debug, Clone, Educe)]
29#[educe(PartialEq, Eq, Hash)]
30pub struct Insert<PlanRef: Eq + Hash> {
31 #[educe(PartialEq(ignore))]
32 #[educe(Hash(ignore))]
33 pub table_name: String, pub table_id: TableId,
35 pub table_version_id: TableVersionId,
36 pub table_visible_columns: Vec<ColumnCatalog>,
37 pub input: PlanRef,
38 pub column_indices: Vec<usize>, pub default_columns: Vec<(usize, ExprImpl)>, pub row_id_index: Option<usize>,
41 pub returning: bool,
42}
43
44impl<PlanRef: GenericPlanRef> GenericPlanNode for Insert<PlanRef> {
45 fn functional_dependency(&self) -> FunctionalDependencySet {
46 FunctionalDependencySet::new(self.output_len())
47 }
48
49 fn schema(&self) -> Schema {
50 if self.returning {
51 Schema::new(
54 self.table_visible_columns
55 .iter()
56 .map(|c| Field::from(&c.column_desc))
57 .collect(),
58 )
59 } else {
60 Schema::new(vec![Field::unnamed(DataType::Int64)])
61 }
62 }
63
64 fn stream_key(&self) -> Option<Vec<usize>> {
65 None
66 }
67
68 fn ctx(&self) -> OptimizerContextRef {
69 self.input.ctx()
70 }
71}
72
73impl<PlanRef: GenericPlanRef> Insert<PlanRef> {
74 pub fn output_len(&self) -> usize {
75 if self.returning {
76 self.table_visible_columns.len()
77 } else {
78 1
79 }
80 }
81
82 pub fn fields_pretty<'a>(&self, verbose: bool) -> StrAssocArr<'a> {
83 let mut capacity = 1;
84 if self.returning {
85 capacity += 1;
86 }
87 if verbose {
88 capacity += 1;
89 if !self.default_columns.is_empty() {
90 capacity += 1;
91 }
92 }
93 let mut vec = Vec::with_capacity(capacity);
94 vec.push(("table", Pretty::from(self.table_name.clone())));
95 if self.returning {
96 vec.push(("returning", Pretty::debug(&true)));
97 }
98 if verbose {
99 let collect = (self.column_indices.iter().enumerate())
100 .map(|(k, v)| Pretty::from(format!("{}:{}", k, v)))
101 .collect();
102 vec.push(("mapping", Pretty::Array(collect)));
103 if !self.default_columns.is_empty() {
104 let collect = self
105 .default_columns
106 .iter()
107 .map(|(k, v)| Pretty::from(format!("{}<-{:?}", k, v)))
108 .collect();
109 vec.push(("default", Pretty::Array(collect)));
110 }
111 }
112 vec
113 }
114}
115
116impl<PlanRef: Eq + Hash> Insert<PlanRef> {
117 #[allow(clippy::too_many_arguments)]
119 pub fn new(
120 input: PlanRef,
121 table_name: String,
122 table_id: TableId,
123 table_version_id: TableVersionId,
124 table_visible_columns: Vec<ColumnCatalog>,
125 column_indices: Vec<usize>,
126 default_columns: Vec<(usize, ExprImpl)>,
127 row_id_index: Option<usize>,
128 returning: bool,
129 ) -> Self {
130 Self {
131 table_name,
132 table_id,
133 table_version_id,
134 table_visible_columns,
135 input,
136 column_indices,
137 default_columns,
138 row_id_index,
139 returning,
140 }
141 }
142}