risingwave_connector/source/cdc/external/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod mock_external_table;
16pub mod postgres;
17pub mod sql_server;
18
19pub mod mysql;
20
21use std::collections::{BTreeMap, HashMap};
22
23use anyhow::anyhow;
24use futures::pin_mut;
25use futures::stream::BoxStream;
26use futures_async_stream::try_stream;
27use risingwave_common::bail;
28use risingwave_common::catalog::{ColumnDesc, Field, Schema};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::secret::LocalSecretManager;
31use risingwave_pb::catalog::table::CdcTableType as PbCdcTableType;
32use risingwave_pb::secret::PbSecretRef;
33use serde::{Deserialize, Serialize};
34
35use crate::WithPropertiesExt;
36use crate::connector_common::{PostgresExternalTable, SslMode};
37use crate::error::{ConnectorError, ConnectorResult};
38use crate::parser::mysql_row_to_owned_row;
39use crate::source::CdcTableSnapshotSplit;
40use crate::source::cdc::CdcSourceType;
41use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
42use crate::source::cdc::external::mysql::{
43    MySqlExternalTable, MySqlExternalTableReader, MySqlOffset,
44};
45use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
46use crate::source::cdc::external::sql_server::{
47    SqlServerExternalTable, SqlServerExternalTableReader, SqlServerOffset,
48};
49
/// Kind of upstream database behind a CDC table.
///
/// Obtained from the `connector` property via `ExternalCdcTableType::from_properties`,
/// or from the protobuf `CdcTableType` via the `From` conversions in this module.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExternalCdcTableType {
    /// The `connector` property is missing or is not a recognized CDC connector.
    Undefined,
    /// Backed by `MockExternalTableReader`. Never produced by `from_properties`, and
    /// converted to protobuf `Unspecified`.
    Mock,
    /// `mysql-cdc`
    MySql,
    /// `postgres-cdc`
    Postgres,
    /// `sqlserver-cdc`
    SqlServer,
    /// `citus-cdc`
    Citus,
    /// `mongodb-cdc`
    Mongo,
}
60
61impl ExternalCdcTableType {
62    pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self {
63        let connector = with_properties.get_connector().unwrap_or_default();
64        match connector.as_str() {
65            "mysql-cdc" => Self::MySql,
66            "postgres-cdc" => Self::Postgres,
67            "citus-cdc" => Self::Citus,
68            "sqlserver-cdc" => Self::SqlServer,
69            "mongodb-cdc" => Self::Mongo,
70            _ => Self::Undefined,
71        }
72    }
73
74    pub fn can_backfill(&self) -> bool {
75        matches!(self, Self::MySql | Self::Postgres | Self::SqlServer)
76    }
77
78    pub fn enable_transaction_metadata(&self) -> bool {
79        // In Debezium, transactional metadata cause delay of the newest events, as the `END` message is never sent unless a new transaction starts.
80        // So we only allow transactional metadata for MySQL and Postgres.
81        // See more in https://debezium.io/documentation/reference/2.6/connectors/sqlserver.html#sqlserver-transaction-metadata
82        matches!(self, Self::MySql | Self::Postgres)
83    }
84
85    pub fn shareable_only(&self) -> bool {
86        matches!(self, Self::SqlServer)
87    }
88
89    pub async fn create_table_reader(
90        &self,
91        config: ExternalTableConfig,
92        schema: Schema,
93        pk_indices: Vec<usize>,
94        schema_table_name: SchemaTableName,
95    ) -> ConnectorResult<ExternalTableReaderImpl> {
96        match self {
97            Self::MySql => Ok(ExternalTableReaderImpl::MySql(
98                MySqlExternalTableReader::new(config, schema)?,
99            )),
100            Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
101                PostgresExternalTableReader::new(config, schema, pk_indices, schema_table_name)
102                    .await?,
103            )),
104            Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
105                SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
106            )),
107            // citus is never supported for cdc backfill (create source + create table).
108            Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
109            _ => bail!("invalid external table type: {:?}", *self),
110        }
111    }
112}
113
114impl From<ExternalCdcTableType> for PbCdcTableType {
115    fn from(cdc_table_type: ExternalCdcTableType) -> Self {
116        match cdc_table_type {
117            ExternalCdcTableType::Postgres => Self::Postgres,
118            ExternalCdcTableType::MySql => Self::Mysql,
119            ExternalCdcTableType::SqlServer => Self::Sqlserver,
120
121            ExternalCdcTableType::Citus => Self::Citus,
122            ExternalCdcTableType::Mongo => Self::Mongo,
123            ExternalCdcTableType::Undefined | ExternalCdcTableType::Mock => Self::Unspecified,
124        }
125    }
126}
127
128impl From<PbCdcTableType> for ExternalCdcTableType {
129    fn from(cdc_table_type: PbCdcTableType) -> Self {
130        match cdc_table_type {
131            PbCdcTableType::Postgres => Self::Postgres,
132            PbCdcTableType::Mysql => Self::MySql,
133            PbCdcTableType::Sqlserver => Self::SqlServer,
134            PbCdcTableType::Mongo => Self::Mongo,
135            PbCdcTableType::Citus => Self::Citus,
136            PbCdcTableType::Unspecified => Self::Undefined,
137        }
138    }
139}
140
/// Namespace-qualified name of an upstream table.
#[derive(Debug, Clone, PartialEq)]
pub struct SchemaTableName {
    /// Namespace of the table, e.g. database in mysql, schema in postgres (and in
    /// sql server, per `SchemaTableName::from_properties`).
    pub schema_name: String,
    /// Table name within `schema_name`.
    pub table_name: String,
}
147
/// Property key holding the upstream table name.
pub const TABLE_NAME_KEY: &str = "table.name";
/// Property key holding the upstream schema name (used for Postgres, Citus and SQL Server).
pub const SCHEMA_NAME_KEY: &str = "schema.name";
/// Property key holding the upstream database name (used as the namespace for MySQL).
pub const DATABASE_NAME_KEY: &str = "database.name";
151
152impl SchemaTableName {
153    pub fn from_properties(properties: &BTreeMap<String, String>) -> Self {
154        let table_type = ExternalCdcTableType::from_properties(properties);
155        let table_name = properties.get(TABLE_NAME_KEY).cloned().unwrap_or_default();
156
157        let schema_name = match table_type {
158            ExternalCdcTableType::MySql => properties
159                .get(DATABASE_NAME_KEY)
160                .cloned()
161                .unwrap_or_default(),
162            ExternalCdcTableType::Postgres | ExternalCdcTableType::Citus => {
163                properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
164            }
165            ExternalCdcTableType::SqlServer => {
166                properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
167            }
168            _ => {
169                unreachable!("invalid external table type: {:?}", table_type);
170            }
171        };
172
173        Self {
174            schema_name,
175            table_name,
176        }
177    }
178}
179
/// A position in an upstream change log, tagged by database kind.
///
/// `PartialOrd` is derived. Values of different variants therefore compare by
/// variant declaration order, so only comparisons within one variant say anything
/// about log position.
#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)]
pub enum CdcOffset {
    MySql(MySqlOffset),
    Postgres(PostgresOffset),
    SqlServer(SqlServerOffset),
}
186
/// Offset payload reported by Debezium, (de)serialized from/to JSON.
///
/// Example debezium offset for Postgres:
///
/// ```json
/// {
///     "sourcePartition":
///     {
///         "server": "RW_CDC_1004"
///     },
///     "sourceOffset":
///     {
///         "last_snapshot_record": false,
///         "lsn": 29973552,
///         "txId": 1046,
///         "ts_usec": 1670826189008456,
///         "snapshot": true
///     }
/// }
/// ```
///
/// NOTE(review): the example omits `isHeartbeat`. `is_heartbeat` has no
/// `#[serde(default)]`, so a payload without that key fails to deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebeziumOffset {
    /// The `sourcePartition` object. Every value must be a JSON string to fit this map.
    #[serde(rename = "sourcePartition")]
    pub source_partition: HashMap<String, String>,
    /// The `sourceOffset` object. Which fields are populated depends on the upstream.
    #[serde(rename = "sourceOffset")]
    pub source_offset: DebeziumSourceOffset,
    /// The `isHeartbeat` flag. It is required when deserializing.
    #[serde(rename = "isHeartbeat")]
    pub is_heartbeat: bool,
}
211
212#[derive(Debug, Default, Clone, Serialize, Deserialize)]
213pub struct DebeziumSourceOffset {
214    // postgres snapshot progress
215    pub last_snapshot_record: Option<bool>,
216    // mysql snapshot progress
217    pub snapshot: Option<bool>,
218
219    // mysql binlog offset
220    pub file: Option<String>,
221    pub pos: Option<u64>,
222
223    // postgres offset
224    pub lsn: Option<u64>,
225    #[serde(rename = "txId")]
226    pub txid: Option<i64>,
227    pub tx_usec: Option<u64>,
228    pub lsn_commit: Option<u64>,
229    pub lsn_proc: Option<u64>,
230
231    // sql server offset
232    pub commit_lsn: Option<String>,
233    pub change_lsn: Option<String>,
234}
235
/// Parses an offset string into a [`CdcOffset`]. Each reader type supplies its own
/// parser through `get_cdc_offset_parser` (presumably for Debezium offset JSON;
/// confirm per reader).
pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;
237
/// Reads snapshot data and the current CDC offset from an upstream database.
pub trait ExternalTableReader: Sized {
    /// Returns the upstream's current CDC offset.
    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;

