risingwave_connector/parser/
scalar_adapter.rs1use std::str::FromStr;
16
17use anyhow::anyhow;
18use bytes::BytesMut;
19use pg_bigdecimal::PgNumeric;
20use risingwave_common::types::{DataType, Decimal, Int256, ListValue, ScalarImpl, ScalarRefImpl};
21use thiserror_ext::AsReport;
22use tokio_postgres::types::{FromSql, IsNull, Kind, ToSql, Type, to_sql_checked};
23
24use crate::error::ConnectorResult;
25
/// Owned text label of a Postgres `ENUM` value.
///
/// The `FromSql` / `ToSql` implementations for this type only accept
/// Postgres types whose kind is `Kind::Enum`.
#[derive(Debug, Clone)]
pub struct EnumString(pub String);
28
29impl<'a> FromSql<'a> for EnumString {
30 fn from_sql(
31 _ty: &Type,
32 raw: &'a [u8],
33 ) -> Result<Self, Box<dyn std::error::Error + 'static + Sync + Send>> {
34 Ok(EnumString(String::from_utf8_lossy(raw).into_owned()))
35 }
36
37 fn accepts(ty: &Type) -> bool {
38 matches!(ty.kind(), Kind::Enum(_))
39 }
40}
41
42impl ToSql for EnumString {
43 to_sql_checked!();
44
45 fn to_sql(
46 &self,
47 ty: &Type,
48 out: &mut BytesMut,
49 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
50 where
51 Self: Sized,
52 {
53 match ty.kind() {
54 Kind::Enum(e) => {
55 if e.contains(&self.0) {
56 out.extend_from_slice(self.0.as_bytes());
57 Ok(IsNull::No)
58 } else {
59 Err(format!(
60 "EnumString value {} is not in the enum type {:?}",
61 self.0, e
62 )
63 .into())
64 }
65 }
66 _ => Err("EnumString can only be used with ENUM types".into()),
67 }
68 }
69
70 fn accepts(ty: &Type) -> bool {
71 matches!(ty.kind(), Kind::Enum(_))
72 }
73}
74
75#[derive(Debug)]
78pub(crate) enum ScalarAdapter {
79 Builtin(ScalarImpl),
80 Uuid(uuid::Uuid),
81 Numeric(PgNumeric),
84 Enum(EnumString),
85 NumericList(Vec<Option<PgNumeric>>),
86 EnumList(Vec<Option<EnumString>>),
87 List(Vec<Option<ScalarAdapter>>),
90}
91
92impl ToSql for ScalarAdapter {
93 to_sql_checked!();
94
95 fn to_sql(
96 &self,
97 ty: &Type,
98 out: &mut bytes::BytesMut,
99 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
100 match self {
101 ScalarAdapter::Builtin(v) => v.to_sql(ty, out),
102 ScalarAdapter::Uuid(v) => v.to_sql(ty, out),
103 ScalarAdapter::Numeric(v) => v.to_sql(ty, out),
104 ScalarAdapter::Enum(v) => v.to_sql(ty, out),
105 ScalarAdapter::NumericList(v) => v.to_sql(ty, out),
106 ScalarAdapter::EnumList(v) => v.to_sql(ty, out),
107 ScalarAdapter::List(v) => v.to_sql(ty, out),
108 }
109 }
110
111 fn accepts(_ty: &Type) -> bool {
112 true
113 }
114}
115
116impl<'a> FromSql<'a> for ScalarAdapter {
118 fn from_sql(
119 ty: &Type,
120 raw: &'a [u8],
121 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
122 match ty.kind() {
123 Kind::Simple => match *ty {
124 Type::UUID => Ok(ScalarAdapter::Uuid(uuid::Uuid::from_sql(ty, raw)?)),
125 Type::NUMERIC => Ok(ScalarAdapter::Numeric(PgNumeric::from_sql(ty, raw)?)),
128 _ => Ok(ScalarAdapter::Builtin(ScalarImpl::from_sql(ty, raw)?)),
129 },
130 Kind::Enum(_) => Ok(ScalarAdapter::Enum(EnumString::from_sql(ty, raw)?)),
131 Kind::Array(Type::NUMERIC) => {
132 Ok(ScalarAdapter::NumericList(FromSql::from_sql(ty, raw)?))
133 }
134 Kind::Array(inner_type) if let Kind::Enum(_) = inner_type.kind() => {
135 Ok(ScalarAdapter::EnumList(FromSql::from_sql(ty, raw)?))
136 }
137 Kind::Array(_) => Ok(ScalarAdapter::List(FromSql::from_sql(ty, raw)?)),
138 _ => Err(anyhow!("failed to convert type {:?} to ScalarAdapter", ty).into()),
139 }
140 }
141
142 fn accepts(ty: &Type) -> bool {
143 match ty.kind() {
144 Kind::Simple => {
145 matches!(ty, &Type::UUID | &Type::NUMERIC) || <ScalarImpl as FromSql>::accepts(ty)
146 }
147 Kind::Enum(_) => true,
148 Kind::Array(inner_type) => <ScalarAdapter as FromSql>::accepts(inner_type),
149 _ => false,
150 }
151 }
152}
153
154impl ScalarAdapter {
155 pub fn name(&self) -> &'static str {
156 match self {
157 ScalarAdapter::Builtin(_) => "Builtin",
158 ScalarAdapter::Uuid(_) => "Uuid",
159 ScalarAdapter::Numeric(_) => "Numeric",
160 ScalarAdapter::Enum(_) => "Enum",
161 ScalarAdapter::EnumList(_) => "EnumList",
162 ScalarAdapter::NumericList(_) => "NumericList",
163 ScalarAdapter::List(_) => "List",
164 }
165 }
166
167 pub(crate) fn from_scalar(
169 scalar: ScalarRefImpl<'_>,
170 ty: &Type,
171 ) -> ConnectorResult<ScalarAdapter> {
172 Ok(match (scalar, ty, ty.kind()) {
173 (ScalarRefImpl::Utf8(s), &Type::UUID, _) => ScalarAdapter::Uuid(s.parse()?),
174 (ScalarRefImpl::Utf8(s), &Type::NUMERIC, _) => {
175 ScalarAdapter::Numeric(string_to_pg_numeric(s))
176 }
177 (ScalarRefImpl::Int256(s), &Type::NUMERIC, _) => {
178 ScalarAdapter::Numeric(string_to_pg_numeric(&s.to_string()))
179 }
180 (ScalarRefImpl::Utf8(s), _, Kind::Enum(_)) => {
181 ScalarAdapter::Enum(EnumString(s.to_owned()))
182 }
183 (ScalarRefImpl::List(list), &Type::NUMERIC_ARRAY, _) => {
184 let mut vec = vec![];
185 for datum in list.iter() {
186 vec.push(match datum {
187 Some(ScalarRefImpl::Int256(s)) => Some(string_to_pg_numeric(&s.to_string())),
188 Some(ScalarRefImpl::Decimal(s)) => Some(rw_numeric_to_pg_numeric(s)),
189 Some(ScalarRefImpl::Utf8(s)) => Some(string_to_pg_numeric(s)),
190 None => None,
191 _ => {
192 unreachable!("Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]");
193 }
194 })
195 }
196 ScalarAdapter::NumericList(vec)
197 }
198 (ScalarRefImpl::List(list), _, Kind::Array(inner_type)) => match inner_type.kind() {
199 Kind::Enum(_) => {
200 let mut vec = vec![];
201 for datum in list.iter() {
202 vec.push(match datum {
203 Some(ScalarRefImpl::Utf8(s)) => Some(EnumString(s.to_owned())),
204 _ => unreachable!(
205 "Only non-null varchar[] is supported to convert to enum[]"
206 ),
207 })
208 }
209 ScalarAdapter::EnumList(vec)
210 }
211 _ => {
212 let mut vec = vec![];
213 for datum in list.iter() {
214 vec.push(
215 datum
216 .map(|s| ScalarAdapter::from_scalar(s, inner_type))
217 .transpose()?,
218 );
219 }
220 ScalarAdapter::List(vec)
221 }
222 },
223 _ => ScalarAdapter::Builtin(scalar.into_scalar_impl()),
224 })
225 }
226
227 pub fn into_scalar(self, ty: &DataType) -> Option<ScalarImpl> {
228 match (self, &ty) {
229 (ScalarAdapter::Builtin(scalar), _) => Some(scalar),
230 (ScalarAdapter::Uuid(uuid), &DataType::Varchar) => {
231 Some(ScalarImpl::from(uuid.to_string()))
232 }
233 (ScalarAdapter::Numeric(numeric), &DataType::Varchar) => {
234 Some(ScalarImpl::from(pg_numeric_to_string(&numeric)))
235 }
236 (ScalarAdapter::Numeric(numeric), &DataType::Int256) => {
237 pg_numeric_to_rw_int256(&numeric)
238 }
239 (ScalarAdapter::Numeric(numeric), &DataType::Decimal) => {
240 pg_numeric_to_rw_numeric(&numeric)
241 }
242 (ScalarAdapter::Enum(EnumString(s)), &DataType::Varchar) => Some(ScalarImpl::from(s)),
243 (ScalarAdapter::NumericList(vec), &DataType::List(dtype)) => {
244 let mut builder = dtype.create_array_builder(0);
245 for val in vec {
246 let scalar = match (val, &dtype) {
247 (Some(numeric), box DataType::Varchar) => {
250 if pg_numeric_is_special(&numeric) {
251 return None;
252 } else {
253 ScalarAdapter::Numeric(numeric).into_scalar(dtype)
254 }
255 }
256 (Some(numeric), box DataType::Int256 | box DataType::Decimal) => {
257 if pg_numeric_is_special(&numeric) {
258 return None;
259 } else {
260 match ScalarAdapter::Numeric(numeric).into_scalar(dtype) {
264 Some(scalar) => Some(scalar),
265 None => {
266 return None;
267 }
268 }
269 }
270 }
271 (Some(_), _) => unreachable!(
272 "Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]"
273 ),
274 (None, _) => None,
276 };
277 builder.append(scalar);
278 }
279 Some(ScalarImpl::from(ListValue::new(builder.finish())))
280 }
281 (ScalarAdapter::EnumList(vec), &DataType::List(dtype)) => {
282 let mut builder = dtype.create_array_builder(0);
283 for val in vec {
284 match val {
285 Some(EnumString(s)) => {
286 builder.append(Some(ScalarImpl::from(s)));
287 }
288 None => {
289 return None;
290 }
291 }
292 }
293 Some(ScalarImpl::from(ListValue::new(builder.finish())))
294 }
295 (ScalarAdapter::List(vec), &DataType::List(dtype)) => {
296 if matches!(**dtype, DataType::Interval) {
298 return None;
299 }
300 let mut builder = dtype.create_array_builder(0);
301 for val in vec {
302 builder.append(val.and_then(|v| v.into_scalar(dtype)));
303 }
304 Some(ScalarImpl::from(ListValue::new(builder.finish())))
305 }
306 (scaler, ty) => {
307 tracing::error!(
308 adapter = scaler.name(),
309 rw_type = ty.pg_name(),
310 "failed to convert from ScalarAdapter: invalid conversion"
311 );
312 None
313 }
314 }
315 }
316}
317
318pub fn validate_pg_type_to_rw_type(pg_type: &DataType, rw_type: &DataType) -> bool {
319 if pg_type == rw_type {
320 return true;
321 }
322 match rw_type {
323 DataType::Varchar => matches!(pg_type, DataType::Decimal | DataType::Int256),
324 DataType::List(box DataType::Varchar) => {
325 matches!(
326 pg_type,
327 DataType::List(box (DataType::Decimal | DataType::Int256))
328 )
329 }
330 _ => false,
331 }
332}
333
334fn pg_numeric_is_special(val: &PgNumeric) -> bool {
335 matches!(
336 val,
337 PgNumeric::NegativeInf | PgNumeric::PositiveInf | PgNumeric::NaN
338 )
339}
340
341fn pg_numeric_to_rw_int256(val: &PgNumeric) -> Option<ScalarImpl> {
342 match Int256::from_str(pg_numeric_to_string(val).as_str()) {
343 Ok(num) => Some(ScalarImpl::from(num)),
344 Err(err) => {
345 tracing::error!(error = %err.as_report(), "failed to convert PgNumeric to Int256");
346 None
347 }
348 }
349}
350
351fn pg_numeric_to_rw_numeric(val: &PgNumeric) -> Option<ScalarImpl> {
352 match val {
353 PgNumeric::NegativeInf => Some(ScalarImpl::from(Decimal::NegativeInf)),
354 PgNumeric::Normalized(big_decimal) => {
355 match Decimal::from_str(big_decimal.to_string().as_str()) {
356 Ok(num) => Some(ScalarImpl::from(num)),
357 Err(err) => {
358 tracing::error!(error = %err.as_report(), "parse pg-numeric as rw-numeric failed (likely out-of-range");
359 None
360 }
361 }
362 }
363 PgNumeric::PositiveInf => Some(ScalarImpl::from(Decimal::PositiveInf)),
364 PgNumeric::NaN => Some(ScalarImpl::from(Decimal::NaN)),
365 }
366}
367
368fn pg_numeric_to_string(val: &PgNumeric) -> String {
369 match val {
372 PgNumeric::NegativeInf => String::from("NEGATIVE_INFINITY"),
373 PgNumeric::Normalized(big_decimal) => big_decimal.to_string(),
374 PgNumeric::PositiveInf => String::from("POSITIVE_INFINITY"),
375 PgNumeric::NaN => String::from("NAN"),
376 }
377}
378
379fn string_to_pg_numeric(s: &str) -> PgNumeric {
380 match s {
381 "NEGATIVE_INFINITY" => PgNumeric::NegativeInf,
382 "POSITIVE_INFINITY" => PgNumeric::PositiveInf,
383 "NAN" => PgNumeric::NaN,
384 _ => PgNumeric::Normalized(s.parse().unwrap()),
385 }
386}
387
388fn rw_numeric_to_pg_numeric(val: Decimal) -> PgNumeric {
389 match val {
390 Decimal::NegativeInf => PgNumeric::NegativeInf,
391 Decimal::Normalized(inner) => PgNumeric::Normalized(inner.to_string().parse().unwrap()),
392 Decimal::PositiveInf => PgNumeric::PositiveInf,
393 Decimal::NaN => PgNumeric::NaN,
394 }
395}