1pub mod mock_external_table;
16pub mod postgres;
17pub mod sql_server;
18
19pub mod mysql;
20
21use std::collections::{BTreeMap, HashMap};
22
23use anyhow::anyhow;
24use futures::pin_mut;
25use futures::stream::BoxStream;
26use futures_async_stream::try_stream;
27use risingwave_common::bail;
28use risingwave_common::catalog::{ColumnDesc, Field, Schema};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::secret::LocalSecretManager;
31use risingwave_pb::catalog::table::CdcTableType as PbCdcTableType;
32use risingwave_pb::secret::PbSecretRef;
33use serde::{Deserialize, Serialize};
34
35use crate::WithPropertiesExt;
36use crate::connector_common::{PostgresExternalTable, SslMode};
37use crate::error::{ConnectorError, ConnectorResult};
38use crate::parser::mysql_row_to_owned_row;
39use crate::source::CdcTableSnapshotSplit;
40use crate::source::cdc::CdcSourceType;
41use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
42use crate::source::cdc::external::mysql::{
43 MySqlExternalTable, MySqlExternalTableReader, MySqlOffset,
44};
45use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
46use crate::source::cdc::external::sql_server::{
47 SqlServerExternalTable, SqlServerExternalTableReader, SqlServerOffset,
48};
49
/// Kind of upstream database a CDC table is read from.
///
/// Inferred from the `connector` property by [`ExternalCdcTableType::from_properties`] and
/// convertible to/from the protobuf `CdcTableType`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExternalCdcTableType {
    /// The `connector` property is missing or not a recognized CDC connector; maps to the
    /// protobuf `Unspecified`.
    Undefined,
    /// Selects `MockExternalTableReader`; never produced by `from_properties` and maps to the
    /// protobuf `Unspecified`.
    Mock,
    MySql,
    Postgres,
    SqlServer,
    Citus,
    Mongo,
}
60
61impl ExternalCdcTableType {
62 pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self {
63 let connector = with_properties.get_connector().unwrap_or_default();
64 match connector.as_str() {
65 "mysql-cdc" => Self::MySql,
66 "postgres-cdc" => Self::Postgres,
67 "citus-cdc" => Self::Citus,
68 "sqlserver-cdc" => Self::SqlServer,
69 "mongodb-cdc" => Self::Mongo,
70 _ => Self::Undefined,
71 }
72 }
73
74 pub fn can_backfill(&self) -> bool {
75 matches!(self, Self::MySql | Self::Postgres | Self::SqlServer)
76 }
77
78 pub fn enable_transaction_metadata(&self) -> bool {
79 matches!(self, Self::MySql | Self::Postgres)
83 }
84
85 pub fn shareable_only(&self) -> bool {
86 matches!(self, Self::SqlServer)
87 }
88
89 pub async fn create_table_reader(
90 &self,
91 config: ExternalTableConfig,
92 schema: Schema,
93 pk_indices: Vec<usize>,
94 schema_table_name: SchemaTableName,
95 ) -> ConnectorResult<ExternalTableReaderImpl> {
96 match self {
97 Self::MySql => Ok(ExternalTableReaderImpl::MySql(
98 MySqlExternalTableReader::new(config, schema)?,
99 )),
100 Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
101 PostgresExternalTableReader::new(config, schema, pk_indices, schema_table_name)
102 .await?,
103 )),
104 Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
105 SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
106 )),
107 Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
109 _ => bail!("invalid external table type: {:?}", *self),
110 }
111 }
112}
113
114impl From<ExternalCdcTableType> for PbCdcTableType {
115 fn from(cdc_table_type: ExternalCdcTableType) -> Self {
116 match cdc_table_type {
117 ExternalCdcTableType::Postgres => Self::Postgres,
118 ExternalCdcTableType::MySql => Self::Mysql,
119 ExternalCdcTableType::SqlServer => Self::Sqlserver,
120
121 ExternalCdcTableType::Citus => Self::Citus,
122 ExternalCdcTableType::Mongo => Self::Mongo,
123 ExternalCdcTableType::Undefined | ExternalCdcTableType::Mock => Self::Unspecified,
124 }
125 }
126}
127
128impl From<PbCdcTableType> for ExternalCdcTableType {
129 fn from(cdc_table_type: PbCdcTableType) -> Self {
130 match cdc_table_type {
131 PbCdcTableType::Postgres => Self::Postgres,
132 PbCdcTableType::Mysql => Self::MySql,
133 PbCdcTableType::Sqlserver => Self::SqlServer,
134 PbCdcTableType::Mongo => Self::Mongo,
135 PbCdcTableType::Citus => Self::Citus,
136 PbCdcTableType::Unspecified => Self::Undefined,
137 }
138 }
139}
140
/// Schema-qualified name of an upstream table.
///
/// For MySQL the `schema_name` field carries the database name
/// (see [`SchemaTableName::from_properties`]).
#[derive(Debug, Clone, PartialEq)]
pub struct SchemaTableName {
    /// Schema (Postgres, Citus, SQL Server) or database (MySQL) containing the table.
    pub schema_name: String,
    /// Unqualified table name.
    pub table_name: String,
}
147
/// Connector property holding the upstream table name.
pub const TABLE_NAME_KEY: &str = "table.name";
/// Connector property holding the upstream schema name (read for Postgres, Citus, SQL Server).
pub const SCHEMA_NAME_KEY: &str = "schema.name";
/// Connector property holding the upstream database name; used as the schema for MySQL.
pub const DATABASE_NAME_KEY: &str = "database.name";
151
152impl SchemaTableName {
153 pub fn from_properties(properties: &BTreeMap<String, String>) -> Self {
154 let table_type = ExternalCdcTableType::from_properties(properties);
155 let table_name = properties.get(TABLE_NAME_KEY).cloned().unwrap_or_default();
156
157 let schema_name = match table_type {
158 ExternalCdcTableType::MySql => properties
159 .get(DATABASE_NAME_KEY)
160 .cloned()
161 .unwrap_or_default(),
162 ExternalCdcTableType::Postgres | ExternalCdcTableType::Citus => {
163 properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
164 }
165 ExternalCdcTableType::SqlServer => {
166 properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
167 }
168 _ => {
169 unreachable!("invalid external table type: {:?}", table_type);
170 }
171 };
172
173 Self {
174 schema_name,
175 table_name,
176 }
177 }
178}
179
/// Current CDC offset of an upstream database, tagged by backend; returned by
/// `ExternalTableReader::current_cdc_offset`.
///
/// The derived `PartialOrd` compares variant declaration order before inner values, so only
/// offsets of the same variant compare meaningfully.
#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)]
pub enum CdcOffset {
    MySql(MySqlOffset),
    Postgres(PostgresOffset),
    SqlServer(SqlServerOffset),
}
186
/// Serialized offset record with camelCase keys (`sourcePartition`, `sourceOffset`,
/// `isHeartbeat`).
///
/// NOTE(review): presumably mirrors the offset JSON emitted by the Debezium engine — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebeziumOffset {
    /// Key/value pairs identifying the source partition (serialized as `sourcePartition`).
    #[serde(rename = "sourcePartition")]
    pub source_partition: HashMap<String, String>,
    /// Backend-specific position fields (serialized as `sourceOffset`).
    #[serde(rename = "sourceOffset")]
    pub source_offset: DebeziumSourceOffset,
    /// Heartbeat flag (serialized as `isHeartbeat`); presumably set for heartbeat records
    /// rather than data changes — confirm.
    #[serde(rename = "isHeartbeat")]
    pub is_heartbeat: bool,
}
211
/// Position fields carried in [`DebeziumOffset::source_offset`].
///
/// Every field is an `Option`, so keys absent from the input deserialize to `None`.
/// NOTE(review): the per-backend grouping below is inferred from field names — confirm.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DebeziumSourceOffset {
    // Snapshot-phase markers.
    pub last_snapshot_record: Option<bool>,
    pub snapshot: Option<bool>,

    // Presumably MySQL binlog file name and position.
    pub file: Option<String>,
    pub pos: Option<u64>,

    // Presumably Postgres LSN and transaction fields; `txid` is serialized as `txId`.
    pub lsn: Option<u64>,
    #[serde(rename = "txId")]
    pub txid: Option<i64>,
    pub tx_usec: Option<u64>,
    pub lsn_commit: Option<u64>,
    pub lsn_proc: Option<u64>,

    // Presumably SQL Server commit/change LSNs, kept as strings.
    pub commit_lsn: Option<String>,
    pub change_lsn: Option<String>,
}
235
/// Boxed parser turning a serialized offset string into a [`CdcOffset`]; obtained from
/// [`ExternalTableReaderImpl::get_cdc_offset_parser`].
pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;
237
/// Reads snapshot data and the current CDC offset from a table in an upstream database.
pub trait ExternalTableReader: Sized {
    /// Returns the upstream database's current CDC offset.
    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;