    /// Releases resources held by the reader. The default implementation does nothing.
    // Currently, MySQL cdc uses a connection pool to manage connections to MySQL, and other CDC processes do not require the disconnect step for now.
    // The default body never awaits, hence the `unused_async` allowance.
    #[allow(clippy::unused_async)]
    async fn disconnect(self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Streams rows of `table_name`, continuing after `start_pk` when it is given.
    ///
    /// NOTE(review): presumably rows are ordered by `primary_keys` and at most `limit`
    /// rows are produced per call; confirm in the implementations.
    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;

    /// Streams the splits used for parallel snapshot backfill, computed per `options`.
    fn get_parallel_cdc_splits(
        &self,
        options: CdcTableSnapshotSplitOption,
    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>>;

    /// Streams rows of `table_name` whose `split_columns` values lie between `left` and
    /// `right`.
    ///
    /// NOTE(review): bound inclusivity is decided by each implementation; confirm.
    fn split_snapshot_read(
        &self,
        table_name: SchemaTableName,
        left: OwnedRow,
        right: OwnedRow,
        split_columns: Vec<Field>,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;
}
268
/// Options passed to `ExternalTableReader::get_parallel_cdc_splits`.
pub struct CdcTableSnapshotSplitOption {
    /// Desired number of rows per split (presumably a target, not a hard limit; confirm).
    pub backfill_num_rows_per_split: u64,
    /// Whether to split into evenly sized ranges. Exact semantics are reader-specific;
    /// TODO confirm.
    pub backfill_as_even_splits: bool,
    /// Index of the primary-key column used to split (assumed to index into the pk
    /// columns; verify against readers).
    pub backfill_split_pk_column_index: u32,
}
274
/// Statically dispatched snapshot reader, one variant per supported upstream.
///
/// Built by `ExternalCdcTableType::create_table_reader`.
pub enum ExternalTableReaderImpl {
    MySql(MySqlExternalTableReader),
    Postgres(PostgresExternalTableReader),
    SqlServer(SqlServerExternalTableReader),
    Mock(MockExternalTableReader),
}
281
282#[derive(Debug, Default, Clone, Deserialize)]
283pub struct ExternalTableConfig {
284    pub connector: String,
285
286    #[serde(rename = "hostname")]
287    pub host: String,
288    pub port: String,
289    pub username: String,
290    pub password: String,
291    #[serde(rename = "database.name")]
292    pub database: String,
293    #[serde(rename = "schema.name", default = "Default::default")]
294    pub schema: String,
295    #[serde(rename = "table.name")]
296    pub table: String,
297    /// `ssl.mode` specifies the SSL/TLS encryption level for secure communication with Postgres.
298    /// Choices include `disabled`, `preferred`, and `required`.
299    /// This field is optional.
300    #[serde(rename = "ssl.mode", default = "postgres_ssl_mode_default")]
301    #[serde(alias = "debezium.database.sslmode")]
302    pub ssl_mode: SslMode,
303
304    #[serde(rename = "ssl.root.cert")]
305    #[serde(alias = "debezium.database.sslrootcert")]
306    pub ssl_root_cert: Option<String>,
307
308    /// `encrypt` specifies whether connect to SQL Server using SSL.
309    /// Only "true" means using SSL. All other values are treated as "false".
310    #[serde(rename = "database.encrypt", default = "Default::default")]
311    pub encrypt: String,
312}
313
314fn postgres_ssl_mode_default() -> SslMode {
315    // NOTE(StrikeW): Default to `disabled` for backward compatibility
316    SslMode::Disabled
317}
318
319impl ExternalTableConfig {
320    pub fn try_from_btreemap(
321        connect_properties: BTreeMap<String, String>,
322        secret_refs: BTreeMap<String, PbSecretRef>,
323    ) -> ConnectorResult<Self> {
324        let options_with_secret =
325            LocalSecretManager::global().fill_secrets(connect_properties, secret_refs)?;
326        let json_value = serde_json::to_value(options_with_secret)?;
327        let config = serde_json::from_value::<ExternalTableConfig>(json_value)?;
328        Ok(config)
329    }
330}
331
332impl ExternalTableReader for ExternalTableReaderImpl {
333    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
334        match self {
335            ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await,
336            ExternalTableReaderImpl::Postgres(postgres) => postgres.current_cdc_offset().await,
337            ExternalTableReaderImpl::SqlServer(sql_server) => sql_server.current_cdc_offset().await,
338            ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await,
339        }
340    }
341
342    fn snapshot_read(
343        &self,
344        table_name: SchemaTableName,
345        start_pk: Option<OwnedRow>,
346        primary_keys: Vec<String>,
347        limit: u32,
348    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
349        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
350    }
351
352    fn get_parallel_cdc_splits(
353        &self,
354        options: CdcTableSnapshotSplitOption,
355    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
356        self.get_parallel_cdc_splits_inner(options)
357    }
358
359    fn split_snapshot_read(
360        &self,
361        table_name: SchemaTableName,
362        left: OwnedRow,
363        right: OwnedRow,
364        split_columns: Vec<Field>,
365    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
366        self.split_snapshot_read_inner(table_name, left, right, split_columns)
367    }
368}
369
370impl ExternalTableReaderImpl {
371    pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
372        match self {
373            ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
374            ExternalTableReaderImpl::Postgres(_) => {
375                PostgresExternalTableReader::get_cdc_offset_parser()
376            }
377            ExternalTableReaderImpl::SqlServer(_) => {
378                SqlServerExternalTableReader::get_cdc_offset_parser()
379            }
380            ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
381        }
382    }
383
384    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
385    async fn snapshot_read_inner(
386        &self,
387        table_name: SchemaTableName,
388        start_pk: Option<OwnedRow>,
389        primary_keys: Vec<String>,
390        limit: u32,
391    ) {
392        let stream = match self {
393            ExternalTableReaderImpl::MySql(mysql) => {
394                mysql.snapshot_read(table_name, start_pk, primary_keys, limit)
395            }
396            ExternalTableReaderImpl::Postgres(postgres) => {
397                postgres.snapshot_read(table_name, start_pk, primary_keys, limit)
398            }
399            ExternalTableReaderImpl::SqlServer(sql_server) => {
400                sql_server.snapshot_read(table_name, start_pk, primary_keys, limit)
401            }
402            ExternalTableReaderImpl::Mock(mock) => {
403                mock.snapshot_read(table_name, start_pk, primary_keys, limit)
404            }
405        };
406
407        pin_mut!(stream);
408        #[for_await]
409        for row in stream {
410            let row = row?;
411            yield row;
412        }
413    }
414
415    #[try_stream(boxed, ok = CdcTableSnapshotSplit, error = ConnectorError)]
416    async fn get_parallel_cdc_splits_inner(&self, options: CdcTableSnapshotSplitOption) {
417        let stream = match self {
418            ExternalTableReaderImpl::MySql(e) => e.get_parallel_cdc_splits(options),
419            ExternalTableReaderImpl::Postgres(e) => e.get_parallel_cdc_splits(options),
420            ExternalTableReaderImpl::SqlServer(e) => e.get_parallel_cdc_splits(options),
421            ExternalTableReaderImpl::Mock(e) => e.get_parallel_cdc_splits(options),
422        };
423        pin_mut!(stream);
424        #[for_await]
425        for row in stream {
426            let row = row?;
427            yield row;
428        }
429    }
430
431    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
432    async fn split_snapshot_read_inner(
433        &self,
434        table_name: SchemaTableName,
435        left: OwnedRow,
436        right: OwnedRow,
437        split_columns: Vec<Field>,
438    ) {
439        let stream = match self {
440            ExternalTableReaderImpl::MySql(mysql) => {
441                mysql.split_snapshot_read(table_name, left, right, split_columns)
442            }
443            ExternalTableReaderImpl::Postgres(postgres) => {
444                postgres.split_snapshot_read(table_name, left, right, split_columns)
445            }
446            ExternalTableReaderImpl::SqlServer(sql_server) => {
447                sql_server.split_snapshot_read(table_name, left, right, split_columns)
448            }
449            ExternalTableReaderImpl::Mock(mock) => {
450                mock.split_snapshot_read(table_name, left, right, split_columns)
451            }
452        };
453
454        pin_mut!(stream);
455        #[for_await]
456        for row in stream {
457            let row = row?;
458            yield row;
459        }
460    }
461}
462
/// Connected handle to an upstream table, exposing its column descriptors and
/// primary-key names. Built by `ExternalTableImpl::connect`.
pub enum ExternalTableImpl {
    MySql(MySqlExternalTable),
    Postgres(PostgresExternalTable),
    SqlServer(SqlServerExternalTable),
}
468
469impl ExternalTableImpl {
470    pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
471        let cdc_source_type = CdcSourceType::from(config.connector.as_str());
472        match cdc_source_type {
473            CdcSourceType::Mysql => Ok(ExternalTableImpl::MySql(
474                MySqlExternalTable::connect(config).await?,
475            )),
476            CdcSourceType::Postgres => Ok(ExternalTableImpl::Postgres(
477                PostgresExternalTable::connect(
478                    &config.username,
479                    &config.password,
480                    &config.host,
481                    config.port.parse::<u16>().unwrap(),
482                    &config.database,
483                    &config.schema,
484                    &config.table,
485                    &config.ssl_mode,
486                    &config.ssl_root_cert,
487                    false,
488                )
489                .await?,
490            )),
491            CdcSourceType::SqlServer => Ok(ExternalTableImpl::SqlServer(
492                SqlServerExternalTable::connect(config).await?,
493            )),
494            _ => Err(anyhow!("Unsupported cdc connector type: {}", config.connector).into()),
495        }
496    }
497
498    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
499        match self {
500            ExternalTableImpl::MySql(mysql) => mysql.column_descs(),
501            ExternalTableImpl::Postgres(postgres) => postgres.column_descs(),
502            ExternalTableImpl::SqlServer(sql_server) => sql_server.column_descs(),
503        }
504    }
505
506    pub fn pk_names(&self) -> &Vec<String> {
507        match self {
508            ExternalTableImpl::MySql(mysql) => mysql.pk_names(),
509            ExternalTableImpl::Postgres(postgres) => postgres.pk_names(),
510            ExternalTableImpl::SqlServer(sql_server) => sql_server.pk_names(),
511        }
512    }
513}
514
/// Id given to the first CDC table snapshot split. Presumably later splits count up
/// from here; confirm at the split-generation site.
pub const CDC_TABLE_SPLIT_ID_START: i64 = 1;