risingwave_common/types/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Data types in RisingWave.
16
17// NOTE: When adding or modifying data types, remember to update the type matrix in
18// src/expr/macro/src/types.rs
19
20use std::fmt::Debug;
21use std::hash::Hash;
22use std::str::FromStr;
23
24use bytes::{Buf, BufMut, Bytes};
25use chrono::{Datelike, Timelike};
26use itertools::Itertools;
27use parse_display::{Display, FromStr};
28use paste::paste;
29use postgres_types::{FromSql, IsNull, ToSql, Type};
30use risingwave_common_estimate_size::{EstimateSize, ZeroHeapSize};
31use risingwave_pb::data::PbDataType;
32use risingwave_pb::data::data_type::PbTypeName;
33use rw_iter_util::ZipEqFast as _;
34use serde::{Deserialize, Serialize, Serializer};
35use strum_macros::EnumDiscriminants;
36use thiserror_ext::AsReport;
37
38use crate::array::{
39    ArrayBuilderImpl, ArrayError, ArrayResult, NULL_VAL_FOR_HASH, PrimitiveArrayItemType,
40};
41// Complex type's value is based on the array
42pub use crate::array::{
43    ListRef, ListValue, MapRef, MapValue, StructRef, StructValue, VectorRef, VectorVal,
44};
45use crate::cast::{str_to_bool, str_to_bytea};
46use crate::catalog::ColumnId;
47use crate::error::BoxedError;
48use crate::{
49    dispatch_data_types, dispatch_scalar_ref_variants, dispatch_scalar_variants, for_all_variants,
50};
51
52mod cow;
53mod datetime;
54mod decimal;
55mod fields;
56mod from_sql;
57mod interval;
58mod jsonb;
59mod list_type;
60mod macros;
61mod map_type;
62mod native_type;
63mod num256;
64mod ops;
65mod ordered;
66mod ordered_float;
67pub mod postgres_type;
68mod scalar_impl;
69mod sentinel;
70mod serial;
71mod struct_type;
72mod successor;
73mod timestamptz;
74mod to_binary;
75mod to_sql;
76mod to_text;
77mod with_data_type;
78
79pub use fields::Fields;
80pub use risingwave_fields_derive::Fields;
81use risingwave_pb::id::TypedId;
82
83pub use self::cow::DatumCow;
84pub use self::datetime::{Date, Time, Timestamp};
85pub use self::decimal::{Decimal, PowError as DecimalPowError};
86pub use self::interval::{DateTimeField, Interval, IntervalDisplay, test_utils};
87pub use self::jsonb::{JsonbRef, JsonbVal};
88pub use self::list_type::ListType;
89pub use self::map_type::MapType;
90pub use self::native_type::*;
91pub use self::num256::{Int256, Int256Ref};
92pub use self::ops::{CheckedAdd, IsNegative};
93pub use self::ordered::*;
94pub use self::ordered_float::{FloatExt, IntoOrdered};
95pub use self::scalar_impl::*;
96pub use self::sentinel::Sentinelled;
97pub use self::serial::Serial;
98pub use self::struct_type::StructType;
99pub use self::successor::Successor;
100pub use self::timestamptz::*;
101pub use self::to_text::ToText;
102pub use self::with_data_type::WithDataType;
103
/// A 32-bit floating point type with total order.
///
/// Alias for `ordered_float::OrderedFloat<f32>`.
pub type F32 = ordered_float::OrderedFloat<f32>;

/// A 64-bit floating point type with total order.
///
/// Alias for `ordered_float::OrderedFloat<f64>`.
pub type F64 = ordered_float::OrderedFloat<f64>;
109
110pub const DEBEZIUM_UNAVAILABLE_VALUE: &str = "__debezium_unavailable_value";
111
112// Pre-built JSON value for Debezium unavailable value to avoid rebuilding it every time
113pub static DEBEZIUM_UNAVAILABLE_JSON: std::sync::LazyLock<JsonbVal> =
114    std::sync::LazyLock::new(|| {
115        let mut builder = jsonbb::Builder::default();
116        builder.add_string(DEBEZIUM_UNAVAILABLE_VALUE);
117        JsonbVal(builder.finish())
118    });
119
120/// Magic per-element value used to build the Debezium unchanged-TOAST sentinel for
121/// pgvector columns. Picked because normal embeddings sit in a normalised range and
122/// having all elements simultaneously equal to `f32::MAX` is effectively impossible.
123pub const DEBEZIUM_UNAVAILABLE_VECTOR_ELEM: f32 = f32::MAX;
124
125/// Build a sentinel `VectorVal` of the given dimension to represent Debezium's
126/// unchanged-TOAST placeholder. The dimension must match the column's declared
127/// `vector(N)` size so it passes `check_datum_type` on the way through the
128/// `SourceStreamChunkBuilder`; the materialize executor recognises this sentinel
129/// by checking that every element equals `DEBEZIUM_UNAVAILABLE_VECTOR_ELEM`.
130pub fn debezium_unavailable_vector(size: usize) -> VectorVal {
131    VectorVal::from(
132        (0..size)
133            .map(|_| {
134                crate::array::Finite32::try_from(DEBEZIUM_UNAVAILABLE_VECTOR_ELEM)
135                    .expect("f32::MAX is finite")
136            })
137            .collect::<Vec<_>>(),
138    )
139}
140
/// The set of datatypes that are supported in RisingWave.
///
/// # Trait implementations
///
/// - `EnumDiscriminants` generates [`DataTypeName`] enum with the same variants,
///   but without data fields. `DataTypeName` additionally derives `Hash`, `Ord` and
///   `PartialOrd`; since derived ordering on an enum follows variant declaration order,
///   reordering the variants below changes how `DataTypeName` values compare.
/// - `Display` prints the string given in each variant's `#[display(...)]` attribute.
///   `Struct`, `List` and `Map` use `{0}`, i.e. they print whatever their inner type prints.
/// - `FromStr` is only used internally for tests.
///   The generated implementation isn't efficient, and doesn't handle whitespaces, etc.
#[derive(Debug, Display, Clone, PartialEq, Eq, Hash, EnumDiscriminants, FromStr)]
#[strum_discriminants(derive(Hash, Ord, PartialOrd))]
#[strum_discriminants(name(DataTypeName))]
#[strum_discriminants(vis(pub))]
#[cfg_attr(test, strum_discriminants(derive(strum_macros::EnumIter)))]
pub enum DataType {
    #[display("boolean")]
    #[from_str(regex = "(?i)^bool$|^boolean$")]
    Boolean,
    #[display("smallint")]
    #[from_str(regex = "(?i)^smallint$|^int2$")]
    Int16,
    #[display("integer")]
    #[from_str(regex = "(?i)^integer$|^int$|^int4$")]
    Int32,
    #[display("bigint")]
    #[from_str(regex = "(?i)^bigint$|^int8$")]
    Int64,
    #[display("real")]
    #[from_str(regex = "(?i)^real$|^float4$")]
    Float32,
    #[display("double precision")]
    #[from_str(regex = "(?i)^double precision$|^float8$")]
    Float64,
    #[display("numeric")]
    #[from_str(regex = "(?i)^numeric$|^decimal$")]
    Decimal,
    #[display("date")]
    #[from_str(regex = "(?i)^date$")]
    Date,
    #[display("character varying")]
    #[from_str(regex = "(?i)^character varying$|^varchar$")]
    Varchar,
    #[display("time without time zone")]
    #[from_str(regex = "(?i)^time$|^time without time zone$")]
    Time,
    #[display("timestamp without time zone")]
    #[from_str(regex = "(?i)^timestamp$|^timestamp without time zone$")]
    Timestamp,
    #[display("timestamp with time zone")]
    #[from_str(regex = "(?i)^timestamptz$|^timestamp with time zone$")]
    Timestamptz,
    #[display("interval")]
    #[from_str(regex = "(?i)^interval$")]
    Interval,
    // NOTE(review): `Struct`, `List` and `Map` all use the same catch-all pattern and hand the
    // whole input to the inner type's `FromStr`. Presumably the derive tries variants in
    // declaration order and moves on when the inner parse fails — confirm before reordering.
    /// A struct type; the field layout lives in the wrapped [`StructType`].
    #[display("{0}")]
    #[from_str(regex = "(?i)^(?P<0>.+)$")]
    Struct(StructType),
    /// A list type; the element type lives in the wrapped [`ListType`].
    #[display("{0}")]
    #[from_str(regex = "(?i)^(?P<0>.+)$")]
    List(ListType),
    #[display("bytea")]
    #[from_str(regex = "(?i)^bytea$")]
    Bytea,
    #[display("jsonb")]
    #[from_str(regex = "(?i)^jsonb$")]
    Jsonb,
    #[display("serial")]
    #[from_str(regex = "(?i)^serial$")]
    Serial,
    #[display("rw_int256")]
    #[from_str(regex = "(?i)^rw_int256$")]
    Int256,
    /// A map type; key and value types live in the wrapped [`MapType`].
    #[display("{0}")]
    #[from_str(regex = "(?i)^(?P<0>.+)$")]
    Map(MapType),
    /// A vector type, displayed as `vector(N)` where `N` is the wrapped size.
    #[display("vector({0})")]
    #[from_str(regex = "(?i)^vector\\((?P<0>.+)\\)$")]
    Vector(usize),
}
219
// Negative impl: `DataType` is declared to never implement `PartialOrd`, so an ordering
// cannot be derived or added for it by accident.
impl !PartialOrd for DataType {}

