1use std::sync::LazyLock;
16
17use mysql_async::Row as MysqlRow;
18use mysql_common::constants::ColumnFlags;
19use risingwave_common::catalog::Schema;
20use risingwave_common::log::LogSuppressor;
21use risingwave_common::row::OwnedRow;
22use thiserror_ext::AsReport;
23
24use crate::parser::utils::log_error;
25
/// Module-level log suppressor, lazily constructed with `LogSuppressor::default` on first access.
// NOTE(review): presumably consumed by the `log_error!` macro expanded in this file — confirm
// against `crate::parser::utils`.
static LOG_SUPPRESSOR: LazyLock<LogSuppressor> = LazyLock::new(LogSuppressor::default);
27use anyhow::anyhow;
28use chrono::NaiveDate;
29use risingwave_common::bail;
30use risingwave_common::types::{
31 DataType, Date, Datum, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
32};
33use rust_decimal::Decimal as RustDecimal;
34
/// Takes the value at index `$i` out of `$row` and evaluates to a
/// `Result<Option<ScalarImpl>, anyhow::Error>`.
///
/// Forms:
/// - `($row, $i, $name, $typ)`: deserialize the cell as `Option<$typ>` and wrap a present value
///   with `ScalarImpl::from`.
/// - `($row, $i, $name, $typ, $rw_type)`: deserialize the cell as `Option<$typ>`, convert it with
///   `<$rw_type>::from`, then wrap it with `ScalarImpl::from`.
///
/// Outcomes:
/// - `take_opt` yields nothing for the index: `bail!` returns early from the *enclosing function*
///   with a "no value found" error.
/// - the cell deserializes: `Ok(None)` for a NULL cell, `Ok(Some(..))` otherwise.
/// - deserialization fails: `Err` carrying the driver error plus column/index/type context.
///
/// `$name` is only used in error messages.
macro_rules! handle_data_type {
    ($row:expr, $i:expr, $name:expr, $typ:ty) => {{
        match $row.take_opt::<Option<$typ>, _>($i) {
            None => bail!("no value found at column: {}, index: {}", $name, $i),
            Some(Ok(val)) => Ok(val.map(ScalarImpl::from)),
            // `e` is already owned here, so it is moved into the error instead of cloned.
            Some(Err(e)) => Err(anyhow::Error::new(e)
                .context("failed to deserialize MySQL value into rust value")
                .context(format!(
                    "column: {}, index: {}, rust_type: {}",
                    $name,
                    $i,
                    stringify!($typ),
                ))),
        }
    }};
    ($row:expr, $i:expr, $name:expr, $typ:ty, $rw_type:ty) => {{
        match $row.take_opt::<Option<$typ>, _>($i) {
            None => bail!("no value found at column: {}, index: {}", $name, $i),
            Some(Ok(val)) => Ok(val.map(|v| ScalarImpl::from(<$rw_type>::from(v)))),
            // `e` is already owned here, so it is moved into the error instead of cloned.
            Some(Err(e)) => Err(anyhow::Error::new(e)
                .context("failed to deserialize MySQL value into rw value")
                .context(format!(
                    "column: {}, index: {}, rw_type: {}",
                    $name,
                    $i,
                    stringify!($rw_type),
                ))),
        }
    }};
}
65
/// Like `handle_data_type!`, but aware of the MySQL `UNSIGNED` column flag.
///
/// Evaluates to a `Result<Option<ScalarImpl>, anyhow::Error>`:
/// - If the column at `$mysql_datum_index` carries `ColumnFlags::UNSIGNED_FLAG`, the cell is
///   deserialized as `Option<$unsigned_type>` and then converted to `$signed_type` with a
///   *checked* conversion. A value that does not fit yields an error instead of silently
///   wrapping around to a negative number.
/// - Otherwise the cell is handled by `handle_data_type!` using `$signed_type`.
///
/// When there is no column or no value at the index, `bail!` returns early from the *enclosing
/// function* with a "no value found" error (an out-of-range index no longer panics).
///
/// `$column_name` is only used in error messages.
macro_rules! handle_data_type_with_signed {
    (
        $mysql_row:expr,
        $mysql_datum_index:expr,
        $column_name:expr,
        $signed_type:ty,
        $unsigned_type:ty
    ) => {{
        // Bounds-checked lookup: indexing `columns()[..]` directly would panic on a bad index,
        // while every other conversion path reports it as an error.
        let is_unsigned = match $mysql_row.columns().get($mysql_datum_index) {
            Some(column) => column.flags().contains(ColumnFlags::UNSIGNED_FLAG),
            None => bail!(
                "no value found at column: {}, index: {}",
                $column_name,
                $mysql_datum_index
            ),
        };

        if is_unsigned {
            match $mysql_row.take_opt::<Option<$unsigned_type>, _>($mysql_datum_index) {
                None => bail!(
                    "no value found at column: {}, index: {}",
                    $column_name,
                    $mysql_datum_index
                ),
                Some(Ok(None)) => Ok(None),
                // Fully qualified `TryFrom` so the macro does not depend on the prelude edition.
                Some(Ok(Some(val))) => {
                    match <$signed_type as ::std::convert::TryFrom<$unsigned_type>>::try_from(val)
                    {
                        Ok(signed) => Ok(Some(ScalarImpl::from(signed))),
                        Err(e) => Err(anyhow::Error::new(e)
                            .context("MySQL unsigned value does not fit into the signed rust type")
                            .context(format!(
                                "column: {}, index: {}, rust_type: {}",
                                $column_name,
                                $mysql_datum_index,
                                stringify!($signed_type),
                            ))),
                    }
                }
                // `e` is already owned here, so it is moved into the error instead of cloned.
                Some(Err(e)) => Err(anyhow::Error::new(e)
                    .context("failed to deserialize MySQL value into rust value")
                    .context(format!(
                        "column: {}, index: {}, rust_type: {}",
                        $column_name,
                        $mysql_datum_index,
                        stringify!($unsigned_type),
                    ))),
            }
        } else {
            handle_data_type!($mysql_row, $mysql_datum_index, $column_name, $signed_type)
        }
    }};
}
103
104pub fn mysql_datum_to_rw_datum(
110 mysql_row: &mut MysqlRow,
111 mysql_datum_index: usize,
112 column_name: &str,
113 rw_data_type: &DataType,
114) -> Result<Datum, anyhow::Error> {
115 match rw_data_type {
116 DataType::Boolean => {
117 if let Some(Ok(val)) = mysql_row.get_opt::<Option<bool>, _>(mysql_datum_index) {
122 return Ok(val.map(ScalarImpl::from));
123 }
124 match mysql_row.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index) {
126 None => bail!(
127 "no value found at column: {}, index: {}",
128 column_name,
129 mysql_datum_index
130 ),
131 Some(Ok(val)) => match val {
132 None => Ok(None),
133 Some(val) => match val.as_slice() {
134 [0] => Ok(Some(ScalarImpl::from(false))),
135 [1] => Ok(Some(ScalarImpl::from(true))),
136 _ => Err(anyhow!("invalid value for boolean: {:?}", val)),
137 },
138 },
139 Some(Err(e)) => Err(anyhow::Error::new(e)
140 .context("failed to deserialize MySQL value into rust value")
141 .context(format!(
142 "column: {}, index: {}, rust_type: Vec<u8>",
143 column_name, mysql_datum_index,
144 ))),
145 }
146 }
147 DataType::Int16 => {
148 handle_data_type!(mysql_row, mysql_datum_index, column_name, i16)
149 }
150 DataType::Int32 => {
151 handle_data_type!(mysql_row, mysql_datum_index, column_name, i32)
152 }
153 DataType::Int64 => {
154 handle_data_type_with_signed!(mysql_row, mysql_datum_index, column_name, i64, u64)
155 }
156 DataType::Float32 => {
157 handle_data_type!(mysql_row, mysql_datum_index, column_name, f32)
158 }
159 DataType::Float64 => {
160 handle_data_type!(mysql_row, mysql_datum_index, column_name, f64)
161 }
162 DataType::Decimal => {
163 handle_data_type!(
164 mysql_row,
165 mysql_datum_index,
166 column_name,
167 RustDecimal,
168 Decimal
169 )
170 }
171 DataType::Varchar => {
172 handle_data_type!(mysql_row, mysql_datum_index, column_name, String)
173 }
174 DataType::Date => {
175 handle_data_type!(mysql_row, mysql_datum_index, column_name, NaiveDate, Date)
176 }
177 DataType::Time => {
178 handle_data_type!(
179 mysql_row,
180 mysql_datum_index,
181 column_name,
182 chrono::NaiveTime,
183 Time
184 )
185 }
186 DataType::Timestamp => {
187 handle_data_type!(
188 mysql_row,
189 mysql_datum_index,
190 column_name,
191 chrono::NaiveDateTime,
192 Timestamp
193 )
194 }
195 DataType::Timestamptz => {
196 match mysql_row.take_opt::<Option<chrono::NaiveDateTime>, _>(mysql_datum_index) {
197 None => bail!(
198 "no value found at column: {}, index: {}",
199 column_name,
200 mysql_datum_index
201 ),
202 Some(Ok(val)) => Ok(val.map(|v| {
203 ScalarImpl::from(Timestamptz::from_micros(v.and_utc().timestamp_micros()))
204 })),
205 Some(Err(err)) => Err(anyhow::Error::new(err)
206 .context("failed to deserialize MySQL value into rust value")
207 .context(format!(
208 "column: {}, index: {}, rust_type: chrono::NaiveDateTime",
209 column_name, mysql_datum_index,
210 ))),
211 }
212 }
213 DataType::Bytea => match mysql_row.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index) {
214 None => bail!(
215 "no value found at column: {}, index: {}",
216 column_name,
217 mysql_datum_index
218 ),
219 Some(Ok(val)) => Ok(val.map(ScalarImpl::from)),
220 Some(Err(err)) => Err(anyhow::Error::new(err)
221 .context("failed to deserialize MySQL value into rust value")
222 .context(format!(
223 "column: {}, index: {}, rust_type: Vec<u8>",
224 column_name, mysql_datum_index,
225 ))),
226 },
227 DataType::Jsonb => {
228 handle_data_type!(
229 mysql_row,
230 mysql_datum_index,
231 column_name,
232 serde_json::Value,
233 JsonbVal
234 )
235 }
236 DataType::Vector(_)
237 | DataType::Interval
238 | DataType::Struct(_)
239 | DataType::List(_)
240 | DataType::Int256
241 | DataType::Serial
242 | DataType::Map(_) => Err(anyhow!(
243 "unsupported data type: {}, set to null",
244 rw_data_type
245 )),
246 }
247}
248
249pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> OwnedRow {
250 let mut datums = vec![];
251 for i in 0..schema.fields.len() {
252 let rw_field = &schema.fields[i];
253 let name = rw_field.name.as_str();
254 let datum = match mysql_datum_to_rw_datum(mysql_row, i, name, &rw_field.data_type) {
255 Ok(val) => val,
256 Err(e) => {
257 log_error!(name, e, "parse column failed");
258 None
259 }
260 };
261 datums.push(datum);
262 }
263 OwnedRow::new(datums)
264}
265
#[cfg(test)]
mod tests {

