risingwave_connector/source/cdc/external/
mysql.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::{Context, anyhow};
18use chrono::{DateTime, NaiveDateTime};
19use futures::stream::BoxStream;
20use futures::{StreamExt, pin_mut, stream};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use mysql_async::prelude::*;
24use mysql_common::params::Params;
25use mysql_common::value::Value;
26use risingwave_common::bail;
27use risingwave_common::catalog::{CDC_OFFSET_COLUMN_NAME, ColumnDesc, ColumnId, Field, Schema};
28use risingwave_common::row::OwnedRow;
29use risingwave_common::types::{DataType, Datum, Decimal, F32, ScalarImpl};
30use risingwave_common::util::iter_util::ZipEqFast;
31use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType};
32use sea_schema::mysql::discovery::SchemaDiscovery;
33use sea_schema::mysql::query::SchemaQueryBuilder;
34use sea_schema::sea_query::{Alias, IntoIden};
35use serde_derive::{Deserialize, Serialize};
36use sqlx::MySqlPool;
37use sqlx::mysql::MySqlConnectOptions;
38use thiserror_ext::AsReport;
39
40use crate::error::{ConnectorError, ConnectorResult};
41use crate::source::CdcTableSnapshotSplit;
42use crate::source::cdc::external::{
43    CdcOffset, CdcOffsetParseFunc, CdcTableSnapshotSplitOption, DebeziumOffset,
44    ExternalTableConfig, ExternalTableReader, SchemaTableName, SslMode, mysql_row_to_owned_row,
45};
46
47#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
48pub struct MySqlOffset {
49    pub filename: String,
50    pub position: u64,
51}
52
53impl MySqlOffset {
54    pub fn new(filename: String, position: u64) -> Self {
55        Self { filename, position }
56    }
57}
58
59impl MySqlOffset {
60    pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
61        let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
62            .with_context(|| format!("invalid upstream offset: {}", offset))?;
63
64        Ok(Self {
65            filename: dbz_offset
66                .source_offset
67                .file
68                .context("binlog file not found in offset")?,
69            position: dbz_offset
70                .source_offset
71                .pos
72                .context("binlog position not found in offset")?,
73        })
74    }
75}
76
77pub struct MySqlExternalTable {
78    column_descs: Vec<ColumnDesc>,
79    pk_names: Vec<String>,
80}
81
82impl MySqlExternalTable {
83    pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
84        tracing::debug!("connect to mysql");
85        let options = MySqlConnectOptions::new()
86            .username(&config.username)
87            .password(&config.password)
88            .host(&config.host)
89            .port(config.port.parse::<u16>().unwrap())
90            .database(&config.database)
91            .ssl_mode(match config.ssl_mode {
92                SslMode::Disabled | SslMode::Preferred => sqlx::mysql::MySqlSslMode::Disabled,
93                SslMode::Required => sqlx::mysql::MySqlSslMode::Required,
94                _ => {
95                    return Err(anyhow!("unsupported SSL mode").into());
96                }
97            });
98
99        let connection = MySqlPool::connect_with(options).await?;
100        let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());
101
102        // discover system version first
103        let system_info = schema_discovery.discover_system().await?;
104        schema_discovery.query = SchemaQueryBuilder::new(system_info.clone());
105
106        let schema = Alias::new(config.database.as_str()).into_iden();
107        let table = Alias::new(config.table.as_str()).into_iden();
108        let columns = schema_discovery
109            .discover_columns(schema, table, &system_info)
110            .await?;
111
112        let mut column_descs = vec![];
113        let mut pk_names = vec![];
114        for col in columns {
115            let data_type = mysql_type_to_rw_type(&col.col_type)?;
116            // column name in mysql is case-insensitive, convert to lowercase
117            let col_name = col.name.to_lowercase();
118            let column_desc = if let Some(default) = col.default {
119                let snapshot_value = derive_default_value(default.clone(), &data_type)
120                    .unwrap_or_else(|e| {
121                        tracing::warn!(
122                            column = col_name,
123                            ?default,
124                            %data_type,
125                            error = %e.as_report(),
126                            "failed to derive column default value, fallback to `NULL`",
127                        );
128                        None
129                    });
130
131                ColumnDesc::named_with_default_value(
132                    col_name.clone(),
133                    ColumnId::placeholder(),
134                    data_type.clone(),
135                    snapshot_value,
136                )
137            } else {
138                ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type)
139            };
140
141            column_descs.push(column_desc);
142            if matches!(col.key, ColumnKey::Primary) {
143                pk_names.push(col_name);
144            }
145        }
146
147        if pk_names.is_empty() {
148            return Err(anyhow!("MySQL table doesn't define the primary key").into());
149        }
150
151        Ok(Self {
152            column_descs,
153            pk_names,
154        })
155    }
156
157    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
158        &self.column_descs
159    }
160
161    pub fn pk_names(&self) -> &Vec<String> {
162        &self.pk_names
163    }
164}
165
166fn derive_default_value(default: ColumnDefault, data_type: &DataType) -> ConnectorResult<Datum> {
167    let datum = match default {
168        ColumnDefault::Null => None,
169        ColumnDefault::Int(val) => match data_type {
170            DataType::Int16 => Some(ScalarImpl::Int16(val as _)),
171            DataType::Int32 => Some(ScalarImpl::Int32(val as _)),
172            DataType::Int64 => Some(ScalarImpl::Int64(val)),
173            DataType::Varchar => {
174                // should be the Enum type which is mapped to Varchar
175                Some(ScalarImpl::from(val.to_string()))
176            }
177            _ => bail!("unexpected default value type for integer"),
178        },
179        ColumnDefault::Real(val) => match data_type {
180            DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))),
181            DataType::Float64 => Some(ScalarImpl::Float64(val.into())),
182            DataType::Decimal => Some(ScalarImpl::Decimal(
183                Decimal::try_from(val).context("failed to convert default value to decimal")?,
184            )),
185            _ => bail!("unexpected default value type for real"),
186        },
187        ColumnDefault::String(mut val) => {
188            // mysql timestamp is mapped to timestamptz, we use UTC timezone to
189            // interpret its value
190            if data_type == &DataType::Timestamptz {
191                val = timestamp_val_to_timestamptz(val.as_str())?;
192            }
193            Some(ScalarImpl::from_text(val.as_str(), data_type).map_err(|e| anyhow!(e)).context(
194                "failed to parse mysql default value expression, only constant is supported",
195            )?)
196        }
197        ColumnDefault::CurrentTimestamp | ColumnDefault::CustomExpr(_) => {
198            bail!("MySQL CURRENT_TIMESTAMP and custom expression default value not supported")
199        }
200    };
201    Ok(datum)
202}
203
204pub fn timestamp_val_to_timestamptz(value_text: &str) -> ConnectorResult<String> {
205    let format = "%Y-%m-%d %H:%M:%S";
206    let naive_datetime = NaiveDateTime::parse_from_str(value_text, format)
207        .map_err(|err| anyhow!("failed to parse mysql timestamp value").context(err))?;
208    let postgres_timestamptz: DateTime<chrono::Utc> =
209        DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive_datetime, chrono::Utc);
210    Ok(postgres_timestamptz
211        .format("%Y-%m-%d %H:%M:%S%:z")
212        .to_string())
213}
214
215pub fn type_name_to_mysql_type(ty_name: &str) -> Option<ColumnType> {
216    macro_rules! column_type {
217        ($($name:literal => $variant:ident),* $(,)?) => {
218            match ty_name.to_lowercase().as_str() {
219                $(
220                    $name => Some(ColumnType::$variant(Default::default())),
221                )*
222                "json" => Some(ColumnType::Json),
223                "date" => Some(ColumnType::Date),
224                "bool" => Some(ColumnType::Bool),
225                "tinyblob" => Some(ColumnType::TinyBlob),
226                "mediumblob" => Some(ColumnType::MediumBlob),
227                "longblob" => Some(ColumnType::LongBlob),
228                _ => None,
229            }
230        };
231    }
232
233    column_type! {
234        "bit" => Bit,
235        "tinyint" => TinyInt,
236        "smallint" => SmallInt,
237        "mediumint" => MediumInt,
238        "int" => Int,
239        "bigint" => BigInt,
240        "decimal" => Decimal,
241        "float" => Float,
242        "double" => Double,
243        "time" => Time,
244        "datetime" => DateTime,
245        "timestamp" => Timestamp,
246        "char" => Char,
247        "nchar" => NChar,
248        "varchar" => Varchar,
249        "nvarchar" => NVarchar,
250        "binary" => Binary,
251        "varbinary" => Varbinary,
252        "text" => Text,
253        "tinytext" => TinyText,
254        "mediumtext" => MediumText,
255        "longtext" => LongText,
256        "blob" => Blob,
257        "enum" => Enum,
258        "set" => Set,
259        "geometry" => Geometry,
260        "point" => Point,
261        "linestring" => LineString,
262        "polygon" => Polygon,
263        "multipoint" => MultiPoint,
264        "multilinestring" => MultiLineString,
265        "multipolygon" => MultiPolygon,
266        "geometrycollection" => GeometryCollection,
267    }
268}
269
270pub fn mysql_type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
271    let dtype = match col_type {
272        ColumnType::Serial => DataType::Int32,
273        ColumnType::Bit(attr) => {
274            if let Some(1) = attr.maximum {
275                DataType::Boolean
276            } else {
277                return Err(
278                    anyhow!("BIT({}) type not supported", attr.maximum.unwrap_or(0)).into(),
279                );
280            }
281        }
282        ColumnType::TinyInt(_) | ColumnType::SmallInt(_) => DataType::Int16,
283        ColumnType::Bool => DataType::Boolean,
284        ColumnType::MediumInt(_) => DataType::Int32,
285        ColumnType::Int(_) => DataType::Int32,
286        ColumnType::BigInt(_) => DataType::Int64,
287        ColumnType::Decimal(_) => DataType::Decimal,
288        ColumnType::Float(_) => DataType::Float32,
289        ColumnType::Double(_) => DataType::Float64,
290        ColumnType::Date => DataType::Date,
291        ColumnType::Time(_) => DataType::Time,
292        ColumnType::DateTime(_) => DataType::Timestamp,
293        ColumnType::Timestamp(_) => DataType::Timestamptz,
294        ColumnType::Year => DataType::Int32,
295        ColumnType::Char(_)
296        | ColumnType::NChar(_)
297        | ColumnType::Varchar(_)
298        | ColumnType::NVarchar(_) => DataType::Varchar,
299        ColumnType::Binary(_) | ColumnType::Varbinary(_) => DataType::Bytea,
300        ColumnType::Text(_)
301        | ColumnType::TinyText(_)
302        | ColumnType::MediumText(_)
303        | ColumnType::LongText(_) => DataType::Varchar,
304        ColumnType::Blob(_)
305        | ColumnType::TinyBlob
306        | ColumnType::MediumBlob
307        | ColumnType::LongBlob => DataType::Bytea,
308        ColumnType::Enum(_) => DataType::Varchar,
309        ColumnType::Json => DataType::Jsonb,
310        ColumnType::Set(_) => {
311            return Err(anyhow!("SET type not supported").into());
312        }
313        ColumnType::Geometry(_) => {
314            return Err(anyhow!("GEOMETRY type not supported").into());
315        }
316        ColumnType::Point(_) => {
317            return Err(anyhow!("POINT type not supported").into());
318        }
319        ColumnType::LineString(_) => {
320            return Err(anyhow!("LINE string type not supported").into());
321        }
322        ColumnType::Polygon(_) => {
323            return Err(anyhow!("POLYGON type not supported").into());
324        }
325        ColumnType::MultiPoint(_) => {
326            return Err(anyhow!("MULTI POINT type not supported").into());
327        }
328        ColumnType::MultiLineString(_) => {
329            return Err(anyhow!("MULTI LINE STRING type not supported").into());
330        }
331        ColumnType::MultiPolygon(_) => {
332            return Err(anyhow!("MULTI POLYGON type not supported").into());
333        }
334        ColumnType::GeometryCollection(_) => {
335            return Err(anyhow!("GEOMETRY COLLECTION type not supported").into());
336        }
337        ColumnType::Unknown(_) => {
338            return Err(anyhow!("Unknown MySQL data type").into());
339        }
340    };
341
342    Ok(dtype)
343}
344
345pub struct MySqlExternalTableReader {
346    rw_schema: Schema,
347    field_names: String,
348    pool: mysql_async::Pool,
349}
350
351impl ExternalTableReader for MySqlExternalTableReader {
352    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
353        let mut conn = self.pool.get_conn().await?;
354
355        let sql = "SHOW MASTER STATUS".to_owned();
356        let mut rs = conn.query::<mysql_async::Row, _>(sql).await?;
357        let row = rs
358            .iter_mut()
359            .exactly_one()
360            .ok()
361            .context("expect exactly one row when reading binlog offset")?;
362        drop(conn);
363        Ok(CdcOffset::MySql(MySqlOffset {
364            filename: row.take("File").unwrap(),
365            position: row.take("Position").unwrap(),
366        }))
367    }
368
369    fn snapshot_read(
370        &self,
371        table_name: SchemaTableName,
372        start_pk: Option<OwnedRow>,
373        primary_keys: Vec<String>,
374        limit: u32,
375    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
376        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
377    }
378
379    async fn disconnect(self) -> ConnectorResult<()> {
380        self.pool.disconnect().await.map_err(|e| e.into())
381    }
382
383    fn get_parallel_cdc_splits(
384        &self,
385        _options: CdcTableSnapshotSplitOption,
386    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
387        // TODO(zw): feat: impl
388        stream::empty::<ConnectorResult<CdcTableSnapshotSplit>>().boxed()
389    }
390
391    fn split_snapshot_read(
392        &self,
393        _table_name: SchemaTableName,
394        _left: OwnedRow,
395        _right: OwnedRow,
396        _split_columns: Vec<Field>,
397    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
398        todo!("implement MySQL CDC parallelized backfill")
399    }
400}
401
402impl MySqlExternalTableReader {
403    pub fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
404        let mut opts_builder = mysql_async::OptsBuilder::default()
405            .user(Some(config.username))
406            .pass(Some(config.password))
407            .ip_or_hostname(config.host)
408            .tcp_port(config.port.parse::<u16>().unwrap())
409            .db_name(Some(config.database));
410
411        opts_builder = match config.ssl_mode {
412            SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None),
413            // verify-ca and verify-full are same as required for mysql now
414            SslMode::Required | SslMode::VerifyCa | SslMode::VerifyFull => {
415                let ssl_without_verify = mysql_async::SslOpts::default()
416                    .with_danger_accept_invalid_certs(true)
417                    .with_danger_skip_domain_validation(true);
418                opts_builder.ssl_opts(Some(ssl_without_verify))
419            }
420        };
421        let pool = mysql_async::Pool::new(opts_builder);
422
423        let field_names = rw_schema
424            .fields
425            .iter()
426            .filter(|f| f.name != CDC_OFFSET_COLUMN_NAME)
427            .map(|f| Self::quote_column(f.name.as_str()))
428            .join(",");
429
430        Ok(Self {
431            rw_schema,
432            field_names,
433            pool,
434        })
435    }
436
437    pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String {
438        // schema name is the database name in mysql
439        format!("`{}`.`{}`", table_name.schema_name, table_name.table_name)
440    }
441
442    pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
443        Box::new(move |offset| {
444            Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
445                offset,
446            )?))
447        })
448    }
449
450    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
451    async fn snapshot_read_inner(
452        &self,
453        table_name: SchemaTableName,
454        start_pk_row: Option<OwnedRow>,
455        primary_keys: Vec<String>,
456        limit: u32,
457    ) {
458        let order_key = primary_keys
459            .iter()
460            .map(|col| Self::quote_column(col))
461            .join(",");
462        let sql = if start_pk_row.is_none() {
463            format!(
464                "SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
465                self.field_names,
466                Self::get_normalized_table_name(&table_name),
467                order_key,
468            )
469        } else {
470            let filter_expr = Self::filter_expression(&primary_keys);
471            format!(
472                "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}",
473                self.field_names,
474                Self::get_normalized_table_name(&table_name),
475                filter_expr,
476                order_key,
477            )
478        };
479
480        let mut conn = self.pool.get_conn().await?;
481        // Set session timezone to UTC
482        conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;
483
484        if start_pk_row.is_none() {
485            let rs_stream = sql.stream::<mysql_async::Row, _>(&mut conn).await?;
486            let row_stream = rs_stream.map(|row| {
487                // convert mysql row into OwnedRow
488                let mut row = row?;
489                Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
490            });
491            pin_mut!(row_stream);
492            #[for_await]
493            for row in row_stream {
494                let row = row?;
495                yield row;
496            }
497        } else {
498            let field_map = self
499                .rw_schema
500                .fields
501                .iter()
502                .map(|f| (f.name.as_str(), f.data_type.clone()))
503                .collect::<HashMap<_, _>>();
504
505            // fill in start primary key params
506            let params: Vec<_> = primary_keys
507                .iter()
508                .zip_eq_fast(start_pk_row.unwrap().into_iter())
509                .map(|(pk, datum)| {
510                    if let Some(value) = datum {
511                        let ty = field_map.get(pk.as_str()).unwrap();
512                        let val = match ty {
513                            DataType::Boolean => Value::from(value.into_bool()),
514                            DataType::Int16 => Value::from(value.into_int16()),
515                            DataType::Int32 => Value::from(value.into_int32()),
516                            DataType::Int64 => Value::from(value.into_int64()),
517                            DataType::Float32 => Value::from(value.into_float32().into_inner()),
518                            DataType::Float64 => Value::from(value.into_float64().into_inner()),
519                            DataType::Varchar => Value::from(String::from(value.into_utf8())),
520                            DataType::Date => Value::from(value.into_date().0),
521                            DataType::Time => Value::from(value.into_time().0),
522                            DataType::Timestamp => Value::from(value.into_timestamp().0),
523                            _ => bail!("unsupported primary key data type: {}", ty),
524                        };
525                        ConnectorResult::Ok((pk.to_lowercase(), val))
526                    } else {
527                        bail!("primary key {} cannot be null", pk);
528                    }
529                })
530                .try_collect::<_, _, ConnectorError>()?;
531
532            tracing::debug!("snapshot read params: {:?}", &params);
533            let rs_stream = sql
534                .with(Params::from(params))
535                .stream::<mysql_async::Row, _>(&mut conn)
536                .await?;
537
538            let row_stream = rs_stream.map(|row| {
539                // convert mysql row into OwnedRow
540                let mut row = row?;
541                Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
542            });
543            pin_mut!(row_stream);
544            #[for_await]
545            for row in row_stream {
546                let row = row?;
547                yield row;
548            }
549        };
550        drop(conn);
551    }
552
553    // mysql cannot leverage the given key to narrow down the range of scan,
554    // we need to rewrite the comparison conditions by our own.
555    // (a, b) > (x, y) => (`a` > x) OR ((`a` = x) AND (`b` > y))
556    fn filter_expression(columns: &[String]) -> String {
557        let mut conditions = vec![];
558        // push the first condition
559        conditions.push(format!(
560            "({} > :{})",
561            Self::quote_column(&columns[0]),
562            columns[0].to_lowercase()
563        ));
564        for i in 2..=columns.len() {
565            // '=' condition
566            let mut condition = String::new();
567            for (j, col) in columns.iter().enumerate().take(i - 1) {
568                if j == 0 {
569                    condition.push_str(&format!(
570                        "({} = :{})",
571                        Self::quote_column(col),
572                        col.to_lowercase()
573                    ));
574                } else {
575                    condition.push_str(&format!(
576                        " AND ({} = :{})",
577                        Self::quote_column(col),
578                        col.to_lowercase()
579                    ));
580                }
581            }
582            // '>' condition
583            condition.push_str(&format!(
584                " AND ({} > :{})",
585                Self::quote_column(&columns[i - 1]),
586                columns[i - 1].to_lowercase()
587            ));
588            conditions.push(format!("({})", condition));
589        }
590        if columns.len() > 1 {
591            conditions.join(" OR ")
592        } else {
593            conditions.join("")
594        }
595    }
596
597    fn quote_column(column: &str) -> String {
598        format!("`{}`", column)
599    }
600}
601
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::DataType;

    use crate::source::cdc::external::mysql::MySqlExternalTable;
    use crate::source::cdc::external::{
        CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset,
        SchemaTableName,
    };

    // Manual test case: needs a MySQL instance listening on localhost:8306.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_schema() {
        let config = ExternalTableConfig {
            connector: "mysql-cdc".to_owned(),
            host: "localhost".to_owned(),
            port: "8306".to_owned(),
            username: "root".to_owned(),
            password: "123456".to_owned(),
            database: "mydb".to_owned(),
            schema: "".to_owned(),
            table: "part".to_owned(),
            ssl_mode: Default::default(),
            ssl_root_cert: None,
            encrypt: "false".to_owned(),
        };

        let discovered = MySqlExternalTable::connect(config).await.unwrap();
        println!("columns: {:?}", &discovered.column_descs);
        println!("primary keys: {:?}", &discovered.pk_names);
    }

    // Checks the predicate for a single-column and for a composite primary key.
    #[test]
    fn test_mysql_filter_expr() {
        let single_key = vec!["id".to_owned()];
        assert_eq!(
            MySqlExternalTableReader::filter_expression(&single_key),
            "(`id` > :id)"
        );

        let composite_key = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()];
        assert_eq!(
            MySqlExternalTableReader::filter_expression(&composite_key),
            "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))"
        );
    }

    // Offsets order by binlog file name first and by position second.
    #[test]
    fn test_mysql_binlog_offset() {
        // Parses a Debezium offset payload carrying the given binlog coordinates.
        let offset_at = |file: &str, pos: u64| {
            let payload = format!(
                r#"{{ "sourcePartition": {{ "server": "test" }}, "sourceOffset": {{ "ts_sec": 1670876905, "file": "{file}", "pos": {pos}, "snapshot": true }}, "isHeartbeat": false }}"#
            );
            CdcOffset::MySql(MySqlOffset::parse_debezium_offset(&payload).unwrap())
        };

        let off0 = offset_at("binlog.000001", 105622);
        let off1 = offset_at("binlog.000007", 1062363217);
        let off2 = offset_at("binlog.000007", 659687560);
        let off3 = offset_at("binlog.000008", 7665875);
        let off4 = offset_at("binlog.000008", 7665875);

        assert!(off0 <= off1);
        assert!(off1 > off2);
        assert!(off2 < off3);
        assert_eq!(off3, off4);
    }

    // manual test case
    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let column_descs = vec![
            ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
            ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal),
            ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
        ];
        let rw_schema = Schema {
            fields: column_descs.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
                "hostname" => "localhost",
                "port" => "8306",
                "username" => "root",
                "password" => "123456",
                "database.name" => "mytest",
                "table.name" => "t1"));

        let props_json = serde_json::to_value(props).unwrap();
        let config = serde_json::from_value::<ExternalTableConfig>(props_json).unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema).unwrap();
        let current_offset = reader.current_cdc_offset().await.unwrap();
        println!("BinlogOffset: {:?}", current_offset);

        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
        let parse_offset = MySqlExternalTableReader::get_cdc_offset_parser();
        println!("parsed offset: {:?}", parse_offset(off0_str).unwrap());

        let table_name = SchemaTableName {
            schema_name: "mytest".to_owned(),
            table_name: "t1".to_owned(),
        };
        let rows = reader.snapshot_read(table_name, None, vec!["v1".to_owned()], 1000);
        pin_mut!(rows);
        #[for_await]
        for row in rows {
            println!("OwnedRow: {:?}", row);
        }
    }
}
717}