1use std::collections::HashMap;
16
17use anyhow::{Context, anyhow};
18use chrono::{DateTime, NaiveDateTime};
19use futures::stream::BoxStream;
20use futures::{StreamExt, pin_mut, stream};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use mysql_async::prelude::*;
24use mysql_common::params::Params;
25use mysql_common::value::Value;
26use risingwave_common::bail;
27use risingwave_common::catalog::{CDC_OFFSET_COLUMN_NAME, ColumnDesc, ColumnId, Field, Schema};
28use risingwave_common::row::OwnedRow;
29use risingwave_common::types::{DataType, Datum, Decimal, F32, ScalarImpl};
30use risingwave_common::util::iter_util::ZipEqFast;
31use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType, NumericAttr};
32use sea_schema::mysql::discovery::SchemaDiscovery;
33use sea_schema::mysql::query::SchemaQueryBuilder;
34use sea_schema::sea_query::{Alias, IntoIden};
35use serde::{Deserialize, Serialize};
36use sqlx::MySqlPool;
37use sqlx::mysql::MySqlConnectOptions;
38use thiserror_ext::AsReport;
39
40use crate::connector_common::SslMode;
41pub use crate::connector_common::SslMode as MySqlSslMode;
43use crate::error::{ConnectorError, ConnectorResult};
44use crate::source::CdcTableSnapshotSplit;
45use crate::source::cdc::external::{
46 CdcOffset, CdcOffsetParseFunc, CdcTableSnapshotSplitOption, DebeziumOffset,
47 ExternalTableConfig, ExternalTableReader, SchemaTableName, mysql_row_to_owned_row,
48};
49
50pub fn build_mysql_connection_pool(
67 host: &str,
68 port: u16,
69 username: &str,
70 password: &str,
71 database: &str,
72 ssl_mode: SslMode,
73) -> mysql_async::Pool {
74 let mut opts_builder = mysql_async::OptsBuilder::default()
75 .user(Some(username))
76 .pass(Some(password))
77 .ip_or_hostname(host)
78 .tcp_port(port)
79 .db_name(Some(database));
80
81 opts_builder = match ssl_mode {
82 SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None),
83 SslMode::Required | SslMode::VerifyCa | SslMode::VerifyFull => {
85 let ssl_without_verify = mysql_async::SslOpts::default()
86 .with_danger_accept_invalid_certs(true)
87 .with_danger_skip_domain_validation(true);
88 opts_builder.ssl_opts(Some(ssl_without_verify))
89 }
90 };
91
92 mysql_async::Pool::new(opts_builder)
93}
94
/// A position in the upstream MySQL binlog.
///
/// The derived `PartialOrd` compares `filename` first and `position` second,
/// following the field declaration order.
#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Serialize, Deserialize)]
pub struct MySqlOffset {
    /// Name of the binlog file.
    pub filename: String,
    /// Position inside the binlog file.
    pub position: u64,
}
100
101impl MySqlOffset {
102 pub fn new(filename: String, position: u64) -> Self {
103 Self { filename, position }
104 }
105}
106
107impl MySqlOffset {
108 pub fn parse_debezium_offset(offset: &str) -> ConnectorResult<Self> {
109 let dbz_offset: DebeziumOffset = serde_json::from_str(offset)
110 .with_context(|| format!("invalid upstream offset: {}", offset))?;
111
112 Ok(Self {
113 filename: dbz_offset
114 .source_offset
115 .file
116 .context("binlog file not found in offset")?,
117 position: dbz_offset
118 .source_offset
119 .pos
120 .context("binlog position not found in offset")?,
121 })
122 }
123}
124
/// Column and primary-key metadata of an upstream MySQL table, populated by
/// `MySqlExternalTable::connect`.
pub struct MySqlExternalTable {
    /// Descriptors of the table columns, in the order schema discovery returned them.
    column_descs: Vec<ColumnDesc>,
    /// Lowercased names of the columns whose key is `ColumnKey::Primary`.
    pk_names: Vec<String>,
}
129
130impl MySqlExternalTable {
131 pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
132 tracing::debug!("connect to mysql");
133 let options = MySqlConnectOptions::new()
134 .username(&config.username)
135 .password(&config.password)
136 .host(&config.host)
137 .port(config.port.parse::<u16>().unwrap())
138 .database(&config.database)
139 .ssl_mode(match config.ssl_mode {
140 SslMode::Disabled => sqlx::mysql::MySqlSslMode::Disabled,
141 SslMode::Preferred => sqlx::mysql::MySqlSslMode::Preferred,
142 SslMode::Required => sqlx::mysql::MySqlSslMode::Required,
143 _ => {
144 return Err(anyhow!("unsupported SSL mode").into());
145 }
146 });
147
148 let connection = MySqlPool::connect_with(options).await?;
149 let mut schema_discovery = SchemaDiscovery::new(connection, config.database.as_str());
150
151 let system_info = schema_discovery.discover_system().await?;
153 schema_discovery.query = SchemaQueryBuilder::new(system_info.clone());
154 let schema = Alias::new(config.database.as_str()).into_iden();
155 let table = Alias::new(config.table.as_str()).into_iden();
156 let columns = schema_discovery
157 .discover_columns(schema, table, &system_info)
158 .await?;
159 let mut column_descs = vec![];
160 let mut pk_names = vec![];
161 for col in columns {
162 let data_type = mysql_type_to_rw_type(&col.col_type)?;
163 let col_name = col.name.to_lowercase();
165 let column_desc = if let Some(default) = col.default {
166 let snapshot_value = derive_default_value(default.clone(), &data_type)
167 .unwrap_or_else(|e| {
168 tracing::warn!(
169 column = col_name,
170 ?default,
171 %data_type,
172 error = %e.as_report(),
173 "failed to derive column default value, fallback to `NULL`",
174 );
175 None
176 });
177
178 ColumnDesc::named_with_default_value(
179 col_name.clone(),
180 ColumnId::placeholder(),
181 data_type.clone(),
182 snapshot_value,
183 )
184 } else {
185 ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type)
186 };
187
188 column_descs.push(column_desc);
189 if matches!(col.key, ColumnKey::Primary) {
190 pk_names.push(col_name);
191 }
192 }
193
194 if pk_names.is_empty() {
195 return Err(anyhow!("MySQL table doesn't define the primary key").into());
196 }
197 Ok(Self {
198 column_descs,
199 pk_names,
200 })
201 }
202
    /// Returns the column descriptors discovered for the table.
    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
        &self.column_descs
    }