// NOTE(review): this marks `DataType` as owning no heap memory for size estimation, even
// though the `Struct`/`List`/`Map` variants wrap nested types — presumably a deliberate
// approximation; confirm.
impl ZeroHeapSize for DataType {}
223
224impl TryFrom<DataTypeName> for DataType {
225    type Error = &'static str;
226
227    fn try_from(type_name: DataTypeName) -> Result<Self, Self::Error> {
228        match type_name {
229            DataTypeName::Boolean => Ok(DataType::Boolean),
230            DataTypeName::Int16 => Ok(DataType::Int16),
231            DataTypeName::Int32 => Ok(DataType::Int32),
232            DataTypeName::Int64 => Ok(DataType::Int64),
233            DataTypeName::Int256 => Ok(DataType::Int256),
234            DataTypeName::Serial => Ok(DataType::Serial),
235            DataTypeName::Decimal => Ok(DataType::Decimal),
236            DataTypeName::Float32 => Ok(DataType::Float32),
237            DataTypeName::Float64 => Ok(DataType::Float64),
238            DataTypeName::Varchar => Ok(DataType::Varchar),
239            DataTypeName::Bytea => Ok(DataType::Bytea),
240            DataTypeName::Date => Ok(DataType::Date),
241            DataTypeName::Timestamp => Ok(DataType::Timestamp),
242            DataTypeName::Timestamptz => Ok(DataType::Timestamptz),
243            DataTypeName::Time => Ok(DataType::Time),
244            DataTypeName::Interval => Ok(DataType::Interval),
245            DataTypeName::Jsonb => Ok(DataType::Jsonb),
246            DataTypeName::Struct
247            | DataTypeName::List
248            | DataTypeName::Map
249            | DataTypeName::Vector => Err(
250                "Functions returning parameterized types can not be inferred. Please use `FunctionCall::new_unchecked`.",
251            ),
252        }
253    }
254}
255
256impl From<&PbDataType> for DataType {
257    fn from(proto: &PbDataType) -> DataType {
258        match proto.get_type_name().expect("missing type field") {
259            PbTypeName::TypeUnspecified => unreachable!(),
260            PbTypeName::Int16 => DataType::Int16,
261            PbTypeName::Int32 => DataType::Int32,
262            PbTypeName::Int64 => DataType::Int64,
263            PbTypeName::Serial => DataType::Serial,
264            PbTypeName::Float => DataType::Float32,
265            PbTypeName::Double => DataType::Float64,
266            PbTypeName::Boolean => DataType::Boolean,
267            PbTypeName::Varchar => DataType::Varchar,
268            PbTypeName::Date => DataType::Date,
269            PbTypeName::Time => DataType::Time,
270            PbTypeName::Timestamp => DataType::Timestamp,
271            PbTypeName::Timestamptz => DataType::Timestamptz,
272            PbTypeName::Decimal => DataType::Decimal,
273            PbTypeName::Interval => DataType::Interval,
274            PbTypeName::Bytea => DataType::Bytea,
275            PbTypeName::Jsonb => DataType::Jsonb,
276            PbTypeName::Struct => {
277                let fields: Vec<DataType> = proto.field_type.iter().map(|f| f.into()).collect_vec();
278                let field_names: Vec<String> = proto.field_names.iter().cloned().collect_vec();
279                let field_ids = (proto.field_ids.iter().copied())
280                    .map(ColumnId::new)
281                    .collect_vec();
282
283                let mut struct_type = if proto.field_names.is_empty() {
284                    StructType::unnamed(fields)
285                } else {
286                    StructType::new(field_names.into_iter().zip_eq_fast(fields))
287                };
288                // `field_ids` is used for nested-schema evolution. Cases when `field_ids` is empty:
289                //
290                // 1. The data type is not associated with a table column, so we don't need to set it.
291                // 2. The column is created before nested-schema evolution is supported, thus is using
292                //    the old serialization format and does not have field ids.
293                // 3. This is an empty struct, which is always considered alterable, and setting ids
294                //    is a no-op.
295                if !field_ids.is_empty() {
296                    struct_type = struct_type.with_ids(field_ids);
297                }
298                struct_type.into()
299            }
300            PbTypeName::List => DataType::list(
301                // The first (and only) item is the list element type.
302                proto.field_type[0].clone().into(),
303            ),
304            PbTypeName::Map => {
305                // Map is physically the same as a list.
306                // So the first (and only) item is the list element type.
307                let list_entries_type: DataType = (&proto.field_type[0]).into();
308                DataType::Map(MapType::from_entries(list_entries_type))
309            }
310            PbTypeName::Vector => DataType::Vector(proto.precision as _),
311            PbTypeName::Int256 => DataType::Int256,
312        }
313    }
314}
315
316impl From<PbDataType> for DataType {
317    fn from(proto: PbDataType) -> DataType {
318        DataType::from(&proto)
319    }
320}
321
322impl From<DataTypeName> for PbTypeName {
323    fn from(type_name: DataTypeName) -> Self {
324        match type_name {
325            DataTypeName::Boolean => PbTypeName::Boolean,
326            DataTypeName::Int16 => PbTypeName::Int16,
327            DataTypeName::Int32 => PbTypeName::Int32,
328            DataTypeName::Int64 => PbTypeName::Int64,
329            DataTypeName::Serial => PbTypeName::Serial,
330            DataTypeName::Float32 => PbTypeName::Float,
331            DataTypeName::Float64 => PbTypeName::Double,
332            DataTypeName::Varchar => PbTypeName::Varchar,
333            DataTypeName::Date => PbTypeName::Date,
334            DataTypeName::Timestamp => PbTypeName::Timestamp,
335            DataTypeName::Timestamptz => PbTypeName::Timestamptz,
336            DataTypeName::Time => PbTypeName::Time,
337            DataTypeName::Interval => PbTypeName::Interval,
338            DataTypeName::Decimal => PbTypeName::Decimal,
339            DataTypeName::Bytea => PbTypeName::Bytea,
340            DataTypeName::Jsonb => PbTypeName::Jsonb,
341            DataTypeName::Struct => PbTypeName::Struct,
342            DataTypeName::List => PbTypeName::List,
343            DataTypeName::Int256 => PbTypeName::Int256,
344            DataTypeName::Map => PbTypeName::Map,
345            DataTypeName::Vector => PbTypeName::Vector,
346        }
347    }
348}
349
/// Convenient macros to generate match arms for [`DataType`].
///
/// Each macro expands to an or-pattern over [`DataType`] variants, usable wherever a pattern
/// is expected (`match` arms, `matches!`). The macros are declared with `#[macro_export]`
/// under an underscore-prefixed name and re-exported from this module under a short name
/// (e.g. `data_types::simple!()`).
///
/// The expansions name `DataType` without a path, so `DataType` must be in scope at the
/// place where a macro is used.
pub mod data_types {
    use super::DataType;

    /// Numeric [`DataType`]s supported to be `offset` of `RANGE` frame.
    #[macro_export]
    macro_rules! _range_frame_numeric_data_types {
        () => {
            DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Float32
                | DataType::Float64
                | DataType::Decimal
        };
    }
    pub use _range_frame_numeric_data_types as range_frame_numeric;

    /// Date/time [`DataType`]s supported to be `offset` of `RANGE` frame.
    #[macro_export]
    macro_rules! _range_frame_datetime_data_types {
        () => {
            DataType::Date
                | DataType::Time
                | DataType::Timestamp
                | DataType::Timestamptz
                | DataType::Interval
        };
    }
    pub use _range_frame_datetime_data_types as range_frame_datetime;

    /// Data types that do not have inner fields.
    ///
    /// `Vector(_)` is listed here: its payload is a size, not a nested data type.
    #[macro_export]
    macro_rules! _simple_data_types {
        () => {
            DataType::Boolean
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Float32
                | DataType::Float64
                | DataType::Decimal
                | DataType::Date
                | DataType::Varchar
                | DataType::Time
                | DataType::Timestamp
                | DataType::Timestamptz
                | DataType::Interval
                | DataType::Bytea
                | DataType::Jsonb
                | DataType::Serial
                | DataType::Int256
                | DataType::Vector(_)
        };
    }
    pub use _simple_data_types as simple;

    /// Data types that have inner fields.
    #[macro_export]
    macro_rules! _composite_data_types {
        () => {
            DataType::Struct { .. } | DataType::List { .. } | DataType::Map { .. }
        };
    }
    pub use _composite_data_types as composite;

