1pub mod mock_external_table;
16pub mod postgres;
17pub mod sql_server;
18
19pub mod mysql;
20
21use std::collections::{BTreeMap, HashMap};
22
23use anyhow::anyhow;
24use futures::pin_mut;
25use futures::stream::BoxStream;
26use futures_async_stream::try_stream;
27use risingwave_common::bail;
28use risingwave_common::catalog::{ColumnDesc, Schema};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::secret::LocalSecretManager;
31use risingwave_pb::secret::PbSecretRef;
32use serde_derive::{Deserialize, Serialize};
33
34use crate::WithPropertiesExt;
35use crate::connector_common::{PostgresExternalTable, SslMode};
36use crate::error::{ConnectorError, ConnectorResult};
37use crate::parser::mysql_row_to_owned_row;
38use crate::source::cdc::CdcSourceType;
39use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
40use crate::source::cdc::external::mysql::{
41 MySqlExternalTable, MySqlExternalTableReader, MySqlOffset,
42};
43use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
44use crate::source::cdc::external::sql_server::{
45 SqlServerExternalTable, SqlServerExternalTableReader, SqlServerOffset,
46};
47
/// The kind of upstream system a CDC table is backed by.
///
/// `from_properties` derives a variant from the connector name, and
/// `create_table_reader` uses the variant to pick a concrete reader.
#[derive(Debug, Clone)]
pub enum CdcTableType {
    /// Fallback returned by `from_properties` for any connector name it does not recognize.
    Undefined,
    /// Test double; `create_table_reader` builds a `MockExternalTableReader` for it.
    Mock,
    /// Selected by the `mysql-cdc` connector.
    MySql,
    /// Selected by the `postgres-cdc` connector.
    Postgres,
    /// Selected by the `sqlserver-cdc` connector.
    SqlServer,
    /// Selected by the `citus-cdc` connector. `create_table_reader` rejects this variant.
    Citus,
}
57
58impl CdcTableType {
59 pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self {
60 let connector = with_properties.get_connector().unwrap_or_default();
61 match connector.as_str() {
62 "mysql-cdc" => Self::MySql,
63 "postgres-cdc" => Self::Postgres,
64 "citus-cdc" => Self::Citus,
65 "sqlserver-cdc" => Self::SqlServer,
66 _ => Self::Undefined,
67 }
68 }
69
70 pub fn can_backfill(&self) -> bool {
71 matches!(self, Self::MySql | Self::Postgres | Self::SqlServer)
72 }
73
74 pub fn enable_transaction_metadata(&self) -> bool {
75 matches!(self, Self::MySql | Self::Postgres)
79 }
80
81 pub fn shareable_only(&self) -> bool {
82 matches!(self, Self::SqlServer)
83 }
84
85 pub async fn create_table_reader(
86 &self,
87 config: ExternalTableConfig,
88 schema: Schema,
89 pk_indices: Vec<usize>,
90 ) -> ConnectorResult<ExternalTableReaderImpl> {
91 match self {
92 Self::MySql => Ok(ExternalTableReaderImpl::MySql(
93 MySqlExternalTableReader::new(config, schema)?,
94 )),
95 Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
96 PostgresExternalTableReader::new(config, schema, pk_indices).await?,
97 )),
98 Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
99 SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
100 )),
101 Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
102 _ => bail!("invalid external table type: {:?}", *self),
103 }
104 }
105}
106
/// A table name qualified by its enclosing namespace.
#[derive(Debug, Clone)]
pub struct SchemaTableName {
    /// Namespace of the table. `from_properties` fills this from `database.name`
    /// for MySQL and from `schema.name` for Postgres, Citus and SQL Server.
    pub schema_name: String,
    /// Name of the table, taken from the `table.name` property by `from_properties`.
    pub table_name: String,
}
113
/// Property key holding the upstream table name.
pub const TABLE_NAME_KEY: &str = "table.name";
/// Property key holding the upstream schema name.
pub const SCHEMA_NAME_KEY: &str = "schema.name";
/// Property key holding the upstream database name.
pub const DATABASE_NAME_KEY: &str = "database.name";
117
118impl SchemaTableName {
119 pub fn from_properties(properties: &BTreeMap<String, String>) -> Self {
120 let table_type = CdcTableType::from_properties(properties);
121 let table_name = properties.get(TABLE_NAME_KEY).cloned().unwrap_or_default();
122
123 let schema_name = match table_type {
124 CdcTableType::MySql => properties
125 .get(DATABASE_NAME_KEY)
126 .cloned()
127 .unwrap_or_default(),
128 CdcTableType::Postgres | CdcTableType::Citus => {
129 properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
130 }
131 CdcTableType::SqlServer => properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default(),
132 _ => {
133 unreachable!("invalid external table type: {:?}", table_type);
134 }
135 };
136
137 Self {
138 schema_name,
139 table_name,
140 }
141 }
142}
143
/// A position in an upstream change stream, tagged by the database it belongs to.
///
/// `PartialOrd` is derived, so ordering between two offsets follows the derived
/// rules (variant order first, then the wrapped offset's own ordering).
#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)]
pub enum CdcOffset {
    /// Offset of a MySQL source.
    MySql(MySqlOffset),
    /// Offset of a Postgres source.
    Postgres(PostgresOffset),
    /// Offset of a SQL Server source.
    SqlServer(SqlServerOffset),
}
150
/// Serde model of an offset record with `sourcePartition`, `sourceOffset` and
/// `isHeartbeat` keys (the camelCase names are the serialized field names).
// NOTE(review): presumably mirrors the offset JSON emitted by Debezium — confirm against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebeziumOffset {
    /// Serialized as `sourcePartition`.
    #[serde(rename = "sourcePartition")]
    pub source_partition: HashMap<String, String>,
    /// Serialized as `sourceOffset`.
    #[serde(rename = "sourceOffset")]
    pub source_offset: DebeziumSourceOffset,
    /// Serialized as `isHeartbeat`.
    #[serde(rename = "isHeartbeat")]
    pub is_heartbeat: bool,
}
175
/// The `sourceOffset` payload of a [`DebeziumOffset`].
///
/// Every field is optional; which ones are populated depends on the producer.
// NOTE(review): the field groupings below look database-specific (snapshot
// flags, file/pos, lsn/txId, commit/change lsn) — confirm which upstream sets which.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DebeziumSourceOffset {
    // Snapshot-related flags.
    pub last_snapshot_record: Option<bool>,
    pub snapshot: Option<bool>,

    // presumably a MySQL binlog file name and position — TODO confirm
    pub file: Option<String>,
    pub pos: Option<u64>,

    // presumably Postgres log sequence number and transaction info — TODO confirm
    pub lsn: Option<u64>,
    /// Serialized as `txId`.
    #[serde(rename = "txId")]
    pub txid: Option<i64>,
    pub tx_usec: Option<u64>,

    // presumably SQL Server LSNs, kept as strings — TODO confirm
    pub commit_lsn: Option<String>,
    pub change_lsn: Option<String>,
}
197
/// A boxed, `Send` closure that parses a string into a [`CdcOffset`].
pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;
199
/// Read access to an upstream table: its current CDC offset and a snapshot row stream.
pub trait ExternalTableReader: Sized {
    /// Returns the current CDC offset of the upstream.
    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;