206
    /// Returns the (lowercased) names of the table's primary key columns.
    pub fn pk_names(&self) -> &Vec<String> {
        &self.pk_names
    }
210}
211
212fn derive_default_value(default: ColumnDefault, data_type: &DataType) -> ConnectorResult<Datum> {
213 let datum = match default {
214 ColumnDefault::Null => None,
215 ColumnDefault::Int(val) => match data_type {
216 DataType::Int16 => Some(ScalarImpl::Int16(val as _)),
217 DataType::Int32 => Some(ScalarImpl::Int32(val as _)),
218 DataType::Int64 => Some(ScalarImpl::Int64(val)),
219 DataType::Varchar => {
220 Some(ScalarImpl::from(val.to_string()))
222 }
223 _ => bail!("unexpected default value type for integer"),
224 },
225 ColumnDefault::Real(val) => match data_type {
226 DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))),
227 DataType::Float64 => Some(ScalarImpl::Float64(val.into())),
228 DataType::Decimal => Some(ScalarImpl::Decimal(
229 Decimal::try_from(val).context("failed to convert default value to decimal")?,
230 )),
231 _ => bail!("unexpected default value type for real"),
232 },
233 ColumnDefault::String(mut val) => {
234 if data_type == &DataType::Timestamptz {
237 val = timestamp_val_to_timestamptz(val.as_str())?;
238 }
239 Some(ScalarImpl::from_text(val.as_str(), data_type).map_err(|e| anyhow!(e)).context(
240 "failed to parse mysql default value expression, only constant is supported",
241 )?)
242 }
243 ColumnDefault::CurrentTimestamp | ColumnDefault::CustomExpr(_) => {
244 bail!("MySQL CURRENT_TIMESTAMP and custom expression default value not supported")
245 }
246 };
247 Ok(datum)
248}
249
250pub fn timestamp_val_to_timestamptz(value_text: &str) -> ConnectorResult<String> {
251 let format = "%Y-%m-%d %H:%M:%S";
252 let naive_datetime = NaiveDateTime::parse_from_str(value_text, format)
253 .map_err(|err| anyhow!("failed to parse mysql timestamp value").context(err))?;
254 let postgres_timestamptz: DateTime<chrono::Utc> =
255 DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive_datetime, chrono::Utc);
256 Ok(postgres_timestamptz
257 .format("%Y-%m-%d %H:%M:%S%:z")
258 .to_string())
259}
260
261pub fn type_name_to_mysql_type(ty_name: &str) -> Option<ColumnType> {
262 let ty = ty_name.trim().to_lowercase();
265 let base = ty
266 .split_whitespace()
267 .next()
268 .unwrap_or_default()
269 .split('(')
270 .next()
271 .unwrap_or_default();
272 let is_unsigned = ty.contains("unsigned");
273 let is_zero_fill = ty.contains("zerofill");
274
275 let make_numeric_attr = || {
276 let mut attr = NumericAttr::default();
277 if is_unsigned {
278 attr.unsigned = Some(true);
279 }
280 if is_zero_fill {
281 attr.zero_fill = Some(true);
282 }
283 attr
284 };
285
286 match base {
287 "bit" => Some(ColumnType::Bit(make_numeric_attr())),
288 "tinyint" => Some(ColumnType::TinyInt(make_numeric_attr())),
289 "smallint" => Some(ColumnType::SmallInt(make_numeric_attr())),
290 "mediumint" => Some(ColumnType::MediumInt(make_numeric_attr())),
291 "int" => Some(ColumnType::Int(make_numeric_attr())),
292 "bigint" => Some(ColumnType::BigInt(make_numeric_attr())),
293 "decimal" => Some(ColumnType::Decimal(make_numeric_attr())),
294 "float" => Some(ColumnType::Float(make_numeric_attr())),
295 "double" => Some(ColumnType::Double(make_numeric_attr())),
296 "time" => Some(ColumnType::Time(Default::default())),
297 "datetime" => Some(ColumnType::DateTime(Default::default())),
298 "timestamp" => Some(ColumnType::Timestamp(Default::default())),
299 "char" => Some(ColumnType::Char(Default::default())),
300 "nchar" => Some(ColumnType::NChar(Default::default())),
301 "varchar" => Some(ColumnType::Varchar(Default::default())),
302 "nvarchar" => Some(ColumnType::NVarchar(Default::default())),
303 "binary" => Some(ColumnType::Binary(Default::default())),
304 "varbinary" => Some(ColumnType::Varbinary(Default::default())),
305 "text" => Some(ColumnType::Text(Default::default())),
306 "tinytext" => Some(ColumnType::TinyText(Default::default())),
307 "mediumtext" => Some(ColumnType::MediumText(Default::default())),
308 "longtext" => Some(ColumnType::LongText(Default::default())),
309 "blob" => Some(ColumnType::Blob(Default::default())),
310 "tinyblob" => Some(ColumnType::TinyBlob),
311 "mediumblob" => Some(ColumnType::MediumBlob),
312 "longblob" => Some(ColumnType::LongBlob),
313 "enum" => Some(ColumnType::Enum(Default::default())),
314 "set" => Some(ColumnType::Set(Default::default())),
315 "json" => Some(ColumnType::Json),
316 "date" => Some(ColumnType::Date),
317 "bool" => Some(ColumnType::Bool),
318 "geometry" => Some(ColumnType::Geometry(Default::default())),
319 "point" => Some(ColumnType::Point(Default::default())),
320 "linestring" => Some(ColumnType::LineString(Default::default())),
321 "polygon" => Some(ColumnType::Polygon(Default::default())),
322 "multipoint" => Some(ColumnType::MultiPoint(Default::default())),
323 "multilinestring" => Some(ColumnType::MultiLineString(Default::default())),
324 "multipolygon" => Some(ColumnType::MultiPolygon(Default::default())),
325 "geometrycollection" => Some(ColumnType::GeometryCollection(Default::default())),
326 _ => None,
327 }
328}
329
330pub fn mysql_type_to_rw_type(col_type: &ColumnType) -> ConnectorResult<DataType> {
331 let dtype = match col_type {
332 ColumnType::Serial => DataType::Int32,
333 ColumnType::Bit(attr) => {
334 if let Some(1) = attr.maximum {
335 DataType::Boolean
336 } else {
337 return Err(
338 anyhow!("BIT({}) type not supported", attr.maximum.unwrap_or(0)).into(),
339 );
340 }
341 }
342 ColumnType::TinyInt(_) => DataType::Int16,
344 ColumnType::SmallInt(attr) => {
345 if attr.unsigned == Some(true) {
346 DataType::Int32
347 } else {
348 DataType::Int16
349 }
350 }
351 ColumnType::Bool => DataType::Boolean,
352 ColumnType::MediumInt(_) => DataType::Int32,
353 ColumnType::Int(attr) => {
354 if attr.unsigned == Some(true) {
355 DataType::Int64
356 } else {
357 DataType::Int32
358 }
359 }
360 ColumnType::BigInt(attr) => {
361 if attr.unsigned == Some(true) {
362 DataType::Decimal
363 } else {
364 DataType::Int64
365 }
366 }
367 ColumnType::Decimal(_) => DataType::Decimal,
368 ColumnType::Float(_) => DataType::Float32,
369 ColumnType::Double(_) => DataType::Float64,
370 ColumnType::Date => DataType::Date,
371 ColumnType::Time(_) => DataType::Time,
372 ColumnType::DateTime(_) => DataType::Timestamp,
373 ColumnType::Timestamp(_) => DataType::Timestamptz,
374 ColumnType::Year => DataType::Int32,
375 ColumnType::Char(_)
376 | ColumnType::NChar(_)
377 | ColumnType::Varchar(_)
378 | ColumnType::NVarchar(_) => DataType::Varchar,
379 ColumnType::Binary(_) | ColumnType::Varbinary(_) => DataType::Bytea,
380 ColumnType::Text(_)
381 | ColumnType::TinyText(_)
382 | ColumnType::MediumText(_)
383 | ColumnType::LongText(_) => DataType::Varchar,
384 ColumnType::Blob(_)
385 | ColumnType::TinyBlob
386 | ColumnType::MediumBlob
387 | ColumnType::LongBlob => DataType::Bytea,
388 ColumnType::Enum(_) => DataType::Varchar,
389 ColumnType::Json => DataType::Jsonb,
390 ColumnType::Set(_) => {
391 return Err(anyhow!("SET type not supported").into());
392 }
393 ColumnType::Geometry(_) => {
394 return Err(anyhow!("GEOMETRY type not supported").into());
395 }
396 ColumnType::Point(_) => {
397 return Err(anyhow!("POINT type not supported").into());
398 }
399 ColumnType::LineString(_) => {
400 return Err(anyhow!("LINE string type not supported").into());
401 }
402 ColumnType::Polygon(_) => {
403 return Err(anyhow!("POLYGON type not supported").into());
404 }
405 ColumnType::MultiPoint(_) => {
406 return Err(anyhow!("MULTI POINT type not supported").into());
407 }
408 ColumnType::MultiLineString(_) => {
409 return Err(anyhow!("MULTI LINE STRING type not supported").into());
410 }
411 ColumnType::MultiPolygon(_) => {
412 return Err(anyhow!("MULTI POLYGON type not supported").into());
413 }
414 ColumnType::GeometryCollection(_) => {
415 return Err(anyhow!("GEOMETRY COLLECTION type not supported").into());
416 }
417 ColumnType::Unknown(_) => {
418 return Err(anyhow!("Unknown MySQL data type").into());
419 }
420 };
421
422 Ok(dtype)
423}
424
/// Reads snapshot rows and the current binlog offset from an upstream MySQL table.
pub struct MySqlExternalTableReader {
    /// Schema used to convert MySQL rows into `OwnedRow`s and to look up the
    /// data types of primary key columns.
    rw_schema: Schema,
    /// Comma-separated, backtick-quoted list of the columns selected by snapshot
    /// reads (every schema field except the CDC offset column).
    field_names: String,
    /// Connection pool to the upstream MySQL server.
    pool: mysql_async::Pool,
    /// `(COLUMN_NAME, COLUMN_TYPE)` pairs of the upstream primary key columns, as
    /// reported by `INFORMATION_SCHEMA.COLUMNS`.
    upstream_mysql_pk_infos: Vec<(String, String)>,
    /// `(major, minor)` version of the upstream server, parsed from `SELECT VERSION()`.
    mysql_version: (u8, u8),
}
432
433impl ExternalTableReader for MySqlExternalTableReader {
434 async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
435 let mut conn = self.pool.get_conn().await?;
436
437 let sql = if self.is_mysql_8_4_or_later() {
439 "SHOW BINARY LOG STATUS"
440 } else {
441 "SHOW MASTER STATUS"
442 };
443
444 tracing::debug!(
445 "Using SQL command: {} for MySQL version {}.{}",
446 sql,
447 self.mysql_version.0,
448 self.mysql_version.1
449 );
450 let mut rs = conn.query::<mysql_async::Row, _>(sql).await?;
451 let row = rs
452 .iter_mut()
453 .exactly_one()
454 .ok()
455 .context("expect exactly one row when reading binlog offset")?;
456 drop(conn);
457 Ok(CdcOffset::MySql(MySqlOffset {
458 filename: row.take("File").unwrap(),
459 position: row.take("Position").unwrap(),
460 }))
461 }
462
    /// Streams up to `limit` rows of `table_name` ordered by `primary_keys`.
    ///
    /// When `start_pk` is given, only rows whose primary key is strictly greater
    /// than it are returned. Delegates to `snapshot_read_inner`.
    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
    }
