risingwave_connector/source/cdc/external/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod mock_external_table;
16pub mod postgres;
17pub mod sql_server;
18
19pub mod mysql;
20
21use std::collections::{BTreeMap, HashMap};
22
23use anyhow::anyhow;
24use futures::pin_mut;
25use futures::stream::BoxStream;
26use futures_async_stream::try_stream;
27use risingwave_common::bail;
28use risingwave_common::catalog::{ColumnDesc, Field, Schema};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::secret::LocalSecretManager;
31use risingwave_pb::catalog::table::CdcTableType as PbCdcTableType;
32use risingwave_pb::secret::PbSecretRef;
33use serde::{Deserialize, Serialize};
34
35use crate::WithPropertiesExt;
36use crate::connector_common::{PostgresExternalTable, SslMode};
37use crate::error::{ConnectorError, ConnectorResult};
38use crate::parser::mysql_row_to_owned_row;
39use crate::source::CdcTableSnapshotSplit;
40use crate::source::cdc::CdcSourceType;
41use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
42use crate::source::cdc::external::mysql::{
43    MySqlExternalTable, MySqlExternalTableReader, MySqlOffset,
44};
45use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
46use crate::source::cdc::external::sql_server::{
47    SqlServerExternalTable, SqlServerExternalTableReader, SqlServerOffset,
48};
49
/// Kind of upstream database behind a CDC table.
///
/// Usually derived from the `connector` WITH property via
/// `ExternalCdcTableType::from_properties`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ExternalCdcTableType {
    /// The `connector` property is missing or is not a recognized CDC connector.
    Undefined,
    /// Backed by `MockExternalTableReader`; never produced by `from_properties`.
    Mock,
    /// `mysql-cdc`
    MySql,
    /// `postgres-cdc`
    Postgres,
    /// `sqlserver-cdc`
    SqlServer,
    /// `citus-cdc`
    Citus,
    /// `mongodb-cdc`
    Mongo,
}
60
61impl ExternalCdcTableType {
62    pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self {
63        let connector = with_properties.get_connector().unwrap_or_default();
64        match connector.as_str() {
65            "mysql-cdc" => Self::MySql,
66            "postgres-cdc" => Self::Postgres,
67            "citus-cdc" => Self::Citus,
68            "sqlserver-cdc" => Self::SqlServer,
69            "mongodb-cdc" => Self::Mongo,
70            _ => Self::Undefined,
71        }
72    }
73
74    pub fn can_backfill(&self) -> bool {
75        matches!(self, Self::MySql | Self::Postgres | Self::SqlServer)
76    }
77
78    pub fn enable_transaction_metadata(&self) -> bool {
79        // In Debezium, transactional metadata cause delay of the newest events, as the `END` message is never sent unless a new transaction starts.
80        // So we only allow transactional metadata for MySQL and Postgres.
81        // See more in https://debezium.io/documentation/reference/2.6/connectors/sqlserver.html#sqlserver-transaction-metadata
82        matches!(self, Self::MySql | Self::Postgres)
83    }
84
85    pub async fn create_table_reader(
86        &self,
87        config: ExternalTableConfig,
88        schema: Schema,
89        pk_indices: Vec<usize>,
90        schema_table_name: SchemaTableName,
91    ) -> ConnectorResult<ExternalTableReaderImpl> {
92        match self {
93            Self::MySql => Ok(ExternalTableReaderImpl::MySql(
94                MySqlExternalTableReader::new(config, schema).await?,
95            )),
96            Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
97                PostgresExternalTableReader::new(config, schema, pk_indices, schema_table_name)
98                    .await?,
99            )),
100            Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
101                SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
102            )),
103            // citus is never supported for cdc backfill (create source + create table).
104            Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
105            _ => bail!("invalid external table type: {:?}", *self),
106        }
107    }
108}
109
110impl From<ExternalCdcTableType> for PbCdcTableType {
111    fn from(cdc_table_type: ExternalCdcTableType) -> Self {
112        match cdc_table_type {
113            ExternalCdcTableType::Postgres => Self::Postgres,
114            ExternalCdcTableType::MySql => Self::Mysql,
115            ExternalCdcTableType::SqlServer => Self::Sqlserver,
116
117            ExternalCdcTableType::Citus => Self::Citus,
118            ExternalCdcTableType::Mongo => Self::Mongo,
119            ExternalCdcTableType::Undefined | ExternalCdcTableType::Mock => Self::Unspecified,
120        }
121    }
122}
123
124impl From<PbCdcTableType> for ExternalCdcTableType {
125    fn from(cdc_table_type: PbCdcTableType) -> Self {
126        match cdc_table_type {
127            PbCdcTableType::Postgres => Self::Postgres,
128            PbCdcTableType::Mysql => Self::MySql,
129            PbCdcTableType::Sqlserver => Self::SqlServer,
130            PbCdcTableType::Mongo => Self::Mongo,
131            PbCdcTableType::Citus => Self::Citus,
132            PbCdcTableType::Unspecified => Self::Undefined,
133        }
134    }
135}
136
/// Namespace-qualified name of an upstream table.
#[derive(Debug, Clone, PartialEq)]
pub struct SchemaTableName {
    /// Namespace of the table. This is the database name for MySQL and the schema name for
    /// Postgres, Citus and SQL Server.
    pub schema_name: String,
    /// Table name within `schema_name`.
    pub table_name: String,
}
143
/// WITH-property key holding the upstream table name.
pub const TABLE_NAME_KEY: &str = "table.name";
/// WITH-property key holding the upstream schema name (used for Postgres, Citus and SQL Server).
pub const SCHEMA_NAME_KEY: &str = "schema.name";
/// WITH-property key holding the upstream database name (used as the namespace for MySQL).
pub const DATABASE_NAME_KEY: &str = "database.name";
147
148impl SchemaTableName {
149    pub fn from_properties(properties: &BTreeMap<String, String>) -> Self {
150        let table_type = ExternalCdcTableType::from_properties(properties);
151        let table_name = properties.get(TABLE_NAME_KEY).cloned().unwrap_or_default();
152
153        let schema_name = match table_type {
154            ExternalCdcTableType::MySql => properties
155                .get(DATABASE_NAME_KEY)
156                .cloned()
157                .unwrap_or_default(),
158            ExternalCdcTableType::Postgres | ExternalCdcTableType::Citus => {
159                properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
160            }
161            ExternalCdcTableType::SqlServer => {
162                properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
163            }
164            _ => {
165                unreachable!("invalid external table type: {:?}", table_type);
166            }
167        };
168
169        Self {
170            schema_name,
171            table_name,
172        }
173    }
174}
175
/// A database-specific CDC offset.
///
/// The derived `PartialOrd` compares the variant (in declaration order) before the payload.
/// Comparing offsets of two different variants therefore only reflects variant order, so do
/// not reorder the variants.
#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)]
pub enum CdcOffset {
    MySql(MySqlOffset),
    Postgres(PostgresOffset),
    SqlServer(SqlServerOffset),
}
182
// Example debezium offset for Postgres:
// {
//     "sourcePartition":
//     {
//         "server": "RW_CDC_1004"
//     },
//     "sourceOffset":
//     {
//         "last_snapshot_record": false,
//         "lsn": 29973552,
//         "txId": 1046,
//         "ts_usec": 1670826189008456,
//         "snapshot": true
//     },
//     "isHeartbeat": false
// }
/// A Debezium offset record: the source partition plus the database-specific source offset.
///
/// `isHeartbeat` has no serde default, so it must be present in the JSON for
/// deserialization to succeed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebeziumOffset {
    /// Partition identity, e.g. `{"server": "RW_CDC_1004"}`. Values must be JSON strings.
    #[serde(rename = "sourcePartition")]
    pub source_partition: HashMap<String, String>,
    /// Database-specific position within the partition.
    #[serde(rename = "sourceOffset")]
    pub source_offset: DebeziumSourceOffset,
    /// Whether this offset was produced by a heartbeat event.
    #[serde(rename = "isHeartbeat")]
    pub is_heartbeat: bool,
}
207
208#[derive(Debug, Default, Clone, Serialize, Deserialize)]
209pub struct DebeziumSourceOffset {
210    // postgres snapshot progress
211    pub last_snapshot_record: Option<bool>,
212    // mysql snapshot progress
213    pub snapshot: Option<bool>,
214
215    // mysql binlog offset
216    pub file: Option<String>,
217    pub pos: Option<u64>,
218
219    // postgres offset
220    pub lsn: Option<u64>,
221    #[serde(rename = "txId")]
222    pub txid: Option<i64>,
223    pub tx_usec: Option<u64>,
224    pub lsn_commit: Option<u64>,
225    pub lsn_proc: Option<u64>,
226
227    // sql server offset
228    pub commit_lsn: Option<String>,
229    pub change_lsn: Option<String>,
230}
231
/// Boxed, `Send` parser that turns a serialized offset string into a [`CdcOffset`].
pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;
233
/// Reads CDC offsets and snapshot data from an upstream database table.
pub trait ExternalTableReader: Sized {
    /// Returns the upstream database's current CDC offset.
    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;