    /// Consumes the reader and releases its connection.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    // The default body contains no `.await`, hence the lint allowance.
    #[allow(clippy::unused_async)]
    async fn disconnect(self) -> ConnectorResult<()> {
        Ok(())
    }

    /// Returns a boxed stream of rows read from `table_name`.
    ///
    /// Each stream item is either a row or an error.
    // NOTE(review): `start_pk`, `primary_keys` and `limit` presumably bound the
    // scan (resume key, ordering columns, max rows) — confirm against implementors.
    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;
}
217
/// Enum dispatch over the concrete external table readers.
pub enum ExternalTableReaderImpl {
    /// Reader for a MySQL upstream.
    MySql(MySqlExternalTableReader),
    /// Reader for a Postgres upstream.
    Postgres(PostgresExternalTableReader),
    /// Reader for a SQL Server upstream.
    SqlServer(SqlServerExternalTableReader),
    /// Mock reader.
    Mock(MockExternalTableReader),
}
224
/// Connection settings for an upstream table, deserialized from connector properties.
///
/// Field names are mapped to property keys through the `serde` attributes below.
#[derive(Debug, Default, Clone, Deserialize)]
pub struct ExternalTableConfig {
    /// Connector name; `ExternalTableImpl::connect` uses it to pick the upstream type.
    pub connector: String,

    /// Read from the `hostname` property.
    #[serde(rename = "hostname")]
    pub host: String,
    /// Kept as a string; parsed as `u16` when connecting to Postgres.
    pub port: String,
    pub username: String,
    pub password: String,
    /// Read from the `database.name` property.
    #[serde(rename = "database.name")]
    pub database: String,
    /// Read from the `schema.name` property; empty when the property is absent.
    #[serde(rename = "schema.name", default = "Default::default")]
    pub schema: String,
    /// Read from the `table.name` property.
    #[serde(rename = "table.name")]
    pub table: String,
    /// Read from `ssl.mode` (alias `debezium.database.sslmode`); falls back to
    /// `postgres_ssl_mode_default()` when absent.
    #[serde(rename = "ssl.mode", default = "postgres_ssl_mode_default")]
    #[serde(alias = "debezium.database.sslmode")]
    pub ssl_mode: SslMode,

    /// Read from `ssl.root.cert` (alias `debezium.database.sslrootcert`).
    #[serde(rename = "ssl.root.cert")]
    #[serde(alias = "debezium.database.sslrootcert")]
    pub ssl_root_cert: Option<String>,

