risingwave_connector/parser/
scalar_adapter.rs1use std::str::FromStr;
16
17use anyhow::anyhow;
18use bytes::BytesMut;
19use pg_bigdecimal::PgNumeric;
20use risingwave_common::types::{DataType, Decimal, Int256, ListValue, ScalarImpl, ScalarRefImpl};
21use thiserror_ext::AsReport;
22use tokio_postgres::types::{FromSql, IsNull, Kind, ToSql, Type, to_sql_checked};
23
24use crate::error::ConnectorResult;
25
/// Textual label of a Postgres `ENUM` value.
///
/// The wrapped string is the label itself; the trait implementations in this
/// file restrict its use to Postgres types whose kind is `Kind::Enum`.
#[derive(Debug, Clone)]
pub struct EnumString(pub String);
28
29impl<'a> FromSql<'a> for EnumString {
30 fn from_sql(
31 _ty: &Type,
32 raw: &'a [u8],
33 ) -> Result<Self, Box<dyn std::error::Error + 'static + Sync + Send>> {
34 Ok(EnumString(String::from_utf8_lossy(raw).into_owned()))
35 }
36
37 fn accepts(ty: &Type) -> bool {
38 matches!(ty.kind(), Kind::Enum(_))
39 }
40}
41
42impl ToSql for EnumString {
43 to_sql_checked!();
44
45 fn to_sql(
46 &self,
47 ty: &Type,
48 out: &mut BytesMut,
49 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
50 where
51 Self: Sized,
52 {
53 match ty.kind() {
54 Kind::Enum(e) => {
55 if e.contains(&self.0) {
56 out.extend_from_slice(self.0.as_bytes());
57 Ok(IsNull::No)
58 } else {
59 Err(format!(
60 "EnumString value {} is not in the enum type {:?}",
61 self.0, e
62 )
63 .into())
64 }
65 }
66 _ => Err("EnumString can only be used with ENUM types".into()),
67 }
68 }
69
70 fn accepts(ty: &Type) -> bool {
71 matches!(ty.kind(), Kind::Enum(_))
72 }
73}
74
75#[derive(Debug)]
78pub(crate) enum ScalarAdapter {
79 Builtin(ScalarImpl),
80 Uuid(uuid::Uuid),
81 Numeric(PgNumeric),
84 Enum(EnumString),
85 NumericList(Vec<Option<PgNumeric>>),
86 EnumList(Vec<Option<EnumString>>),
87 List(Vec<Option<ScalarAdapter>>),
90}
91
92impl ToSql for ScalarAdapter {
93 to_sql_checked!();
94
95 fn to_sql(
96 &self,
97 ty: &Type,
98 out: &mut bytes::BytesMut,
99 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
100 match self {
101 ScalarAdapter::Builtin(v) => v.to_sql(ty, out),
102 ScalarAdapter::Uuid(v) => v.to_sql(ty, out),
103 ScalarAdapter::Numeric(v) => v.to_sql(ty, out),
104 ScalarAdapter::Enum(v) => v.to_sql(ty, out),
105 ScalarAdapter::NumericList(v) => v.to_sql(ty, out),
106 ScalarAdapter::EnumList(v) => v.to_sql(ty, out),
107 ScalarAdapter::List(v) => v.to_sql(ty, out),
108 }
109 }
110
111 fn accepts(_ty: &Type) -> bool {
112 true
113 }
114}
115
116impl<'a> FromSql<'a> for ScalarAdapter {
118 fn from_sql(
119 ty: &Type,
120 raw: &'a [u8],
121 ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
122 match ty.kind() {
123 Kind::Simple => match *ty {
124 Type::UUID => Ok(ScalarAdapter::Uuid(uuid::Uuid::from_sql(ty, raw)?)),
125 Type::NUMERIC => Ok(ScalarAdapter::Numeric(PgNumeric::from_sql(ty, raw)?)),
128 _ => Ok(ScalarAdapter::Builtin(ScalarImpl::from_sql(ty, raw)?)),
129 },
130 Kind::Enum(_) => Ok(ScalarAdapter::Enum(EnumString::from_sql(ty, raw)?)),
131 Kind::Array(Type::NUMERIC) => {
132 Ok(ScalarAdapter::NumericList(FromSql::from_sql(ty, raw)?))
133 }
134 Kind::Array(inner_type) if let Kind::Enum(_) = inner_type.kind() => {
135 Ok(ScalarAdapter::EnumList(FromSql::from_sql(ty, raw)?))
136 }
137 Kind::Array(_) => Ok(ScalarAdapter::List(FromSql::from_sql(ty, raw)?)),
138 _ => Err(anyhow!("failed to convert type {:?} to ScalarAdapter", ty).into()),
139 }
140 }
141
142 fn accepts(ty: &Type) -> bool {
143 match ty.kind() {
144 Kind::Simple => {
145 matches!(ty, &Type::UUID | &Type::NUMERIC) || <ScalarImpl as FromSql>::accepts(ty)
146 }
147 Kind::Enum(_) => true,
148 Kind::Array(inner_type) => <ScalarAdapter as FromSql>::accepts(inner_type),
149 _ => false,
150 }
151 }
152}
153
154impl ScalarAdapter {
155 pub fn name(&self) -> &'static str {
156 match self {
157 ScalarAdapter::Builtin(_) => "Builtin",
158 ScalarAdapter::Uuid(_) => "Uuid",
159 ScalarAdapter::Numeric(_) => "Numeric",
160 ScalarAdapter::Enum(_) => "Enum",
161 ScalarAdapter::EnumList(_) => "EnumList",
162 ScalarAdapter::NumericList(_) => "NumericList",
163 ScalarAdapter::List(_) => "List",
164 }
165 }
166
167 pub(crate) fn from_scalar(
169 scalar: ScalarRefImpl<'_>,
170 ty: &Type,
171 ) -> ConnectorResult<ScalarAdapter> {
172 Ok(match (scalar, ty, ty.kind()) {
173 (ScalarRefImpl::Utf8(s), &Type::UUID, _) => ScalarAdapter::Uuid(s.parse()?),
174 (ScalarRefImpl::Utf8(s), &Type::NUMERIC, _) => {
175 ScalarAdapter::Numeric(string_to_pg_numeric(s))
176 }
177 (ScalarRefImpl::Int256(s), &Type::NUMERIC, _) => {
178 ScalarAdapter::Numeric(string_to_pg_numeric(&s.to_string()))
179 }
180 (ScalarRefImpl::Utf8(s), _, Kind::Enum(_)) => {
181 ScalarAdapter::Enum(EnumString(s.to_owned()))
182 }
183 (ScalarRefImpl::List(list), &Type::NUMERIC_ARRAY, _) => {
184 let mut vec = vec![];
185 for datum in list.iter() {
186 vec.push(match datum {
187 Some(ScalarRefImpl::Int256(s)) => Some(string_to_pg_numeric(&s.to_string())),
188 Some(ScalarRefImpl::Decimal(s)) => Some(rw_numeric_to_pg_numeric(s)),
189 Some(ScalarRefImpl::Utf8(s)) => Some(string_to_pg_numeric(s)),
190 None => None,
191 _ => {
192 unreachable!("Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]");
193 }
194 })
195 }
196 ScalarAdapter::NumericList(vec)
197 }
198 (ScalarRefImpl::List(list), _, Kind::Array(inner_type)) => match inner_type.kind() {
199 Kind::Enum(_) => {
200 let mut vec = vec![];
201 for datum in list.iter() {
202 vec.push(match datum {
203 Some(ScalarRefImpl::Utf8(s)) => Some(EnumString(s.to_owned())),
204 _ => unreachable!(
205 "Only non-null varchar[] is supported to convert to enum[]"
206 ),
207 })
208 }
209 ScalarAdapter::EnumList(vec)
210 }
211 _ => {
212 let mut vec = vec![];
213 for datum in list.iter() {
214 vec.push(
215 datum
216 .map(|s| ScalarAdapter::from_scalar(s, inner_type))
217 .transpose()?,
218 );
219 }
220 ScalarAdapter::List(vec)
221 }
222 },
223 _ => ScalarAdapter::Builtin(scalar.into_scalar_impl()),
224 })
225 }
226
227 pub fn into_scalar(self, ty: &DataType) -> Option<ScalarImpl> {
228 match (self, &ty) {
229 (ScalarAdapter::Builtin(scalar), _) => Some(scalar),
230 (ScalarAdapter::Uuid(uuid), &DataType::Varchar) => {
231 Some(ScalarImpl::from(uuid.to_string()))
232 }
233 (ScalarAdapter::Numeric(numeric), &DataType::Varchar) => {
234 Some(ScalarImpl::from(pg_numeric_to_string(&numeric)))
235 }
236 (ScalarAdapter::Numeric(numeric), &DataType::Int256) => {
237 pg_numeric_to_rw_int256(&numeric)
238 }
239 (ScalarAdapter::Numeric(numeric), &DataType::Decimal) => {
240 pg_numeric_to_rw_numeric(&numeric)
241 }
242 (ScalarAdapter::Enum(EnumString(s)), &DataType::Varchar) => Some(ScalarImpl::from(s)),
243 (ScalarAdapter::NumericList(vec), &DataType::List(list)) => {
244 let elem = list.elem();
245 let mut builder = elem.create_array_builder(0);
246 for val in vec {
247 let scalar = match (val, &elem) {
248 (Some(numeric), DataType::Varchar) => {
251 if pg_numeric_is_special(&numeric) {
252 return None;
253 } else {
254 ScalarAdapter::Numeric(numeric).into_scalar(elem)
255 }
256 }
257 (Some(numeric), DataType::Int256 | DataType::Decimal) => {
258 if pg_numeric_is_special(&numeric) {
259 return None;
260 } else {
261 match ScalarAdapter::Numeric(numeric).into_scalar(elem) {
265 Some(scalar) => Some(scalar),
266 None => {
267 return None;
268 }
269 }
270 }
271 }
272 (Some(_), _) => unreachable!(
273 "Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]"
274 ),
275 (None, _) => None,
277 };
278 builder.append(scalar);
279 }
280 Some(ScalarImpl::from(ListValue::new(builder.finish())))
281 }
282 (ScalarAdapter::EnumList(vec), &DataType::List(list)) => {
283 let mut builder = list.elem().create_array_builder(0);
284 for val in vec {
285 match val {
286 Some(EnumString(s)) => {
287 builder.append(Some(ScalarImpl::from(s)));
288 }
289 None => {
290 return None;
291 }
292 }
293 }
294 Some(ScalarImpl::from(ListValue::new(builder.finish())))
295 }
296 (ScalarAdapter::List(vec), &DataType::List(list)) => {
297 let elem = list.elem();
298 if matches!(elem, DataType::Interval) {
300 return None;
301 }
302 let mut builder = elem.create_array_builder(0);
303 for val in vec {
304 builder.append(val.and_then(|v| v.into_scalar(elem)));
305 }
306 Some(ScalarImpl::from(ListValue::new(builder.finish())))
307 }
308 (scaler, ty) => {
309 tracing::error!(
310 adapter = scaler.name(),
311 rw_type = ty.pg_name(),
312 "failed to convert from ScalarAdapter: invalid conversion"
313 );
314 None
315 }
316 }
317 }
318}
319
320pub fn validate_pg_type_to_rw_type(pg_type: &DataType, rw_type: &DataType) -> bool {
321 if pg_type == rw_type {
322 return true;
323 }
324 match rw_type {
325 DataType::Varchar => matches!(pg_type, DataType::Decimal | DataType::Int256),
326 DataType::List(list) if list.elem() == &DataType::Varchar => {
327 matches!(
328 pg_type,
329 DataType::List(list) if matches!(list.elem(), DataType::Decimal | DataType::Int256)
330 )
331 }
332 _ => false,
333 }
334}
335
336fn pg_numeric_is_special(val: &PgNumeric) -> bool {
337 matches!(
338 val,
339 PgNumeric::NegativeInf | PgNumeric::PositiveInf | PgNumeric::NaN
340 )
341}
342
343fn pg_numeric_to_rw_int256(val: &PgNumeric) -> Option<ScalarImpl> {
344 match Int256::from_str(pg_numeric_to_string(val).as_str()) {
345 Ok(num) => Some(ScalarImpl::from(num)),
346 Err(err) => {
347 tracing::error!(error = %err.as_report(), "failed to convert PgNumeric to Int256");
348 None
349 }
350 }
351}
352
353fn pg_numeric_to_rw_numeric(val: &PgNumeric) -> Option<ScalarImpl> {
354 match val {
355 PgNumeric::NegativeInf => Some(ScalarImpl::from(Decimal::NegativeInf)),
356 PgNumeric::Normalized(big_decimal) => {
357 match Decimal::from_str(big_decimal.to_string().as_str()) {
358 Ok(num) => Some(ScalarImpl::from(num)),
359 Err(err) => {
360 tracing::error!(error = %err.as_report(), "parse pg-numeric as rw-numeric failed (likely out-of-range");
361 None
362 }
363 }
364 }
365 PgNumeric::PositiveInf => Some(ScalarImpl::from(Decimal::PositiveInf)),
366 PgNumeric::NaN => Some(ScalarImpl::from(Decimal::NaN)),
367 }
368}
369
370fn pg_numeric_to_string(val: &PgNumeric) -> String {
371 match val {
374 PgNumeric::NegativeInf => String::from("NEGATIVE_INFINITY"),
375 PgNumeric::Normalized(big_decimal) => big_decimal.to_string(),
376 PgNumeric::PositiveInf => String::from("POSITIVE_INFINITY"),
377 PgNumeric::NaN => String::from("NAN"),
378 }
379}
380
381fn string_to_pg_numeric(s: &str) -> PgNumeric {
382 match s {
383 "NEGATIVE_INFINITY" => PgNumeric::NegativeInf,
384 "POSITIVE_INFINITY" => PgNumeric::PositiveInf,
385 "NAN" => PgNumeric::NaN,
386 _ => PgNumeric::Normalized(s.parse().unwrap()),
387 }
388}
389
390fn rw_numeric_to_pg_numeric(val: Decimal) -> PgNumeric {
391 match val {
392 Decimal::NegativeInf => PgNumeric::NegativeInf,
393 Decimal::Normalized(inner) => PgNumeric::Normalized(inner.to_string().parse().unwrap()),
394 Decimal::PositiveInf => PgNumeric::PositiveInf,
395 Decimal::NaN => PgNumeric::NaN,
396 }
397}