risingwave_connector/source/cdc/
mod.rs1pub mod enumerator;
16pub mod external;
17pub mod jni_source;
18pub mod source;
19pub mod split;
20
21use std::collections::{BTreeMap, HashMap};
22use std::marker::PhantomData;
23
24pub use enumerator::*;
25use itertools::Itertools;
26use risingwave_pb::catalog::PbSource;
27use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
28use risingwave_pb::plan_common::ExternalTableDesc;
29use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
30use risingwave_pb::source::{
31 PbCdcTableSnapshotSplit, PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration,
32};
33use risingwave_pb::stream_plan::StreamCdcScanOptions;
34use simd_json::prelude::ArrayTrait;
35pub use source::*;
36
37use crate::enforce_secret::EnforceSecret;
38use crate::error::ConnectorResult;
39use crate::source::{CdcTableSnapshotSplitRaw, SourceProperties, SplitImpl, TryFromBTreeMap};
40use crate::{for_all_classified_sources, impl_cdc_source_type};
41
// ---------------------------------------------------------------------------
// Connector naming and option keys understood by CDC sources.
// ---------------------------------------------------------------------------

/// Suffix carried by CDC connector names (the literal `-cdc`).
pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";

/// Key of the Debezium snapshot-mode option.
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
/// The `rw_cdc_backfill` marker; presumably a snapshot-mode value selecting
/// RisingWave-driven backfill (NOTE(review): confirm at use sites).
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
/// Key whose value `"true"` marks the properties as a shared CDC source job.
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";

// Snapshot backfill tuning keys.
/// Key of the option toggling snapshot backfill.
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
/// Key of the snapshot barrier-interval option.
pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
/// Key of the snapshot batch-size option.
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
/// Key of the backfill parallelism option.
pub const CDC_BACKFILL_PARALLELISM: &str = "backfill.parallelism";
/// Key of the rows-per-split option for parallelized backfill.
pub const CDC_BACKFILL_NUM_ROWS_PER_SPLIT: &str = "backfill.num_rows_per_split";
/// Key of the option requesting evenly sized backfill splits.
pub const CDC_BACKFILL_AS_EVEN_SPLITS: &str = "backfill.as_even_splits";
/// Key of the option choosing which primary-key column splits are computed on.
pub const CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX: &str = "backfill.split_pk_column_index";
/// Key of the transactional option.
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
/// Key of the timeout for waiting until streaming starts.
pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";
/// Upper bound for the backfill parallelism.
pub const CDC_BACKFILL_MAX_PARALLELISM: u32 = 256;

