risingwave_connector/parser/
sql_server.rs1use std::sync::LazyLock;
16
17use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
18use risingwave_common::catalog::Schema;
19use risingwave_common::log::LogSuppresser;
20use risingwave_common::row::OwnedRow;
21use risingwave_common::types::{Date, Decimal, ScalarImpl, Time, Timestamp, Timestamptz};
22use rust_decimal::Decimal as RustDecimal;
23use thiserror_ext::AsReport;
24use tiberius::Row;
25use tiberius::xml::XmlData;
26use uuid::Uuid;
27
28use crate::parser::utils::log_error;
29
30static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
31
32pub fn sql_server_row_to_owned_row(row: &mut Row, schema: &Schema) -> OwnedRow {
33 let mut datums: Vec<Option<ScalarImpl>> = vec![];
34 for i in 0..schema.fields.len() {
35 let rw_field = &schema.fields[i];
36 let name = rw_field.name.as_str();
37 let datum = match row.try_get::<ScalarImplTiberiusWrapper, usize>(i) {
38 Ok(datum) => datum.map(|d| d.0),
39 Err(err) => {
40 log_error!(name, err, "parse column failed");
41 None
42 }
43 };
44 datums.push(datum);
45 }
46 OwnedRow::new(datums)
47}
/// Declares a public tuple-struct newtype `$wrapper` around `$inner` and implements
/// `From<$inner>` for it.
///
/// The single field is not `pub`, so only code in this module can reach it via `.0`.
/// This file implements the `tiberius` conversion traits on the generated wrappers.
macro_rules! impl_tiberius_wrapper {
    ($wrapper:ident, $inner:ident) => {
        pub struct $wrapper($inner);

        impl From<$inner> for $wrapper {
            fn from(inner: $inner) -> Self {
                $wrapper(inner)
            }
        }
    };
}
59
60impl_tiberius_wrapper!(ScalarImplTiberiusWrapper, ScalarImpl);
61impl_tiberius_wrapper!(TimeTiberiusWrapper, Time);
62impl_tiberius_wrapper!(DateTiberiusWrapper, Date);
63impl_tiberius_wrapper!(TimestampTiberiusWrapper, Timestamp);
64impl_tiberius_wrapper!(TimestamptzTiberiusWrapper, Timestamptz);
65impl_tiberius_wrapper!(DecimalTiberiusWrapper, Decimal);
66
/// Implements `tiberius::IntoSql` and `tiberius::FromSql` for `$wrapper` by delegating
/// to the conversions that already exist for the type `$chrono`.
///
/// * `$wrapper` - newtype whose field `.0` is a `$scalar`.
/// * `$scalar`  - type constructible through `From<$chrono>`; its own field `.0` is the
///   value handed to `into_sql` (presumably the chrono value itself; confirm against the
///   type definitions).
/// * `$chrono`  - type the column is decoded into before being converted to `$scalar`.
macro_rules! impl_chrono_tiberius_wrapper {
    ($wrapper:ident, $scalar:ident, $chrono:ty) => {
        impl<'a> tiberius::IntoSql<'a> for $wrapper {
            fn into_sql(self) -> tiberius::ColumnData<'a> {
                // Peel both layers: wrapper -> scalar -> innermost value.
                let scalar = self.0;
                scalar.0.into_sql()
            }
        }

        impl<'a> tiberius::FromSql<'a> for $wrapper {
            fn from_sql(
                value: &'a tiberius::ColumnData<'static>,
            ) -> tiberius::Result<Option<Self>> {
                // Decoding errors propagate; an absent value stays `None`.
                let decoded = <$chrono>::from_sql(value)?;
                Ok(decoded.map(|raw| $wrapper::from($scalar::from(raw))))
            }
        }
    };
}
86
87impl_chrono_tiberius_wrapper!(TimeTiberiusWrapper, Time, NaiveTime);
88impl_chrono_tiberius_wrapper!(DateTiberiusWrapper, Date, NaiveDate);
89impl_chrono_tiberius_wrapper!(TimestampTiberiusWrapper, Timestamp, NaiveDateTime);
90
91impl<'a> tiberius::IntoSql<'a> for DecimalTiberiusWrapper {
92 fn into_sql(self) -> tiberius::ColumnData<'a> {
93 match self.0 {
94 Decimal::Normalized(d) => d.into_sql(),
95 Decimal::NaN => tiberius::ColumnData::Numeric(None),
96 Decimal::PositiveInf => tiberius::ColumnData::Numeric(None),
97 Decimal::NegativeInf => tiberius::ColumnData::Numeric(None),
98 }
99 }
100}
101
102impl<'a> tiberius::FromSql<'a> for DecimalTiberiusWrapper {
103 fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
105 tiberius::Result::Ok(
106 RustDecimal::from_sql(value)?
107 .map(Decimal::Normalized)
108 .map(DecimalTiberiusWrapper::from),
109 )
110 }
111}
112
113impl<'a> tiberius::IntoSql<'a> for TimestamptzTiberiusWrapper {
114 fn into_sql(self) -> tiberius::ColumnData<'a> {
115 self.0.to_datetime_utc().into_sql()
116 }
117}
118
119impl<'a> tiberius::FromSql<'a> for TimestamptzTiberiusWrapper {
120 fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
121 let instant = DateTime::<Utc>::from_sql(value)?;
122 let time = instant
123 .map(Timestamptz::from)
124 .map(TimestamptzTiberiusWrapper::from);
125 tiberius::Result::Ok(time)
126 }
127}
128
129impl<'a> tiberius::FromSql<'a> for ScalarImplTiberiusWrapper {
163 fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
164 Ok(match &value {
165 tiberius::ColumnData::U8(_) => u8::from_sql(value)?
166 .map(|v| ScalarImplTiberiusWrapper::from(ScalarImpl::from(v as i16))),
167 tiberius::ColumnData::I16(_) => i16::from_sql(value)?
168 .map(ScalarImpl::from)
169 .map(ScalarImplTiberiusWrapper::from),
170 tiberius::ColumnData::I32(_) => i32::from_sql(value)?
171 .map(ScalarImpl::from)
172 .map(ScalarImplTiberiusWrapper::from),
173 tiberius::ColumnData::I64(_) => i64::from_sql(value)?
174 .map(ScalarImpl::from)
175 .map(ScalarImplTiberiusWrapper::from),
176 tiberius::ColumnData::F32(_) => f32::from_sql(value)?
177 .map(ScalarImpl::from)
178 .map(ScalarImplTiberiusWrapper::from),
179 tiberius::ColumnData::F64(_) => f64::from_sql(value)?
180 .map(ScalarImpl::from)
181 .map(ScalarImplTiberiusWrapper::from),
182 tiberius::ColumnData::Bit(_) => bool::from_sql(value)?
183 .map(ScalarImpl::from)
184 .map(ScalarImplTiberiusWrapper::from),
185 tiberius::ColumnData::String(_) => <&str>::from_sql(value)?
186 .map(ScalarImpl::from)
187 .map(ScalarImplTiberiusWrapper::from),
188 tiberius::ColumnData::Numeric(_) => DecimalTiberiusWrapper::from_sql(value)?
189 .map(|w| ScalarImpl::from(w.0))
190 .map(ScalarImplTiberiusWrapper::from),
191 tiberius::ColumnData::DateTime(_)
192 | tiberius::ColumnData::DateTime2(_)
193 | tiberius::ColumnData::SmallDateTime(_) => TimestampTiberiusWrapper::from_sql(value)?
194 .map(|w| ScalarImpl::from(w.0))
195 .map(ScalarImplTiberiusWrapper::from),
196 tiberius::ColumnData::Time(_) => TimeTiberiusWrapper::from_sql(value)?
197 .map(|w| ScalarImpl::from(w.0))
198 .map(ScalarImplTiberiusWrapper::from),
199 tiberius::ColumnData::Date(_) => DateTiberiusWrapper::from_sql(value)?
200 .map(|w| ScalarImpl::from(w.0))
201 .map(ScalarImplTiberiusWrapper::from),
202 tiberius::ColumnData::DateTimeOffset(_) => TimestamptzTiberiusWrapper::from_sql(value)?
203 .map(|w| ScalarImpl::from(w.0))
204 .map(ScalarImplTiberiusWrapper::from),
205 tiberius::ColumnData::Binary(_) => <&[u8]>::from_sql(value)?
206 .map(ScalarImpl::from)
207 .map(ScalarImplTiberiusWrapper::from),
208 tiberius::ColumnData::Guid(_) => <Uuid>::from_sql(value)?
209 .map(|uuid| uuid.to_string().to_uppercase())
210 .map(ScalarImpl::from)
211 .map(ScalarImplTiberiusWrapper::from),
212 tiberius::ColumnData::Xml(_) => <&XmlData>::from_sql(value)?
213 .map(|xml| xml.clone().into_string())
214 .map(ScalarImpl::from)
215 .map(ScalarImplTiberiusWrapper::from),
216 })
217 }
218}
219
220impl<'a> tiberius::IntoSql<'a> for ScalarImplTiberiusWrapper {
268 fn into_sql(self) -> tiberius::ColumnData<'a> {
269 match self.0 {
270 ScalarImpl::Int16(v) => v.into_sql(),
271 ScalarImpl::Int32(v) => v.into_sql(),
272 ScalarImpl::Int64(v) => v.into_sql(),
273 ScalarImpl::Float32(v) => v.0.into_sql(),
274 ScalarImpl::Float64(v) => v.0.into_sql(),
275 ScalarImpl::Bool(v) => v.into_sql(),
276 ScalarImpl::Decimal(v) => DecimalTiberiusWrapper::from(v).into_sql(),
277 ScalarImpl::Date(v) => DateTiberiusWrapper::from(v).into_sql(),
278 ScalarImpl::Timestamp(v) => TimestampTiberiusWrapper::from(v).into_sql(),
279 ScalarImpl::Timestamptz(v) => TimestamptzTiberiusWrapper::from(v).into_sql(),
280 ScalarImpl::Time(v) => TimeTiberiusWrapper::from(v).into_sql(),
281 value => {
283 unimplemented!("the sql server decoding for {:?} is unsupported", value);
285 }
286 }
287 }
288}