risingwave_common/catalog/
schema.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Index;
16
17use risingwave_pb::plan_common::{PbColumnDesc, PbField};
18
19use super::ColumnDesc;
20use crate::array::ArrayBuilderImpl;
21use crate::types::{DataType, StructType};
22use crate::util::iter_util::ZipEqFast;
23
24/// The field in the schema of the executor's return data
25#[derive(Clone, PartialEq, Eq, Hash)]
26pub struct Field {
27    pub data_type: DataType,
28    pub name: String,
29}
30
31impl Field {
32    pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
33        Self {
34            data_type,
35            name: name.into(),
36        }
37    }
38}
39
40impl std::fmt::Debug for Field {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(f, "{}:{:?}", self.name, self.data_type)
43    }
44}
45
46impl Field {
47    pub fn to_prost(&self) -> PbField {
48        PbField {
49            data_type: Some(self.data_type.to_protobuf()),
50            name: self.name.clone(),
51        }
52    }
53}
54
55impl From<&ColumnDesc> for Field {
56    fn from(desc: &ColumnDesc) -> Self {
57        Self {
58            data_type: desc.data_type.clone(),
59            name: desc.name.clone(),
60        }
61    }
62}
63
64impl From<ColumnDesc> for Field {
65    fn from(column_desc: ColumnDesc) -> Self {
66        Self {
67            data_type: column_desc.data_type,
68            name: column_desc.name,
69        }
70    }
71}
72
73impl From<&PbColumnDesc> for Field {
74    fn from(pb_column_desc: &PbColumnDesc) -> Self {
75        Self {
76            data_type: pb_column_desc.column_type.as_ref().unwrap().into(),
77            name: pb_column_desc.name.clone(),
78        }
79    }
80}
81
82/// Something that has a data type and a name.
83#[auto_impl::auto_impl(&)]
84pub trait FieldLike {
85    fn data_type(&self) -> &DataType;
86    fn name(&self) -> &str;
87}
88
89impl FieldLike for Field {
90    fn data_type(&self) -> &DataType {
91        &self.data_type
92    }
93
94    fn name(&self) -> &str {
95        &self.name
96    }
97}
98
99pub struct FieldDisplay<'a>(pub &'a Field);
100
101impl std::fmt::Debug for FieldDisplay<'_> {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        write!(f, "{}", self.0.name)
104    }
105}
106
107impl std::fmt::Display for FieldDisplay<'_> {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        write!(f, "{}", self.0.name)
110    }
111}
112
/// `schema_unnamed` builds a `Schema` with the given types, but without names.
///
/// Each argument is an expression that is passed to `Field::unnamed`. Arguments
/// are comma-separated and an optional trailing comma is accepted.
#[macro_export]
macro_rules! schema_unnamed {
    // `$(,)?` allows a trailing comma, so multi-line invocations can be formatted
    // like any other list.
    ($($t:expr),* $(,)?) => {{
        $crate::catalog::Schema {
            fields: vec![
                $( $crate::catalog::Field::unnamed($t) ),*
            ],
        }
    }};
}
124
125/// the schema of the executor's return data
126#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
127pub struct Schema {
128    pub fields: Vec<Field>,
129}
130
131impl Schema {
132    pub fn empty() -> &'static Self {
133        static EMPTY: Schema = Schema { fields: Vec::new() };
134        &EMPTY
135    }
136
137    pub fn len(&self) -> usize {
138        self.fields.len()
139    }
140
141    pub fn is_empty(&self) -> bool {
142        self.fields.is_empty()
143    }
144
145    pub fn new(fields: Vec<Field>) -> Self {
146        Self { fields }
147    }
148
149    pub fn names(&self) -> Vec<String> {
150        self.fields().iter().map(|f| f.name.clone()).collect()
151    }
152
153    pub fn names_str(&self) -> Vec<&str> {
154        self.fields().iter().map(|f| f.name.as_str()).collect()
155    }
156
157    pub fn data_types(&self) -> Vec<DataType> {
158        self.fields
159            .iter()
160            .map(|field| field.data_type.clone())
161            .collect()
162    }
163
164    pub fn fields(&self) -> &[Field] {
165        &self.fields
166    }
167
168    pub fn into_fields(self) -> Vec<Field> {
169        self.fields
170    }
171
172    /// Create array builders for all fields in this schema.
173    pub fn create_array_builders(&self, capacity: usize) -> Vec<ArrayBuilderImpl> {
174        self.fields
175            .iter()
176            .map(|field| field.data_type.create_array_builder(capacity))
177            .collect()
178    }
179
180    pub fn to_prost(&self) -> Vec<PbField> {
181        self.fields
182            .clone()
183            .into_iter()
184            .map(|field| field.to_prost())
185            .collect()
186    }
187
188    pub fn type_eq(&self, other: &Self) -> bool {
189        if self.len() != other.len() {
190            return false;
191        }
192
193        for (a, b) in self.fields.iter().zip_eq_fast(other.fields.iter()) {
194            if a.data_type != b.data_type {
195                return false;
196            }
197        }
198
199        true
200    }
201
202    pub fn all_type_eq<'a>(inputs: impl IntoIterator<Item = &'a Self>) -> bool {
203        let mut iter = inputs.into_iter();
204        if let Some(first) = iter.next() {
205            iter.all(|x| x.type_eq(first))
206        } else {
207            true
208        }
209    }
210
211    pub fn formatted_col_names(&self) -> String {
212        self.fields
213            .iter()
214            .map(|f| format!("\"{}\"", &f.name))
215            .collect::<Vec<_>>()
216            .join(", ")
217    }
218}
219
220impl Field {
221    // TODO: rename to `new`
222    pub fn with_name<S>(data_type: DataType, name: S) -> Self
223    where
224        S: Into<String>,
225    {
226        Self {
227            data_type,
228            name: name.into(),
229        }
230    }
231
232    pub fn unnamed(data_type: DataType) -> Self {
233        Self {
234            data_type,
235            name: String::new(),
236        }
237    }
238
239    pub fn data_type(&self) -> DataType {
240        self.data_type.clone()
241    }
242
243    pub fn from_with_table_name_prefix(desc: &ColumnDesc, table_name: &str) -> Self {
244        Self {
245            data_type: desc.data_type.clone(),
246            name: format!("{}.{}", table_name, desc.name),
247        }
248    }
249}
250
251impl From<&PbField> for Field {
252    fn from(prost_field: &PbField) -> Self {
253        Self {
254            data_type: DataType::from(prost_field.get_data_type().expect("data type not found")),
255            name: prost_field.get_name().clone(),
256        }
257    }
258}
259
260impl Index<usize> for Schema {
261    type Output = Field;
262
263    fn index(&self, index: usize) -> &Self::Output {
264        &self.fields[index]
265    }
266}
267
268impl FromIterator<Field> for Schema {
269    fn from_iter<I: IntoIterator<Item = Field>>(iter: I) -> Self {
270        Schema {
271            fields: iter.into_iter().collect::<Vec<_>>(),
272        }
273    }
274}
275
276impl From<&StructType> for Schema {
277    fn from(t: &StructType) -> Self {
278        Schema::new(
279            t.iter()
280                .map(|(s, d)| Field::with_name(d.clone(), s))
281                .collect(),
282        )
283    }
284}
285
286pub mod test_utils {
287    use super::*;
288
289    pub fn field_n<const N: usize>(data_type: DataType) -> Schema {
290        Schema::new(vec![Field::unnamed(data_type); N])
291    }
292
293    fn int32_n<const N: usize>() -> Schema {
294        field_n::<N>(DataType::Int32)
295    }
296
297    /// Create a util schema **for test only** with two int32 fields.
298    pub fn ii() -> Schema {
299        int32_n::<2>()
300    }
301
302    /// Create a util schema **for test only** with three int32 fields.
303    pub fn iii() -> Schema {
304        int32_n::<3>()
305    }
306
307    fn varchar_n<const N: usize>() -> Schema {
308        field_n::<N>(DataType::Varchar)
309    }
310
311    /// Create a util schema **for test only** with three varchar fields.
312    pub fn sss() -> Schema {
313        varchar_n::<3>()
314    }
315
316    fn decimal_n<const N: usize>() -> Schema {
317        field_n::<N>(DataType::Decimal)
318    }
319
320    /// Create a util schema **for test only** with three decimal fields.
321    pub fn ddd() -> Schema {
322        decimal_n::<3>()
323    }
324}