    /// Read from `database.encrypt`; empty when the property is absent.
    // NOTE(review): presumably only consumed by the SQL Server connector — confirm.
    #[serde(rename = "database.encrypt", default = "Default::default")]
    pub encrypt: String,
}
256
/// Serde default for `ExternalTableConfig::ssl_mode`: SSL is disabled unless configured.
fn postgres_ssl_mode_default() -> SslMode {
    SslMode::Disabled
}
261
262impl ExternalTableConfig {
263 pub fn try_from_btreemap(
264 connect_properties: BTreeMap<String, String>,
265 secret_refs: BTreeMap<String, PbSecretRef>,
266 ) -> ConnectorResult<Self> {
267 let options_with_secret =
268 LocalSecretManager::global().fill_secrets(connect_properties, secret_refs)?;
269 let json_value = serde_json::to_value(options_with_secret)?;
270 let config = serde_json::from_value::<ExternalTableConfig>(json_value)?;
271 Ok(config)
272 }
273}
274
275impl ExternalTableReader for ExternalTableReaderImpl {
276 async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
277 match self {
278 ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await,
279 ExternalTableReaderImpl::Postgres(postgres) => postgres.current_cdc_offset().await,
280 ExternalTableReaderImpl::SqlServer(sql_server) => sql_server.current_cdc_offset().await,
281 ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await,
282 }
283 }
284
285 fn snapshot_read(
286 &self,
287 table_name: SchemaTableName,
288 start_pk: Option<OwnedRow>,
289 primary_keys: Vec<String>,
290 limit: u32,
291 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
292 self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
293 }
294}
295
296impl ExternalTableReaderImpl {
297 pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
298 match self {
299 ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
300 ExternalTableReaderImpl::Postgres(_) => {
301 PostgresExternalTableReader::get_cdc_offset_parser()
302 }
303 ExternalTableReaderImpl::SqlServer(_) => {
304 SqlServerExternalTableReader::get_cdc_offset_parser()
305 }
306 ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
307 }
308 }
309
310 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
311 async fn snapshot_read_inner(
312 &self,
313 table_name: SchemaTableName,
314 start_pk: Option<OwnedRow>,
315 primary_keys: Vec<String>,
316 limit: u32,
317 ) {
318 let stream = match self {
319 ExternalTableReaderImpl::MySql(mysql) => {
320 mysql.snapshot_read(table_name, start_pk, primary_keys, limit)
321 }
322 ExternalTableReaderImpl::Postgres(postgres) => {
323 postgres.snapshot_read(table_name, start_pk, primary_keys, limit)
324 }
325 ExternalTableReaderImpl::SqlServer(sql_server) => {
326 sql_server.snapshot_read(table_name, start_pk, primary_keys, limit)
327 }
328 ExternalTableReaderImpl::Mock(mock) => {
329 mock.snapshot_read(table_name, start_pk, primary_keys, limit)
330 }
331 };
332
333 pin_mut!(stream);
334 #[for_await]
335 for row in stream {
336 let row = row?;
337 yield row;
338 }
339 }
340}
341
/// Enum dispatch over the concrete external table handles.
///
/// Each variant exposes the upstream table's column descriptors and primary-key names.
pub enum ExternalTableImpl {
    /// Table in a MySQL upstream.
    MySql(MySqlExternalTable),
    /// Table in a Postgres upstream.
    Postgres(PostgresExternalTable),
    /// Table in a SQL Server upstream.
    SqlServer(SqlServerExternalTable),
}
347
348impl ExternalTableImpl {
349 pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
350 let cdc_source_type = CdcSourceType::from(config.connector.as_str());
351 match cdc_source_type {
352 CdcSourceType::Mysql => Ok(ExternalTableImpl::MySql(
353 MySqlExternalTable::connect(config).await?,
354 )),
355 CdcSourceType::Postgres => Ok(ExternalTableImpl::Postgres(
356 PostgresExternalTable::connect(
357 &config.username,
358 &config.password,
359 &config.host,
360 config.port.parse::<u16>().unwrap(),
361 &config.database,
362 &config.schema,
363 &config.table,
364 &config.ssl_mode,
365 &config.ssl_root_cert,
366 false,
367 )
368 .await?,
369 )),
370 CdcSourceType::SqlServer => Ok(ExternalTableImpl::SqlServer(
371 SqlServerExternalTable::connect(config).await?,
372 )),
373 _ => Err(anyhow!("Unsupported cdc connector type: {}", config.connector).into()),
374 }
375 }
376
377 pub fn column_descs(&self) -> &Vec<ColumnDesc> {
378 match self {
379 ExternalTableImpl::MySql(mysql) => mysql.column_descs(),
380 ExternalTableImpl::Postgres(postgres) => postgres.column_descs(),
381 ExternalTableImpl::SqlServer(sql_server) => sql_server.column_descs(),
382 }
383 }
384
385 pub fn pk_names(&self) -> &Vec<String> {
386 match self {
387 ExternalTableImpl::MySql(mysql) => mysql.pk_names(),
388 ExternalTableImpl::Postgres(postgres) => postgres.pk_names(),
389 ExternalTableImpl::SqlServer(sql_server) => sql_server.pk_names(),
390 }
391 }
392}