risingwave_common/catalog/
schema.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Index;
16
17use risingwave_pb::plan_common::{PbColumnDesc, PbField};
18
19use super::ColumnDesc;
20use crate::array::ArrayBuilderImpl;
21use crate::types::{DataType, StructType};
22use crate::util::iter_util::ZipEqFast;
23
24/// The field in the schema of the executor's return data
25#[derive(Clone, PartialEq, Eq, Hash)]
26pub struct Field {
27    pub data_type: DataType,
28    pub name: String,
29}
30
31impl Field {
32    pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
33        Self {
34            data_type,
35            name: name.into(),
36        }
37    }
38}
39
40impl std::fmt::Debug for Field {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(f, "{}:{:?}", self.name, self.data_type)
43    }
44}
45
46impl Field {
47    pub fn to_prost(&self) -> PbField {
48        PbField {
49            data_type: Some(self.data_type.to_protobuf()),
50            name: self.name.clone(),
51        }
52    }
53
54    pub fn from_prost(pb: &PbField) -> Self {
55        Field {
56            data_type: DataType::from(pb.data_type.as_ref().unwrap()),
57            name: pb.name.clone(),
58        }
59    }
60}
61
62impl From<&ColumnDesc> for Field {
63    fn from(desc: &ColumnDesc) -> Self {
64        Self {
65            data_type: desc.data_type.clone(),
66            name: desc.name.clone(),
67        }
68    }
69}
70
71impl From<ColumnDesc> for Field {
72    fn from(column_desc: ColumnDesc) -> Self {
73        Self {
74            data_type: column_desc.data_type,
75            name: column_desc.name,
76        }
77    }
78}
79
80impl From<&PbColumnDesc> for Field {
81    fn from(pb_column_desc: &PbColumnDesc) -> Self {
82        Self {
83            data_type: pb_column_desc.column_type.as_ref().unwrap().into(),
84            name: pb_column_desc.name.clone(),
85        }
86    }
87}
88
89/// Something that has a data type and a name.
90#[auto_impl::auto_impl(&)]
91pub trait FieldLike {
92    fn data_type(&self) -> &DataType;
93    fn name(&self) -> &str;
94}
95
96impl FieldLike for Field {
97    fn data_type(&self) -> &DataType {
98        &self.data_type
99    }
100
101    fn name(&self) -> &str {
102        &self.name
103    }
104}
105
106pub struct FieldDisplay<'a>(pub &'a Field);
107
108impl std::fmt::Debug for FieldDisplay<'_> {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        write!(f, "{}", self.0.name)
111    }
112}
113
114impl std::fmt::Display for FieldDisplay<'_> {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        write!(f, "{}", self.0.name)
117    }
118}
119
120/// `schema_unnamed` builds a `Schema` with the given types, but without names.
121#[macro_export]
122macro_rules! schema_unnamed {
123    ($($t:expr),*) => {{
124        $crate::catalog::Schema {
125            fields: vec![
126                $( $crate::catalog::Field::unnamed($t) ),*
127            ],
128        }
129    }};
130}
131
132/// the schema of the executor's return data
133#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
134pub struct Schema {
135    pub fields: Vec<Field>,
136}
137
138impl Schema {
139    pub fn empty() -> &'static Self {
140        static EMPTY: Schema = Schema { fields: Vec::new() };
141        &EMPTY
142    }
143
144    pub fn len(&self) -> usize {
145        self.fields.len()
146    }
147
148    pub fn is_empty(&self) -> bool {
149        self.fields.is_empty()
150    }
151
152    pub fn new(fields: Vec<Field>) -> Self {
153        Self { fields }
154    }
155
156    pub fn names(&self) -> Vec<String> {
157        self.fields().iter().map(|f| f.name.clone()).collect()
158    }
159
160    pub fn names_str(&self) -> Vec<&str> {
161        self.fields().iter().map(|f| f.name.as_str()).collect()
162    }
163
164    pub fn data_types(&self) -> Vec<DataType> {
165        self.fields
166            .iter()
167            .map(|field| field.data_type.clone())
168            .collect()
169    }
170
171    pub fn fields(&self) -> &[Field] {
172        &self.fields
173    }
174
175    pub fn into_fields(self) -> Vec<Field> {
176        self.fields
177    }
178
179    /// Create array builders for all fields in this schema.
180    pub fn create_array_builders(&self, capacity: usize) -> Vec<ArrayBuilderImpl> {
181        self.fields
182            .iter()
183            .map(|field| field.data_type.create_array_builder(capacity))
184            .collect()
185    }
186
187    pub fn to_prost(&self) -> Vec<PbField> {
188        self.fields
189            .clone()
190            .into_iter()
191            .map(|field| field.to_prost())
192            .collect()
193    }
194
195    pub fn type_eq(&self, other: &Self) -> bool {
196        if self.len() != other.len() {
197            return false;
198        }
199
200        for (a, b) in self.fields.iter().zip_eq_fast(other.fields.iter()) {
201            if a.data_type != b.data_type {
202                return false;
203            }
204        }
205
206        true
207    }
208
209    pub fn all_type_eq<'a>(inputs: impl IntoIterator<Item = &'a Self>) -> bool {
210        let mut iter = inputs.into_iter();
211        if let Some(first) = iter.next() {
212            iter.all(|x| x.type_eq(first))
213        } else {
214            true
215        }
216    }
217
218    pub fn formatted_col_names(&self) -> String {
219        self.fields
220            .iter()
221            .map(|f| format!("\"{}\"", &f.name))
222            .collect::<Vec<_>>()
223            .join(", ")
224    }
225}
226
227impl Field {
228    // TODO: rename to `new`
229    pub fn with_name<S>(data_type: DataType, name: S) -> Self
230    where
231        S: Into<String>,
232    {
233        Self {
234            data_type,
235            name: name.into(),
236        }
237    }
238
239    pub fn unnamed(data_type: DataType) -> Self {
240        Self {
241            data_type,
242            name: String::new(),
243        }
244    }
245
246    pub fn data_type(&self) -> DataType {
247        self.data_type.clone()
248    }
249
250    pub fn from_with_table_name_prefix(desc: &ColumnDesc, table_name: &str) -> Self {
251        Self {
252            data_type: desc.data_type.clone(),
253            name: format!("{}.{}", table_name, desc.name),
254        }
255    }
256}
257
258impl From<&PbField> for Field {
259    fn from(prost_field: &PbField) -> Self {
260        Self {
261            data_type: DataType::from(prost_field.get_data_type().expect("data type not found")),
262            name: prost_field.get_name().clone(),
263        }
264    }
265}
266
267impl Index<usize> for Schema {
268    type Output = Field;
269
270    fn index(&self, index: usize) -> &Self::Output {
271        &self.fields[index]
272    }
273}
274
275impl FromIterator<Field> for Schema {
276    fn from_iter<I: IntoIterator<Item = Field>>(iter: I) -> Self {
277        Schema {
278            fields: iter.into_iter().collect::<Vec<_>>(),
279        }
280    }
281}
282
283impl From<&StructType> for Schema {
284    fn from(t: &StructType) -> Self {
285        Schema::new(
286            t.iter()
287                .map(|(s, d)| Field::with_name(d.clone(), s))
288                .collect(),
289        )
290    }
291}
292
293pub mod test_utils {
294    use super::*;
295
296    pub fn field_n<const N: usize>(data_type: DataType) -> Schema {
297        Schema::new(vec![Field::unnamed(data_type); N])
298    }
299
300    fn int32_n<const N: usize>() -> Schema {
301        field_n::<N>(DataType::Int32)
302    }
303
304    /// Create a util schema **for test only** with two int32 fields.
305    pub fn ii() -> Schema {
306        int32_n::<2>()
307    }
308
309    /// Create a util schema **for test only** with three int32 fields.
310    pub fn iii() -> Schema {
311        int32_n::<3>()
312    }
313
314    fn varchar_n<const N: usize>() -> Schema {
315        field_n::<N>(DataType::Varchar)
316    }
317
318    /// Create a util schema **for test only** with three varchar fields.
319    pub fn sss() -> Schema {
320        varchar_n::<3>()
321    }
322
323    fn decimal_n<const N: usize>() -> Schema {
324        field_n::<N>(DataType::Decimal)
325    }
326
327    /// Create a util schema **for test only** with three decimal fields.
328    pub fn ddd() -> Schema {
329        decimal_n::<3>()
330    }
331}