    // Currently, MySQL cdc uses a connection pool to manage connections to MySQL, and other CDC processes do not require the disconnect step for now.

    /// Consumes the reader and releases its resources.
    ///
    /// The default implementation is a no-op that returns `Ok(())`.
    async fn disconnect(self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Streams snapshot rows of `table_name`.
    ///
    /// NOTE(review): presumably yields at most `limit` rows ordered by `primary_keys`,
    /// starting after `start_pk` (or from the beginning when `None`). Each implementation
    /// defines the exact semantics; confirm before relying on them.
    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;

    /// Streams the snapshot splits of the table, computed according to `options`.
    fn get_parallel_cdc_splits(
        &self,
        options: CdcTableSnapshotSplitOption,
    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>>;

    /// Streams the rows of `table_name` that fall within one split, bounded by `left` and
    /// `right` on `split_columns`.
    ///
    /// NOTE(review): whether each bound is inclusive is implementation-defined; confirm.
    fn split_snapshot_read(
        &self,
        table_name: SchemaTableName,
        left: OwnedRow,
        right: OwnedRow,
        split_columns: Vec<Field>,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;
}
264
/// Options passed to `ExternalTableReader::get_parallel_cdc_splits`.
// NOTE(review): the field meanings below are inferred from their names; confirm against
// the reader implementations.
pub struct CdcTableSnapshotSplitOption {
    /// Presumably the target number of rows in each split.
    pub backfill_num_rows_per_split: u64,
    /// Presumably requests evenly sized splits.
    pub backfill_as_even_splits: bool,
    /// Presumably the index of the primary-key column used to split the table.
    pub backfill_split_pk_column_index: u32,
}
270
/// Enum dispatch over the per-database snapshot readers (plus a mock reader).
pub enum ExternalTableReaderImpl {
    MySql(MySqlExternalTableReader),
    Postgres(PostgresExternalTableReader),
    SqlServer(SqlServerExternalTableReader),
    Mock(MockExternalTableReader),
}
277
/// Connection settings for an upstream CDC database, deserialized from the connector's
/// WITH properties (see `ExternalTableConfig::try_from_btreemap`).
///
/// NOTE(review): the derived `Debug` prints every field, including `password`. Avoid
/// logging this struct with `{:?}`.
#[derive(Debug, Default, Clone, Deserialize)]
pub struct ExternalTableConfig {
    /// Connector name, e.g. `mysql-cdc` or `postgres-cdc`.
    pub connector: String,

    /// Upstream host, read from the `hostname` property.
    #[serde(rename = "hostname")]
    pub host: String,
    /// Upstream port, kept as a string (`ExternalTableImpl::connect` parses it as `u16`
    /// for Postgres).
    pub port: String,
    pub username: String,
    pub password: String,
    /// Database name, read from `database.name`.
    #[serde(rename = "database.name")]
    pub database: String,
    /// Schema name, read from `schema.name`. Empty when the property is absent.
    #[serde(rename = "schema.name", default = "Default::default")]
    pub schema: String,
    /// Table name, read from `table.name`.
    #[serde(rename = "table.name")]
    pub table: String,
    /// `ssl.mode` specifies the SSL/TLS encryption level for secure communication with Postgres.
    /// Choices include `disabled`, `preferred`, and `required`.
    /// This field is optional.
    #[serde(rename = "ssl.mode", default = "postgres_ssl_mode_default")]
    #[serde(alias = "debezium.database.sslmode")]
    pub ssl_mode: SslMode,

    /// Optional root certificate setting, read from `ssl.root.cert` (alias
    /// `debezium.database.sslrootcert`).
    #[serde(rename = "ssl.root.cert")]
    #[serde(alias = "debezium.database.sslrootcert")]
    pub ssl_root_cert: Option<String>,

    /// `encrypt` specifies whether connect to SQL Server using SSL.
    /// Only "true" means using SSL. All other values are treated as "false".
    #[serde(rename = "database.encrypt", default = "Default::default")]
    pub encrypt: String,
}
309
/// Serde default for `ExternalTableConfig::ssl_mode`, used when `ssl.mode` is absent.
fn postgres_ssl_mode_default() -> SslMode {
    // NOTE(StrikeW): Default to `disabled` for backward compatibility
    SslMode::Disabled
}
314
315impl ExternalTableConfig {
316    pub fn try_from_btreemap(
317        connect_properties: BTreeMap<String, String>,
318        secret_refs: BTreeMap<String, PbSecretRef>,
319    ) -> ConnectorResult<Self> {
320        let options_with_secret =
321            LocalSecretManager::global().fill_secrets(connect_properties, secret_refs)?;
322        let json_value = serde_json::to_value(options_with_secret)?;
323        let config = serde_json::from_value::<ExternalTableConfig>(json_value)?;
324        Ok(config)
325    }
326}
327
328impl ExternalTableReader for ExternalTableReaderImpl {
329    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
330        match self {
331            ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await,
332            ExternalTableReaderImpl::Postgres(postgres) => postgres.current_cdc_offset().await,
333            ExternalTableReaderImpl::SqlServer(sql_server) => sql_server.current_cdc_offset().await,
334            ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await,
335        }
336    }
337
338    fn snapshot_read(
339        &self,
340        table_name: SchemaTableName,
341        start_pk: Option<OwnedRow>,
342        primary_keys: Vec<String>,
343        limit: u32,
344    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
345        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
346    }
347
348    fn get_parallel_cdc_splits(
349        &self,
350        options: CdcTableSnapshotSplitOption,
351    ) -> BoxStream<'_, ConnectorResult<CdcTableSnapshotSplit>> {
352        self.get_parallel_cdc_splits_inner(options)
353    }
354
355    fn split_snapshot_read(
356        &self,
357        table_name: SchemaTableName,
358        left: OwnedRow,
359        right: OwnedRow,
360        split_columns: Vec<Field>,
361    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
362        self.split_snapshot_read_inner(table_name, left, right, split_columns)
363    }
364}
365
366impl ExternalTableReaderImpl {
367    pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
368        match self {
369            ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
370            ExternalTableReaderImpl::Postgres(_) => {
371                PostgresExternalTableReader::get_cdc_offset_parser()
372            }
373            ExternalTableReaderImpl::SqlServer(_) => {
374                SqlServerExternalTableReader::get_cdc_offset_parser()
375            }
376            ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
377        }
378    }
379
380    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
381    async fn snapshot_read_inner(
382        &self,
383        table_name: SchemaTableName,
384        start_pk: Option<OwnedRow>,
385        primary_keys: Vec<String>,
386        limit: u32,
387    ) {
388        let stream = match self {
389            ExternalTableReaderImpl::MySql(mysql) => {
390                mysql.snapshot_read(table_name, start_pk, primary_keys, limit)
391            }
392            ExternalTableReaderImpl::Postgres(postgres) => {
393                postgres.snapshot_read(table_name, start_pk, primary_keys, limit)
394            }
395            ExternalTableReaderImpl::SqlServer(sql_server) => {
396                sql_server.snapshot_read(table_name, start_pk, primary_keys, limit)
397            }
398            ExternalTableReaderImpl::Mock(mock) => {
399                mock.snapshot_read(table_name, start_pk, primary_keys, limit)
400            }
401        };
402
403        pin_mut!(stream);
404        #[for_await]
405        for row in stream {
406            let row = row?;
407            yield row;
408        }
409    }
410
411    #[try_stream(boxed, ok = CdcTableSnapshotSplit, error = ConnectorError)]
412    async fn get_parallel_cdc_splits_inner(&self, options: CdcTableSnapshotSplitOption) {
413        let stream = match self {
414            ExternalTableReaderImpl::MySql(e) => e.get_parallel_cdc_splits(options),
415            ExternalTableReaderImpl::Postgres(e) => e.get_parallel_cdc_splits(options),
416            ExternalTableReaderImpl::SqlServer(e) => e.get_parallel_cdc_splits(options),
417            ExternalTableReaderImpl::Mock(e) => e.get_parallel_cdc_splits(options),
418        };
419        pin_mut!(stream);
420        #[for_await]
421        for row in stream {
422            let row = row?;
423            yield row;
424        }
425    }
426
427    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
428    async fn split_snapshot_read_inner(
429        &self,
430        table_name: SchemaTableName,
431        left: OwnedRow,
432        right: OwnedRow,
433        split_columns: Vec<Field>,
434    ) {
435        let stream = match self {
436            ExternalTableReaderImpl::MySql(mysql) => {
437                mysql.split_snapshot_read(table_name, left, right, split_columns)
438            }
439            ExternalTableReaderImpl::Postgres(postgres) => {
440                postgres.split_snapshot_read(table_name, left, right, split_columns)
441            }
442            ExternalTableReaderImpl::SqlServer(sql_server) => {
443                sql_server.split_snapshot_read(table_name, left, right, split_columns)
444            }
445            ExternalTableReaderImpl::Mock(mock) => {
446                mock.split_snapshot_read(table_name, left, right, split_columns)
447            }
448        };
449
450        pin_mut!(stream);
451        #[for_await]
452        for row in stream {
453            let row = row?;
454            yield row;
455        }
456    }
457}
458
/// Upstream table metadata (column descriptors and primary-key names) for each supported
/// database, obtained through `ExternalTableImpl::connect`.
pub enum ExternalTableImpl {
    MySql(MySqlExternalTable),
    Postgres(PostgresExternalTable),
    SqlServer(SqlServerExternalTable),
}
464
465impl ExternalTableImpl {
466    pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
467        let cdc_source_type = CdcSourceType::from(config.connector.as_str());
468        match cdc_source_type {
469            CdcSourceType::Mysql => Ok(ExternalTableImpl::MySql(
470                MySqlExternalTable::connect(config).await?,
471            )),
472            CdcSourceType::Postgres => Ok(ExternalTableImpl::Postgres(
473                PostgresExternalTable::connect(
474                    &config.username,
475                    &config.password,
476                    &config.host,
477                    config.port.parse::<u16>().unwrap(),
478                    &config.database,
479                    &config.schema,
480                    &config.table,
481                    &config.ssl_mode,
482                    &config.ssl_root_cert,
483                    false,
484                )
485                .await?,
486            )),
487            CdcSourceType::SqlServer => Ok(ExternalTableImpl::SqlServer(
488                SqlServerExternalTable::connect(config).await?,
489            )),
490            _ => Err(anyhow!("Unsupported cdc connector type: {}", config.connector).into()),
491        }
492    }
493
494    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
495        match self {
496            ExternalTableImpl::MySql(mysql) => mysql.column_descs(),
497            ExternalTableImpl::Postgres(postgres) => postgres.column_descs(),
498            ExternalTableImpl::SqlServer(sql_server) => sql_server.column_descs(),
499        }
500    }
501
502    pub fn pk_names(&self) -> &Vec<String> {
503        match self {
504            ExternalTableImpl::MySql(mysql) => mysql.pk_names(),
505            ExternalTableImpl::Postgres(postgres) => postgres.pk_names(),
506            ExternalTableImpl::SqlServer(sql_server) => sql_server.pk_names(),
507        }
508    }
509}
510
/// Starting value for CDC table snapshot split ids.
// NOTE(review): meaning inferred from the name; the usage lives outside this file, so confirm.
pub const CDC_TABLE_SPLIT_ID_START: i64 = 1;