472
473 async fn disconnect(self) -> ConnectorResult<()> {
474 self.pool.disconnect().await.map_err(|e| e.into())
475 }
476
477 fn get_parallel_cdc_splits(
478 &self,
479 _options: CdcTableSnapshotSplitOption,
480 ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
481 stream::empty::<ConnectorResult<CdcTableSnapshotSplit>>().boxed()
483 }
484
    /// Not implemented for MySQL yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!`.
    fn split_snapshot_read(
        &self,
        _table_name: SchemaTableName,
        _left: OwnedRow,
        _right: OwnedRow,
        _split_columns: Vec<Field>,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
        todo!("implement MySQL CDC parallelized backfill")
    }
494}
495
496impl MySqlExternalTableReader {
497 async fn get_mysql_version(pool: &mysql_async::Pool) -> ConnectorResult<(u8, u8)> {
499 let mut conn = pool.get_conn().await?;
500 let result: Option<String> = conn.query_first("SELECT VERSION()").await?;
501
502 if let Some(version_str) = result {
503 let parts: Vec<&str> = version_str.split('.').collect();
504 if parts.len() >= 2 {
505 let major_version = parts[0]
506 .parse::<u8>()
507 .context("Failed to parse major version")?;
508 let minor_version = parts[1]
509 .parse::<u8>()
510 .context("Failed to parse minor version")?;
511 return Ok((major_version, minor_version));
512 }
513 }
514 Err(anyhow!("Failed to get MySQL version").into())
515 }
516
517 fn is_mysql_8_4_or_later(&self) -> bool {
519 let (major, minor) = self.mysql_version;
520 major > 8 || (major == 8 && minor >= 4)
521 }
522
523 pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult<Self> {
524 let database = config.database.clone();
525 let table = config.table.clone();
526 let pool = build_mysql_connection_pool(
527 &config.host,
528 config.port.parse::<u16>().unwrap(),
529 &config.username,
530 &config.password,
531 &config.database,
532 config.ssl_mode,
533 );
534
535 let field_names = rw_schema
536 .fields
537 .iter()
538 .filter(|f| f.name != CDC_OFFSET_COLUMN_NAME)
539 .map(|f| Self::quote_column(f.name.as_str()))
540 .join(",");
541
542 let upstream_mysql_pk_infos =
544 Self::query_upstream_pk_infos(&pool, &database, &table).await?;
545 let mysql_version = Self::get_mysql_version(&pool).await?;
547 tracing::info!(
548 "MySQL version detected: {}.{}",
549 mysql_version.0,
550 mysql_version.1
551 );
552
553 Ok(Self {
554 rw_schema,
555 field_names,
556 pool,
557 upstream_mysql_pk_infos,
558 mysql_version,
559 })
560 }
561
562 pub fn get_normalized_table_name(table_name: &SchemaTableName) -> String {
563 format!("`{}`.`{}`", table_name.schema_name, table_name.table_name)
565 }
566
567 pub fn get_cdc_offset_parser() -> CdcOffsetParseFunc {
568 Box::new(move |offset| {
569 Ok(CdcOffset::MySql(MySqlOffset::parse_debezium_offset(
570 offset,
571 )?))
572 })
573 }
574
575 async fn query_upstream_pk_infos(
577 pool: &mysql_async::Pool,
578 database: &str,
579 table: &str,
580 ) -> ConnectorResult<Vec<(String, String)>> {
581 let mut conn = pool.get_conn().await?;
582
583 let sql = format!(
585 "SELECT COLUMN_NAME, COLUMN_TYPE
586 FROM INFORMATION_SCHEMA.COLUMNS
587 WHERE TABLE_SCHEMA = '{}'
588 AND TABLE_NAME = '{}'
589 AND COLUMN_KEY = 'PRI'
590 ORDER BY ORDINAL_POSITION",
591 database, table
592 );
593
594 let rs = conn.query::<mysql_async::Row, _>(sql).await?;
595
596 let mut column_infos = Vec::new();
597 for row in &rs {
598 let column_name: String = row.get(0).unwrap();
599 let column_type: String = row.get(1).unwrap();
600 column_infos.push((column_name, column_type));
601 }
602
603 drop(conn);
604
605 Ok(column_infos)
606 }
607
608 fn is_unsigned_type(&self, column_name: &str) -> bool {
610 self.upstream_mysql_pk_infos
611 .iter()
612 .find(|(col_name, _)| col_name == column_name)
613 .map(|(_, col_type)| col_type.to_lowercase().contains("unsigned"))
614 .unwrap_or(false)
615 }
616
    /// Reinterprets the bits of an `i64` as a `u64`.
    ///
    /// The `as` cast keeps the two's-complement bit pattern, so `-1` becomes
    /// `u64::MAX`. Used by `snapshot_read_inner` for negative `Int64` primary key
    /// values of columns reported as unsigned.
    fn convert_negative_to_unsigned(&self, negative_val: i64) -> u64 {
        negative_val as u64
    }
621
622 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
623 async fn snapshot_read_inner(
624 &self,
625 table_name: SchemaTableName,
626 start_pk_row: Option<OwnedRow>,
627 primary_keys: Vec<String>,
628 limit: u32,
629 ) {
630 let order_key = primary_keys
631 .iter()
632 .map(|col| Self::quote_column(col))
633 .join(",");
634 let sql = if start_pk_row.is_none() {
635 format!(
636 "SELECT {} FROM {} ORDER BY {} LIMIT {limit}",
637 self.field_names,
638 Self::get_normalized_table_name(&table_name),
639 order_key,
640 )
641 } else {
642 let filter_expr = Self::filter_expression(&primary_keys);
643 format!(
644 "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {limit}",
645 self.field_names,
646 Self::get_normalized_table_name(&table_name),
647 filter_expr,
648 order_key,
649 )
650 };
651 let mut conn = self.pool.get_conn().await?;
652 conn.exec_drop("SET time_zone = \"+00:00\"", ()).await?;
654
655 if let Some(start_pk_row) = start_pk_row {
656 let field_map = self
657 .rw_schema
658 .fields
659 .iter()
660 .map(|f| (f.name.as_str(), f.data_type.clone()))
661 .collect::<HashMap<_, _>>();
662
663 let params: Vec<_> = primary_keys
665 .iter()
666 .zip_eq_fast(start_pk_row.into_iter())
667 .map(|(pk, datum)| {
668 if let Some(value) = datum {
669 let ty = field_map.get(pk.as_str()).unwrap();
670 let val = match ty {
671 DataType::Boolean => Value::from(value.into_bool()),
672 DataType::Int16 => Value::from(value.into_int16()),
673 DataType::Int32 => Value::from(value.into_int32()),
674 DataType::Int64 => {
675 let int64_val = value.into_int64();
676 if int64_val < 0 && self.is_unsigned_type(pk.as_str()) {
677 Value::from(self.convert_negative_to_unsigned(int64_val))
678 } else {
679 Value::from(int64_val)
680 }
681 }
682 DataType::Float32 => Value::from(value.into_float32().into_inner()),
683 DataType::Float64 => Value::from(value.into_float64().into_inner()),
684 DataType::Varchar => Value::from(String::from(value.into_utf8())),
685 DataType::Date => Value::from(value.into_date().0),
686 DataType::Time => Value::from(value.into_time().0),
687 DataType::Timestamp => Value::from(value.into_timestamp().0),
688 DataType::Decimal => Value::from(value.into_decimal().to_string()),
689 DataType::Timestamptz => {
690 let ts = value.into_timestamptz();
693 let datetime_utc = ts.to_datetime_utc();
694 let naive_datetime = datetime_utc.naive_utc();
695 Value::from(naive_datetime)
696 }
697 _ => bail!("unsupported primary key data type: {}", ty),
698 };
699 ConnectorResult::Ok((pk.to_lowercase(), val))
700 } else {
701 bail!("primary key {} cannot be null", pk);
702 }
703 })
704 .try_collect::<_, _, ConnectorError>()?;
705
706 tracing::debug!("snapshot read params: {:?}", ¶ms);
707 let rs_stream = sql
708 .with(Params::from(params))
709 .stream::<mysql_async::Row, _>(&mut conn)
710 .await?;
711
712 let row_stream = rs_stream.map(|row| {
713 let mut row = row?;
715 Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
716 });
717 pin_mut!(row_stream);
718 #[for_await]
719 for row in row_stream {
720 let row = row?;
721 yield row;
722 }
723 } else {
724 let rs_stream = sql.stream::<mysql_async::Row, _>(&mut conn).await?;
725 let row_stream = rs_stream.map(|row| {
726 let mut row = row?;
728 Ok::<_, ConnectorError>(mysql_row_to_owned_row(&mut row, &self.rw_schema))
729 });
730 pin_mut!(row_stream);
731 #[for_await]
732 for row in row_stream {
733 let row = row?;
734 yield row;
735 }
736 }
737 drop(conn);
738 }
739
740 fn filter_expression(columns: &[String]) -> String {
744 let mut conditions = vec![];
745 conditions.push(format!(
747 "({} > :{})",
748 Self::quote_column(&columns[0]),
749 columns[0].to_lowercase()
750 ));
751 for i in 2..=columns.len() {
752 let mut condition = String::new();
754 for (j, col) in columns.iter().enumerate().take(i - 1) {
755 if j == 0 {
756 condition.push_str(&format!(
757 "({} = :{})",
758 Self::quote_column(col),
759 col.to_lowercase()
760 ));
761 } else {
762 condition.push_str(&format!(
763 " AND ({} = :{})",
764 Self::quote_column(col),
765 col.to_lowercase()
766 ));
767 }
768 }
769 condition.push_str(&format!(
771 " AND ({} > :{})",
772 Self::quote_column(&columns[i - 1]),
773 columns[i - 1].to_lowercase()
774 ));
775 conditions.push(format!("({})", condition));
776 }
777 if columns.len() > 1 {
778 conditions.join(" OR ")
779 } else {
780 conditions.join("")
781 }
782 }
783
784 fn quote_column(column: &str) -> String {
785 format!("`{}`", column)
786 }
787}
788
789#[cfg(test)]
790mod tests {
791 use std::collections::HashMap;
792
793 use futures::pin_mut;
794 use futures_async_stream::for_await;
795 use maplit::{convert_args, hashmap};
796 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema};
797 use risingwave_common::types::DataType;
798
799 use crate::source::cdc::external::mysql::MySqlExternalTable;
800 use crate::source::cdc::external::{
801 CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset,
802 SchemaTableName,
803 };
804
    // Manual test: it connects to the MySQL instance configured below
    // (localhost:8306, database `mydb`, table `part`), hence `#[ignore]`.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_schema() {
        let config = ExternalTableConfig {
            connector: "mysql-cdc".to_owned(),
            host: "localhost".to_owned(),
            port: "8306".to_owned(),
            username: "root".to_owned(),
            password: "123456".to_owned(),
            database: "mydb".to_owned(),
            schema: "".to_owned(),
            table: "part".to_owned(),
            ssl_mode: Default::default(),
            ssl_root_cert: None,
            encrypt: "false".to_owned(),
        };

        // Only prints the discovered metadata; there are no assertions beyond
        // `connect` succeeding.
        let table = MySqlExternalTable::connect(config).await.unwrap();
        println!("columns: {:?}", &table.column_descs);
        println!("primary keys: {:?}", &table.pk_names);
    }
