risingwave_connector/parser/
mysql.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::LazyLock;
16
17use mysql_async::Row as MysqlRow;
18use mysql_common::constants::ColumnFlags;
19use risingwave_common::catalog::Schema;
20use risingwave_common::log::LogSuppressor;
21use risingwave_common::row::OwnedRow;
22use thiserror_ext::AsReport;
23
24use crate::parser::utils::log_error;
25
26static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
27use anyhow::anyhow;
28use chrono::NaiveDate;
29use risingwave_common::bail;
30use risingwave_common::types::{
31    DataType, Date, Datum, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
32};
33use rust_decimal::Decimal as RustDecimal;
34
/// Takes the value at index `$i` out of a MySQL row and turns it into an
/// optional `ScalarImpl`, wrapped in a `Result`.
///
/// Two forms are accepted:
/// - `($row, $i, $name, $typ)`: decode as `Option<$typ>` and wrap the inner
///   value with `ScalarImpl::from`.
/// - `($row, $i, $name, $typ, $rw_type)`: decode as `Option<$typ>`, convert the
///   inner value through `<$rw_type>::from`, then wrap with `ScalarImpl::from`.
///
/// Outcomes:
/// - `take_opt` returns `None` (nothing at that index): reported through `bail!`.
/// - decoding succeeds: `Ok(..)`, where a decoded `None` stays `None`.
/// - decoding fails: `Err(..)` carrying the column name, the index and the
///   target type as context.
macro_rules! handle_data_type {
    ($row:expr, $i:expr, $name:expr, $typ:ty) => {{
        match $row.take_opt::<Option<$typ>, _>($i) {
            None => bail!("no value found at column: {}, index: {}", $name, $i),
            Some(Ok(val)) => Ok(val.map(ScalarImpl::from)),
            // `take_opt` yields an owned error, so it is moved into the
            // `anyhow::Error` instead of being cloned first.
            Some(Err(e)) => Err(anyhow::Error::new(e)
                .context("failed to deserialize MySQL value into rust value")
                .context(format!(
                    "column: {}, index: {}, rust_type: {}",
                    $name,
                    $i,
                    stringify!($typ),
                ))),
        }
    }};
    ($row:expr, $i:expr, $name:expr, $typ:ty, $rw_type:ty) => {{
        match $row.take_opt::<Option<$typ>, _>($i) {
            None => bail!("no value found at column: {}, index: {}", $name, $i),
            Some(Ok(val)) => Ok(val.map(|v| ScalarImpl::from(<$rw_type>::from(v)))),
            // Same as above: the error is already owned, no clone needed.
            Some(Err(e)) => Err(anyhow::Error::new(e)
                .context("failed to deserialize MySQL value into rw value")
                .context(format!(
                    "column: {}, index: {}, rw_type: {}",
                    $name,
                    $i,
                    stringify!($rw_type),
                ))),
        }
    }};
}
65
/// Decodes an integer column, choosing the decode type from the column's
/// `UNSIGNED_FLAG`.
///
/// - Flag set: the value is decoded as `$unsigned_type` and then cast with `as`
///   to `$signed_type`, so values above the signed maximum wrap around.
/// - Flag not set: the work is delegated to `handle_data_type!` with
///   `$signed_type`.
///
/// An index with no column metadata or no value is reported through `bail!`
/// with the "no value found" message; a decode failure becomes an `Err` with
/// the column name, index and Rust type as context.
macro_rules! handle_data_type_with_signed {
    (
        $mysql_row:expr,
        $mysql_datum_index:expr,
        $column_name:expr,
        $signed_type:ty,
        $unsigned_type:ty
    ) => {{
        // Checked lookup: indexing `columns()` directly would panic on an
        // out-of-range index instead of producing the "no value found" error
        // that the other decoding paths return.
        let column_flags = match $mysql_row.columns().get($mysql_datum_index) {
            Some(column) => column.flags(),
            None => bail!(
                "no value found at column: {}, index: {}",
                $column_name,
                $mysql_datum_index
            ),
        };

        if column_flags.contains(ColumnFlags::UNSIGNED_FLAG) {
            // UNSIGNED type: use unsigned type conversion, then convert to signed
            match $mysql_row.take_opt::<Option<$unsigned_type>, _>($mysql_datum_index) {
                // Note: We are intentionally converting unsigned to signed here.
                // For example, 18446251075179777772u64 will be converted to -492998529773844i64.
                Some(Ok(Some(val))) => Ok(Some(ScalarImpl::from(val as $signed_type))),
                Some(Ok(None)) => Ok(None),
                // `take_opt` yields an owned error, so it is moved, not cloned.
                Some(Err(e)) => Err(anyhow::Error::new(e)
                    .context("failed to deserialize MySQL value into rust value")
                    .context(format!(
                        "column: {}, index: {}, rust_type: {}",
                        $column_name,
                        $mysql_datum_index,
                        stringify!($unsigned_type),
                    ))),
                None => bail!(
                    "no value found at column: {}, index: {}",
                    $column_name,
                    $mysql_datum_index
                ),
            }
        } else {
            // SIGNED type: use default signed type conversion
            handle_data_type!($mysql_row, $mysql_datum_index, $column_name, $signed_type)
        }
    }};
}
103
104/// The decoding result can be interpreted as follows:
105/// Ok(value) => The value was found and successfully decoded.
106/// Err(error) => The value was found but could not be decoded,
107///               either because it was not supported,
108///               or there was an error during conversion.
109pub fn mysql_datum_to_rw_datum(
110    mysql_row: &mut MysqlRow,
111    mysql_datum_index: usize,
112    column_name: &str,
113    rw_data_type: &DataType,
114) -> Result<Datum, anyhow::Error> {
115    match rw_data_type {
116        DataType::Boolean => {
117            // TinyInt(1) is used to represent boolean in MySQL
118            // This handles backwards compatibility,
119            // before https://github.com/risingwavelabs/risingwave/pull/19071
120            // we permit boolean and tinyint(1) to be equivalent to boolean in RW.
121            if let Some(Ok(val)) = mysql_row.get_opt::<Option<bool>, _>(mysql_datum_index) {
122                return Ok(val.map(ScalarImpl::from));
123            }
124            // Bit(1)
125            match mysql_row.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index) {
126                None => bail!(
127                    "no value found at column: {}, index: {}",
128                    column_name,
129                    mysql_datum_index
130                ),
131                Some(Ok(val)) => match val {
132                    None => Ok(None),
133                    Some(val) => match val.as_slice() {
134                        [0] => Ok(Some(ScalarImpl::from(false))),
135                        [1] => Ok(Some(ScalarImpl::from(true))),
136                        _ => Err(anyhow!("invalid value for boolean: {:?}", val)),
137                    },
138                },
139                Some(Err(e)) => Err(anyhow::Error::new(e)
140                    .context("failed to deserialize MySQL value into rust value")
141                    .context(format!(
142                        "column: {}, index: {}, rust_type: Vec<u8>",
143                        column_name, mysql_datum_index,
144                    ))),
145            }
146        }
147        DataType::Int16 => {
148            handle_data_type!(mysql_row, mysql_datum_index, column_name, i16)
149        }
150        DataType::Int32 => {
151            handle_data_type!(mysql_row, mysql_datum_index, column_name, i32)
152        }
153        DataType::Int64 => {
154            handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i64, u64)
155        }
156        DataType::Float32 => {
157            handle_data_type!(mysql_row, mysql_datum_index, column_name, f32)
158        }
159        DataType::Float64 => {
160            handle_data_type!(mysql_row, mysql_datum_index, column_name, f64)
161        }
162        DataType::Decimal => {
163            handle_data_type!(
164                mysql_row,
165                mysql_datum_index,
166                column_name,
167                RustDecimal,
168                Decimal
169            )
170        }
171        DataType::Varchar => {
172            handle_data_type!(mysql_row, mysql_datum_index, column_name, String)
173        }
174        DataType::Date => {
175            handle_data_type!(mysql_row, mysql_datum_index, column_name, NaiveDate, Date)
176        }
177        DataType::Time => {
178            handle_data_type!(
179                mysql_row,
180                mysql_datum_index,
181                column_name,
182                chrono::NaiveTime,
183                Time
184            )
185        }
186        DataType::Timestamp => {
187            handle_data_type!(
188                mysql_row,
189                mysql_datum_index,
190                column_name,
191                chrono::NaiveDateTime,
192                Timestamp
193            )
194        }
195        DataType::Timestamptz => {
196            match mysql_row.take_opt::<Option<chrono::NaiveDateTime>, _>(mysql_datum_index) {
197                None => bail!(
198                    "no value found at column: {}, index: {}",
199                    column_name,
200                    mysql_datum_index
201                ),
202                Some(Ok(val)) => Ok(val.map(|v| {
203                    ScalarImpl::from(Timestamptz::from_micros(v.and_utc().timestamp_micros()))
204                })),
205                Some(Err(err)) => Err(anyhow::Error::new(err)
206                    .context("failed to deserialize MySQL value into rust value")
207                    .context(format!(
208                        "column: {}, index: {}, rust_type: chrono::NaiveDateTime",
209                        column_name, mysql_datum_index,
210                    ))),
211            }
212        }
213        DataType::Bytea => match mysql_row.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index) {
214            None => bail!(
215                "no value found at column: {}, index: {}",
216                column_name,
217                mysql_datum_index
218            ),
219            Some(Ok(val)) => Ok(val.map(ScalarImpl::from)),
220            Some(Err(err)) => Err(anyhow::Error::new(err)
221                .context("failed to deserialize MySQL value into rust value")
222                .context(format!(
223                    "column: {}, index: {}, rust_type: Vec<u8>",
224                    column_name, mysql_datum_index,
225                ))),
226        },
227        DataType::Jsonb => {
228            handle_data_type!(
229                mysql_row,
230                mysql_datum_index,
231                column_name,
232                serde_json::Value,
233                JsonbVal
234            )
235        }
236        DataType::Vector(_)
237        | DataType::Interval
238        | DataType::Struct(_)
239        | DataType::List(_)
240        | DataType::Int256
241        | DataType::Serial
242        | DataType::Map(_) => Err(anyhow!(
243            "unsupported data type: {}, set to null",
244            rw_data_type
245        )),
246    }
247}
248
249pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> OwnedRow {
250    let mut datums = vec![];
251    for i in 0..schema.fields.len() {
252        let rw_field = &schema.fields[i];
253        let name = rw_field.name.as_str();
254        let datum = match mysql_datum_to_rw_datum(mysql_row, i, name, &rw_field.data_type) {
255            Ok(val) => val,
256            Err(e) => {
257                log_error!(name, e, "parse column failed");
258                None
259            }
260        };
261        datums.push(datum);
262    }
263    OwnedRow::new(datums)
264}
265
#[cfg(test)]
mod tests {

