risingwave_connector/source/cdc/
mod.rs1pub mod enumerator;
16pub mod external;
17pub mod jni_source;
18pub mod source;
19pub mod split;
20
21use std::collections::{BTreeMap, HashMap};
22use std::marker::PhantomData;
23
24pub use enumerator::*;
25use itertools::Itertools;
26use risingwave_common::id::{ActorId, SourceId};
27use risingwave_pb::catalog::PbSource;
28use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
29use risingwave_pb::plan_common::ExternalTableDesc;
30use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
31use risingwave_pb::source::{PbCdcTableSnapshotSplit, PbCdcTableSnapshotSplitsWithGeneration};
32use risingwave_pb::stream_plan::StreamCdcScanOptions;
33use simd_json::prelude::ArrayTrait;
34pub use source::*;
35
36use crate::enforce_secret::EnforceSecret;
37use crate::error::ConnectorResult;
38use crate::source::{CdcTableSnapshotSplitRaw, SourceProperties, SplitImpl, TryFromBTreeMap};
39use crate::{for_all_classified_sources, impl_cdc_source_type};
40
41pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
42pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
43pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
44pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
45pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
47pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
48pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
49pub const CDC_BACKFILL_PARALLELISM: &str = "backfill.parallelism";
50pub const CDC_BACKFILL_NUM_ROWS_PER_SPLIT: &str = "backfill.num_rows_per_split";
51pub const CDC_BACKFILL_AS_EVEN_SPLITS: &str = "backfill.as_even_splits";
52pub const CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX: &str = "backfill.split_pk_column_index";
53pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
55pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";
56pub const CDC_BACKFILL_MAX_PARALLELISM: u32 = 256;
57
58pub const CDC_MONGODB_STRONG_SCHEMA_KEY: &str = "strong_schema";
60
61pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
62pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
63pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
64pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME;
65pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME;
66
67pub fn build_cdc_table_id(source_id: SourceId, external_table_name: &str) -> String {
69 format!("{}.{}", source_id, external_table_name)
70}
71
/// Normalizes `schema."table"` (double-quoted table part) into `schema.table`.
///
/// The input is split at its first `.`. The part after it must be wrapped in double quotes.
/// Returns `None` when:
/// - there is no `.`, or the table part is not wrapped in `"`;
/// - the schema part is empty or contains `"`;
/// - the unquoted table name is empty or contains `.`, `"` or `\`.
pub fn normalize_simple_postgres_quoted_table_name(table_name: &str) -> Option<String> {
    let (schema, quoted_table) = table_name.split_once('.')?;
    let table = quoted_table.strip_prefix('"')?.strip_suffix('"')?;

    let table_has_special_char = table.chars().any(|c| matches!(c, '.' | '"' | '\\'));
    let is_simple = !schema.is_empty()
        && !table.is_empty()
        && !table_has_special_char
        && !schema.contains('"');

    is_simple.then(|| format!("{schema}.{table}"))
}
87
88pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
89 const CDC_CONNECTOR_NAME: &'static str;
90 fn source_type() -> CdcSourceType;
91}
92
93for_all_classified_sources!(impl_cdc_source_type);
94
95impl<'a> From<&'a str> for CdcSourceType {
96 fn from(name: &'a str) -> Self {
97 match name {
98 MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
99 POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
100 CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
101 MONGODB_CDC_CONNECTOR => CdcSourceType::Mongodb,
102 SQL_SERVER_CDC_CONNECTOR => CdcSourceType::SqlServer,
103 _ => CdcSourceType::Unspecified,
104 }
105 }
106}
107
108impl CdcSourceType {
109 pub fn as_str_name(&self) -> &str {
110 match self {
111 CdcSourceType::Mysql => "MySQL",
112 CdcSourceType::Postgres => "Postgres",
113 CdcSourceType::Citus => "Citus",
114 CdcSourceType::Mongodb => "MongoDB",
115 CdcSourceType::SqlServer => "SQL Server",
116 CdcSourceType::Unspecified => "Unspecified",
117 }
118 }
119}
120
121#[derive(Clone, Debug, Default)]
122pub struct CdcProperties<T: CdcSourceTypeTrait> {
123 pub properties: BTreeMap<String, String>,
125
126 pub table_schema: TableSchema,
128
129 pub is_cdc_source_job: bool,
131
132 pub is_backfill_table: bool,
134
135 pub _phantom: PhantomData<T>,
136}
137
138pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
139 TableSchema {
140 columns: table_schema
141 .columns
142 .iter()
143 .filter(|col| {
144 col.additional_column
145 .as_ref()
146 .is_some_and(|val| val.column_type.is_none())
147 })
148 .cloned()
149 .collect(),
150 pk_indices: table_schema.pk_indices.clone(),
151 }
152}
153
154impl<T: CdcSourceTypeTrait> TryFromBTreeMap for CdcProperties<T> {
155 fn try_from_btreemap(
156 properties: BTreeMap<String, String>,
157 _deny_unknown_fields: bool,
158 ) -> ConnectorResult<Self> {
159 let is_share_source: bool = properties
160 .get(CDC_SHARING_MODE_KEY)
161 .is_some_and(|v| v == "true");
162 Ok(CdcProperties {
163 properties,
164 table_schema: Default::default(),
165 is_cdc_source_job: is_share_source,
167 is_backfill_table: false,
168 _phantom: PhantomData,
169 })
170 }
171}
172
173impl<T: CdcSourceTypeTrait> EnforceSecret for CdcProperties<T> {} impl<T: CdcSourceTypeTrait> SourceProperties for CdcProperties<T>
176where
177 DebeziumCdcSplit<T>: TryFrom<SplitImpl, Error = crate::error::ConnectorError> + Into<SplitImpl>,
178 DebeziumSplitEnumerator<T>: ListCdcSplits<CdcSourceType = T> + enumerator::CdcMonitor,
179{
180 type Split = DebeziumCdcSplit<T>;
181 type SplitEnumerator = DebeziumSplitEnumerator<T>;
182 type SplitReader = CdcSplitReader<T>;
183
184 const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME;
185
186 fn init_from_pb_source(&mut self, source: &PbSource) {
187 let pk_indices = source
188 .pk_column_ids
189 .iter()
190 .map(|&id| {
191 source
192 .columns
193 .iter()
194 .position(|col| col.column_desc.as_ref().unwrap().column_id == id)
195 .unwrap() as u32
196 })
197 .collect_vec();
198
199 let table_schema = PbTableSchema {
200 columns: source
201 .columns
202 .iter()
203 .flat_map(|col| &col.column_desc)
204 .filter(|col| {
205 !matches!(
206 col.generated_or_default_column,
207 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
208 )
209 })
210 .cloned()
211 .collect(),
212 pk_indices,
213 };
214 self.table_schema = table_schema;
215 if let Some(info) = source.info.as_ref() {
216 self.is_cdc_source_job = info.is_shared();
217 }
218 }
219
220 fn init_from_pb_cdc_table_desc(&mut self, table_desc: &ExternalTableDesc) {
221 let table_schema = TableSchema {
222 columns: table_desc
223 .columns
224 .iter()
225 .filter(|col| {
226 !matches!(
227 col.generated_or_default_column,
228 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
229 )
230 })
231 .cloned()
232 .collect(),
233 pk_indices: table_desc.stream_key.clone(),
234 };
235
236 self.table_schema = table_schema;
237 self.is_cdc_source_job = false;
238 self.is_backfill_table = true;
239 }
240}
241
242impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
243 fn unknown_fields(&self) -> HashMap<String, String> {
244 HashMap::new()
246 }
247}
248
249impl<T: CdcSourceTypeTrait> CdcProperties<T> {
250 pub fn get_source_type_pb(&self) -> SourceType {
251 SourceType::from(T::source_type())
252 }
253}
254
/// Generation id that does not denote a valid CDC split assignment.
pub const INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 0;
/// Generation id of the initial CDC split assignment.
pub const INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 1;
257
258#[derive(Clone, Debug, PartialEq, Default)]
259pub struct CdcTableSnapshotSplitAssignmentWithGeneration {
260 pub splits: HashMap<ActorId, (Vec<CdcTableSnapshotSplitRaw>, u64)>,
261}
262
263impl CdcTableSnapshotSplitAssignmentWithGeneration {
264 pub fn new(splits: HashMap<ActorId, (Vec<CdcTableSnapshotSplitRaw>, u64)>) -> Self {
265 Self { splits }
266 }
267
268 pub fn empty() -> Self {
269 Self {
270 splits: HashMap::default(),
271 }
272 }
273}
274
275pub fn build_cdc_table_snapshot_split(s: &CdcTableSnapshotSplitRaw) -> PbCdcTableSnapshotSplit {
276 PbCdcTableSnapshotSplit {
277 split_id: s.split_id,
278 left_bound_inclusive: s.left_bound_inclusive.clone(),
279 right_bound_exclusive: s.right_bound_exclusive.clone(),
280 }
281}
282
283pub fn build_actor_cdc_table_snapshot_splits_with_generation(
284 pb_cdc_table_snapshot_split_assignment: PbCdcTableSnapshotSplitsWithGeneration,
285) -> CdcTableSnapshotSplitAssignmentWithGeneration {
286 let splits = pb_cdc_table_snapshot_split_assignment
287 .splits
288 .into_iter()
289 .map(|(actor_id, splits)| {
290 let generation = splits.generation;
291 let splits = splits
292 .splits
293 .into_iter()
294 .map(|s| CdcTableSnapshotSplitRaw {
295 split_id: s.split_id,
296 left_bound_inclusive: s.left_bound_inclusive,
297 right_bound_exclusive: s.right_bound_exclusive,
298 })
299 .collect();
300 (actor_id, (splits, generation))
301 })
302 .collect();
303 CdcTableSnapshotSplitAssignmentWithGeneration { splits }
304}
305
/// Options controlling CDC table scan and backfill.
#[derive(Clone, Debug, PartialEq, Hash)]
pub struct CdcScanOptions {
    /// Whether CDC backfill is disabled.
    pub disable_backfill: bool,
    /// Snapshot barrier interval (presumably counted in barriers — confirm).
    pub snapshot_barrier_interval: u32,
    /// Batch size used when reading the snapshot.
    pub snapshot_batch_size: u32,
    /// Parallelism of backfill; `0` disables parallelized backfill.
    pub backfill_parallelism: u32,
    /// Number of rows per backfill split; `0` disables parallelized backfill.
    pub backfill_num_rows_per_split: u64,
    /// Whether backfill splits are computed as even splits.
    pub backfill_as_even_splits: bool,
    /// Index of the primary-key column used to split the backfill.
    pub backfill_split_pk_column_index: u32,
}
323
324impl Default for CdcScanOptions {
325 fn default() -> Self {
326 Self {
327 disable_backfill: false,
328 snapshot_barrier_interval: 10,
329 snapshot_batch_size: 1000,
330 backfill_parallelism: 0,
332 backfill_num_rows_per_split: 100_000,
333 backfill_as_even_splits: true,
334 backfill_split_pk_column_index: 0,
335 }
336 }
337}
338
339impl CdcScanOptions {
340 pub fn to_proto(&self) -> StreamCdcScanOptions {
341 StreamCdcScanOptions {
342 disable_backfill: self.disable_backfill,
343 snapshot_barrier_interval: self.snapshot_barrier_interval,
344 snapshot_batch_size: self.snapshot_batch_size,
345 backfill_parallelism: self.backfill_parallelism,
346 backfill_num_rows_per_split: self.backfill_num_rows_per_split,
347 backfill_as_even_splits: self.backfill_as_even_splits,
348 backfill_split_pk_column_index: self.backfill_split_pk_column_index,
349 }
350 }
351
352 pub fn from_proto(proto: &StreamCdcScanOptions) -> Self {
353 Self {
354 disable_backfill: proto.disable_backfill,
355 snapshot_barrier_interval: proto.snapshot_barrier_interval,
356 snapshot_batch_size: proto.snapshot_batch_size,
357 backfill_parallelism: proto.backfill_parallelism,
358 backfill_num_rows_per_split: proto.backfill_num_rows_per_split,
359 backfill_as_even_splits: proto.backfill_as_even_splits,
360 backfill_split_pk_column_index: proto.backfill_split_pk_column_index,
361 }
362 }
363
364 pub fn is_parallelized_backfill(&self) -> bool {
365 !self.disable_backfill
366 && self.backfill_num_rows_per_split > 0
367 && self.backfill_parallelism > 0
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Table-driven checks for `normalize_simple_postgres_quoted_table_name`.
    ///
    /// Covers the accepted shape and each rejection rule: empty parts, a missing separator,
    /// an unterminated quote, and quotes or special characters in either part.
    #[test]
    fn test_normalize_simple_postgres_quoted_table_name() {
        let cases: &[(&str, Option<&str>)] = &[
            (r#"public."TableName""#, Some("public.TableName")),
            (r#"public."pg_my_table""#, Some("public.pg_my_table")),
            // The table part must be double-quoted.
            ("public.table", None),
            (r#"public."unterminated"#, None),
            // Quoted schema parts are rejected.
            (r#""Mixed.Schema"."TableName""#, None),
            (r#"my"schema."t""#, None),
            // The table name must not contain `.`, `"` or `\`.
            (r#"public."table.name""#, None),
            (r#"public."table\name""#, None),
            (r#"public."a"b""#, None),
            // Empty or missing parts.
            (r#"."t""#, None),
            (r#"public."""#, None),
            ("no_separator", None),
        ];

        for &(input, expected) in cases {
            assert_eq!(
                normalize_simple_postgres_quoted_table_name(input).as_deref(),
                expected,
                "input: {input}"
            );
        }
    }
}