risingwave_connector/source/cdc/external/
mysql.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::{Context, anyhow};
18use chrono::{DateTime, NaiveDateTime};
19use futures::stream::BoxStream;
20use futures::{StreamExt, pin_mut, stream};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use mysql_async::prelude::*;
24use mysql_common::params::Params;
25use mysql_common::value::Value;
26use risingwave_common::bail;
27use risingwave_common::catalog::{CDC_OFFSET_COLUMN_NAME, ColumnDesc, ColumnId, Field, Schema};
28use risingwave_common::row::OwnedRow;
29use risingwave_common::types::{DataType, Datum, Decimal, F32, ScalarImpl};
30use risingwave_common::util::iter_util::ZipEqFast;
31use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType};
32use sea_schema::mysql::discovery::SchemaDiscovery;
33use sea_schema::mysql::query::SchemaQueryBuilder;
34use sea_schema::sea_query::{Alias, IntoIden};
35use serde::{Deserialize, Serialize};
36use sqlx::MySqlPool;
37use sqlx::mysql::MySqlConnectOptions;
38use thiserror_ext::AsReport;
39
40use crate::error::{ConnectorError, ConnectorResult};
41use crate::source::CdcTableSnapshotSplit;
42use crate::source::cdc::external::{
43    CdcOffset, CdcOffsetParseFunc, CdcTableSnapshotSplitOption, DebeziumOffset,
44    ExternalTableConfig, ExternalTableReader, SchemaTableName, SslMode, mysql_row_to_owned_row,
45};
46
/// A position in the upstream MySQL binlog.
///
/// The derived `PartialOrd` compares the fields in declaration order:
/// `filename` first, then `position`.
#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct MySqlOffset {
    /// Name of the binlog file.
    pub filename: String,
    /// Position inside the binlog file.
    pub position: u64,
}
52
53impl MySqlOffset {
54    pub fn new(filename: String, position: u64) -> Self {
55        Self { filename, position }
56    }
57}
58
59impl MySqlOffset {
60    pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
61        let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
62            .with_context(|| format!("invalid upstream offset: {}", offset))?;
63
64        Ok(Self {
65            filename: dbz_offset
66                .source_offset
67                .file
68                .context("binlog file not found in offset")?,
69            position: dbz_offset
70                .source_offset
71                .pos
72                .context("binlog position not found in offset")?,
73        })
74    }
75}
76
/// Column and primary-key metadata of an upstream MySQL table, as discovered
/// by [`MySqlExternalTable::connect`].
pub struct MySqlExternalTable {
    /// Descriptors of the discovered columns; names are lowercased.
    column_descs: Vec<ColumnDesc>,
    /// Lowercased names of the columns whose key kind is primary.
    pk_names: Vec<String>,
}
81
82impl MySqlExternalTable {
83    pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
84        tracing::debug!("connect to mysql");
85        let options = MySqlConnectOptions::new()
86            .username(&config.username)
87            .password(&config.password)
88            .host(&config.host)
89            .port(config.port.parse::<u16>().unwrap())
90            .database(&config.database)
91            .ssl_mode(match config.ssl_mode {
92                SslMode::Disabled => sqlx::mysql::MySqlSslMode::Disabled,
93                SslMode::Preferred => sqlx::mysql::MySqlSslMode::Preferred,
94                SslMode::Required => sqlx::mysql::MySqlSslMode::Required,
95                _ => {
96                    return Err(anyhow!("unsupported SSL mode").into());
97                }
98            });
99
100        let connection = MySqlPool::connect_with(options).await?;
101        let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());
102
103        // discover system version first
104        let system_info = schema_discovery.discover_system().await?;
105        schema_discovery.query = SchemaQueryBuilder::new(system_info.clone());
106        let schema = Alias::new(config.database.as_str()).into_iden();
107        let table = Alias::new(config.table.as_str()).into_iden();
108        let columns = schema_discovery
109            .discover_columns(schema, table, &system_info)
110            .await?;
111        let mut column_descs = vec![];
112        let mut pk_names = vec![];
113        for col in columns {
114            let data_type = mysql_type_to_rw_type(&col.col_type)?;
115            // column name in mysql is case-insensitive, convert to lowercase
116            let col_name = col.name.to_lowercase();
117            let column_desc = if let Some(default) = col.default {
118                let snapshot_value = derive_default_value(default.clone(), &data_type)
119                    .unwrap_or_else(|e| {
120                        tracing::warn!(
121                            column = col_name,
122                            ?default,
123                            %data_type,
124                            error = %e.as_report(),
125                            "failed to derive column default value, fallback to `NULL`",
126                        );
127                        None
128                    });
129
130                ColumnDesc::named_with_default_value(
131                    col_name.clone(),
132                    ColumnId::placeholder(),
133                    data_type.clone(),
134                    snapshot_value,
135                )
136            } else {
137                ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type)
138            };
139
140            column_descs.push(column_desc);
141            if matches!(col.key, ColumnKey::Primary) {
142                pk_names.push(col_name);
143            }
144        }
145
146        if pk_names.is_empty() {
147            return Err(anyhow!("MySQL table doesn't define the primary key").into());
148        }
149        Ok(Self {
150            column_descs,
151            pk_names,
152        })
153    }
154
155    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
156        &self.column_descs
157    }
158
159    pub fn pk_names(&self) -> &Vec<String> {
160        &self.pk_names
161    }
162}
163
164fn derive_default_value(default: ColumnDefault, data_type: &DataType) -> ConnectorResult<Datum> {
165    let datum = match default {
166        ColumnDefault::Null => None,
167        ColumnDefault::Int(val) => match data_type {
168            DataType::Int16 => Some(ScalarImpl::Int16(val as _)),
169            DataType::Int32 => Some(ScalarImpl::Int32(val as _)),
170            DataType::Int64 => Some(ScalarImpl::Int64(val)),
171            DataType::Varchar => {
172                // should be the Enum type which is mapped to Varchar
173                Some(ScalarImpl::from(val.to_string()))
174            }
175            _ => bail!("unexpected default value type for integer"),
176        },
177        ColumnDefault::Real(val) => match data_type {
178            DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))),
179            DataType::Float64 => Some(ScalarImpl::Float64(val.into())),
180            DataType::Decimal => Some(ScalarImpl::Decimal(
181                Decimal::try_from(val).context("failed to convert default value to decimal")?,
182            )),
183            _ => bail!("unexpected default value type for real"),
184        },
185        ColumnDefault::String(mut val) => {
186            // mysql timestamp is mapped to timestamptz, we use UTC timezone to
187            // interpret its value
188            if data_type == &DataType::Timestamptz {
189                val = timestamp_val_to_timestamptz(val.as_str())?;
190            }
191            Some(ScalarImpl::from_text(val.as_str(), data_type).map_err(|e| anyhow!(e)).context(
192                "failed to parse mysql default value expression, only constant is supported",
193            )?)
194        }
195        ColumnDefault::CurrentTimestamp | ColumnDefault::CustomExpr(_) => {
196            bail!("MySQL CURRENT_TIMESTAMP and custom expression default value not supported")
197        }
198    };
199    Ok(datum)
200}
201
202pub fn timestamp_val_to_timestamptz(value_text: &str) -> ConnectorResult<String> {
203    let format = "%Y-%m-%d %H:%M:%S";
204    let naive_datetime = NaiveDateTime::parse_from_str(value_text, format)
205        .map_err(|err| anyhow!("failed to parse mysql timestamp value").context(err))?;
206    let postgres_timestamptz: DateTime<chrono::Utc> =
207        DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive_datetime, chrono::Utc);
208    Ok(postgres_timestamptz
209        .format("%Y-%m-%d %H:%M:%S%:z")
210        .to_string())
211}
212
213pub fn type_name_to_mysql_type(ty_name: &str) -> Option<ColumnType> {
214    macro_rules! column_type {
215        ($($name:literal => $variant:ident),* $(,)?) => {
216            match ty_name.to_lowercase().as_str() {
217                $(
218                    $name => Some(ColumnType::$variant(Default::default())),
219                )*
220                "json" => Some(ColumnType::Json),
221                "date" => Some(ColumnType::Date),
222                "bool" => Some(ColumnType::Bool),
223                "tinyblob" => Some(ColumnType::TinyBlob),
224                "mediumblob" => Some(ColumnType::MediumBlob),
225                "longblob" => Some(ColumnType::LongBlob),
226                _ => None,
227            }
228        };
229    }
230
231    column_type! {
232        "bit" => Bit,
233        "tinyint" => TinyInt,
234        "smallint" => SmallInt,
235        "mediumint" => MediumInt,
236        "int" => Int,
237        "bigint" => BigInt,
238        "decimal" => Decimal,
239        "float" => Float,
240        "double" => Double,
241        "time" => Time,
242        "datetime" => DateTime,
243        "timestamp" => Timestamp,
244        "char" => Char,
245        "nchar" => NChar,
246        "varchar" => Varchar,
247        "nvarchar" => NVarchar,
248        "binary" => Binary,
249        "varbinary" => Varbinary,
250        "text" => Text,
251        "tinytext" => TinyText,
252        "mediumtext" => MediumText,
253        "longtext" => LongText,
254        "blob" => Blob,
255        "enum" => Enum,
256        "set" => Set,
257        "geometry" => Geometry,
258        "point" => Point,
259        "linestring" => LineString,
260        "polygon" => Polygon,
261        "multipoint" => MultiPoint,
262        "multilinestring" => MultiLineString,
263        "multipolygon" => MultiPolygon,
264        "geometrycollection" => GeometryCollection,
265    }
266}
267
268pub fn mysql_type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
269    let dtype = match col_type {
270        ColumnType::Serial => DataType::Int32,
271        ColumnType::Bit(attr) => {
272            if let Some(1) = attr.maximum {
273                DataType::Boolean
274            } else {
275                return Err(
276                    anyhow!("BIT({}) type not supported", attr.maximum.unwrap_or(0)).into(),
277                );
278            }
279        }
280        ColumnType::TinyInt(_) | ColumnType::SmallInt(_) => DataType::Int16,
281        ColumnType::Bool => DataType::Boolean,
282        ColumnType::MediumInt(_) => DataType::Int32,
283        ColumnType::Int(_) => DataType::Int32,
284        ColumnType::BigInt(_) => DataType::Int64,
285        ColumnType::Decimal(_) => DataType::Decimal,
286        ColumnType::Float(_) => DataType::Float32,
287        ColumnType::Double(_) => DataType::Float64,
288        ColumnType::Date => DataType::Date,
289        ColumnType::Time(_) => DataType::Time,
290        ColumnType::DateTime(_) => DataType::Timestamp,
291        ColumnType::Timestamp(_) => DataType::Timestamptz,
292        ColumnType::Year => DataType::Int32,
293        ColumnType::Char(_)
294        | ColumnType::NChar(_)
295        | ColumnType::Varchar(_)
296        | ColumnType::NVarchar(_) => DataType::Varchar,
297        ColumnType::Binary(_) | ColumnType::Varbinary(_) => DataType::Bytea,
298        ColumnType::Text(_)
299        | ColumnType::TinyText(_)
300        | ColumnType::MediumText(_)
301        | ColumnType::LongText(_) => DataType::Varchar,
302        ColumnType::Blob(_)
303        | ColumnType::TinyBlob
304        | ColumnType::MediumBlob
305        | ColumnType::LongBlob => DataType::Bytea,
306        ColumnType::Enum(_) => DataType::Varchar,
307        ColumnType::Json => DataType::Jsonb,
308        ColumnType::Set(_) => {
309            return Err(anyhow!("SET type not supported").into());
310        }
311        ColumnType::Geometry(_) => {
312            return Err(anyhow!("GEOMETRY type not supported").into());
313        }
314        ColumnType::Point(_) => {
315            return Err(anyhow!("POINT type not supported").into());
316        }
317        ColumnType::LineString(_) => {
318            return Err(anyhow!("LINE string type not supported").into());
319        }
320        ColumnType::Polygon(_) => {
321            return Err(anyhow!("POLYGON type not supported").into());
322        }
323        ColumnType::MultiPoint(_) => {
324            return Err(anyhow!("MULTI POINT type not supported").into());
325        }
326        ColumnType::MultiLineString(_) => {
327            return Err(anyhow!("MULTI LINE STRING type not supported").into());
328        }
329        ColumnType::MultiPolygon(_) => {
330            return Err(anyhow!("MULTI POLYGON type not supported").into());
331        }
332        ColumnType::GeometryCollection(_) => {
333            return Err(anyhow!("GEOMETRY COLLECTION type not supported").into());
334        }
335        ColumnType::Unknown(_) => {
336            return Err(anyhow!("Unknown MySQL data type").into());
337        }
338    };
339
340    Ok(dtype)
341}
342
/// Reader that snapshots rows of an upstream MySQL table and reports the
/// upstream binlog offset.
pub struct MySqlExternalTableReader {
    /// Schema used to convert upstream rows into `OwnedRow`s.
    rw_schema: Schema,
    /// Comma-separated, backtick-quoted names of the schema fields, excluding
    /// the CDC offset column; used as the `SELECT` list.
    field_names: String,
    /// Connection pool to the upstream MySQL server.
    pool: mysql_async::Pool,
    /// Primary-key columns as reported by `INFORMATION_SCHEMA.COLUMNS`.
    upstream_mysql_pk_infos: Vec<(String, String)>, // (column_name, column_type)
    /// `(major, minor)` parsed from the result of `SELECT VERSION()`.
    mysql_version: (u8, u8),
}
350
351impl ExternalTableReader for MySqlExternalTableReader {
352    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
353        let mut conn = self.pool.get_conn().await?;
354
355        // Choose SQL command based on MySQL version
356        let sql = if self.is_mysql_8_4_or_later() {
357            "SHOW BINARY LOG STATUS"
358        } else {
359            "SHOW MASTER STATUS"
360        };
361
362        tracing::debug!(
363            "Using SQL command: {} for MySQL version {}.{}",
364            sql,
365            self.mysql_version.0,
366            self.mysql_version.1
367        );
368        let mut rs = conn.query::<mysql_async::Row, _>(sql).await?;
369        let row = rs
370            .iter_mut()
371            .exactly_one()
372            .ok()
373            .context("expect exactly one row when reading binlog offset")?;
374        drop(conn);
375        Ok(CdcOffset::MySql(MySqlOffset {
376            filename: row.take("File").unwrap(),
377            position: row.take("Position").unwrap(),
378        }))
379    }
380
381    fn snapshot_read(
382        &self,
383        table_name: SchemaTableName,
384        start_pk: Option<OwnedRow>,
385        primary_keys: Vec<String>,
386        limit: u32,
387    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
388        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
389    }
390
391    async fn disconnect(self) -> ConnectorResult<()> {
392        self.pool.disconnect().await.map_err(|e| e.into())
393    }
394
395    fn get_parallel_cdc_splits(
396        &self,
397        _options: CdcTableSnapshotSplitOption,
398    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
399        // TODO(zw): feat: impl
400        stream::empty::<ConnectorResult<CdcTableSnapshotSplit>>().boxed()
401    }
402
403    fn split_snapshot_read(
404        &self,
405        _table_name: SchemaTableName,
406        _left: OwnedRow,
407        _right: OwnedRow,
408        _split_columns: Vec<Field>,
409    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
410        todo!("implement MySQL CDC parallelized backfill")
411    }
412}
413
414impl MySqlExternalTableReader {
415    /// Get MySQL version from the connection
416    async fn get_mysql_version(pool: &mysql_async::Pool) -> ConnectorResult<(u8, u8)> {
417        let mut conn = pool.get_conn().await?;
418        let result: Option<String> = conn.query_first("SELECT VERSION()").await?;
419
420        if let Some(version_str) = result {
421            let parts: Vec<&str> = version_str.split('.').collect();
422            if parts.len() >= 2 {
423                let major_version = parts[0]
424                    .parse::<u8>()
425                    .context("Failed to parse major version")?;
426                let minor_version = parts[1]
427                    .parse::<u8>()
428                    .context("Failed to parse minor version")?;
429                return Ok((major_version, minor_version));
430            }
431        }
432        Err(anyhow!("Failed to get MySQL version").into())
433    }
434
435    /// Check if MySQL version is 8.4 or later
436    fn is_mysql_8_4_or_later(&self) -> bool {
437        let (major, minor) = self.mysql_version;
438        major > 8 || (major == 8 && minor >= 4)
439    }
440
441    pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
442        let database = config.database.clone();
443        let table = config.table.clone();
444
445        let mut opts_builder = mysql_async::OptsBuilder::default()
446            .user(Some(config.username))
447            .pass(Some(config.password))
448            .ip_or_hostname(config.host)
449            .tcp_port(config.port.parse::<u16>().unwrap())
450            .db_name(Some(config.database));
451
452        opts_builder = match config.ssl_mode {
453            SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None),
454            // verify-ca and verify-full are same as required for mysql now
455            SslMode::Required | SslMode::VerifyCa | SslMode::VerifyFull => {
456                let ssl_without_verify = mysql_async::SslOpts::default()
457                    .with_danger_accept_invalid_certs(true)
458                    .with_danger_skip_domain_validation(true);
459                opts_builder.ssl_opts(Some(ssl_without_verify))
460            }
461        };
462        let pool = mysql_async::Pool::new(opts_builder);
463
464        let field_names = rw_schema
465            .fields
466            .iter()
467            .filter(|f| f.name != CDC_OFFSET_COLUMN_NAME)
468            .map(|f| Self::quote_column(f.name.as_str()))
469            .join(",");
470
471        // Query MySQL primary key infos for type casting.
472        let upstream_mysql_pk_infos =
473            Self::query_upstream_pk_infos(&pool, &database, &table).await?;
474        // Get MySQL version
475        let mysql_version = Self::get_mysql_version(&pool).await?;
476        tracing::info!(
477            "MySQL version detected: {}.{}",
478            mysql_version.0,
479            mysql_version.1
480        );
481
482        Ok(Self {
483            rw_schema,
484            field_names,
485            pool,
486            upstream_mysql_pk_infos,
487            mysql_version,
488        })
489    }
490
491    pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String {
492        // schema name is the database name in mysql
493        format!("`{}`.`{}`", table_name.schema_name, table_name.table_name)
494    }
495
496    pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
497        Box::new(move |offset| {
498            Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
499                offset,
500            )?))
501        })
502    }
503
504    /// Query upstream primary key data types, used for generating filter conditions with proper type casting.
505    async fn query_upstream_pk_infos(
506        pool: &mysql_async::Pool,
507        database: &str,
508        table: &str,
509    ) -> ConnectorResult<Vec<(String, String)>> {
510        let mut conn = pool.get_conn().await?;
511
512        // Query primary key columns and their data types
513        let sql = format!(
514            "SELECT COLUMN_NAME, COLUMN_TYPE
515            FROM INFORMATION_SCHEMA.COLUMNS
516            WHERE TABLE_SCHEMA = '{}'
517            AND TABLE_NAME = '{}'
518            AND COLUMN_KEY = 'PRI'
519            ORDER BY ORDINAL_POSITION",
520            database, table
521        );
522
523        let rs = conn.query::<mysql_async::Row, _>(sql).await?;
524
525        let mut column_infos = Vec::new();
526        for row in &rs {
527            let column_name: String = row.get(0).unwrap();
528            let column_type: String = row.get(1).unwrap();
529            column_infos.push((column_name, column_type));
530        }
531
532        drop(conn);
533
534        Ok(column_infos)
535    }
536
537    /// Check if a column is unsigned type
538    fn is_unsigned_type(&self, column_name: &str) -> bool {
539        self.upstream_mysql_pk_infos
540            .iter()
541            .find(|(col_name, _)| col_name == column_name)
542            .map(|(_, col_type)| col_type.to_lowercase().contains("unsigned"))
543            .unwrap_or(false)
544    }
545
546    /// Convert negative i64 to unsigned u64 based on column type
547    fn convert_negative_to_unsigned(&self, negative_val: i64) -> u64 {
548        negative_val as u64
549    }
550
551    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
552    async fn snapshot_read_inner(
553        &self,
554        table_name: SchemaTableName,
555        start_pk_row: Option<OwnedRow>,
556        primary_keys: Vec<String>,
557        limit: u32,
558    ) {
559        let order_key = primary_keys
560            .iter()
561            .map(|col| Self::quote_column(col))
562            .join(",");
563        let sql = if start_pk_row.is_none() {
564            format!(
565                "SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
566                self.field_names,
567                Self::get_normalized_table_name(&table_name),
568                order_key,
569            )
570        } else {
571            let filter_expr = Self::filter_expression(&primary_keys);
572            format!(
573                "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}",
574                self.field_names,
575                Self::get_normalized_table_name(&table_name),
576                filter_expr,
577                order_key,
578            )
579        };
580        let mut conn = self.pool.get_conn().await?;
581        // Set session timezone to UTC
582        conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;
583
584        if let Some(start_pk_row) = start_pk_row {
585            let field_map = self
586                .rw_schema
587                .fields
588                .iter()
589                .map(|f| (f.name.as_str(), f.data_type.clone()))
590                .collect::<HashMap<_, _>>();
591
592            // fill in start primary key params
593            let params: Vec<_> = primary_keys
594                .iter()
595                .zip_eq_fast(start_pk_row.into_iter())
596                .map(|(pk, datum)| {
597                    if let Some(value) = datum {
598                        let ty = field_map.get(pk.as_str()).unwrap();
599                        let val = match ty {
600                            DataType::Boolean => Value::from(value.into_bool()),
601                            DataType::Int16 => Value::from(value.into_int16()),
602                            DataType::Int32 => Value::from(value.into_int32()),
603                            DataType::Int64 => {
604                                let int64_val = value.into_int64();
605                                if int64_val < 0 && self.is_unsigned_type(pk.as_str()) {
606                                    Value::from(self.convert_negative_to_unsigned(int64_val))
607                                } else {
608                                    Value::from(int64_val)
609                                }
610                            }
611                            DataType::Float32 => Value::from(value.into_float32().into_inner()),
612                            DataType::Float64 => Value::from(value.into_float64().into_inner()),
613                            DataType::Varchar => Value::from(String::from(value.into_utf8())),
614                            DataType::Date => Value::from(value.into_date().0),
615                            DataType::Time => Value::from(value.into_time().0),
616                            DataType::Timestamp => Value::from(value.into_timestamp().0),
617                            DataType::Decimal => Value::from(value.into_decimal().to_string()),
618                            DataType::Timestamptz => {
619                                // Convert timestamptz to NaiveDateTime for MySQL TIMESTAMP comparison
620                                // MySQL expects NaiveDateTime for TIMESTAMP parameters
621                                let ts = value.into_timestamptz();
622                                let datetime_utc = ts.to_datetime_utc();
623                                let naive_datetime = datetime_utc.naive_utc();
624                                Value::from(naive_datetime)
625                            }
626                            _ => bail!("unsupported primary key data type: {}", ty),
627                        };
628                        ConnectorResult::Ok((pk.to_lowercase(), val))
629                    } else {
630                        bail!("primary key {} cannot be null", pk);
631                    }
632                })
633                .try_collect::<_, _, ConnectorError>()?;
634
635            tracing::debug!("snapshot read params: {:?}", &params);
636            let rs_stream = sql
637                .with(Params::from(params))
638                .stream::<mysql_async::Row, _>(&mut conn)
639                .await?;
640
641            let row_stream = rs_stream.map(|row| {
642                // convert mysql row into OwnedRow
643                let mut row = row?;
644                Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
645            });
646            pin_mut!(row_stream);
647            #[for_await]
648            for row in row_stream {
649                let row = row?;
650                yield row;
651            }
652        } else {
653            let rs_stream = sql.stream::<mysql_async::Row, _>(&mut conn).await?;
654            let row_stream = rs_stream.map(|row| {
655                // convert mysql row into OwnedRow
656                let mut row = row?;
657                Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
658            });
659            pin_mut!(row_stream);
660            #[for_await]
661            for row in row_stream {
662                let row = row?;
663                yield row;
664            }
665        }
666        drop(conn);
667    }
668
669    // mysql cannot leverage the given key to narrow down the range of scan,
670    // we need to rewrite the comparison conditions by our own.
671    // (a, b) > (x, y) => (`a` > x) OR ((`a` = x) AND (`b` > y))
672    fn filter_expression(columns: &[String]) -> String {
673        let mut conditions = vec![];
674        // push the first condition
675        conditions.push(format!(
676            "({} > :{})",
677            Self::quote_column(&columns[0]),
678            columns[0].to_lowercase()
679        ));
680        for i in 2..=columns.len() {
681            // '=' condition
682            let mut condition = String::new();
683            for (j, col) in columns.iter().enumerate().take(i - 1) {
684                if j == 0 {
685                    condition.push_str(&format!(
686                        "({} = :{})",
687                        Self::quote_column(col),
688                        col.to_lowercase()
689                    ));
690                } else {
691                    condition.push_str(&format!(
692                        " AND ({} = :{})",
693                        Self::quote_column(col),
694                        col.to_lowercase()
695                    ));
696                }
697            }
698            // '>' condition
699            condition.push_str(&format!(
700                " AND ({} > :{})",
701                Self::quote_column(&columns[i - 1]),
702                columns[i - 1].to_lowercase()
703            ));
704            conditions.push(format!("({})", condition));
705        }
706        if columns.len() > 1 {
707            conditions.join(" OR ")
708        } else {
709            conditions.join("")
710        }
711    }
712
713    fn quote_column(column: &str) -> String {
714        format!("`{}`", column)
715    }
716}
717
// Unit tests. The `#[ignore]`d tests connect to a MySQL server on
// `localhost:8306` and are meant to be run manually.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::DataType;

    use crate::source::cdc::external::mysql::MySqlExternalTable;
    use crate::source::cdc::external::{
        CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset,
        SchemaTableName,
    };

    // Manual test: discovers the schema of `mydb.part` and prints the columns
    // and primary keys.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_schema() {
        let config = ExternalTableConfig {
            connector: "mysql-cdc".to_owned(),
            host: "localhost".to_owned(),
            port: "8306".to_owned(),
            username: "root".to_owned(),
            password: "123456".to_owned(),
            database: "mydb".to_owned(),
            schema: "".to_owned(),
            table: "part".to_owned(),
            ssl_mode: Default::default(),
            ssl_root_cert: None,
            encrypt: "false".to_owned(),
        };

        let table = MySqlExternalTable::connect(config).await.unwrap();
        println!("columns: {:?}", &table.column_descs);
        println!("primary keys: {:?}", &table.pk_names);
    }

    // Checks the expanded row-comparison predicate for one and three key columns.
    #[test]
    fn test_mysql_filter_expr() {
        let cols = vec!["id".to_owned()];
        let expr = MySqlExternalTableReader::filter_expression(&cols);
        assert_eq!(expr, "(`id` > :id)");

        let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()];
        let expr = MySqlExternalTableReader::filter_expression(&cols);
        assert_eq!(
            expr,
            "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))"
        );
    }

    // Checks parsing of Debezium offsets and their ordering: file name first,
    // then position. `off3` and `off4` are parsed from identical input.
    #[test]
    fn test_mysql_binlog_offset() {
        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
        let off1_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true }, "isHeartbeat": false }"#;
        let off2_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true }, "isHeartbeat": false }"#;
        let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;
        let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;

        let off0 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off0_str).unwrap());
        let off1 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off1_str).unwrap());
        let off2 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off2_str).unwrap());
        let off3 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off3_str).unwrap());
        let off4 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off4_str).unwrap());

        assert!(off0 <= off1);
        assert!(off1 > off2);
        assert!(off2 < off3);
        assert_eq!(off3, off4);
    }

    // manual test case
    // Reads the current binlog offset, parses a sample offset, and prints a
    // snapshot of `mytest.t1` ordered by `v1`.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let columns = [
            ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
            ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal),
            ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
                "hostname" => "localhost",
                "port" => "8306",
                "username" => "root",
                "password" => "123456",
                "database.name" => "mytest",
                "table.name" => "t1"));

        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema)
            .await
            .unwrap();
        let offset = reader.current_cdc_offset().await.unwrap();
        println!("BinlogOffset: {:?}", offset);

        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
        let parser = MySqlExternalTableReader::get_cdc_offset_parser();
        println!("parsed offset: {:?}", parser(off0_str).unwrap());
        let table_name = SchemaTableName {
            schema_name: "mytest".to_owned(),
            table_name: "t1".to_owned(),
        };

        let stream = reader.snapshot_read(table_name, None, vec!["v1".to_owned()], 1000);
        pin_mut!(stream);
        #[for_await]
        for row in stream {
            println!("OwnedRow: {:?}", row);
        }
    }
}