risingwave_common/catalog/
schema.rs1use std::ops::Index;
16
17use educe::Educe;
18use risingwave_pb::plan_common::{PbColumnDesc, PbField};
19
20use super::ColumnDesc;
21use crate::array::ArrayBuilderImpl;
22use crate::types::{DataType, StructType};
23use crate::util::iter_util::ZipEqFast;
24
25#[derive(Clone, Educe)]
26#[educe(PartialEq, Eq, Hash)]
27pub struct Field {
28 pub data_type: DataType,
29 #[educe(PartialEq(ignore))]
30 #[educe(Hash(ignore))]
31 pub name: String,
32}
33
34impl Field {
35 pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
36 Self {
37 data_type,
38 name: name.into(),
39 }
40 }
41}
42
43impl std::fmt::Debug for Field {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 write!(f, "{}:{:?}", self.name, self.data_type)
46 }
47}
48
49impl Field {
50 pub fn to_prost(&self) -> PbField {
51 PbField {
52 data_type: Some(self.data_type.to_protobuf()),
53 name: self.name.clone(),
54 }
55 }
56
57 pub fn from_prost(pb: &PbField) -> Self {
58 Field {
59 data_type: DataType::from(pb.data_type.as_ref().unwrap()),
60 name: pb.name.clone(),
61 }
62 }
63}
64
65impl From<&ColumnDesc> for Field {
66 fn from(desc: &ColumnDesc) -> Self {
67 Self {
68 data_type: desc.data_type.clone(),
69 name: desc.name.clone(),
70 }
71 }
72}
73
74impl From<ColumnDesc> for Field {
75 fn from(column_desc: ColumnDesc) -> Self {
76 Self {
77 data_type: column_desc.data_type,
78 name: column_desc.name,
79 }
80 }
81}
82
83impl From<&PbColumnDesc> for Field {
84 fn from(pb_column_desc: &PbColumnDesc) -> Self {
85 Self {
86 data_type: pb_column_desc.column_type.as_ref().unwrap().into(),
87 name: pb_column_desc.name.clone(),
88 }
89 }
90}
91
92#[auto_impl::auto_impl(&)]
94pub trait FieldLike {
95 fn data_type(&self) -> &DataType;
96 fn name(&self) -> &str;
97}
98
99impl FieldLike for Field {
100 fn data_type(&self) -> &DataType {
101 &self.data_type
102 }
103
104 fn name(&self) -> &str {
105 &self.name
106 }
107}
108
109pub struct FieldDisplay<'a>(pub &'a Field);
110
111impl std::fmt::Debug for FieldDisplay<'_> {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "{}", self.0.name)
114 }
115}
116
117impl std::fmt::Display for FieldDisplay<'_> {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 write!(f, "{}", self.0.name)
120 }
121}
122
123#[macro_export]
125macro_rules! schema_unnamed {
126 ($($t:expr),*) => {{
127 $crate::catalog::Schema {
128 fields: vec![
129 $( $crate::catalog::Field::unnamed($t) ),*
130 ],
131 }
132 }};
133}
134
135#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
137pub struct Schema {
138 pub fields: Vec<Field>,
139}
140
141impl Schema {
142 pub fn empty() -> &'static Self {
143 static EMPTY: Schema = Schema { fields: Vec::new() };
144 &EMPTY
145 }
146
147 pub fn len(&self) -> usize {
148 self.fields.len()
149 }
150
151 pub fn is_empty(&self) -> bool {
152 self.fields.is_empty()
153 }
154
155 pub fn new(fields: Vec<Field>) -> Self {
156 Self { fields }
157 }
158
159 pub fn names(&self) -> Vec<String> {
160 self.fields().iter().map(|f| f.name.clone()).collect()
161 }
162
163 pub fn names_str(&self) -> Vec<&str> {
164 self.fields().iter().map(|f| f.name.as_str()).collect()
165 }
166
167 pub fn data_types(&self) -> Vec<DataType> {
168 self.fields
169 .iter()
170 .map(|field| field.data_type.clone())
171 .collect()
172 }
173
174 pub fn fields(&self) -> &[Field] {
175 &self.fields
176 }
177
178 pub fn into_fields(self) -> Vec<Field> {
179 self.fields
180 }
181
182 pub fn create_array_builders(&self, capacity: usize) -> Vec<ArrayBuilderImpl> {
184 self.fields
185 .iter()
186 .map(|field| field.data_type.create_array_builder(capacity))
187 .collect()
188 }
189
190 pub fn to_prost(&self) -> Vec<PbField> {
191 self.fields
192 .clone()
193 .into_iter()
194 .map(|field| field.to_prost())
195 .collect()
196 }
197
198 pub fn type_eq(&self, other: &Self) -> bool {
199 if self.len() != other.len() {
200 return false;
201 }
202
203 for (a, b) in self.fields.iter().zip_eq_fast(other.fields.iter()) {
204 if !a.data_type.equals_datatype(&b.data_type) {
205 return false;
206 }
207 }
208
209 true
210 }
211
212 pub fn all_type_eq<'a>(inputs: impl IntoIterator<Item = &'a Self>) -> bool {
213 let mut iter = inputs.into_iter();
214 if let Some(first) = iter.next() {
215 iter.all(|x| x.type_eq(first))
216 } else {
217 true
218 }
219 }
220
221 pub fn formatted_col_names(&self) -> String {
222 self.fields
223 .iter()
224 .map(|f| format!("\"{}\"", &f.name))
225 .collect::<Vec<_>>()
226 .join(", ")
227 }
228}
229
230impl Field {
231 pub fn with_name<S>(data_type: DataType, name: S) -> Self
233 where
234 S: Into<String>,
235 {
236 Self {
237 data_type,
238 name: name.into(),
239 }
240 }
241
242 pub fn unnamed(data_type: DataType) -> Self {
243 Self {
244 data_type,
245 name: String::new(),
246 }
247 }
248
249 pub fn data_type(&self) -> DataType {
250 self.data_type.clone()
251 }
252
253 pub fn from_with_table_name_prefix(desc: &ColumnDesc, table_name: &str) -> Self {
254 Self {
255 data_type: desc.data_type.clone(),
256 name: format!("{}.{}", table_name, desc.name),
257 }
258 }
259}
260
261impl From<&PbField> for Field {
262 fn from(prost_field: &PbField) -> Self {
263 Self {
264 data_type: DataType::from(prost_field.get_data_type().expect("data type not found")),
265 name: prost_field.get_name().clone(),
266 }
267 }
268}
269
270impl Index<usize> for Schema {
271 type Output = Field;
272
273 fn index(&self, index: usize) -> &Self::Output {
274 &self.fields[index]
275 }
276}
277
278impl FromIterator<Field> for Schema {
279 fn from_iter<I: IntoIterator<Item = Field>>(iter: I) -> Self {
280 Schema {
281 fields: iter.into_iter().collect::<Vec<_>>(),
282 }
283 }
284}
285
286impl From<&StructType> for Schema {
287 fn from(t: &StructType) -> Self {
288 Schema::new(
289 t.iter()
290 .map(|(s, d)| Field::with_name(d.clone(), s))
291 .collect(),
292 )
293 }
294}
295
296pub mod test_utils {
297 use super::*;
298
299 pub fn field_n<const N: usize>(data_type: DataType) -> Schema {
300 Schema::new(vec![Field::unnamed(data_type); N])
301 }
302
303 fn int32_n<const N: usize>() -> Schema {
304 field_n::<N>(DataType::Int32)
305 }
306
307 pub fn ii() -> Schema {
309 int32_n::<2>()
310 }
311
312 pub fn iii() -> Schema {
314 int32_n::<3>()
315 }
316
317 fn varchar_n<const N: usize>() -> Schema {
318 field_n::<N>(DataType::Varchar)
319 }
320
321 pub fn sss() -> Schema {
323 varchar_n::<3>()
324 }
325
326 fn decimal_n<const N: usize>() -> Schema {
327 field_n::<N>(DataType::Decimal)
328 }
329
330 pub fn ddd() -> Schema {
332 decimal_n::<3>()
333 }
334}