risingwave_connector/source/cdc/external/
mysql.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::{Context, anyhow};
18use chrono::{DateTime, NaiveDateTime};
19use futures::stream::BoxStream;
20use futures::{StreamExt, pin_mut};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use mysql_async::prelude::*;
24use mysql_common::params::Params;
25use mysql_common::value::Value;
26use risingwave_common::bail;
27use risingwave_common::catalog::{CDC_OFFSET_COLUMN_NAME, ColumnDesc, ColumnId, Schema};
28use risingwave_common::row::OwnedRow;
29use risingwave_common::types::{DataType, Decimal, F32, ScalarImpl};
30use risingwave_common::util::iter_util::ZipEqFast;
31use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType};
32use sea_schema::mysql::discovery::SchemaDiscovery;
33use sea_schema::mysql::query::SchemaQueryBuilder;
34use sea_schema::sea_query::{Alias, IntoIden};
35use serde_derive::{Deserialize, Serialize};
36use sqlx::MySqlPool;
37use sqlx::mysql::MySqlConnectOptions;
38use thiserror_ext::AsReport;
39
40use crate::error::{ConnectorError, ConnectorResult};
41use crate::source::cdc::external::{
42    CdcOffset, CdcOffsetParseFunc, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
43    SchemaTableName, SslMode, mysql_row_to_owned_row,
44};
45
46#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
47pub struct MySqlOffset {
48    pub filename: String,
49    pub position: u64,
50}
51
52impl MySqlOffset {
53    pub fn new(filename: String, position: u64) -> Self {
54        Self { filename, position }
55    }
56}
57
58impl MySqlOffset {
59    pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
60        let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
61            .with_context(|| format!("invalid upstream offset: {}", offset))?;
62
63        Ok(Self {
64            filename: dbz_offset
65                .source_offset
66                .file
67                .context("binlog file not found in offset")?,
68            position: dbz_offset
69                .source_offset
70                .pos
71                .context("binlog position not found in offset")?,
72        })
73    }
74}
75
/// Schema information of an upstream MySQL table, as discovered by
/// [`MySqlExternalTable::connect`].
pub struct MySqlExternalTable {
    /// Descriptors of the table's columns; column names are lowercased.
    column_descs: Vec<ColumnDesc>,
    /// Lowercased names of the columns that make up the primary key.
    pk_names: Vec<String>,
}
80
81impl MySqlExternalTable {
82    pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
83        tracing::debug!("connect to mysql");
84        let options = MySqlConnectOptions::new()
85            .username(&config.username)
86            .password(&config.password)
87            .host(&config.host)
88            .port(config.port.parse::<u16>().unwrap())
89            .database(&config.database)
90            .ssl_mode(match config.ssl_mode {
91                SslMode::Disabled | SslMode::Preferred => sqlx::mysql::MySqlSslMode::Disabled,
92                SslMode::Required => sqlx::mysql::MySqlSslMode::Required,
93                _ => {
94                    return Err(anyhow!("unsupported SSL mode").into());
95                }
96            });
97
98        let connection = MySqlPool::connect_with(options).await?;
99        let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());
100
101        // discover system version first
102        let system_info = schema_discovery.discover_system().await?;
103        schema_discovery.query = SchemaQueryBuilder::new(system_info.clone());
104
105        let schema = Alias::new(config.database.as_str()).into_iden();
106        let table = Alias::new(config.table.as_str()).into_iden();
107        let columns = schema_discovery
108            .discover_columns(schema, table, &system_info)
109            .await?;
110
111        let mut column_descs = vec![];
112        let mut pk_names = vec![];
113        for col in columns {
114            let data_type = mysql_type_to_rw_type(&col.col_type)?;
115            // column name in mysql is case-insensitive, convert to lowercase
116            let col_name = col.name.to_lowercase();
117            let column_desc = if let Some(default) = col.default {
118                let snapshot_value = match default {
119                    ColumnDefault::Null => None,
120                    ColumnDefault::Int(val) => match data_type {
121                        DataType::Int16 => Some(ScalarImpl::Int16(val as _)),
122                        DataType::Int32 => Some(ScalarImpl::Int32(val as _)),
123                        DataType::Int64 => Some(ScalarImpl::Int64(val)),
124                        DataType::Varchar => {
125                            // should be the Enum type which is mapped to Varchar
126                            Some(ScalarImpl::from(val.to_string()))
127                        }
128                        _ => {
129                            tracing::error!(
130                                column = col_name,
131                                ?data_type,
132                                default_val = val,
133                                "unexpected default value type for column, set default to null"
134                            );
135                            None
136                        }
137                    },
138                    ColumnDefault::Real(val) => match data_type {
139                        DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))),
140                        DataType::Float64 => Some(ScalarImpl::Float64(val.into())),
141                        DataType::Decimal => Some(ScalarImpl::Decimal(
142                            Decimal::try_from(val).map_err(|err| {
143                                anyhow!("failed to convert default value to decimal").context(err)
144                            })?,
145                        )),
146                        _ => {
147                            tracing::error!(
148                                column = col_name,
149                                ?data_type,
150                                default_val = val,
151                                "unexpected default value type for column, set default to null"
152                            );
153                            None
154                        }
155                    },
156                    ColumnDefault::String(mut val) => {
157                        // mysql timestamp is mapped to timestamptz, we use UTC timezone to
158                        // interpret its value
159                        if data_type == DataType::Timestamptz {
160                            val = timestamp_val_to_timestamptz(val.as_str())?;
161                        }
162                        match ScalarImpl::from_text(val.as_str(), &data_type) {
163                            Ok(scalar) => Some(scalar),
164                            Err(err) => {
165                                tracing::warn!(error=%err.as_report(), "failed to parse mysql default value expression, only constant is supported");
166                                None
167                            }
168                        }
169                    }
170                    ColumnDefault::CurrentTimestamp | ColumnDefault::CustomExpr(_) => {
171                        tracing::warn!(
172                            "MySQL CURRENT_TIMESTAMP and custom expression default value not supported"
173                        );
174                        None
175                    }
176                };
177
178                ColumnDesc::named_with_default_value(
179                    col_name.clone(),
180                    ColumnId::placeholder(),
181                    data_type.clone(),
182                    snapshot_value,
183                )
184            } else {
185                ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type)
186            };
187
188            column_descs.push(column_desc);
189            if matches!(col.key, ColumnKey::Primary) {
190                pk_names.push(col_name);
191            }
192        }
193
194        if pk_names.is_empty() {
195            return Err(anyhow!("MySQL table doesn't define the primary key").into());
196        }
197
198        Ok(Self {
199            column_descs,
200            pk_names,
201        })
202    }
203
    /// Returns the column descriptors discovered for the table.
    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
        &self.column_descs
    }