826
    // Checks the generated pagination filter for a single-column and a
    // three-column primary key.
    #[test]
    fn test_mysql_filter_expr() {
        // Single column: one strict comparison, no `OR`.
        let cols = vec!["id".to_owned()];
        let expr = MySqlExternalTableReader::filter_expression(&cols);
        assert_eq!(expr, "(`id` > :id)");

        // Composite key: lexicographic "greater than" over the columns in order.
        let cols = vec!["aa".to_owned(), "bb".to_owned(), "cc".to_owned()];
        let expr = MySqlExternalTableReader::filter_expression(&cols);
        assert_eq!(
            expr,
            "(`aa` > :aa) OR ((`aa` = :aa) AND (`bb` > :bb)) OR ((`aa` = :aa) AND (`bb` = :bb) AND (`cc` > :cc))"
        );
    }
840
    // Checks parsing of Debezium offsets and the ordering of the resulting
    // offsets across binlog files and positions.
    #[test]
    fn test_mysql_binlog_offset() {
        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
        let off1_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 1062363217, "snapshot": true }, "isHeartbeat": false }"#;
        let off2_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000007", "pos": 659687560, "snapshot": true }, "isHeartbeat": false }"#;
        let off3_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;
        let off4_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000008", "pos": 7665875, "snapshot": true }, "isHeartbeat": false }"#;

        let off0 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off0_str).unwrap());
        let off1 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off1_str).unwrap());
        let off2 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off2_str).unwrap());
        let off3 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off3_str).unwrap());
        let off4 = CdcOffset::MySql(MySqlOffset::parse_debezium_offset(off4_str).unwrap());

        // Earlier file sorts first regardless of position.
        assert!(off0 <= off1);
        // Same file: the larger position is the later offset.
        assert!(off1 > off2);
        // Later file wins even though its position is smaller.
        assert!(off2 < off3);
        // Identical file and position compare equal.
        assert_eq!(off3, off4);
    }