    /// Test that all data types are covered either by `simple!()` or `composite!()`.
    ///
    /// This function is never meant to be called: the `match` has no wildcard arm, so the
    /// file stops compiling if a [`DataType`] variant is missing from both macros.
    fn _simple_composite_data_types_exhausted(dt: DataType) {
        match dt {
            simple!() => {}
            composite!() => {}
        }
    }
}
424
425impl DataType {
426    /// Same as pgvector; unsure how it was chosen there
427    /// <https://github.com/pgvector/pgvector/blob/v0.8.0/README.md#vector-type>
428    pub const VEC_MAX_SIZE: usize = 16000;
429
430    pub fn create_array_builder(&self, capacity: usize) -> ArrayBuilderImpl {
431        use crate::array::*;
432
433        dispatch_data_types!(self, [B = ArrayBuilder], {
434            B::with_type(capacity, self.clone()).into()
435        })
436    }
437
438    pub fn type_name(&self) -> DataTypeName {
439        DataTypeName::from(self)
440    }
441
442    pub fn prost_type_name(&self) -> PbTypeName {
443        self.type_name().into()
444    }
445
446    pub fn to_protobuf(&self) -> PbDataType {
447        let mut pb = PbDataType {
448            type_name: self.prost_type_name() as i32,
449            is_nullable: true,
450            ..Default::default()
451        };
452        match self {
453            DataType::Struct(t) => {
454                if !t.is_unnamed() {
455                    // To be consistent with `From<&PbDataType>`,
456                    // we only set field names when it's a named struct.
457                    pb.field_names = t.names().map(|s| s.into()).collect();
458                }
459                pb.field_type = t.types().map(|f| f.to_protobuf()).collect();
460                if let Some(ids) = t.ids() {
461                    pb.field_ids = ids.map(|id| id.get_id()).collect();
462                }
463            }
464            DataType::List(list) => {
465                pb.field_type = vec![list.elem().to_protobuf()];
466            }
467            DataType::Map(map) => {
468                // Same as List<Struct<K,V>>
469                pb.field_type = vec![map.clone().into_struct().to_protobuf()];
470            }
471            DataType::Vector(size) => {
472                pb.precision = *size as _;
473            }
474            DataType::Boolean
475            | DataType::Int16
476            | DataType::Int32
477            | DataType::Int64
478            | DataType::Float32
479            | DataType::Float64
480            | DataType::Decimal
481            | DataType::Date
482            | DataType::Varchar
483            | DataType::Time
484            | DataType::Timestamp
485            | DataType::Timestamptz
486            | DataType::Interval
487            | DataType::Bytea
488            | DataType::Jsonb
489            | DataType::Serial
490            | DataType::Int256 => (),
491        }
492        pb
493    }
494
495    pub fn is_numeric(&self) -> bool {
496        matches!(
497            self,
498            DataType::Int16
499                | DataType::Int32
500                | DataType::Int64
501                | DataType::Serial
502                | DataType::Float32
503                | DataType::Float64
504                | DataType::Decimal
505        )
506    }
507
508    /// Returns whether the data type does not have inner fields.
509    pub fn is_simple(&self) -> bool {
510        matches!(self, data_types::simple!())
511    }
512
513    /// Returns whether the data type has inner fields.
514    pub fn is_composite(&self) -> bool {
515        matches!(self, data_types::composite!())
516    }
517
518    pub fn is_array(&self) -> bool {
519        matches!(self, DataType::List(_))
520    }
521
522    pub fn is_struct(&self) -> bool {
523        matches!(self, DataType::Struct(_))
524    }
525
526    pub fn is_map(&self) -> bool {
527        matches!(self, DataType::Map(_))
528    }
529
530    pub fn is_int(&self) -> bool {
531        matches!(self, DataType::Int16 | DataType::Int32 | DataType::Int64)
532    }
533
534    /// Returns the output type of time window function on a given input type.
535    pub fn window_of(input: &DataType) -> Option<DataType> {
536        match input {
537            DataType::Timestamptz => Some(DataType::Timestamptz),
538            DataType::Timestamp | DataType::Date => Some(DataType::Timestamp),
539            _ => None,
540        }
541    }
542
543    pub fn as_struct(&self) -> &StructType {
544        match self {
545            DataType::Struct(t) => t,
546            t => panic!("expect struct type, got {t}"),
547        }
548    }
549
550    pub fn into_struct(self) -> StructType {
551        match self {
552            DataType::Struct(t) => t,
553            t => panic!("expect struct type, got {t}"),
554        }
555    }
556
557    pub fn as_map(&self) -> &MapType {
558        match self {
559            DataType::Map(t) => t,
560            t => panic!("expect map type, got {t}"),
561        }
562    }
563
564    pub fn into_map(self) -> MapType {
565        match self {
566            DataType::Map(t) => t,
567            t => panic!("expect map type, got {t}"),
568        }
569    }
570
571    pub fn as_list(&self) -> &ListType {
572        match self {
573            DataType::List(t) => t,
574            t => panic!("expect list type, got {t}"),
575        }
576    }
577
578    pub fn into_list(self) -> ListType {
579        match self {
580            DataType::List(t) => t,
581            t => panic!("expect list type, got {t}"),
582        }
583    }
584
585    /// Returns the inner element's type if `self` is a list type.
586    /// Equivalent to `self.as_list().elem()`.
587    pub fn as_list_elem(&self) -> &DataType {
588        self.as_list().elem()
589    }
590
591    /// Returns the inner element's type if `self` is a list type.
592    /// Equivalent to `self.into_list().into_elem()`.
593    pub fn into_list_elem(self) -> DataType {
594        self.into_list().into_elem()
595    }
596
597    /// Return a new type that removes the outer list, and get the innermost element type.
598    ///
599    /// Use [`DataType::as_list_elem`] if you only want the element type of a list.
600    ///
601    /// ```
602    /// use risingwave_common::types::DataType::*;
603    /// assert_eq!(Int32.list().unnest_list(), &Int32);
604    /// assert_eq!(Int32.list().list().unnest_list(), &Int32);
605    /// ```
606    pub fn unnest_list(&self) -> &Self {
607        match self {
608            DataType::List(list) => list.elem().unnest_list(),
609            _ => self,
610        }
611    }
612
613    /// Return the number of dimensions of this array/list type. Return `0` when this type is not an
614    /// array/list.
615    pub fn array_ndims(&self) -> usize {
616        let mut d = 0;
617        let mut t = self;
618        while let Self::List(list) = t {
619            d += 1;
620            t = list.elem();
621        }
622        d
623    }
624
625    /// Compares the datatype with another, ignoring nested field names and ids.
626    pub fn equals_datatype(&self, other: &DataType) -> bool {
627        match (self, other) {
628            (Self::Struct(s1), Self::Struct(s2)) => s1.equals_datatype(s2),
629            (Self::List(d1), Self::List(d2)) => d1.elem().equals_datatype(d2.elem()),
630            (Self::Map(m1), Self::Map(m2)) => {
631                m1.key().equals_datatype(m2.key()) && m1.value().equals_datatype(m2.value())
632            }
633            _ => self == other,
634        }
635    }
636
637    /// Whether a column with this data type can be altered to a new data type. This determines
638    /// the encoding of the column data.
639    ///
640    /// Returns...
641    /// - `None`, if the data type is simple or does not contain a struct type.
642    /// - `Some(true)`, if the data type contains a struct type with field ids ([`StructType::has_ids`]).
643    /// - `Some(false)`, if the data type contains a struct type without field ids.
644    pub fn can_alter(&self) -> Option<bool> {
645        match self {
646            data_types::simple!() => None,
647            DataType::Struct(struct_type) => {
648                // As long as we meet a struct type, we can check its `ids` field to determine if
649                // it can be altered.
650                let struct_can_alter = struct_type.has_ids();
651                // In debug build, we assert that once a struct type does (or does not) have ids,
652                // all its composite fields should have the same property.
653                if cfg!(debug_assertions) {
654                    for field in struct_type.types() {
655                        if let Some(field_can_alter) = field.can_alter() {
656                            assert_eq!(struct_can_alter, field_can_alter);
657                        }
658                    }
659                }
660                Some(struct_can_alter)
661            }
662
663            DataType::List(list_type) => list_type.elem().can_alter(),
664            DataType::Map(map_type) => {
665                debug_assert!(
666                    map_type.key().is_simple(),
667                    "unexpected key type of map {map_type:?}"
668                );
669                map_type.value().can_alter()
670            }
671        }
672    }
673}
674
675impl From<StructType> for DataType {
676    fn from(value: StructType) -> Self {
677        Self::Struct(value)
678    }
679}
680
681impl From<DataType> for PbDataType {
682    fn from(data_type: DataType) -> Self {
683        data_type.to_protobuf()
684    }
685}
686
mod private {
    use super::*;

    // Note: putting a `pub` trait inside a private mod only makes its name private.
    // The trait methods will still be publicly available...
    // a.k.a. ["Voldemort type"](https://rust-lang.github.io/rfcs/2145-type-privacy.html#lint-3-voldemort-types-its-reachable-but-i-cant-name-it)

