1pub mod mock_external_table;
16pub mod postgres;
17pub mod sql_server;
18
19pub mod mysql;
20
21use std::collections::{BTreeMap, HashMap};
22
23use anyhow::anyhow;
24use futures::pin_mut;
25use futures::stream::BoxStream;
26use futures_async_stream::try_stream;
27use risingwave_common::bail;
28use risingwave_common::catalog::{ColumnDesc, Schema};
29use risingwave_common::row::OwnedRow;
30use risingwave_common::secret::LocalSecretManager;
31use risingwave_pb::secret::PbSecretRef;
32use serde_derive::{Deserialize, Serialize};
33
34use crate::WithPropertiesExt;
35use crate::connector_common::{PostgresExternalTable, SslMode};
36use crate::error::{ConnectorError, ConnectorResult};
37use crate::parser::mysql_row_to_owned_row;
38use crate::source::cdc::CdcSourceType;
39use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
40use crate::source::cdc::external::mysql::{
41 MySqlExternalTable, MySqlExternalTableReader, MySqlOffset,
42};
43use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};
44use crate::source::cdc::external::sql_server::{
45 SqlServerExternalTable, SqlServerExternalTableReader, SqlServerOffset,
46};
47
/// The kind of upstream database a CDC table reads from.
///
/// Usually derived from the `connector` property via [`CdcTableType::from_properties`];
/// unrecognized connectors map to `Undefined`.
#[derive(Debug, Clone)]
pub enum CdcTableType {
    /// Connector missing or not recognized.
    Undefined,
    /// Backed by `MockExternalTableReader`; never produced by `from_properties`.
    Mock,
    /// `mysql-cdc`.
    MySql,
    /// `postgres-cdc`.
    Postgres,
    /// `sqlserver-cdc`.
    SqlServer,
    /// `citus-cdc`.
    Citus,
}
57
58impl CdcTableType {
59 pub fn from_properties(with_properties: &impl WithPropertiesExt) -> Self {
60 let connector = with_properties.get_connector().unwrap_or_default();
61 match connector.as_str() {
62 "mysql-cdc" => Self::MySql,
63 "postgres-cdc" => Self::Postgres,
64 "citus-cdc" => Self::Citus,
65 "sqlserver-cdc" => Self::SqlServer,
66 _ => Self::Undefined,
67 }
68 }
69
70 pub fn can_backfill(&self) -> bool {
71 matches!(self, Self::MySql | Self::Postgres | Self::SqlServer)
72 }
73
74 pub fn enable_transaction_metadata(&self) -> bool {
75 matches!(self, Self::MySql | Self::Postgres)
79 }
80
81 pub fn shareable_only(&self) -> bool {
82 matches!(self, Self::SqlServer)
83 }
84
85 pub async fn create_table_reader(
86 &self,
87 config: ExternalTableConfig,
88 schema: Schema,
89 pk_indices: Vec<usize>,
90 ) -> ConnectorResult<ExternalTableReaderImpl> {
91 match self {
92 Self::MySql => Ok(ExternalTableReaderImpl::MySql(
93 MySqlExternalTableReader::new(config, schema).await?,
94 )),
95 Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
96 PostgresExternalTableReader::new(config, schema, pk_indices).await?,
97 )),
98 Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
99 SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
100 )),
101 Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
102 _ => bail!("invalid external table type: {:?}", *self),
103 }
104 }
105}
106
/// An upstream table identified by schema name and table name.
///
/// For MySQL the `schema_name` holds the database name (see `SchemaTableName::from_properties`).
#[derive(Debug, Clone)]
pub struct SchemaTableName {
    /// Schema name (database name for MySQL).
    pub schema_name: String,
    /// Table name within the schema.
    pub table_name: String,
}
113
/// Property key holding the upstream table name.
pub const TABLE_NAME_KEY: &str = "table.name";
/// Property key holding the upstream schema name (read for Postgres, Citus and SQL Server in
/// `SchemaTableName::from_properties`).
pub const SCHEMA_NAME_KEY: &str = "schema.name";
/// Property key holding the upstream database name (used as the schema name for MySQL in
/// `SchemaTableName::from_properties`).
pub const DATABASE_NAME_KEY: &str = "database.name";
117
118impl SchemaTableName {
119 pub fn from_properties(properties: &BTreeMap<String, String>) -> Self {
120 let table_type = CdcTableType::from_properties(properties);
121 let table_name = properties.get(TABLE_NAME_KEY).cloned().unwrap_or_default();
122
123 let schema_name = match table_type {
124 CdcTableType::MySql => properties
125 .get(DATABASE_NAME_KEY)
126 .cloned()
127 .unwrap_or_default(),
128 CdcTableType::Postgres | CdcTableType::Citus => {
129 properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default()
130 }
131 CdcTableType::SqlServer => properties.get(SCHEMA_NAME_KEY).cloned().unwrap_or_default(),
132 _ => {
133 unreachable!("invalid external table type: {:?}", table_type);
134 }
135 };
136
137 Self {
138 schema_name,
139 table_name,
140 }
141 }
142}
143
/// A CDC log position, tagged with the upstream database kind it belongs to.
///
/// Note that the derived `PartialOrd` compares variant order first: any `MySql` offset sorts
/// before any `Postgres` offset, which sorts before any `SqlServer` offset.
/// NOTE(review): cross-kind comparisons are presumably never intended — only compare offsets
/// of the same kind.
#[derive(Debug, Clone, PartialEq, PartialOrd, Serialize, Deserialize)]
pub enum CdcOffset {
    MySql(MySqlOffset),
    Postgres(PostgresOffset),
    SqlServer(SqlServerOffset),
}
150
/// An offset record (de)serialized with camelCase keys: `sourcePartition`, `sourceOffset`,
/// `isHeartbeat`.
///
/// NOTE(review): presumably mirrors the offset JSON emitted by Debezium — verify against the
/// producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DebeziumOffset {
    /// Source partition as string key/value pairs.
    #[serde(rename = "sourcePartition")]
    pub source_partition: HashMap<String, String>,
    /// Position within the source; see [`DebeziumSourceOffset`].
    #[serde(rename = "sourceOffset")]
    pub source_offset: DebeziumSourceOffset,
    /// NOTE(review): presumably `true` when the offset comes from a heartbeat event.
    #[serde(rename = "isHeartbeat")]
    pub is_heartbeat: bool,
}
175
/// Union of the source-offset fields used by the supported upstream databases.
///
/// Every field is an `Option`, so keys absent from the input deserialize to `None`.
/// NOTE(review): presumably each connector populates only its own subset of fields.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct DebeziumSourceOffset {
    // NOTE(review): presumably snapshot-phase markers.
    pub last_snapshot_record: Option<bool>,
    pub snapshot: Option<bool>,

    // NOTE(review): presumably the MySQL binlog file name and position.
    pub file: Option<String>,
    pub pos: Option<u64>,

    // NOTE(review): presumably the Postgres LSN and transaction id / timestamp.
    pub lsn: Option<u64>,
    // Serialized as `txId`.
    #[serde(rename = "txId")]
    pub txid: Option<i64>,
    pub tx_usec: Option<u64>,

    // NOTE(review): presumably the SQL Server commit/change LSNs, kept as strings.
    pub commit_lsn: Option<String>,
    pub change_lsn: Option<String>,
}
197
/// A boxed parser turning a textual offset into a [`CdcOffset`]; each reader backend supplies
/// its own via `ExternalTableReaderImpl::get_cdc_offset_parser`.
pub type CdcOffsetParseFunc = Box<dyn Fn(&str) -> ConnectorResult<CdcOffset> + Send>;
199
/// Reads the current CDC position and snapshot rows from an upstream table.
pub trait ExternalTableReader {
    /// Returns the upstream's current CDC offset.
    async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset>;