    /// Consumes the reader. The default implementation does nothing and returns `Ok(())`;
    /// implementors presumably override it to close connections.
    // The default body never awaits, which would otherwise trip `clippy::unused_async`.
    #[allow(clippy::unused_async)]
    async fn disconnect(self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Streams rows of `table_name`.
    ///
    /// NOTE(review): presumably returns at most `limit` rows ordered by `primary_keys`,
    /// starting after `start_pk` when it is `Some` — confirm against the backends.
    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;

    /// Streams the snapshot splits computed for parallel backfill according to `options`.
    fn get_parallel_cdc_splits(
        &self,
        options: CdcTableSnapshotSplitOption,
    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>>;

    /// Streams rows of `table_name` whose `split_columns` values fall between `left` and
    /// `right`. NOTE(review): bound inclusivity is defined by each backend — confirm.
    fn split_snapshot_read(
        &self,
        table_name: SchemaTableName,
        left: OwnedRow,
        right: OwnedRow,
        split_columns: Vec<Field>,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;
}
268
/// Options passed to `ExternalTableReader::get_parallel_cdc_splits`.
pub struct CdcTableSnapshotSplitOption {
    /// Target number of rows per split (presumably approximate — confirm).
    pub backfill_num_rows_per_split: u64,
    /// Whether to produce evenly sized splits; presumably over the split key's value
    /// range — confirm against the backends.
    pub backfill_as_even_splits: bool,
    /// Index selecting the primary-key column to split on (index base not visible here —
    /// confirm).
    pub backfill_split_pk_column_index: u32,
}
274
/// Backend-specific [`ExternalTableReader`], chosen at runtime by
/// [`ExternalCdcTableType::create_table_reader`].
pub enum ExternalTableReaderImpl {
    MySql(MySqlExternalTableReader),
    Postgres(PostgresExternalTableReader),
    SqlServer(SqlServerExternalTableReader),
    /// Created for [`ExternalCdcTableType::Mock`].
    Mock(MockExternalTableReader),
}
281
/// Connection settings for an upstream CDC database, deserialized from connector properties
/// (after secrets are resolved) by [`ExternalTableConfig::try_from_btreemap`].
///
/// Fields without a serde `default` and not of `Option` type are required keys.
#[derive(Debug, Default, Clone, Deserialize)]
pub struct ExternalTableConfig {
    /// Connector name; selects the backend in `ExternalTableImpl::connect`.
    pub connector: String,

    /// Read from the `hostname` property.
    #[serde(rename = "hostname")]
    pub host: String,
    /// Kept as text; the Postgres path of `ExternalTableImpl::connect` parses it as `u16`.
    pub port: String,
    pub username: String,
    pub password: String,
    /// Read from `database.name`.
    #[serde(rename = "database.name")]
    pub database: String,
    /// Read from `schema.name`; empty when absent.
    #[serde(rename = "schema.name", default = "Default::default")]
    pub schema: String,
    /// Read from `table.name`.
    #[serde(rename = "table.name")]
    pub table: String,
    /// Read from `ssl.mode` (also accepted as `debezium.database.sslmode`); falls back to
    /// `SslMode::Disabled` via `postgres_ssl_mode_default` when absent.
    #[serde(rename = "ssl.mode", default = "postgres_ssl_mode_default")]
    #[serde(alias = "debezium.database.sslmode")]
    pub ssl_mode: SslMode,

    /// Read from `ssl.root.cert` (also accepted as `debezium.database.sslrootcert`); `None`
    /// when absent.
    #[serde(rename = "ssl.root.cert")]
    #[serde(alias = "debezium.database.sslrootcert")]
    pub ssl_root_cert: Option<String>,

    /// Read from `database.encrypt`; empty when absent.
    /// NOTE(review): presumably only consulted by the SQL Server backend — confirm.
    #[serde(rename = "database.encrypt", default = "Default::default")]
    pub encrypt: String,
}
313
314fn postgres_ssl_mode_default() -> SslMode {
315 SslMode::Disabled
317}
318
319impl ExternalTableConfig {
320 pub fn try_from_btreemap(
321 connect_properties: BTreeMap<String, String>,
322 secret_refs: BTreeMap<String, PbSecretRef>,
323 ) -> ConnectorResult<Self> {
324 let options_with_secret =
325 LocalSecretManager::global().fill_secrets(connect_properties, secret_refs)?;
326 let json_value = serde_json::to_value(options_with_secret)?;
327 let config = serde_json::from_value::<ExternalTableConfig>(json_value)?;
328 Ok(config)
329 }
330}
331
332impl ExternalTableReader for ExternalTableReaderImpl {
333 async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
334 match self {
335 ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await,
336 ExternalTableReaderImpl::Postgres(postgres) => postgres.current_cdc_offset().await,
337 ExternalTableReaderImpl::SqlServer(sql_server) => sql_server.current_cdc_offset().await,
338 ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await,
339 }
340 }
341
342 fn snapshot_read(
343 &self,
344 table_name: SchemaTableName,
345 start_pk: Option<OwnedRow>,
346 primary_keys: Vec<String>,
347 limit: u32,
348 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
349 self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
350 }
351
352 fn get_parallel_cdc_splits(
353 &self,
354 options: CdcTableSnapshotSplitOption,
355 ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
356 self.get_parallel_cdc_splits_inner(options)
357 }
358
359 fn split_snapshot_read(
360 &self,
361 table_name: SchemaTableName,
362 left: OwnedRow,
363 right: OwnedRow,
364 split_columns: Vec<Field>,
365 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
366 self.split_snapshot_read_inner(table_name, left, right, split_columns)
367 }
368}
369
370impl ExternalTableReaderImpl {
371 pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
372 match self {
373 ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
374 ExternalTableReaderImpl::Postgres(_) => {
375 PostgresExternalTableReader::get_cdc_offset_parser()
376 }
377 ExternalTableReaderImpl::SqlServer(_) => {
378 SqlServerExternalTableReader::get_cdc_offset_parser()
379 }
380 ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
381 }
382 }
383
384 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
385 async fn snapshot_read_inner(
386 &self,
387 table_name: SchemaTableName,
388 start_pk: Option<OwnedRow>,
389 primary_keys: Vec<String>,
390 limit: u32,
391 ) {
392 let stream = match self {
393 ExternalTableReaderImpl::MySql(mysql) => {
394 mysql.snapshot_read(table_name, start_pk, primary_keys, limit)
395 }
396 ExternalTableReaderImpl::Postgres(postgres) => {
397 postgres.snapshot_read(table_name, start_pk, primary_keys, limit)
398 }
399 ExternalTableReaderImpl::SqlServer(sql_server) => {
400 sql_server.snapshot_read(table_name, start_pk, primary_keys, limit)
401 }
402 ExternalTableReaderImpl::Mock(mock) => {
403 mock.snapshot_read(table_name, start_pk, primary_keys, limit)
404 }
405 };
406
407 pin_mut!(stream);
408 #[for_await]
409 for row in stream {
410 let row = row?;
411 yield row;
412 }
413 }
414
415 #[try_stream(boxed, ok = CdcTableSnapshotSplit, error = ConnectorError)]
416 async fn get_parallel_cdc_splits_inner(&self, options: CdcTableSnapshotSplitOption) {
417 let stream = match self {
418 ExternalTableReaderImpl::MySql(e) => e.get_parallel_cdc_splits(options),
419 ExternalTableReaderImpl::Postgres(e) => e.get_parallel_cdc_splits(options),
420 ExternalTableReaderImpl::SqlServer(e) => e.get_parallel_cdc_splits(options),
421 ExternalTableReaderImpl::Mock(e) => e.get_parallel_cdc_splits(options),
422 };
423 pin_mut!(stream);
424 #[for_await]
425 for row in stream {
426 let row = row?;
427 yield row;
428 }
429 }
430
431 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
432 async fn split_snapshot_read_inner(
433 &self,
434 table_name: SchemaTableName,
435 left: OwnedRow,
436 right: OwnedRow,
437 split_columns: Vec<Field>,
438 ) {
439 let stream = match self {
440 ExternalTableReaderImpl::MySql(mysql) => {
441 mysql.split_snapshot_read(table_name, left, right, split_columns)
442 }
443 ExternalTableReaderImpl::Postgres(postgres) => {
444 postgres.split_snapshot_read(table_name, left, right, split_columns)
445 }
446 ExternalTableReaderImpl::SqlServer(sql_server) => {
447 sql_server.split_snapshot_read(table_name, left, right, split_columns)
448 }
449 ExternalTableReaderImpl::Mock(mock) => {
450 mock.split_snapshot_read(table_name, left, right, split_columns)
451 }
452 };
453
454 pin_mut!(stream);
455 #[for_await]
456 for row in stream {
457 let row = row?;
458 yield row;
459 }
460 }
461}
462
/// Connected upstream table exposing its column descriptors and primary-key names; created
/// by [`ExternalTableImpl::connect`].
pub enum ExternalTableImpl {
    MySql(MySqlExternalTable),
    Postgres(PostgresExternalTable),
    SqlServer(SqlServerExternalTable),
}
468
469impl ExternalTableImpl {
470 pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
471 let cdc_source_type = CdcSourceType::from(config.connector.as_str());
472 match cdc_source_type {
473 CdcSourceType::Mysql => Ok(ExternalTableImpl::MySql(
474 MySqlExternalTable::connect(config).await?,
475 )),
476 CdcSourceType::Postgres => Ok(ExternalTableImpl::Postgres(
477 PostgresExternalTable::connect(
478 &config.username,
479 &config.password,
480 &config.host,
481 config.port.parse::<u16>().unwrap(),
482 &config.database,
483 &config.schema,
484 &config.table,
485 &config.ssl_mode,
486 &config.ssl_root_cert,
487 false,
488 )
489 .await?,
490 )),
491 CdcSourceType::SqlServer => Ok(ExternalTableImpl::SqlServer(
492 SqlServerExternalTable::connect(config).await?,
493 )),
494 _ => Err(anyhow!("Unsupported cdc connector type: {}", config.connector).into()),
495 }
496 }
497
498 pub fn column_descs(&self) -> &Vec<ColumnDesc> {
499 match self {
500 ExternalTableImpl::MySql(mysql) => mysql.column_descs(),
501 ExternalTableImpl::Postgres(postgres) => postgres.column_descs(),
502 ExternalTableImpl::SqlServer(sql_server) => sql_server.column_descs(),
503 }
504 }
505
506 pub fn pk_names(&self) -> &Vec<String> {
507 match self {
508 ExternalTableImpl::MySql(mysql) => mysql.pk_names(),
509 ExternalTableImpl::Postgres(postgres) => postgres.pk_names(),
510 ExternalTableImpl::SqlServer(sql_server) => sql_server.pk_names(),
511 }
512 }
513}
514
/// First id used for CDC table snapshot splits.
/// NOTE(review): presumably split ids count upward from here — confirm at the split producers.
pub const CDC_TABLE_SPLIT_ID_START: i64 = 1;