1pub mod mock_external_table;
16pub mod postgres;
17pub mod sql_server;
18
19pub mod mysql;
20
21use std::collections::{BTreeMap, HashMap};
22
23use anyhow::anyhow;
24use futures::pin_mut;
25use futures::stream::BoxStream;
26use futures_async_stream::try_stream;
27use risingwave_common::bail;
28use risingwave_common::catalog::{ColumnDesc, Field, Schema};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::secret::LocalSecretManager;
31use risingwave_pb::catalog::table::CdcTableType as PbCdcTableType;
32use risingwave_pb::secret::PbSecretRef;
33use serde::{Deserialize, Serialize};
34
35use crate::WithPropertiesExt;
36use crate::connector_common::{PostgresExternalTable, SslMode};
37use crate::error::{ConnectorError, ConnectorResult};
38use crate::parser::mysql_row_to_owned_row;
39use crate::source::CdcTableSnapshotSplit;
40use crate::source::cdc::CdcSourceType;
41use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
42use crate::source::cdc::external::mysql::{
43 MySqlExternalTable, MySqlExternalTableReader, MySqlOffset,
44};
45use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
46use crate::source::cdc::external::sql_server::{
47 SqlServerExternalTable, SqlServerExternalTableReader, SqlServerOffset,
48};
49
/// Kind of upstream system an external CDC table is backed by.
///
/// Values are derived from the `connector` property by
/// [`ExternalCdcTableType::from_properties`] or converted from/to
/// `PbCdcTableType`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExternalCdcTableType {
    /// No recognised CDC connector; also the counterpart of
    /// `PbCdcTableType::Unspecified`.
    Undefined,
    /// Served by `MockExternalTableReader`; never produced by `from_properties`.
    Mock,
    /// Selected by the `mysql-cdc` connector.
    MySql,
    /// Selected by the `postgres-cdc` connector.
    Postgres,
    /// Selected by the `sqlserver-cdc` connector.
    SqlServer,
    /// Selected by the `citus-cdc` connector.
    Citus,
    /// Selected by the `mongodb-cdc` connector.
    Mongo,
}
60
61impl ExternalCdcTableType {
62 pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self {
63 let connector = with_properties.get_connector().unwrap_or_default();
64 match connector.as_str() {
65 "mysql-cdc" => Self::MySql,
66 "postgres-cdc" => Self::Postgres,
67 "citus-cdc" => Self::Citus,
68 "sqlserver-cdc" => Self::SqlServer,
69 "mongodb-cdc" => Self::Mongo,
70 _ => Self::Undefined,
71 }
72 }
73
74 pub fn can_backfill(&self) -> bool {
75 matches!(self, Self::MySql | Self::Postgres | Self::SqlServer)
76 }
77
78 pub fn enable_transaction_metadata(&self) -> bool {
79 matches!(self, Self::MySql | Self::Postgres)
83 }
84
85 pub async fn create_table_reader(
86 &self,
87 config: ExternalTableConfig,
88 schema: Schema,
89 pk_indices: Vec<usize>,
90 schema_table_name: SchemaTableName,
91 ) -> ConnectorResult<ExternalTableReaderImpl> {
92 match self {
93 Self::MySql => Ok(ExternalTableReaderImpl::MySql(
94 MySqlExternalTableReader::new(config, schema).await?,
95 )),
96 Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
97 PostgresExternalTableReader::new(config, schema, pk_indices, schema_table_name)
98 .await?,
99 )),
100 Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
101 SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
102 )),
103 Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
105 _ => bail!("invalid external table type: {:?}", *self),
106 }
107 }
108}
109
110impl From<ExternalCdcTableType> for PbCdcTableType {
111 fn from(cdc_table_type: ExternalCdcTableType) -> Self {
112 match cdc_table_type {
113 ExternalCdcTableType::Postgres => Self::Postgres,
114 ExternalCdcTableType::MySql => Self::Mysql,
115 ExternalCdcTableType::SqlServer => Self::Sqlserver,
116
117 ExternalCdcTableType::Citus => Self::Citus,
118 ExternalCdcTableType::Mongo => Self::Mongo,
119 ExternalCdcTableType::Undefined | ExternalCdcTableType::Mock => Self::Unspecified,
120 }
121 }
122}
123
124impl From<PbCdcTableType> for ExternalCdcTableType {
125 fn from(cdc_table_type: PbCdcTableType) -> Self {
126 match cdc_table_type {
127 PbCdcTableType::Postgres => Self::Postgres,
128 PbCdcTableType::Mysql => Self::MySql,
129 PbCdcTableType::Sqlserver => Self::SqlServer,
130 PbCdcTableType::Mongo => Self::Mongo,
131 PbCdcTableType::Citus => Self::Citus,
132 PbCdcTableType::Unspecified => Self::Undefined,
133 }
134 }
135}
136
/// Schema-qualified name of an upstream table.
#[derive(Debug, Clone, PartialEq)]
pub struct SchemaTableName {
    /// Namespace of the table. `SchemaTableName::from_properties` fills this
    /// from `database.name` for MySQL and from `schema.name` for Postgres,
    /// Citus and SQL Server.
    pub schema_name: String,
    /// Table name, taken from the `table.name` property by
    /// `SchemaTableName::from_properties`.
    pub table_name: String,
}
143
/// Property key holding the upstream table name.
pub const TABLE_NAME_KEY: &str = "table.name";
/// Property key holding the upstream schema name.
pub const SCHEMA_NAME_KEY: &str = "schema.name";
/// Property key holding the upstream database name.
pub const DATABASE_NAME_KEY: &str = "database.name";
147
148impl SchemaTableName {
149 pub fn from_properties(properties: &BTreeMap<String, String>) -> Self {
150 let table_type = ExternalCdcTableType::from_properties(properties);
151 let table_name = properties.get(TABLE_NAME_KEY).cloned().unwrap_or_default();
152
153 let schema_name = match table_type {
154 ExternalCdcTableType::MySql => properties
155 .get(DATABASE_NAME_KEY)
156 .cloned()
157 .unwrap_or_default(),
158 ExternalCdcTableType::Postgres | ExternalCdcTableType::Citus => {
159 properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
160 }
161 ExternalCdcTableType::SqlServer => {
162 properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
163 }
164 _ => {
165 unreachable!("invalid external table type: {:?}", table_type);
166 }
167 };
168
169 Self {
170 schema_name,
171 table_name,
172 }
173 }
174}
175
/// Position in an upstream change stream, one variant per database with its
/// own offset type.
///
/// NOTE: `PartialOrd` is derived, so values of different variants compare by
/// variant declaration order rather than by any shared notion of position.
#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)]
pub enum CdcOffset {
    /// Offset for a MySQL upstream.
    MySql(MySqlOffset),
    /// Offset for a Postgres upstream.
    Postgres(PostgresOffset),
    /// Offset for a SQL Server upstream.
    SqlServer(SqlServerOffset),
}
182
/// Serde model of an offset record with the keys `sourcePartition`,
/// `sourceOffset` and `isHeartbeat`.
///
/// NOTE(review): presumably the offset payload emitted by the Debezium
/// engine — confirm against the producer of these messages.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebeziumOffset {
    /// String key/value pairs stored under `sourcePartition`.
    #[serde(rename = "sourcePartition")]
    pub source_partition: HashMap<String, String>,
    /// Connector-specific offset fields stored under `sourceOffset`.
    #[serde(rename = "sourceOffset")]
    pub source_offset: DebeziumSourceOffset,
    /// Flag stored under `isHeartbeat`.
    #[serde(rename = "isHeartbeat")]
    pub is_heartbeat: bool,
}
207
/// Union of the per-connector fields found under `sourceOffset`.
///
/// Every field is optional, so one struct can hold the offset of any
/// connector.
/// NOTE(review): the grouping below (snapshot flags / MySQL / Postgres /
/// SQL Server) is inferred from the field names — confirm against the
/// per-connector offset parsers.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DebeziumSourceOffset {
    // Snapshot progress flags.
    pub last_snapshot_record: Option<bool>,
    pub snapshot: Option<bool>,

    // Presumably the MySQL binlog file name and position.
    pub file: Option<String>,
    pub pos: Option<u64>,

    // Presumably Postgres offsets (LSNs and transaction id).
    pub lsn: Option<u64>,
    #[serde(rename = "txId")]
    pub txid: Option<i64>,
    pub tx_usec: Option<u64>,
    pub lsn_commit: Option<u64>,
    pub lsn_proc: Option<u64>,

    // Presumably SQL Server offsets; these LSNs are carried as strings.
    pub commit_lsn: Option<String>,
    pub change_lsn: Option<String>,
}
231
/// Boxed, `Send` callback that parses a string into a [`CdcOffset`].
pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;
233
/// Read-side interface to an upstream table used for CDC snapshot reads.
pub trait ExternalTableReader: Sized {
    /// Asynchronously produces a [`CdcOffset`] for the upstream.
    ///
    /// NOTE(review): presumably the upstream's latest offset at call time —
    /// confirm with the implementors.
    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;

