risingwave_connector/parser/
sql_server.rs1use std::collections::HashSet;
16use std::str::FromStr;
17use std::sync::LazyLock;
18
19use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
20use risingwave_common::catalog::Schema;
21use risingwave_common::log::LogSuppresser;
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{DataType, Date, Decimal, ScalarImpl, Time, Timestamp, Timestamptz};
24use rust_decimal::Decimal as RustDecimal;
25use thiserror_ext::AsReport;
26use tiberius::Row;
27use tiberius::xml::XmlData;
28use uuid::Uuid;
29
30use crate::parser::utils::log_error;
31
32static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
33
34pub fn sql_server_row_to_owned_row(row: &mut Row, schema: &Schema) -> OwnedRow {
35 let mut datums: Vec<Option<ScalarImpl>> = vec![];
36 let mut money_fields: HashSet<&str> = HashSet::new();
37 for (column, _) in row.cells() {
39 if column.column_type() == tiberius::ColumnType::Money {
40 money_fields.insert(column.name());
41 }
42 }
43 for i in 0..schema.fields.len() {
44 let rw_field = &schema.fields[i];
45 let name = rw_field.name.as_str();
46 let datum = match money_fields.contains(name) {
47 true => match row.try_get::<i64, usize>(i) {
48 Ok(Some(value)) => Some(convert_money_i64_to_type(value, &rw_field.data_type)),
49 Ok(None) => None,
50 Err(err) => {
51 log_error!(name, err, "parse column failed");
52 None
53 }
54 },
55 false => match row.try_get::<ScalarImplTiberiusWrapper, usize>(i) {
56 Ok(datum) => datum.map(|d| d.0),
57 Err(err) => {
58 log_error!(name, err, "parse column failed");
59 None
60 }
61 },
62 };
63
64 datums.push(datum);
65 }
66 OwnedRow::new(datums)
67}
68
69pub fn convert_money_i64_to_type(value: i64, data_type: &DataType) -> ScalarImpl {
70 match data_type {
71 DataType::Decimal => {
72 ScalarImpl::Decimal(Decimal::from(value) / Decimal::from_str("10000").unwrap())
73 }
74 _ => {
75 panic!(
76 "Conversion of Money type to {:?} is not supported",
77 data_type
78 );
79 }
80 }
81}
82macro_rules! impl_tiberius_wrapper {
83 ($wrapper_name:ident, $variant_name:ident) => {
84 pub struct $wrapper_name($variant_name);
85
86 impl From<$variant_name> for $wrapper_name {
87 fn from(value: $variant_name) -> Self {
88 Self(value)
89 }
90 }
91 };
92}
93
94impl_tiberius_wrapper!(ScalarImplTiberiusWrapper, ScalarImpl);
95impl_tiberius_wrapper!(TimeTiberiusWrapper, Time);
96impl_tiberius_wrapper!(DateTiberiusWrapper, Date);
97impl_tiberius_wrapper!(TimestampTiberiusWrapper, Timestamp);
98impl_tiberius_wrapper!(TimestamptzTiberiusWrapper, Timestamptz);
99impl_tiberius_wrapper!(DecimalTiberiusWrapper, Decimal);
100
101macro_rules! impl_chrono_tiberius_wrapper {
102 ($wrapper_name:ident, $variant_name:ident, $chrono:ty) => {
103 impl<'a> tiberius::IntoSql<'a> for $wrapper_name {
104 fn into_sql(self) -> tiberius::ColumnData<'a> {
105 self.0.0.into_sql()
106 }
107 }
108
109 impl<'a> tiberius::FromSql<'a> for $wrapper_name {
110 fn from_sql(
111 value: &'a tiberius::ColumnData<'static>,
112 ) -> tiberius::Result<Option<Self>> {
113 let instant = <$chrono>::from_sql(value)?;
114 let time = instant.map($variant_name::from).map($wrapper_name::from);
115 tiberius::Result::Ok(time)
116 }
117 }
118 };
119}
120
121impl_chrono_tiberius_wrapper!(TimeTiberiusWrapper, Time, NaiveTime);
122impl_chrono_tiberius_wrapper!(DateTiberiusWrapper, Date, NaiveDate);
123impl_chrono_tiberius_wrapper!(TimestampTiberiusWrapper, Timestamp, NaiveDateTime);
124
125impl<'a> tiberius::IntoSql<'a> for DecimalTiberiusWrapper {
126 fn into_sql(self) -> tiberius::ColumnData<'a> {
127 match self.0 {
128 Decimal::Normalized(d) => d.into_sql(),
129 Decimal::NaN => tiberius::ColumnData::Numeric(None),
130 Decimal::PositiveInf => tiberius::ColumnData::Numeric(None),
131 Decimal::NegativeInf => tiberius::ColumnData::Numeric(None),
132 }
133 }
134}
135
136impl<'a> tiberius::FromSql<'a> for DecimalTiberiusWrapper {
137 fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
139 tiberius::Result::Ok(
140 RustDecimal::from_sql(value)?
141 .map(Decimal::Normalized)
142 .map(DecimalTiberiusWrapper::from),
143 )
144 }
145}
146
147impl<'a> tiberius::IntoSql<'a> for TimestamptzTiberiusWrapper {
148 fn into_sql(self) -> tiberius::ColumnData<'a> {
149 self.0.to_datetime_utc().into_sql()
150 }
151}
152
153impl<'a> tiberius::FromSql<'a> for TimestamptzTiberiusWrapper {
154 fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
155 let instant = DateTime::<Utc>::from_sql(value)?;
156 let time = instant
157 .map(Timestamptz::from)
158 .map(TimestamptzTiberiusWrapper::from);
159 tiberius::Result::Ok(time)
160 }
161}
162
163impl<'a> tiberius::FromSql<'a> for ScalarImplTiberiusWrapper {
197 fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
198 Ok(match &value {
199 tiberius::ColumnData::U8(_) => u8::from_sql(value)?
200 .map(|v| ScalarImplTiberiusWrapper::from(ScalarImpl::from(v as i16))),
201 tiberius::ColumnData::I16(_) => i16::from_sql(value)?
202 .map(ScalarImpl::from)
203 .map(ScalarImplTiberiusWrapper::from),
204 tiberius::ColumnData::I32(_) => i32::from_sql(value)?
205 .map(ScalarImpl::from)
206 .map(ScalarImplTiberiusWrapper::from),
207 tiberius::ColumnData::I64(_) => i64::from_sql(value)?
208 .map(ScalarImpl::from)
209 .map(ScalarImplTiberiusWrapper::from),
210 tiberius::ColumnData::F32(_) => f32::from_sql(value)?
211 .map(ScalarImpl::from)
212 .map(ScalarImplTiberiusWrapper::from),
213 tiberius::ColumnData::F64(_) => f64::from_sql(value)?
214 .map(ScalarImpl::from)
215 .map(ScalarImplTiberiusWrapper::from),
216 tiberius::ColumnData::Bit(_) => bool::from_sql(value)?
217 .map(ScalarImpl::from)
218 .map(ScalarImplTiberiusWrapper::from),
219 tiberius::ColumnData::String(_) => <&str>::from_sql(value)?
220 .map(ScalarImpl::from)
221 .map(ScalarImplTiberiusWrapper::from),
222 tiberius::ColumnData::Numeric(_) => DecimalTiberiusWrapper::from_sql(value)?
223 .map(|w| ScalarImpl::from(w.0))
224 .map(ScalarImplTiberiusWrapper::from),
225 tiberius::ColumnData::DateTime(_)
226 | tiberius::ColumnData::DateTime2(_)
227 | tiberius::ColumnData::SmallDateTime(_) => TimestampTiberiusWrapper::from_sql(value)?
228 .map(|w| ScalarImpl::from(w.0))
229 .map(ScalarImplTiberiusWrapper::from),
230 tiberius::ColumnData::Time(_) => TimeTiberiusWrapper::from_sql(value)?
231 .map(|w| ScalarImpl::from(w.0))
232 .map(ScalarImplTiberiusWrapper::from),
233 tiberius::ColumnData::Date(_) => DateTiberiusWrapper::from_sql(value)?
234 .map(|w| ScalarImpl::from(w.0))
235 .map(ScalarImplTiberiusWrapper::from),
236 tiberius::ColumnData::DateTimeOffset(_) => TimestamptzTiberiusWrapper::from_sql(value)?
237 .map(|w| ScalarImpl::from(w.0))
238 .map(ScalarImplTiberiusWrapper::from),
239 tiberius::ColumnData::Binary(_) => <&[u8]>::from_sql(value)?
240 .map(ScalarImpl::from)
241 .map(ScalarImplTiberiusWrapper::from),
242 tiberius::ColumnData::Guid(_) => <Uuid>::from_sql(value)?
243 .map(|uuid| uuid.to_string().to_uppercase())
244 .map(ScalarImpl::from)
245 .map(ScalarImplTiberiusWrapper::from),
246 tiberius::ColumnData::Xml(_) => <&XmlData>::from_sql(value)?
247 .map(|xml| xml.clone().into_string())
248 .map(ScalarImpl::from)
249 .map(ScalarImplTiberiusWrapper::from),
250 })
251 }
252}
253
254impl<'a> tiberius::IntoSql<'a> for ScalarImplTiberiusWrapper {
302 fn into_sql(self) -> tiberius::ColumnData<'a> {
303 match self.0 {
304 ScalarImpl::Int16(v) => v.into_sql(),
305 ScalarImpl::Int32(v) => v.into_sql(),
306 ScalarImpl::Int64(v) => v.into_sql(),
307 ScalarImpl::Float32(v) => v.0.into_sql(),
308 ScalarImpl::Float64(v) => v.0.into_sql(),
309 ScalarImpl::Bool(v) => v.into_sql(),
310 ScalarImpl::Decimal(v) => DecimalTiberiusWrapper::from(v).into_sql(),
311 ScalarImpl::Date(v) => DateTiberiusWrapper::from(v).into_sql(),
312 ScalarImpl::Timestamp(v) => TimestampTiberiusWrapper::from(v).into_sql(),
313 ScalarImpl::Timestamptz(v) => TimestamptzTiberiusWrapper::from(v).into_sql(),
314 ScalarImpl::Time(v) => TimeTiberiusWrapper::from(v).into_sql(),
315 value => {
317 unimplemented!("the sql server decoding for {:?} is unsupported", value);
319 }
320 }
321 }
322}