risingwave_connector/parser/
mysql.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use mysql_async::Row as MysqlRow;
18use risingwave_common::catalog::Schema;
19use risingwave_common::log::LogSuppresser;
20use risingwave_common::row::OwnedRow;
21use thiserror_ext::AsReport;
22
23use crate::parser::utils::log_error;
24
25static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
26use anyhow::anyhow;
27use chrono::NaiveDate;
28use risingwave_common::bail;
29use risingwave_common::types::{
30    DataType, Date, Datum, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
31};
32use rust_decimal::Decimal as RustDecimal;
33
/// Takes the value at index `$i` out of `$row` and turns it into an `Option<ScalarImpl>`.
///
/// Two forms are accepted:
/// - `(row, index, name, rust_type)`: decode the value as `Option<rust_type>` and wrap it
///   with `ScalarImpl::from`.
/// - `(row, index, name, rust_type, rw_type)`: decode the value as `Option<rust_type>`,
///   convert it into `rw_type` via `From`, then wrap it with `ScalarImpl::from`.
///
/// The expansion is a `Result<Option<ScalarImpl>, anyhow::Error>` expression:
/// - `Ok(..)` when the value was decoded,
/// - `Err(..)` when decoding failed; the error carries the column name, the index and the
///   target type as context.
///
/// When `take_opt` yields no value at all, the expansion invokes `bail!`, so the macro is
/// meant to be used inside a function whose return type is a compatible `Result`.
macro_rules! handle_data_type {
    ($row:expr, $i:expr, $name:expr, $typ:ty) => {{
        match $row.take_opt::<Option<$typ>, _>($i) {
            None => bail!("no value found at column: {}, index: {}", $name, $i),
            Some(Ok(val)) => Ok(val.map(ScalarImpl::from)),
            // `e` is bound by value, so it is moved into the error without cloning.
            Some(Err(e)) => Err(anyhow::Error::new(e)
                .context("failed to deserialize MySQL value into rust value")
                .context(format!(
                    "column: {}, index: {}, rust_type: {}",
                    $name,
                    $i,
                    stringify!($typ),
                ))),
        }
    }};
    ($row:expr, $i:expr, $name:expr, $typ:ty, $rw_type:ty) => {{
        match $row.take_opt::<Option<$typ>, _>($i) {
            None => bail!("no value found at column: {}, index: {}", $name, $i),
            Some(Ok(val)) => Ok(val.map(|v| ScalarImpl::from(<$rw_type>::from(v)))),
            // `e` is bound by value, so it is moved into the error without cloning.
            Some(Err(e)) => Err(anyhow::Error::new(e)
                .context("failed to deserialize MySQL value into rw value")
                .context(format!(
                    "column: {}, index: {}, rw_type: {}",
                    $name,
                    $i,
                    stringify!($rw_type),
                ))),
        }
    }};
}
64
65/// The decoding result can be interpreted as follows:
66/// Ok(value) => The value was found and successfully decoded.
67/// Err(error) => The value was found but could not be decoded,
68///               either because it was not supported,
69///               or there was an error during conversion.
70pub fn mysql_datum_to_rw_datum(
71    mysql_row: &mut MysqlRow,
72    mysql_datum_index: usize,
73    column_name: &str,
74    rw_data_type: &DataType,
75) -> Result<Datum, anyhow::Error> {
76    match rw_data_type {
77        DataType::Boolean => {
78            // TinyInt(1) is used to represent boolean in MySQL
79            // This handles backwards compatibility,
80            // before https://github.com/risingwavelabs/risingwave/pull/19071
81            // we permit boolean and tinyint(1) to be equivalent to boolean in RW.
82            if let Some(Ok(val)) = mysql_row.get_opt::<Option<bool>, _>(mysql_datum_index) {
83                let _ = mysql_row.take::<bool, _>(mysql_datum_index);
84                return Ok(val.map(ScalarImpl::from));
85            }
86            // Bit(1)
87            match mysql_row.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index) {
88                None => bail!(
89                    "no value found at column: {}, index: {}",
90                    column_name,
91                    mysql_datum_index
92                ),
93                Some(Ok(val)) => match val {
94                    None => Ok(None),
95                    Some(val) => match val.as_slice() {
96                        [0] => Ok(Some(ScalarImpl::from(false))),
97                        [1] => Ok(Some(ScalarImpl::from(true))),
98                        _ => Err(anyhow!("invalid value for boolean: {:?}", val)),
99                    },
100                },
101                Some(Err(e)) => Err(anyhow::Error::new(e.clone())
102                    .context("failed to deserialize MySQL value into rust value")
103                    .context(format!(
104                        "column: {}, index: {}, rust_type: Vec<u8>",
105                        column_name, mysql_datum_index,
106                    ))),
107            }
108        }
109        DataType::Int16 => {
110            handle_data_type!(mysql_row, mysql_datum_index, column_name, i16)
111        }
112        DataType::Int32 => {
113            handle_data_type!(mysql_row, mysql_datum_index, column_name, i32)
114        }
115        DataType::Int64 => {
116            handle_data_type!(mysql_row, mysql_datum_index, column_name, i64)
117        }
118        DataType::Float32 => {
119            handle_data_type!(mysql_row, mysql_datum_index, column_name, f32)
120        }
121        DataType::Float64 => {
122            handle_data_type!(mysql_row, mysql_datum_index, column_name, f64)
123        }
124        DataType::Decimal => {
125            handle_data_type!(
126                mysql_row,
127                mysql_datum_index,
128                column_name,
129                RustDecimal,
130                Decimal
131            )
132        }
133        DataType::Varchar => {
134            handle_data_type!(mysql_row, mysql_datum_index, column_name, String)
135        }
136        DataType::Date => {
137            handle_data_type!(mysql_row, mysql_datum_index, column_name, NaiveDate, Date)
138        }
139        DataType::Time => {
140            handle_data_type!(
141                mysql_row,
142                mysql_datum_index,
143                column_name,
144                chrono::NaiveTime,
145                Time
146            )
147        }
148        DataType::Timestamp => {
149            handle_data_type!(
150                mysql_row,
151                mysql_datum_index,
152                column_name,
153                chrono::NaiveDateTime,
154                Timestamp
155            )
156        }
157        DataType::Timestamptz => {
158            match mysql_row.take_opt::<Option<chrono::NaiveDateTime>, _>(mysql_datum_index) {
159                None => bail!(
160                    "no value found at column: {}, index: {}",
161                    column_name,
162                    mysql_datum_index
163                ),
164                Some(Ok(val)) => Ok(val.map(|v| {
165                    ScalarImpl::from(Timestamptz::from_micros(v.and_utc().timestamp_micros()))
166                })),
167                Some(Err(err)) => Err(anyhow::Error::new(err.clone())
168                    .context("failed to deserialize MySQL value into rust value")
169                    .context(format!(
170                        "column: {}, index: {}, rust_type: chrono::NaiveDateTime",
171                        column_name, mysql_datum_index,
172                    ))),
173            }
174        }
175        DataType::Bytea => match mysql_row.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index) {
176            None => bail!(
177                "no value found at column: {}, index: {}",
178                column_name,
179                mysql_datum_index
180            ),
181            Some(Ok(val)) => Ok(val.map(ScalarImpl::from)),
182            Some(Err(err)) => Err(anyhow::Error::new(err.clone())
183                .context("failed to deserialize MySQL value into rust value")
184                .context(format!(
185                    "column: {}, index: {}, rust_type: Vec<u8>",
186                    column_name, mysql_datum_index,
187                ))),
188        },
189        DataType::Jsonb => {
190            handle_data_type!(
191                mysql_row,
192                mysql_datum_index,
193                column_name,
194                serde_json::Value,
195                JsonbVal
196            )
197        }
198        DataType::Interval
199        | DataType::Struct(_)
200        | DataType::List(_)
201        | DataType::Int256
202        | DataType::Serial
203        | DataType::Map(_) => Err(anyhow!(
204            "unsupported data type: {}, set to null",
205            rw_data_type
206        )),
207    }
208}
209
210pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> OwnedRow {
211    let mut datums = vec![];
212    for i in 0..schema.fields.len() {
213        let rw_field = &schema.fields[i];
214        let name = rw_field.name.as_str();
215        let datum = match mysql_datum_to_rw_datum(mysql_row, i, name, &rw_field.data_type) {
216            Ok(val) => val,
217            Err(e) => {
218                log_error!(name, e, "parse column failed");
219                None
220            }
221        };
222        datums.push(datum);
223    }
224    OwnedRow::new(datums)
225}
226
#[cfg(test)]
mod tests {

