risingwave_common/types/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Data types in RisingWave.
16
17// NOTE: When adding or modifying data types, remember to update the type matrix in
18// src/expr/macro/src/types.rs
19
20use std::fmt::Debug;
21use std::hash::Hash;
22use std::str::FromStr;
23
24use bytes::{Buf, BufMut, Bytes};
25use chrono::{Datelike, Timelike};
26use itertools::Itertools;
27use parse_display::{Display, FromStr};
28use paste::paste;
29use postgres_types::{FromSql, IsNull, ToSql, Type};
30use risingwave_common_estimate_size::{EstimateSize, ZeroHeapSize};
31use risingwave_pb::data::PbDataType;
32use risingwave_pb::data::data_type::PbTypeName;
33use rw_iter_util::ZipEqFast as _;
34use serde::{Deserialize, Serialize, Serializer};
35use strum_macros::EnumDiscriminants;
36use thiserror_ext::AsReport;
37
38use crate::array::{
39    ArrayBuilderImpl, ArrayError, ArrayResult, NULL_VAL_FOR_HASH, PrimitiveArrayItemType,
40};
41// Complex type's value is based on the array
42pub use crate::array::{
43    ListRef, ListValue, MapRef, MapValue, StructRef, StructValue, VectorRef, VectorVal,
44};
45use crate::cast::{str_to_bool, str_to_bytea};
46use crate::catalog::ColumnId;
47use crate::error::BoxedError;
48use crate::{
49    dispatch_data_types, dispatch_scalar_ref_variants, dispatch_scalar_variants, for_all_variants,
50};
51
52mod cow;
53mod datetime;
54mod decimal;
55mod fields;
56mod from_sql;
57mod interval;
58mod jsonb;
59mod list_type;
60mod macros;
61mod map_type;
62mod native_type;
63mod num256;
64mod ops;
65mod ordered;
66mod ordered_float;
67pub mod postgres_type;
68mod scalar_impl;
69mod sentinel;
70mod serial;
71mod struct_type;
72mod successor;
73mod timestamptz;
74mod to_binary;
75mod to_sql;
76mod to_text;
77mod with_data_type;
78
79pub use fields::Fields;
80pub use risingwave_fields_derive::Fields;
81use risingwave_pb::id::TypedId;
82
83pub use self::cow::DatumCow;
84pub use self::datetime::{Date, Time, Timestamp};
85pub use self::decimal::{Decimal, PowError as DecimalPowError};
86pub use self::interval::{DateTimeField, Interval, IntervalDisplay, test_utils};
87pub use self::jsonb::{JsonbRef, JsonbVal};
88pub use self::list_type::ListType;
89pub use self::map_type::MapType;
90pub use self::native_type::*;
91pub use self::num256::{Int256, Int256Ref};
92pub use self::ops::{CheckedAdd, IsNegative};
93pub use self::ordered::*;
94pub use self::ordered_float::{FloatExt, IntoOrdered};
95pub use self::scalar_impl::*;
96pub use self::sentinel::Sentinelled;
97pub use self::serial::Serial;
98pub use self::struct_type::StructType;
99pub use self::successor::Successor;
100pub use self::timestamptz::*;
101pub use self::to_text::ToText;
102pub use self::with_data_type::WithDataType;
103
/// A 32-bit floating point type with total order.
///
/// Alias for `ordered_float::OrderedFloat` wrapping an `f32`.
pub type F32 = ordered_float::OrderedFloat<f32>;

/// A 64-bit floating point type with total order.
///
/// Alias for `ordered_float::OrderedFloat` wrapping an `f64`.
pub type F64 = ordered_float::OrderedFloat<f64>;
109
/// Placeholder string for a Debezium "unavailable value".
///
/// NOTE(review): presumably the marker Debezium emits for values it could not capture — confirm
/// against the Debezium connector documentation.
pub const DEBEZIUM_UNAVAILABLE_VALUE: &str = "__debezium_unavailable_value";

// Pre-built JSON value for Debezium unavailable value to avoid rebuilding it every time
/// A JSON string value holding [`DEBEZIUM_UNAVAILABLE_VALUE`], built lazily on first access.
pub static DEBEZIUM_UNAVAILABLE_JSON: std::sync::LazyLock<JsonbVal> =
    std::sync::LazyLock::new(|| {
        let mut builder = jsonbb::Builder::default();
        // The document consists of a single JSON string.
        builder.add_string(DEBEZIUM_UNAVAILABLE_VALUE);
        JsonbVal(builder.finish())
    });
119
/// The set of datatypes that are supported in RisingWave.
///
/// # Trait implementations
///
/// - `Display` renders the name given in each variant's `#[display(...)]` attribute,
///   e.g. `Int32` is displayed as `integer`.
/// - `EnumDiscriminants` generates [`DataTypeName`] enum with the same variants,
///   but without data fields.
/// - `FromStr` is only used internally for tests.
///   The generated implementation isn't efficient, and doesn't handle whitespaces, etc.
///   Each variant accepts the names listed in its `#[from_str(regex = ...)]` attribute,
///   matched case-insensitively (`(?i)`).
#[derive(Debug, Display, Clone, PartialEq, Eq, Hash, EnumDiscriminants, FromStr)]
#[strum_discriminants(derive(Hash, Ord, PartialOrd))]
#[strum_discriminants(name(DataTypeName))]
#[strum_discriminants(vis(pub))]
#[cfg_attr(test, strum_discriminants(derive(strum_macros::EnumIter)))]
pub enum DataType {
    #[display("boolean")]
    #[from_str(regex = "(?i)^bool$|^boolean$")]
    Boolean,
    #[display("smallint")]
    #[from_str(regex = "(?i)^smallint$|^int2$")]
    Int16,
    #[display("integer")]
    #[from_str(regex = "(?i)^integer$|^int$|^int4$")]
    Int32,
    #[display("bigint")]
    #[from_str(regex = "(?i)^bigint$|^int8$")]
    Int64,
    #[display("real")]
    #[from_str(regex = "(?i)^real$|^float4$")]
    Float32,
    #[display("double precision")]
    #[from_str(regex = "(?i)^double precision$|^float8$")]
    Float64,
    #[display("numeric")]
    #[from_str(regex = "(?i)^numeric$|^decimal$")]
    Decimal,
    #[display("date")]
    #[from_str(regex = "(?i)^date$")]
    Date,
    #[display("character varying")]
    #[from_str(regex = "(?i)^character varying$|^varchar$")]
    Varchar,
    #[display("time without time zone")]
    #[from_str(regex = "(?i)^time$|^time without time zone$")]
    Time,
    #[display("timestamp without time zone")]
    #[from_str(regex = "(?i)^timestamp$|^timestamp without time zone$")]
    Timestamp,
    #[display("timestamp with time zone")]
    #[from_str(regex = "(?i)^timestamptz$|^timestamp with time zone$")]
    Timestamptz,
    #[display("interval")]
    #[from_str(regex = "(?i)^interval$")]
    Interval,
    // Display and parsing are delegated entirely to the inner `StructType`.
    // NOTE(review): `Struct`, `List` and `Map` share the same catch-all pattern, so parsing
    // presumably relies on the inner type's own `FromStr` rejecting foreign input — confirm
    // against the `parse_display` derive.
    #[display("{0}")]
    #[from_str(regex = "(?i)^(?P<0>.+)$")]
    Struct(StructType),
    // Display and parsing are delegated entirely to the inner `ListType`.
    #[display("{0}")]
    #[from_str(regex = "(?i)^(?P<0>.+)$")]
    List(ListType),
    #[display("bytea")]
    #[from_str(regex = "(?i)^bytea$")]
    Bytea,
    #[display("jsonb")]
    #[from_str(regex = "(?i)^jsonb$")]
    Jsonb,
    #[display("serial")]
    #[from_str(regex = "(?i)^serial$")]
    Serial,
    #[display("rw_int256")]
    #[from_str(regex = "(?i)^rw_int256$")]
    Int256,
    // Display and parsing are delegated entirely to the inner `MapType`.
    #[display("{0}")]
    #[from_str(regex = "(?i)^(?P<0>.+)$")]
    Map(MapType),
    // Displayed as `vector(<n>)`, where `<n>` is the `usize` payload.
    #[display("vector({0})")]
    #[from_str(regex = "(?i)^vector\\((?P<0>.+)\\)$")]
    Vector(usize),
}
198
// Negative impl: explicitly opt `DataType` out of `PartialOrd`, so values of this type cannot be
// compared with `<`/`>`.
impl !PartialOrd for DataType {}

