risingwave_connector/parser/
sql_server.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
18use risingwave_common::catalog::Schema;
19use risingwave_common::log::LogSuppresser;
20use risingwave_common::row::OwnedRow;
21use risingwave_common::types::{Date, Decimal, ScalarImpl, Time, Timestamp, Timestamptz};
22use rust_decimal::Decimal as RustDecimal;
23use thiserror_ext::AsReport;
24use tiberius::Row;
25use tiberius::xml::XmlData;
26use uuid::Uuid;
27
28use crate::parser::utils::log_error;
29
/// Lazily-initialised, process-wide [`LogSuppresser`] built from its `Default` value.
///
/// NOTE(review): nothing in this file refers to the static by name; presumably the
/// `log_error!` macro expands to a reference to `LOG_SUPPERSSER` at its call site, in
/// which case the (misspelled) identifier must be kept as is — confirm before renaming.
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
31
32pub fn sql_server_row_to_owned_row(row: &mut Row, schema: &Schema) -> OwnedRow {
33    let mut datums: Vec<Option<ScalarImpl>> = vec![];
34    for i in 0..schema.fields.len() {
35        let rw_field = &schema.fields[i];
36        let name = rw_field.name.as_str();
37        let datum = match row.try_get::<ScalarImplTiberiusWrapper, usize>(i) {
38            Ok(datum) => datum.map(|d| d.0),
39            Err(err) => {
40                log_error!(name, err, "parse column failed");
41                None
42            }
43        };
44        datums.push(datum);
45    }
46    OwnedRow::new(datums)
47}
/// Declares a public tuple newtype `$wrapper` holding a single `$inner` value, plus the
/// `From<$inner> for $wrapper` conversion.
///
/// The wrappers exist so that the `tiberius::IntoSql` / `tiberius::FromSql` traits can be
/// implemented in this module for types that are declared in other crates.
macro_rules! impl_tiberius_wrapper {
    ($wrapper:ident, $inner:ident) => {
        pub struct $wrapper($inner);

        impl From<$inner> for $wrapper {
            fn from(inner: $inner) -> Self {
                $wrapper(inner)
            }
        }
    };
}
59
// One newtype per RisingWave value type that is exchanged with SQL Server. Each
// invocation only declares the struct and its `From` conversion; the tiberius
// `IntoSql` / `FromSql` implementations for these wrappers are written further down.
impl_tiberius_wrapper!(ScalarImplTiberiusWrapper, ScalarImpl);
impl_tiberius_wrapper!(TimeTiberiusWrapper, Time);
impl_tiberius_wrapper!(DateTiberiusWrapper, Date);
impl_tiberius_wrapper!(TimestampTiberiusWrapper, Timestamp);
impl_tiberius_wrapper!(TimestamptzTiberiusWrapper, Timestamptz);
impl_tiberius_wrapper!(DecimalTiberiusWrapper, Decimal);
66
/// Implements `tiberius::IntoSql` and `tiberius::FromSql` for `$wrapper` by delegating
/// to the implementations tiberius already provides for the chrono type `$chrono`.
///
/// * `$wrapper` - the newtype that receives the two trait implementations.
/// * `$inner`   - the type stored in `$wrapper`; it must be constructible via
///   `From<$chrono>` and expose the chrono value as its own field `0`.
/// * `$chrono`  - the chrono type tiberius knows how to encode and decode.
macro_rules! impl_chrono_tiberius_wrapper {
    ($wrapper:ident, $inner:ident, $chrono:ty) => {
        impl<'a> tiberius::IntoSql<'a> for $wrapper {
            fn into_sql(self) -> tiberius::ColumnData<'a> {
                // Peel both layers: wrapper -> `$inner` -> chrono value.
                let inner = self.0;
                inner.0.into_sql()
            }
        }

        impl<'a> tiberius::FromSql<'a> for $wrapper {
            fn from_sql(
                value: &'a tiberius::ColumnData<'static>,
            ) -> tiberius::Result<Option<Self>> {
                // A SQL NULL decodes to `None` and is passed through untouched.
                let decoded = <$chrono>::from_sql(value)?;
                tiberius::Result::Ok(decoded.map($inner::from).map($wrapper::from))
            }
        }
    };
}
86
// Wire the three chrono-backed wrappers to tiberius: each line names the wrapper, the
// type it stores, and the chrono type that tiberius encodes/decodes on its behalf.
impl_chrono_tiberius_wrapper!(TimeTiberiusWrapper, Time, NaiveTime);
impl_chrono_tiberius_wrapper!(DateTiberiusWrapper, Date, NaiveDate);
impl_chrono_tiberius_wrapper!(TimestampTiberiusWrapper, Timestamp, NaiveDateTime);
90
91impl<'a> tiberius::IntoSql<'a> for DecimalTiberiusWrapper {
92    fn into_sql(self) -> tiberius::ColumnData<'a> {
93        match self.0 {
94            Decimal::Normalized(d) => d.into_sql(),
95            Decimal::NaN => tiberius::ColumnData::Numeric(None),
96            Decimal::PositiveInf => tiberius::ColumnData::Numeric(None),
97            Decimal::NegativeInf => tiberius::ColumnData::Numeric(None),
98        }
99    }
100}
101
102impl<'a> tiberius::FromSql<'a> for DecimalTiberiusWrapper {
103    // TODO(kexiang): will sql server have inf/-inf/nan for decimal?
104    fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
105        tiberius::Result::Ok(
106            RustDecimal::from_sql(value)?
107                .map(Decimal::Normalized)
108                .map(DecimalTiberiusWrapper::from),
109        )
110    }
111}
112
113impl<'a> tiberius::IntoSql<'a> for TimestamptzTiberiusWrapper {
114    fn into_sql(self) -> tiberius::ColumnData<'a> {
115        self.0.to_datetime_utc().into_sql()
116    }
117}
118
119impl<'a> tiberius::FromSql<'a> for TimestamptzTiberiusWrapper {
120    fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
121        let instant = DateTime::<Utc>::from_sql(value)?;
122        let time = instant
123            .map(Timestamptz::from)
124            .map(TimestamptzTiberiusWrapper::from);
125        tiberius::Result::Ok(time)
126    }
127}
128
129/// The following table shows the mapping between Rust types and Sql Server types in tiberius.
130/// |Rust Type|Sql Server Type|
131/// |`u8`|`tinyint`|
132/// |`i16`|`smallint`|
133/// |`i32`|`int`|
134/// |`i64`|`bigint`|
135/// |`f32`|`float(24)`|
136/// |`f64`|`float(53)`|
137/// |`bool`|`bit`|
138/// |`String`/`&str`|`nvarchar`/`varchar`/`nchar`/`char`/`ntext`/`text`|
139/// |`Vec<u8>`/`&[u8]`|`binary`/`varbinary`/`image`|
140/// |[`Uuid`]|`uniqueidentifier`|
141/// |[`Numeric`]|`numeric`/`decimal`|
142/// |[`Decimal`] (with feature flag `rust_decimal`)|`numeric`/`decimal`|
143/// |[`XmlData`]|`xml`|
144/// |[`NaiveDateTime`] (with feature flag `chrono`)|`datetime`/`datetime2`/`smalldatetime`|
145/// |[`NaiveDate`] (with feature flag `chrono`)|`date`|
146/// |[`NaiveTime`] (with feature flag `chrono`)|`time`|
147/// |[`DateTime`] (with feature flag `chrono`)|`datetimeoffset`|
148///
149/// See the [`time`] module for more information about the date and time structs.
150///
151/// [`Row#get`]: struct.Row.html#method.get
152/// [`Row#try_get`]: struct.Row.html#method.try_get
153/// [`time`]: time/index.html
154/// [`Uuid`]: struct.Uuid.html
155/// [`Numeric`]: numeric/struct.Numeric.html
156/// [`Decimal`]: numeric/struct.Decimal.html
157/// [`XmlData`]: xml/struct.XmlData.html
158/// [`NaiveDateTime`]: time/chrono/struct.NaiveDateTime.html
159/// [`NaiveDate`]: time/chrono/struct.NaiveDate.html
160/// [`NaiveTime`]: time/chrono/struct.NaiveTime.html
161/// [`DateTime`]: time/chrono/struct.DateTime.html
162impl<'a> tiberius::FromSql<'a> for ScalarImplTiberiusWrapper {
163    fn from_sql(value: &'a tiberius::ColumnData<'static>) -> tiberius::Result<Option<Self>> {
164        Ok(match &value {
165            tiberius::ColumnData::U8(_) => u8::from_sql(value)?
166                .map(|v| ScalarImplTiberiusWrapper::from(ScalarImpl::from(v as i16))),
167            tiberius::ColumnData::I16(_) => i16::from_sql(value)?
168                .map(ScalarImpl::from)
169                .map(ScalarImplTiberiusWrapper::from),
170            tiberius::ColumnData::I32(_) => i32::from_sql(value)?
171                .map(ScalarImpl::from)
172                .map(ScalarImplTiberiusWrapper::from),
173            tiberius::ColumnData::I64(_) => i64::from_sql(value)?
174                .map(ScalarImpl::from)
175                .map(ScalarImplTiberiusWrapper::from),
176            tiberius::ColumnData::F32(_) => f32::from_sql(value)?
177                .map(ScalarImpl::from)
178                .map(ScalarImplTiberiusWrapper::from),
179            tiberius::ColumnData::F64(_) => f64::from_sql(value)?
180                .map(ScalarImpl::from)
181                .map(ScalarImplTiberiusWrapper::from),
182            tiberius::ColumnData::Bit(_) => bool::from_sql(value)?
183                .map(ScalarImpl::from)
184                .map(ScalarImplTiberiusWrapper::from),
185            tiberius::ColumnData::String(_) => <&str>::from_sql(value)?
186                .map(ScalarImpl::from)
187                .map(ScalarImplTiberiusWrapper::from),
188            tiberius::ColumnData::Numeric(_) => DecimalTiberiusWrapper::from_sql(value)?
189                .map(|w| ScalarImpl::from(w.0))
190                .map(ScalarImplTiberiusWrapper::from),
191            tiberius::ColumnData::DateTime(_)
192            | tiberius::ColumnData::DateTime2(_)
193            | tiberius::ColumnData::SmallDateTime(_) => TimestampTiberiusWrapper::from_sql(value)?
194                .map(|w| ScalarImpl::from(w.0))
195                .map(ScalarImplTiberiusWrapper::from),
196            tiberius::ColumnData::Time(_) => TimeTiberiusWrapper::from_sql(value)?
197                .map(|w| ScalarImpl::from(w.0))
198                .map(ScalarImplTiberiusWrapper::from),
199            tiberius::ColumnData::Date(_) => DateTiberiusWrapper::from_sql(value)?
200                .map(|w| ScalarImpl::from(w.0))
201                .map(ScalarImplTiberiusWrapper::from),
202            tiberius::ColumnData::DateTimeOffset(_) => TimestamptzTiberiusWrapper::from_sql(value)?
203                .map(|w| ScalarImpl::from(w.0))
204                .map(ScalarImplTiberiusWrapper::from),
205            tiberius::ColumnData::Binary(_) => <&[u8]>::from_sql(value)?
206                .map(ScalarImpl::from)
207                .map(ScalarImplTiberiusWrapper::from),
208            tiberius::ColumnData::Guid(_) => <Uuid>::from_sql(value)?
209                .map(|uuid| uuid.to_string().to_uppercase())
210                .map(ScalarImpl::from)
211                .map(ScalarImplTiberiusWrapper::from),
212            tiberius::ColumnData::Xml(_) => <&XmlData>::from_sql(value)?
213                .map(|xml| xml.clone().into_string())
214                .map(ScalarImpl::from)
215                .map(ScalarImplTiberiusWrapper::from),
216        })
217    }
218}
219
220/// The following table shows the mapping between Rust types and Sql Server types in tiberius.
221/// |Rust type|Sql Server type|
222/// |--------|--------|
223/// |`u8`|`tinyint`|
224/// |`i16`|`smallint`|
225/// |`i32`|`int`|
226/// |`i64`|`bigint`|
227/// |`f32`|`float(24)`|
228/// |`f64`|`float(53)`|
229/// |`bool`|`bit`|
230/// |`String`/`&str` (< 4000 characters)|`nvarchar(4000)`|
231/// |`String`/`&str`|`nvarchar(max)`|
232/// |`Vec<u8>`/`&[u8]` (< 8000 bytes)|`varbinary(8000)`|
233/// |`Vec<u8>`/`&[u8]`|`varbinary(max)`|
234/// |[`Uuid`]|`uniqueidentifier`|
235/// |[`Numeric`]|`numeric`/`decimal`|
236/// |[`Decimal`] (with feature flag `rust_decimal`)|`numeric`/`decimal`|
237/// |[`BigDecimal`] (with feature flag `bigdecimal`)|`numeric`/`decimal`|
238/// |[`XmlData`]|`xml`|
239/// |[`NaiveDate`] (with `chrono` feature, TDS 7.3 >)|`date`|
240/// |[`NaiveTime`] (with `chrono` feature, TDS 7.3 >)|`time`|
241/// |[`DateTime`] (with `chrono` feature, TDS 7.3 >)|`datetimeoffset`|
242/// |[`NaiveDateTime`] (with `chrono` feature, TDS 7.3 >)|`datetime2`|
243/// |[`NaiveDateTime`] (with `chrono` feature, TDS 7.2)|`datetime`|
244///
245/// It is possible to use some of the types to write into columns that are not
246/// of the same type. For example on systems following the TDS 7.3 standard (SQL
247/// Server 2008 and later), the chrono type `NaiveDateTime` can also be used to
248/// write to `datetime`, `datetime2` and `smalldatetime` columns. All string
249/// types can also be used with `ntext`, `text`, `varchar`, `nchar` and `char`
250/// columns. All binary types can also be used with `binary` and `image`
251/// columns.
252///
253/// See the [`time`] module for more information about the date and time structs.
254///
255/// [`Client#query`]: struct.Client.html#method.query
256/// [`Client#execute`]: struct.Client.html#method.execute
257/// [`time`]: time/index.html
258/// [`Uuid`]: struct.Uuid.html
259/// [`Numeric`]: numeric/struct.Numeric.html
260/// [`Decimal`]: numeric/struct.Decimal.html
261/// [`BigDecimal`]: numeric/struct.BigDecimal.html
262/// [`XmlData`]: xml/struct.XmlData.html
263/// [`NaiveDateTime`]: time/chrono/struct.NaiveDateTime.html
264/// [`NaiveDate`]: time/chrono/struct.NaiveDate.html
265/// [`NaiveTime`]: time/chrono/struct.NaiveTime.html
266/// [`DateTime`]: time/chrono/struct.DateTime.html
267impl<'a> tiberius::IntoSql<'a> for ScalarImplTiberiusWrapper {
268    fn into_sql(self) -> tiberius::ColumnData<'a> {
269        match self.0 {
270            ScalarImpl::Int16(v) => v.into_sql(),
271            ScalarImpl::Int32(v) => v.into_sql(),
272            ScalarImpl::Int64(v) => v.into_sql(),
273            ScalarImpl::Float32(v) => v.0.into_sql(),
274            ScalarImpl::Float64(v) => v.0.into_sql(),
275            ScalarImpl::Bool(v) => v.into_sql(),
276            ScalarImpl::Decimal(v) => DecimalTiberiusWrapper::from(v).into_sql(),
277            ScalarImpl::Date(v) => DateTiberiusWrapper::from(v).into_sql(),
278            ScalarImpl::Timestamp(v) => TimestampTiberiusWrapper::from(v).into_sql(),
279            ScalarImpl::Timestamptz(v) => TimestamptzTiberiusWrapper::from(v).into_sql(),
280            ScalarImpl::Time(v) => TimeTiberiusWrapper::from(v).into_sql(),
281            // ScalarImpl::Bytea(v) => (*v.clone()).into_sql(),
282            value => {
283                // Utf8, Serial, Interval, Jsonb, Int256, Struct, List are not supported yet
284                unimplemented!("the sql server decoding for {:?} is unsupported", value);
285            }
286        }
287    }
288}