860
    // Manual test: it connects to the MySQL instance configured below
    // (localhost:8306, database `mytest`, table `t1`), hence `#[ignore]`.
    #[ignore]
    #[tokio::test]
    async fn test_mysql_table_reader() {
        let columns = [
            ColumnDesc::named("v1", ColumnId::new(1), DataType::Int32),
            ColumnDesc::named("v2", ColumnId::new(2), DataType::Decimal),
            ColumnDesc::named("v3", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("v4", ColumnId::new(4), DataType::Date),
        ];
        let rw_schema = Schema {
            fields: columns.iter().map(Field::from).collect(),
        };
        let props: HashMap<String, String> = convert_args!(hashmap!(
                "hostname" => "localhost",
                "port" => "8306",
                "username" => "root",
                "password" => "123456",
                "database.name" => "mytest",
                "table.name" => "t1"));

        // Build the config by round-tripping the property map through JSON.
        let config =
            serde_json::from_value::<ExternalTableConfig>(serde_json::to_value(props).unwrap())
                .unwrap();
        let reader = MySqlExternalTableReader::new(config, rw_schema)
            .await
            .unwrap();
        let offset = reader.current_cdc_offset().await.unwrap();
        println!("BinlogOffset: {:?}", offset);

        let off0_str = r#"{ "sourcePartition": { "server": "test" }, "sourceOffset": { "ts_sec": 1670876905, "file": "binlog.000001", "pos": 105622, "snapshot": true }, "isHeartbeat": false }"#;
        let parser = MySqlExternalTableReader::get_cdc_offset_parser();
        println!("parsed offset: {:?}", parser(off0_str).unwrap());
        let table_name = SchemaTableName {
            schema_name: "mytest".to_owned(),
            table_name: "t1".to_owned(),
        };

        // Read the first 1000 rows ordered by `v1` and print them; the rows are
        // not asserted on.
        let stream = reader.snapshot_read(table_name, None, vec!["v1".to_owned()], 1000);
        pin_mut!(stream);
        #[for_await]
        for row in stream {
            println!("OwnedRow: {:?}", row);
        }
    }
906}