/// Key of the MongoDB strong-schema option.
pub const CDC_MONGODB_STRONG_SCHEMA_KEY: &str = "strong_schema";
61
62pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
63pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
64pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
65pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME;
66pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME;
67
/// Builds the identifier of a CDC table as `"<source_id>.<external_table_name>"`.
///
/// The external table name is appended verbatim, so any dots it already contains
/// (e.g. `schema.table`) are kept.
pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String {
    let mut table_id = source_id.to_string();
    table_id.push('.');
    table_id.push_str(external_table_name);
    table_id
}
72
73pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
74 const CDC_CONNECTOR_NAME: &'static str;
75 fn source_type() -> CdcSourceType;
76}
77
78for_all_classified_sources!(impl_cdc_source_type);
79
80impl<'a> From<&'a str> for CdcSourceType {
81 fn from(name: &'a str) -> Self {
82 match name {
83 MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
84 POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
85 CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
86 MONGODB_CDC_CONNECTOR => CdcSourceType::Mongodb,
87 SQL_SERVER_CDC_CONNECTOR => CdcSourceType::SqlServer,
88 _ => CdcSourceType::Unspecified,
89 }
90 }
91}
92
93impl CdcSourceType {
94 pub fn as_str_name(&self) -> &str {
95 match self {
96 CdcSourceType::Mysql => "MySQL",
97 CdcSourceType::Postgres => "Postgres",
98 CdcSourceType::Citus => "Citus",
99 CdcSourceType::Mongodb => "MongoDB",
100 CdcSourceType::SqlServer => "SQL Server",
101 CdcSourceType::Unspecified => "Unspecified",
102 }
103 }
104}
105
106#[derive(Clone, Debug, Default)]
107pub struct CdcProperties<T: CdcSourceTypeTrait> {
108 pub properties: BTreeMap<String, String>,
110
111 pub table_schema: TableSchema,
113
114 pub is_cdc_source_job: bool,
116
117 pub is_backfill_table: bool,
119
120 pub _phantom: PhantomData<T>,
121}
122
123pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
124 TableSchema {
125 columns: table_schema
126 .columns
127 .iter()
128 .filter(|col| {
129 col.additional_column
130 .as_ref()
131 .is_some_and(|val| val.column_type.is_none())
132 })
133 .cloned()
134 .collect(),
135 pk_indices: table_schema.pk_indices.clone(),
136 }
137}
138
139impl<T: CdcSourceTypeTrait> TryFromBTreeMap for CdcProperties<T> {
140 fn try_from_btreemap(
141 properties: BTreeMap<String, String>,
142 _deny_unknown_fields: bool,
143 ) -> ConnectorResult<Self> {
144 let is_share_source: bool = properties
145 .get(CDC_SHARING_MODE_KEY)
146 .is_some_and(|v| v == "true");
147 Ok(CdcProperties {
148 properties,
149 table_schema: Default::default(),
150 is_cdc_source_job: is_share_source,
152 is_backfill_table: false,
153 _phantom: PhantomData,
154 })
155 }
156}
157
158impl<T: CdcSourceTypeTrait> EnforceSecret for CdcProperties<T> {} impl<T: CdcSourceTypeTrait> SourceProperties for CdcProperties<T>
161where
162 DebeziumCdcSplit<T>: TryFrom<SplitImpl, Error = crate::error::ConnectorError> + Into<SplitImpl>,
163 DebeziumSplitEnumerator<T>: ListCdcSplits<CdcSourceType = T> + enumerator::CdcMonitor,
164{
165 type Split = DebeziumCdcSplit<T>;
166 type SplitEnumerator = DebeziumSplitEnumerator<T>;
167 type SplitReader = CdcSplitReader<T>;
168
169 const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME;
170
171 fn init_from_pb_source(&mut self, source: &PbSource) {
172 let pk_indices = source
173 .pk_column_ids
174 .iter()
175 .map(|&id| {
176 source
177 .columns
178 .iter()
179 .position(|col| col.column_desc.as_ref().unwrap().column_id == id)
180 .unwrap() as u32
181 })
182 .collect_vec();
183
184 let table_schema = PbTableSchema {
185 columns: source
186 .columns
187 .iter()
188 .flat_map(|col| &col.column_desc)
189 .filter(|col| {
190 !matches!(
191 col.generated_or_default_column,
192 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
193 )
194 })
195 .cloned()
196 .collect(),
197 pk_indices,
198 };
199 self.table_schema = table_schema;
200 if let Some(info) = source.info.as_ref() {
201 self.is_cdc_source_job = info.is_shared();
202 }
203 }
204
205 fn init_from_pb_cdc_table_desc(&mut self, table_desc: &ExternalTableDesc) {
206 let table_schema = TableSchema {
207 columns: table_desc
208 .columns
209 .iter()
210 .filter(|col| {
211 !matches!(
212 col.generated_or_default_column,
213 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
214 )
215 })
216 .cloned()
217 .collect(),
218 pk_indices: table_desc.stream_key.clone(),
219 };
220
221 self.table_schema = table_schema;
222 self.is_cdc_source_job = false;
223 self.is_backfill_table = true;
224 }
225}
226
227impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
228 fn unknown_fields(&self) -> HashMap<String, String> {
229 HashMap::new()
231 }
232}
233
234impl<T: CdcSourceTypeTrait> CdcProperties<T> {
235 pub fn get_source_type_pb(&self) -> SourceType {
236 SourceType::from(T::source_type())
237 }
238}
239
240pub type CdcTableSnapshotSplitAssignment = HashMap<u32, Vec<CdcTableSnapshotSplitRaw>>;
241
242pub const INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 0;
243pub const INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 1;
244
245#[derive(Clone, Debug, PartialEq, Default)]
246pub struct CdcTableSnapshotSplitAssignmentWithGeneration {
247 pub splits: HashMap<u32, Vec<CdcTableSnapshotSplitRaw>>,
248 pub generation: u64,
249}
250
251impl CdcTableSnapshotSplitAssignmentWithGeneration {
252 pub fn new(splits: HashMap<u32, Vec<CdcTableSnapshotSplitRaw>>, generation: u64) -> Self {
253 Self { splits, generation }
254 }
255
256 pub fn empty() -> Self {
257 Self {
258 splits: HashMap::default(),
259 generation: INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
260 }
261 }
262}
263
264pub fn build_pb_actor_cdc_table_snapshot_splits_with_generation(
265 cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
266) -> PbCdcTableSnapshotSplitsWithGeneration {
267 let splits =
268 build_pb_actor_cdc_table_snapshot_splits(cdc_table_snapshot_split_assignment.splits);
269 PbCdcTableSnapshotSplitsWithGeneration {
270 splits,
271 generation: cdc_table_snapshot_split_assignment.generation,
272 }
273}
274
275pub fn build_pb_actor_cdc_table_snapshot_splits(
276 cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
277) -> HashMap<u32, PbCdcTableSnapshotSplits> {
278 cdc_table_snapshot_split_assignment
279 .into_iter()
280 .map(|(actor_id, splits)| {
281 let splits = PbCdcTableSnapshotSplits {
282 splits: splits
283 .into_iter()
284 .map(|s| PbCdcTableSnapshotSplit {
285 split_id: s.split_id,
286 left_bound_inclusive: s.left_bound_inclusive,
287 right_bound_exclusive: s.right_bound_exclusive,
288 })
289 .collect(),
290 };
291 (actor_id, splits)
292 })
293 .collect()
294}
295
296pub fn build_actor_cdc_table_snapshot_splits_with_generation(
297 pb_cdc_table_snapshot_split_assignment: PbCdcTableSnapshotSplitsWithGeneration,
298) -> CdcTableSnapshotSplitAssignmentWithGeneration {
299 let splits = pb_cdc_table_snapshot_split_assignment
300 .splits
301 .into_iter()
302 .map(|(actor_id, splits)| {
303 let splits = splits
304 .splits
305 .into_iter()
306 .map(|s| CdcTableSnapshotSplitRaw {
307 split_id: s.split_id,
308 left_bound_inclusive: s.left_bound_inclusive,
309 right_bound_exclusive: s.right_bound_exclusive,
310 })
311 .collect();
312 (actor_id, splits)
313 })
314 .collect();
315 let generation = pb_cdc_table_snapshot_split_assignment.generation;
316 CdcTableSnapshotSplitAssignmentWithGeneration { splits, generation }
317}
318
319#[derive(Debug, Clone, Hash, PartialEq)]
320pub struct CdcScanOptions {
321 pub disable_backfill: bool,
323 pub snapshot_barrier_interval: u32,
325 pub snapshot_batch_size: u32,
327 pub backfill_parallelism: u32,
329 pub backfill_num_rows_per_split: u64,
331 pub backfill_as_even_splits: bool,
333 pub backfill_split_pk_column_index: u32,
335}
336
337impl Default for CdcScanOptions {
338 fn default() -> Self {
339 Self {
340 disable_backfill: false,
341 snapshot_barrier_interval: 1,
342 snapshot_batch_size: 1000,
343 backfill_parallelism: 0,
345 backfill_num_rows_per_split: 100_000,
346 backfill_as_even_splits: true,
347 backfill_split_pk_column_index: 0,
348 }
349 }
350}
351
352impl CdcScanOptions {
353 pub fn to_proto(&self) -> StreamCdcScanOptions {
354 StreamCdcScanOptions {
355 disable_backfill: self.disable_backfill,
356 snapshot_barrier_interval: self.snapshot_barrier_interval,
357 snapshot_batch_size: self.snapshot_batch_size,
358 backfill_parallelism: self.backfill_parallelism,
359 backfill_num_rows_per_split: self.backfill_num_rows_per_split,
360 backfill_as_even_splits: self.backfill_as_even_splits,
361 backfill_split_pk_column_index: self.backfill_split_pk_column_index,
362 }
363 }
364
365 pub fn from_proto(proto: &StreamCdcScanOptions) -> Self {
366 Self {
367 disable_backfill: proto.disable_backfill,
368 snapshot_barrier_interval: proto.snapshot_barrier_interval,
369 snapshot_batch_size: proto.snapshot_batch_size,
370 backfill_parallelism: proto.backfill_parallelism,
371 backfill_num_rows_per_split: proto.backfill_num_rows_per_split,
372 backfill_as_even_splits: proto.backfill_as_even_splits,
373 backfill_split_pk_column_index: proto.backfill_split_pk_column_index,
374 }
375 }
376
377 pub fn is_parallelized_backfill(&self) -> bool {
378 !self.disable_backfill
379 && self.backfill_num_rows_per_split > 0
380 && self.backfill_parallelism > 0
381 }
382}