    use futures::pin_mut;
    use mysql_async::Row as MySqlRow;
    use mysql_async::prelude::*;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::row::Row;
    use risingwave_common::types::DataType;
    use tokio_stream::StreamExt;

    use crate::parser::mysql_row_to_owned_row;

    /// Manual test case, ignored by default.
    ///
    /// It connects to a MySQL server at `localhost:8306` (database `mydb`),
    /// reads every row of table `t1m`, converts each one with
    /// `mysql_row_to_owned_row`, and prints the third column as a timestamptz.
    #[ignore]
    #[tokio::test]
    async fn test_convert_mysql_row_to_owned_row() {
        let pool = mysql_async::Pool::new("mysql://root:123456@localhost:8306/mydb");

        let t1schema = Schema::new(vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Int32, "v2"),
            Field::with_name(DataType::Timestamptz, "v3"),
        ]);

        let mut conn = pool.get_conn().await.unwrap();
        conn.exec_drop("SET time_zone = \"+08:00\"", ())
            .await
            .unwrap();

        let mut result_set = conn.query_iter("SELECT * FROM `t1m`").await.unwrap();
        let s = result_set.stream::<MySqlRow>().await.unwrap().unwrap();
        let row_stream = s.map(|row| {
            // convert mysql row into OwnedRow
            let mut mysql_row = row.unwrap();
            Ok::<_, anyhow::Error>(Some(mysql_row_to_owned_row(&mut mysql_row, &t1schema)))
        });
        pin_mut!(row_stream);
        while let Some(row) = row_stream.next().await {
            // Destructure in one pattern instead of `is_some()` + `unwrap()`.
            if let Ok(Some(owned_row)) = row {
                if let Some(scalar) = owned_row.datum_at(2) {
                    let v = scalar.into_timestamptz();
                    println!("timestamp: {:?}", v);
                }
            }
        }
    }
}