risingwave_connector/parser/
sql_server.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::str::FromStr;
17use std::sync::LazyLock;
18
19use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
20use risingwave_common::catalog::Schema;
21use risingwave_common::log::LogSuppresser;
22use risingwave_common::row::OwnedRow;
23use risingwave_common::types::{DataType, Date, Decimal, ScalarImpl, Time, Timestamp, Timestamptz};
24use rust_decimal::Decimal as RustDecimal;
25use thiserror_ext::AsReport;
26use tiberius::Row;
27use tiberius::xml::XmlData;
28use uuid::Uuid;
29
30use crate::parser::utils::log_error;
31
/// Module-wide [`LogSuppresser`], created lazily on first use via `LogSuppresser::default`.
///
/// NOTE(review): nothing in this file refers to the static by name, so it is presumably
/// picked up by the expansion of `log_error!`. If so, the identifier (spelling included)
/// must stay exactly as it is — confirm against the macro before renaming.
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
33
34pub fn sql_server_row_to_owned_row(row: &mut Row, schema: &Schema) -> OwnedRow {
35    let mut datums: Vec<Option<ScalarImpl>> = vec![];
36    let mut money_fields: HashSet<&str> = HashSet::new();
37    // Special handling of the money field, as the third-party library Tiberius converts the money type to i64.
38    for (column, _) in row.cells() {
39        if column.column_type() == tiberius::ColumnType::Money {
40            money_fields.insert(column.name());
41        }
42    }
43    for i in 0..schema.fields.len() {
44        let rw_field = &schema.fields[i];
45        let name = rw_field.name.as_str();
46        let datum = match money_fields.contains(name) {
47            true => match row.try_get::<i64, usize>(i) {
48                Ok(Some(value)) => Some(convert_money_i64_to_type(value, &rw_field.data_type)),
49                Ok(None) => None,
50                Err(err) => {
51                    log_error!(name, err, "parse column failed");
52                    None
53                }
54            },
55            false => match row.try_get::<ScalarImplTiberiusWrapper, usize>(i) {
56                Ok(datum) => datum.map(|d| d.0),
57                Err(err) => {
58                    log_error!(name, err, "parse column failed");
59                    None
60                }
61            },
62        };
63
64        datums.push(datum);
65    }
66    OwnedRow::new(datums)
67}
68
69pub fn convert_money_i64_to_type(value: i64, data_type: &DataType) -> ScalarImpl {
70    match data_type {
71        DataType::Decimal => {
72            ScalarImpl::Decimal(Decimal::from(value) / Decimal::from_str("10000").unwrap())
73        }
74        _ => {
75            panic!(
76                "Conversion of Money type to {:?} is not supported",
77                data_type
78            );
79        }
80    }
81}
/// Declares a public tuple newtype `$wrapper` holding a single `$inner` value, together
/// with `From<$inner>` so that a value can be wrapped via `$wrapper::from(..)` / `.into()`.
///
/// The wrapped field is not `pub`; it is reachable as `.0` only from the module in which
/// the macro is invoked. This file uses the generated newtypes as the receivers of its
/// `tiberius::FromSql` / `tiberius::IntoSql` implementations.
macro_rules! impl_tiberius_wrapper {
    ($wrapper:ident, $inner:ident) => {
        pub struct $wrapper($inner);

        impl From<$inner> for $wrapper {
            fn from(inner: $inner) -> Self {
                $wrapper(inner)
            }
        }
    };
}
93
// Newtype wrappers around the RisingWave value types that this file converts to and from
// tiberius column data. Each invocation yields `pub struct <Wrapper>(<Inner>)` plus
// `From<Inner> for <Wrapper>`.
impl_tiberius_wrapper!(ScalarImplTiberiusWrapper, ScalarImpl);
impl_tiberius_wrapper!(TimeTiberiusWrapper, Time);
impl_tiberius_wrapper!(DateTiberiusWrapper, Date);
impl_tiberius_wrapper!(TimestampTiberiusWrapper, Timestamp);
impl_tiberius_wrapper!(TimestamptzTiberiusWrapper, Timestamptz);
impl_tiberius_wrapper!(DecimalTiberiusWrapper, Decimal);
100
/// Implements `tiberius::IntoSql` and `tiberius::FromSql` for the wrapper `$wrapper`, whose
/// inner value is the type `$rw_type`, by going through the chrono type `$chrono_type`.
///
/// * Writing: the chrono value stored in field `.0` of the inner value is handed to
///   its own `into_sql`.
/// * Reading: the column is decoded as `$chrono_type`; a present value is converted with
///   `$rw_type::from` and wrapped, an absent value stays `None`, and a decode error is
///   propagated unchanged.
macro_rules! impl_chrono_tiberius_wrapper {
    ($wrapper:ident, $rw_type:ident, $chrono_type:ty) => {
        impl<'a> tiberius::IntoSql<'a> for $wrapper {
            fn into_sql(self) -> tiberius::ColumnData<'a> {
                let rw_value = self.0;
                rw_value.0.into_sql()
            }
        }

        impl<'a> tiberius::FromSql<'a> for $wrapper {
            fn from_sql(
                value: &'a tiberius::ColumnData<'static>,
            ) -> tiberius::Result<Option<Self>> {
                match <$chrono_type>::from_sql(value)? {
                    Some(chrono_value) => Ok(Some($wrapper::from($rw_type::from(chrono_value)))),
                    None => Ok(None),
                }
            }
        }
    };
}
120
// `Time`, `Date` and `Timestamp` are bridged to tiberius through chrono's `NaiveTime`,
// `NaiveDate` and `NaiveDateTime` respectively.
impl_chrono_tiberius_wrapper!(TimeTiberiusWrapper, Time, NaiveTime);
impl_chrono_tiberius_wrapper!(DateTiberiusWrapper, Date, NaiveDate);
impl_chrono_tiberius_wrapper!(TimestampTiberiusWrapper, Timestamp, NaiveDateTime);
124
125impl<'a> tiberius::IntoSql<'a> for DecimalTiberiusWrapper {
126    fn into_sql(self) -> tiberius::ColumnData<'a> {
127        match self.0 {
128            Decimal::Normalized(d) => d.into_sql(),
129            Decimal::NaN => tiberius::ColumnData::Numeric(None),
130            Decimal::PositiveInf => tiberius::ColumnData::Numeric(None),
131            Decimal::NegativeInf => tiberius::ColumnData::Numeric(None),
132        }
133    }
134}
135
136impl<'a> tiberius::FromSql<'a> for DecimalTiberiusWrapper {
137    // TODO(kexiang): will sql server have inf/-inf/nan for decimal?
138    fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
139        tiberius::Result::Ok(
140            RustDecimal::from_sql(value)?
141                .map(Decimal::Normalized)
142                .map(DecimalTiberiusWrapper::from),
143        )
144    }
145}
146
147impl<'a> tiberius::IntoSql<'a> for TimestamptzTiberiusWrapper {
148    fn into_sql(self) -> tiberius::ColumnData<'a> {
149        self.0.to_datetime_utc().into_sql()
150    }
151}
152
153impl<'a> tiberius::FromSql<'a> for TimestamptzTiberiusWrapper {
154    fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
155        let instant = DateTime::<Utc>::from_sql(value)?;
156        let time = instant
157            .map(Timestamptz::from)
158            .map(TimestamptzTiberiusWrapper::from);
159        tiberius::Result::Ok(time)
160    }
161}
162
163/// The following table shows the mapping between Rust types and Sql Server types in tiberius.
164/// |Rust Type|Sql Server Type|
165/// |`u8`|`tinyint`|
166/// |`i16`|`smallint`|
167/// |`i32`|`int`|
168/// |`i64`|`bigint`|
169/// |`f32`|`float(24)`|
170/// |`f64`|`float(53)`|
171/// |`bool`|`bit`|
172/// |`String`/`&str`|`nvarchar`/`varchar`/`nchar`/`char`/`ntext`/`text`|
173/// |`Vec<u8>`/`&[u8]`|`binary`/`varbinary`/`image`|
174/// |[`Uuid`]|`uniqueidentifier`|
175/// |[`Numeric`]|`numeric`/`decimal`|
176/// |[`Decimal`] (with feature flag `rust_decimal`)|`numeric`/`decimal`|
177/// |[`XmlData`]|`xml`|
178/// |[`NaiveDateTime`] (with feature flag `chrono`)|`datetime`/`datetime2`/`smalldatetime`|
179/// |[`NaiveDate`] (with feature flag `chrono`)|`date`|
180/// |[`NaiveTime`] (with feature flag `chrono`)|`time`|
181/// |[`DateTime`] (with feature flag `chrono`)|`datetimeoffset`|
182///
183/// See the [`time`] module for more information about the date and time structs.
184///
185/// [`Row#get`]: struct.Row.html#method.get
186/// [`Row#try_get`]: struct.Row.html#method.try_get
187/// [`time`]: time/index.html
188/// [`Uuid`]: struct.Uuid.html
189/// [`Numeric`]: numeric/struct.Numeric.html
190/// [`Decimal`]: numeric/struct.Decimal.html
191/// [`XmlData`]: xml/struct.XmlData.html
192/// [`NaiveDateTime`]: time/chrono/struct.NaiveDateTime.html
193/// [`NaiveDate`]: time/chrono/struct.NaiveDate.html
194/// [`NaiveTime`]: time/chrono/struct.NaiveTime.html
195/// [`DateTime`]: time/chrono/struct.DateTime.html
196impl<'a> tiberius::FromSql<'a> for ScalarImplTiberiusWrapper {
197    fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
198        Ok(match &value {
199            tiberius::ColumnData::U8(_) => u8::from_sql(value)?
200                .map(|v| ScalarImplTiberiusWrapper::from(ScalarImpl::from(v as i16))),
201            tiberius::ColumnData::I16(_) => i16::from_sql(value)?
202                .map(ScalarImpl::from)
203                .map(ScalarImplTiberiusWrapper::from),
204            tiberius::ColumnData::I32(_) => i32::from_sql(value)?
205                .map(ScalarImpl::from)
206                .map(ScalarImplTiberiusWrapper::from),
207            tiberius::ColumnData::I64(_) => i64::from_sql(value)?
208                .map(ScalarImpl::from)
209                .map(ScalarImplTiberiusWrapper::from),
210            tiberius::ColumnData::F32(_) => f32::from_sql(value)?
211                .map(ScalarImpl::from)
212                .map(ScalarImplTiberiusWrapper::from),
213            tiberius::ColumnData::F64(_) => f64::from_sql(value)?
214                .map(ScalarImpl::from)
215                .map(ScalarImplTiberiusWrapper::from),
216            tiberius::ColumnData::Bit(_) => bool::from_sql(value)?
217                .map(ScalarImpl::from)
218                .map(ScalarImplTiberiusWrapper::from),
219            tiberius::ColumnData::String(_) => <&str>::from_sql(value)?
220                .map(ScalarImpl::from)
221                .map(ScalarImplTiberiusWrapper::from),
222            tiberius::ColumnData::Numeric(_) => DecimalTiberiusWrapper::from_sql(value)?
223                .map(|w| ScalarImpl::from(w.0))
224                .map(ScalarImplTiberiusWrapper::from),
225            tiberius::ColumnData::DateTime(_)
226            | tiberius::ColumnData::DateTime2(_)
227            | tiberius::ColumnData::SmallDateTime(_) => TimestampTiberiusWrapper::from_sql(value)?
228                .map(|w| ScalarImpl::from(w.0))
229                .map(ScalarImplTiberiusWrapper::from),
230            tiberius::ColumnData::Time(_) => TimeTiberiusWrapper::from_sql(value)?
231                .map(|w| ScalarImpl::from(w.0))
232                .map(ScalarImplTiberiusWrapper::from),
233            tiberius::ColumnData::Date(_) => DateTiberiusWrapper::from_sql(value)?
234                .map(|w| ScalarImpl::from(w.0))
235                .map(ScalarImplTiberiusWrapper::from),
236            tiberius::ColumnData::DateTimeOffset(_) => TimestamptzTiberiusWrapper::from_sql(value)?
237                .map(|w| ScalarImpl::from(w.0))
238                .map(ScalarImplTiberiusWrapper::from),
239            tiberius::ColumnData::Binary(_) => <&[u8]>::from_sql(value)?
240                .map(ScalarImpl::from)
241                .map(ScalarImplTiberiusWrapper::from),
242            tiberius::ColumnData::Guid(_) => <Uuid>::from_sql(value)?
243                .map(|uuid| uuid.to_string().to_uppercase())
244                .map(ScalarImpl::from)
245                .map(ScalarImplTiberiusWrapper::from),
246            tiberius::ColumnData::Xml(_) => <&XmlData>::from_sql(value)?
247                .map(|xml| xml.clone().into_string())
248                .map(ScalarImpl::from)
249                .map(ScalarImplTiberiusWrapper::from),
250        })
251    }
252}
253
254/// The following table shows the mapping between Rust types and Sql Server types in tiberius.
255/// |Rust type|Sql Server type|
256/// |--------|--------|
257/// |`u8`|`tinyint`|
258/// |`i16`|`smallint`|
259/// |`i32`|`int`|
260/// |`i64`|`bigint`|
261/// |`f32`|`float(24)`|
262/// |`f64`|`float(53)`|
263/// |`bool`|`bit`|
264/// |`String`/`&str` (< 4000 characters)|`nvarchar(4000)`|
265/// |`String`/`&str`|`nvarchar(max)`|
266/// |`Vec<u8>`/`&[u8]` (< 8000 bytes)|`varbinary(8000)`|
267/// |`Vec<u8>`/`&[u8]`|`varbinary(max)`|
268/// |[`Uuid`]|`uniqueidentifier`|
269/// |[`Numeric`]|`numeric`/`decimal`|
270/// |[`Decimal`] (with feature flag `rust_decimal`)|`numeric`/`decimal`|
271/// |[`BigDecimal`] (with feature flag `bigdecimal`)|`numeric`/`decimal`|
272/// |[`XmlData`]|`xml`|
273/// |[`NaiveDate`] (with `chrono` feature, TDS 7.3 >)|`date`|
274/// |[`NaiveTime`] (with `chrono` feature, TDS 7.3 >)|`time`|
275/// |[`DateTime`] (with `chrono` feature, TDS 7.3 >)|`datetimeoffset`|
276/// |[`NaiveDateTime`] (with `chrono` feature, TDS 7.3 >)|`datetime2`|
277/// |[`NaiveDateTime`] (with `chrono` feature, TDS 7.2)|`datetime`|
278///
279/// It is possible to use some of the types to write into columns that are not
280/// of the same type. For example on systems following the TDS 7.3 standard (SQL
281/// Server 2008 and later), the chrono type `NaiveDateTime` can also be used to
282/// write to `datetime`, `datetime2` and `smalldatetime` columns. All string
283/// types can also be used with `ntext`, `text`, `varchar`, `nchar` and `char`
284/// columns. All binary types can also be used with `binary` and `image`
285/// columns.
286///
287/// See the [`time`] module for more information about the date and time structs.
288///
289/// [`Client#query`]: struct.Client.html#method.query
290/// [`Client#execute`]: struct.Client.html#method.execute
291/// [`time`]: time/index.html
292/// [`Uuid`]: struct.Uuid.html
293/// [`Numeric`]: numeric/struct.Numeric.html
294/// [`Decimal`]: numeric/struct.Decimal.html
295/// [`BigDecimal`]: numeric/struct.BigDecimal.html
296/// [`XmlData`]: xml/struct.XmlData.html
297/// [`NaiveDateTime`]: time/chrono/struct.NaiveDateTime.html
298/// [`NaiveDate`]: time/chrono/struct.NaiveDate.html
299/// [`NaiveTime`]: time/chrono/struct.NaiveTime.html
300/// [`DateTime`]: time/chrono/struct.DateTime.html
301impl<'a> tiberius::IntoSql<'a> for ScalarImplTiberiusWrapper {
302    fn into_sql(self) -> tiberius::ColumnData<'a> {
303        match self.0 {
304            ScalarImpl::Int16(v) => v.into_sql(),
305            ScalarImpl::Int32(v) => v.into_sql(),
306            ScalarImpl::Int64(v) => v.into_sql(),
307            ScalarImpl::Float32(v) => v.0.into_sql(),
308            ScalarImpl::Float64(v) => v.0.into_sql(),
309            ScalarImpl::Bool(v) => v.into_sql(),
310            ScalarImpl::Decimal(v) => DecimalTiberiusWrapper::from(v).into_sql(),
311            ScalarImpl::Date(v) => DateTiberiusWrapper::from(v).into_sql(),
312            ScalarImpl::Timestamp(v) => TimestampTiberiusWrapper::from(v).into_sql(),
313            ScalarImpl::Timestamptz(v) => TimestamptzTiberiusWrapper::from(v).into_sql(),
314            ScalarImpl::Time(v) => TimeTiberiusWrapper::from(v).into_sql(),
315            // ScalarImpl::Bytea(v) => (*v.clone()).into_sql(),
316            value => {
317                // Utf8, Serial, Interval, Jsonb, Int256, Struct, List are not supported yet
318                unimplemented!("the sql server decoding for {:?} is unsupported", value);
319            }
320        }
321    }
322}