use std::collections::HashMap;

use anyhow::{Context, anyhow};
use chrono::{DateTime, NaiveDateTime};
use futures::stream::BoxStream;
use futures::{StreamExt, pin_mut, stream};
use futures_async_stream::try_stream;
use itertools::Itertools;
use mysql_async::prelude::*;
use mysql_common::params::Params;
use mysql_common::value::Value;
use risingwave_common::bail;
use risingwave_common::catalog::{CDC_OFFSET_COLUMN_NAME, ColumnDesc, ColumnId, Field, Schema};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Datum, Decimal, F32, ScalarImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType};
use sea_schema::mysql::discovery::SchemaDiscovery;
use sea_schema::mysql::query::SchemaQueryBuilder;
use sea_schema::sea_query::{Alias, IntoIden};
use serde::{Deserialize, Serialize};
use sqlx::MySqlPool;
use sqlx::mysql::MySqlConnectOptions;
use thiserror_ext::AsReport;

use crate::error::{ConnectorError, ConnectorResult};
use crate::source::CdcTableSnapshotSplit;
use crate::source::cdc::external::{
    CdcOffset, CdcOffsetParseFunc, CdcTableSnapshotSplitOption, DebeziumOffset,
    ExternalTableConfig, ExternalTableReader, SchemaTableName, SslMode, mysql_row_to_owned_row,
};
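/// A MySQL binlog offset: the binlog file name plus the byte position within that file.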
#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct MySqlOffset {
    pub filename: String,
    pub position: u64,
}

impl MySqlOffset {
    pub fn new(filename: String, position: u64) -> Self {
        Self { filename, position }
    }
}

impl MySqlOffset {
    pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
        let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
            .with_context(|| format!("invalid upstream offset: {}", offset))?;

        Ok(Self {
            filename: dbz_offset
                .source_offset
                .file
                .context("binlog file not found in offset")?,
            position: dbz_offset
                .source_offset
                .pos
                .context("binlog position not found in offset")?,
        })
    }
}
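/// Schema information about an upstream MySQL table discovered via `sea_schema`:
/// the column descriptors and the names of the primary-key columns.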
pub struct MySqlExternalTable {
    column_descs: Vec<ColumnDesc>,
    pk_names: Vec<String>,
}

impl MySqlExternalTable {
    pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
        tracing::debug!("connect to mysql");
        let options = MySqlConnectOptions::new()
            .username(&config.username)
            .password(&config.password)
            .host(&config.host)
            .port(config.port.parse::<u16>().unwrap())
            .database(&config.database)
            .ssl_mode(match config.ssl_mode {
                SslMode::Disabled => sqlx::mysql::MySqlSslMode::Disabled,
                SslMode::Preferred => sqlx::mysql::MySqlSslMode::Preferred,
                SslMode::Required => sqlx::mysql::MySqlSslMode::Required,
                _ => {
                    return Err(anyhow!("unsupported SSL mode").into());
                }
            });

        let connection = MySqlPool::connect_with(options).await?;
        let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());

        // Discover server info first so that column discovery can build version-appropriate queries.
        let system_info = schema_discovery.discover_system().await?;
        schema_discovery.query = SchemaQueryBuilder::new(system_info.clone());
        let schema = Alias::new(config.database.as_str()).into_iden();
        let table = Alias::new(config.table.as_str()).into_iden();
        let columns = schema_discovery
            .discover_columns(schema, table, &system_info)
            .await?;
        let mut column_descs = vec![];
        let mut pk_names = vec![];
        for col in columns {
            let data_type = mysql_type_to_rw_type(&col.col_type)?;
            let col_name = col.name.to_lowercase();
            // If the column has a default, try to derive a constant snapshot value for it.
            let column_desc = if let Some(default) = col.default {
                let snapshot_value = derive_default_value(default.clone(), &data_type)
                    .unwrap_or_else(|e| {
                        tracing::warn!(
                            column = col_name,
                            ?default,
                            %data_type,
                            error = %e.as_report(),
                            "failed to derive column default value, fallback to `NULL`",
                        );
                        None
                    });

                ColumnDesc::named_with_default_value(
                    col_name.clone(),
                    ColumnId::placeholder(),
                    data_type.clone(),
                    snapshot_value,
                )
            } else {
                ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type)
            };

            column_descs.push(column_desc);
            if matches!(col.key, ColumnKey::Primary) {
                pk_names.push(col_name);
            }
        }

        if pk_names.is_empty() {
            return Err(anyhow!("MySQL table doesn't define a primary key").into());
        }
        Ok(Self {
            column_descs,
            pk_names,
        })
    }

    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
        &self.column_descs
    }

    pub fn pk_names(&self) -> &Vec<String> {
        &self.pk_names
    }
}
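/// Converts a MySQL column default reported by schema discovery into a RisingWave [`Datum`]
/// used as the column's default (snapshot) value. Only constant defaults are supported;
/// `CURRENT_TIMESTAMP` and custom expressions are rejected.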
fn derive_default_value(default: ColumnDefault, data_type: &DataType) -> ConnectorResult<Datum> {
    let datum = match default {
        ColumnDefault::Null => None,
        ColumnDefault::Int(val) => match data_type {
            DataType::Int16 => Some(ScalarImpl::Int16(val as _)),
            DataType::Int32 => Some(ScalarImpl::Int32(val as _)),
            DataType::Int64 => Some(ScalarImpl::Int64(val)),
            DataType::Varchar => {
                // An integer default on a string column: render it as text.
                Some(ScalarImpl::from(val.to_string()))
            }
            _ => bail!("unexpected default value type for integer"),
        },
        ColumnDefault::Real(val) => match data_type {
            DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))),
            DataType::Float64 => Some(ScalarImpl::Float64(val.into())),
            DataType::Decimal => Some(ScalarImpl::Decimal(
                Decimal::try_from(val).context("failed to convert default value to decimal")?,
            )),
            _ => bail!("unexpected default value type for real"),
        },
        ColumnDefault::String(mut val) => {
            // MySQL stores TIMESTAMP defaults without a time zone; convert them to
            // timestamptz text before parsing.
            if data_type == &DataType::Timestamptz {
                val = timestamp_val_to_timestamptz(val.as_str())?;
            }
            Some(
                ScalarImpl::from_text(val.as_str(), data_type)
                    .map_err(|e| anyhow!(e))
                    .context(
                        "failed to parse mysql default value expression, only constant is supported",
                    )?,
            )
        }
        ColumnDefault::CurrentTimestamp | ColumnDefault::CustomExpr(_) => {
            bail!("MySQL CURRENT_TIMESTAMP and custom expression default values are not supported")
        }
    };
    Ok(datum)
}
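/// Reinterprets a MySQL `TIMESTAMP` default such as `2023-01-01 12:00:00` as UTC and renders
/// it with an explicit offset, e.g. `2023-01-01 12:00:00+00:00`.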
pub fn timestamp_val_to_timestamptz(value_text: &str) -> ConnectorResult<String> {
    let format = "%Y-%m-%d %H:%M:%S";
    let naive_datetime = NaiveDateTime::parse_from_str(value_text, format)
        .map_err(|err| anyhow!("failed to parse mysql timestamp value").context(err))?;
    let postgres_timestamptz: DateTime<chrono::Utc> =
        DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive_datetime, chrono::Utc);
    Ok(postgres_timestamptz
        .format("%Y-%m-%d %H:%M:%S%:z")
        .to_string())
}
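/// Maps a MySQL type name (e.g. `"varchar"`, `"bigint"`) to the corresponding `sea_schema`
/// [`ColumnType`] with default attributes, returning `None` for unrecognized names.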
pub fn type_name_to_mysql_type(ty_name: &str) -> Option<ColumnType> {
    macro_rules! column_type {
        ($($name:literal => $variant:ident),* $(,)?) => {
            match ty_name.to_lowercase().as_str() {
                $(
                    $name => Some(ColumnType::$variant(Default::default())),
                )*
                "json" => Some(ColumnType::Json),
                "date" => Some(ColumnType::Date),
                "bool" => Some(ColumnType::Bool),
                "tinyblob" => Some(ColumnType::TinyBlob),
                "mediumblob" => Some(ColumnType::MediumBlob),
                "longblob" => Some(ColumnType::LongBlob),
                _ => None,
            }
        };
    }

    column_type! {
        "bit" => Bit,
        "tinyint" => TinyInt,
        "smallint" => SmallInt,
        "mediumint" => MediumInt,
        "int" => Int,
        "bigint" => BigInt,
        "decimal" => Decimal,
        "float" => Float,
        "double" => Double,
        "time" => Time,
        "datetime" => DateTime,
        "timestamp" => Timestamp,
        "char" => Char,
        "nchar" => NChar,
        "varchar" => Varchar,
        "nvarchar" => NVarchar,
        "binary" => Binary,
        "varbinary" => Varbinary,
        "text" => Text,
        "tinytext" => TinyText,
        "mediumtext" => MediumText,
        "longtext" => LongText,
        "blob" => Blob,
        "enum" => Enum,
        "set" => Set,
        "geometry" => Geometry,
        "point" => Point,
        "linestring" => LineString,
        "polygon" => Polygon,
        "multipoint" => MultiPoint,
        "multilinestring" => MultiLineString,
        "multipolygon" => MultiPolygon,
        "geometrycollection" => GeometryCollection,
    }
}
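/// Maps a MySQL column type to the RisingWave [`DataType`] used to represent it.
/// Spatial types, `SET`, and `BIT` columns wider than one bit are not supported.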
pub fn mysql_type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
    let dtype = match col_type {
        ColumnType::Serial => DataType::Int32,
        ColumnType::Bit(attr) => {
            if let Some(1) = attr.maximum {
                DataType::Boolean
            } else {
                return Err(
                    anyhow!("BIT({}) type not supported", attr.maximum.unwrap_or(0)).into(),
                );
            }
        }
        ColumnType::TinyInt(_) | ColumnType::SmallInt(_) => DataType::Int16,
        ColumnType::Bool => DataType::Boolean,
        ColumnType::MediumInt(_) => DataType::Int32,
        ColumnType::Int(_) => DataType::Int32,
        ColumnType::BigInt(_) => DataType::Int64,
        ColumnType::Decimal(_) => DataType::Decimal,
        ColumnType::Float(_) => DataType::Float32,
        ColumnType::Double(_) => DataType::Float64,
        ColumnType::Date => DataType::Date,
        ColumnType::Time(_) => DataType::Time,
        ColumnType::DateTime(_) => DataType::Timestamp,
        ColumnType::Timestamp(_) => DataType::Timestamptz,
        ColumnType::Year => DataType::Int32,
        ColumnType::Char(_)
        | ColumnType::NChar(_)
        | ColumnType::Varchar(_)
        | ColumnType::NVarchar(_) => DataType::Varchar,
        ColumnType::Binary(_) | ColumnType::Varbinary(_) => DataType::Bytea,
        ColumnType::Text(_)
        | ColumnType::TinyText(_)
        | ColumnType::MediumText(_)
        | ColumnType::LongText(_) => DataType::Varchar,
        ColumnType::Blob(_)
        | ColumnType::TinyBlob
        | ColumnType::MediumBlob
        | ColumnType::LongBlob => DataType::Bytea,
        ColumnType::Enum(_) => DataType::Varchar,
        ColumnType::Json => DataType::Jsonb,
        ColumnType::Set(_) => {
            return Err(anyhow!("SET type not supported").into());
        }
        ColumnType::Geometry(_) => {
            return Err(anyhow!("GEOMETRY type not supported").into());
        }
        ColumnType::Point(_) => {
            return Err(anyhow!("POINT type not supported").into());
        }
        ColumnType::LineString(_) => {
            return Err(anyhow!("LINESTRING type not supported").into());
        }
        ColumnType::Polygon(_) => {
            return Err(anyhow!("POLYGON type not supported").into());
        }
        ColumnType::MultiPoint(_) => {
            return Err(anyhow!("MULTIPOINT type not supported").into());
        }
        ColumnType::MultiLineString(_) => {
            return Err(anyhow!("MULTILINESTRING type not supported").into());
        }
        ColumnType::MultiPolygon(_) => {
            return Err(anyhow!("MULTIPOLYGON type not supported").into());
        }
        ColumnType::GeometryCollection(_) => {
            return Err(anyhow!("GEOMETRYCOLLECTION type not supported").into());
        }
        ColumnType::Unknown(_) => {
            return Err(anyhow!("Unknown MySQL data type").into());
        }
    };

    Ok(dtype)
}
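/// Reads snapshot rows and binlog offsets from an upstream MySQL table through a
/// `mysql_async` connection pool.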
pub struct MySqlExternalTableReader {
    rw_schema: Schema,
    field_names: String,
    pool: mysql_async::Pool,
    upstream_mysql_pk_infos: Vec<(String, String)>,
    mysql_version: (u8, u8),
}

impl ExternalTableReader for MySqlExternalTableReader {
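    /// Reads the current binlog offset from the upstream server, using `SHOW BINARY LOG STATUS`
    /// on MySQL 8.4+ and `SHOW MASTER STATUS` on earlier versions.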
    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
        let mut conn = self.pool.get_conn().await?;

        // MySQL 8.4 removed `SHOW MASTER STATUS` in favor of `SHOW BINARY LOG STATUS`.
        let sql = if self.is_mysql_8_4_or_later() {
            "SHOW BINARY LOG STATUS"
        } else {
            "SHOW MASTER STATUS"
        };

        tracing::debug!(
            "Using SQL command: {} for MySQL version {}.{}",
            sql,
            self.mysql_version.0,
            self.mysql_version.1
        );
        let mut rs = conn.query::<mysql_async::Row, _>(sql).await?;
        let row = rs
            .iter_mut()
            .exactly_one()
            .ok()
            .context("expect exactly one row when reading binlog offset")?;
        drop(conn);
        Ok(CdcOffset::MySql(MySqlOffset {
            filename: row.take("File").unwrap(),
            position: row.take("Position").unwrap(),
        }))
    }

    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
    }

    async fn disconnect(self) -> ConnectorResult<()> {
        self.pool.disconnect().await.map_err(|e| e.into())
    }

    fn get_parallel_cdc_splits(
        &self,
        _options: CdcTableSnapshotSplitOption,
    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
        stream::empty::<ConnectorResult<CdcTableSnapshotSplit>>().boxed()
    }

    fn split_snapshot_read(
        &self,
        _table_name: SchemaTableName,
        _left: OwnedRow,
        _right: OwnedRow,
        _split_columns: Vec<Field>,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
        todo!("implement MySQL CDC parallelized backfill")
    }
}

impl MySqlExternalTableReader {
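    /// Queries `SELECT VERSION()` and parses the leading `major.minor` version numbers.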
    async fn get_mysql_version(pool: &mysql_async::Pool) -> ConnectorResult<(u8, u8)> {
        let mut conn = pool.get_conn().await?;
        let result: Option<String> = conn.query_first("SELECT VERSION()").await?;

        if let Some(version_str) = result {
            let parts: Vec<&str> = version_str.split('.').collect();
            if parts.len() >= 2 {
                let major_version = parts[0]
                    .parse::<u8>()
                    .context("Failed to parse major version")?;
                let minor_version = parts[1]
                    .parse::<u8>()
                    .context("Failed to parse minor version")?;
                return Ok((major_version, minor_version));
            }
        }
        Err(anyhow!("Failed to get MySQL version").into())
    }

    fn is_mysql_8_4_or_later(&self) -> bool {
        let (major, minor) = self.mysql_version;
        major > 8 || (major == 8 && minor >= 4)
    }
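    /// Creates a connection pool to the upstream MySQL instance and caches the quoted field
    /// list, the primary-key column types, and the detected server version.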
    pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
        let database = config.database.clone();
        let table = config.table.clone();

        let mut opts_builder = mysql_async::OptsBuilder::default()
            .user(Some(config.username))
            .pass(Some(config.password))
            .ip_or_hostname(config.host)
            .tcp_port(config.port.parse::<u16>().unwrap())
            .db_name(Some(config.database));

        opts_builder = match config.ssl_mode {
            SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None),
            SslMode::Required | SslMode::VerifyCa | SslMode::VerifyFull => {
                // TLS is enabled, but certificate and hostname verification are skipped.
                let ssl_without_verify = mysql_async::SslOpts::default()
                    .with_danger_accept_invalid_certs(true)
                    .with_danger_skip_domain_validation(true);
                opts_builder.ssl_opts(Some(ssl_without_verify))
            }
        };
        let pool = mysql_async::Pool::new(opts_builder);

        let field_names = rw_schema
            .fields
            .iter()
            .filter(|f| f.name != CDC_OFFSET_COLUMN_NAME)
            .map(|f| Self::quote_column(f.name.as_str()))
            .join(",");

        // Cache the upstream primary-key column types for later parameter binding.
        let upstream_mysql_pk_infos =
            Self::query_upstream_pk_infos(&pool, &database, &table).await?;
        let mysql_version = Self::get_mysql_version(&pool).await?;
        tracing::info!(
            "MySQL version detected: {}.{}",
            mysql_version.0,
            mysql_version.1
        );

        Ok(Self {
            rw_schema,
            field_names,
            pool,
            upstream_mysql_pk_infos,
            mysql_version,
        })
    }

    pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String {
        // For MySQL, the schema name holds the database; quote both parts with backticks.
        format!("`{}`.`{}`", table_name.schema_name, table_name.table_name)
    }

    pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
        Box::new(move |offset| {
            Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
                offset,
            )?))
        })
    }
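    /// Queries `INFORMATION_SCHEMA.COLUMNS` for the table's primary-key columns, returning
    /// `(column_name, column_type)` pairs in ordinal order.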
    async fn query_upstream_pk_infos(
        pool: &mysql_async::Pool,
        database: &str,
        table: &str,
    ) -> ConnectorResult<Vec<(String, String)>> {
        let mut conn = pool.get_conn().await?;

        let sql = format!(
            "SELECT COLUMN_NAME, COLUMN_TYPE
             FROM INFORMATION_SCHEMA.COLUMNS
             WHERE TABLE_SCHEMA = '{}'
               AND TABLE_NAME = '{}'
               AND COLUMN_KEY = 'PRI'
             ORDER BY ORDINAL_POSITION",
            database, table
        );

        let rs = conn.query::<mysql_async::Row, _>(sql).await?;

        let mut column_infos = Vec::new();
        for row in &rs {
            let column_name: String = row.get(0).unwrap();
            let column_type: String = row.get(1).unwrap();
            column_infos.push((column_name, column_type));
        }

        drop(conn);

        Ok(column_infos)
    }
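    /// Returns whether the given primary-key column is declared with an `UNSIGNED` type upstream.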
    fn is_unsigned_type(&self, column_name: &str) -> bool {
        self.upstream_mysql_pk_infos
            .iter()
            .find(|(col_name, _)| col_name == column_name)
            .map(|(_, col_type)| col_type.to_lowercase().contains("unsigned"))
            .unwrap_or(false)
    }
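    /// Reinterprets the two's-complement bits of a negative `i64` as a `u64`. This recovers the
    /// original value of an unsigned BIGINT primary key that exceeds `i64::MAX` and therefore
    /// appears as a negative number on the RisingWave side.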
    fn convert_negative_to_unsigned(&self, negative_val: i64) -> u64 {
        negative_val as u64
    }

    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
    async fn snapshot_read_inner(
        &self,
        table_name: SchemaTableName,
        start_pk_row: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) {
        let order_key = primary_keys
            .iter()
            .map(|col| Self::quote_column(col))
            .join(",");
        let sql = if start_pk_row.is_none() {
            format!(
                "SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
                self.field_names,
                Self::get_normalized_table_name(&table_name),
                order_key,
            )
        } else {
            let filter_expr = Self::filter_expression(&primary_keys);
            format!(
                "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}",
                self.field_names,
                Self::get_normalized_table_name(&table_name),
                filter_expr,
                order_key,
            )
        };
        let mut conn = self.pool.get_conn().await?;
        // Read the snapshot in UTC so that TIMESTAMP values are interpreted consistently.
        conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;

        if let Some(start_pk_row) = start_pk_row {
            let field_map = self
                .rw_schema
                .fields
                .iter()
                .map(|f| (f.name.as_str(), f.data_type.clone()))
                .collect::<HashMap<_, _>>();

            // Bind the start-key values as named parameters for the filter expression.
            let params: Vec<_> = primary_keys
                .iter()
                .zip_eq_fast(start_pk_row.into_iter())
                .map(|(pk, datum)| {
                    if let Some(value) = datum {
                        let ty = field_map.get(pk.as_str()).unwrap();
                        let val = match ty {
                            DataType::Boolean => Value::from(value.into_bool()),
                            DataType::Int16 => Value::from(value.into_int16()),
                            DataType::Int32 => Value::from(value.into_int32()),
                            DataType::Int64 => {
                                let int64_val = value.into_int64();
                                if int64_val < 0 && self.is_unsigned_type(pk.as_str()) {
                                    Value::from(self.convert_negative_to_unsigned(int64_val))
                                } else {
                                    Value::from(int64_val)
                                }
                            }
                            DataType::Float32 => Value::from(value.into_float32().into_inner()),
                            DataType::Float64 => Value::from(value.into_float64().into_inner()),
                            DataType::Varchar => Value::from(String::from(value.into_utf8())),
                            DataType::Date => Value::from(value.into_date().0),
                            DataType::Time => Value::from(value.into_time().0),
                            DataType::Timestamp => Value::from(value.into_timestamp().0),
                            DataType::Decimal => Value::from(value.into_decimal().to_string()),
                            DataType::Timestamptz => {
                                // MySQL has no time-zone-aware parameter type; bind the
                                // equivalent naive UTC datetime instead.
                                let ts = value.into_timestamptz();
                                let datetime_utc = ts.to_datetime_utc();
                                let naive_datetime = datetime_utc.naive_utc();
                                Value::from(naive_datetime)
                            }
                            _ => bail!("unsupported primary key data type: {}", ty),
                        };
                        ConnectorResult::Ok((pk.to_lowercase(), val))
                    } else {
                        bail!("primary key {} cannot be null", pk);
                    }
                })
                .try_collect::<_, _, ConnectorError>()?;

            tracing::debug!("snapshot read params: {:?}", &params);
            let rs_stream = sql
                .with(Params::from(params))
                .stream::<mysql_async::Row, _>(&mut conn)
                .await?;

            let row_stream = rs_stream.map(|row| {
                let mut row = row?;
                Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
            });
            pin_mut!(row_stream);
            #[for_await]
            for row in row_stream {
                let row = row?;
                yield row;
            }
        } else {
            let rs_stream = sql.stream::<mysql_async::Row, _>(&mut conn).await?;
            let row_stream = rs_stream.map(|row| {
                let mut row = row?;
                Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
            });
            pin_mut!(row_stream);
            #[for_await]
            for row in row_stream {
                let row = row?;
                yield row;
            }
        }
        drop(conn);
    }
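    /// Builds a lexicographic "greater than the start key" predicate over the primary-key
    /// columns, with one named parameter per column. For columns `[aa, bb, cc]` it yields:
    /// (`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))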
    fn filter_expression(columns: &[String]) -> String {
        let mut conditions = vec![];
        // The first disjunct: the first primary-key column exceeds the start key.
        conditions.push(format!(
            "({} > :{})",
            Self::quote_column(&columns[0]),
            columns[0].to_lowercase()
        ));
        // Each further disjunct fixes a longer prefix with equality and requires a strict
        // inequality on the next column.
        for i in 2..=columns.len() {
            let mut condition = String::new();
            for (j, col) in columns.iter().enumerate().take(i - 1) {
                if j == 0 {
                    condition.push_str(&format!(
                        "({} = :{})",
                        Self::quote_column(col),
                        col.to_lowercase()
                    ));
                } else {
                    condition.push_str(&format!(
                        " AND ({} = :{})",
                        Self::quote_column(col),
                        col.to_lowercase()
                    ));
                }
            }
            condition.push_str(&format!(
                " AND ({} > :{})",
                Self::quote_column(&columns[i - 1]),
                columns[i - 1].to_lowercase()
            ));
            conditions.push(format!("({})", condition));
        }
        if columns.len() > 1 {
            conditions.join(" OR ")
        } else {
            conditions.join("")
        }
    }

    fn quote_column(column: &str) -> String {
        format!("`{}`", column)
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use futures::pin_mut;
    use futures_async_stream::for_await;
    use maplit::{convert_args, hashmap};
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
    use risingwave_common::types::DataType;

    use crate::source::cdc::external::mysql::MySqlExternalTable;
    use crate::source::cdc::external::{
        CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset,
        SchemaTableName,
    };

    #[ignore]
    #[tokio::test]
    async fn test_mysql_schema() {
        let config = ExternalTableConfig {
            connector: "mysql-cdc".to_owned(),
            host: "localhost".to_owned(),
            port: "8306".to_owned(),
            username: "root".to_owned(),
            password: "123456".to_owned(),
            database: "mydb".to_owned(),
            schema: "".to_owned(),
            table: "part".to_owned(),
            ssl_mode: Default::default(),
            ssl_root_cert: None,
            encrypt: "false".to_owned(),
        };

        let table = MySqlExternalTable::connect(config).await.unwrap();
        println!("columns: {:?}", &table.column_descs);
        println!("primary keys: {:?}", &table.pk_names);
    }

    #[test]
    fn test_mysql_filter_expr() {
        let cols = vec!["id".to_owned()];
        let expr = MySqlExternalTableReader::filter_expression(&cols);
        assert_eq!(expr, "(`id` > :id)");

        let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()];
        let expr = MySqlExternalTableReader::filter_expression(&cols);
        assert_eq!(
            expr,
            "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))"
        );
    }

    #[test]
    fn test_mysql_binlog_offset() {
        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
        let off1_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true }, "isHeartbeat": false }"#;
        let off2_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true }, "isHeartbeat": false }"#;
        let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;
        let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;

        let off0 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off0_str).unwrap());
        let off1 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off1_str).unwrap());
        let off2 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off2_str).unwrap());
        let off3 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off3_str).unwrap());
        let off4 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off4_str).unwrap());

        assert!(off0 <= off1);
        assert!(off1 > off2);
        assert!(off2 < off3);
        assert_eq!(off3, off4);
    }

    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let columns = [
            ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
            ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal),
            ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
            "hostname" => "localhost",
            "port" => "8306",
            "username" => "root",
            "password" => "123456",
            "database.name" => "mytest",
            "table.name" => "t1"));

        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema)
            .await
            .unwrap();
        let offset = reader.current_cdc_offset().await.unwrap();
        println!("BinlogOffset: {:?}", offset);

        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
        let parser = MySqlExternalTableReader::get_cdc_offset_parser();
        println!("parsed offset: {:?}", parser(off0_str).unwrap());
        let table_name = SchemaTableName {
            schema_name: "mytest".to_owned(),
            table_name: "t1".to_owned(),
        };

        let stream = reader.snapshot_read(table_name, None, vec!["v1".to_owned()], 1000);
        pin_mut!(stream);
        #[for_await]
        for row in stream {
            println!("OwnedRow: {:?}", row);
        }
    }
}