    /// Streams snapshot rows of `table_name`.
    ///
    /// NOTE(review): presumably reads rows ordered by `primary_keys`, resuming after
    /// `start_pk` when it is `Some`, and returns at most `limit` rows. The exact semantics are
    /// defined by each implementation — confirm before relying on them.
    fn snapshot_read(
        &self,
        table_name: SchemaTableName,
        start_pk: Option<OwnedRow>,
        primary_keys: Vec<String>,
        limit: u32,
    ) -> BoxStream<'_, ConnectorResult<OwnedRow>>;
}
211
/// Enum dispatch over the concrete snapshot readers; constructed by
/// [`CdcTableType::create_table_reader`].
pub enum ExternalTableReaderImpl {
    MySql(MySqlExternalTableReader),
    Postgres(PostgresExternalTableReader),
    SqlServer(SqlServerExternalTableReader),
    Mock(MockExternalTableReader),
}
218
/// Connection settings for an upstream CDC database, deserialized from connector properties
/// (see [`ExternalTableConfig::try_from_btreemap`]).
#[derive(Debug, Default, Clone, Deserialize)]
pub struct ExternalTableConfig {
    /// Connector name (e.g. `mysql-cdc`); selects the backend in `ExternalTableImpl::connect`.
    pub connector: String,

