risingwave_connector/parser/
scalar_adapter.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::str::FromStr;
16
17use anyhow::anyhow;
18use bytes::BytesMut;
19use pg_bigdecimal::PgNumeric;
20use risingwave_common::types::{DataType, Decimal, Int256, ListValue, ScalarImpl, ScalarRefImpl};
21use thiserror_ext::AsReport;
22use tokio_postgres::types::{FromSql, IsNull, Kind, ToSql, Type, to_sql_checked};
23
24use crate::error::ConnectorResult;
25
/// The label of a Postgres ENUM value.
///
/// The `FromSql`/`ToSql` impls in this file read and write the value as the raw bytes of this
/// label, so it is stored verbatim as a `String`.
#[derive(Clone, Debug)]
pub struct EnumString(pub String);
28
29impl<'a> FromSql<'a> for EnumString {
30    fn from_sql(
31        _ty: &Type,
32        raw: &'a [u8],
33    ) -> Result<Self, Box<dyn std::error::Error + 'static + Sync + Send>> {
34        Ok(EnumString(String::from_utf8_lossy(raw).into_owned()))
35    }
36
37    fn accepts(ty: &Type) -> bool {
38        matches!(ty.kind(), Kind::Enum(_))
39    }
40}
41
42impl ToSql for EnumString {
43    to_sql_checked!();
44
45    fn to_sql(
46        &self,
47        ty: &Type,
48        out: &mut BytesMut,
49    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
50    where
51        Self: Sized,
52    {
53        match ty.kind() {
54            Kind::Enum(e) => {
55                if e.contains(&self.0) {
56                    out.extend_from_slice(self.0.as_bytes());
57                    Ok(IsNull::No)
58                } else {
59                    Err(format!(
60                        "EnumString value {} is not in the enum type {:?}",
61                        self.0, e
62                    )
63                    .into())
64                }
65            }
66            _ => Err("EnumString can only be used with ENUM types".into()),
67        }
68    }
69
70    fn accepts(ty: &Type) -> bool {
71        matches!(ty.kind(), Kind::Enum(_))
72    }
73}
74
75/// Adapter for `ScalarImpl` to Postgres data type,
76/// which can be used to encode/decode to/from Postgres value.
77#[derive(Debug)]
78pub(crate) enum ScalarAdapter {
79    Builtin(ScalarImpl),
80    Uuid(uuid::Uuid),
81    // Currently in order to handle the decimal beyond RustDecimal,
82    // we use the PgNumeric type to convert the decimal to a string/decimal/rw_int256.
83    Numeric(PgNumeric),
84    Enum(EnumString),
85    NumericList(Vec<Option<PgNumeric>>),
86    EnumList(Vec<Option<EnumString>>),
87    // UuidList is covered by List, while NumericList and EnumList are special cases.
88    // Note: The IntervalList is not supported.
89    List(Vec<Option<ScalarAdapter>>),
90}
91
92impl ToSql for ScalarAdapter {
93    to_sql_checked!();
94
95    fn to_sql(
96        &self,
97        ty: &Type,
98        out: &mut bytes::BytesMut,
99    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
100        match self {
101            ScalarAdapter::Builtin(v) => v.to_sql(ty, out),
102            ScalarAdapter::Uuid(v) => v.to_sql(ty, out),
103            ScalarAdapter::Numeric(v) => v.to_sql(ty, out),
104            ScalarAdapter::Enum(v) => v.to_sql(ty, out),
105            ScalarAdapter::NumericList(v) => v.to_sql(ty, out),
106            ScalarAdapter::EnumList(v) => v.to_sql(ty, out),
107            ScalarAdapter::List(v) => v.to_sql(ty, out),
108        }
109    }
110
111    fn accepts(_ty: &Type) -> bool {
112        true
113    }
114}
115
116/// convert from Postgres uuid, numeric and enum to `ScalarAdapter`
117impl<'a> FromSql<'a> for ScalarAdapter {
118    fn from_sql(
119        ty: &Type,
120        raw: &'a [u8],
121    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
122        match ty.kind() {
123            Kind::Simple => match *ty {
124                Type::UUID => Ok(ScalarAdapter::Uuid(uuid::Uuid::from_sql(ty, raw)?)),
125                // In order to cover the decimal beyond RustDecimal(only 28 digits are supported),
126                // we use the PgNumeric to handle decimal from postgres.
127                Type::NUMERIC => Ok(ScalarAdapter::Numeric(PgNumeric::from_sql(ty, raw)?)),
128                _ => Ok(ScalarAdapter::Builtin(ScalarImpl::from_sql(ty, raw)?)),
129            },
130            Kind::Enum(_) => Ok(ScalarAdapter::Enum(EnumString::from_sql(ty, raw)?)),
131            Kind::Array(Type::NUMERIC) => {
132                Ok(ScalarAdapter::NumericList(FromSql::from_sql(ty, raw)?))
133            }
134            Kind::Array(inner_type) if let Kind::Enum(_) = inner_type.kind() => {
135                Ok(ScalarAdapter::EnumList(FromSql::from_sql(ty, raw)?))
136            }
137            Kind::Array(_) => Ok(ScalarAdapter::List(FromSql::from_sql(ty, raw)?)),
138            _ => Err(anyhow!("failed to convert type {:?} to ScalarAdapter", ty).into()),
139        }
140    }
141
142    fn accepts(ty: &Type) -> bool {
143        match ty.kind() {
144            Kind::Simple => {
145                matches!(ty, &Type::UUID | &Type::NUMERIC) || <ScalarImpl as FromSql>::accepts(ty)
146            }
147            Kind::Enum(_) => true,
148            Kind::Array(inner_type) => <ScalarAdapter as FromSql>::accepts(inner_type),
149            _ => false,
150        }
151    }
152}
153
154impl ScalarAdapter {
155    pub fn name(&self) -> &'static str {
156        match self {
157            ScalarAdapter::Builtin(_) => "Builtin",
158            ScalarAdapter::Uuid(_) => "Uuid",
159            ScalarAdapter::Numeric(_) => "Numeric",
160            ScalarAdapter::Enum(_) => "Enum",
161            ScalarAdapter::EnumList(_) => "EnumList",
162            ScalarAdapter::NumericList(_) => "NumericList",
163            ScalarAdapter::List(_) => "List",
164        }
165    }
166
167    /// convert `ScalarRefImpl` to `ScalarAdapter` so that we can correctly encode to postgres value
168    pub(crate) fn from_scalar(
169        scalar: ScalarRefImpl<'_>,
170        ty: &Type,
171    ) -> ConnectorResult<ScalarAdapter> {
172        Ok(match (scalar, ty, ty.kind()) {
173            (ScalarRefImpl::Utf8(s), &Type::UUID, _) => ScalarAdapter::Uuid(s.parse()?),
174            (ScalarRefImpl::Utf8(s), &Type::NUMERIC, _) => {
175                ScalarAdapter::Numeric(string_to_pg_numeric(s))
176            }
177            (ScalarRefImpl::Int256(s), &Type::NUMERIC, _) => {
178                ScalarAdapter::Numeric(string_to_pg_numeric(&s.to_string()))
179            }
180            (ScalarRefImpl::Utf8(s), _, Kind::Enum(_)) => {
181                ScalarAdapter::Enum(EnumString(s.to_owned()))
182            }
183            (ScalarRefImpl::List(list), &Type::NUMERIC_ARRAY, _) => {
184                let mut vec = vec![];
185                for datum in list.iter() {
186                    vec.push(match datum {
187                        Some(ScalarRefImpl::Int256(s)) => Some(string_to_pg_numeric(&s.to_string())),
188                        Some(ScalarRefImpl::Decimal(s)) => Some(rw_numeric_to_pg_numeric(s)),
189                        Some(ScalarRefImpl::Utf8(s)) => Some(string_to_pg_numeric(s)),
190                        None => None,
191                        _ => {
192                            unreachable!("Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]");
193                        }
194                    })
195                }
196                ScalarAdapter::NumericList(vec)
197            }
198            (ScalarRefImpl::List(list), _, Kind::Array(inner_type)) => match inner_type.kind() {
199                Kind::Enum(_) => {
200                    let mut vec = vec![];
201                    for datum in list.iter() {
202                        vec.push(match datum {
203                            Some(ScalarRefImpl::Utf8(s)) => Some(EnumString(s.to_owned())),
204                            _ => unreachable!(
205                                "Only non-null varchar[] is supported to convert to enum[]"
206                            ),
207                        })
208                    }
209                    ScalarAdapter::EnumList(vec)
210                }
211                _ => {
212                    let mut vec = vec![];
213                    for datum in list.iter() {
214                        vec.push(
215                            datum
216                                .map(|s| ScalarAdapter::from_scalar(s, inner_type))
217                                .transpose()?,
218                        );
219                    }
220                    ScalarAdapter::List(vec)
221                }
222            },
223            _ => ScalarAdapter::Builtin(scalar.into_scalar_impl()),
224        })
225    }
226
227    pub fn into_scalar(self, ty: &DataType) -> Option<ScalarImpl> {
228        match (self, &ty) {
229            (ScalarAdapter::Builtin(scalar), _) => Some(scalar),
230            (ScalarAdapter::Uuid(uuid), &DataType::Varchar) => {
231                Some(ScalarImpl::from(uuid.to_string()))
232            }
233            (ScalarAdapter::Numeric(numeric), &DataType::Varchar) => {
234                Some(ScalarImpl::from(pg_numeric_to_string(&numeric)))
235            }
236            (ScalarAdapter::Numeric(numeric), &DataType::Int256) => {
237                pg_numeric_to_rw_int256(&numeric)
238            }
239            (ScalarAdapter::Numeric(numeric), &DataType::Decimal) => {
240                pg_numeric_to_rw_numeric(&numeric)
241            }
242            (ScalarAdapter::Enum(EnumString(s)), &DataType::Varchar) => Some(ScalarImpl::from(s)),
243            (ScalarAdapter::NumericList(vec), &DataType::List(dtype)) => {
244                let mut builder = dtype.create_array_builder(0);
245                for val in vec {
246                    let scalar = match (val, &dtype) {
247                        // A numeric array contains special values like NaN, Inf, -Inf, which are not supported in Debezium,
248                        // when we encounter these special values, we fallback the array to NULL, returning None directly.
249                        (Some(numeric), box DataType::Varchar) => {
250                            if pg_numeric_is_special(&numeric) {
251                                return None;
252                            } else {
253                                ScalarAdapter::Numeric(numeric).into_scalar(dtype)
254                            }
255                        }
256                        (Some(numeric), box DataType::Int256 | box DataType::Decimal) => {
257                            if pg_numeric_is_special(&numeric) {
258                                return None;
259                            } else {
260                                // A PgNumeric can sometimes exceeds the range of Int256 and RwNumeric.
261                                // In our json parsing, we fallback the array to NULL in this case.
262                                // Here we keep the behavior consistent and return None directly.
263                                match ScalarAdapter::Numeric(numeric).into_scalar(dtype) {
264                                    Some(scalar) => Some(scalar),
265                                    None => {
266                                        return None;
267                                    }
268                                }
269                            }
270                        }
271                        (Some(_), _) => unreachable!(
272                            "Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]"
273                        ),
274                        // This item is NULL, continue to handle next item.
275                        (None, _) => None,
276                    };
277                    builder.append(scalar);
278                }
279                Some(ScalarImpl::from(ListValue::new(builder.finish())))
280            }
281            (ScalarAdapter::EnumList(vec), &DataType::List(dtype)) => {
282                let mut builder = dtype.create_array_builder(0);
283                for val in vec {
284                    match val {
285                        Some(EnumString(s)) => {
286                            builder.append(Some(ScalarImpl::from(s)));
287                        }
288                        None => {
289                            return None;
290                        }
291                    }
292                }
293                Some(ScalarImpl::from(ListValue::new(builder.finish())))
294            }
295            (ScalarAdapter::List(vec), &DataType::List(dtype)) => {
296                // Due to https://github.com/risingwavelabs/risingwave/issues/16882, INTERVAL_ARRAY is not supported in Debezium, so we keep backfilling and CDC consistent.
297                if matches!(**dtype, DataType::Interval) {
298                    return None;
299                }
300                let mut builder = dtype.create_array_builder(0);
301                for val in vec {
302                    builder.append(val.and_then(|v| v.into_scalar(dtype)));
303                }
304                Some(ScalarImpl::from(ListValue::new(builder.finish())))
305            }
306            (scaler, ty) => {
307                tracing::error!(
308                    adapter = scaler.name(),
309                    rw_type = ty.pg_name(),
310                    "failed to convert from ScalarAdapter: invalid conversion"
311                );
312                None
313            }
314        }
315    }
316}
317
318pub fn validate_pg_type_to_rw_type(pg_type: &DataType, rw_type: &DataType) -> bool {
319    if pg_type == rw_type {
320        return true;
321    }
322    match rw_type {
323        DataType::Varchar => matches!(pg_type, DataType::Decimal | DataType::Int256),
324        DataType::List(box DataType::Varchar) => {
325            matches!(
326                pg_type,
327                DataType::List(box (DataType::Decimal | DataType::Int256))
328            )
329        }
330        _ => false,
331    }
332}
333
334fn pg_numeric_is_special(val: &PgNumeric) -> bool {
335    matches!(
336        val,
337        PgNumeric::NegativeInf | PgNumeric::PositiveInf | PgNumeric::NaN
338    )
339}
340
341fn pg_numeric_to_rw_int256(val: &PgNumeric) -> Option<ScalarImpl> {
342    match Int256::from_str(pg_numeric_to_string(val).as_str()) {
343        Ok(num) => Some(ScalarImpl::from(num)),
344        Err(err) => {
345            tracing::error!(error = %err.as_report(), "failed to convert PgNumeric to Int256");
346            None
347        }
348    }
349}
350
351fn pg_numeric_to_rw_numeric(val: &PgNumeric) -> Option<ScalarImpl> {
352    match val {
353        PgNumeric::NegativeInf => Some(ScalarImpl::from(Decimal::NegativeInf)),
354        PgNumeric::Normalized(big_decimal) => {
355            match Decimal::from_str(big_decimal.to_string().as_str()) {
356                Ok(num) => Some(ScalarImpl::from(num)),
357                Err(err) => {
358                    tracing::error!(error = %err.as_report(), "parse pg-numeric as rw-numeric failed (likely out-of-range");
359                    None
360                }
361            }
362        }
363        PgNumeric::PositiveInf => Some(ScalarImpl::from(Decimal::PositiveInf)),
364        PgNumeric::NaN => Some(ScalarImpl::from(Decimal::NaN)),
365    }
366}
367
368fn pg_numeric_to_string(val: &PgNumeric) -> String {
369    // TODO(kexiang): NEGATIVE_INFINITY -> -Infinity, POSITIVE_INFINITY -> Infinity, NAN -> NaN
370    // The current implementation is to ensure consistency with the behavior of cdc event parsor.
371    match val {
372        PgNumeric::NegativeInf => String::from("NEGATIVE_INFINITY"),
373        PgNumeric::Normalized(big_decimal) => big_decimal.to_string(),
374        PgNumeric::PositiveInf => String::from("POSITIVE_INFINITY"),
375        PgNumeric::NaN => String::from("NAN"),
376    }
377}
378
379fn string_to_pg_numeric(s: &str) -> PgNumeric {
380    match s {
381        "NEGATIVE_INFINITY" => PgNumeric::NegativeInf,
382        "POSITIVE_INFINITY" => PgNumeric::PositiveInf,
383        "NAN" => PgNumeric::NaN,
384        _ => PgNumeric::Normalized(s.parse().unwrap()),
385    }
386}
387
388fn rw_numeric_to_pg_numeric(val: Decimal) -> PgNumeric {
389    match val {
390        Decimal::NegativeInf => PgNumeric::NegativeInf,
391        Decimal::Normalized(inner) => PgNumeric::Normalized(inner.to_string().parse().unwrap()),
392        Decimal::PositiveInf => PgNumeric::PositiveInf,
393        Decimal::NaN => PgNumeric::NaN,
394    }
395}