// Marker impl of `ZeroHeapSize` (from `risingwave_common_estimate_size`).
// NOTE(review): presumably this makes size estimation ignore any heap memory owned by the inner
// `StructType`/`ListType`/`MapType` — confirm this is intended.
impl ZeroHeapSize for DataType {}
202
203impl TryFrom<DataTypeName> for DataType {
204    type Error = &'static str;
205
206    fn try_from(type_name: DataTypeName) -> Result<Self, Self::Error> {
207        match type_name {
208            DataTypeName::Boolean => Ok(DataType::Boolean),
209            DataTypeName::Int16 => Ok(DataType::Int16),
210            DataTypeName::Int32 => Ok(DataType::Int32),
211            DataTypeName::Int64 => Ok(DataType::Int64),
212            DataTypeName::Int256 => Ok(DataType::Int256),
213            DataTypeName::Serial => Ok(DataType::Serial),
214            DataTypeName::Decimal => Ok(DataType::Decimal),
215            DataTypeName::Float32 => Ok(DataType::Float32),
216            DataTypeName::Float64 => Ok(DataType::Float64),
217            DataTypeName::Varchar => Ok(DataType::Varchar),
218            DataTypeName::Bytea => Ok(DataType::Bytea),
219            DataTypeName::Date => Ok(DataType::Date),
220            DataTypeName::Timestamp => Ok(DataType::Timestamp),
221            DataTypeName::Timestamptz => Ok(DataType::Timestamptz),
222            DataTypeName::Time => Ok(DataType::Time),
223            DataTypeName::Interval => Ok(DataType::Interval),
224            DataTypeName::Jsonb => Ok(DataType::Jsonb),
225            DataTypeName::Struct
226            | DataTypeName::List
227            | DataTypeName::Map
228            | DataTypeName::Vector => Err(
229                "Functions returning parameterized types can not be inferred. Please use `FunctionCall::new_unchecked`.",
230            ),
231        }
232    }
233}
234
235impl From<&PbDataType> for DataType {
236    fn from(proto: &PbDataType) -> DataType {
237        match proto.get_type_name().expect("missing type field") {
238            PbTypeName::TypeUnspecified => unreachable!(),
239            PbTypeName::Int16 => DataType::Int16,
240            PbTypeName::Int32 => DataType::Int32,
241            PbTypeName::Int64 => DataType::Int64,
242            PbTypeName::Serial => DataType::Serial,
243            PbTypeName::Float => DataType::Float32,
244            PbTypeName::Double => DataType::Float64,
245            PbTypeName::Boolean => DataType::Boolean,
246            PbTypeName::Varchar => DataType::Varchar,
247            PbTypeName::Date => DataType::Date,
248            PbTypeName::Time => DataType::Time,
249            PbTypeName::Timestamp => DataType::Timestamp,
250            PbTypeName::Timestamptz => DataType::Timestamptz,
251            PbTypeName::Decimal => DataType::Decimal,
252            PbTypeName::Interval => DataType::Interval,
253            PbTypeName::Bytea => DataType::Bytea,
254            PbTypeName::Jsonb => DataType::Jsonb,
255            PbTypeName::Struct => {
256                let fields: Vec<DataType> = proto.field_type.iter().map(|f| f.into()).collect_vec();
257                let field_names: Vec<String> = proto.field_names.iter().cloned().collect_vec();
258                let field_ids = (proto.field_ids.iter().copied())
259                    .map(ColumnId::new)
260                    .collect_vec();
261
262                let mut struct_type = if proto.field_names.is_empty() {
263                    StructType::unnamed(fields)
264                } else {
265                    StructType::new(field_names.into_iter().zip_eq_fast(fields))
266                };
267                // `field_ids` is used for nested-schema evolution. Cases when `field_ids` is empty:
268                //
269                // 1. The data type is not associated with a table column, so we don't need to set it.
270                // 2. The column is created before nested-schema evolution is supported, thus is using
271                //    the old serialization format and does not have field ids.
272                // 3. This is an empty struct, which is always considered alterable, and setting ids
273                //    is a no-op.
274                if !field_ids.is_empty() {
275                    struct_type = struct_type.with_ids(field_ids);
276                }
277                struct_type.into()
278            }
279            PbTypeName::List => DataType::list(
280                // The first (and only) item is the list element type.
281                proto.field_type[0].clone().into(),
282            ),
283            PbTypeName::Map => {
284                // Map is physically the same as a list.
285                // So the first (and only) item is the list element type.
286                let list_entries_type: DataType = (&proto.field_type[0]).into();
287                DataType::Map(MapType::from_entries(list_entries_type))
288            }
289            PbTypeName::Vector => DataType::Vector(proto.precision as _),
290            PbTypeName::Int256 => DataType::Int256,
291        }
292    }
293}
294
295impl From<PbDataType> for DataType {
296    fn from(proto: PbDataType) -> DataType {
297        DataType::from(&proto)
298    }
299}
300
301impl From<DataTypeName> for PbTypeName {
302    fn from(type_name: DataTypeName) -> Self {
303        match type_name {
304            DataTypeName::Boolean => PbTypeName::Boolean,
305            DataTypeName::Int16 => PbTypeName::Int16,
306            DataTypeName::Int32 => PbTypeName::Int32,
307            DataTypeName::Int64 => PbTypeName::Int64,
308            DataTypeName::Serial => PbTypeName::Serial,
309            DataTypeName::Float32 => PbTypeName::Float,
310            DataTypeName::Float64 => PbTypeName::Double,
311            DataTypeName::Varchar => PbTypeName::Varchar,
312            DataTypeName::Date => PbTypeName::Date,
313            DataTypeName::Timestamp => PbTypeName::Timestamp,
314            DataTypeName::Timestamptz => PbTypeName::Timestamptz,
315            DataTypeName::Time => PbTypeName::Time,
316            DataTypeName::Interval => PbTypeName::Interval,
317            DataTypeName::Decimal => PbTypeName::Decimal,
318            DataTypeName::Bytea => PbTypeName::Bytea,
319            DataTypeName::Jsonb => PbTypeName::Jsonb,
320            DataTypeName::Struct => PbTypeName::Struct,
321            DataTypeName::List => PbTypeName::List,
322            DataTypeName::Int256 => PbTypeName::Int256,
323            DataTypeName::Map => PbTypeName::Map,
324            DataTypeName::Vector => PbTypeName::Vector,
325        }
326    }
327}
328
/// Convenient macros to generate match arms for [`DataType`].
///
/// Each macro expands to an or-pattern over `DataType::...` variants. The expansions name
/// `DataType` unqualified, so the call site must have `DataType` in scope.
///
/// The macros are defined under underscore-prefixed names with `#[macro_export]` and then
/// re-exported from this module under shorter names (e.g. `data_types::simple!()`).
pub mod data_types {
    use super::DataType;

    /// Numeric [`DataType`]s supported to be `offset` of `RANGE` frame.
    #[macro_export]
    macro_rules! _range_frame_numeric_data_types {
        () => {
            DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Float32
                | DataType::Float64
                | DataType::Decimal
        };
    }
    pub use _range_frame_numeric_data_types as range_frame_numeric;

    /// Date/time [`DataType`]s supported to be `offset` of `RANGE` frame.
    #[macro_export]
    macro_rules! _range_frame_datetime_data_types {
        () => {
            DataType::Date
                | DataType::Time
                | DataType::Timestamp
                | DataType::Timestamptz
                | DataType::Interval
        };
    }
    pub use _range_frame_datetime_data_types as range_frame_datetime;

    /// Data types that do not have inner fields.
    ///
    /// Note that `Vector(_)` is listed here: its payload is a size, not an inner type.
    #[macro_export]
    macro_rules! _simple_data_types {
        () => {
            DataType::Boolean
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Float32
                | DataType::Float64
                | DataType::Decimal
                | DataType::Date
                | DataType::Varchar
                | DataType::Time
                | DataType::Timestamp
                | DataType::Timestamptz
                | DataType::Interval
                | DataType::Bytea
                | DataType::Jsonb
                | DataType::Serial
                | DataType::Int256
                | DataType::Vector(_)
        };
    }
    pub use _simple_data_types as simple;

    /// Data types that have inner fields.
    #[macro_export]
    macro_rules! _composite_data_types {
        () => {
            DataType::Struct { .. } | DataType::List { .. } | DataType::Map { .. }
        };
    }
    pub use _composite_data_types as composite;

