risingwave_connector/parser/
mysql.rs1use std::sync::LazyLock;
16
17use mysql_async::Row as MysqlRow;
18use risingwave_common::catalog::Schema;
19use risingwave_common::log::LogSuppresser;
20use risingwave_common::row::OwnedRow;
21use thiserror_ext::AsReport;
22
23use crate::parser::utils::log_error;
24
/// Lazily-initialized, default-configured [`LogSuppresser`] for this module.
///
/// NOTE(review): this static is not referenced directly anywhere in this file; it is presumably
/// picked up by name from the expansion of `log_error!`. If so, the (misspelled) identifier must
/// stay in sync with that macro — confirm before renaming.
static LOG_SUPPERSSER: LazyLock<LogSuppresser> = LazyLock::new(LogSuppresser::default);
26use anyhow::anyhow;
27use chrono::NaiveDate;
28use risingwave_common::bail;
29use risingwave_common::types::{
30 DataType, Date, Datum, Decimal, JsonbVal, ScalarImpl, Time, Timestamp, Timestamptz,
31};
32use rust_decimal::Decimal as RustDecimal;
33
/// Takes the cell at index `$i` out of `$row`, decodes it as `Option<$typ>` and evaluates to
/// `Ok(Option<ScalarImpl>)` on success or `Err(anyhow::Error)` when decoding fails.
///
/// - 4-argument form `(row, index, name, typ)`: the decoded `$typ` is converted directly with
///   `ScalarImpl::from`.
/// - 5-argument form `(row, index, name, typ, rw_type)`: the decoded `$typ` is first converted
///   into `$rw_type` and then into `ScalarImpl`.
///
/// `$name` is only used in error messages. When `take_opt` yields `None`, `bail!` is invoked
/// with a "no value found" message (presumably an early `Err` return from the enclosing
/// function, so this macro is meant to be used inside a function returning `Result`).
macro_rules! handle_data_type {
    ($row:expr, $i:expr, $name:expr, $typ:ty) => {{
        match $row.take_opt::<Option<$typ>, _>($i) {
            None => bail!("no value found at column: {}, index: {}", $name, $i),
            Some(Ok(val)) => Ok(val.map(ScalarImpl::from)),
            // `e` is already owned here, so it is moved into the error instead of cloned.
            Some(Err(e)) => Err(anyhow::Error::new(e)
                .context("failed to deserialize MySQL value into rust value")
                .context(format!(
                    "column: {}, index: {}, rust_type: {}",
                    $name,
                    $i,
                    stringify!($typ),
                ))),
        }
    }};
    ($row:expr, $i:expr, $name:expr, $typ:ty, $rw_type:ty) => {{
        match $row.take_opt::<Option<$typ>, _>($i) {
            None => bail!("no value found at column: {}, index: {}", $name, $i),
            Some(Ok(val)) => Ok(val.map(|v| ScalarImpl::from(<$rw_type>::from(v)))),
            // `e` is already owned here, so it is moved into the error instead of cloned.
            Some(Err(e)) => Err(anyhow::Error::new(e)
                .context("failed to deserialize MySQL value into rw value")
                .context(format!(
                    "column: {}, index: {}, rw_type: {}",
                    $name,
                    $i,
                    stringify!($rw_type),
                ))),
        }
    }};
}
64
65pub fn mysql_datum_to_rw_datum(
71 mysql_row: &mut MysqlRow,
72 mysql_datum_index: usize,
73 column_name: &str,
74 rw_data_type: &DataType,
75) -> Result<Datum, anyhow::Error> {
76 match rw_data_type {
77 DataType::Boolean => {
78 if let Some(Ok(val)) = mysql_row.get_opt::<Option<bool>, _>(mysql_datum_index) {
83 let _ = mysql_row.take::<bool, _>(mysql_datum_index);
84 return Ok(val.map(ScalarImpl::from));
85 }
86 match mysql_row.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index) {
88 None => bail!(
89 "no value found at column: {}, index: {}",
90 column_name,
91 mysql_datum_index
92 ),
93 Some(Ok(val)) => match val {
94 None => Ok(None),
95 Some(val) => match val.as_slice() {
96 [0] => Ok(Some(ScalarImpl::from(false))),
97 [1] => Ok(Some(ScalarImpl::from(true))),
98 _ => Err(anyhow!("invalid value for boolean: {:?}", val)),
99 },
100 },
101 Some(Err(e)) => Err(anyhow::Error::new(e.clone())
102 .context("failed to deserialize MySQL value into rust value")
103 .context(format!(
104 "column: {}, index: {}, rust_type: Vec<u8>",
105 column_name, mysql_datum_index,
106 ))),
107 }
108 }
109 DataType::Int16 => {
110 handle_data_type!(mysql_row, mysql_datum_index, column_name, i16)
111 }
112 DataType::Int32 => {
113 handle_data_type!(mysql_row, mysql_datum_index, column_name, i32)
114 }
115 DataType::Int64 => {
116 handle_data_type!(mysql_row, mysql_datum_index, column_name, i64)
117 }
118 DataType::Float32 => {
119 handle_data_type!(mysql_row, mysql_datum_index, column_name, f32)
120 }
121 DataType::Float64 => {
122 handle_data_type!(mysql_row, mysql_datum_index, column_name, f64)
123 }
124 DataType::Decimal => {
125 handle_data_type!(
126 mysql_row,
127 mysql_datum_index,
128 column_name,
129 RustDecimal,
130 Decimal
131 )
132 }
133 DataType::Varchar => {
134 handle_data_type!(mysql_row, mysql_datum_index, column_name, String)
135 }
136 DataType::Date => {
137 handle_data_type!(mysql_row, mysql_datum_index, column_name, NaiveDate, Date)
138 }
139 DataType::Time => {
140 handle_data_type!(
141 mysql_row,
142 mysql_datum_index,
143 column_name,
144 chrono::NaiveTime,
145 Time
146 )
147 }
148 DataType::Timestamp => {
149 handle_data_type!(
150 mysql_row,
151 mysql_datum_index,
152 column_name,
153 chrono::NaiveDateTime,
154 Timestamp
155 )
156 }
157 DataType::Timestamptz => {
158 match mysql_row.take_opt::<Option<chrono::NaiveDateTime>, _>(mysql_datum_index) {
159 None => bail!(
160 "no value found at column: {}, index: {}",
161 column_name,
162 mysql_datum_index
163 ),
164 Some(Ok(val)) => Ok(val.map(|v| {
165 ScalarImpl::from(Timestamptz::from_micros(v.and_utc().timestamp_micros()))
166 })),
167 Some(Err(err)) => Err(anyhow::Error::new(err.clone())
168 .context("failed to deserialize MySQL value into rust value")
169 .context(format!(
170 "column: {}, index: {}, rust_type: chrono::NaiveDateTime",
171 column_name, mysql_datum_index,
172 ))),
173 }
174 }
175 DataType::Bytea => match mysql_row.take_opt::<Option<Vec<u8>>, _>(mysql_datum_index) {
176 None => bail!(
177 "no value found at column: {}, index: {}",
178 column_name,
179 mysql_datum_index
180 ),
181 Some(Ok(val)) => Ok(val.map(ScalarImpl::from)),
182 Some(Err(err)) => Err(anyhow::Error::new(err.clone())
183 .context("failed to deserialize MySQL value into rust value")
184 .context(format!(
185 "column: {}, index: {}, rust_type: Vec<u8>",
186 column_name, mysql_datum_index,
187 ))),
188 },
189 DataType::Jsonb => {
190 handle_data_type!(
191 mysql_row,
192 mysql_datum_index,
193 column_name,
194 serde_json::Value,
195 JsonbVal
196 )
197 }
198 DataType::Interval
199 | DataType::Struct(_)
200 | DataType::List(_)
201 | DataType::Int256
202 | DataType::Serial
203 | DataType::Map(_) => Err(anyhow!(
204 "unsupported data type: {}, set to null",
205 rw_data_type
206 )),
207 }
208}
209
210pub fn mysql_row_to_owned_row(mysql_row: &mut MysqlRow, schema: &Schema) -> OwnedRow {
211 let mut datums = vec![];
212 for i in 0..schema.fields.len() {
213 let rw_field = &schema.fields[i];
214 let name = rw_field.name.as_str();
215 let datum = match mysql_datum_to_rw_datum(mysql_row, i, name, &rw_field.data_type) {
216 Ok(val) => val,
217 Err(e) => {
218 log_error!(name, e, "parse column failed");
219 None
220 }
221 };
222 datums.push(datum);
223 }
224 OwnedRow::new(datums)
225}
226
227#[cfg(test)]
228mod tests {
229
230 use futures::pin_mut;
231 use mysql_async::Row as MySqlRow;
232 use mysql_async::prelude::*;
233 use risingwave_common::catalog::{Field, Schema};
234 use risingwave_common::row::Row;
235 use risingwave_common::types::DataType;
236 use tokio_stream::StreamExt;
237
238 use crate::parser::mysql_row_to_owned_row;
239
240 #[ignore]
242 #[tokio::test]
243 async fn test_convert_mysql_row_to_owned_row() {
244 let pool = mysql_async::Pool::new("mysql://root:123456@localhost:8306/mydb");
245
246 let t1schema = Schema::new(vec![
247 Field::with_name(DataType::Int32, "v1"),
248 Field::with_name(DataType::Int32, "v2"),
249 Field::with_name(DataType::Timestamptz, "v3"),
250 ]);
251
252 let mut conn = pool.get_conn().await.unwrap();
253 conn.exec_drop("SET time_zone = \"+08:00\"", ())
254 .await
255 .unwrap();
256
257 let mut result_set = conn.query_iter("SELECT * FROM `t1m`").await.unwrap();
258 let s = result_set.stream::<MySqlRow>().await.unwrap().unwrap();
259 let row_stream = s.map(|row| {
260 let mut mysql_row = row.unwrap();
262 Ok::<_, anyhow::Error>(Some(mysql_row_to_owned_row(&mut mysql_row, &t1schema)))
263 });
264 pin_mut!(row_stream);
265 while let Some(row) = row_stream.next().await {
266 if let Ok(ro) = row
267 && ro.is_some()
268 {
269 let owned_row = ro.unwrap();
270 let d = owned_row.datum_at(2);
271 if let Some(scalar) = d {
272 let v = scalar.into_timestamptz();
273 println!("timestamp: {:?}", v);
274 }
275 }
276 }
277 }
278}