    /// Read from the `hostname` property.
    #[serde(rename = "hostname")]
    pub host: String,
    /// Kept as a string; the Postgres path of `ExternalTableImpl::connect` parses it as `u16`.
    pub port: String,
    pub username: String,
    pub password: String,
    /// Read from `database.name`.
    #[serde(rename = "database.name")]
    pub database: String,
    /// Read from `schema.name`; empty when absent.
    #[serde(rename = "schema.name", default = "Default::default")]
    pub schema: String,
    /// Read from `table.name`.
    #[serde(rename = "table.name")]
    pub table: String,
    /// Read from `ssl.mode` (alias `debezium.database.sslmode`); defaults to
    /// `SslMode::Disabled` when absent.
    #[serde(rename = "ssl.mode", default = "postgres_ssl_mode_default")]
    #[serde(alias = "debezium.database.sslmode")]
    pub ssl_mode: SslMode,

    /// Read from `ssl.root.cert` (alias `debezium.database.sslrootcert`).
    #[serde(rename = "ssl.root.cert")]
    #[serde(alias = "debezium.database.sslrootcert")]
    pub ssl_root_cert: Option<String>,

    /// Raw `database.encrypt` value; empty when absent.
    /// NOTE(review): presumably only meaningful for SQL Server — confirm.
    #[serde(rename = "database.encrypt", default = "Default::default")]
    pub encrypt: String,
}
250
/// Serde default for `ExternalTableConfig::ssl_mode` when `ssl.mode` is not provided.
fn postgres_ssl_mode_default() -> SslMode {
    // NOTE(review): SSL is off unless requested; presumably kept for backward compatibility —
    // changing this affects every config without an explicit `ssl.mode`.
    SslMode::Disabled
}
255
256impl ExternalTableConfig {
257 pub fn try_from_btreemap(
258 connect_properties: BTreeMap<String, String>,
259 secret_refs: BTreeMap<String, PbSecretRef>,
260 ) -> ConnectorResult<Self> {
261 let options_with_secret =
262 LocalSecretManager::global().fill_secrets(connect_properties, secret_refs)?;
263 let json_value = serde_json::to_value(options_with_secret)?;
264 let config = serde_json::from_value::<ExternalTableConfig>(json_value)?;
265 Ok(config)
266 }
267}
268
269impl ExternalTableReader for ExternalTableReaderImpl {
270 async fn current_cdc_offset(&self) -> ConnectorResult<CdcOffset> {
271 match self {
272 ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await,
273 ExternalTableReaderImpl::Postgres(postgres) => postgres.current_cdc_offset().await,
274 ExternalTableReaderImpl::SqlServer(sql_server) => sql_server.current_cdc_offset().await,
275 ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await,
276 }
277 }
278
279 fn snapshot_read(
280 &self,
281 table_name: SchemaTableName,
282 start_pk: Option<OwnedRow>,
283 primary_keys: Vec<String>,
284 limit: u32,
285 ) -> BoxStream<'_, ConnectorResult<OwnedRow>> {
286 self.snapshot_read_inner(table_name, start_pk, primary_keys, limit)
287 }
288}
289
290impl ExternalTableReaderImpl {
291 pub fn get_cdc_offset_parser(&self) -> CdcOffsetParseFunc {
292 match self {
293 ExternalTableReaderImpl::MySql(_) => MySqlExternalTableReader::get_cdc_offset_parser(),
294 ExternalTableReaderImpl::Postgres(_) => {
295 PostgresExternalTableReader::get_cdc_offset_parser()
296 }
297 ExternalTableReaderImpl::SqlServer(_) => {
298 SqlServerExternalTableReader::get_cdc_offset_parser()
299 }
300 ExternalTableReaderImpl::Mock(_) => MockExternalTableReader::get_cdc_offset_parser(),
301 }
302 }
303
304 #[try_stream(boxed, ok = OwnedRow, error = ConnectorError)]
305 async fn snapshot_read_inner(
306 &self,
307 table_name: SchemaTableName,
308 start_pk: Option<OwnedRow>,
309 primary_keys: Vec<String>,
310 limit: u32,
311 ) {
312 let stream = match self {
313 ExternalTableReaderImpl::MySql(mysql) => {
314 mysql.snapshot_read(table_name, start_pk, primary_keys, limit)
315 }
316 ExternalTableReaderImpl::Postgres(postgres) => {
317 postgres.snapshot_read(table_name, start_pk, primary_keys, limit)
318 }
319 ExternalTableReaderImpl::SqlServer(sql_server) => {
320 sql_server.snapshot_read(table_name, start_pk, primary_keys, limit)
321 }
322 ExternalTableReaderImpl::Mock(mock) => {
323 mock.snapshot_read(table_name, start_pk, primary_keys, limit)
324 }
325 };
326
327 pin_mut!(stream);
328 #[for_await]
329 for row in stream {
330 let row = row?;
331 yield row;
332 }
333 }
334}
335
/// A connected upstream table, one variant per supported database; exposes the table's
/// column descriptors and primary-key names. Created by `ExternalTableImpl::connect`.
pub enum ExternalTableImpl {
    MySql(MySqlExternalTable),
    Postgres(PostgresExternalTable),
    SqlServer(SqlServerExternalTable),
}
341
342impl ExternalTableImpl {
343 pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
344 let cdc_source_type = CdcSourceType::from(config.connector.as_str());
345 match cdc_source_type {
346 CdcSourceType::Mysql => Ok(ExternalTableImpl::MySql(
347 MySqlExternalTable::connect(config).await?,
348 )),
349 CdcSourceType::Postgres => Ok(ExternalTableImpl::Postgres(
350 PostgresExternalTable::connect(
351 &config.username,
352 &config.password,
353 &config.host,
354 config.port.parse::<u16>().unwrap(),
355 &config.database,
356 &config.schema,
357 &config.table,
358 &config.ssl_mode,
359 &config.ssl_root_cert,
360 false,
361 )
362 .await?,
363 )),
364 CdcSourceType::SqlServer => Ok(ExternalTableImpl::SqlServer(
365 SqlServerExternalTable::connect(config).await?,
366 )),
367 _ => Err(anyhow!("Unsupported cdc connector type: {}", config.connector).into()),
368 }
369 }
370
371 pub fn column_descs(&self) -> &Vec<ColumnDesc> {
372 match self {
373 ExternalTableImpl::MySql(mysql) => mysql.column_descs(),
374 ExternalTableImpl::Postgres(postgres) => postgres.column_descs(),
375 ExternalTableImpl::SqlServer(sql_server) => sql_server.column_descs(),
376 }
377 }
378
379 pub fn pk_names(&self) -> &Vec<String> {
380 match self {
381 ExternalTableImpl::MySql(mysql) => mysql.pk_names(),
382 ExternalTableImpl::Postgres(postgres) => postgres.pk_names(),
383 ExternalTableImpl::SqlServer(sql_server) => sql_server.pk_names(),
384 }
385 }
386}