    /// Common trait bounds of scalar and scalar reference types.
    ///
    /// `Impl` is the enum the type converts to and from: `ScalarImpl` for [`Scalar`] and
    /// `ScalarRefImpl` for [`ScalarRef`]. Converting from `Impl` is fallible and reports
    /// an `ArrayError`.
    ///
    /// NOTE(rc): `Hash` is not in the trait bound list, it's implemented as [`ScalarRef::hash_scalar`].
    pub trait ScalarBounds<Impl> = Debug
        + Send
        + Sync
        + Clone
        + PartialEq
        + Eq
        // in default ascending order
        + PartialOrd
        + Ord
        + TryFrom<Impl, Error = ArrayError>
        // `ScalarImpl`/`ScalarRefImpl`
        + Into<Impl>;
}
710
/// `Scalar` is a trait over all possible owned types in the evaluation
/// framework.
///
/// `Scalar` is reciprocal to `ScalarRef`. Use `as_scalar_ref` to get a
/// reference which has the same lifetime as `self`.
pub trait Scalar: private::ScalarBounds<ScalarImpl> + 'static {
    /// Type for reference of `Scalar`
    type ScalarRefType<'a>: ScalarRef<'a, ScalarType = Self> + 'a
    where
        Self: 'a;

    /// Get a reference to current scalar.
    fn as_scalar_ref(&self) -> Self::ScalarRefType<'_>;

    /// Consume the scalar and wrap it into a [`ScalarImpl`], using the `Into<ScalarImpl>`
    /// conversion required by the trait bounds.
    fn to_scalar_value(self) -> ScalarImpl {
        self.into()
    }
}
729
/// `ScalarRef` is a trait over all possible references in the evaluation
/// framework.
///
/// `ScalarRef` is reciprocal to `Scalar`. Use `to_owned_scalar` to get an
/// owned scalar.
pub trait ScalarRef<'a>: private::ScalarBounds<ScalarRefImpl<'a>> + 'a + Copy {
    /// `ScalarType` is the owned type of current `ScalarRef`.
    type ScalarType: Scalar<ScalarRefType<'a> = Self>;

    /// Convert `ScalarRef` to an owned scalar.
    fn to_owned_scalar(&self) -> Self::ScalarType;

    /// A wrapped hash function to get the hash value for this scalar.
    ///
    /// The value is fed into the given hasher `state`.
    fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H);
}
745
/// Define `ScalarImpl` and `ScalarRefImpl` with macro.
///
/// The macro takes a comma-separated list of
/// `{ data_type, variant_name, suffix_name, scalar, scalar_ref, array, builder }` entries
/// and emits one enum variant per entry. Only `variant_name`, `scalar` and `scalar_ref`
/// are used here; the other fields are accepted so the shared entry list can be passed
/// through unchanged.
macro_rules! scalar_impl_enum {
    ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
        /// `ScalarImpl` embeds all possible scalars in the evaluation framework.
        ///
        /// Note: `ScalarImpl` doesn't contain all information of its `DataType`,
        /// so sometimes they need to be used together.
        /// e.g., for `Struct`, we don't have the field names in the value.
        ///
        /// See `for_all_variants` for the definition.
        #[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
        pub enum ScalarImpl {
            $( $variant_name($scalar) ),*
        }

        /// `ScalarRefImpl` embeds all possible scalar references in the evaluation
        /// framework.
        ///
        /// Note: `ScalarRefImpl` doesn't contain all information of its `DataType`,
        /// so sometimes they need to be used together.
        /// e.g., for `Struct`, we don't have the field names in the value.
        ///
        /// See `for_all_variants` for the definition.
        #[derive(Debug, Copy, Clone, PartialEq, Eq)]
        pub enum ScalarRefImpl<'scalar> {
            $( $variant_name($scalar_ref) ),*
        }
    };
}

// Instantiate both enums from the variant list supplied by `for_all_variants`.
for_all_variants! { scalar_impl_enum }
777
// We MUST NOT implement `Ord` for `ScalarImpl` because that will make `Datum` derive an incorrect
// default `Ord`. To get a default-ordered `ScalarImpl`/`ScalarRefImpl`/`Datum`/`DatumRef`, you can
// use `DefaultOrdered<T>`. If non-default order is needed, please refer to `sort_util`.
impl !PartialOrd for ScalarImpl {}
impl !PartialOrd for ScalarRefImpl<'_> {}

