risingwave_connector/source/cdc/external/
mysql.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use anyhow::{Context, anyhow};
18use chrono::{DateTime, NaiveDateTime};
19use futures::stream::BoxStream;
20use futures::{StreamExt, pin_mut, stream};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use mysql_async::prelude::*;
24use mysql_common::params::Params;
25use mysql_common::value::Value;
26use risingwave_common::bail;
27use risingwave_common::catalog::{CDC_OFFSET_COLUMN_NAME, ColumnDesc, ColumnId, Field, Schema};
28use risingwave_common::row::OwnedRow;
29use risingwave_common::types::{DataType, Datum, Decimal, F32, ScalarImpl};
30use risingwave_common::util::iter_util::ZipEqFast;
31use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType, NumericAttr};
32use sea_schema::mysql::discovery::SchemaDiscovery;
33use sea_schema::mysql::query::SchemaQueryBuilder;
34use sea_schema::sea_query::{Alias, IntoIden};
35use serde::{Deserialize, Serialize};
36use sqlx::MySqlPool;
37use sqlx::mysql::MySqlConnectOptions;
38use thiserror_ext::AsReport;
39
40use crate::connector_common::SslMode;
41// Re-export SslMode for convenience
42pub use crate::connector_common::SslMode as MySqlSslMode;
43use crate::error::{ConnectorError, ConnectorResult};
44use crate::source::CdcTableSnapshotSplit;
45use crate::source::cdc::external::{
46    CdcOffset, CdcOffsetParseFunc, CdcTableSnapshotSplitOption, DebeziumOffset,
47    ExternalTableConfig, ExternalTableReader, SchemaTableName, mysql_row_to_owned_row,
48};
49
50/// Build MySQL connection pool with proper SSL configuration.
51///
52/// This helper function creates a `mysql_async::Pool` with all necessary configurations
53/// including SSL settings. Use this function to ensure consistent MySQL connection setup
54/// across the codebase.
55///
56/// # Arguments
57/// * `host` - MySQL server hostname or IP address
58/// * `port` - MySQL server port
59/// * `username` - MySQL username
60/// * `password` - MySQL password
61/// * `database` - Database name
62/// * `ssl_mode` - SSL mode configuration (disabled, preferred, required, verify-ca, verify-full)
63///
64/// # Returns
65/// Returns a configured `mysql_async::Pool` ready for use
66pub fn build_mysql_connection_pool(
67    host: &str,
68    port: u16,
69    username: &str,
70    password: &str,
71    database: &str,
72    ssl_mode: SslMode,
73) -> mysql_async::Pool {
74    let mut opts_builder = mysql_async::OptsBuilder::default()
75        .user(Some(username))
76        .pass(Some(password))
77        .ip_or_hostname(host)
78        .tcp_port(port)
79        .db_name(Some(database));
80
81    opts_builder = match ssl_mode {
82        SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None),
83        // verify-ca and verify-full are same as required for mysql now
84        SslMode::Required | SslMode::VerifyCa | SslMode::VerifyFull => {
85            let ssl_without_verify = mysql_async::SslOpts::default()
86                .with_danger_accept_invalid_certs(true)
87                .with_danger_skip_domain_validation(true);
88            opts_builder.ssl_opts(Some(ssl_without_verify))
89        }
90    };
91
92    mysql_async::Pool::new(opts_builder)
93}
94
95#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
96pub struct MySqlOffset {
97    pub filename: String,
98    pub position: u64,
99}
100
101impl MySqlOffset {
102    pub fn new(filename: String, position: u64) -> Self {
103        Self { filename, position }
104    }
105}
106
107impl MySqlOffset {
108    pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
109        let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
110            .with_context(|| format!("invalid upstream offset: {}", offset))?;
111
112        Ok(Self {
113            filename: dbz_offset
114                .source_offset
115                .file
116                .context("binlog file not found in offset")?,
117            position: dbz_offset
118                .source_offset
119                .pos
120                .context("binlog position not found in offset")?,
121        })
122    }
123}
124
125pub struct MySqlExternalTable {
126    column_descs: Vec<ColumnDesc>,
127    pk_names: Vec<String>,
128}
129
130impl MySqlExternalTable {
131    pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
132        tracing::debug!("connect to mysql");
133        let options = MySqlConnectOptions::new()
134            .username(&config.username)
135            .password(&config.password)
136            .host(&config.host)
137            .port(config.port.parse::<u16>().unwrap())
138            .database(&config.database)
139            .ssl_mode(match config.ssl_mode {
140                SslMode::Disabled => sqlx::mysql::MySqlSslMode::Disabled,
141                SslMode::Preferred => sqlx::mysql::MySqlSslMode::Preferred,
142                SslMode::Required => sqlx::mysql::MySqlSslMode::Required,
143                _ => {
144                    return Err(anyhow!("unsupported SSL mode").into());
145                }
146            });
147
148        let connection = MySqlPool::connect_with(options).await?;
149        let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());
150
151        // discover system version first
152        let system_info = schema_discovery.discover_system().await?;
153        schema_discovery.query = SchemaQueryBuilder::new(system_info.clone());
154        let schema = Alias::new(config.database.as_str()).into_iden();
155        let table = Alias::new(config.table.as_str()).into_iden();
156        let columns = schema_discovery
157            .discover_columns(schema, table, &system_info)
158            .await?;
159        let mut column_descs = vec![];
160        let mut pk_names = vec![];
161        for col in columns {
162            let data_type = mysql_type_to_rw_type(&col.col_type)?;
163            // column name in mysql is case-insensitive, convert to lowercase
164            let col_name = col.name.to_lowercase();
165            let column_desc = if let Some(default) = col.default {
166                let snapshot_value = derive_default_value(default.clone(), &data_type)
167                    .unwrap_or_else(|e| {
168                        tracing::warn!(
169                            column = col_name,
170                            ?default,
171                            %data_type,
172                            error = %e.as_report(),
173                            "failed to derive column default value, fallback to `NULL`",
174                        );
175                        None
176                    });
177
178                ColumnDesc::named_with_default_value(
179                    col_name.clone(),
180                    ColumnId::placeholder(),
181                    data_type.clone(),
182                    snapshot_value,
183                )
184            } else {
185                ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type)
186            };
187
188            column_descs.push(column_desc);
189            if matches!(col.key, ColumnKey::Primary) {
190                pk_names.push(col_name);
191            }
192        }
193
194        if pk_names.is_empty() {
195            return Err(anyhow!("MySQL table doesn't define the primary key").into());
196        }
197        Ok(Self {
198            column_descs,
199            pk_names,
200        })
201    }
202
203    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
204        &self.column_descs
205    }
206
207    pub fn pk_names(&self) -> &Vec<String> {
208        &self.pk_names
209    }
210}
211
212fn derive_default_value(default: ColumnDefault, data_type: &DataType) -> ConnectorResult<Datum> {
213    let datum = match default {
214        ColumnDefault::Null => None,
215        ColumnDefault::Int(val) => match data_type {
216            DataType::Int16 => Some(ScalarImpl::Int16(val as _)),
217            DataType::Int32 => Some(ScalarImpl::Int32(val as _)),
218            DataType::Int64 => Some(ScalarImpl::Int64(val)),
219            DataType::Varchar => {
220                // should be the Enum type which is mapped to Varchar
221                Some(ScalarImpl::from(val.to_string()))
222            }
223            _ => bail!("unexpected default value type for integer"),
224        },
225        ColumnDefault::Real(val) => match data_type {
226            DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))),
227            DataType::Float64 => Some(ScalarImpl::Float64(val.into())),
228            DataType::Decimal => Some(ScalarImpl::Decimal(
229                Decimal::try_from(val).context("failed to convert default value to decimal")?,
230            )),
231            _ => bail!("unexpected default value type for real"),
232        },
233        ColumnDefault::String(mut val) => {
234            // mysql timestamp is mapped to timestamptz, we use UTC timezone to
235            // interpret its value
236            if data_type == &DataType::Timestamptz {
237                val = timestamp_val_to_timestamptz(val.as_str())?;
238            }
239            Some(ScalarImpl::from_text(val.as_str(), data_type).map_err(|e| anyhow!(e)).context(
240                "failed to parse mysql default value expression, only constant is supported",
241            )?)
242        }
243        ColumnDefault::CurrentTimestamp | ColumnDefault::CustomExpr(_) => {
244            bail!("MySQL CURRENT_TIMESTAMP and custom expression default value not supported")
245        }
246    };
247    Ok(datum)
248}
249
250pub fn timestamp_val_to_timestamptz(value_text: &str) -> ConnectorResult<String> {
251    let format = "%Y-%m-%d %H:%M:%S";
252    let naive_datetime = NaiveDateTime::parse_from_str(value_text, format)
253        .map_err(|err| anyhow!("failed to parse mysql timestamp value").context(err))?;
254    let postgres_timestamptz: DateTime<chrono::Utc> =
255        DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive_datetime, chrono::Utc);
256    Ok(postgres_timestamptz
257        .format("%Y-%m-%d %H:%M:%S%:z")
258        .to_string())
259}
260
261pub fn type_name_to_mysql_type(ty_name: &str) -> Option<ColumnType> {
262    // Debezium schema change message may include extra qualifiers, e.g. `BIGINT UNSIGNED`,
263    // `BIGINT(20) UNSIGNED`, `INT UNSIGNED ZEROFILL`, etc.
264    let ty = ty_name.trim().to_lowercase();
265    let base = ty
266        .split_whitespace()
267        .next()
268        .unwrap_or_default()
269        .split('(')
270        .next()
271        .unwrap_or_default();
272    let is_unsigned = ty.contains("unsigned");
273    let is_zero_fill = ty.contains("zerofill");
274
275    let make_numeric_attr = || {
276        let mut attr = NumericAttr::default();
277        if is_unsigned {
278            attr.unsigned = Some(true);
279        }
280        if is_zero_fill {
281            attr.zero_fill = Some(true);
282        }
283        attr
284    };
285
286    match base {
287        "bit" => Some(ColumnType::Bit(make_numeric_attr())),
288        "tinyint" => Some(ColumnType::TinyInt(make_numeric_attr())),
289        "smallint" => Some(ColumnType::SmallInt(make_numeric_attr())),
290        "mediumint" => Some(ColumnType::MediumInt(make_numeric_attr())),
291        "int" => Some(ColumnType::Int(make_numeric_attr())),
292        "bigint" => Some(ColumnType::BigInt(make_numeric_attr())),
293        "decimal" => Some(ColumnType::Decimal(make_numeric_attr())),
294        "float" => Some(ColumnType::Float(make_numeric_attr())),
295        "double" => Some(ColumnType::Double(make_numeric_attr())),
296        "time" => Some(ColumnType::Time(Default::default())),
297        "datetime" => Some(ColumnType::DateTime(Default::default())),
298        "timestamp" => Some(ColumnType::Timestamp(Default::default())),
299        "char" => Some(ColumnType::Char(Default::default())),
300        "nchar" => Some(ColumnType::NChar(Default::default())),
301        "varchar" => Some(ColumnType::Varchar(Default::default())),
302        "nvarchar" => Some(ColumnType::NVarchar(Default::default())),
303        "binary" => Some(ColumnType::Binary(Default::default())),
304        "varbinary" => Some(ColumnType::Varbinary(Default::default())),
305        "text" => Some(ColumnType::Text(Default::default())),
306        "tinytext" => Some(ColumnType::TinyText(Default::default())),
307        "mediumtext" => Some(ColumnType::MediumText(Default::default())),
308        "longtext" => Some(ColumnType::LongText(Default::default())),
309        "blob" => Some(ColumnType::Blob(Default::default())),
310        "tinyblob" => Some(ColumnType::TinyBlob),
311        "mediumblob" => Some(ColumnType::MediumBlob),
312        "longblob" => Some(ColumnType::LongBlob),
313        "enum" => Some(ColumnType::Enum(Default::default())),
314        "set" => Some(ColumnType::Set(Default::default())),
315        "json" => Some(ColumnType::Json),
316        "date" => Some(ColumnType::Date),
317        "bool" => Some(ColumnType::Bool),
318        "geometry" => Some(ColumnType::Geometry(Default::default())),
319        "point" => Some(ColumnType::Point(Default::default())),
320        "linestring" => Some(ColumnType::LineString(Default::default())),
321        "polygon" => Some(ColumnType::Polygon(Default::default())),
322        "multipoint" => Some(ColumnType::MultiPoint(Default::default())),
323        "multilinestring" => Some(ColumnType::MultiLineString(Default::default())),
324        "multipolygon" => Some(ColumnType::MultiPolygon(Default::default())),
325        "geometrycollection" => Some(ColumnType::GeometryCollection(Default::default())),
326        _ => None,
327    }
328}
329
330pub fn mysql_type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
331    let dtype = match col_type {
332        ColumnType::Serial => DataType::Int32,
333        ColumnType::Bit(attr) => {
334            if let Some(1) = attr.maximum {
335                DataType::Boolean
336            } else {
337                return Err(
338                    anyhow!("BIT({}) type not supported", attr.maximum.unwrap_or(0)).into(),
339                );
340            }
341        }
342        // Unsigned integer family needs promotion to avoid overflow.
343        ColumnType::TinyInt(_) => DataType::Int16,
344        ColumnType::SmallInt(attr) => {
345            if attr.unsigned == Some(true) {
346                DataType::Int32
347            } else {
348                DataType::Int16
349            }
350        }
351        ColumnType::Bool => DataType::Boolean,
352        ColumnType::MediumInt(_) => DataType::Int32,
353        ColumnType::Int(attr) => {
354            if attr.unsigned == Some(true) {
355                DataType::Int64
356            } else {
357                DataType::Int32
358            }
359        }
360        ColumnType::BigInt(attr) => {
361            if attr.unsigned == Some(true) {
362                DataType::Decimal
363            } else {
364                DataType::Int64
365            }
366        }
367        ColumnType::Decimal(_) => DataType::Decimal,
368        ColumnType::Float(_) => DataType::Float32,
369        ColumnType::Double(_) => DataType::Float64,
370        ColumnType::Date => DataType::Date,
371        ColumnType::Time(_) => DataType::Time,
372        ColumnType::DateTime(_) => DataType::Timestamp,
373        ColumnType::Timestamp(_) => DataType::Timestamptz,
374        ColumnType::Year => DataType::Int32,
375        ColumnType::Char(_)
376        | ColumnType::NChar(_)
377        | ColumnType::Varchar(_)
378        | ColumnType::NVarchar(_) => DataType::Varchar,
379        ColumnType::Binary(_) | ColumnType::Varbinary(_) => DataType::Bytea,
380        ColumnType::Text(_)
381        | ColumnType::TinyText(_)
382        | ColumnType::MediumText(_)
383        | ColumnType::LongText(_) => DataType::Varchar,
384        ColumnType::Blob(_)
385        | ColumnType::TinyBlob
386        | ColumnType::MediumBlob
387        | ColumnType::LongBlob => DataType::Bytea,
388        ColumnType::Enum(_) => DataType::Varchar,
389        ColumnType::Json => DataType::Jsonb,
390        ColumnType::Set(_) => {
391            return Err(anyhow!("SET type not supported").into());
392        }
393        ColumnType::Geometry(_) => {
394            return Err(anyhow!("GEOMETRY type not supported").into());
395        }
396        ColumnType::Point(_) => {
397            return Err(anyhow!("POINT type not supported").into());
398        }
399        ColumnType::LineString(_) => {
400            return Err(anyhow!("LINE string type not supported").into());
401        }
402        ColumnType::Polygon(_) => {
403            return Err(anyhow!("POLYGON type not supported").into());
404        }
405        ColumnType::MultiPoint(_) => {
406            return Err(anyhow!("MULTI POINT type not supported").into());
407        }
408        ColumnType::MultiLineString(_) => {
409            return Err(anyhow!("MULTI LINE STRING type not supported").into());
410        }
411        ColumnType::MultiPolygon(_) => {
412            return Err(anyhow!("MULTI POLYGON type not supported").into());
413        }
414        ColumnType::GeometryCollection(_) => {
415            return Err(anyhow!("GEOMETRY COLLECTION type not supported").into());
416        }
417        ColumnType::Unknown(_) => {
418            return Err(anyhow!("Unknown MySQL data type").into());
419        }
420    };
421
422    Ok(dtype)
423}
424
425pub struct MySqlExternalTableReader {
426    rw_schema: Schema,
427    field_names: String,
428    pool: mysql_async::Pool,
429    upstream_mysql_pk_infos: Vec<(String, String)>, // (column_name, column_type)
430    mysql_version: (u8, u8),
431}
432
433impl ExternalTableReader for MySqlExternalTableReader {
434    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
435        let mut conn = self.pool.get_conn().await?;
436
437        // Choose SQL command based on MySQL version
438        let sql = if self.is_mysql_8_4_or_later() {
439            "SHOW BINARY LOG STATUS"
440        } else {
441            "SHOW MASTER STATUS"
442        };
443
444        tracing::debug!(
445            "Using SQL command: {} for MySQL version {}.{}",
446            sql,
447            self.mysql_version.0,
448            self.mysql_version.1
449        );
450        let mut rs = conn.query::<mysql_async::Row, _>(sql).await?;
451        let row = rs
452            .iter_mut()
453            .exactly_one()
454            .ok()
455            .context("expect exactly one row when reading binlog offset")?;
456        drop(conn);
457        Ok(CdcOffset::MySql(MySqlOffset {
458            filename: row.take("File").unwrap(),
459            position: row.take("Position").unwrap(),
460        }))
461    }
462
463    fn snapshot_read(
464        &self,
465        table_name: SchemaTableName,
466        start_pk: Option<OwnedRow>,
467        primary_keys: Vec<String>,
468        limit: u32,
469    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
470        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
471    }
472
473    async fn disconnect(self) -> ConnectorResult<()> {
474        self.pool.disconnect().await.map_err(|e| e.into())
475    }
476
477    fn get_parallel_cdc_splits(
478        &self,
479        _options: CdcTableSnapshotSplitOption,
480    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
481        // TODO(zw): feat: impl
482        stream::empty::<ConnectorResult<CdcTableSnapshotSplit>>().boxed()
483    }
484
485    fn split_snapshot_read(
486        &self,
487        _table_name: SchemaTableName,
488        _left: OwnedRow,
489        _right: OwnedRow,
490        _split_columns: Vec<Field>,
491    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
492        todo!("implement MySQL CDC parallelized backfill")
493    }
494}
495
496impl MySqlExternalTableReader {
497    /// Get MySQL version from the connection
498    async fn get_mysql_version(pool: &mysql_async::Pool) -> ConnectorResult<(u8, u8)> {
499        let mut conn = pool.get_conn().await?;
500        let result: Option<String> = conn.query_first("SELECT VERSION()").await?;
501
502        if let Some(version_str) = result {
503            let parts: Vec<&str> = version_str.split('.').collect();
504            if parts.len() >= 2 {
505                let major_version = parts[0]
506                    .parse::<u8>()
507                    .context("Failed to parse major version")?;
508                let minor_version = parts[1]
509                    .parse::<u8>()
510                    .context("Failed to parse minor version")?;
511                return Ok((major_version, minor_version));
512            }
513        }
514        Err(anyhow!("Failed to get MySQL version").into())
515    }
516
517    /// Check if MySQL version is 8.4 or later
518    fn is_mysql_8_4_or_later(&self) -> bool {
519        let (major, minor) = self.mysql_version;
520        major > 8 || (major == 8 && minor >= 4)
521    }
522
523    pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
524        let database = config.database.clone();
525        let table = config.table.clone();
526        let pool = build_mysql_connection_pool(
527            &config.host,
528            config.port.parse::<u16>().unwrap(),
529            &config.username,
530            &config.password,
531            &config.database,
532            config.ssl_mode,
533        );
534
535        let field_names = rw_schema
536            .fields
537            .iter()
538            .filter(|f| f.name != CDC_OFFSET_COLUMN_NAME)
539            .map(|f| Self::quote_column(f.name.as_str()))
540            .join(",");
541
542        // Query MySQL primary key infos for type casting.
543        let upstream_mysql_pk_infos =
544            Self::query_upstream_pk_infos(&pool, &database, &table).await?;
545        // Get MySQL version
546        let mysql_version = Self::get_mysql_version(&pool).await?;
547        tracing::info!(
548            "MySQL version detected: {}.{}",
549            mysql_version.0,
550            mysql_version.1
551        );
552
553        Ok(Self {
554            rw_schema,
555            field_names,
556            pool,
557            upstream_mysql_pk_infos,
558            mysql_version,
559        })
560    }
561
562    pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String {
563        // schema name is the database name in mysql
564        format!("`{}`.`{}`", table_name.schema_name, table_name.table_name)
565    }
566
567    pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
568        Box::new(move |offset| {
569            Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
570                offset,
571            )?))
572        })
573    }
574
575    /// Query upstream primary key data types, used for generating filter conditions with proper type casting.
576    async fn query_upstream_pk_infos(
577        pool: &mysql_async::Pool,
578        database: &str,
579        table: &str,
580    ) -> ConnectorResult<Vec<(String, String)>> {
581        let mut conn = pool.get_conn().await?;
582
583        // Query primary key columns and their data types
584        let sql = format!(
585            "SELECT COLUMN_NAME, COLUMN_TYPE
586            FROM INFORMATION_SCHEMA.COLUMNS
587            WHERE TABLE_SCHEMA = '{}'
588            AND TABLE_NAME = '{}'
589            AND COLUMN_KEY = 'PRI'
590            ORDER BY ORDINAL_POSITION",
591            database, table
592        );
593
594        let rs = conn.query::<mysql_async::Row, _>(sql).await?;
595
596        let mut column_infos = Vec::new();
597        for row in &rs {
598            let column_name: String = row.get(0).unwrap();
599            let column_type: String = row.get(1).unwrap();
600            column_infos.push((column_name, column_type));
601        }
602
603        drop(conn);
604
605        Ok(column_infos)
606    }
607
608    /// Check if a column is unsigned type
609    fn is_unsigned_type(&self, column_name: &str) -> bool {
610        self.upstream_mysql_pk_infos
611            .iter()
612            .find(|(col_name, _)| col_name == column_name)
613            .map(|(_, col_type)| col_type.to_lowercase().contains("unsigned"))
614            .unwrap_or(false)
615    }
616
617    /// Convert negative i64 to unsigned u64 based on column type
618    fn convert_negative_to_unsigned(&self, negative_val: i64) -> u64 {
619        negative_val as u64
620    }
621
622    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
623    async fn snapshot_read_inner(
624        &self,
625        table_name: SchemaTableName,
626        start_pk_row: Option<OwnedRow>,
627        primary_keys: Vec<String>,
628        limit: u32,
629    ) {
630        let order_key = primary_keys
631            .iter()
632            .map(|col| Self::quote_column(col))
633            .join(",");
634        let sql = if start_pk_row.is_none() {
635            format!(
636                "SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
637                self.field_names,
638                Self::get_normalized_table_name(&table_name),
639                order_key,
640            )
641        } else {
642            let filter_expr = Self::filter_expression(&primary_keys);
643            format!(
644                "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}",
645                self.field_names,
646                Self::get_normalized_table_name(&table_name),
647                filter_expr,
648                order_key,
649            )
650        };
651        let mut conn = self.pool.get_conn().await?;
652        // Set session timezone to UTC
653        conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;
654
655        if let Some(start_pk_row) = start_pk_row {
656            let field_map = self
657                .rw_schema
658                .fields
659                .iter()
660                .map(|f| (f.name.as_str(), f.data_type.clone()))
661                .collect::<HashMap<_, _>>();
662
663            // fill in start primary key params
664            let params: Vec<_> = primary_keys
665                .iter()
666                .zip_eq_fast(start_pk_row.into_iter())
667                .map(|(pk, datum)| {
668                    if let Some(value) = datum {
669                        let ty = field_map.get(pk.as_str()).unwrap();
670                        let val = match ty {
671                            DataType::Boolean => Value::from(value.into_bool()),
672                            DataType::Int16 => Value::from(value.into_int16()),
673                            DataType::Int32 => Value::from(value.into_int32()),
674                            DataType::Int64 => {
675                                let int64_val = value.into_int64();
676                                if int64_val < 0 && self.is_unsigned_type(pk.as_str()) {
677                                    Value::from(self.convert_negative_to_unsigned(int64_val))
678                                } else {
679                                    Value::from(int64_val)
680                                }
681                            }
682                            DataType::Float32 => Value::from(value.into_float32().into_inner()),
683                            DataType::Float64 => Value::from(value.into_float64().into_inner()),
684                            DataType::Varchar => Value::from(String::from(value.into_utf8())),
685                            DataType::Date => Value::from(value.into_date().0),
686                            DataType::Time => Value::from(value.into_time().0),
687                            DataType::Timestamp => Value::from(value.into_timestamp().0),
688                            DataType::Decimal => Value::from(value.into_decimal().to_string()),
689                            DataType::Timestamptz => {
690                                // Convert timestamptz to NaiveDateTime for MySQL TIMESTAMP comparison
691                                // MySQL expects NaiveDateTime for TIMESTAMP parameters
692                                let ts = value.into_timestamptz();
693                                let datetime_utc = ts.to_datetime_utc();
694                                let naive_datetime = datetime_utc.naive_utc();
695                                Value::from(naive_datetime)
696                            }
697                            _ => bail!("unsupported primary key data type: {}", ty),
698                        };
699                        ConnectorResult::Ok((pk.to_lowercase(), val))
700                    } else {
701                        bail!("primary key {} cannot be null", pk);
702                    }
703                })
704                .try_collect::<_, _, ConnectorError>()?;
705
706            tracing::debug!("snapshot read params: {:?}", &params);
707            let rs_stream = sql
708                .with(Params::from(params))
709                .stream::<mysql_async::Row, _>(&mut conn)
710                .await?;
711
712            let row_stream = rs_stream.map(|row| {
713                // convert mysql row into OwnedRow
714                let mut row = row?;
715                Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
716            });
717            pin_mut!(row_stream);
718            #[for_await]
719            for row in row_stream {
720                let row = row?;
721                yield row;
722            }
723        } else {
724            let rs_stream = sql.stream::<mysql_async::Row, _>(&mut conn).await?;
725            let row_stream = rs_stream.map(|row| {
726                // convert mysql row into OwnedRow
727                let mut row = row?;
728                Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
729            });
730            pin_mut!(row_stream);
731            #[for_await]
732            for row in row_stream {
733                let row = row?;
734                yield row;
735            }
736        }
737        drop(conn);
738    }
739
740    // mysql cannot leverage the given key to narrow down the range of scan,
741    // we need to rewrite the comparison conditions by our own.
742    // (a, b) > (x, y) => (`a` > x) OR ((`a` = x) AND (`b` > y))
743    fn filter_expression(columns: &[String]) -> String {
744        let mut conditions = vec![];
745        // push the first condition
746        conditions.push(format!(
747            "({} > :{})",
748            Self::quote_column(&columns[0]),
749            columns[0].to_lowercase()
750        ));
751        for i in 2..=columns.len() {
752            // '=' condition
753            let mut condition = String::new();
754            for (j, col) in columns.iter().enumerate().take(i - 1) {
755                if j == 0 {
756                    condition.push_str(&format!(
757                        "({} = :{})",
758                        Self::quote_column(col),
759                        col.to_lowercase()
760                    ));
761                } else {
762                    condition.push_str(&format!(
763                        " AND ({} = :{})",
764                        Self::quote_column(col),
765                        col.to_lowercase()
766                    ));
767                }
768            }
769            // '>' condition
770            condition.push_str(&format!(
771                " AND ({} > :{})",
772                Self::quote_column(&columns[i - 1]),
773                columns[i - 1].to_lowercase()
774            ));
775            conditions.push(format!("({})", condition));
776        }
777        if columns.len() > 1 {
778            conditions.join(" OR ")
779        } else {
780            conditions.join("")
781        }
782    }
783
784    fn quote_column(column: &str) -> String {
785        format!("`{}`", column)
786    }
787}
788
789#[cfg(test)]
790mod tests {
791    use std::collections::HashMap;
792
793    use futures::pin_mut;
794    use futures_async_stream::for_await;
795    use maplit::{convert_args, hashmap};
796    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
797    use risingwave_common::types::DataType;
798
799    use crate::source::cdc::external::mysql::MySqlExternalTable;
800    use crate::source::cdc::external::{
801        CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset,
802        SchemaTableName,
803    };
804
805    #[ignore]
806    #[tokio::test]
807    async fn test_mysql_schema() {
808        let config = ExternalTableConfig {
809            connector: "mysql-cdc".to_owned(),
810            host: "localhost".to_owned(),
811            port: "8306".to_owned(),
812            username: "root".to_owned(),
813            password: "123456".to_owned(),
814            database: "mydb".to_owned(),
815            schema: "".to_owned(),
816            table: "part".to_owned(),
817            ssl_mode: Default::default(),
818            ssl_root_cert: None,
819            encrypt: "false".to_owned(),
820        };
821
822        let table = MySqlExternalTable::connect(config).await.unwrap();
823        println!("columns: {:?}", &table.column_descs);
824        println!("primary keys: {:?}", &table.pk_names);
825    }
826
827    #[test]
828    fn test_mysql_filter_expr() {
829        let cols = vec!["id".to_owned()];
830        let expr = MySqlExternalTableReader::filter_expression(&cols);
831        assert_eq!(expr, "(`id` > :id)");
832
833        let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()];
834        let expr = MySqlExternalTableReader::filter_expression(&cols);
835        assert_eq!(
836            expr,
837            "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))"
838        );
839    }
840
841    #[test]
842    fn test_mysql_binlog_offset() {
843        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
844        let off1_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true }, "isHeartbeat": false }"#;
845        let off2_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true }, "isHeartbeat": false }"#;
846        let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;
847        let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;
848
849        let off0 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off0_str).unwrap());
850        let off1 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off1_str).unwrap());
851        let off2 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off2_str).unwrap());
852        let off3 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off3_str).unwrap());
853        let off4 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off4_str).unwrap());
854
855        assert!(off0 <= off1);
856        assert!(off1 > off2);
857        assert!(off2 < off3);
858        assert_eq!(off3, off4);
859    }
860
861    // manual test case
862    #[ignore]
863    #[tokio::test]
864    async fn test_mysql_table_reader() {
865        let columns = [
866            ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
867            ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal),
868            ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar),
869            ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
870        ];
871        let rw_schema = Schema {
872            fields: columns.iter().map(Field::from).collect(),
873        };
874        let props: HashMap<String, String> = convert_args!(hashmap!(
875                "hostname" => "localhost",
876                "port" => "8306",
877                "username" => "root",
878                "password" => "123456",
879                "database.name" => "mytest",
880                "table.name" => "t1"));
881
882        let config =
883            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
884                .unwrap();
885        let reader = MySqlExternalTableReader::new(config, rw_schema)
886            .await
887            .unwrap();
888        let offset = reader.current_cdc_offset().await.unwrap();
889        println!("BinlogOffset: {:?}", offset);
890
891        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
892        let parser = MySqlExternalTableReader::get_cdc_offset_parser();
893        println!("parsed offset: {:?}", parser(off0_str).unwrap());
894        let table_name = SchemaTableName {
895            schema_name: "mytest".to_owned(),
896            table_name: "t1".to_owned(),
897        };
898
899        let stream = reader.snapshot_read(table_name, None, vec!["v1".to_owned()], 1000);
900        pin_mut!(stream);
901        #[for_await]
902        for row in stream {
903            println!("OwnedRow: {:?}", row);
904        }
905    }
906}