risingwave_common/types/
to_binary.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::{BufMut, Bytes, BytesMut};
16use postgres_types::{ToSql, Type};
17use rw_iter_util::ZipEqFast;
18
19use super::{
20    DataType, Date, Decimal, F32, F64, Interval, ScalarRefImpl, Serial, Time, Timestamp,
21    Timestamptz,
22};
23use crate::array::{ListRef, StructRef};
24use crate::error::NotImplemented;
25
26/// Error type for [`ToBinary`] trait.
27#[derive(thiserror::Error, Debug)]
28pub enum ToBinaryError {
29    #[error(transparent)]
30    ToSql(Box<dyn std::error::Error + Send + Sync>),
31
32    #[error(transparent)]
33    NotImplemented(#[from] NotImplemented),
34}
35
36pub type Result<T> = std::result::Result<T, ToBinaryError>;
37
38/// Converts `ScalarRef` to pgwire "BINARY" format.
39///
40/// [`postgres_types::ToSql`] has similar functionality, and most of our types implement
41/// that trait and forward `ToBinary` to it directly.
42pub trait ToBinary {
43    fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes>;
44}
45macro_rules! implement_using_to_sql {
46    ($({ $scalar_type:ty, $data_type:ident, $accessor:expr } ),* $(,)?) => {
47        $(
48            impl ToBinary for $scalar_type {
49                fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
50                    match ty {
51                        DataType::$data_type => {
52                            let mut output = BytesMut::new();
53                            #[allow(clippy::redundant_closure_call)]
54                            $accessor(self).to_sql(&Type::ANY, &mut output).map_err(ToBinaryError::ToSql)?;
55                            Ok(output.freeze())
56                        },
57                        _ => unreachable!(),
58                    }
59                }
60            }
61        )*
62    };
63}
64
65implement_using_to_sql! {
66    { i16, Int16, |x| x },
67    { i32, Int32, |x| x },
68    { i64, Int64, |x| x },
69    { &str, Varchar, |x| x },
70    { F32, Float32, |x: &F32| x.0 },
71    { F64, Float64, |x: &F64| x.0 },
72    { bool, Boolean, |x| x },
73    { &[u8], Bytea, |x| x },
74    { Time, Time, |x: &Time| x.0 },
75    { Date, Date, |x: &Date| x.0 },
76    { Timestamp, Timestamp, |x: &Timestamp| x.0 },
77    { Decimal, Decimal, |x| x },
78    { Interval, Interval, |x| x },
79    { Serial, Serial, |x: &Serial| x.0 },
80    { Timestamptz, Timestamptz, |x: &Timestamptz| x.to_datetime_utc() }
81}
82
83impl ToBinary for ListRef<'_> {
84    fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
85        // Reference: Postgres code `src/backend/utils/adt/arrayfuncs.c`
86        // https://github.com/postgres/postgres/blob/c1c09007e219ae68d1f8428a54baf68ccc1f8683/src/backend/utils/adt/arrayfuncs.c#L1548
87        use crate::row::Row;
88        let element_ty = match ty {
89            DataType::List(ty) => ty.as_ref(),
90            _ => unreachable!(),
91        };
92        if matches!(element_ty, DataType::List(_)) {
93            bail_not_implemented!(
94                issue = 7949,
95                "list with 2 or more dimensions is not supported"
96            )
97        }
98        let mut buf = BytesMut::new();
99        buf.put_i32(1); // Number of dimensions (must be 1)
100        buf.put_i32(1); // Has nulls?
101        buf.put_i32(element_ty.to_oid()); // Element type
102        buf.put_i32(self.len() as i32); // Length of 1st dimension
103        buf.put_i32(1); // Offset of 1st dimension, starting from 1
104        for element in self.iter() {
105            match element {
106                None => {
107                    buf.put_i32(-1); // -1 length means a NULL
108                }
109                Some(value) => {
110                    let data = value.to_binary_with_type(element_ty)?;
111                    buf.put_i32(data.len() as i32); // Length of element
112                    buf.put(data);
113                }
114            }
115        }
116        Ok(buf.into())
117    }
118}
119
120impl ToBinary for StructRef<'_> {
121    fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
122        // Reference: Postgres code `src/backend/utils/adt/rowtypes.c`
123        // https://github.com/postgres/postgres/blob/a3699daea2026de324ed7cc7115c36d3499010d3/src/backend/utils/adt/rowtypes.c#L687
124        let mut buf = BytesMut::new();
125        buf.put_i32(ty.as_struct().len() as i32); // number of columns
126        for (datum, field_ty) in self.iter_fields_ref().zip_eq_fast(ty.as_struct().types()) {
127            buf.put_i32(field_ty.to_oid()); // column type
128            match datum {
129                None => {
130                    buf.put_i32(-1); // -1 length means a NULL
131                }
132                Some(value) => {
133                    let data = value.to_binary_with_type(field_ty)?;
134                    buf.put_i32(data.len() as i32); // Length of element
135                    buf.put(data);
136                }
137            }
138        }
139        Ok(buf.into())
140    }
141}
142
143impl ToBinary for ScalarRefImpl<'_> {
144    fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
145        match self {
146            ScalarRefImpl::Int16(v) => v.to_binary_with_type(ty),
147            ScalarRefImpl::Int32(v) => v.to_binary_with_type(ty),
148            ScalarRefImpl::Int64(v) => v.to_binary_with_type(ty),
149            ScalarRefImpl::Int256(v) => v.to_binary_with_type(ty),
150            ScalarRefImpl::Serial(v) => v.to_binary_with_type(ty),
151            ScalarRefImpl::Float32(v) => v.to_binary_with_type(ty),
152            ScalarRefImpl::Float64(v) => v.to_binary_with_type(ty),
153            ScalarRefImpl::Utf8(v) => v.to_binary_with_type(ty),
154            ScalarRefImpl::Bool(v) => v.to_binary_with_type(ty),
155            ScalarRefImpl::Decimal(v) => v.to_binary_with_type(ty),
156            ScalarRefImpl::Interval(v) => v.to_binary_with_type(ty),
157            ScalarRefImpl::Date(v) => v.to_binary_with_type(ty),
158            ScalarRefImpl::Timestamp(v) => v.to_binary_with_type(ty),
159            ScalarRefImpl::Timestamptz(v) => v.to_binary_with_type(ty),
160            ScalarRefImpl::Time(v) => v.to_binary_with_type(ty),
161            ScalarRefImpl::Bytea(v) => v.to_binary_with_type(ty),
162            ScalarRefImpl::Jsonb(v) => v.to_binary_with_type(ty),
163            ScalarRefImpl::List(v) => v.to_binary_with_type(ty),
164            ScalarRefImpl::Struct(v) => v.to_binary_with_type(ty),
165            ScalarRefImpl::Map(_) => {
166                bail_not_implemented!(
167                    issue = 7949,
168                    "the pgwire extended-mode encoding for {ty} is unsupported"
169                )
170            }
171        }
172    }
173}