    use futures::pin_mut;
    use mysql_async::Row as MySqlRow;
    use mysql_async::prelude::*;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::row::Row;
    use risingwave_common::types::DataType;
    use tokio_stream::StreamExt;

    use crate::parser::mysql_row_to_owned_row;

    // manual test case
    //
    // Ignored by default: it needs a MySQL server reachable at the URL below with a
    // table `t1m` whose first three columns match `t1schema`.
    #[ignore]
    #[tokio::test]
    async fn test_convert_mysql_row_to_owned_row() {
        let pool = mysql_async::Pool::new("mysql://root:123456@localhost:8306/mydb");

        let t1schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Int32, "v2"),
            Field::with_name(DataType::Timestamptz, "v3"),
        ]);

        let mut conn = pool.get_conn().await.unwrap();
        conn.exec_drop("SET time_zone = \"+08:00\"", ())
            .await
            .unwrap();

        let mut result_set = conn.query_iter("SELECT * FROM `t1m`").await.unwrap();
        let s = result_set.stream::<MySqlRow>().await.unwrap().unwrap();
        let row_stream = s.map(|row| {
            // convert mysql row into OwnedRow
            let mut mysql_row = row.unwrap();
            Ok::<_, anyhow::Error>(Some(mysql_row_to_owned_row(&mut mysql_row, &t1schema)))
        });
        pin_mut!(row_stream);
        while let Some(row) = row_stream.next().await {
            // Destructure in one pattern instead of `is_some()` followed by `unwrap()`.
            if let Ok(Some(owned_row)) = row {
                // Column 2 is the `Timestamptz` field `v3`.
                let d = owned_row.datum_at(2);
                if let Some(scalar) = d {
                    let v = scalar.into_timestamptz();
                    println!("timestamp: {:?}", v);
                }
            }
        }
    }
}