/// An optional owned scalar value.
pub type Datum = Option<ScalarImpl>;
/// An optional borrowed scalar value; the borrowed counterpart of [`Datum`].
pub type DatumRef<'a> = Option<ScalarRefImpl<'a>>;
786
787/// This trait is to implement `to_owned_datum` for `Option<ScalarImpl>`
788pub trait ToOwnedDatum {
789    /// Convert the datum to an owned [`Datum`].
790    fn to_owned_datum(self) -> Datum;
791}
792
793impl ToOwnedDatum for &Datum {
794    #[inline(always)]
795    fn to_owned_datum(self) -> Datum {
796        self.clone()
797    }
798}
799
800impl<T: Into<ScalarImpl>> ToOwnedDatum for T {
801    #[inline(always)]
802    fn to_owned_datum(self) -> Datum {
803        Some(self.into())
804    }
805}
806
807impl<T: Into<ScalarImpl>> ToOwnedDatum for Option<T> {
808    #[inline(always)]
809    fn to_owned_datum(self) -> Datum {
810        self.map(Into::into)
811    }
812}
813
814impl<const N: usize> From<TypedId<N, u32>> for ScalarImpl {
815    fn from(value: TypedId<N, u32>) -> Self {
816        value.as_i32_id().into()
817    }
818}
819
820impl<const N: usize> From<TypedId<N, u64>> for ScalarImpl {
821    fn from(value: TypedId<N, u64>) -> Self {
822        value.as_i64_id().into()
823    }
824}
825
826#[auto_impl::auto_impl(&)]
827pub trait ToDatumRef: PartialEq + Eq + Debug + Send + Sync {
828    /// Convert the datum to [`DatumRef`].
829    fn to_datum_ref(&self) -> DatumRef<'_>;
830}
831
832impl ToDatumRef for Datum {
833    #[inline(always)]
834    fn to_datum_ref(&self) -> DatumRef<'_> {
835        self.as_ref().map(|d| d.as_scalar_ref_impl())
836    }
837}
838impl ToDatumRef for Option<&ScalarImpl> {
839    #[inline(always)]
840    fn to_datum_ref(&self) -> DatumRef<'_> {
841        self.map(|d| d.as_scalar_ref_impl())
842    }
843}
844impl ToDatumRef for DatumRef<'_> {
845    #[inline(always)]
846    fn to_datum_ref(&self) -> DatumRef<'_> {
847        *self
848    }
849}
850
/// To make sure there is `as_scalar_ref` for all scalar ref types.
/// See <https://github.com/risingwavelabs/risingwave/pull/9977/files#r1208972881>
///
/// This is used by the expr macro.
pub trait SelfAsScalarRef {
    /// Returns a copy of `self`.
    fn as_scalar_ref(&self) -> Self;
}
// Implements `SelfAsScalarRef` for each listed type by returning `*self`, which requires the
// type to be `Copy`.
macro_rules! impl_self_as_scalar_ref {
    ($($t:ty),*) => {
        $(
            impl SelfAsScalarRef for $t {
                fn as_scalar_ref(&self) -> Self {
                    *self
                }
            }
        )*
    };
}
impl_self_as_scalar_ref! { &str, &[u8], Int256Ref<'_>, JsonbRef<'_>, ListRef<'_>, StructRef<'_>, ScalarRefImpl<'_>, MapRef<'_> }
870
/// `for_all_native_types` includes all native variants of our scalar types.
///
/// Specifically, it doesn't support u8/u16/u32/u64.
///
/// The given `$macro` is invoked once with a comma-separated list of
/// `{ type, variant name, read method name }` entries, one per native type.
#[macro_export]
macro_rules! for_all_native_types {
    ($macro:ident) => {
        $macro! {
            { i16, Int16, read_i16 },
            { i32, Int32, read_i32 },
            { i64, Int64, read_i64 },
            // `Serial` shares the `read_i64` method with `i64`.
            { Serial, Serial, read_i64 },
            { $crate::types::F32, Float32, read_f32 },
            { $crate::types::F64, Float64, read_f64 }
        }
    };
}
887
888/// `impl_convert` implements several conversions for `Scalar`.
889/// * `Scalar <-> ScalarImpl` with `From` and `TryFrom` trait.
890/// * `ScalarRef <-> ScalarRefImpl` with `From` and `TryFrom` trait.
891/// * `&ScalarImpl -> &Scalar` with `impl.as_int16()`.
892/// * `ScalarImpl -> Scalar` with `impl.into_int16()`.
893macro_rules! impl_convert {
894    ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
895        $(
896            impl From<$scalar> for ScalarImpl {
897                fn from(val: $scalar) -> Self {
898                    ScalarImpl::$variant_name(val)
899                }
900            }
901
902            impl TryFrom<ScalarImpl> for $scalar {
903                type Error = ArrayError;
904
905                fn try_from(val: ScalarImpl) -> ArrayResult<Self> {
906                    match val {
907                        ScalarImpl::$variant_name(scalar) => Ok(scalar),
908                        other_scalar => bail!("cannot convert ScalarImpl::{} to concrete type", other_scalar.get_ident()),
909                    }
910                }
911            }
912
913            impl <'scalar> From<$scalar_ref> for ScalarRefImpl<'scalar> {
914                fn from(val: $scalar_ref) -> Self {
915                    ScalarRefImpl::$variant_name(val)
916                }
917            }
918
919            impl <'scalar> TryFrom<ScalarRefImpl<'scalar>> for $scalar_ref {
920                type Error = ArrayError;
921
922                fn try_from(val: ScalarRefImpl<'scalar>) -> ArrayResult<Self> {
923                    match val {
924                        ScalarRefImpl::$variant_name(scalar_ref) => Ok(scalar_ref),
925                        other_scalar => bail!("cannot convert ScalarRefImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name)),
926                    }
927                }
928            }
929
930            paste! {
931                impl ScalarImpl {
932                    /// # Panics
933                    /// If the scalar is not of the expected type.
934                    pub fn [<as_ $suffix_name>](&self) -> &$scalar {
935                        match self {
936                            Self::$variant_name(scalar) => scalar,
937                            other_scalar => panic!("cannot convert ScalarImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
938                        }
939                    }
940
941                    /// # Panics
942                    /// If the scalar is not of the expected type.
943                    pub fn [<into_ $suffix_name>](self) -> $scalar {
944                        match self {
945                            Self::$variant_name(scalar) => scalar,
946                            other_scalar =>  panic!("cannot convert ScalarImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
947                        }
948                    }
949                }
950
951                impl <'scalar> ScalarRefImpl<'scalar> {
952                    /// # Panics
953                    /// If the scalar is not of the expected type.
954                    pub fn [<into_ $suffix_name>](self) -> $scalar_ref {
955                        match self {
956                            Self::$variant_name(inner) => inner,
957                            other_scalar => panic!("cannot convert ScalarRefImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
958                        }
959                    }
960                }
961            }
962        )*
963    };
964}
965
966for_all_variants! { impl_convert }
967
968// Implement `From<raw float>` for `ScalarImpl::Float` as a sugar.
969impl From<f32> for ScalarImpl {
970    fn from(f: f32) -> Self {
971        Self::Float32(f.into())
972    }
973}
974impl From<f64> for ScalarImpl {
975    fn from(f: f64) -> Self {
976        Self::Float64(f.into())
977    }
978}
979
980// Implement `From<string like>` for `ScalarImpl::Utf8` as a sugar.
981impl From<String> for ScalarImpl {
982    fn from(s: String) -> Self {
983        Self::Utf8(s.into_boxed_str())
984    }
985}
986impl From<&str> for ScalarImpl {
987    fn from(s: &str) -> Self {
988        Self::Utf8(s.into())
989    }
990}
991impl From<&String> for ScalarImpl {
992    fn from(s: &String) -> Self {
993        Self::Utf8(s.as_str().into())
994    }
995}
996impl TryFrom<ScalarImpl> for String {
997    type Error = ArrayError;
998
999    fn try_from(val: ScalarImpl) -> ArrayResult<Self> {
1000        match val {
1001            ScalarImpl::Utf8(s) => Ok(s.into()),
1002            other_scalar => bail!(
1003                "cannot convert ScalarImpl::{} to concrete type",
1004                other_scalar.get_ident()
1005            ),
1006        }
1007    }
1008}
1009
1010impl From<char> for ScalarImpl {
1011    fn from(c: char) -> Self {
1012        Self::Utf8(c.to_string().into())
1013    }
1014}
1015
1016impl From<&[u8]> for ScalarImpl {
1017    fn from(s: &[u8]) -> Self {
1018        Self::Bytea(s.into())
1019    }
1020}
1021
1022impl From<JsonbRef<'_>> for ScalarImpl {
1023    fn from(jsonb: JsonbRef<'_>) -> Self {
1024        Self::Jsonb(jsonb.to_owned_scalar())
1025    }
1026}
1027
1028impl<T: PrimitiveArrayItemType> From<Vec<T>> for ScalarImpl {
1029    fn from(v: Vec<T>) -> Self {
1030        Self::List(v.into_iter().collect())
1031    }
1032}
1033
1034impl<T: PrimitiveArrayItemType> From<Vec<Option<T>>> for ScalarImpl {
1035    fn from(v: Vec<Option<T>>) -> Self {
1036        Self::List(v.into_iter().collect())
1037    }
1038}
1039
1040impl From<Vec<String>> for ScalarImpl {
1041    fn from(v: Vec<String>) -> Self {
1042        Self::List(v.iter().map(|s| s.as_str()).collect())
1043    }
1044}
1045
1046impl From<Vec<u8>> for ScalarImpl {
1047    fn from(v: Vec<u8>) -> Self {
1048        Self::Bytea(v.into())
1049    }
1050}
1051
1052impl From<Bytes> for ScalarImpl {
1053    fn from(v: Bytes) -> Self {
1054        Self::Bytea(v.as_ref().into())
1055    }
1056}
1057
1058impl From<ListRef<'_>> for ScalarImpl {
1059    fn from(list: ListRef<'_>) -> Self {
1060        Self::List(list.to_owned_scalar())
1061    }
1062}
1063
1064impl ScalarImpl {
1065    /// Creates a scalar from pgwire "BINARY" format.
1066    ///
1067    /// The counterpart of [`to_binary::ToBinary`].
1068    pub fn from_binary(bytes: &Bytes, data_type: &DataType) -> Result<Self, BoxedError> {
1069        let res = match data_type {
1070            DataType::Varchar => Self::Utf8(String::from_sql(&Type::VARCHAR, bytes)?.into()),
1071            DataType::Bytea => Self::Bytea(Vec::<u8>::from_sql(&Type::BYTEA, bytes)?.into()),
1072            DataType::Boolean => Self::Bool(bool::from_sql(&Type::BOOL, bytes)?),
1073            DataType::Int16 => Self::Int16(i16::from_sql(&Type::INT2, bytes)?),
1074            DataType::Int32 => Self::Int32(i32::from_sql(&Type::INT4, bytes)?),
1075            DataType::Int64 => Self::Int64(i64::from_sql(&Type::INT8, bytes)?),
1076            DataType::Serial => Self::Serial(Serial::from(i64::from_sql(&Type::INT8, bytes)?)),
1077            DataType::Float32 => Self::Float32(f32::from_sql(&Type::FLOAT4, bytes)?.into()),
1078            DataType::Float64 => Self::Float64(f64::from_sql(&Type::FLOAT8, bytes)?.into()),
1079            DataType::Decimal => {
1080                Self::Decimal(rust_decimal::Decimal::from_sql(&Type::NUMERIC, bytes)?.into())
1081            }
1082            DataType::Date => Self::Date(chrono::NaiveDate::from_sql(&Type::DATE, bytes)?.into()),
1083            DataType::Time => Self::Time(chrono::NaiveTime::from_sql(&Type::TIME, bytes)?.into()),
1084            DataType::Timestamp => {
1085                Self::Timestamp(chrono::NaiveDateTime::from_sql(&Type::TIMESTAMP, bytes)?.into())
1086            }
1087            DataType::Timestamptz => Self::Timestamptz(
1088                chrono::DateTime::<chrono::Utc>::from_sql(&Type::TIMESTAMPTZ, bytes)?.into(),
1089            ),
1090            DataType::Interval => Self::Interval(Interval::from_sql(&Type::INTERVAL, bytes)?),
1091            DataType::Jsonb => Self::Jsonb(
1092                JsonbVal::value_deserialize(bytes)
1093                    .ok_or_else(|| "invalid value of Jsonb".to_owned())?,
1094            ),
1095            DataType::Int256 => Self::Int256(Int256::from_binary(bytes)?),
1096            DataType::Vector(_) | DataType::Struct(_) | DataType::List(_) | DataType::Map(_) => {
1097                return Err(format!("unsupported data type: {}", data_type).into());
1098            }
1099        };
1100        Ok(res)
1101    }
1102
1103    /// Creates a scalar from pgwire "TEXT" format.
1104    ///
1105    /// The counterpart of [`ToText`].
1106    pub fn from_text(s: &str, data_type: &DataType) -> Result<Self, BoxedError> {
1107        Ok(match data_type {
1108            DataType::Boolean => str_to_bool(s)?.into(),
1109            DataType::Int16 => i16::from_str(s)?.into(),
1110            DataType::Int32 => i32::from_str(s)?.into(),
1111            DataType::Int64 => i64::from_str(s)?.into(),
1112            DataType::Int256 => Int256::from_str(s)?.into(),
1113            DataType::Serial => Serial::from(i64::from_str(s)?).into(),
1114            DataType::Decimal => Decimal::from_str(s)?.into(),
1115            DataType::Float32 => F32::from_str(s)?.into(),
1116            DataType::Float64 => F64::from_str(s)?.into(),
1117            DataType::Varchar => s.into(),
1118            DataType::Date => Date::from_str(s)?.into(),
1119            DataType::Timestamp => Timestamp::from_str(s)?.into(),
1120            // We only handle the case with timezone here, and leave the implicit session timezone case
1121            // for later phase.
1122            DataType::Timestamptz => Timestamptz::from_str(s)?.into(),
1123            DataType::Time => Time::from_str(s)?.into(),
1124            DataType::Interval => Interval::from_str(s)?.into(),
1125            DataType::List(_) => ListValue::from_str(s, data_type)?.into(),
1126            DataType::Struct(st) => StructValue::from_str(s, st)?.into(),
1127            DataType::Jsonb => JsonbVal::from_str(s)?.into(),
1128            DataType::Bytea => {
1129                let mut buf = Vec::new();
1130                str_to_bytea(s, &mut buf)?;
1131                buf.into()
1132            }
1133            DataType::Vector(size) => VectorVal::from_text(s, *size)?.into(),
1134            DataType::Map(_m) => return Err("map from text is not supported".into()),
1135        })
1136    }
1137
1138    pub fn from_text_for_test(s: &str, data_type: &DataType) -> Result<Self, BoxedError> {
1139        Ok(match data_type {
1140            DataType::Map(map_type) => MapValue::from_str_for_test(s, map_type)?.into(),
1141            _ => ScalarImpl::from_text(s, data_type)?,
1142        })
1143    }
1144}
1145
1146impl From<ScalarRefImpl<'_>> for ScalarImpl {
1147    fn from(scalar_ref: ScalarRefImpl<'_>) -> Self {
1148        scalar_ref.into_scalar_impl()
1149    }
1150}
1151
1152impl<'a> From<&'a ScalarImpl> for ScalarRefImpl<'a> {
1153    fn from(scalar: &'a ScalarImpl) -> Self {
1154        scalar.as_scalar_ref_impl()
1155    }
1156}
1157
1158impl ScalarImpl {
1159    /// Converts [`ScalarImpl`] to [`ScalarRefImpl`]
1160    pub fn as_scalar_ref_impl(&self) -> ScalarRefImpl<'_> {
1161        dispatch_scalar_variants!(self, inner, { inner.as_scalar_ref().into() })
1162    }
1163}
1164
1165impl ScalarRefImpl<'_> {
1166    /// Converts [`ScalarRefImpl`] to [`ScalarImpl`]
1167    pub fn into_scalar_impl(self) -> ScalarImpl {
1168        dispatch_scalar_ref_variants!(self, inner, { inner.to_owned_scalar().into() })
1169    }
1170}
1171
1172impl Hash for ScalarImpl {
1173    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1174        dispatch_scalar_variants!(self, inner, { inner.as_scalar_ref().hash_scalar(state) })
1175    }
1176}
1177
1178impl Hash for ScalarRefImpl<'_> {
1179    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1180        dispatch_scalar_ref_variants!(self, inner, { inner.hash_scalar(state) })
1181    }
1182}
1183
1184/// Feeds the raw scalar reference of `datum` to the given `state`, which should behave the same
1185/// as [`crate::array::Array::hash_at`], where NULL value will be carefully handled.
1186///
1187/// **FIXME**: the result of this function might be different from [`std::hash::Hash`] due to the
1188/// type alias of `DatumRef = Option<_>`, we should manually implement [`std::hash::Hash`] for
1189/// [`DatumRef`] in the future when it becomes a newtype. (#477)
1190#[inline(always)]
1191pub fn hash_datum(datum: impl ToDatumRef, state: &mut impl std::hash::Hasher) {
1192    match datum.to_datum_ref() {
1193        Some(scalar_ref) => scalar_ref.hash(state),
1194        None => NULL_VAL_FOR_HASH.hash(state),
1195    }
1196}
1197
1198impl ScalarRefImpl<'_> {
1199    pub fn binary_format(&self, data_type: &DataType) -> to_binary::Result<Bytes> {
1200        use self::to_binary::ToBinary;
1201        self.to_binary_with_type(data_type)
1202    }
1203
1204    pub fn text_format(&self, data_type: &DataType) -> String {
1205        self.to_text_with_type(data_type)
1206    }
1207
1208    /// Serialize the scalar into the `memcomparable` format.
1209    pub fn serialize(
1210        &self,
1211        ser: &mut memcomparable::Serializer<impl BufMut>,
1212    ) -> memcomparable::Result<()> {
1213        match self {
1214            Self::Int16(v) => v.serialize(ser)?,
1215            Self::Int32(v) => v.serialize(ser)?,
1216            Self::Int64(v) => v.serialize(ser)?,
1217            Self::Serial(v) => v.serialize(ser)?,
1218            Self::Float32(v) => v.serialize(ser)?,
1219            Self::Float64(v) => v.serialize(ser)?,
1220            Self::Utf8(v) => v.serialize(ser)?,
1221            Self::Bytea(v) => ser.serialize_bytes(v)?,
1222            Self::Bool(v) => v.serialize(ser)?,
1223            Self::Decimal(v) => ser.serialize_decimal((*v).into())?,
1224            Self::Interval(v) => v.serialize(ser)?,
1225            Self::Date(v) => v.0.num_days_from_ce().serialize(ser)?,
1226            Self::Timestamp(v) => {
1227                v.0.and_utc().timestamp().serialize(&mut *ser)?;
1228                v.0.and_utc().timestamp_subsec_nanos().serialize(ser)?;
1229            }
1230            Self::Timestamptz(v) => v.serialize(ser)?,
1231            Self::Time(v) => {
1232                v.0.num_seconds_from_midnight().serialize(&mut *ser)?;
1233                v.0.nanosecond().serialize(ser)?;
1234            }
1235            Self::Int256(v) => v.memcmp_serialize(ser)?,
1236            Self::Jsonb(v) => v.memcmp_serialize(ser)?,
1237            Self::Struct(v) => v.memcmp_serialize(ser)?,
1238            Self::List(v) => v.memcmp_serialize(ser)?,
1239            Self::Map(v) => v.memcmp_serialize(ser)?,
1240            Self::Vector(v) => v.memcmp_serialize(ser)?,
1241        };
1242        Ok(())
1243    }
1244}
1245
1246impl ScalarImpl {
1247    /// Serialize the scalar into the `memcomparable` format.
1248    pub fn serialize(
1249        &self,
1250        ser: &mut memcomparable::Serializer<impl BufMut>,
1251    ) -> memcomparable::Result<()> {
1252        self.as_scalar_ref_impl().serialize(ser)
1253    }
1254
1255    /// Deserialize the scalar from the `memcomparable` format.
1256    pub fn deserialize(
1257        ty: &DataType,
1258        de: &mut memcomparable::Deserializer<impl Buf>,
1259    ) -> memcomparable::Result<Self> {
1260        use DataType as Ty;
1261        Ok(match ty {
1262            Ty::Int16 => Self::Int16(i16::deserialize(de)?),
1263            Ty::Int32 => Self::Int32(i32::deserialize(de)?),
1264            Ty::Int64 => Self::Int64(i64::deserialize(de)?),
1265            Ty::Int256 => Self::Int256(Int256::memcmp_deserialize(de)?),
1266            Ty::Serial => Self::Serial(Serial::from(i64::deserialize(de)?)),
1267            Ty::Float32 => Self::Float32(f32::deserialize(de)?.into()),
1268            Ty::Float64 => Self::Float64(f64::deserialize(de)?.into()),
1269            Ty::Varchar => Self::Utf8(Box::<str>::deserialize(de)?),
1270            Ty::Bytea => Self::Bytea(serde_bytes::ByteBuf::deserialize(de)?.into_vec().into()),
1271            Ty::Boolean => Self::Bool(bool::deserialize(de)?),
1272            Ty::Decimal => Self::Decimal(de.deserialize_decimal()?.into()),
1273            Ty::Interval => Self::Interval(Interval::deserialize(de)?),
1274            Ty::Time => Self::Time({
1275                let secs = u32::deserialize(&mut *de)?;
1276                let nano = u32::deserialize(de)?;
1277                Time::with_secs_nano(secs, nano)
1278                    .map_err(|e| memcomparable::Error::Message(e.to_report_string()))?
1279            }),
1280            Ty::Timestamp => Self::Timestamp({
1281                let secs = i64::deserialize(&mut *de)?;
1282                let nsecs = u32::deserialize(de)?;
1283                Timestamp::with_secs_nsecs(secs, nsecs)
1284                    .map_err(|e| memcomparable::Error::Message(e.to_report_string()))?
1285            }),
1286            Ty::Timestamptz => Self::Timestamptz(Timestamptz::deserialize(de)?),
1287            Ty::Date => Self::Date({
1288                let days = i32::deserialize(de)?;
1289                Date::with_days_since_ce(days)
1290                    .map_err(|e| memcomparable::Error::Message(e.to_report_string()))?
1291            }),
1292            Ty::Jsonb => Self::Jsonb(JsonbVal::memcmp_deserialize(de)?),
1293            Ty::Struct(t) => StructValue::memcmp_deserialize(t.types(), de)?.to_scalar_value(),
1294            Ty::List(t) => ListValue::memcmp_deserialize(t, de)?.to_scalar_value(),
1295            Ty::Map(t) => MapValue::memcmp_deserialize(t, de)?.to_scalar_value(),
1296            Ty::Vector(dimension) => {
1297                VectorVal::memcmp_deserialize(*dimension, de)?.to_scalar_value()
1298            }
1299        })
1300    }
1301
1302    pub fn as_integral(&self) -> i64 {
1303        match self {
1304            Self::Int16(v) => *v as i64,
1305            Self::Int32(v) => *v as i64,
1306            Self::Int64(v) => *v,
1307            _ => panic!(
1308                "Can't convert ScalarImpl::{} to a integral",
1309                self.get_ident()
1310            ),
1311        }
1312    }
1313}
1314
1315/// Returns whether the `literal` matches the `data_type`.
1316pub fn literal_type_match(data_type: &DataType, literal: Option<&ScalarImpl>) -> bool {
1317    match literal {
1318        None => true,
1319        Some(scalar) => scalar_ref_type_match(data_type, scalar.as_scalar_ref_impl()),
1320    }
1321}
1322
1323/// Returns whether the scalar ref matches the `data_type`.
1324///
1325/// This is a lightweight "shape check" intended for callers that need to avoid panics on
1326/// malformed input. For nested types, it checks element/field types recursively.
1327pub fn scalar_ref_type_match(data_type: &DataType, scalar: ScalarRefImpl<'_>) -> bool {
1328    match (data_type, scalar) {
1329        (DataType::List(list_type), ScalarRefImpl::List(v)) => {
1330            v.elem_type().equals_datatype(list_type.elem())
1331        }
1332        (DataType::Map(map_type), ScalarRefImpl::Map(v)) => v
1333            .inner()
1334            .elem_type()
1335            .equals_datatype(&map_type.clone().into_struct()),
1336        (DataType::Vector(size), ScalarRefImpl::Vector(v)) => v.dimension() == *size,
1337        (DataType::Struct(struct_type), ScalarRefImpl::Struct(v)) => {
1338            struct_ref_type_match(struct_type, v)
1339        }
1340
1341        _ => {
1342            macro_rules! matches {
1343                ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty }),*) => {
1344                    match (data_type, scalar) {
1345                        $(
1346                            (DataType::$data_type { .. }, ScalarRefImpl::$variant_name(_)) => true,
1347                            (DataType::$data_type { .. }, _) => false, // keep exhaustive over DataType variants
1348                        )*
1349                    }
1350                }
1351            }
1352            for_all_variants! { matches }
1353        }
1354    }
1355}
1356
1357/// Returns whether the `datum` matches the `data_type`.
1358#[inline(always)]
1359pub fn datum_ref_type_match(data_type: &DataType, datum: DatumRef<'_>) -> bool {
1360    match datum {
1361        None => true,
1362        Some(scalar) => scalar_ref_type_match(data_type, scalar),
1363    }
1364}
1365
1366fn struct_ref_type_match(expected: &StructType, value: StructRef<'_>) -> bool {
1367    match value {
1368        StructRef::Indexed { arr, .. } => {
1369            // `StructRef::Indexed` comes with a `StructArray`, whose type can be compared directly.
1370            crate::array::Array::data_type(arr).equals_datatype(&DataType::Struct(expected.clone()))
1371        }
1372        StructRef::ValueRef { val } => {
1373            let fields = val.fields();
1374            if fields.len() != expected.len() {
1375                return false;
1376            }
1377            expected
1378                .types()
1379                .zip_eq_fast(fields.iter())
1380                .all(|(ty, datum)| datum_ref_type_match(ty, datum.to_datum_ref()))
1381        }
1382    }
1383}
1384
1385#[cfg(test)]
1386mod tests {
1387    use std::hash::{BuildHasher, Hasher};
1388
1389    use strum::IntoEnumIterator;
1390
1391    use super::*;
1392    use crate::util::hash_util::Crc32FastBuilder;
1393
1394    #[test]
1395    fn test_size() {
1396        use static_assertions::const_assert_eq;
1397
1398        use crate::array::*;
1399
1400        macro_rules! assert_item_size_eq {
1401            ($array:ty, $size:literal) => {
1402                const_assert_eq!(std::mem::size_of::<<$array as Array>::OwnedItem>(), $size);
1403            };
1404        }
1405
1406        assert_item_size_eq!(StructArray, 16); // Box<[Datum]>
1407        assert_item_size_eq!(ListArray, 8); // Box<ArrayImpl>
1408        assert_item_size_eq!(Utf8Array, 16); // Box<str>
1409        assert_item_size_eq!(IntervalArray, 16);
1410        assert_item_size_eq!(TimestampArray, 12);
1411
1412        // TODO: try to reduce the memory usage of `Decimal`, `ScalarImpl` and `Datum`.
1413        assert_item_size_eq!(DecimalArray, 20);
1414
1415        const_assert_eq!(std::mem::size_of::<ScalarImpl>(), 24);
1416        const_assert_eq!(std::mem::size_of::<ScalarRefImpl<'_>>(), 24);
1417        const_assert_eq!(std::mem::size_of::<Datum>(), 24);
1418        const_assert_eq!(std::mem::size_of::<StructType>(), 8);
1419        const_assert_eq!(std::mem::size_of::<DataType>(), 16);
1420    }
1421
1422    #[test]
1423    fn test_data_type_display() {
1424        let d: DataType =
1425            StructType::new(vec![("i", DataType::Int32), ("j", DataType::Varchar)]).into();
1426        assert_eq!(
1427            format!("{}", d),
1428            "struct<i integer, j character varying>".to_owned()
1429        );
1430    }
1431
1432    #[test]
1433    fn test_hash_implementation() {
1434        fn test(datum: Datum, data_type: DataType) {
1435            assert!(literal_type_match(&data_type, datum.as_ref()));
1436
1437            let mut builder = data_type.create_array_builder(6);
1438            for _ in 0..3 {
1439                builder.append_null();
1440                builder.append(&datum);
1441            }
1442            let array = builder.finish();
1443
1444            let hash_from_array = {
1445                let mut state = Crc32FastBuilder.build_hasher();
1446                array.hash_at(3, &mut state);
1447                state.finish()
1448            };
1449
1450            let hash_from_datum = {
1451                let mut state = Crc32FastBuilder.build_hasher();
1452                hash_datum(&datum, &mut state);
1453                state.finish()
1454            };
1455
1456            let hash_from_datum_ref = {
1457                let mut state = Crc32FastBuilder.build_hasher();
1458                hash_datum(datum.to_datum_ref(), &mut state);
1459                state.finish()
1460            };
1461
1462            assert_eq!(hash_from_array, hash_from_datum);
1463            assert_eq!(hash_from_datum, hash_from_datum_ref);
1464        }
1465
1466        for name in DataTypeName::iter() {
1467            let (scalar, data_type) = match name {
1468                DataTypeName::Boolean => (ScalarImpl::Bool(true), DataType::Boolean),
1469                DataTypeName::Int16 => (ScalarImpl::Int16(233), DataType::Int16),
1470                DataTypeName::Int32 => (ScalarImpl::Int32(233333), DataType::Int32),
1471                DataTypeName::Int64 => (ScalarImpl::Int64(233333333333), DataType::Int64),
1472                DataTypeName::Int256 => (
1473                    ScalarImpl::Int256(233333333333_i64.into()),
1474                    DataType::Int256,
1475                ),
1476                DataTypeName::Serial => (ScalarImpl::Serial(233333333333.into()), DataType::Serial),
1477                DataTypeName::Float32 => (ScalarImpl::Float32(23.33.into()), DataType::Float32),
1478                DataTypeName::Float64 => (
1479                    ScalarImpl::Float64(23.333333333333.into()),
1480                    DataType::Float64,
1481                ),
1482                DataTypeName::Decimal => (
1483                    ScalarImpl::Decimal("233.33".parse().unwrap()),
1484                    DataType::Decimal,
1485                ),
1486                DataTypeName::Date => (
1487                    ScalarImpl::Date(Date::from_ymd_uncheck(2333, 3, 3)),
1488                    DataType::Date,
1489                ),
1490                DataTypeName::Varchar => (ScalarImpl::Utf8("233".into()), DataType::Varchar),
1491                DataTypeName::Bytea => (
1492                    ScalarImpl::Bytea("\\x233".as_bytes().into()),
1493                    DataType::Bytea,
1494                ),
1495                DataTypeName::Time => (
1496                    ScalarImpl::Time(Time::from_hms_uncheck(2, 3, 3)),
1497                    DataType::Time,
1498                ),
1499                DataTypeName::Timestamp => (
1500                    ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(23333333, 2333)),
1501                    DataType::Timestamp,
1502                ),
1503                DataTypeName::Timestamptz => (
1504                    ScalarImpl::Timestamptz(Timestamptz::from_micros(233333333)),
1505                    DataType::Timestamptz,
1506                ),
1507                DataTypeName::Interval => (
1508                    ScalarImpl::Interval(Interval::from_month_day_usec(2, 3, 3333)),
1509                    DataType::Interval,
1510                ),
1511                DataTypeName::Jsonb => (ScalarImpl::Jsonb(JsonbVal::null()), DataType::Jsonb),
1512                DataTypeName::Struct => (
1513                    ScalarImpl::Struct(StructValue::new(vec![
1514                        ScalarImpl::Int64(233).into(),
1515                        ScalarImpl::Float64(23.33.into()).into(),
1516                    ])),
1517                    DataType::Struct(StructType::new(vec![
1518                        ("a", DataType::Int64),
1519                        ("b", DataType::Float64),
1520                    ])),
1521                ),
1522                DataTypeName::List => (
1523                    ScalarImpl::List(ListValue::from_iter([233i64, 2333])),
1524                    DataType::Int64.list(),
1525                ),
1526                DataTypeName::Vector => (
1527                    ScalarImpl::Vector(VectorVal::from_iter(
1528                        (0..VectorVal::TEST_VECTOR_DIMENSION)
1529                            .map(|i| ((i + 1) as f32).try_into().unwrap()),
1530                    )),
1531                    DataType::Vector(VectorVal::TEST_VECTOR_DIMENSION),
1532                ),
1533                DataTypeName::Map => {
1534                    // map is not hashable
1535                    continue;
1536                }
1537            };
1538
1539            test(Some(scalar), data_type.clone());
1540            test(None, data_type);
1541        }
1542    }
1543
1544    #[test]
1545    fn test_data_type_from_str() {
1546        assert_eq!(DataType::from_str("bool").unwrap(), DataType::Boolean);
1547        assert_eq!(DataType::from_str("boolean").unwrap(), DataType::Boolean);
1548        assert_eq!(DataType::from_str("BOOL").unwrap(), DataType::Boolean);
1549        assert_eq!(DataType::from_str("BOOLEAN").unwrap(), DataType::Boolean);
1550
1551        assert_eq!(DataType::from_str("int2").unwrap(), DataType::Int16);
1552        assert_eq!(DataType::from_str("smallint").unwrap(), DataType::Int16);
1553        assert_eq!(DataType::from_str("INT2").unwrap(), DataType::Int16);
1554        assert_eq!(DataType::from_str("SMALLINT").unwrap(), DataType::Int16);
1555
1556        assert_eq!(DataType::from_str("int4").unwrap(), DataType::Int32);
1557        assert_eq!(DataType::from_str("integer").unwrap(), DataType::Int32);
1558        assert_eq!(DataType::from_str("int4").unwrap(), DataType::Int32);
1559        assert_eq!(DataType::from_str("INT4").unwrap(), DataType::Int32);
1560        assert_eq!(DataType::from_str("INTEGER").unwrap(), DataType::Int32);
1561        assert_eq!(DataType::from_str("INT").unwrap(), DataType::Int32);
1562
1563        assert_eq!(DataType::from_str("int8").unwrap(), DataType::Int64);
1564        assert_eq!(DataType::from_str("bigint").unwrap(), DataType::Int64);
1565        assert_eq!(DataType::from_str("INT8").unwrap(), DataType::Int64);
1566        assert_eq!(DataType::from_str("BIGINT").unwrap(), DataType::Int64);
1567
1568        assert_eq!(DataType::from_str("rw_int256").unwrap(), DataType::Int256);
1569        assert_eq!(DataType::from_str("RW_INT256").unwrap(), DataType::Int256);
1570
1571        assert_eq!(DataType::from_str("float4").unwrap(), DataType::Float32);
1572        assert_eq!(DataType::from_str("real").unwrap(), DataType::Float32);
1573        assert_eq!(DataType::from_str("FLOAT4").unwrap(), DataType::Float32);
1574        assert_eq!(DataType::from_str("REAL").unwrap(), DataType::Float32);
1575
1576        assert_eq!(DataType::from_str("float8").unwrap(), DataType::Float64);
1577        assert_eq!(
1578            DataType::from_str("double precision").unwrap(),
1579            DataType::Float64
1580        );
1581        assert_eq!(DataType::from_str("FLOAT8").unwrap(), DataType::Float64);
1582        assert_eq!(
1583            DataType::from_str("DOUBLE PRECISION").unwrap(),
1584            DataType::Float64
1585        );
1586
1587        assert_eq!(DataType::from_str("decimal").unwrap(), DataType::Decimal);
1588        assert_eq!(DataType::from_str("DECIMAL").unwrap(), DataType::Decimal);
1589        assert_eq!(DataType::from_str("numeric").unwrap(), DataType::Decimal);
1590        assert_eq!(DataType::from_str("NUMERIC").unwrap(), DataType::Decimal);
1591
1592        assert_eq!(DataType::from_str("date").unwrap(), DataType::Date);
1593        assert_eq!(DataType::from_str("DATE").unwrap(), DataType::Date);
1594
1595        assert_eq!(DataType::from_str("varchar").unwrap(), DataType::Varchar);
1596        assert_eq!(DataType::from_str("VARCHAR").unwrap(), DataType::Varchar);
1597
1598        assert_eq!(DataType::from_str("time").unwrap(), DataType::Time);
1599        assert_eq!(
1600            DataType::from_str("time without time zone").unwrap(),
1601            DataType::Time
1602        );
1603        assert_eq!(DataType::from_str("TIME").unwrap(), DataType::Time);
1604        assert_eq!(
1605            DataType::from_str("TIME WITHOUT TIME ZONE").unwrap(),
1606            DataType::Time
1607        );
1608
1609        assert_eq!(
1610            DataType::from_str("timestamp").unwrap(),
1611            DataType::Timestamp
1612        );
1613        assert_eq!(
1614            DataType::from_str("timestamp without time zone").unwrap(),
1615            DataType::Timestamp
1616        );
1617        assert_eq!(
1618            DataType::from_str("TIMESTAMP").unwrap(),
1619            DataType::Timestamp
1620        );
1621        assert_eq!(
1622            DataType::from_str("TIMESTAMP WITHOUT TIME ZONE").unwrap(),
1623            DataType::Timestamp
1624        );
1625
1626        assert_eq!(
1627            DataType::from_str("timestamptz").unwrap(),
1628            DataType::Timestamptz
1629        );
1630        assert_eq!(
1631            DataType::from_str("timestamp with time zone").unwrap(),
1632            DataType::Timestamptz
1633        );
1634        assert_eq!(
1635            DataType::from_str("TIMESTAMPTZ").unwrap(),
1636            DataType::Timestamptz
1637        );
1638        assert_eq!(
1639            DataType::from_str("TIMESTAMP WITH TIME ZONE").unwrap(),
1640            DataType::Timestamptz
1641        );
1642
1643        assert_eq!(DataType::from_str("interval").unwrap(), DataType::Interval);
1644        assert_eq!(DataType::from_str("INTERVAL").unwrap(), DataType::Interval);
1645
1646        assert_eq!(
1647            DataType::from_str("int2[]").unwrap(),
1648            DataType::Int16.list()
1649        );
1650        assert_eq!(DataType::from_str("int[]").unwrap(), DataType::Int32.list());
1651        assert_eq!(
1652            DataType::from_str("int8[]").unwrap(),
1653            DataType::Int64.list()
1654        );
1655        assert_eq!(
1656            DataType::from_str("float4[]").unwrap(),
1657            DataType::Float32.list()
1658        );
1659        assert_eq!(
1660            DataType::from_str("float8[]").unwrap(),
1661            DataType::Float64.list()
1662        );
1663        assert_eq!(
1664            DataType::from_str("decimal[]").unwrap(),
1665            DataType::Decimal.list()
1666        );
1667        assert_eq!(
1668            DataType::from_str("varchar[]").unwrap(),
1669            DataType::Varchar.list()
1670        );
1671        assert_eq!(DataType::from_str("date[]").unwrap(), DataType::Date.list());
1672        assert_eq!(DataType::from_str("time[]").unwrap(), DataType::Time.list());
1673        assert_eq!(
1674            DataType::from_str("timestamp[]").unwrap(),
1675            DataType::Timestamp.list()
1676        );
1677        assert_eq!(
1678            DataType::from_str("timestamptz[]").unwrap(),
1679            DataType::Timestamptz.list()
1680        );
1681        assert_eq!(
1682            DataType::from_str("interval[]").unwrap(),
1683            DataType::Interval.list()
1684        );
1685
1686        assert_eq!(
1687            DataType::from_str("record").unwrap(),
1688            DataType::Struct(StructType::unnamed(vec![]))
1689        );
1690        assert_eq!(
1691            DataType::from_str("struct<a int4, b varchar>").unwrap(),
1692            DataType::Struct(StructType::new(vec![
1693                ("a", DataType::Int32),
1694                ("b", DataType::Varchar)
1695            ]))
1696        );
1697    }
1698
1699    #[test]
1700    fn test_can_alter() {
1701        let cannots = [
1702            (DataType::Int32, None),
1703            (DataType::Int32.list(), None),
1704            (
1705                MapType::from_kv(DataType::Varchar, DataType::Int32.list()).into(),
1706                None,
1707            ),
1708            (
1709                StructType::new([("a", DataType::Int32)]).into(),
1710                Some(false),
1711            ),
1712            (
1713                MapType::from_kv(
1714                    DataType::Varchar,
1715                    StructType::new([("a", DataType::Int32)]).into(),
1716                )
1717                .into(),
1718                Some(false),
1719            ),
1720        ];
1721        for (cannot, why) in cannots {
1722            assert_eq!(cannot.can_alter(), why, "{cannot:?}");
1723        }
1724
1725        let cans = [
1726            StructType::new([("a", DataType::Int32), ("b", DataType::Int32.list())])
1727                .with_ids([ColumnId::new(1), ColumnId::new(2)])
1728                .into(),
1729            DataType::list(DataType::Struct(
1730                StructType::new([("a", DataType::Int32)]).with_ids([ColumnId::new(1)]),
1731            )),
1732            MapType::from_kv(
1733                DataType::Varchar,
1734                StructType::new([("a", DataType::Int32)])
1735                    .with_ids([ColumnId::new(1)])
1736                    .into(),
1737            )
1738            .into(),
1739        ];
1740        for can in cans {
1741            assert_eq!(can.can_alter(), Some(true), "{can:?}");
1742        }
1743    }
1744}