1use std::collections::HashMap;
16
17use anyhow::{Context, anyhow};
18use chrono::{DateTime, NaiveDateTime};
19use futures::stream::BoxStream;
20use futures::{StreamExt, pin_mut};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use mysql_async::prelude::*;
24use mysql_common::params::Params;
25use mysql_common::value::Value;
26use risingwave_common::bail;
27use risingwave_common::catalog::{CDC_OFFSET_COLUMN_NAME, ColumnDesc, ColumnId, Schema};
28use risingwave_common::row::OwnedRow;
29use risingwave_common::types::{DataType, Decimal, F32, ScalarImpl};
30use risingwave_common::util::iter_util::ZipEqFast;
31use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType};
32use sea_schema::mysql::discovery::SchemaDiscovery;
33use sea_schema::mysql::query::SchemaQueryBuilder;
34use sea_schema::sea_query::{Alias, IntoIden};
35use serde_derive::{Deserialize, Serialize};
36use sqlx::MySqlPool;
37use sqlx::mysql::MySqlConnectOptions;
38use thiserror_ext::AsReport;
39
40use crate::error::{ConnectorError, ConnectorResult};
41use crate::source::cdc::external::{
42 CdcOffset, CdcOffsetParseFunc, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
43 SchemaTableName, SslMode, mysql_row_to_owned_row,
44};
45
46#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
47pub struct MySqlOffset {
48 pub filename: String,
49 pub position: u64,
50}
51
52impl MySqlOffset {
53 pub fn new(filename: String, position: u64) -> Self {
54 Self { filename, position }
55 }
56}
57
58impl MySqlOffset {
59 pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
60 let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
61 .with_context(|| format!("invalid upstream offset: {}", offset))?;
62
63 Ok(Self {
64 filename: dbz_offset
65 .source_offset
66 .file
67 .context("binlog file not found in offset")?,
68 position: dbz_offset
69 .source_offset
70 .pos
71 .context("binlog position not found in offset")?,
72 })
73 }
74}
75
76pub struct MySqlExternalTable {
77 column_descs: Vec<ColumnDesc>,
78 pk_names: Vec<String>,
79}
80
81impl MySqlExternalTable {
82 pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
83 tracing::debug!("connect to mysql");
84 let options = MySqlConnectOptions::new()
85 .username(&config.username)
86 .password(&config.password)
87 .host(&config.host)
88 .port(config.port.parse::<u16>().unwrap())
89 .database(&config.database)
90 .ssl_mode(match config.ssl_mode {
91 SslMode::Disabled | SslMode::Preferred => sqlx::mysql::MySqlSslMode::Disabled,
92 SslMode::Required => sqlx::mysql::MySqlSslMode::Required,
93 _ => {
94 return Err(anyhow!("unsupported SSL mode").into());
95 }
96 });
97
98 let connection = MySqlPool::connect_with(options).await?;
99 let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());
100
101 let system_info = schema_discovery.discover_system().await?;
103 schema_discovery.query = SchemaQueryBuilder::new(system_info.clone());
104
105 let schema = Alias::new(config.database.as_str()).into_iden();
106 let table = Alias::new(config.table.as_str()).into_iden();
107 let columns = schema_discovery
108 .discover_columns(schema, table, &system_info)
109 .await?;
110
111 let mut column_descs = vec![];
112 let mut pk_names = vec![];
113 for col in columns {
114 let data_type = mysql_type_to_rw_type(&col.col_type)?;
115 let col_name = col.name.to_lowercase();
117 let column_desc = if let Some(default) = col.default {
118 let snapshot_value = match default {
119 ColumnDefault::Null => None,
120 ColumnDefault::Int(val) => match data_type {
121 DataType::Int16 => Some(ScalarImpl::Int16(val as _)),
122 DataType::Int32 => Some(ScalarImpl::Int32(val as _)),
123 DataType::Int64 => Some(ScalarImpl::Int64(val)),
124 DataType::Varchar => {
125 Some(ScalarImpl::from(val.to_string()))
127 }
128 _ => {
129 tracing::error!(
130 column = col_name,
131 ?data_type,
132 default_val = val,
133 "unexpected default value type for column, set default to null"
134 );
135 None
136 }
137 },
138 ColumnDefault::Real(val) => match data_type {
139 DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))),
140 DataType::Float64 => Some(ScalarImpl::Float64(val.into())),
141 DataType::Decimal => Some(ScalarImpl::Decimal(
142 Decimal::try_from(val).map_err(|err| {
143 anyhow!("failed to convert default value to decimal").context(err)
144 })?,
145 )),
146 _ => {
147 tracing::error!(
148 column = col_name,
149 ?data_type,
150 default_val = val,
151 "unexpected default value type for column, set default to null"
152 );
153 None
154 }
155 },
156 ColumnDefault::String(mut val) => {
157 if data_type == DataType::Timestamptz {
160 val = timestamp_val_to_timestamptz(val.as_str())?;
161 }
162 match ScalarImpl::from_text(val.as_str(), &data_type) {
163 Ok(scalar) => Some(scalar),
164 Err(err) => {
165 tracing::warn!(error=%err.as_report(), "failed to parse mysql default value expression, only constant is supported");
166 None
167 }
168 }
169 }
170 ColumnDefault::CurrentTimestamp | ColumnDefault::CustomExpr(_) => {
171 tracing::warn!(
172 "MySQL CURRENT_TIMESTAMP and custom expression default value not supported"
173 );
174 None
175 }
176 };
177
178 ColumnDesc::named_with_default_value(
179 col_name.clone(),
180 ColumnId::placeholder(),
181 data_type.clone(),
182 snapshot_value,
183 )
184 } else {
185 ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type)
186 };
187
188 column_descs.push(column_desc);
189 if matches!(col.key, ColumnKey::Primary) {
190 pk_names.push(col_name);
191 }
192 }
193
194 if pk_names.is_empty() {
195 return Err(anyhow!("MySQL table doesn't define the primary key").into());
196 }
197
198 Ok(Self {
199 column_descs,
200 pk_names,
201 })
202 }
203
204 pub fn column_descs(&self) -> &Vec<ColumnDesc> {
205 &self.column_descs
206 }
207
208 pub fn pk_names(&self) -> &Vec<String> {
209 &self.pk_names
210 }
211}
212
213pub fn timestamp_val_to_timestamptz(value_text: &str) -> ConnectorResult<String> {
214 let format = "%Y-%m-%d %H:%M:%S";
215 let naive_datetime = NaiveDateTime::parse_from_str(value_text, format)
216 .map_err(|err| anyhow!("failed to parse mysql timestamp value").context(err))?;
217 let postgres_timestamptz: DateTime<chrono::Utc> =
218 DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive_datetime, chrono::Utc);
219 Ok(postgres_timestamptz
220 .format("%Y-%m-%d %H:%M:%S%:z")
221 .to_string())
222}
223
224pub fn type_name_to_mysql_type(ty_name: &str) -> Option<ColumnType> {
225 macro_rules! column_type {
226 ($($name:literal => $variant:ident),* $(,)?) => {
227 match ty_name.to_lowercase().as_str() {
228 $(
229 $name => Some(ColumnType::$variant(Default::default())),
230 )*
231 "json" => Some(ColumnType::Json),
232 "date" => Some(ColumnType::Date),
233 "bool" => Some(ColumnType::Bool),
234 "tinyblob" => Some(ColumnType::TinyBlob),
235 "mediumblob" => Some(ColumnType::MediumBlob),
236 "longblob" => Some(ColumnType::LongBlob),
237 _ => None,
238 }
239 };
240 }
241
242 column_type! {
243 "bit" => Bit,
244 "tinyint" => TinyInt,
245 "smallint" => SmallInt,
246 "mediumint" => MediumInt,
247 "int" => Int,
248 "bigint" => BigInt,
249 "decimal" => Decimal,
250 "float" => Float,
251 "double" => Double,
252 "time" => Time,
253 "datetime" => DateTime,
254 "timestamp" => Timestamp,
255 "char" => Char,
256 "nchar" => NChar,
257 "varchar" => Varchar,
258 "nvarchar" => NVarchar,
259 "binary" => Binary,
260 "varbinary" => Varbinary,
261 "text" => Text,
262 "tinytext" => TinyText,
263 "mediumtext" => MediumText,
264 "longtext" => LongText,
265 "blob" => Blob,
266 "enum" => Enum,
267 "set" => Set,
268 "geometry" => Geometry,
269 "point" => Point,
270 "linestring" => LineString,
271 "polygon" => Polygon,
272 "multipoint" => MultiPoint,
273 "multilinestring" => MultiLineString,
274 "multipolygon" => MultiPolygon,
275 "geometrycollection" => GeometryCollection,
276 }
277}
278
279pub fn mysql_type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
280 let dtype = match col_type {
281 ColumnType::Serial => DataType::Int32,
282 ColumnType::Bit(attr) => {
283 if let Some(1) = attr.maximum {
284 DataType::Boolean
285 } else {
286 return Err(
287 anyhow!("BIT({}) type not supported", attr.maximum.unwrap_or(0)).into(),
288 );
289 }
290 }
291 ColumnType::TinyInt(_) | ColumnType::SmallInt(_) => DataType::Int16,
292 ColumnType::Bool => DataType::Boolean,
293 ColumnType::MediumInt(_) => DataType::Int32,
294 ColumnType::Int(_) => DataType::Int32,
295 ColumnType::BigInt(_) => DataType::Int64,
296 ColumnType::Decimal(_) => DataType::Decimal,
297 ColumnType::Float(_) => DataType::Float32,
298 ColumnType::Double(_) => DataType::Float64,
299 ColumnType::Date => DataType::Date,
300 ColumnType::Time(_) => DataType::Time,
301 ColumnType::DateTime(_) => DataType::Timestamp,
302 ColumnType::Timestamp(_) => DataType::Timestamptz,
303 ColumnType::Year => DataType::Int32,
304 ColumnType::Char(_)
305 | ColumnType::NChar(_)
306 | ColumnType::Varchar(_)
307 | ColumnType::NVarchar(_) => DataType::Varchar,
308 ColumnType::Binary(_) | ColumnType::Varbinary(_) => DataType::Bytea,
309 ColumnType::Text(_)
310 | ColumnType::TinyText(_)
311 | ColumnType::MediumText(_)
312 | ColumnType::LongText(_) => DataType::Varchar,
313 ColumnType::Blob(_)
314 | ColumnType::TinyBlob
315 | ColumnType::MediumBlob
316 | ColumnType::LongBlob => DataType::Bytea,
317 ColumnType::Enum(_) => DataType::Varchar,
318 ColumnType::Json => DataType::Jsonb,
319 ColumnType::Set(_) => {
320 return Err(anyhow!("SET type not supported").into());
321 }
322 ColumnType::Geometry(_) => {
323 return Err(anyhow!("GEOMETRY type not supported").into());
324 }
325 ColumnType::Point(_) => {
326 return Err(anyhow!("POINT type not supported").into());
327 }
328 ColumnType::LineString(_) => {
329 return Err(anyhow!("LINE string type not supported").into());
330 }
331 ColumnType::Polygon(_) => {
332 return Err(anyhow!("POLYGON type not supported").into());
333 }
334 ColumnType::MultiPoint(_) => {
335 return Err(anyhow!("MULTI POINT type not supported").into());
336 }
337 ColumnType::MultiLineString(_) => {
338 return Err(anyhow!("MULTI LINE STRING type not supported").into());
339 }
340 ColumnType::MultiPolygon(_) => {
341 return Err(anyhow!("MULTI POLYGON type not supported").into());
342 }
343 ColumnType::GeometryCollection(_) => {
344 return Err(anyhow!("GEOMETRY COLLECTION type not supported").into());
345 }
346 ColumnType::Unknown(_) => {
347 return Err(anyhow!("Unknown MySQL data type").into());
348 }
349 };
350
351 Ok(dtype)
352}
353
354pub struct MySqlExternalTableReader {
355 rw_schema: Schema,
356 field_names: String,
357 conn: tokio::sync::Mutex<mysql_async::Conn>,
359}
360
361impl ExternalTableReader for MySqlExternalTableReader {
362 async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
363 let mut conn = self.conn.lock().await;
364
365 let sql = "SHOW MASTER STATUS".to_owned();
366 let mut rs = conn.query::<mysql_async::Row, _>(sql).await?;
367 let row = rs
368 .iter_mut()
369 .exactly_one()
370 .ok()
371 .context("expect exactly one row when reading binlog offset")?;
372
373 Ok(CdcOffset::MySql(MySqlOffset {
374 filename: row.take("File").unwrap(),
375 position: row.take("Position").unwrap(),
376 }))
377 }
378
379 fn snapshot_read(
380 &self,
381 table_name: SchemaTableName,
382 start_pk: Option<OwnedRow>,
383 primary_keys: Vec<String>,
384 limit: u32,
385 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
386 self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
387 }
388}
389
390impl MySqlExternalTableReader {
391 pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
392 let mut opts_builder = mysql_async::OptsBuilder::default()
393 .user(Some(config.username))
394 .pass(Some(config.password))
395 .ip_or_hostname(config.host)
396 .tcp_port(config.port.parse::<u16>().unwrap())
397 .db_name(Some(config.database));
398
399 opts_builder = match config.ssl_mode {
400 SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None),
401 SslMode::Required | SslMode::VerifyCa | SslMode::VerifyFull => {
403 let ssl_without_verify = mysql_async::SslOpts::default()
404 .with_danger_accept_invalid_certs(true)
405 .with_danger_skip_domain_validation(true);
406 opts_builder.ssl_opts(Some(ssl_without_verify))
407 }
408 };
409
410 let conn = mysql_async::Conn::new(mysql_async::Opts::from(opts_builder)).await?;
411
412 let field_names = rw_schema
413 .fields
414 .iter()
415 .filter(|f| f.name != CDC_OFFSET_COLUMN_NAME)
416 .map(|f| Self::quote_column(f.name.as_str()))
417 .join(",");
418
419 Ok(Self {
420 rw_schema,
421 field_names,
422 conn: tokio::sync::Mutex::new(conn),
423 })
424 }
425
426 pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String {
427 format!("`{}`.`{}`", table_name.schema_name, table_name.table_name)
429 }
430
431 pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
432 Box::new(move |offset| {
433 Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
434 offset,
435 )?))
436 })
437 }
438
439 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
440 async fn snapshot_read_inner(
441 &self,
442 table_name: SchemaTableName,
443 start_pk_row: Option<OwnedRow>,
444 primary_keys: Vec<String>,
445 limit: u32,
446 ) {
447 let order_key = primary_keys
448 .iter()
449 .map(|col| Self::quote_column(col))
450 .join(",");
451 let sql = if start_pk_row.is_none() {
452 format!(
453 "SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
454 self.field_names,
455 Self::get_normalized_table_name(&table_name),
456 order_key,
457 )
458 } else {
459 let filter_expr = Self::filter_expression(&primary_keys);
460 format!(
461 "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}",
462 self.field_names,
463 Self::get_normalized_table_name(&table_name),
464 filter_expr,
465 order_key,
466 )
467 };
468
469 let mut conn = self.conn.lock().await;
470
471 conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;
473
474 if start_pk_row.is_none() {
475 let rs_stream = sql.stream::<mysql_async::Row, _>(&mut *conn).await?;
476 let row_stream = rs_stream.map(|row| {
477 let mut row = row?;
479 Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
480 });
481
482 pin_mut!(row_stream);
483 #[for_await]
484 for row in row_stream {
485 let row = row?;
486 yield row;
487 }
488 } else {
489 let field_map = self
490 .rw_schema
491 .fields
492 .iter()
493 .map(|f| (f.name.as_str(), f.data_type.clone()))
494 .collect::<HashMap<_, _>>();
495
496 let params: Vec<_> = primary_keys
498 .iter()
499 .zip_eq_fast(start_pk_row.unwrap().into_iter())
500 .map(|(pk, datum)| {
501 if let Some(value) = datum {
502 let ty = field_map.get(pk.as_str()).unwrap();
503 let val = match ty {
504 DataType::Boolean => Value::from(value.into_bool()),
505 DataType::Int16 => Value::from(value.into_int16()),
506 DataType::Int32 => Value::from(value.into_int32()),
507 DataType::Int64 => Value::from(value.into_int64()),
508 DataType::Float32 => Value::from(value.into_float32().into_inner()),
509 DataType::Float64 => Value::from(value.into_float64().into_inner()),
510 DataType::Varchar => Value::from(String::from(value.into_utf8())),
511 DataType::Date => Value::from(value.into_date().0),
512 DataType::Time => Value::from(value.into_time().0),
513 DataType::Timestamp => Value::from(value.into_timestamp().0),
514 _ => bail!("unsupported primary key data type: {}", ty),
515 };
516 ConnectorResult::Ok((pk.to_lowercase(), val))
517 } else {
518 bail!("primary key {} cannot be null", pk);
519 }
520 })
521 .try_collect::<_, _, ConnectorError>()?;
522
523 tracing::debug!("snapshot read params: {:?}", ¶ms);
524 let rs_stream = sql
525 .with(Params::from(params))
526 .stream::<mysql_async::Row, _>(&mut *conn)
527 .await?;
528
529 let row_stream = rs_stream.map(|row| {
530 let mut row = row?;
532 Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
533 });
534
535 pin_mut!(row_stream);
536 #[for_await]
537 for row in row_stream {
538 let row = row?;
539 yield row;
540 }
541 };
542 }
543
544 fn filter_expression(columns: &[String]) -> String {
548 let mut conditions = vec![];
549 conditions.push(format!(
551 "({} > :{})",
552 Self::quote_column(&columns[0]),
553 columns[0].to_lowercase()
554 ));
555 for i in 2..=columns.len() {
556 let mut condition = String::new();
558 for (j, col) in columns.iter().enumerate().take(i - 1) {
559 if j == 0 {
560 condition.push_str(&format!(
561 "({} = :{})",
562 Self::quote_column(col),
563 col.to_lowercase()
564 ));
565 } else {
566 condition.push_str(&format!(
567 " AND ({} = :{})",
568 Self::quote_column(col),
569 col.to_lowercase()
570 ));
571 }
572 }
573 condition.push_str(&format!(
575 " AND ({} > :{})",
576 Self::quote_column(&columns[i - 1]),
577 columns[i - 1].to_lowercase()
578 ));
579 conditions.push(format!("({})", condition));
580 }
581 if columns.len() > 1 {
582 conditions.join(" OR ")
583 } else {
584 conditions.join("")
585 }
586 }
587
588 fn quote_column(column: &str) -> String {
589 format!("`{}`", column)
590 }
591}
592
593#[cfg(test)]
594mod tests {
595 use std::collections::HashMap;
596
597 use futures::pin_mut;
598 use futures_async_stream::for_await;
599 use maplit::{convert_args, hashmap};
600 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
601 use risingwave_common::types::DataType;
602
603 use crate::source::cdc::external::mysql::MySqlExternalTable;
604 use crate::source::cdc::external::{
605 CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset,
606 SchemaTableName,
607 };
608
609 #[ignore]
610 #[tokio::test]
611 async fn test_mysql_schema() {
612 let config = ExternalTableConfig {
613 connector: "mysql-cdc".to_owned(),
614 host: "localhost".to_owned(),
615 port: "8306".to_owned(),
616 username: "root".to_owned(),
617 password: "123456".to_owned(),
618 database: "mydb".to_owned(),
619 schema: "".to_owned(),
620 table: "part".to_owned(),
621 ssl_mode: Default::default(),
622 ssl_root_cert: None,
623 encrypt: "false".to_owned(),
624 };
625
626 let table = MySqlExternalTable::connect(config).await.unwrap();
627 println!("columns: {:?}", &table.column_descs);
628 println!("primary keys: {:?}", &table.pk_names);
629 }
630
631 #[test]
632 fn test_mysql_filter_expr() {
633 let cols = vec!["id".to_owned()];
634 let expr = MySqlExternalTableReader::filter_expression(&cols);
635 assert_eq!(expr, "(`id` > :id)");
636
637 let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()];
638 let expr = MySqlExternalTableReader::filter_expression(&cols);
639 assert_eq!(
640 expr,
641 "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))"
642 );
643 }
644
645 #[test]
646 fn test_mysql_binlog_offset() {
647 let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
648 let off1_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true }, "isHeartbeat": false }"#;
649 let off2_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true }, "isHeartbeat": false }"#;
650 let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;
651 let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;
652
653 let off0 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off0_str).unwrap());
654 let off1 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off1_str).unwrap());
655 let off2 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off2_str).unwrap());
656 let off3 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off3_str).unwrap());
657 let off4 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off4_str).unwrap());
658
659 assert!(off0 <= off1);
660 assert!(off1 > off2);
661 assert!(off2 < off3);
662 assert_eq!(off3, off4);
663 }
664
665 #[ignore]
667 #[tokio::test]
668 async fn test_mysql_table_reader() {
669 let columns = vec![
670 ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
671 ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal),
672 ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar),
673 ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
674 ];
675 let rw_schema = Schema {
676 fields: columns.iter().map(Field::from).collect(),
677 };
678 let props: HashMap<String, String> = convert_args!(hashmap!(
679 "hostname" => "localhost",
680 "port" => "8306",
681 "username" => "root",
682 "password" => "123456",
683 "database.name" => "mytest",
684 "table.name" => "t1"));
685
686 let config =
687 serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
688 .unwrap();
689 let reader = MySqlExternalTableReader::new(config, rw_schema)
690 .await
691 .unwrap();
692 let offset = reader.current_cdc_offset().await.unwrap();
693 println!("BinlogOffset: {:?}", offset);
694
695 let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
696 let parser = MySqlExternalTableReader::get_cdc_offset_parser();
697 println!("parsed offset: {:?}", parser(off0_str).unwrap());
698 let table_name = SchemaTableName {
699 schema_name: "mytest".to_owned(),
700 table_name: "t1".to_owned(),
701 };
702
703 let stream = reader.snapshot_read(table_name, None, vec!["v1".to_owned()], 1000);
704 pin_mut!(stream);
705 #[for_await]
706 for row in stream {
707 println!("OwnedRow: {:?}", row);
708 }
709 }
710}