    /// Consumes the reader. The default implementation does nothing and
    /// returns `Ok(())`.
    async fn disconnect(self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Returns a boxed stream of rows read from `table_name`.
    ///
    /// NOTE(review): presumably rows are ordered by `primary_keys`, start
    /// after `start_pk` when it is given, and are capped at `limit` — confirm
    /// with the implementors.
    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;

    /// Returns a boxed stream of snapshot splits computed according to
    /// `options`.
    fn get_parallel_cdc_splits(
        &self,
        options: CdcTableSnapshotSplitOption,
    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>>;

    /// Returns a boxed stream of rows read from `table_name` for one split.
    ///
    /// NOTE(review): presumably `left` and `right` bound the split on
    /// `split_columns`; whether each bound is inclusive is not visible
    /// here — confirm with the implementors.
    fn split_snapshot_read(
        &self,
        table_name: SchemaTableName,
        left: OwnedRow,
        right: OwnedRow,
        split_columns: Vec<Field>,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;
}
264
/// Options passed to `ExternalTableReader::get_parallel_cdc_splits`.
///
/// NOTE(review): the field meanings below are inferred from their names —
/// confirm against the split generators.
pub struct CdcTableSnapshotSplitOption {
    /// Presumably the target number of rows per split.
    pub backfill_num_rows_per_split: u64,
    /// Presumably whether splits should be evenly sized.
    pub backfill_as_even_splits: bool,
    /// Presumably the index of the primary-key column to split on.
    pub backfill_split_pk_column_index: u32,
}
270
/// Enum wrapper over the concrete reader types; it implements
/// [`ExternalTableReader`] by forwarding each call to the wrapped reader.
pub enum ExternalTableReaderImpl {
    /// Reader for a MySQL upstream.
    MySql(MySqlExternalTableReader),
    /// Reader for a Postgres upstream.
    Postgres(PostgresExternalTableReader),
    /// Reader for a SQL Server upstream.
    SqlServer(SqlServerExternalTableReader),
    /// Mock reader.
    Mock(MockExternalTableReader),
}
277
/// Connection settings for an upstream table, deserialized from connector
/// properties (see `ExternalTableConfig::try_from_btreemap`).
#[derive(Debug, Default, Clone, Deserialize)]
pub struct ExternalTableConfig {
    /// Connector name; `ExternalTableImpl::connect` uses it to pick the backend.
    pub connector: String,

    /// Read from the `hostname` property.
    #[serde(rename = "hostname")]
    pub host: String,
    /// Kept as a string; `ExternalTableImpl::connect` parses it as `u16` for
    /// Postgres.
    pub port: String,
    pub username: String,
    pub password: String,
    /// Read from the `database.name` property.
    #[serde(rename = "database.name")]
    pub database: String,
    /// Read from the `schema.name` property; empty when absent.
    #[serde(rename = "schema.name", default = "Default::default")]
    pub schema: String,
    /// Read from the `table.name` property.
    #[serde(rename = "table.name")]
    pub table: String,
    /// Read from `ssl.mode` (alias `debezium.database.sslmode`); falls back to
    /// `postgres_ssl_mode_default()` when absent.
    #[serde(rename = "ssl.mode", default = "postgres_ssl_mode_default")]
    #[serde(alias = "debezium.database.sslmode")]
    pub ssl_mode: SslMode,

    /// Read from `ssl.root.cert` (alias `debezium.database.sslrootcert`).
    #[serde(rename = "ssl.root.cert")]
    #[serde(alias = "debezium.database.sslrootcert")]
    pub ssl_root_cert: Option<String>,

    /// Read from `database.encrypt`; empty when absent.
    /// NOTE(review): presumably only meaningful for SQL Server — confirm.
    #[serde(rename = "database.encrypt", default = "Default::default")]
    pub encrypt: String,
}
309
/// Serde default for `ExternalTableConfig::ssl_mode`: SSL is disabled unless
/// a mode is configured.
fn postgres_ssl_mode_default() -> SslMode {
    SslMode::Disabled
}
314
315impl ExternalTableConfig {
316 pub fn try_from_btreemap(
317 connect_properties: BTreeMap<String, String>,
318 secret_refs: BTreeMap<String, PbSecretRef>,
319 ) -> ConnectorResult<Self> {
320 let options_with_secret =
321 LocalSecretManager::global().fill_secrets(connect_properties, secret_refs)?;
322 let json_value = serde_json::to_value(options_with_secret)?;
323 let config = serde_json::from_value::<ExternalTableConfig>(json_value)?;
324 Ok(config)
325 }
326}
327
328impl ExternalTableReader for ExternalTableReaderImpl {
329 async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
330 match self {
331 ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await,
332 ExternalTableReaderImpl::Postgres(postgres) => postgres.current_cdc_offset().await,
333 ExternalTableReaderImpl::SqlServer(sql_server) => sql_server.current_cdc_offset().await,
334 ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await,
335 }
336 }
337
338 fn snapshot_read(
339 &self,
340 table_name: SchemaTableName,
341 start_pk: Option<OwnedRow>,
342 primary_keys: Vec<String>,
343 limit: u32,
344 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
345 self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
346 }
347
348 fn get_parallel_cdc_splits(
349 &self,
350 options: CdcTableSnapshotSplitOption,
351 ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
352 self.get_parallel_cdc_splits_inner(options)
353 }
354
355 fn split_snapshot_read(
356 &self,
357 table_name: SchemaTableName,
358 left: OwnedRow,
359 right: OwnedRow,
360 split_columns: Vec<Field>,
361 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
362 self.split_snapshot_read_inner(table_name, left, right, split_columns)
363 }
364}
365
366impl ExternalTableReaderImpl {
367 pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
368 match self {
369 ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
370 ExternalTableReaderImpl::Postgres(_) => {
371 PostgresExternalTableReader::get_cdc_offset_parser()
372 }
373 ExternalTableReaderImpl::SqlServer(_) => {
374 SqlServerExternalTableReader::get_cdc_offset_parser()
375 }
376 ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
377 }
378 }
379
380 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
381 async fn snapshot_read_inner(
382 &self,
383 table_name: SchemaTableName,
384 start_pk: Option<OwnedRow>,
385 primary_keys: Vec<String>,
386 limit: u32,
387 ) {
388 let stream = match self {
389 ExternalTableReaderImpl::MySql(mysql) => {
390 mysql.snapshot_read(table_name, start_pk, primary_keys, limit)
391 }
392 ExternalTableReaderImpl::Postgres(postgres) => {
393 postgres.snapshot_read(table_name, start_pk, primary_keys, limit)
394 }
395 ExternalTableReaderImpl::SqlServer(sql_server) => {
396 sql_server.snapshot_read(table_name, start_pk, primary_keys, limit)
397 }
398 ExternalTableReaderImpl::Mock(mock) => {
399 mock.snapshot_read(table_name, start_pk, primary_keys, limit)
400 }
401 };
402
403 pin_mut!(stream);
404 #[for_await]
405 for row in stream {
406 let row = row?;
407 yield row;
408 }
409 }
410
411 #[try_stream(boxed, ok = CdcTableSnapshotSplit, error = ConnectorError)]
412 async fn get_parallel_cdc_splits_inner(&self, options: CdcTableSnapshotSplitOption) {
413 let stream = match self {
414 ExternalTableReaderImpl::MySql(e) => e.get_parallel_cdc_splits(options),
415 ExternalTableReaderImpl::Postgres(e) => e.get_parallel_cdc_splits(options),
416 ExternalTableReaderImpl::SqlServer(e) => e.get_parallel_cdc_splits(options),
417 ExternalTableReaderImpl::Mock(e) => e.get_parallel_cdc_splits(options),
418 };
419 pin_mut!(stream);
420 #[for_await]
421 for row in stream {
422 let row = row?;
423 yield row;
424 }
425 }
426
427 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
428 async fn split_snapshot_read_inner(
429 &self,
430 table_name: SchemaTableName,
431 left: OwnedRow,
432 right: OwnedRow,
433 split_columns: Vec<Field>,
434 ) {
435 let stream = match self {
436 ExternalTableReaderImpl::MySql(mysql) => {
437 mysql.split_snapshot_read(table_name, left, right, split_columns)
438 }
439 ExternalTableReaderImpl::Postgres(postgres) => {
440 postgres.split_snapshot_read(table_name, left, right, split_columns)
441 }
442 ExternalTableReaderImpl::SqlServer(sql_server) => {
443 sql_server.split_snapshot_read(table_name, left, right, split_columns)
444 }
445 ExternalTableReaderImpl::Mock(mock) => {
446 mock.split_snapshot_read(table_name, left, right, split_columns)
447 }
448 };
449
450 pin_mut!(stream);
451 #[for_await]
452 for row in stream {
453 let row = row?;
454 yield row;
455 }
456 }
457}
458
/// Enum wrapper over the concrete upstream table types, exposing their
/// column descriptors and primary-key names.
pub enum ExternalTableImpl {
    /// Table in a MySQL upstream.
    MySql(MySqlExternalTable),
    /// Table in a Postgres upstream.
    Postgres(PostgresExternalTable),
    /// Table in a SQL Server upstream.
    SqlServer(SqlServerExternalTable),
}
464
465impl ExternalTableImpl {
466 pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
467 let cdc_source_type = CdcSourceType::from(config.connector.as_str());
468 match cdc_source_type {
469 CdcSourceType::Mysql => Ok(ExternalTableImpl::MySql(
470 MySqlExternalTable::connect(config).await?,
471 )),
472 CdcSourceType::Postgres => Ok(ExternalTableImpl::Postgres(
473 PostgresExternalTable::connect(
474 &config.username,
475 &config.password,
476 &config.host,
477 config.port.parse::<u16>().unwrap(),
478 &config.database,
479 &config.schema,
480 &config.table,
481 &config.ssl_mode,
482 &config.ssl_root_cert,
483 false,
484 )
485 .await?,
486 )),
487 CdcSourceType::SqlServer => Ok(ExternalTableImpl::SqlServer(
488 SqlServerExternalTable::connect(config).await?,
489 )),
490 _ => Err(anyhow!("Unsupported cdc connector type: {}", config.connector).into()),
491 }
492 }
493
494 pub fn column_descs(&self) -> &Vec<ColumnDesc> {
495 match self {
496 ExternalTableImpl::MySql(mysql) => mysql.column_descs(),
497 ExternalTableImpl::Postgres(postgres) => postgres.column_descs(),
498 ExternalTableImpl::SqlServer(sql_server) => sql_server.column_descs(),
499 }
500 }
501
502 pub fn pk_names(&self) -> &Vec<String> {
503 match self {
504 ExternalTableImpl::MySql(mysql) => mysql.pk_names(),
505 ExternalTableImpl::Postgres(postgres) => postgres.pk_names(),
506 ExternalTableImpl::SqlServer(sql_server) => sql_server.pk_names(),
507 }
508 }
509}
510
/// NOTE(review): presumably the id given to the first CDC table snapshot
/// split, with later splits counting up from it — confirm against the split
/// generators.
pub const CDC_TABLE_SPLIT_ID_START: i64 = 1;