risingwave_connector/source/cdc/external/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod mock_external_table;
16pub mod postgres;
17pub mod sql_server;
18
19pub mod mysql;
20
21use std::collections::{BTreeMap, HashMap};
22
23use anyhow::anyhow;
24use futures::pin_mut;
25use futures::stream::BoxStream;
26use futures_async_stream::try_stream;
27use risingwave_common::bail;
28use risingwave_common::catalog::{ColumnDesc, Schema};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::secret::LocalSecretManager;
31use risingwave_pb::secret::PbSecretRef;
32use serde_derive::{Deserialize, Serialize};
33
34use crate::WithPropertiesExt;
35use crate::connector_common::{PostgresExternalTable, SslMode};
36use crate::error::{ConnectorError, ConnectorResult};
37use crate::parser::mysql_row_to_owned_row;
38use crate::source::cdc::CdcSourceType;
39use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
40use crate::source::cdc::external::mysql::{
41    MySqlExternalTable, MySqlExternalTableReader, MySqlOffset,
42};
43use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
44use crate::source::cdc::external::sql_server::{
45    SqlServerExternalTable, SqlServerExternalTableReader, SqlServerOffset,
46};
47
/// The kind of upstream system a CDC table reads from.
#[derive(Clone, Debug)]
pub enum CdcTableType {
    /// No recognized CDC connector.
    Undefined,
    /// Mock table type, served by the mock external table reader.
    Mock,
    /// MySQL upstream.
    MySql,
    /// PostgreSQL upstream.
    Postgres,
    /// SQL Server upstream.
    SqlServer,
    /// Citus upstream.
    Citus,
}
57
58impl CdcTableType {
59    pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self {
60        let connector = with_properties.get_connector().unwrap_or_default();
61        match connector.as_str() {
62            "mysql-cdc" => Self::MySql,
63            "postgres-cdc" => Self::Postgres,
64            "citus-cdc" => Self::Citus,
65            "sqlserver-cdc" => Self::SqlServer,
66            _ => Self::Undefined,
67        }
68    }
69
70    pub fn can_backfill(&self) -> bool {
71        matches!(self, Self::MySql | Self::Postgres | Self::SqlServer)
72    }
73
74    pub fn enable_transaction_metadata(&self) -> bool {
75        // In Debezium, transactional metadata cause delay of the newest events, as the `END` message is never sent unless a new transaction starts.
76        // So we only allow transactional metadata for MySQL and Postgres.
77        // See more in https://debezium.io/documentation/reference/2.6/connectors/sqlserver.html#sqlserver-transaction-metadata
78        matches!(self, Self::MySql | Self::Postgres)
79    }
80
81    pub fn shareable_only(&self) -> bool {
82        matches!(self, Self::SqlServer)
83    }
84
85    pub async fn create_table_reader(
86        &self,
87        config: ExternalTableConfig,
88        schema: Schema,
89        pk_indices: Vec<usize>,
90    ) -> ConnectorResult<ExternalTableReaderImpl> {
91        match self {
92            Self::MySql => Ok(ExternalTableReaderImpl::MySql(
93                MySqlExternalTableReader::new(config, schema)?,
94            )),
95            Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
96                PostgresExternalTableReader::new(config, schema, pk_indices).await?,
97            )),
98            Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
99                SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
100            )),
101            Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
102            _ => bail!("invalid external table type: {:?}", *self),
103        }
104    }
105}
106
/// Namespace-qualified name of an upstream table.
#[derive(Clone, Debug)]
pub struct SchemaTableName {
    /// Namespace of the table, e.g. the database in MySQL or the schema in Postgres.
    pub schema_name: String,
    /// Name of the table inside that namespace.
    pub table_name: String,
}
113
114pub const TABLE_NAME_KEY: &str = "table.name";
115pub const SCHEMA_NAME_KEY: &str = "schema.name";
116pub const DATABASE_NAME_KEY: &str = "database.name";
117
118impl SchemaTableName {
119    pub fn from_properties(properties: &BTreeMap<String, String>) -> Self {
120        let table_type = CdcTableType::from_properties(properties);
121        let table_name = properties.get(TABLE_NAME_KEY).cloned().unwrap_or_default();
122
123        let schema_name = match table_type {
124            CdcTableType::MySql => properties
125                .get(DATABASE_NAME_KEY)
126                .cloned()
127                .unwrap_or_default(),
128            CdcTableType::Postgres | CdcTableType::Citus => {
129                properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
130            }
131            CdcTableType::SqlServer => properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default(),
132            _ => {
133                unreachable!("invalid external table type: {:?}", table_type);
134            }
135        };
136
137        Self {
138            schema_name,
139            table_name,
140        }
141    }
142}
143
144#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)]
145pub enum CdcOffset {
146    MySql(MySqlOffset),
147    Postgres(PostgresOffset),
148    SqlServer(SqlServerOffset),
149}
150
151// Example debezium offset for Postgres:
152// {
153//     "sourcePartition":
154//     {
155//         "server": "RW_CDC_1004"
156//     },
157//     "sourceOffset":
158//     {
159//         "last_snapshot_record": false,
160//         "lsn": 29973552,
161//         "txId": 1046,
162//         "ts_usec": 1670826189008456,
163//         "snapshot": true
164//     }
165// }
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct DebeziumOffset {
168    #[serde(rename = "sourcePartition")]
169    pub source_partition: HashMap<String, String>,
170    #[serde(rename = "sourceOffset")]
171    pub source_offset: DebeziumSourceOffset,
172    #[serde(rename = "isHeartbeat")]
173    pub is_heartbeat: bool,
174}
175
176#[derive(Debug, Default, Clone, Serialize, Deserialize)]
177pub struct DebeziumSourceOffset {
178    // postgres snapshot progress
179    pub last_snapshot_record: Option<bool>,
180    // mysql snapshot progress
181    pub snapshot: Option<bool>,
182
183    // mysql binlog offset
184    pub file: Option<String>,
185    pub pos: Option<u64>,
186
187    // postgres offset
188    pub lsn: Option<u64>,
189    #[serde(rename = "txId")]
190    pub txid: Option<i64>,
191    pub tx_usec: Option<u64>,
192
193    // sql server offset
194    pub commit_lsn: Option<String>,
195    pub change_lsn: Option<String>,
196}
197
198pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;
199
200pub trait ExternalTableReader: Sized {
201    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;
202
203    // Currently, MySQL cdc uses a connection pool to manage connections to MySQL, and other CDC processes do not require the disconnect step for now.
204    #[allow(clippy::unused_async)]
205    async fn disconnect(self) -> ConnectorResult<()> {
206        Ok(())
207    }
208
209    fn snapshot_read(
210        &self,
211        table_name: SchemaTableName,
212        start_pk: Option<OwnedRow>,
213        primary_keys: Vec<String>,
214        limit: u32,
215    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;
216}
217
218pub enum ExternalTableReaderImpl {
219    MySql(MySqlExternalTableReader),
220    Postgres(PostgresExternalTableReader),
221    SqlServer(SqlServerExternalTableReader),
222    Mock(MockExternalTableReader),
223}
224
225#[derive(Debug, Default, Clone, Deserialize)]
226pub struct ExternalTableConfig {
227    pub connector: String,
228
229    #[serde(rename = "hostname")]
230    pub host: String,
231    pub port: String,
232    pub username: String,
233    pub password: String,
234    #[serde(rename = "database.name")]
235    pub database: String,
236    #[serde(rename = "schema.name", default = "Default::default")]
237    pub schema: String,
238    #[serde(rename = "table.name")]
239    pub table: String,
240    /// `ssl.mode` specifies the SSL/TLS encryption level for secure communication with Postgres.
241    /// Choices include `disabled`, `preferred`, and `required`.
242    /// This field is optional.
243    #[serde(rename = "ssl.mode", default = "postgres_ssl_mode_default")]
244    #[serde(alias = "debezium.database.sslmode")]
245    pub ssl_mode: SslMode,
246
247    #[serde(rename = "ssl.root.cert")]
248    #[serde(alias = "debezium.database.sslrootcert")]
249    pub ssl_root_cert: Option<String>,
250
251    /// `encrypt` specifies whether connect to SQL Server using SSL.
252    /// Only "true" means using SSL. All other values are treated as "false".
253    #[serde(rename = "database.encrypt", default = "Default::default")]
254    pub encrypt: String,
255}
256
257fn postgres_ssl_mode_default() -> SslMode {
258    // NOTE(StrikeW): Default to `disabled` for backward compatibility
259    SslMode::Disabled
260}
261
262impl ExternalTableConfig {
263    pub fn try_from_btreemap(
264        connect_properties: BTreeMap<String, String>,
265        secret_refs: BTreeMap<String, PbSecretRef>,
266    ) -> ConnectorResult<Self> {
267        let options_with_secret =
268            LocalSecretManager::global().fill_secrets(connect_properties, secret_refs)?;
269        let json_value = serde_json::to_value(options_with_secret)?;
270        let config = serde_json::from_value::<ExternalTableConfig>(json_value)?;
271        Ok(config)
272    }
273}
274
275impl ExternalTableReader for ExternalTableReaderImpl {
276    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
277        match self {
278            ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await,
279            ExternalTableReaderImpl::Postgres(postgres) => postgres.current_cdc_offset().await,
280            ExternalTableReaderImpl::SqlServer(sql_server) => sql_server.current_cdc_offset().await,
281            ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await,
282        }
283    }
284
285    fn snapshot_read(
286        &self,
287        table_name: SchemaTableName,
288        start_pk: Option<OwnedRow>,
289        primary_keys: Vec<String>,
290        limit: u32,
291    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
292        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
293    }
294}
295
296impl ExternalTableReaderImpl {
297    pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
298        match self {
299            ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
300            ExternalTableReaderImpl::Postgres(_) => {
301                PostgresExternalTableReader::get_cdc_offset_parser()
302            }
303            ExternalTableReaderImpl::SqlServer(_) => {
304                SqlServerExternalTableReader::get_cdc_offset_parser()
305            }
306            ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
307        }
308    }
309
310    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
311    async fn snapshot_read_inner(
312        &self,
313        table_name: SchemaTableName,
314        start_pk: Option<OwnedRow>,
315        primary_keys: Vec<String>,
316        limit: u32,
317    ) {
318        let stream = match self {
319            ExternalTableReaderImpl::MySql(mysql) => {
320                mysql.snapshot_read(table_name, start_pk, primary_keys, limit)
321            }
322            ExternalTableReaderImpl::Postgres(postgres) => {
323                postgres.snapshot_read(table_name, start_pk, primary_keys, limit)
324            }
325            ExternalTableReaderImpl::SqlServer(sql_server) => {
326                sql_server.snapshot_read(table_name, start_pk, primary_keys, limit)
327            }
328            ExternalTableReaderImpl::Mock(mock) => {
329                mock.snapshot_read(table_name, start_pk, primary_keys, limit)
330            }
331        };
332
333        pin_mut!(stream);
334        #[for_await]
335        for row in stream {
336            let row = row?;
337            yield row;
338        }
339    }
340}
341
342pub enum ExternalTableImpl {
343    MySql(MySqlExternalTable),
344    Postgres(PostgresExternalTable),
345    SqlServer(SqlServerExternalTable),
346}
347
348impl ExternalTableImpl {
349    pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
350        let cdc_source_type = CdcSourceType::from(config.connector.as_str());
351        match cdc_source_type {
352            CdcSourceType::Mysql => Ok(ExternalTableImpl::MySql(
353                MySqlExternalTable::connect(config).await?,
354            )),
355            CdcSourceType::Postgres => Ok(ExternalTableImpl::Postgres(
356                PostgresExternalTable::connect(
357                    &config.username,
358                    &config.password,
359                    &config.host,
360                    config.port.parse::<u16>().unwrap(),
361                    &config.database,
362                    &config.schema,
363                    &config.table,
364                    &config.ssl_mode,
365                    &config.ssl_root_cert,
366                    false,
367                )
368                .await?,
369            )),
370            CdcSourceType::SqlServer => Ok(ExternalTableImpl::SqlServer(
371                SqlServerExternalTable::connect(config).await?,
372            )),
373            _ => Err(anyhow!("Unsupported cdc connector type: {}", config.connector).into()),
374        }
375    }
376
377    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
378        match self {
379            ExternalTableImpl::MySql(mysql) => mysql.column_descs(),
380            ExternalTableImpl::Postgres(postgres) => postgres.column_descs(),
381            ExternalTableImpl::SqlServer(sql_server) => sql_server.column_descs(),
382        }
383    }
384
385    pub fn pk_names(&self) -> &Vec<String> {
386        match self {
387            ExternalTableImpl::MySql(mysql) => mysql.pk_names(),
388            ExternalTableImpl::Postgres(postgres) => postgres.pk_names(),
389            ExternalTableImpl::SqlServer(sql_server) => sql_server.pk_names(),
390        }
391    }
392}