1use std::collections::HashMap;
16
17use anyhow::{Context, anyhow};
18use chrono::{DateTime, NaiveDateTime};
19use futures::stream::BoxStream;
20use futures::{StreamExt, pin_mut, stream};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use mysql_async::prelude::*;
24use mysql_common::params::Params;
25use mysql_common::value::Value;
26use risingwave_common::bail;
27use risingwave_common::catalog::{CDC_OFFSET_COLUMN_NAME, ColumnDesc, ColumnId, Field, Schema};
28use risingwave_common::row::OwnedRow;
29use risingwave_common::types::{DataType, Datum, Decimal, F32, ScalarImpl};
30use risingwave_common::util::iter_util::ZipEqFast;
31use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType};
32use sea_schema::mysql::discovery::SchemaDiscovery;
33use sea_schema::mysql::query::SchemaQueryBuilder;
34use sea_schema::sea_query::{Alias, IntoIden};
35use serde_derive::{Deserialize, Serialize};
36use sqlx::MySqlPool;
37use sqlx::mysql::MySqlConnectOptions;
38use thiserror_ext::AsReport;
39
40use crate::error::{ConnectorError, ConnectorResult};
41use crate::source::CdcTableSnapshotSplit;
42use crate::source::cdc::external::{
43 CdcOffset, CdcOffsetParseFunc, CdcTableSnapshotSplitOption, DebeziumOffset,
44 ExternalTableConfig, ExternalTableReader, SchemaTableName, SslMode, mysql_row_to_owned_row,
45};
46
47#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
48pub struct MySqlOffset {
49 pub filename: String,
50 pub position: u64,
51}
52
53impl MySqlOffset {
54 pub fn new(filename: String, position: u64) -> Self {
55 Self { filename, position }
56 }
57}
58
59impl MySqlOffset {
60 pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
61 let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
62 .with_context(|| format!("invalid upstream offset: {}", offset))?;
63
64 Ok(Self {
65 filename: dbz_offset
66 .source_offset
67 .file
68 .context("binlog file not found in offset")?,
69 position: dbz_offset
70 .source_offset
71 .pos
72 .context("binlog position not found in offset")?,
73 })
74 }
75}
76
77pub struct MySqlExternalTable {
78 column_descs: Vec<ColumnDesc>,
79 pk_names: Vec<String>,
80}
81
82impl MySqlExternalTable {
83 pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
84 tracing::debug!("connect to mysql");
85 let options = MySqlConnectOptions::new()
86 .username(&config.username)
87 .password(&config.password)
88 .host(&config.host)
89 .port(config.port.parse::<u16>().unwrap())
90 .database(&config.database)
91 .ssl_mode(match config.ssl_mode {
92 SslMode::Disabled | SslMode::Preferred => sqlx::mysql::MySqlSslMode::Disabled,
93 SslMode::Required => sqlx::mysql::MySqlSslMode::Required,
94 _ => {
95 return Err(anyhow!("unsupported SSL mode").into());
96 }
97 });
98
99 let connection = MySqlPool::connect_with(options).await?;
100 let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());
101
102 let system_info = schema_discovery.discover_system().await?;
104 schema_discovery.query = SchemaQueryBuilder::new(system_info.clone());
105
106 let schema = Alias::new(config.database.as_str()).into_iden();
107 let table = Alias::new(config.table.as_str()).into_iden();
108 let columns = schema_discovery
109 .discover_columns(schema, table, &system_info)
110 .await?;
111
112 let mut column_descs = vec![];
113 let mut pk_names = vec![];
114 for col in columns {
115 let data_type = mysql_type_to_rw_type(&col.col_type)?;
116 let col_name = col.name.to_lowercase();
118 let column_desc = if let Some(default) = col.default {
119 let snapshot_value = derive_default_value(default.clone(), &data_type)
120 .unwrap_or_else(|e| {
121 tracing::warn!(
122 column = col_name,
123 ?default,
124 %data_type,
125 error = %e.as_report(),
126 "failed to derive column default value, fallback to `NULL`",
127 );
128 None
129 });
130
131 ColumnDesc::named_with_default_value(
132 col_name.clone(),
133 ColumnId::placeholder(),
134 data_type.clone(),
135 snapshot_value,
136 )
137 } else {
138 ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type)
139 };
140
141 column_descs.push(column_desc);
142 if matches!(col.key, ColumnKey::Primary) {
143 pk_names.push(col_name);
144 }
145 }
146
147 if pk_names.is_empty() {
148 return Err(anyhow!("MySQL table doesn't define the primary key").into());
149 }
150
151 Ok(Self {
152 column_descs,
153 pk_names,
154 })
155 }
156
157 pub fn column_descs(&self) -> &Vec<ColumnDesc> {
158 &self.column_descs
159 }
160
161 pub fn pk_names(&self) -> &Vec<String> {
162 &self.pk_names
163 }
164}
165
166fn derive_default_value(default: ColumnDefault, data_type: &DataType) -> ConnectorResult<Datum> {
167 let datum = match default {
168 ColumnDefault::Null => None,
169 ColumnDefault::Int(val) => match data_type {
170 DataType::Int16 => Some(ScalarImpl::Int16(val as _)),
171 DataType::Int32 => Some(ScalarImpl::Int32(val as _)),
172 DataType::Int64 => Some(ScalarImpl::Int64(val)),
173 DataType::Varchar => {
174 Some(ScalarImpl::from(val.to_string()))
176 }
177 _ => bail!("unexpected default value type for integer"),
178 },
179 ColumnDefault::Real(val) => match data_type {
180 DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))),
181 DataType::Float64 => Some(ScalarImpl::Float64(val.into())),
182 DataType::Decimal => Some(ScalarImpl::Decimal(
183 Decimal::try_from(val).context("failed to convert default value to decimal")?,
184 )),
185 _ => bail!("unexpected default value type for real"),
186 },
187 ColumnDefault::String(mut val) => {
188 if data_type == &DataType::Timestamptz {
191 val = timestamp_val_to_timestamptz(val.as_str())?;
192 }
193 Some(ScalarImpl::from_text(val.as_str(), data_type).map_err(|e| anyhow!(e)).context(
194 "failed to parse mysql default value expression, only constant is supported",
195 )?)
196 }
197 ColumnDefault::CurrentTimestamp | ColumnDefault::CustomExpr(_) => {
198 bail!("MySQL CURRENT_TIMESTAMP and custom expression default value not supported")
199 }
200 };
201 Ok(datum)
202}
203
204pub fn timestamp_val_to_timestamptz(value_text: &str) -> ConnectorResult<String> {
205 let format = "%Y-%m-%d %H:%M:%S";
206 let naive_datetime = NaiveDateTime::parse_from_str(value_text, format)
207 .map_err(|err| anyhow!("failed to parse mysql timestamp value").context(err))?;
208 let postgres_timestamptz: DateTime<chrono::Utc> =
209 DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive_datetime, chrono::Utc);
210 Ok(postgres_timestamptz
211 .format("%Y-%m-%d %H:%M:%S%:z")
212 .to_string())
213}
214
215pub fn type_name_to_mysql_type(ty_name: &str) -> Option<ColumnType> {
216 macro_rules! column_type {
217 ($($name:literal => $variant:ident),* $(,)?) => {
218 match ty_name.to_lowercase().as_str() {
219 $(
220 $name => Some(ColumnType::$variant(Default::default())),
221 )*
222 "json" => Some(ColumnType::Json),
223 "date" => Some(ColumnType::Date),
224 "bool" => Some(ColumnType::Bool),
225 "tinyblob" => Some(ColumnType::TinyBlob),
226 "mediumblob" => Some(ColumnType::MediumBlob),
227 "longblob" => Some(ColumnType::LongBlob),
228 _ => None,
229 }
230 };
231 }
232
233 column_type! {
234 "bit" => Bit,
235 "tinyint" => TinyInt,
236 "smallint" => SmallInt,
237 "mediumint" => MediumInt,
238 "int" => Int,
239 "bigint" => BigInt,
240 "decimal" => Decimal,
241 "float" => Float,
242 "double" => Double,
243 "time" => Time,
244 "datetime" => DateTime,
245 "timestamp" => Timestamp,
246 "char" => Char,
247 "nchar" => NChar,
248 "varchar" => Varchar,
249 "nvarchar" => NVarchar,
250 "binary" => Binary,
251 "varbinary" => Varbinary,
252 "text" => Text,
253 "tinytext" => TinyText,
254 "mediumtext" => MediumText,
255 "longtext" => LongText,
256 "blob" => Blob,
257 "enum" => Enum,
258 "set" => Set,
259 "geometry" => Geometry,
260 "point" => Point,
261 "linestring" => LineString,
262 "polygon" => Polygon,
263 "multipoint" => MultiPoint,
264 "multilinestring" => MultiLineString,
265 "multipolygon" => MultiPolygon,
266 "geometrycollection" => GeometryCollection,
267 }
268}
269
270pub fn mysql_type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
271 let dtype = match col_type {
272 ColumnType::Serial => DataType::Int32,
273 ColumnType::Bit(attr) => {
274 if let Some(1) = attr.maximum {
275 DataType::Boolean
276 } else {
277 return Err(
278 anyhow!("BIT({}) type not supported", attr.maximum.unwrap_or(0)).into(),
279 );
280 }
281 }
282 ColumnType::TinyInt(_) | ColumnType::SmallInt(_) => DataType::Int16,
283 ColumnType::Bool => DataType::Boolean,
284 ColumnType::MediumInt(_) => DataType::Int32,
285 ColumnType::Int(_) => DataType::Int32,
286 ColumnType::BigInt(_) => DataType::Int64,
287 ColumnType::Decimal(_) => DataType::Decimal,
288 ColumnType::Float(_) => DataType::Float32,
289 ColumnType::Double(_) => DataType::Float64,
290 ColumnType::Date => DataType::Date,
291 ColumnType::Time(_) => DataType::Time,
292 ColumnType::DateTime(_) => DataType::Timestamp,
293 ColumnType::Timestamp(_) => DataType::Timestamptz,
294 ColumnType::Year => DataType::Int32,
295 ColumnType::Char(_)
296 | ColumnType::NChar(_)
297 | ColumnType::Varchar(_)
298 | ColumnType::NVarchar(_) => DataType::Varchar,
299 ColumnType::Binary(_) | ColumnType::Varbinary(_) => DataType::Bytea,
300 ColumnType::Text(_)
301 | ColumnType::TinyText(_)
302 | ColumnType::MediumText(_)
303 | ColumnType::LongText(_) => DataType::Varchar,
304 ColumnType::Blob(_)
305 | ColumnType::TinyBlob
306 | ColumnType::MediumBlob
307 | ColumnType::LongBlob => DataType::Bytea,
308 ColumnType::Enum(_) => DataType::Varchar,
309 ColumnType::Json => DataType::Jsonb,
310 ColumnType::Set(_) => {
311 return Err(anyhow!("SET type not supported").into());
312 }
313 ColumnType::Geometry(_) => {
314 return Err(anyhow!("GEOMETRY type not supported").into());
315 }
316 ColumnType::Point(_) => {
317 return Err(anyhow!("POINT type not supported").into());
318 }
319 ColumnType::LineString(_) => {
320 return Err(anyhow!("LINE string type not supported").into());
321 }
322 ColumnType::Polygon(_) => {
323 return Err(anyhow!("POLYGON type not supported").into());
324 }
325 ColumnType::MultiPoint(_) => {
326 return Err(anyhow!("MULTI POINT type not supported").into());
327 }
328 ColumnType::MultiLineString(_) => {
329 return Err(anyhow!("MULTI LINE STRING type not supported").into());
330 }
331 ColumnType::MultiPolygon(_) => {
332 return Err(anyhow!("MULTI POLYGON type not supported").into());
333 }
334 ColumnType::GeometryCollection(_) => {
335 return Err(anyhow!("GEOMETRY COLLECTION type not supported").into());
336 }
337 ColumnType::Unknown(_) => {
338 return Err(anyhow!("Unknown MySQL data type").into());
339 }
340 };
341
342 Ok(dtype)
343}
344
345pub struct MySqlExternalTableReader {
346 rw_schema: Schema,
347 field_names: String,
348 pool: mysql_async::Pool,
349}
350
351impl ExternalTableReader for MySqlExternalTableReader {
352 async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
353 let mut conn = self.pool.get_conn().await?;
354
355 let sql = "SHOW MASTER STATUS".to_owned();
356 let mut rs = conn.query::<mysql_async::Row, _>(sql).await?;
357 let row = rs
358 .iter_mut()
359 .exactly_one()
360 .ok()
361 .context("expect exactly one row when reading binlog offset")?;
362 drop(conn);
363 Ok(CdcOffset::MySql(MySqlOffset {
364 filename: row.take("File").unwrap(),
365 position: row.take("Position").unwrap(),
366 }))
367 }
368
369 fn snapshot_read(
370 &self,
371 table_name: SchemaTableName,
372 start_pk: Option<OwnedRow>,
373 primary_keys: Vec<String>,
374 limit: u32,
375 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
376 self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
377 }
378
379 async fn disconnect(self) -> ConnectorResult<()> {
380 self.pool.disconnect().await.map_err(|e| e.into())
381 }
382
383 fn get_parallel_cdc_splits(
384 &self,
385 _options: CdcTableSnapshotSplitOption,
386 ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
387 stream::empty::<ConnectorResult<CdcTableSnapshotSplit>>().boxed()
389 }
390
391 fn split_snapshot_read(
392 &self,
393 _table_name: SchemaTableName,
394 _left: OwnedRow,
395 _right: OwnedRow,
396 _split_columns: Vec<Field>,
397 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
398 todo!("implement MySQL CDC parallelized backfill")
399 }
400}
401
402impl MySqlExternalTableReader {
403 pub fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
404 let mut opts_builder = mysql_async::OptsBuilder::default()
405 .user(Some(config.username))
406 .pass(Some(config.password))
407 .ip_or_hostname(config.host)
408 .tcp_port(config.port.parse::<u16>().unwrap())
409 .db_name(Some(config.database));
410
411 opts_builder = match config.ssl_mode {
412 SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None),
413 SslMode::Required | SslMode::VerifyCa | SslMode::VerifyFull => {
415 let ssl_without_verify = mysql_async::SslOpts::default()
416 .with_danger_accept_invalid_certs(true)
417 .with_danger_skip_domain_validation(true);
418 opts_builder.ssl_opts(Some(ssl_without_verify))
419 }
420 };
421 let pool = mysql_async::Pool::new(opts_builder);
422
423 let field_names = rw_schema
424 .fields
425 .iter()
426 .filter(|f| f.name != CDC_OFFSET_COLUMN_NAME)
427 .map(|f| Self::quote_column(f.name.as_str()))
428 .join(",");
429
430 Ok(Self {
431 rw_schema,
432 field_names,
433 pool,
434 })
435 }
436
437 pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String {
438 format!("`{}`.`{}`", table_name.schema_name, table_name.table_name)
440 }
441
442 pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
443 Box::new(move |offset| {
444 Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
445 offset,
446 )?))
447 })
448 }
449
450 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
451 async fn snapshot_read_inner(
452 &self,
453 table_name: SchemaTableName,
454 start_pk_row: Option<OwnedRow>,
455 primary_keys: Vec<String>,
456 limit: u32,
457 ) {
458 let order_key = primary_keys
459 .iter()
460 .map(|col| Self::quote_column(col))
461 .join(",");
462 let sql = if start_pk_row.is_none() {
463 format!(
464 "SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
465 self.field_names,
466 Self::get_normalized_table_name(&table_name),
467 order_key,
468 )
469 } else {
470 let filter_expr = Self::filter_expression(&primary_keys);
471 format!(
472 "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}",
473 self.field_names,
474 Self::get_normalized_table_name(&table_name),
475 filter_expr,
476 order_key,
477 )
478 };
479
480 let mut conn = self.pool.get_conn().await?;
481 conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;
483
484 if start_pk_row.is_none() {
485 let rs_stream = sql.stream::<mysql_async::Row, _>(&mut conn).await?;
486 let row_stream = rs_stream.map(|row| {
487 let mut row = row?;
489 Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
490 });
491 pin_mut!(row_stream);
492 #[for_await]
493 for row in row_stream {
494 let row = row?;
495 yield row;
496 }
497 } else {
498 let field_map = self
499 .rw_schema
500 .fields
501 .iter()
502 .map(|f| (f.name.as_str(), f.data_type.clone()))
503 .collect::<HashMap<_, _>>();
504
505 let params: Vec<_> = primary_keys
507 .iter()
508 .zip_eq_fast(start_pk_row.unwrap().into_iter())
509 .map(|(pk, datum)| {
510 if let Some(value) = datum {
511 let ty = field_map.get(pk.as_str()).unwrap();
512 let val = match ty {
513 DataType::Boolean => Value::from(value.into_bool()),
514 DataType::Int16 => Value::from(value.into_int16()),
515 DataType::Int32 => Value::from(value.into_int32()),
516 DataType::Int64 => Value::from(value.into_int64()),
517 DataType::Float32 => Value::from(value.into_float32().into_inner()),
518 DataType::Float64 => Value::from(value.into_float64().into_inner()),
519 DataType::Varchar => Value::from(String::from(value.into_utf8())),
520 DataType::Date => Value::from(value.into_date().0),
521 DataType::Time => Value::from(value.into_time().0),
522 DataType::Timestamp => Value::from(value.into_timestamp().0),
523 _ => bail!("unsupported primary key data type: {}", ty),
524 };
525 ConnectorResult::Ok((pk.to_lowercase(), val))
526 } else {
527 bail!("primary key {} cannot be null", pk);
528 }
529 })
530 .try_collect::<_, _, ConnectorError>()?;
531
532 tracing::debug!("snapshot read params: {:?}", ¶ms);
533 let rs_stream = sql
534 .with(Params::from(params))
535 .stream::<mysql_async::Row, _>(&mut conn)
536 .await?;
537
538 let row_stream = rs_stream.map(|row| {
539 let mut row = row?;
541 Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
542 });
543 pin_mut!(row_stream);
544 #[for_await]
545 for row in row_stream {
546 let row = row?;
547 yield row;
548 }
549 };
550 drop(conn);
551 }
552
553 fn filter_expression(columns: &[String]) -> String {
557 let mut conditions = vec![];
558 conditions.push(format!(
560 "({} > :{})",
561 Self::quote_column(&columns[0]),
562 columns[0].to_lowercase()
563 ));
564 for i in 2..=columns.len() {
565 let mut condition = String::new();
567 for (j, col) in columns.iter().enumerate().take(i - 1) {
568 if j == 0 {
569 condition.push_str(&format!(
570 "({} = :{})",
571 Self::quote_column(col),
572 col.to_lowercase()
573 ));
574 } else {
575 condition.push_str(&format!(
576 " AND ({} = :{})",
577 Self::quote_column(col),
578 col.to_lowercase()
579 ));
580 }
581 }
582 condition.push_str(&format!(
584 " AND ({} > :{})",
585 Self::quote_column(&columns[i - 1]),
586 columns[i - 1].to_lowercase()
587 ));
588 conditions.push(format!("({})", condition));
589 }
590 if columns.len() > 1 {
591 conditions.join(" OR ")
592 } else {
593 conditions.join("")
594 }
595 }
596
597 fn quote_column(column: &str) -> String {
598 format!("`{}`", column)
599 }
600}
601
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::DataType;

    use crate::source::cdc::external::mysql::MySqlExternalTable;
    use crate::source::cdc::external::{
        CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset,
        SchemaTableName,
    };

    /// Discovers the schema of a table on a local MySQL instance; ignored by default
    /// because it needs a running server.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_schema() {
        let table_config = ExternalTableConfig {
            connector: "mysql-cdc".to_string(),
            host: "localhost".to_string(),
            port: "8306".to_string(),
            username: "root".to_string(),
            password: "123456".to_string(),
            database: "mydb".to_string(),
            schema: "".to_string(),
            table: "part".to_string(),
            ssl_mode: Default::default(),
            ssl_root_cert: None,
            encrypt: "false".to_string(),
        };

        let discovered = MySqlExternalTable::connect(table_config).await.unwrap();
        println!("columns: {:?}", &discovered.column_descs);
        println!("primary keys: {:?}", &discovered.pk_names);
    }

    /// Checks the generated filter for a single-column and a three-column primary key.
    #[test]
    fn test_mysql_filter_expr() {
        let single_key = vec!["id".to_owned()];
        assert_eq!(
            MySqlExternalTableReader::filter_expression(&single_key),
            "(`id` > :id)"
        );

        let composite_key: Vec<String> = ["aa", "bb", "cc"]
            .into_iter()
            .map(|name| name.to_owned())
            .collect();
        assert_eq!(
            MySqlExternalTableReader::filter_expression(&composite_key),
            "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))"
        );
    }

    /// Checks parsing of Debezium offsets and the ordering of the resulting offsets.
    #[test]
    fn test_mysql_binlog_offset() {
        let parse =
            |text: &str| CdcOffset::MySql(MySqlOffset::parse_debezium_offset(text).unwrap());

        let off0 = parse(
            r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#,
        );
        let off1 = parse(
            r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true }, "isHeartbeat": false }"#,
        );
        let off2 = parse(
            r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true }, "isHeartbeat": false }"#,
        );
        let off3 = parse(
            r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#,
        );
        let off4 = parse(
            r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#,
        );

        assert!(off0 <= off1);
        assert!(off1 > off2);
        assert!(off2 < off3);
        assert_eq!(off3, off4);
    }

    /// Reads the binlog offset and a snapshot from a local MySQL instance; ignored by
    /// default because it needs a running server.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let column_descs = vec![
            ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
            ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal),
            ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
        ];
        let fields = column_descs.iter().map(Field::from).collect();
        let rw_schema = Schema { fields };

        let props: HashMap<String, String> = convert_args!(hashmap!(
            "hostname" => "localhost",
            "port" => "8306",
            "username" => "root",
            "password" => "123456",
            "database.name" => "mytest",
            "table.name" => "t1"));
        let props_json = serde_json::to_value(props).unwrap();
        let config = serde_json::from_value::<ExternalTableConfig>(props_json).unwrap();

        let reader = MySqlExternalTableReader::new(config, rw_schema).unwrap();
        let current_offset = reader.current_cdc_offset().await.unwrap();
        println!("BinlogOffset: {:?}", current_offset);

        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
        let parse_offset = MySqlExternalTableReader::get_cdc_offset_parser();
        println!("parsed offset: {:?}", parse_offset(off0_str).unwrap());

        let table_name = SchemaTableName {
            schema_name: "mytest".to_owned(),
            table_name: "t1".to_owned(),
        };
        let pk_columns = vec!["v1".to_owned()];

        let stream = reader.snapshot_read(table_name, None, pk_columns, 1000);
        pin_mut!(stream);
        #[for_await]
        for row in stream {
            println!("OwnedRow: {:?}", row);
        }
    }
}