    /// Test that all data types are covered either by `simple!()` or `composite!()`.
    ///
    /// This function is never meant to be called: the `match` has no wildcard arm, so it stops
    /// compiling as soon as a new `DataType` variant is added without extending one of the
    /// two macros.
    fn _simple_composite_data_types_exhausted(dt: DataType) {
        match dt {
            simple!() => {}
            composite!() => {}
        }
    }
}
403
404impl DataType {
    /// Same as pgvector; unsure how it was chosen there
    /// <https://github.com/pgvector/pgvector/blob/v0.8.0/README.md#vector-type>
    ///
    /// NOTE(review): presumably the upper bound for the size carried by [`DataType::Vector`];
    /// the check itself is not performed in this file — confirm at the use sites.
    pub const VEC_MAX_SIZE: usize = 16000;
408
    /// Creates an [`ArrayBuilderImpl`] for this data type.
    ///
    /// `capacity` is forwarded, together with a clone of `self`, to the `with_type` constructor
    /// of the builder type selected by `dispatch_data_types!`.
    pub fn create_array_builder(&self, capacity: usize) -> ArrayBuilderImpl {
        // NOTE(review): presumably needed because the dispatch macro names the concrete builder
        // types unqualified — confirm against the macro definition.
        use crate::array::*;

        // Inside the block, `B` is the builder type the macro picked for `self`.
        dispatch_data_types!(self, [B = ArrayBuilder], {
            B::with_type(capacity, self.clone()).into()
        })
    }
416
417    pub fn type_name(&self) -> DataTypeName {
418        DataTypeName::from(self)
419    }
420
421    pub fn prost_type_name(&self) -> PbTypeName {
422        self.type_name().into()
423    }
424
425    pub fn to_protobuf(&self) -> PbDataType {
426        let mut pb = PbDataType {
427            type_name: self.prost_type_name() as i32,
428            is_nullable: true,
429            ..Default::default()
430        };
431        match self {
432            DataType::Struct(t) => {
433                if !t.is_unnamed() {
434                    // To be consistent with `From<&PbDataType>`,
435                    // we only set field names when it's a named struct.
436                    pb.field_names = t.names().map(|s| s.into()).collect();
437                }
438                pb.field_type = t.types().map(|f| f.to_protobuf()).collect();
439                if let Some(ids) = t.ids() {
440                    pb.field_ids = ids.map(|id| id.get_id()).collect();
441                }
442            }
443            DataType::List(list) => {
444                pb.field_type = vec![list.elem().to_protobuf()];
445            }
446            DataType::Map(map) => {
447                // Same as List<Struct<K,V>>
448                pb.field_type = vec![map.clone().into_struct().to_protobuf()];
449            }
450            DataType::Vector(size) => {
451                pb.precision = *size as _;
452            }
453            DataType::Boolean
454            | DataType::Int16
455            | DataType::Int32
456            | DataType::Int64
457            | DataType::Float32
458            | DataType::Float64
459            | DataType::Decimal
460            | DataType::Date
461            | DataType::Varchar
462            | DataType::Time
463            | DataType::Timestamp
464            | DataType::Timestamptz
465            | DataType::Interval
466            | DataType::Bytea
467            | DataType::Jsonb
468            | DataType::Serial
469            | DataType::Int256 => (),
470        }
471        pb
472    }
473
474    pub fn is_numeric(&self) -> bool {
475        matches!(
476            self,
477            DataType::Int16
478                | DataType::Int32
479                | DataType::Int64
480                | DataType::Serial
481                | DataType::Float32
482                | DataType::Float64
483                | DataType::Decimal
484        )
485    }
486
487    /// Returns whether the data type does not have inner fields.
488    pub fn is_simple(&self) -> bool {
489        matches!(self, data_types::simple!())
490    }
491
492    /// Returns whether the data type has inner fields.
493    pub fn is_composite(&self) -> bool {
494        matches!(self, data_types::composite!())
495    }
496
497    pub fn is_array(&self) -> bool {
498        matches!(self, DataType::List(_))
499    }
500
501    pub fn is_struct(&self) -> bool {
502        matches!(self, DataType::Struct(_))
503    }
504
505    pub fn is_map(&self) -> bool {
506        matches!(self, DataType::Map(_))
507    }
508
509    pub fn is_int(&self) -> bool {
510        matches!(self, DataType::Int16 | DataType::Int32 | DataType::Int64)
511    }
512
513    /// Returns the output type of time window function on a given input type.
514    pub fn window_of(input: &DataType) -> Option<DataType> {
515        match input {
516            DataType::Timestamptz => Some(DataType::Timestamptz),
517            DataType::Timestamp | DataType::Date => Some(DataType::Timestamp),
518            _ => None,
519        }
520    }
521
522    pub fn as_struct(&self) -> &StructType {
523        match self {
524            DataType::Struct(t) => t,
525            t => panic!("expect struct type, got {t}"),
526        }
527    }
528
529    pub fn into_struct(self) -> StructType {
530        match self {
531            DataType::Struct(t) => t,
532            t => panic!("expect struct type, got {t}"),
533        }
534    }
535
536    pub fn as_map(&self) -> &MapType {
537        match self {
538            DataType::Map(t) => t,
539            t => panic!("expect map type, got {t}"),
540        }
541    }
542
543    pub fn into_map(self) -> MapType {
544        match self {
545            DataType::Map(t) => t,
546            t => panic!("expect map type, got {t}"),
547        }
548    }
549
550    pub fn as_list(&self) -> &ListType {
551        match self {
552            DataType::List(t) => t,
553            t => panic!("expect list type, got {t}"),
554        }
555    }
556
557    pub fn into_list(self) -> ListType {
558        match self {
559            DataType::List(t) => t,
560            t => panic!("expect list type, got {t}"),
561        }
562    }
563
564    /// Returns the inner element's type if `self` is a list type.
565    /// Equivalent to `self.as_list().elem()`.
566    pub fn as_list_elem(&self) -> &DataType {
567        self.as_list().elem()
568    }
569
570    /// Returns the inner element's type if `self` is a list type.
571    /// Equivalent to `self.into_list().into_elem()`.
572    pub fn into_list_elem(self) -> DataType {
573        self.into_list().into_elem()
574    }
575
576    /// Return a new type that removes the outer list, and get the innermost element type.
577    ///
578    /// Use [`DataType::as_list_elem`] if you only want the element type of a list.
579    ///
580    /// ```
581    /// use risingwave_common::types::DataType::*;
582    /// assert_eq!(Int32.list().unnest_list(), &Int32);
583    /// assert_eq!(Int32.list().list().unnest_list(), &Int32);
584    /// ```
585    pub fn unnest_list(&self) -> &Self {
586        match self {
587            DataType::List(list) => list.elem().unnest_list(),
588            _ => self,
589        }
590    }
591
592    /// Return the number of dimensions of this array/list type. Return `0` when this type is not an
593    /// array/list.
594    pub fn array_ndims(&self) -> usize {
595        let mut d = 0;
596        let mut t = self;
597        while let Self::List(list) = t {
598            d += 1;
599            t = list.elem();
600        }
601        d
602    }
603
604    /// Compares the datatype with another, ignoring nested field names and ids.
605    pub fn equals_datatype(&self, other: &DataType) -> bool {
606        match (self, other) {
607            (Self::Struct(s1), Self::Struct(s2)) => s1.equals_datatype(s2),
608            (Self::List(d1), Self::List(d2)) => d1.elem().equals_datatype(d2.elem()),
609            (Self::Map(m1), Self::Map(m2)) => {
610                m1.key().equals_datatype(m2.key()) && m1.value().equals_datatype(m2.value())
611            }
612            _ => self == other,
613        }
614    }
615
616    /// Whether a column with this data type can be altered to a new data type. This determines
617    /// the encoding of the column data.
618    ///
619    /// Returns...
620    /// - `None`, if the data type is simple or does not contain a struct type.
621    /// - `Some(true)`, if the data type contains a struct type with field ids ([`StructType::has_ids`]).
622    /// - `Some(false)`, if the data type contains a struct type without field ids.
623    pub fn can_alter(&self) -> Option<bool> {
624        match self {
625            data_types::simple!() => None,
626            DataType::Struct(struct_type) => {
627                // As long as we meet a struct type, we can check its `ids` field to determine if
628                // it can be altered.
629                let struct_can_alter = struct_type.has_ids();
630                // In debug build, we assert that once a struct type does (or does not) have ids,
631                // all its composite fields should have the same property.
632                if cfg!(debug_assertions) {
633                    for field in struct_type.types() {
634                        if let Some(field_can_alter) = field.can_alter() {
635                            assert_eq!(struct_can_alter, field_can_alter);
636                        }
637                    }
638                }
639                Some(struct_can_alter)
640            }
641
642            DataType::List(list_type) => list_type.elem().can_alter(),
643            DataType::Map(map_type) => {
644                debug_assert!(
645                    map_type.key().is_simple(),
646                    "unexpected key type of map {map_type:?}"
647                );
648                map_type.value().can_alter()
649            }
650        }
651    }
652}
653
654impl From<StructType> for DataType {
655    fn from(value: StructType) -> Self {
656        Self::Struct(value)
657    }
658}
659
660impl From<DataType> for PbDataType {
661    fn from(data_type: DataType) -> Self {
662        data_type.to_protobuf()
663    }
664}
665
mod private {
    use super::*;

    // Note: put pub trait inside a private mod just makes the name private,
    // The trait methods will still be publicly available...
    // a.k.a. ["Voldemort type"](https://rust-lang.github.io/rfcs/2145-type-privacy.html#lint-3-voldemort-types-its-reachable-but-i-cant-name-it)