207
    /// Returns the names of the table's primary key columns.
    pub fn pk_names(&self) -> &Vec<String> {
        &self.pk_names
    }
211}
212
213pub fn timestamp_val_to_timestamptz(value_text: &str) -> ConnectorResult<String> {
214    let format = "%Y-%m-%d %H:%M:%S";
215    let naive_datetime = NaiveDateTime::parse_from_str(value_text, format)
216        .map_err(|err| anyhow!("failed to parse mysql timestamp value").context(err))?;
217    let postgres_timestamptz: DateTime<chrono::Utc> =
218        DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive_datetime, chrono::Utc);
219    Ok(postgres_timestamptz
220        .format("%Y-%m-%d %H:%M:%S%:z")
221        .to_string())
222}
223
224pub fn type_name_to_mysql_type(ty_name: &str) -> Option<ColumnType> {
225    macro_rules! column_type {
226        ($($name:literal => $variant:ident),* $(,)?) => {
227            match ty_name.to_lowercase().as_str() {
228                $(
229                    $name => Some(ColumnType::$variant(Default::default())),
230                )*
231                "json" => Some(ColumnType::Json),
232                "date" => Some(ColumnType::Date),
233                "bool" => Some(ColumnType::Bool),
234                "tinyblob" => Some(ColumnType::TinyBlob),
235                "mediumblob" => Some(ColumnType::MediumBlob),
236                "longblob" => Some(ColumnType::LongBlob),
237                _ => None,
238            }
239        };
240    }
241
242    column_type! {
243        "bit" => Bit,
244        "tinyint" => TinyInt,
245        "smallint" => SmallInt,
246        "mediumint" => MediumInt,
247        "int" => Int,
248        "bigint" => BigInt,
249        "decimal" => Decimal,
250        "float" => Float,
251        "double" => Double,
252        "time" => Time,
253        "datetime" => DateTime,
254        "timestamp" => Timestamp,
255        "char" => Char,
256        "nchar" => NChar,
257        "varchar" => Varchar,
258        "nvarchar" => NVarchar,
259        "binary" => Binary,
260        "varbinary" => Varbinary,
261        "text" => Text,
262        "tinytext" => TinyText,
263        "mediumtext" => MediumText,
264        "longtext" => LongText,
265        "blob" => Blob,
266        "enum" => Enum,
267        "set" => Set,
268        "geometry" => Geometry,
269        "point" => Point,
270        "linestring" => LineString,
271        "polygon" => Polygon,
272        "multipoint" => MultiPoint,
273        "multilinestring" => MultiLineString,
274        "multipolygon" => MultiPolygon,
275        "geometrycollection" => GeometryCollection,
276    }
277}
278
279pub fn mysql_type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
280    let dtype = match col_type {
281        ColumnType::Serial => DataType::Int32,
282        ColumnType::Bit(attr) => {
283            if let Some(1) = attr.maximum {
284                DataType::Boolean
285            } else {
286                return Err(
287                    anyhow!("BIT({}) type not supported", attr.maximum.unwrap_or(0)).into(),
288                );
289            }
290        }
291        ColumnType::TinyInt(_) | ColumnType::SmallInt(_) => DataType::Int16,
292        ColumnType::Bool => DataType::Boolean,
293        ColumnType::MediumInt(_) => DataType::Int32,
294        ColumnType::Int(_) => DataType::Int32,
295        ColumnType::BigInt(_) => DataType::Int64,
296        ColumnType::Decimal(_) => DataType::Decimal,
297        ColumnType::Float(_) => DataType::Float32,
298        ColumnType::Double(_) => DataType::Float64,
299        ColumnType::Date => DataType::Date,
300        ColumnType::Time(_) => DataType::Time,
301        ColumnType::DateTime(_) => DataType::Timestamp,
302        ColumnType::Timestamp(_) => DataType::Timestamptz,
303        ColumnType::Year => DataType::Int32,
304        ColumnType::Char(_)
305        | ColumnType::NChar(_)
306        | ColumnType::Varchar(_)
307        | ColumnType::NVarchar(_) => DataType::Varchar,
308        ColumnType::Binary(_) | ColumnType::Varbinary(_) => DataType::Bytea,
309        ColumnType::Text(_)
310        | ColumnType::TinyText(_)
311        | ColumnType::MediumText(_)
312        | ColumnType::LongText(_) => DataType::Varchar,
313        ColumnType::Blob(_)
314        | ColumnType::TinyBlob
315        | ColumnType::MediumBlob
316        | ColumnType::LongBlob => DataType::Bytea,
317        ColumnType::Enum(_) => DataType::Varchar,
318        ColumnType::Json => DataType::Jsonb,
319        ColumnType::Set(_) => {
320            return Err(anyhow!("SET type not supported").into());
321        }
322        ColumnType::Geometry(_) => {
323            return Err(anyhow!("GEOMETRY type not supported").into());
324        }
325        ColumnType::Point(_) => {
326            return Err(anyhow!("POINT type not supported").into());
327        }
328        ColumnType::LineString(_) => {
329            return Err(anyhow!("LINE string type not supported").into());
330        }
331        ColumnType::Polygon(_) => {
332            return Err(anyhow!("POLYGON type not supported").into());
333        }
334        ColumnType::MultiPoint(_) => {
335            return Err(anyhow!("MULTI POINT type not supported").into());
336        }
337        ColumnType::MultiLineString(_) => {
338            return Err(anyhow!("MULTI LINE STRING type not supported").into());
339        }
340        ColumnType::MultiPolygon(_) => {
341            return Err(anyhow!("MULTI POLYGON type not supported").into());
342        }
343        ColumnType::GeometryCollection(_) => {
344            return Err(anyhow!("GEOMETRY COLLECTION type not supported").into());
345        }
346        ColumnType::Unknown(_) => {
347            return Err(anyhow!("Unknown MySQL data type").into());
348        }
349    };
350
351    Ok(dtype)
352}
353
/// Reads snapshot rows and the current binlog offset from an upstream MySQL table over
/// a single `mysql_async` connection.
pub struct MySqlExternalTableReader {
    /// Schema passed to `mysql_row_to_owned_row` when converting fetched rows.
    rw_schema: Schema,
    /// Comma-separated, backtick-quoted column list used in the snapshot `SELECT`;
    /// excludes the CDC offset column.
    field_names: String,
    // use mutex to provide shared mutable access to the connection
    conn: tokio::sync::Mutex<mysql_async::Conn>,
}
360
361impl ExternalTableReader for MySqlExternalTableReader {
362    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
363        let mut conn = self.conn.lock().await;
364
365        let sql = "SHOW MASTER STATUS".to_owned();
366        let mut rs = conn.query::<mysql_async::Row, _>(sql).await?;
367        let row = rs
368            .iter_mut()
369            .exactly_one()
370            .ok()
371            .context("expect exactly one row when reading binlog offset")?;
372
373        Ok(CdcOffset::MySql(MySqlOffset {
374            filename: row.take("File").unwrap(),
375            position: row.take("Position").unwrap(),
376        }))
377    }
378
    /// Streams rows of `table_name` ordered by `primary_keys`, returning at most `limit`
    /// rows. When `start_pk` is given, only rows whose primary key is greater than it
    /// are returned. Delegates to `snapshot_read_inner`.
    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
    }
