risingwave_connector/source/cdc/external/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod mock_external_table;
16pub mod postgres;
17pub mod sql_server;
18
19pub mod mysql;
20
21use std::collections::{BTreeMap, HashMap};
22
23use anyhow::anyhow;
24use futures::pin_mut;
25use futures::stream::BoxStream;
26use futures_async_stream::try_stream;
27use risingwave_common::bail;
28use risingwave_common::catalog::{ColumnDesc, Schema};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::secret::LocalSecretManager;
31use risingwave_pb::secret::PbSecretRef;
32use serde_derive::{Deserialize, Serialize};
33
34use crate::WithPropertiesExt;
35use crate::connector_common::{PostgresExternalTable, SslMode};
36use crate::error::{ConnectorError, ConnectorResult};
37use crate::parser::mysql_row_to_owned_row;
38use crate::source::cdc::CdcSourceType;
39use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
40use crate::source::cdc::external::mysql::{
41    MySqlExternalTable, MySqlExternalTableReader, MySqlOffset,
42};
43use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
44use crate::source::cdc::external::sql_server::{
45    SqlServerExternalTable, SqlServerExternalTableReader, SqlServerOffset,
46};
47
/// Kind of upstream database that a CDC table is backed by.
#[derive(Debug, Clone)]
pub enum CdcTableType {
    /// Fallback produced by [`CdcTableType::from_properties`] when the
    /// connector name is missing or not one of the known CDC connectors.
    Undefined,
    /// Table served by `MockExternalTableReader`; never produced by
    /// [`CdcTableType::from_properties`].
    Mock,
    /// Selected by the `mysql-cdc` connector.
    MySql,
    /// Selected by the `postgres-cdc` connector.
    Postgres,
    /// Selected by the `sqlserver-cdc` connector.
    SqlServer,
    /// Selected by the `citus-cdc` connector. No external table reader is
    /// available for it (see [`CdcTableType::create_table_reader`]).
    Citus,
}
57
58impl CdcTableType {
59    pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self {
60        let connector = with_properties.get_connector().unwrap_or_default();
61        match connector.as_str() {
62            "mysql-cdc" => Self::MySql,
63            "postgres-cdc" => Self::Postgres,
64            "citus-cdc" => Self::Citus,
65            "sqlserver-cdc" => Self::SqlServer,
66            _ => Self::Undefined,
67        }
68    }
69
70    pub fn can_backfill(&self) -> bool {
71        matches!(self, Self::MySql | Self::Postgres | Self::SqlServer)
72    }
73
74    pub fn enable_transaction_metadata(&self) -> bool {
75        // In Debezium, transactional metadata cause delay of the newest events, as the `END` message is never sent unless a new transaction starts.
76        // So we only allow transactional metadata for MySQL and Postgres.
77        // See more in https://debezium.io/documentation/reference/2.6/connectors/sqlserver.html#sqlserver-transaction-metadata
78        matches!(self, Self::MySql | Self::Postgres)
79    }
80
81    pub fn shareable_only(&self) -> bool {
82        matches!(self, Self::SqlServer)
83    }
84
85    pub async fn create_table_reader(
86        &self,
87        config: ExternalTableConfig,
88        schema: Schema,
89        pk_indices: Vec<usize>,
90    ) -> ConnectorResult<ExternalTableReaderImpl> {
91        match self {
92            Self::MySql => Ok(ExternalTableReaderImpl::MySql(
93                MySqlExternalTableReader::new(config, schema).await?,
94            )),
95            Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
96                PostgresExternalTableReader::new(config, schema, pk_indices).await?,
97            )),
98            Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
99                SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
100            )),
101            Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
102            _ => bail!("invalid external table type: {:?}", *self),
103        }
104    }
105}
106
/// Namespace-qualified name of an upstream table.
#[derive(Debug, Clone)]
pub struct SchemaTableName {
    /// Namespace of the table, e.g. the database in MySQL or the schema in
    /// Postgres.
    pub schema_name: String,
    /// Name of the table inside `schema_name`.
    pub table_name: String,
}
113
/// Property key holding the upstream table name.
pub const TABLE_NAME_KEY: &str = "table.name";
/// Property key holding the upstream schema name (read for Postgres, Citus
/// and SQL Server tables).
pub const SCHEMA_NAME_KEY: &str = "schema.name";
/// Property key holding the upstream database name (read as the namespace for
/// MySQL tables).
pub const DATABASE_NAME_KEY: &str = "database.name";
117
118impl SchemaTableName {
119    pub fn from_properties(properties: &BTreeMap<String, String>) -> Self {
120        let table_type = CdcTableType::from_properties(properties);
121        let table_name = properties.get(TABLE_NAME_KEY).cloned().unwrap_or_default();
122
123        let schema_name = match table_type {
124            CdcTableType::MySql => properties
125                .get(DATABASE_NAME_KEY)
126                .cloned()
127                .unwrap_or_default(),
128            CdcTableType::Postgres | CdcTableType::Citus => {
129                properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
130            }
131            CdcTableType::SqlServer => properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default(),
132            _ => {
133                unreachable!("invalid external table type: {:?}", table_type);
134            }
135        };
136
137        Self {
138            schema_name,
139            table_name,
140        }
141    }
142}
143
144#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)]
145pub enum CdcOffset {
146    MySql(MySqlOffset),
147    Postgres(PostgresOffset),
148    SqlServer(SqlServerOffset),
149}
150
151// Example debezium offset for Postgres:
152// {
153//     "sourcePartition":
154//     {
155//         "server": "RW_CDC_1004"
156//     },
157//     "sourceOffset":
158//     {
159//         "last_snapshot_record": false,
160//         "lsn": 29973552,
161//         "txId": 1046,
162//         "ts_usec": 1670826189008456,
163//         "snapshot": true
164//     }
165// }
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct DebeziumOffset {
168    #[serde(rename = "sourcePartition")]
169    pub source_partition: HashMap<String, String>,
170    #[serde(rename = "sourceOffset")]
171    pub source_offset: DebeziumSourceOffset,
172    #[serde(rename = "isHeartbeat")]
173    pub is_heartbeat: bool,
174}
175
176#[derive(Debug, Default, Clone, Serialize, Deserialize)]
177pub struct DebeziumSourceOffset {
178    // postgres snapshot progress
179    pub last_snapshot_record: Option<bool>,
180    // mysql snapshot progress
181    pub snapshot: Option<bool>,
182
183    // mysql binlog offset
184    pub file: Option<String>,
185    pub pos: Option<u64>,
186
187    // postgres offset
188    pub lsn: Option<u64>,
189    #[serde(rename = "txId")]
190    pub txid: Option<i64>,
191    pub tx_usec: Option<u64>,
192
193    // sql server offset
194    pub commit_lsn: Option<String>,
195    pub change_lsn: Option<String>,
196}
197
198pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;
199
200pub trait ExternalTableReader {
201    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;
202
203    fn snapshot_read(
204        &self,
205        table_name: SchemaTableName,
206        start_pk: Option<OwnedRow>,
207        primary_keys: Vec<String>,
208        limit: u32,
209    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;
210}
211
212pub enum ExternalTableReaderImpl {
213    MySql(MySqlExternalTableReader),
214    Postgres(PostgresExternalTableReader),
215    SqlServer(SqlServerExternalTableReader),
216    Mock(MockExternalTableReader),
217}
218
219#[derive(Debug, Default, Clone, Deserialize)]
220pub struct ExternalTableConfig {
221    pub connector: String,
222
223    #[serde(rename = "hostname")]
224    pub host: String,
225    pub port: String,
226    pub username: String,
227    pub password: String,
228    #[serde(rename = "database.name")]
229    pub database: String,
230    #[serde(rename = "schema.name", default = "Default::default")]
231    pub schema: String,
232    #[serde(rename = "table.name")]
233    pub table: String,
234    /// `ssl.mode` specifies the SSL/TLS encryption level for secure communication with Postgres.
235    /// Choices include `disabled`, `preferred`, and `required`.
236    /// This field is optional.
237    #[serde(rename = "ssl.mode", default = "postgres_ssl_mode_default")]
238    #[serde(alias = "debezium.database.sslmode")]
239    pub ssl_mode: SslMode,
240
241    #[serde(rename = "ssl.root.cert")]
242    #[serde(alias = "debezium.database.sslrootcert")]
243    pub ssl_root_cert: Option<String>,
244
245    /// `encrypt` specifies whether connect to SQL Server using SSL.
246    /// Only "true" means using SSL. All other values are treated as "false".
247    #[serde(rename = "database.encrypt", default = "Default::default")]
248    pub encrypt: String,
249}
250
251fn postgres_ssl_mode_default() -> SslMode {
252    // NOTE(StrikeW): Default to `disabled` for backward compatibility
253    SslMode::Disabled
254}
255
256impl ExternalTableConfig {
257    pub fn try_from_btreemap(
258        connect_properties: BTreeMap<String, String>,
259        secret_refs: BTreeMap<String, PbSecretRef>,
260    ) -> ConnectorResult<Self> {
261        let options_with_secret =
262            LocalSecretManager::global().fill_secrets(connect_properties, secret_refs)?;
263        let json_value = serde_json::to_value(options_with_secret)?;
264        let config = serde_json::from_value::<ExternalTableConfig>(json_value)?;
265        Ok(config)
266    }
267}
268
269impl ExternalTableReader for ExternalTableReaderImpl {
270    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
271        match self {
272            ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await,
273            ExternalTableReaderImpl::Postgres(postgres) => postgres.current_cdc_offset().await,
274            ExternalTableReaderImpl::SqlServer(sql_server) => sql_server.current_cdc_offset().await,
275            ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await,
276        }
277    }
278
279    fn snapshot_read(
280        &self,
281        table_name: SchemaTableName,
282        start_pk: Option<OwnedRow>,
283        primary_keys: Vec<String>,
284        limit: u32,
285    ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
286        self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
287    }
288}
289
290impl ExternalTableReaderImpl {
291    pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
292        match self {
293            ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
294            ExternalTableReaderImpl::Postgres(_) => {
295                PostgresExternalTableReader::get_cdc_offset_parser()
296            }
297            ExternalTableReaderImpl::SqlServer(_) => {
298                SqlServerExternalTableReader::get_cdc_offset_parser()
299            }
300            ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
301        }
302    }
303
304    #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
305    async fn snapshot_read_inner(
306        &self,
307        table_name: SchemaTableName,
308        start_pk: Option<OwnedRow>,
309        primary_keys: Vec<String>,
310        limit: u32,
311    ) {
312        let stream = match self {
313            ExternalTableReaderImpl::MySql(mysql) => {
314                mysql.snapshot_read(table_name, start_pk, primary_keys, limit)
315            }
316            ExternalTableReaderImpl::Postgres(postgres) => {
317                postgres.snapshot_read(table_name, start_pk, primary_keys, limit)
318            }
319            ExternalTableReaderImpl::SqlServer(sql_server) => {
320                sql_server.snapshot_read(table_name, start_pk, primary_keys, limit)
321            }
322            ExternalTableReaderImpl::Mock(mock) => {
323                mock.snapshot_read(table_name, start_pk, primary_keys, limit)
324            }
325        };
326
327        pin_mut!(stream);
328        #[for_await]
329        for row in stream {
330            let row = row?;
331            yield row;
332        }
333    }
334}
335
336pub enum ExternalTableImpl {
337    MySql(MySqlExternalTable),
338    Postgres(PostgresExternalTable),
339    SqlServer(SqlServerExternalTable),
340}
341
342impl ExternalTableImpl {
343    pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
344        let cdc_source_type = CdcSourceType::from(config.connector.as_str());
345        match cdc_source_type {
346            CdcSourceType::Mysql => Ok(ExternalTableImpl::MySql(
347                MySqlExternalTable::connect(config).await?,
348            )),
349            CdcSourceType::Postgres => Ok(ExternalTableImpl::Postgres(
350                PostgresExternalTable::connect(
351                    &config.username,
352                    &config.password,
353                    &config.host,
354                    config.port.parse::<u16>().unwrap(),
355                    &config.database,
356                    &config.schema,
357                    &config.table,
358                    &config.ssl_mode,
359                    &config.ssl_root_cert,
360                    false,
361                )
362                .await?,
363            )),
364            CdcSourceType::SqlServer => Ok(ExternalTableImpl::SqlServer(
365                SqlServerExternalTable::connect(config).await?,
366            )),
367            _ => Err(anyhow!("Unsupported cdc connector type: {}", config.connector).into()),
368        }
369    }
370
371    pub fn column_descs(&self) -> &Vec<ColumnDesc> {
372        match self {
373            ExternalTableImpl::MySql(mysql) => mysql.column_descs(),
374            ExternalTableImpl::Postgres(postgres) => postgres.column_descs(),
375            ExternalTableImpl::SqlServer(sql_server) => sql_server.column_descs(),
376        }
377    }
378
379    pub fn pk_names(&self) -> &Vec<String> {
380        match self {
381            ExternalTableImpl::MySql(mysql) => mysql.pk_names(),
382            ExternalTableImpl::Postgres(postgres) => postgres.pk_names(),
383            ExternalTableImpl::SqlServer(sql_server) => sql_server.pk_names(),
384        }
385    }
386}