    /// Common trait bounds of scalar and scalar reference types.
    ///
    /// This is a trait alias (`trait Name = Bounds;`), not a regular trait. `Impl` is the enum
    /// the scalar converts to and from: the conversion from `Impl` is fallible
    /// (`TryFrom` with [`ArrayError`]), the conversion into `Impl` is not.
    ///
    /// NOTE(rc): `Hash` is not in the trait bound list, it's implemented as [`ScalarRef::hash_scalar`].
    pub trait ScalarBounds<Impl> = Debug
        + Send
        + Sync
        + Clone
        + PartialEq
        + Eq
        // in default ascending order
        + PartialOrd
        + Ord
        + TryFrom<Impl, Error = ArrayError>
        // `ScalarImpl`/`ScalarRefImpl`
        + Into<Impl>;
}
689
/// `Scalar` is a trait over all possible owned types in the evaluation
/// framework.
///
/// `Scalar` is reciprocal to `ScalarRef`. Use `as_scalar_ref` to get a
/// reference which has the same lifetime as `self`.
///
/// Implementors must satisfy `ScalarBounds<ScalarImpl>` and be `'static`.
pub trait Scalar: private::ScalarBounds<ScalarImpl> + 'static {
    /// Type for reference of `Scalar`
    ///
    /// Its `ScalarType` must point back to `Self`, which ties the owned type and the reference
    /// type together in both directions.
    type ScalarRefType<'a>: ScalarRef<'a, ScalarType = Self> + 'a
    where
        Self: 'a;

    /// Get a reference to current scalar.
    fn as_scalar_ref(&self) -> Self::ScalarRefType<'_>;

    /// Converts `self` into a [`ScalarImpl`] through its `Into<ScalarImpl>` conversion.
    fn to_scalar_value(self) -> ScalarImpl {
        self.into()
    }
}
708
/// `ScalarRef` is a trait over all possible references in the evaluation
/// framework.
///
/// `ScalarRef` is reciprocal to `Scalar`. Use `to_owned_scalar` to get an
/// owned scalar.
///
/// Implementors must satisfy `ScalarBounds<ScalarRefImpl<'a>>` and be `Copy`.
pub trait ScalarRef<'a>: private::ScalarBounds<ScalarRefImpl<'a>> + 'a + Copy {
    /// `ScalarType` is the owned type of current `ScalarRef`.
    type ScalarType: Scalar<ScalarRefType<'a> = Self>;

    /// Convert `ScalarRef` to an owned scalar.
    fn to_owned_scalar(&self) -> Self::ScalarType;

    /// A wrapped hash function to get the hash value for this scalar.
    ///
    /// Feeds this value into the given hasher `state`.
    fn hash_scalar<H: std::hash::Hasher>(&self, state: &mut H);
}
724
/// Define `ScalarImpl` and `ScalarRefImpl` with macro.
///
/// The macro takes a comma-separated list of 7-element entries. Only `$variant_name`,
/// `$scalar` and `$scalar_ref` are used here: each entry becomes one variant in both enums,
/// wrapping the owned type and the reference type respectively.
macro_rules! scalar_impl_enum {
    ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
        /// `ScalarImpl` embeds all possible scalars in the evaluation framework.
        ///
        /// Note: `ScalarImpl` doesn't contain all information of its `DataType`,
        /// so sometimes they need to be used together.
        /// e.g., for `Struct`, we don't have the field names in the value.
        ///
        /// See `for_all_variants` for the definition.
        #[derive(Debug, Clone, PartialEq, Eq, EstimateSize)]
        pub enum ScalarImpl {
            $( $variant_name($scalar) ),*
        }

        /// `ScalarRefImpl` embeds all possible scalar references in the evaluation
        /// framework.
        ///
        /// Note: `ScalarRefImpl` doesn't contain all information of its `DataType`,
        /// so sometimes they need to be used together.
        /// e.g., for `Struct`, we don't have the field names in the value.
        ///
        /// See `for_all_variants` for the definition.
        #[derive(Debug, Copy, Clone, PartialEq, Eq)]
        pub enum ScalarRefImpl<'scalar> {
            $( $variant_name($scalar_ref) ),*
        }
    };
}

