risingwave_connector/parser/
scalar_adapter.rsuse std::str::FromStr;
use anyhow::anyhow;
use bytes::BytesMut;
use pg_bigdecimal::PgNumeric;
use risingwave_common::types::{DataType, Decimal, Int256, ListValue, ScalarImpl, ScalarRefImpl};
use thiserror_ext::AsReport;
use tokio_postgres::types::{to_sql_checked, FromSql, IsNull, Kind, ToSql, Type};
use crate::error::ConnectorResult;
#[derive(Clone, Debug)]
pub struct EnumString(pub String);
impl<'a> FromSql<'a> for EnumString {
fn from_sql(
_ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + 'static + Sync + Send>> {
Ok(EnumString(String::from_utf8_lossy(raw).into_owned()))
}
fn accepts(ty: &Type) -> bool {
matches!(ty.kind(), Kind::Enum(_))
}
}
impl ToSql for EnumString {
to_sql_checked!();
fn to_sql(
&self,
ty: &Type,
out: &mut BytesMut,
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
where
Self: Sized,
{
match ty.kind() {
Kind::Enum(e) => {
if e.contains(&self.0) {
out.extend_from_slice(self.0.as_bytes());
Ok(IsNull::No)
} else {
Err(format!(
"EnumString value {} is not in the enum type {:?}",
self.0, e
)
.into())
}
}
_ => Err("EnumString can only be used with ENUM types".into()),
}
}
fn accepts(ty: &Type) -> bool {
matches!(ty.kind(), Kind::Enum(_))
}
}
#[derive(Debug)]
pub(crate) enum ScalarAdapter {
Builtin(ScalarImpl),
Uuid(uuid::Uuid),
Numeric(PgNumeric),
Enum(EnumString),
NumericList(Vec<Option<PgNumeric>>),
EnumList(Vec<Option<EnumString>>),
List(Vec<Option<ScalarAdapter>>),
}
impl ToSql for ScalarAdapter {
to_sql_checked!();
fn to_sql(
&self,
ty: &Type,
out: &mut bytes::BytesMut,
) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
match self {
ScalarAdapter::Builtin(v) => v.to_sql(ty, out),
ScalarAdapter::Uuid(v) => v.to_sql(ty, out),
ScalarAdapter::Numeric(v) => v.to_sql(ty, out),
ScalarAdapter::Enum(v) => v.to_sql(ty, out),
ScalarAdapter::NumericList(v) => v.to_sql(ty, out),
ScalarAdapter::EnumList(v) => v.to_sql(ty, out),
ScalarAdapter::List(v) => v.to_sql(ty, out),
}
}
fn accepts(_ty: &Type) -> bool {
true
}
}
impl<'a> FromSql<'a> for ScalarAdapter {
fn from_sql(
ty: &Type,
raw: &'a [u8],
) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
match ty.kind() {
Kind::Simple => match *ty {
Type::UUID => Ok(ScalarAdapter::Uuid(uuid::Uuid::from_sql(ty, raw)?)),
Type::NUMERIC => Ok(ScalarAdapter::Numeric(PgNumeric::from_sql(ty, raw)?)),
_ => Ok(ScalarAdapter::Builtin(ScalarImpl::from_sql(ty, raw)?)),
},
Kind::Enum(_) => Ok(ScalarAdapter::Enum(EnumString::from_sql(ty, raw)?)),
Kind::Array(Type::NUMERIC) => {
Ok(ScalarAdapter::NumericList(FromSql::from_sql(ty, raw)?))
}
Kind::Array(inner_type) if let Kind::Enum(_) = inner_type.kind() => {
Ok(ScalarAdapter::EnumList(FromSql::from_sql(ty, raw)?))
}
Kind::Array(_) => Ok(ScalarAdapter::List(FromSql::from_sql(ty, raw)?)),
_ => Err(anyhow!("failed to convert type {:?} to ScalarAdapter", ty).into()),
}
}
fn accepts(ty: &Type) -> bool {
match ty.kind() {
Kind::Simple => {
matches!(ty, &Type::UUID | &Type::NUMERIC) || <ScalarImpl as FromSql>::accepts(ty)
}
Kind::Enum(_) => true,
Kind::Array(inner_type) => <ScalarAdapter as FromSql>::accepts(inner_type),
_ => false,
}
}
}
impl ScalarAdapter {
pub fn name(&self) -> &'static str {
match self {
ScalarAdapter::Builtin(_) => "Builtin",
ScalarAdapter::Uuid(_) => "Uuid",
ScalarAdapter::Numeric(_) => "Numeric",
ScalarAdapter::Enum(_) => "Enum",
ScalarAdapter::EnumList(_) => "EnumList",
ScalarAdapter::NumericList(_) => "NumericList",
ScalarAdapter::List(_) => "List",
}
}
pub(crate) fn from_scalar(
scalar: ScalarRefImpl<'_>,
ty: &Type,
) -> ConnectorResult<ScalarAdapter> {
Ok(match (scalar, ty, ty.kind()) {
(ScalarRefImpl::Utf8(s), &Type::UUID, _) => ScalarAdapter::Uuid(s.parse()?),
(ScalarRefImpl::Utf8(s), &Type::NUMERIC, _) => {
ScalarAdapter::Numeric(string_to_pg_numeric(s))
}
(ScalarRefImpl::Int256(s), &Type::NUMERIC, _) => {
ScalarAdapter::Numeric(string_to_pg_numeric(&s.to_string()))
}
(ScalarRefImpl::Utf8(s), _, Kind::Enum(_)) => {
ScalarAdapter::Enum(EnumString(s.to_owned()))
}
(ScalarRefImpl::List(list), &Type::NUMERIC_ARRAY, _) => {
let mut vec = vec![];
for datum in list.iter() {
vec.push(match datum {
Some(ScalarRefImpl::Int256(s)) => Some(string_to_pg_numeric(&s.to_string())),
Some(ScalarRefImpl::Decimal(s)) => Some(rw_numeric_to_pg_numeric(s)),
Some(ScalarRefImpl::Utf8(s)) => Some(string_to_pg_numeric(s)),
None => None,
_ => {
unreachable!("Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]");
}
})
}
ScalarAdapter::NumericList(vec)
}
(ScalarRefImpl::List(list), _, Kind::Array(inner_type)) => match inner_type.kind() {
Kind::Enum(_) => {
let mut vec = vec![];
for datum in list.iter() {
vec.push(match datum {
Some(ScalarRefImpl::Utf8(s)) => Some(EnumString(s.to_owned())),
_ => unreachable!(
"Only non-null varchar[] is supported to convert to enum[]"
),
})
}
ScalarAdapter::EnumList(vec)
}
_ => {
let mut vec = vec![];
for datum in list.iter() {
vec.push(
datum
.map(|s| ScalarAdapter::from_scalar(s, inner_type))
.transpose()?,
);
}
ScalarAdapter::List(vec)
}
},
_ => ScalarAdapter::Builtin(scalar.into_scalar_impl()),
})
}
pub fn into_scalar(self, ty: &DataType) -> Option<ScalarImpl> {
match (self, &ty) {
(ScalarAdapter::Builtin(scalar), _) => Some(scalar),
(ScalarAdapter::Uuid(uuid), &DataType::Varchar) => {
Some(ScalarImpl::from(uuid.to_string()))
}
(ScalarAdapter::Numeric(numeric), &DataType::Varchar) => {
Some(ScalarImpl::from(pg_numeric_to_string(&numeric)))
}
(ScalarAdapter::Numeric(numeric), &DataType::Int256) => {
pg_numeric_to_rw_int256(&numeric)
}
(ScalarAdapter::Numeric(numeric), &DataType::Decimal) => {
pg_numeric_to_rw_numeric(&numeric)
}
(ScalarAdapter::Enum(EnumString(s)), &DataType::Varchar) => Some(ScalarImpl::from(s)),
(ScalarAdapter::NumericList(vec), &DataType::List(dtype)) => {
let mut builder = dtype.create_array_builder(0);
for val in vec {
let scalar = match (val, &dtype) {
(Some(numeric), box DataType::Varchar) => {
if pg_numeric_is_special(&numeric) {
return None;
} else {
ScalarAdapter::Numeric(numeric).into_scalar(dtype)
}
}
(Some(numeric), box DataType::Int256 | box DataType::Decimal) => {
if pg_numeric_is_special(&numeric) {
return None;
} else {
match ScalarAdapter::Numeric(numeric).into_scalar(dtype) {
Some(scalar) => Some(scalar),
None => {
return None;
}
}
}
}
(Some(_), _) => unreachable!(
"Only rw-numeric[], rw_int256[] and varchar[] are supported to convert to pg-numeric[]"
),
(None, _) => None,
};
builder.append(scalar);
}
Some(ScalarImpl::from(ListValue::new(builder.finish())))
}
(ScalarAdapter::EnumList(vec), &DataType::List(dtype)) => {
let mut builder = dtype.create_array_builder(0);
for val in vec {
match val {
Some(EnumString(s)) => {
builder.append(Some(ScalarImpl::from(s)));
}
None => {
return None;
}
}
}
Some(ScalarImpl::from(ListValue::new(builder.finish())))
}
(ScalarAdapter::List(vec), &DataType::List(dtype)) => {
if matches!(**dtype, DataType::Interval) {
return None;
}
let mut builder = dtype.create_array_builder(0);
for val in vec {
builder.append(val.and_then(|v| v.into_scalar(dtype)));
}
Some(ScalarImpl::from(ListValue::new(builder.finish())))
}
(scaler, ty) => {
tracing::error!(
adapter = scaler.name(),
rw_type = ty.pg_name(),
"failed to convert from ScalarAdapter: invalid conversion"
);
None
}
}
}
}
fn pg_numeric_is_special(val: &PgNumeric) -> bool {
matches!(
val,
PgNumeric::NegativeInf | PgNumeric::PositiveInf | PgNumeric::NaN
)
}
fn pg_numeric_to_rw_int256(val: &PgNumeric) -> Option<ScalarImpl> {
match Int256::from_str(pg_numeric_to_string(val).as_str()) {
Ok(num) => Some(ScalarImpl::from(num)),
Err(err) => {
tracing::error!(error = %err.as_report(), "failed to convert PgNumeric to Int256");
None
}
}
}
fn pg_numeric_to_rw_numeric(val: &PgNumeric) -> Option<ScalarImpl> {
match val {
PgNumeric::NegativeInf => Some(ScalarImpl::from(Decimal::NegativeInf)),
PgNumeric::Normalized(big_decimal) => {
match Decimal::from_str(big_decimal.to_string().as_str()) {
Ok(num) => Some(ScalarImpl::from(num)),
Err(err) => {
tracing::error!(error = %err.as_report(), "parse pg-numeric as rw-numeric failed (likely out-of-range");
None
}
}
}
PgNumeric::PositiveInf => Some(ScalarImpl::from(Decimal::PositiveInf)),
PgNumeric::NaN => Some(ScalarImpl::from(Decimal::NaN)),
}
}
fn pg_numeric_to_string(val: &PgNumeric) -> String {
match val {
PgNumeric::NegativeInf => String::from("NEGATIVE_INFINITY"),
PgNumeric::Normalized(big_decimal) => big_decimal.to_string(),
PgNumeric::PositiveInf => String::from("POSITIVE_INFINITY"),
PgNumeric::NaN => String::from("NAN"),
}
}
fn string_to_pg_numeric(s: &str) -> PgNumeric {
match s {
"NEGATIVE_INFINITY" => PgNumeric::NegativeInf,
"POSITIVE_INFINITY" => PgNumeric::PositiveInf,
"NAN" => PgNumeric::NaN,
_ => PgNumeric::Normalized(s.parse().unwrap()),
}
}
fn rw_numeric_to_pg_numeric(val: Decimal) -> PgNumeric {
match val {
Decimal::NegativeInf => PgNumeric::NegativeInf,
Decimal::Normalized(inner) => PgNumeric::Normalized(inner.to_string().parse().unwrap()),
Decimal::PositiveInf => PgNumeric::PositiveInf,
Decimal::NaN => PgNumeric::NaN,
}
}