risingwave_common/types/
to_binary.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use bytes::{BufMut, Bytes, BytesMut};
16use postgres_types::{ToSql, Type};
17use rw_iter_util::ZipEqFast;
18
19use super::{
20    DataType, Date, Decimal, F32, F64, Interval, ScalarRefImpl, Serial, Time, Timestamp,
21    Timestamptz,
22};
23use crate::array::{ListRef, StructRef};
24use crate::error::NotImplemented;
25
26/// Error type for [`ToBinary`] trait.
27#[derive(thiserror::Error, Debug)]
28pub enum ToBinaryError {
29    #[error(transparent)]
30    ToSql(Box<dyn std::error::Error + Send + Sync>),
31
32    #[error(transparent)]
33    NotImplemented(#[from] NotImplemented),
34}
35
36pub type Result<T> = std::result::Result<T, ToBinaryError>;
37
38/// Converts `ScalarRef` to pgwire "BINARY" format.
39///
40/// [`postgres_types::ToSql`] has similar functionality, and most of our types implement
41/// that trait and forward `ToBinary` to it directly.
42pub trait ToBinary {
43    fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes>;
44}
45macro_rules! implement_using_to_sql {
46    ($({ $scalar_type:ty, $data_type:ident, $accessor:expr } ),* $(,)?) => {
47        $(
48            impl ToBinary for $scalar_type {
49                fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
50                    match ty {
51                        DataType::$data_type => {
52                            let mut output = BytesMut::new();
53                            #[allow(clippy::redundant_closure_call)]
54                            $accessor(self).to_sql(&Type::ANY, &mut output).map_err(ToBinaryError::ToSql)?;
55                            Ok(output.freeze())
56                        },
57                        _ => unreachable!(),
58                    }
59                }
60            }
61        )*
62    };
63}
64
65implement_using_to_sql! {
66    { i16, Int16, |x| x },
67    { i32, Int32, |x| x },
68    { i64, Int64, |x| x },
69    { &str, Varchar, |x| x },
70    { F32, Float32, |x: &F32| x.0 },
71    { F64, Float64, |x: &F64| x.0 },
72    { bool, Boolean, |x| x },
73    { &[u8], Bytea, |x| x },
74    { Time, Time, |x: &Time| x.0 },
75    { Date, Date, |x: &Date| x.0 },
76    { Timestamp, Timestamp, |x: &Timestamp| x.0 },
77    { Decimal, Decimal, |x| x },
78    { Interval, Interval, |x| x },
79    { Serial, Serial, |x: &Serial| x.0 },
80    { Timestamptz, Timestamptz, |x: &Timestamptz| x.to_datetime_utc() }
81}
82
83fn list_to_binary_with_type(
84    iter: impl ExactSizeIterator<Item = Option<impl ToBinary>>,
85    ty: &DataType,
86) -> Result<Bytes> {
87    {
88        // Reference: Postgres code `src/backend/utils/adt/arrayfuncs.c`
89        // https://github.com/postgres/postgres/blob/c1c09007e219ae68d1f8428a54baf68ccc1f8683/src/backend/utils/adt/arrayfuncs.c#L1548
90        let element_ty = ty.as_list_elem();
91        if matches!(element_ty, DataType::List(_)) {
92            bail_not_implemented!(
93                issue = 7949,
94                "list with 2 or more dimensions is not supported"
95            )
96        }
97        let mut buf = BytesMut::new();
98        buf.put_i32(1); // Number of dimensions (must be 1)
99        buf.put_i32(1); // Has nulls?
100        buf.put_i32(element_ty.to_oid()); // Element type
101        buf.put_i32(iter.len() as i32); // Length of 1st dimension
102        buf.put_i32(1); // Offset of 1st dimension, starting from 1
103        for element in iter {
104            match element {
105                None => {
106                    buf.put_i32(-1); // -1 length means a NULL
107                }
108                Some(value) => {
109                    let data = value.to_binary_with_type(element_ty)?;
110                    buf.put_i32(data.len() as i32); // Length of element
111                    buf.put(data);
112                }
113            }
114        }
115        Ok(buf.into())
116    }
117}
118
119impl ToBinary for StructRef<'_> {
120    fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
121        // Reference: Postgres code `src/backend/utils/adt/rowtypes.c`
122        // https://github.com/postgres/postgres/blob/a3699daea2026de324ed7cc7115c36d3499010d3/src/backend/utils/adt/rowtypes.c#L687
123        let mut buf = BytesMut::new();
124        buf.put_i32(ty.as_struct().len() as i32); // number of columns
125        for (datum, field_ty) in self.iter_fields_ref().zip_eq_fast(ty.as_struct().types()) {
126            buf.put_i32(field_ty.to_oid()); // column type
127            match datum {
128                None => {
129                    buf.put_i32(-1); // -1 length means a NULL
130                }
131                Some(value) => {
132                    let data = value.to_binary_with_type(field_ty)?;
133                    buf.put_i32(data.len() as i32); // Length of element
134                    buf.put(data);
135                }
136            }
137        }
138        Ok(buf.into())
139    }
140}
141
142impl ToBinary for ListRef<'_> {
143    fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
144        list_to_binary_with_type(self.iter(), ty)
145    }
146}
147
148impl ToBinary for ScalarRefImpl<'_> {
149    fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
150        match self {
151            ScalarRefImpl::Int16(v) => v.to_binary_with_type(ty),
152            ScalarRefImpl::Int32(v) => v.to_binary_with_type(ty),
153            ScalarRefImpl::Int64(v) => v.to_binary_with_type(ty),
154            ScalarRefImpl::Int256(v) => v.to_binary_with_type(ty),
155            ScalarRefImpl::Serial(v) => v.to_binary_with_type(ty),
156            ScalarRefImpl::Float32(v) => v.to_binary_with_type(ty),
157            ScalarRefImpl::Float64(v) => v.to_binary_with_type(ty),
158            ScalarRefImpl::Utf8(v) => v.to_binary_with_type(ty),
159            ScalarRefImpl::Bool(v) => v.to_binary_with_type(ty),
160            ScalarRefImpl::Decimal(v) => v.to_binary_with_type(ty),
161            ScalarRefImpl::Interval(v) => v.to_binary_with_type(ty),
162            ScalarRefImpl::Date(v) => v.to_binary_with_type(ty),
163            ScalarRefImpl::Timestamp(v) => v.to_binary_with_type(ty),
164            ScalarRefImpl::Timestamptz(v) => v.to_binary_with_type(ty),
165            ScalarRefImpl::Time(v) => v.to_binary_with_type(ty),
166            ScalarRefImpl::Bytea(v) => v.to_binary_with_type(ty),
167            ScalarRefImpl::Jsonb(v) => v.to_binary_with_type(ty),
168            ScalarRefImpl::Vector(v) => {
169                assert_eq!(&DataType::Vector(v.dimension()), ty);
170                list_to_binary_with_type(
171                    v.as_slice().iter().cloned().map(Some),
172                    &DataType::Float32.list(),
173                )
174            }
175            ScalarRefImpl::List(v) => v.to_binary_with_type(ty),
176            ScalarRefImpl::Struct(v) => v.to_binary_with_type(ty),
177            ScalarRefImpl::Map(_) => {
178                bail_not_implemented!(
179                    issue = 7949,
180                    "the pgwire extended-mode encoding for {ty} is unsupported"
181                )
182            }
183        }
184    }
185}