// Instantiate both enums; `for_all_variants!` supplies the list of entries.
for_all_variants! { scalar_impl_enum }
756
// We MUST NOT implement `Ord` for `ScalarImpl` because that will make `Datum` derive an incorrect
// default `Ord`. To get a default-ordered `ScalarImpl`/`ScalarRefImpl`/`Datum`/`DatumRef`, you can
// use `DefaultOrdered<T>`. If non-default order is needed, please refer to `sort_util`.
//
// The negative impls below turn that rule into a compile-time guarantee: neither enum can be
// given a `PartialOrd` impl.
impl !PartialOrd for ScalarImpl {}
impl !PartialOrd for ScalarRefImpl<'_> {}
762
/// An optional owned scalar value.
///
/// NOTE(review): `None` presumably stands for SQL `NULL` — confirm with the callers.
pub type Datum = Option<ScalarImpl>;
/// An optional borrowed scalar value, the reference counterpart of [`Datum`].
pub type DatumRef<'a> = Option<ScalarRefImpl<'a>>;
765
/// This trait is to implement `to_owned_datum` for `Option<ScalarImpl>`
///
/// It is implemented in this file for `&Datum`, for every `T: Into<ScalarImpl>`, and for
/// `Option<T>` of such a `T`.
pub trait ToOwnedDatum {
    /// Convert the datum to an owned [`Datum`].
    fn to_owned_datum(self) -> Datum;
}
771
772impl ToOwnedDatum for &Datum {
773    #[inline(always)]
774    fn to_owned_datum(self) -> Datum {
775        self.clone()
776    }
777}
778
779impl<T: Into<ScalarImpl>> ToOwnedDatum for T {
780    #[inline(always)]
781    fn to_owned_datum(self) -> Datum {
782        Some(self.into())
783    }
784}
785
786impl<T: Into<ScalarImpl>> ToOwnedDatum for Option<T> {
787    #[inline(always)]
788    fn to_owned_datum(self) -> Datum {
789        self.map(Into::into)
790    }
791}
792
793impl<const N: usize> From<TypedId<N, u32>> for ScalarImpl {
794    fn from(value: TypedId<N, u32>) -> Self {
795        value.as_i32_id().into()
796    }
797}
798
799impl<const N: usize> From<TypedId<N, u64>> for ScalarImpl {
800    fn from(value: TypedId<N, u64>) -> Self {
801        value.as_i64_id().into()
802    }
803}
804
/// Types that can be viewed as a borrowed [`DatumRef`].
///
/// The `auto_impl(&)` attribute additionally implements the trait for references to any
/// implementor.
#[auto_impl::auto_impl(&)]
pub trait ToDatumRef: PartialEq + Eq + Debug + Send + Sync {
    /// Convert the datum to [`DatumRef`].
    fn to_datum_ref(&self) -> DatumRef<'_>;
}
810
811impl ToDatumRef for Datum {
812    #[inline(always)]
813    fn to_datum_ref(&self) -> DatumRef<'_> {
814        self.as_ref().map(|d| d.as_scalar_ref_impl())
815    }
816}
817impl ToDatumRef for Option<&ScalarImpl> {
818    #[inline(always)]
819    fn to_datum_ref(&self) -> DatumRef<'_> {
820        self.map(|d| d.as_scalar_ref_impl())
821    }
822}
823impl ToDatumRef for DatumRef<'_> {
824    #[inline(always)]
825    fn to_datum_ref(&self) -> DatumRef<'_> {
826        *self
827    }
828}
829
/// To make sure there is `as_scalar_ref` for all scalar ref types.
/// See <https://github.com/risingwavelabs/risingwave/pull/9977/files#r1208972881>
///
/// This is used by the expr macro.
pub trait SelfAsScalarRef {
    /// Returns the scalar reference itself.
    fn as_scalar_ref(&self) -> Self;
}
/// Implements [`SelfAsScalarRef`] for each listed type by returning a copy of `self`
/// (`*self`), which requires every listed type to be `Copy`.
macro_rules! impl_self_as_scalar_ref {
    ($($t:ty),*) => {
        $(
            impl SelfAsScalarRef for $t {
                fn as_scalar_ref(&self) -> Self {
                    *self
                }
            }
        )*
    };
}
// Reference-like scalar types, i.e. those that are not plain native values.
impl_self_as_scalar_ref! { &str, &[u8], Int256Ref<'_>, JsonbRef<'_>, ListRef<'_>, StructRef<'_>, ScalarRefImpl<'_>, MapRef<'_> }
849
/// `for_all_native_types` includes all native variants of our scalar types.
///
/// Specifically, it doesn't support u8/u16/u32/u64.
///
/// Invokes `$macro!` with one `{ type, Variant, read_fn }` entry per native type.
/// NOTE(review): the third element looks like the name of a byte-reading method (e.g. on
/// `bytes::Buf`) used to decode the type — confirm against the consuming macros.
#[macro_export]
macro_rules! for_all_native_types {
    ($macro:ident) => {
        $macro! {
            { i16, Int16, read_i16 },
            { i32, Int32, read_i32 },
            { i64, Int64, read_i64 },
            { Serial, Serial, read_i64 },
            { $crate::types::F32, Float32, read_f32 },
            { $crate::types::F64, Float64, read_f64 }
        }
    };
}
866
867/// `impl_convert` implements several conversions for `Scalar`.
868/// * `Scalar <-> ScalarImpl` with `From` and `TryFrom` trait.
869/// * `ScalarRef <-> ScalarRefImpl` with `From` and `TryFrom` trait.
870/// * `&ScalarImpl -> &Scalar` with `impl.as_int16()`.
871/// * `ScalarImpl -> Scalar` with `impl.into_int16()`.
872macro_rules! impl_convert {
873    ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty } ),*) => {
874        $(
875            impl From<$scalar> for ScalarImpl {
876                fn from(val: $scalar) -> Self {
877                    ScalarImpl::$variant_name(val)
878                }
879            }
880
881            impl TryFrom<ScalarImpl> for $scalar {
882                type Error = ArrayError;
883
884                fn try_from(val: ScalarImpl) -> ArrayResult<Self> {
885                    match val {
886                        ScalarImpl::$variant_name(scalar) => Ok(scalar),
887                        other_scalar => bail!("cannot convert ScalarImpl::{} to concrete type", other_scalar.get_ident()),
888                    }
889                }
890            }
891
892            impl <'scalar> From<$scalar_ref> for ScalarRefImpl<'scalar> {
893                fn from(val: $scalar_ref) -> Self {
894                    ScalarRefImpl::$variant_name(val)
895                }
896            }
897
898            impl <'scalar> TryFrom<ScalarRefImpl<'scalar>> for $scalar_ref {
899                type Error = ArrayError;
900
901                fn try_from(val: ScalarRefImpl<'scalar>) -> ArrayResult<Self> {
902                    match val {
903                        ScalarRefImpl::$variant_name(scalar_ref) => Ok(scalar_ref),
904                        other_scalar => bail!("cannot convert ScalarRefImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name)),
905                    }
906                }
907            }
908
909            paste! {
910                impl ScalarImpl {
911                    /// # Panics
912                    /// If the scalar is not of the expected type.
913                    pub fn [<as_ $suffix_name>](&self) -> &$scalar {
914                        match self {
915                            Self::$variant_name(scalar) => scalar,
916                            other_scalar => panic!("cannot convert ScalarImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
917                        }
918                    }
919
920                    /// # Panics
921                    /// If the scalar is not of the expected type.
922                    pub fn [<into_ $suffix_name>](self) -> $scalar {
923                        match self {
924                            Self::$variant_name(scalar) => scalar,
925                            other_scalar =>  panic!("cannot convert ScalarImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
926                        }
927                    }
928                }
929
930                impl <'scalar> ScalarRefImpl<'scalar> {
931                    /// # Panics
932                    /// If the scalar is not of the expected type.
933                    pub fn [<into_ $suffix_name>](self) -> $scalar_ref {
934                        match self {
935                            Self::$variant_name(inner) => inner,
936                            other_scalar => panic!("cannot convert ScalarRefImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
937                        }
938                    }
939                }
940            }
941        )*
942    };
943}
944
945for_all_variants! { impl_convert }
946
947// Implement `From<raw float>` for `ScalarImpl::Float` as a sugar.
948impl From<f32> for ScalarImpl {
949    fn from(f: f32) -> Self {
950        Self::Float32(f.into())
951    }
952}
953impl From<f64> for ScalarImpl {
954    fn from(f: f64) -> Self {
955        Self::Float64(f.into())
956    }
957}
958
959// Implement `From<string like>` for `ScalarImpl::Utf8` as a sugar.
960impl From<String> for ScalarImpl {
961    fn from(s: String) -> Self {
962        Self::Utf8(s.into_boxed_str())
963    }
964}
965impl From<&str> for ScalarImpl {
966    fn from(s: &str) -> Self {
967        Self::Utf8(s.into())
968    }
969}
970impl From<&String> for ScalarImpl {
971    fn from(s: &String) -> Self {
972        Self::Utf8(s.as_str().into())
973    }
974}
975impl TryFrom<ScalarImpl> for String {
976    type Error = ArrayError;
977
978    fn try_from(val: ScalarImpl) -> ArrayResult<Self> {
979        match val {
980            ScalarImpl::Utf8(s) => Ok(s.into()),
981            other_scalar => bail!(
982                "cannot convert ScalarImpl::{} to concrete type",
983                other_scalar.get_ident()
984            ),
985        }
986    }
987}
988
989impl From<char> for ScalarImpl {
990    fn from(c: char) -> Self {
991        Self::Utf8(c.to_string().into())
992    }
993}
994
995impl From<&[u8]> for ScalarImpl {
996    fn from(s: &[u8]) -> Self {
997        Self::Bytea(s.into())
998    }
999}
1000
1001impl From<JsonbRef<'_>> for ScalarImpl {
1002    fn from(jsonb: JsonbRef<'_>) -> Self {
1003        Self::Jsonb(jsonb.to_owned_scalar())
1004    }
1005}
1006
1007impl<T: PrimitiveArrayItemType> From<Vec<T>> for ScalarImpl {
1008    fn from(v: Vec<T>) -> Self {
1009        Self::List(v.into_iter().collect())
1010    }
1011}
1012
1013impl<T: PrimitiveArrayItemType> From<Vec<Option<T>>> for ScalarImpl {
1014    fn from(v: Vec<Option<T>>) -> Self {
1015        Self::List(v.into_iter().collect())
1016    }
1017}
1018
1019impl From<Vec<String>> for ScalarImpl {
1020    fn from(v: Vec<String>) -> Self {
1021        Self::List(v.iter().map(|s| s.as_str()).collect())
1022    }
1023}
1024
1025impl From<Vec<u8>> for ScalarImpl {
1026    fn from(v: Vec<u8>) -> Self {
1027        Self::Bytea(v.into())
1028    }
1029}
1030
1031impl From<Bytes> for ScalarImpl {
1032    fn from(v: Bytes) -> Self {
1033        Self::Bytea(v.as_ref().into())
1034    }
1035}
1036
1037impl From<ListRef<'_>> for ScalarImpl {
1038    fn from(list: ListRef<'_>) -> Self {
1039        Self::List(list.to_owned_scalar())
1040    }
1041}
1042
1043impl ScalarImpl {
1044    /// Creates a scalar from pgwire "BINARY" format.
1045    ///
1046    /// The counterpart of [`to_binary::ToBinary`].
1047    pub fn from_binary(bytes: &Bytes, data_type: &DataType) -> Result<Self, BoxedError> {
1048        let res = match data_type {
1049            DataType::Varchar => Self::Utf8(String::from_sql(&Type::VARCHAR, bytes)?.into()),
1050            DataType::Bytea => Self::Bytea(Vec::<u8>::from_sql(&Type::BYTEA, bytes)?.into()),
1051            DataType::Boolean => Self::Bool(bool::from_sql(&Type::BOOL, bytes)?),
1052            DataType::Int16 => Self::Int16(i16::from_sql(&Type::INT2, bytes)?),
1053            DataType::Int32 => Self::Int32(i32::from_sql(&Type::INT4, bytes)?),
1054            DataType::Int64 => Self::Int64(i64::from_sql(&Type::INT8, bytes)?),
1055            DataType::Serial => Self::Serial(Serial::from(i64::from_sql(&Type::INT8, bytes)?)),
1056            DataType::Float32 => Self::Float32(f32::from_sql(&Type::FLOAT4, bytes)?.into()),
1057            DataType::Float64 => Self::Float64(f64::from_sql(&Type::FLOAT8, bytes)?.into()),
1058            DataType::Decimal => {
1059                Self::Decimal(rust_decimal::Decimal::from_sql(&Type::NUMERIC, bytes)?.into())
1060            }
1061            DataType::Date => Self::Date(chrono::NaiveDate::from_sql(&Type::DATE, bytes)?.into()),
1062            DataType::Time => Self::Time(chrono::NaiveTime::from_sql(&Type::TIME, bytes)?.into()),
1063            DataType::Timestamp => {
1064                Self::Timestamp(chrono::NaiveDateTime::from_sql(&Type::TIMESTAMP, bytes)?.into())
1065            }
1066            DataType::Timestamptz => Self::Timestamptz(
1067                chrono::DateTime::<chrono::Utc>::from_sql(&Type::TIMESTAMPTZ, bytes)?.into(),
1068            ),
1069            DataType::Interval => Self::Interval(Interval::from_sql(&Type::INTERVAL, bytes)?),
1070            DataType::Jsonb => Self::Jsonb(
1071                JsonbVal::value_deserialize(bytes)
1072                    .ok_or_else(|| "invalid value of Jsonb".to_owned())?,
1073            ),
1074            DataType::Int256 => Self::Int256(Int256::from_binary(bytes)?),
1075            DataType::Vector(_) | DataType::Struct(_) | DataType::List(_) | DataType::Map(_) => {
1076                return Err(format!("unsupported data type: {}", data_type).into());
1077            }
1078        };
1079        Ok(res)
1080    }
1081
1082    /// Creates a scalar from pgwire "TEXT" format.
1083    ///
1084    /// The counterpart of [`ToText`].
1085    pub fn from_text(s: &str, data_type: &DataType) -> Result<Self, BoxedError> {
1086        Ok(match data_type {
1087            DataType::Boolean => str_to_bool(s)?.into(),
1088            DataType::Int16 => i16::from_str(s)?.into(),
1089            DataType::Int32 => i32::from_str(s)?.into(),
1090            DataType::Int64 => i64::from_str(s)?.into(),
1091            DataType::Int256 => Int256::from_str(s)?.into(),
1092            DataType::Serial => Serial::from(i64::from_str(s)?).into(),
1093            DataType::Decimal => Decimal::from_str(s)?.into(),
1094            DataType::Float32 => F32::from_str(s)?.into(),
1095            DataType::Float64 => F64::from_str(s)?.into(),
1096            DataType::Varchar => s.into(),
1097            DataType::Date => Date::from_str(s)?.into(),
1098            DataType::Timestamp => Timestamp::from_str(s)?.into(),
1099            // We only handle the case with timezone here, and leave the implicit session timezone case
1100            // for later phase.
1101            DataType::Timestamptz => Timestamptz::from_str(s)?.into(),
1102            DataType::Time => Time::from_str(s)?.into(),
1103            DataType::Interval => Interval::from_str(s)?.into(),
1104            DataType::List(_) => ListValue::from_str(s, data_type)?.into(),
1105            DataType::Struct(st) => StructValue::from_str(s, st)?.into(),
1106            DataType::Jsonb => JsonbVal::from_str(s)?.into(),
1107            DataType::Bytea => {
1108                let mut buf = Vec::new();
1109                str_to_bytea(s, &mut buf)?;
1110                buf.into()
1111            }
1112            DataType::Vector(size) => VectorVal::from_text(s, *size)?.into(),
1113            DataType::Map(_m) => return Err("map from text is not supported".into()),
1114        })
1115    }
1116
1117    pub fn from_text_for_test(s: &str, data_type: &DataType) -> Result<Self, BoxedError> {
1118        Ok(match data_type {
1119            DataType::Map(map_type) => MapValue::from_str_for_test(s, map_type)?.into(),
1120            _ => ScalarImpl::from_text(s, data_type)?,
1121        })
1122    }
1123}
1124
1125impl From<ScalarRefImpl<'_>> for ScalarImpl {
1126    fn from(scalar_ref: ScalarRefImpl<'_>) -> Self {
1127        scalar_ref.into_scalar_impl()
1128    }
1129}
1130
1131impl<'a> From<&'a ScalarImpl> for ScalarRefImpl<'a> {
1132    fn from(scalar: &'a ScalarImpl) -> Self {
1133        scalar.as_scalar_ref_impl()
1134    }
1135}
1136
1137impl ScalarImpl {
1138    /// Converts [`ScalarImpl`] to [`ScalarRefImpl`]
1139    pub fn as_scalar_ref_impl(&self) -> ScalarRefImpl<'_> {
1140        dispatch_scalar_variants!(self, inner, { inner.as_scalar_ref().into() })
1141    }
1142}
1143
1144impl ScalarRefImpl<'_> {
1145    /// Converts [`ScalarRefImpl`] to [`ScalarImpl`]
1146    pub fn into_scalar_impl(self) -> ScalarImpl {
1147        dispatch_scalar_ref_variants!(self, inner, { inner.to_owned_scalar().into() })
1148    }
1149}
1150
1151impl Hash for ScalarImpl {
1152    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1153        dispatch_scalar_variants!(self, inner, { inner.as_scalar_ref().hash_scalar(state) })
1154    }
1155}
1156
1157impl Hash for ScalarRefImpl<'_> {
1158    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1159        dispatch_scalar_ref_variants!(self, inner, { inner.hash_scalar(state) })
1160    }
1161}
1162
1163/// Feeds the raw scalar reference of `datum` to the given `state`, which should behave the same
1164/// as [`crate::array::Array::hash_at`], where NULL value will be carefully handled.
1165///
1166/// **FIXME**: the result of this function might be different from [`std::hash::Hash`] due to the
1167/// type alias of `DatumRef = Option<_>`, we should manually implement [`std::hash::Hash`] for
1168/// [`DatumRef`] in the future when it becomes a newtype. (#477)
1169#[inline(always)]
1170pub fn hash_datum(datum: impl ToDatumRef, state: &mut impl std::hash::Hasher) {
1171    match datum.to_datum_ref() {
1172        Some(scalar_ref) => scalar_ref.hash(state),
1173        None => NULL_VAL_FOR_HASH.hash(state),
1174    }
1175}
1176
1177impl ScalarRefImpl<'_> {
1178    pub fn binary_format(&self, data_type: &DataType) -> to_binary::Result<Bytes> {
1179        use self::to_binary::ToBinary;
1180        self.to_binary_with_type(data_type)
1181    }
1182
1183    pub fn text_format(&self, data_type: &DataType) -> String {
1184        self.to_text_with_type(data_type)
1185    }
1186
1187    /// Serialize the scalar into the `memcomparable` format.
1188    pub fn serialize(
1189        &self,
1190        ser: &mut memcomparable::Serializer<impl BufMut>,
1191    ) -> memcomparable::Result<()> {
1192        match self {
1193            Self::Int16(v) => v.serialize(ser)?,
1194            Self::Int32(v) => v.serialize(ser)?,
1195            Self::Int64(v) => v.serialize(ser)?,
1196            Self::Serial(v) => v.serialize(ser)?,
1197            Self::Float32(v) => v.serialize(ser)?,
1198            Self::Float64(v) => v.serialize(ser)?,
1199            Self::Utf8(v) => v.serialize(ser)?,
1200            Self::Bytea(v) => ser.serialize_bytes(v)?,
1201            Self::Bool(v) => v.serialize(ser)?,
1202            Self::Decimal(v) => ser.serialize_decimal((*v).into())?,
1203            Self::Interval(v) => v.serialize(ser)?,
1204            Self::Date(v) => v.0.num_days_from_ce().serialize(ser)?,
1205            Self::Timestamp(v) => {
1206                v.0.and_utc().timestamp().serialize(&mut *ser)?;
1207                v.0.and_utc().timestamp_subsec_nanos().serialize(ser)?;
1208            }
1209            Self::Timestamptz(v) => v.serialize(ser)?,
1210            Self::Time(v) => {
1211                v.0.num_seconds_from_midnight().serialize(&mut *ser)?;
1212                v.0.nanosecond().serialize(ser)?;
1213            }
1214            Self::Int256(v) => v.memcmp_serialize(ser)?,
1215            Self::Jsonb(v) => v.memcmp_serialize(ser)?,
1216            Self::Struct(v) => v.memcmp_serialize(ser)?,
1217            Self::List(v) => v.memcmp_serialize(ser)?,
1218            Self::Map(v) => v.memcmp_serialize(ser)?,
1219            Self::Vector(v) => v.memcmp_serialize(ser)?,
1220        };
1221        Ok(())
1222    }
1223}
1224
1225impl ScalarImpl {
1226    /// Serialize the scalar into the `memcomparable` format.
1227    pub fn serialize(
1228        &self,
1229        ser: &mut memcomparable::Serializer<impl BufMut>,
1230    ) -> memcomparable::Result<()> {
1231        self.as_scalar_ref_impl().serialize(ser)
1232    }
1233
1234    /// Deserialize the scalar from the `memcomparable` format.
1235    pub fn deserialize(
1236        ty: &DataType,
1237        de: &mut memcomparable::Deserializer<impl Buf>,
1238    ) -> memcomparable::Result<Self> {
1239        use DataType as Ty;
1240        Ok(match ty {
1241            Ty::Int16 => Self::Int16(i16::deserialize(de)?),
1242            Ty::Int32 => Self::Int32(i32::deserialize(de)?),
1243            Ty::Int64 => Self::Int64(i64::deserialize(de)?),
1244            Ty::Int256 => Self::Int256(Int256::memcmp_deserialize(de)?),
1245            Ty::Serial => Self::Serial(Serial::from(i64::deserialize(de)?)),
1246            Ty::Float32 => Self::Float32(f32::deserialize(de)?.into()),
1247            Ty::Float64 => Self::Float64(f64::deserialize(de)?.into()),
1248            Ty::Varchar => Self::Utf8(Box::<str>::deserialize(de)?),
1249            Ty::Bytea => Self::Bytea(serde_bytes::ByteBuf::deserialize(de)?.into_vec().into()),
1250            Ty::Boolean => Self::Bool(bool::deserialize(de)?),
1251            Ty::Decimal => Self::Decimal(de.deserialize_decimal()?.into()),
1252            Ty::Interval => Self::Interval(Interval::deserialize(de)?),
1253            Ty::Time => Self::Time({
1254                let secs = u32::deserialize(&mut *de)?;
1255                let nano = u32::deserialize(de)?;
1256                Time::with_secs_nano(secs, nano)
1257                    .map_err(|e| memcomparable::Error::Message(e.to_report_string()))?
1258            }),
1259            Ty::Timestamp => Self::Timestamp({
1260                let secs = i64::deserialize(&mut *de)?;
1261                let nsecs = u32::deserialize(de)?;
1262                Timestamp::with_secs_nsecs(secs, nsecs)
1263                    .map_err(|e| memcomparable::Error::Message(e.to_report_string()))?
1264            }),
1265            Ty::Timestamptz => Self::Timestamptz(Timestamptz::deserialize(de)?),
1266            Ty::Date => Self::Date({
1267                let days = i32::deserialize(de)?;
1268                Date::with_days_since_ce(days)
1269                    .map_err(|e| memcomparable::Error::Message(e.to_report_string()))?
1270            }),
1271            Ty::Jsonb => Self::Jsonb(JsonbVal::memcmp_deserialize(de)?),
1272            Ty::Struct(t) => StructValue::memcmp_deserialize(t.types(), de)?.to_scalar_value(),
1273            Ty::List(t) => ListValue::memcmp_deserialize(t, de)?.to_scalar_value(),
1274            Ty::Map(t) => MapValue::memcmp_deserialize(t, de)?.to_scalar_value(),
1275            Ty::Vector(dimension) => {
1276                VectorVal::memcmp_deserialize(*dimension, de)?.to_scalar_value()
1277            }
1278        })
1279    }
1280
1281    pub fn as_integral(&self) -> i64 {
1282        match self {
1283            Self::Int16(v) => *v as i64,
1284            Self::Int32(v) => *v as i64,
1285            Self::Int64(v) => *v,
1286            _ => panic!(
1287                "Can't convert ScalarImpl::{} to a integral",
1288                self.get_ident()
1289            ),
1290        }
1291    }
1292}
1293
1294/// Returns whether the `literal` matches the `data_type`.
1295pub fn literal_type_match(data_type: &DataType, literal: Option<&ScalarImpl>) -> bool {
1296    match literal {
1297        None => true,
1298        Some(scalar) => scalar_ref_type_match(data_type, scalar.as_scalar_ref_impl()),
1299    }
1300}
1301
1302/// Returns whether the scalar ref matches the `data_type`.
1303///
1304/// This is a lightweight "shape check" intended for callers that need to avoid panics on
1305/// malformed input. For nested types, it checks element/field types recursively.
1306pub fn scalar_ref_type_match(data_type: &DataType, scalar: ScalarRefImpl<'_>) -> bool {
1307    match (data_type, scalar) {
1308        (DataType::List(list_type), ScalarRefImpl::List(v)) => {
1309            v.elem_type().equals_datatype(list_type.elem())
1310        }
1311        (DataType::Map(map_type), ScalarRefImpl::Map(v)) => v
1312            .inner()
1313            .elem_type()
1314            .equals_datatype(&map_type.clone().into_struct()),
1315        (DataType::Vector(size), ScalarRefImpl::Vector(v)) => v.dimension() == *size,
1316        (DataType::Struct(struct_type), ScalarRefImpl::Struct(v)) => {
1317            struct_ref_type_match(struct_type, v)
1318        }
1319
1320        _ => {
1321            macro_rules! matches {
1322                ($( { $data_type:ident, $variant_name:ident, $suffix_name:ident, $scalar:ty, $scalar_ref:ty, $array:ty, $builder:ty }),*) => {
1323                    match (data_type, scalar) {
1324                        $(
1325                            (DataType::$data_type { .. }, ScalarRefImpl::$variant_name(_)) => true,
1326                            (DataType::$data_type { .. }, _) => false, // keep exhaustive over DataType variants
1327                        )*
1328                    }
1329                }
1330            }
1331            for_all_variants! { matches }
1332        }
1333    }
1334}
1335
1336/// Returns whether the `datum` matches the `data_type`.
1337#[inline(always)]
1338pub fn datum_ref_type_match(data_type: &DataType, datum: DatumRef<'_>) -> bool {
1339    match datum {
1340        None => true,
1341        Some(scalar) => scalar_ref_type_match(data_type, scalar),
1342    }
1343}
1344
1345fn struct_ref_type_match(expected: &StructType, value: StructRef<'_>) -> bool {
1346    match value {
1347        StructRef::Indexed { arr, .. } => {
1348            // `StructRef::Indexed` comes with a `StructArray`, whose type can be compared directly.
1349            crate::array::Array::data_type(arr).equals_datatype(&DataType::Struct(expected.clone()))
1350        }
1351        StructRef::ValueRef { val } => {
1352            let fields = val.fields();
1353            if fields.len() != expected.len() {
1354                return false;
1355            }
1356            expected
1357                .types()
1358                .zip_eq_fast(fields.iter())
1359                .all(|(ty, datum)| datum_ref_type_match(ty, datum.to_datum_ref()))
1360        }
1361    }
1362}
1363
1364#[cfg(test)]
1365mod tests {
1366    use std::hash::{BuildHasher, Hasher};
1367
1368    use strum::IntoEnumIterator;
1369
1370    use super::*;
1371    use crate::util::hash_util::Crc32FastBuilder;
1372
    /// Pins the in-memory size (in bytes) of the core scalar types, so that an accidental size
    /// change is caught at compile time by the const assertions below.
    #[test]
    fn test_size() {
        use static_assertions::const_assert_eq;

        use crate::array::*;

        // Asserts the size of the owned item type of the given array type.
        macro_rules! assert_item_size_eq {
            ($array:ty, $size:literal) => {
                const_assert_eq!(std::mem::size_of::<<$array as Array>::OwnedItem>(), $size);
            };
        }

        assert_item_size_eq!(StructArray, 16); // Box<[Datum]>
        assert_item_size_eq!(ListArray, 8); // Box<ArrayImpl>
        assert_item_size_eq!(Utf8Array, 16); // Box<str>
        assert_item_size_eq!(IntervalArray, 16);
        assert_item_size_eq!(TimestampArray, 12);

        // TODO: try to reduce the memory usage of `Decimal`, `ScalarImpl` and `Datum`.
        assert_item_size_eq!(DecimalArray, 20);

        // The enums themselves, and the type descriptors.
        const_assert_eq!(std::mem::size_of::<ScalarImpl>(), 24);
        const_assert_eq!(std::mem::size_of::<ScalarRefImpl<'_>>(), 24);
        const_assert_eq!(std::mem::size_of::<Datum>(), 24);
        const_assert_eq!(std::mem::size_of::<StructType>(), 8);
        const_assert_eq!(std::mem::size_of::<DataType>(), 16);
    }
1400
1401    #[test]
1402    fn test_data_type_display() {
1403        let d: DataType =
1404            StructType::new(vec![("i", DataType::Int32), ("j", DataType::Varchar)]).into();
1405        assert_eq!(
1406            format!("{}", d),
1407            "struct<i integer, j character varying>".to_owned()
1408        );
1409    }
1410
    /// Checks that hashing a datum through `hash_datum` (both from the owned datum and from its
    /// reference form) yields the same result as hashing the same datum stored in an array.
    #[test]
    fn test_hash_implementation() {
        // Runs the comparison for a single datum of the given type.
        fn test(datum: Datum, data_type: DataType) {
            assert!(literal_type_match(&data_type, datum.as_ref()));

            // Nulls and `datum` are appended alternately, so every odd index holds `datum`.
            let mut builder = data_type.create_array_builder(6);
            for _ in 0..3 {
                builder.append_null();
                builder.append(&datum);
            }
            let array = builder.finish();

            // Index 3 is one of the positions holding `datum`.
            let hash_from_array = {
                let mut state = Crc32FastBuilder.build_hasher();
                array.hash_at(3, &mut state);
                state.finish()
            };

            let hash_from_datum = {
                let mut state = Crc32FastBuilder.build_hasher();
                hash_datum(&datum, &mut state);
                state.finish()
            };

            let hash_from_datum_ref = {
                let mut state = Crc32FastBuilder.build_hasher();
                hash_datum(datum.to_datum_ref(), &mut state);
                state.finish()
            };

            assert_eq!(hash_from_array, hash_from_datum);
            assert_eq!(hash_from_datum, hash_from_datum_ref);
        }

        // Iterating over `DataTypeName` makes the match below cover every data type.
        for name in DataTypeName::iter() {
            let (scalar, data_type) = match name {
                DataTypeName::Boolean => (ScalarImpl::Bool(true), DataType::Boolean),
                DataTypeName::Int16 => (ScalarImpl::Int16(233), DataType::Int16),
                DataTypeName::Int32 => (ScalarImpl::Int32(233333), DataType::Int32),
                DataTypeName::Int64 => (ScalarImpl::Int64(233333333333), DataType::Int64),
                DataTypeName::Int256 => (
                    ScalarImpl::Int256(233333333333_i64.into()),
                    DataType::Int256,
                ),
                DataTypeName::Serial => (ScalarImpl::Serial(233333333333.into()), DataType::Serial),
                DataTypeName::Float32 => (ScalarImpl::Float32(23.33.into()), DataType::Float32),
                DataTypeName::Float64 => (
                    ScalarImpl::Float64(23.333333333333.into()),
                    DataType::Float64,
                ),
                DataTypeName::Decimal => (
                    ScalarImpl::Decimal("233.33".parse().unwrap()),
                    DataType::Decimal,
                ),
                DataTypeName::Date => (
                    ScalarImpl::Date(Date::from_ymd_uncheck(2333, 3, 3)),
                    DataType::Date,
                ),
                DataTypeName::Varchar => (ScalarImpl::Utf8("233".into()), DataType::Varchar),
                DataTypeName::Bytea => (
                    ScalarImpl::Bytea("\\x233".as_bytes().into()),
                    DataType::Bytea,
                ),
                DataTypeName::Time => (
                    ScalarImpl::Time(Time::from_hms_uncheck(2, 3, 3)),
                    DataType::Time,
                ),
                DataTypeName::Timestamp => (
                    ScalarImpl::Timestamp(Timestamp::from_timestamp_uncheck(23333333, 2333)),
                    DataType::Timestamp,
                ),
                DataTypeName::Timestamptz => (
                    ScalarImpl::Timestamptz(Timestamptz::from_micros(233333333)),
                    DataType::Timestamptz,
                ),
                DataTypeName::Interval => (
                    ScalarImpl::Interval(Interval::from_month_day_usec(2, 3, 3333)),
                    DataType::Interval,
                ),
                DataTypeName::Jsonb => (ScalarImpl::Jsonb(JsonbVal::null()), DataType::Jsonb),
                DataTypeName::Struct => (
                    ScalarImpl::Struct(StructValue::new(vec![
                        ScalarImpl::Int64(233).into(),
                        ScalarImpl::Float64(23.33.into()).into(),
                    ])),
                    DataType::Struct(StructType::new(vec![
                        ("a", DataType::Int64),
                        ("b", DataType::Float64),
                    ])),
                ),
                DataTypeName::List => (
                    ScalarImpl::List(ListValue::from_iter([233i64, 2333])),
                    DataType::Int64.list(),
                ),
                DataTypeName::Vector => (
                    ScalarImpl::Vector(VectorVal::from_iter(
                        (0..VectorVal::TEST_VECTOR_DIMENSION)
                            .map(|i| ((i + 1) as f32).try_into().unwrap()),
                    )),
                    DataType::Vector(VectorVal::TEST_VECTOR_DIMENSION),
                ),
                DataTypeName::Map => {
                    // map is not hashable
                    continue;
                }
            };

            // Both a concrete value and a null of each type are exercised.
            test(Some(scalar), data_type.clone());
            test(None, data_type);
        }
    }
1522
1523    #[test]
1524    fn test_data_type_from_str() {
1525        assert_eq!(DataType::from_str("bool").unwrap(), DataType::Boolean);
1526        assert_eq!(DataType::from_str("boolean").unwrap(), DataType::Boolean);
1527        assert_eq!(DataType::from_str("BOOL").unwrap(), DataType::Boolean);
1528        assert_eq!(DataType::from_str("BOOLEAN").unwrap(), DataType::Boolean);
1529
1530        assert_eq!(DataType::from_str("int2").unwrap(), DataType::Int16);
1531        assert_eq!(DataType::from_str("smallint").unwrap(), DataType::Int16);
1532        assert_eq!(DataType::from_str("INT2").unwrap(), DataType::Int16);
1533        assert_eq!(DataType::from_str("SMALLINT").unwrap(), DataType::Int16);
1534
1535        assert_eq!(DataType::from_str("int4").unwrap(), DataType::Int32);
1536        assert_eq!(DataType::from_str("integer").unwrap(), DataType::Int32);
1537        assert_eq!(DataType::from_str("int4").unwrap(), DataType::Int32);
1538        assert_eq!(DataType::from_str("INT4").unwrap(), DataType::Int32);
1539        assert_eq!(DataType::from_str("INTEGER").unwrap(), DataType::Int32);
1540        assert_eq!(DataType::from_str("INT").unwrap(), DataType::Int32);
1541
1542        assert_eq!(DataType::from_str("int8").unwrap(), DataType::Int64);
1543        assert_eq!(DataType::from_str("bigint").unwrap(), DataType::Int64);
1544        assert_eq!(DataType::from_str("INT8").unwrap(), DataType::Int64);
1545        assert_eq!(DataType::from_str("BIGINT").unwrap(), DataType::Int64);
1546
1547        assert_eq!(DataType::from_str("rw_int256").unwrap(), DataType::Int256);
1548        assert_eq!(DataType::from_str("RW_INT256").unwrap(), DataType::Int256);
1549
1550        assert_eq!(DataType::from_str("float4").unwrap(), DataType::Float32);
1551        assert_eq!(DataType::from_str("real").unwrap(), DataType::Float32);
1552        assert_eq!(DataType::from_str("FLOAT4").unwrap(), DataType::Float32);
1553        assert_eq!(DataType::from_str("REAL").unwrap(), DataType::Float32);
1554
1555        assert_eq!(DataType::from_str("float8").unwrap(), DataType::Float64);
1556        assert_eq!(
1557            DataType::from_str("double precision").unwrap(),
1558            DataType::Float64
1559        );
1560        assert_eq!(DataType::from_str("FLOAT8").unwrap(), DataType::Float64);
1561        assert_eq!(
1562            DataType::from_str("DOUBLE PRECISION").unwrap(),
1563            DataType::Float64
1564        );
1565
1566        assert_eq!(DataType::from_str("decimal").unwrap(), DataType::Decimal);
1567        assert_eq!(DataType::from_str("DECIMAL").unwrap(), DataType::Decimal);
1568        assert_eq!(DataType::from_str("numeric").unwrap(), DataType::Decimal);
1569        assert_eq!(DataType::from_str("NUMERIC").unwrap(), DataType::Decimal);
1570
1571        assert_eq!(DataType::from_str("date").unwrap(), DataType::Date);
1572        assert_eq!(DataType::from_str("DATE").unwrap(), DataType::Date);
1573
1574        assert_eq!(DataType::from_str("varchar").unwrap(), DataType::Varchar);
1575        assert_eq!(DataType::from_str("VARCHAR").unwrap(), DataType::Varchar);
1576
1577        assert_eq!(DataType::from_str("time").unwrap(), DataType::Time);
1578        assert_eq!(
1579            DataType::from_str("time without time zone").unwrap(),
1580            DataType::Time
1581        );
1582        assert_eq!(DataType::from_str("TIME").unwrap(), DataType::Time);
1583        assert_eq!(
1584            DataType::from_str("TIME WITHOUT TIME ZONE").unwrap(),
1585            DataType::Time
1586        );
1587
1588        assert_eq!(
1589            DataType::from_str("timestamp").unwrap(),
1590            DataType::Timestamp
1591        );
1592        assert_eq!(
1593            DataType::from_str("timestamp without time zone").unwrap(),
1594            DataType::Timestamp
1595        );
1596        assert_eq!(
1597            DataType::from_str("TIMESTAMP").unwrap(),
1598            DataType::Timestamp
1599        );
1600        assert_eq!(
1601            DataType::from_str("TIMESTAMP WITHOUT TIME ZONE").unwrap(),
1602            DataType::Timestamp
1603        );
1604
1605        assert_eq!(
1606            DataType::from_str("timestamptz").unwrap(),
1607            DataType::Timestamptz
1608        );
1609        assert_eq!(
1610            DataType::from_str("timestamp with time zone").unwrap(),
1611            DataType::Timestamptz
1612        );
1613        assert_eq!(
1614            DataType::from_str("TIMESTAMPTZ").unwrap(),
1615            DataType::Timestamptz
1616        );
1617        assert_eq!(
1618            DataType::from_str("TIMESTAMP WITH TIME ZONE").unwrap(),
1619            DataType::Timestamptz
1620        );
1621
1622        assert_eq!(DataType::from_str("interval").unwrap(), DataType::Interval);
1623        assert_eq!(DataType::from_str("INTERVAL").unwrap(), DataType::Interval);
1624
1625        assert_eq!(
1626            DataType::from_str("int2[]").unwrap(),
1627            DataType::Int16.list()
1628        );
1629        assert_eq!(DataType::from_str("int[]").unwrap(), DataType::Int32.list());
1630        assert_eq!(
1631            DataType::from_str("int8[]").unwrap(),
1632            DataType::Int64.list()
1633        );
1634        assert_eq!(
1635            DataType::from_str("float4[]").unwrap(),
1636            DataType::Float32.list()
1637        );
1638        assert_eq!(
1639            DataType::from_str("float8[]").unwrap(),
1640            DataType::Float64.list()
1641        );
1642        assert_eq!(
1643            DataType::from_str("decimal[]").unwrap(),
1644            DataType::Decimal.list()
1645        );
1646        assert_eq!(
1647            DataType::from_str("varchar[]").unwrap(),
1648            DataType::Varchar.list()
1649        );
1650        assert_eq!(DataType::from_str("date[]").unwrap(), DataType::Date.list());
1651        assert_eq!(DataType::from_str("time[]").unwrap(), DataType::Time.list());
1652        assert_eq!(
1653            DataType::from_str("timestamp[]").unwrap(),
1654            DataType::Timestamp.list()
1655        );
1656        assert_eq!(
1657            DataType::from_str("timestamptz[]").unwrap(),
1658            DataType::Timestamptz.list()
1659        );
1660        assert_eq!(
1661            DataType::from_str("interval[]").unwrap(),
1662            DataType::Interval.list()
1663        );
1664
1665        assert_eq!(
1666            DataType::from_str("record").unwrap(),
1667            DataType::Struct(StructType::unnamed(vec![]))
1668        );
1669        assert_eq!(
1670            DataType::from_str("struct<a int4, b varchar>").unwrap(),
1671            DataType::Struct(StructType::new(vec![
1672                ("a", DataType::Int32),
1673                ("b", DataType::Varchar)
1674            ]))
1675        );
1676    }
1677
1678    #[test]
1679    fn test_can_alter() {
1680        let cannots = [
1681            (DataType::Int32, None),
1682            (DataType::Int32.list(), None),
1683            (
1684                MapType::from_kv(DataType::Varchar, DataType::Int32.list()).into(),
1685                None,
1686            ),
1687            (
1688                StructType::new([("a", DataType::Int32)]).into(),
1689                Some(false),
1690            ),
1691            (
1692                MapType::from_kv(
1693                    DataType::Varchar,
1694                    StructType::new([("a", DataType::Int32)]).into(),
1695                )
1696                .into(),
1697                Some(false),
1698            ),
1699        ];
1700        for (cannot, why) in cannots {
1701            assert_eq!(cannot.can_alter(), why, "{cannot:?}");
1702        }
1703
1704        let cans = [
1705            StructType::new([("a", DataType::Int32), ("b", DataType::Int32.list())])
1706                .with_ids([ColumnId::new(1), ColumnId::new(2)])
1707                .into(),
1708            DataType::list(DataType::Struct(
1709                StructType::new([("a", DataType::Int32)]).with_ids([ColumnId::new(1)]),
1710            )),
1711            MapType::from_kv(
1712                DataType::Varchar,
1713                StructType::new([("a", DataType::Int32)])
1714                    .with_ids([ColumnId::new(1)])
1715                    .into(),
1716            )
1717            .into(),
1718        ];
1719        for can in cans {
1720            assert_eq!(can.can_alter(), Some(true), "{can:?}");
1721        }
1722    }
1723}