1use bytes::{BufMut, Bytes, BytesMut};
16use postgres_types::{ToSql, Type};
17use rw_iter_util::ZipEqFast;
18
19use super::{
20 DataType, Date, Decimal, F32, F64, Interval, ScalarRefImpl, Serial, Time, Timestamp,
21 Timestamptz,
22};
23use crate::array::{ListRef, StructRef};
24use crate::error::NotImplemented;
25
/// Errors that can be produced while converting a value through [`ToBinary`].
#[derive(thiserror::Error, Debug)]
pub enum ToBinaryError {
    /// A failure reported by a `ToSql::to_sql` call, carried as a boxed error
    /// trait object. `#[error(transparent)]` forwards `Display`/`source` to it.
    #[error(transparent)]
    ToSql(Box<dyn std::error::Error + Send + Sync>),

    /// The requested conversion is not implemented. `#[from]` lets a
    /// [`NotImplemented`] value be converted into this variant with `?`.
    #[error(transparent)]
    NotImplemented(#[from] NotImplemented),
}
35
/// Shorthand for a `std::result::Result` whose error type is [`ToBinaryError`].
pub type Result<T> = std::result::Result<T, ToBinaryError>;
37
/// Conversion of a value into a binary byte buffer, guided by its [`DataType`].
///
/// The implementations in this file either delegate to `postgres_types::ToSql`
/// or assemble length-prefixed list/struct layouts by hand.
pub trait ToBinary {
    /// Encodes `self` as bytes, interpreting it as a value of type `ty`.
    ///
    /// Returns the encoded bytes, or a [`ToBinaryError`] when the underlying
    /// encoder fails or the conversion is not implemented.
    fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes>;
}
/// Generates `ToBinary` impls that delegate to `postgres_types::ToSql`.
///
/// Each entry has the form `{ scalar type, DataType variant, accessor }`:
/// * the scalar type receives the `impl ToBinary`;
/// * the `DataType` variant is the only type the generated method accepts —
///   any other `ty` hits `unreachable!()`;
/// * the accessor is a callable applied to `&self` that yields the value whose
///   `to_sql` is invoked.
macro_rules! implement_using_to_sql {
    ($({ $scalar_type:ty, $data_type:ident, $accessor:expr } ),* $(,)?) => {
        $(
            impl ToBinary for $scalar_type {
                fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
                    // Callers must pass the matching data type; anything else is a bug.
                    if !matches!(ty, DataType::$data_type) {
                        unreachable!();
                    }

                    let mut encoded = BytesMut::new();
                    // The accessor is usually a closure literal invoked in place,
                    // which is what trips `redundant_closure_call`.
                    #[allow(clippy::redundant_closure_call)]
                    let value = $accessor(self);
                    // `Type::ANY` is passed for every scalar; the concrete
                    // Postgres type is never forwarded to `to_sql` here.
                    value
                        .to_sql(&Type::ANY, &mut encoded)
                        .map_err(ToBinaryError::ToSql)?;
                    Ok(encoded.freeze())
                }
            }
        )*
    };
}
64
65implement_using_to_sql! {
66 { i16, Int16, |x| x },
67 { i32, Int32, |x| x },
68 { i64, Int64, |x| x },
69 { &str, Varchar, |x| x },
70 { F32, Float32, |x: &F32| x.0 },
71 { F64, Float64, |x: &F64| x.0 },
72 { bool, Boolean, |x| x },
73 { &[u8], Bytea, |x| x },
74 { Time, Time, |x: &Time| x.0 },
75 { Date, Date, |x: &Date| x.0 },
76 { Timestamp, Timestamp, |x: &Timestamp| x.0 },
77 { Decimal, Decimal, |x| x },
78 { Interval, Interval, |x| x },
79 { Serial, Serial, |x: &Serial| x.0 },
80 { Timestamptz, Timestamptz, |x: &Timestamptz| x.to_datetime_utc() }
81}
82
83fn list_to_binary_with_type(
84 iter: impl ExactSizeIterator<Item = Option<impl ToBinary>>,
85 ty: &DataType,
86) -> Result<Bytes> {
87 {
88 let element_ty = match ty {
91 DataType::List(ty) => ty.as_ref(),
92 _ => unreachable!(),
93 };
94 if matches!(element_ty, DataType::List(_)) {
95 bail_not_implemented!(
96 issue = 7949,
97 "list with 2 or more dimensions is not supported"
98 )
99 }
100 let mut buf = BytesMut::new();
101 buf.put_i32(1); buf.put_i32(1); buf.put_i32(element_ty.to_oid()); buf.put_i32(iter.len() as i32); buf.put_i32(1); for element in iter {
107 match element {
108 None => {
109 buf.put_i32(-1); }
111 Some(value) => {
112 let data = value.to_binary_with_type(element_ty)?;
113 buf.put_i32(data.len() as i32); buf.put(data);
115 }
116 }
117 }
118 Ok(buf.into())
119 }
120}
121
122impl ToBinary for StructRef<'_> {
123 fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
124 let mut buf = BytesMut::new();
127 buf.put_i32(ty.as_struct().len() as i32); for (datum, field_ty) in self.iter_fields_ref().zip_eq_fast(ty.as_struct().types()) {
129 buf.put_i32(field_ty.to_oid()); match datum {
131 None => {
132 buf.put_i32(-1); }
134 Some(value) => {
135 let data = value.to_binary_with_type(field_ty)?;
136 buf.put_i32(data.len() as i32); buf.put(data);
138 }
139 }
140 }
141 Ok(buf.into())
142 }
143}
144
145impl ToBinary for ListRef<'_> {
146 fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
147 list_to_binary_with_type(self.iter(), ty)
148 }
149}
150
151impl ToBinary for ScalarRefImpl<'_> {
152 fn to_binary_with_type(&self, ty: &DataType) -> Result<Bytes> {
153 match self {
154 ScalarRefImpl::Int16(v) => v.to_binary_with_type(ty),
155 ScalarRefImpl::Int32(v) => v.to_binary_with_type(ty),
156 ScalarRefImpl::Int64(v) => v.to_binary_with_type(ty),
157 ScalarRefImpl::Int256(v) => v.to_binary_with_type(ty),
158 ScalarRefImpl::Serial(v) => v.to_binary_with_type(ty),
159 ScalarRefImpl::Float32(v) => v.to_binary_with_type(ty),
160 ScalarRefImpl::Float64(v) => v.to_binary_with_type(ty),
161 ScalarRefImpl::Utf8(v) => v.to_binary_with_type(ty),
162 ScalarRefImpl::Bool(v) => v.to_binary_with_type(ty),
163 ScalarRefImpl::Decimal(v) => v.to_binary_with_type(ty),
164 ScalarRefImpl::Interval(v) => v.to_binary_with_type(ty),
165 ScalarRefImpl::Date(v) => v.to_binary_with_type(ty),
166 ScalarRefImpl::Timestamp(v) => v.to_binary_with_type(ty),
167 ScalarRefImpl::Timestamptz(v) => v.to_binary_with_type(ty),
168 ScalarRefImpl::Time(v) => v.to_binary_with_type(ty),
169 ScalarRefImpl::Bytea(v) => v.to_binary_with_type(ty),
170 ScalarRefImpl::Jsonb(v) => v.to_binary_with_type(ty),
171 ScalarRefImpl::Vector(v) => {
172 assert_eq!(&DataType::Vector(v.dimension()), ty);
173 list_to_binary_with_type(
174 v.as_slice().iter().cloned().map(Some),
175 &DataType::List(DataType::Float32.into()),
176 )
177 }
178 ScalarRefImpl::List(v) => v.to_binary_with_type(ty),
179 ScalarRefImpl::Struct(v) => v.to_binary_with_type(ty),
180 ScalarRefImpl::Map(_) => {
181 bail_not_implemented!(
182 issue = 7949,
183 "the pgwire extended-mode encoding for {ty} is unsupported"
184 )
185 }
186 }
187 }
188}