    use futures::pin_mut;
    use mysql_async::Row as MySqlRow;
    use mysql_async::prelude::*;
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::row::Row;
    use risingwave_common::types::DataType;
    use tokio_stream::StreamExt;

    use crate::parser::mysql_row_to_owned_row;

    /// Manual check against a MySQL server at `localhost:8306`: streams every row of `t1m`,
    /// converts it with `mysql_row_to_owned_row` and prints the third column as a timestamptz.
    /// Marked `#[ignore]` because it needs that external database.
    #[ignore]
    #[tokio::test]
    async fn test_convert_mysql_row_to_owned_row() {
        let pool = mysql_async::Pool::new("mysql://root:123456@localhost:8306/mydb");

        let fields = vec![
            Field::with_name(DataType::Int32, "v1"),
            Field::with_name(DataType::Int32, "v2"),
            Field::with_name(DataType::Timestamptz, "v3"),
        ];
        let t1schema = Schema::new(fields);

        let mut conn = pool.get_conn().await.unwrap();
        conn.exec_drop("SET time_zone = \"+08:00\"", ())
            .await
            .unwrap();

        let mut result_set = conn.query_iter("SELECT * FROM `t1m`").await.unwrap();
        let raw_rows = result_set.stream::<MySqlRow>().await.unwrap().unwrap();
        let row_stream = raw_rows.map(|fetched| {
            let mut mysql_row = fetched.unwrap();
            let converted = mysql_row_to_owned_row(&mut mysql_row, &t1schema);
            Ok::<_, anyhow::Error>(Some(converted))
        });
        pin_mut!(row_stream);

        while let Some(item) = row_stream.next().await {
            // Skip anything that is not a successfully converted row.
            let Ok(Some(owned_row)) = item else {
                continue;
            };
            if let Some(scalar) = owned_row.datum_at(2) {
                let v = scalar.into_timestamptz();
                println!("timestamp: {:?}", v);
            }
        }
    }
}