388}
389
390impl MySqlExternalTableReader {
391    pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
392        let mut opts_builder = mysql_async::OptsBuilder::default()
393            .user(Some(config.username))
394            .pass(Some(config.password))
395            .ip_or_hostname(config.host)
396            .tcp_port(config.port.parse::<u16>().unwrap())
397            .db_name(Some(config.database));
398
399        opts_builder = match config.ssl_mode {
400            SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None),
401            // verify-ca and verify-full are same as required for mysql now
402            SslMode::Required | SslMode::VerifyCa | SslMode::VerifyFull => {
403                let ssl_without_verify = mysql_async::SslOpts::default()
404                    .with_danger_accept_invalid_certs(true)
405                    .with_danger_skip_domain_validation(true);
406                opts_builder.ssl_opts(Some(ssl_without_verify))
407            }
408        };
409
410        let conn = mysql_async::Conn::new(mysql_async::Opts::from(opts_builder)).await?;
411
412        let field_names = rw_schema
413            .fields
414            .iter()
415            .filter(|f| f.name != CDC_OFFSET_COLUMN_NAME)
416            .map(|f| Self::quote_column(f.name.as_str()))
417            .join(",");
418
419        Ok(Self {
420            rw_schema,
421            field_names,
422            conn: tokio::sync::Mutex::new(conn),
423        })
424    }
425
426    pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String {
427        // schema name is the database name in mysql
428        format!("`{}`.`{}`", table_name.schema_name, table_name.table_name)
429    }
430
431    pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
432        Box::new(move |offset| {
433            Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
434                offset,
435            )?))
436        })
437    }
438
439    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
440    async fn snapshot_read_inner(
441        &self,
442        table_name: SchemaTableName,
443        start_pk_row: Option<OwnedRow>,
444        primary_keys: Vec<String>,
445        limit: u32,
446    ) {
447        let order_key = primary_keys
448            .iter()
449            .map(|col| Self::quote_column(col))
450            .join(",");
451        let sql = if start_pk_row.is_none() {
452            format!(
453                "SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
454                self.field_names,
455                Self::get_normalized_table_name(&table_name),
456                order_key,
457            )
458        } else {
459            let filter_expr = Self::filter_expression(&primary_keys);
460            format!(
461                "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}",
462                self.field_names,
463                Self::get_normalized_table_name(&table_name),
464                filter_expr,
465                order_key,
466            )
467        };
468
469        let mut conn = self.conn.lock().await;
470
471        // Set session timezone to UTC
472        conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;
473
474        if start_pk_row.is_none() {
475            let rs_stream = sql.stream::<mysql_async::Row, _>(&mut *conn).await?;
476            let row_stream = rs_stream.map(|row| {
477                // convert mysql row into OwnedRow
478                let mut row = row?;
479                Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
480            });
481
482            pin_mut!(row_stream);
483            #[for_await]
484            for row in row_stream {
485                let row = row?;
486                yield row;
487            }
488        } else {
489            let field_map = self
490                .rw_schema
491                .fields
492                .iter()
493                .map(|f| (f.name.as_str(), f.data_type.clone()))
494                .collect::<HashMap<_, _>>();
495
496            // fill in start primary key params
497            let params: Vec<_> = primary_keys
498                .iter()
499                .zip_eq_fast(start_pk_row.unwrap().into_iter())
500                .map(|(pk, datum)| {
501                    if let Some(value) = datum {
502                        let ty = field_map.get(pk.as_str()).unwrap();
503                        let val = match ty {
504                            DataType::Boolean => Value::from(value.into_bool()),
505                            DataType::Int16 => Value::from(value.into_int16()),
506                            DataType::Int32 => Value::from(value.into_int32()),
507                            DataType::Int64 => Value::from(value.into_int64()),
508                            DataType::Float32 => Value::from(value.into_float32().into_inner()),
509                            DataType::Float64 => Value::from(value.into_float64().into_inner()),
510                            DataType::Varchar => Value::from(String::from(value.into_utf8())),
511                            DataType::Date => Value::from(value.into_date().0),
512                            DataType::Time => Value::from(value.into_time().0),
513                            DataType::Timestamp => Value::from(value.into_timestamp().0),
514                            _ => bail!("unsupported primary key data type: {}", ty),
515                        };
516                        ConnectorResult::Ok((pk.to_lowercase(), val))
517                    } else {
518                        bail!("primary key {} cannot be null", pk);
519                    }
520                })
521                .try_collect::<_, _, ConnectorError>()?;
522
523            tracing::debug!("snapshot read params: {:?}", &params);
524            let rs_stream = sql
525                .with(Params::from(params))
526                .stream::<mysql_async::Row, _>(&mut *conn)
527                .await?;
528
529            let row_stream = rs_stream.map(|row| {
530                // convert mysql row into OwnedRow
531                let mut row = row?;
532                Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
533            });
534
535            pin_mut!(row_stream);
536            #[for_await]
537            for row in row_stream {
538                let row = row?;
539                yield row;
540            }
541        };
542    }
543
544    // mysql cannot leverage the given key to narrow down the range of scan,
545    // we need to rewrite the comparison conditions by our own.
546    // (a, b) > (x, y) => (`a` > x) OR ((`a` = x) AND (`b` > y))
547    fn filter_expression(columns: &[String]) -> String {
548        let mut conditions = vec![];
549        // push the first condition
550        conditions.push(format!(
551            "({} > :{})",
552            Self::quote_column(&columns[0]),
553            columns[0].to_lowercase()
554        ));
555        for i in 2..=columns.len() {
556            // '=' condition
557            let mut condition = String::new();
558            for (j, col) in columns.iter().enumerate().take(i - 1) {
559                if j == 0 {
560                    condition.push_str(&format!(
561                        "({} = :{})",
562                        Self::quote_column(col),
563                        col.to_lowercase()
564                    ));
565                } else {
566                    condition.push_str(&format!(
567                        " AND ({} = :{})",
568                        Self::quote_column(col),
569                        col.to_lowercase()
570                    ));
571                }
572            }
573            // '>' condition
574            condition.push_str(&format!(
575                " AND ({} > :{})",
576                Self::quote_column(&columns[i - 1]),
577                columns[i - 1].to_lowercase()
578            ));
579            conditions.push(format!("({})", condition));
580        }
581        if columns.len() > 1 {
582            conditions.join(" OR ")
583        } else {
584            conditions.join("")
585        }
586    }
587
588    fn quote_column(column: &str) -> String {
589        format!("`{}`", column)
590    }
591}
592
// Unit tests for the MySQL external table helpers. The tests marked `#[ignore]` connect
// to a MySQL instance at `localhost:8306` and are meant to be run manually.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::DataType;

    use crate::source::cdc::external::mysql::MySqlExternalTable;
    use crate::source::cdc::external::{
        CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset,
        SchemaTableName,
    };

    // Manual test: discovers the schema of `mydb.part` and prints its columns and
    // primary key names.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_schema() {
        let config = ExternalTableConfig {
            connector: "mysql-cdc".to_owned(),
            host: "localhost".to_owned(),
            port: "8306".to_owned(),
            username: "root".to_owned(),
            password: "123456".to_owned(),
            database: "mydb".to_owned(),
            schema: "".to_owned(),
            table: "part".to_owned(),
            ssl_mode: Default::default(),
            ssl_root_cert: None,
            encrypt: "false".to_owned(),
        };

        let table = MySqlExternalTable::connect(config).await.unwrap();
        println!("columns: {:?}", &table.column_descs);
        println!("primary keys: {:?}", &table.pk_names);
    }

    // Checks the expanded tuple comparison for a single-column and a three-column key.
    #[test]
    fn test_mysql_filter_expr() {
        let cols = vec!["id".to_owned()];
        let expr = MySqlExternalTableReader::filter_expression(&cols);
        assert_eq!(expr, "(`id` > :id)");

        let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()];
        let expr = MySqlExternalTableReader::filter_expression(&cols);
        assert_eq!(
            expr,
            "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))"
        );
    }

    // Parses Debezium offsets and checks their ordering: offsets compare by binlog file
    // first, then by position within the file.
    #[test]
    fn test_mysql_binlog_offset() {
        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
        let off1_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true }, "isHeartbeat": false }"#;
        let off2_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true }, "isHeartbeat": false }"#;
        // off3 and off4 are intentionally identical to exercise equality.
        let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;
        let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;

        let off0 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off0_str).unwrap());
        let off1 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off1_str).unwrap());
        let off2 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off2_str).unwrap());
        let off3 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off3_str).unwrap());
        let off4 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off4_str).unwrap());

        assert!(off0 <= off1);
        assert!(off1 > off2);
        assert!(off2 < off3);
        assert_eq!(off3, off4);
    }

    // manual test case
    // Reads the current binlog offset and then snapshot-reads `mytest.t1`, printing
    // every row; nothing is asserted on the results.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let columns = vec![
            ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
            ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal),
            ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
                "hostname" => "localhost",
                "port" => "8306",
                "username" => "root",
                "password" => "123456",
                "database.name" => "mytest",
                "table.name" => "t1"));

        // Build the config by round-tripping the property map through JSON.
        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema)
            .await
            .unwrap();
        let offset = reader.current_cdc_offset().await.unwrap();
        println!("BinlogOffset: {:?}", offset);

        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
        let parser = MySqlExternalTableReader::get_cdc_offset_parser();
        println!("parsed offset: {:?}", parser(off0_str).unwrap());
        let table_name = SchemaTableName {
            schema_name: "mytest".to_owned(),
            table_name: "t1".to_owned(),
        };

        let stream = reader.snapshot_read(table_name, None, vec!["v1".to_owned()], 1000);
        pin_mut!(stream);
        #[for_await]
        for row in stream {
            println!("OwnedRow: {:?}", row);
        }
    }
}