risingwave_connector/source/cdc/
mod.rs1pub mod enumerator;
16pub mod external;
17pub mod jni_source;
18pub mod source;
19pub mod split;
20
21use std::collections::{BTreeMap, HashMap};
22use std::marker::PhantomData;
23
24pub use enumerator::*;
25use itertools::Itertools;
26use risingwave_common::id::{ActorId, SourceId};
27use risingwave_pb::catalog::PbSource;
28use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
29use risingwave_pb::plan_common::ExternalTableDesc;
30use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
31use risingwave_pb::source::{
32 PbCdcTableSnapshotSplit, PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration,
33};
34use risingwave_pb::stream_plan::StreamCdcScanOptions;
35use simd_json::prelude::ArrayTrait;
36pub use source::*;
37
38use crate::enforce_secret::EnforceSecret;
39use crate::error::ConnectorResult;
40use crate::source::{CdcTableSnapshotSplitRaw, SourceProperties, SplitImpl, TryFromBTreeMap};
41use crate::{for_all_classified_sources, impl_cdc_source_type};
42
// ---- Connector naming and sharing ----------------------------------------

/// Suffix used in CDC connector names.
pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
/// Debezium property key that selects the snapshot mode.
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
/// Snapshot-mode value `rw_cdc_backfill` (presumably RisingWave-driven backfill; confirm).
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
/// Property key marking a shared CDC source; the value `"true"` turns sharing on.
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";

// ---- Backfill / snapshot options -----------------------------------------

/// Option key `snapshot`, used to toggle CDC backfill (per the constant name).
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
/// Option key for the snapshot barrier interval.
pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
/// Option key for the snapshot read batch size.
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
/// Option key for backfill parallelism.
pub const CDC_BACKFILL_PARALLELISM: &str = "backfill.parallelism";
/// Option key for the number of rows per backfill split.
pub const CDC_BACKFILL_NUM_ROWS_PER_SPLIT: &str = "backfill.num_rows_per_split";
/// Option key choosing whether backfill uses even splits.
pub const CDC_BACKFILL_AS_EVEN_SPLITS: &str = "backfill.as_even_splits";
/// Option key selecting the primary-key column used to split backfill.
pub const CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX: &str = "backfill.split_pk_column_index";
/// Option key `transactional`.
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
/// Option key for the timeout while waiting for CDC streaming to start.
pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";
/// Upper limit for backfill parallelism (this constant is not enforced here).
pub const CDC_BACKFILL_MAX_PARALLELISM: u32 = 256;

// ---- MongoDB-specific options ---------------------------------------------

/// MongoDB option key `strong_schema`.
pub const CDC_MONGODB_STRONG_SCHEMA_KEY: &str = "strong_schema";
63pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
64pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
65pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
66pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME;
67pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME;
68
69pub fn build_cdc_table_id(source_id: SourceId, external_table_name: &str) -> String {
71 format!("{}.{}", source_id, external_table_name)
72}
73
74pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
75 const CDC_CONNECTOR_NAME: &'static str;
76 fn source_type() -> CdcSourceType;
77}
78
// Expands `impl_cdc_source_type` over every classified source. This presumably
// defines `CdcSourceType` and the per-connector marker types (`Mysql`, `Postgres`,
// ...) implementing `CdcSourceTypeTrait` that are used elsewhere in this module.
// NOTE(review): confirm against the macro definitions.
for_all_classified_sources!(impl_cdc_source_type);
80
81impl<'a> From<&'a str> for CdcSourceType {
82 fn from(name: &'a str) -> Self {
83 match name {
84 MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
85 POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
86 CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
87 MONGODB_CDC_CONNECTOR => CdcSourceType::Mongodb,
88 SQL_SERVER_CDC_CONNECTOR => CdcSourceType::SqlServer,
89 _ => CdcSourceType::Unspecified,
90 }
91 }
92}
93
94impl CdcSourceType {
95 pub fn as_str_name(&self) -> &str {
96 match self {
97 CdcSourceType::Mysql => "MySQL",
98 CdcSourceType::Postgres => "Postgres",
99 CdcSourceType::Citus => "Citus",
100 CdcSourceType::Mongodb => "MongoDB",
101 CdcSourceType::SqlServer => "SQL Server",
102 CdcSourceType::Unspecified => "Unspecified",
103 }
104 }
105}
106
107#[derive(Clone, Debug, Default)]
108pub struct CdcProperties<T: CdcSourceTypeTrait> {
109 pub properties: BTreeMap<String, String>,
111
112 pub table_schema: TableSchema,
114
115 pub is_cdc_source_job: bool,
117
118 pub is_backfill_table: bool,
120
121 pub _phantom: PhantomData<T>,
122}
123
124pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
125 TableSchema {
126 columns: table_schema
127 .columns
128 .iter()
129 .filter(|col| {
130 col.additional_column
131 .as_ref()
132 .is_some_and(|val| val.column_type.is_none())
133 })
134 .cloned()
135 .collect(),
136 pk_indices: table_schema.pk_indices.clone(),
137 }
138}
139
140impl<T: CdcSourceTypeTrait> TryFromBTreeMap for CdcProperties<T> {
141 fn try_from_btreemap(
142 properties: BTreeMap<String, String>,
143 _deny_unknown_fields: bool,
144 ) -> ConnectorResult<Self> {
145 let is_share_source: bool = properties
146 .get(CDC_SHARING_MODE_KEY)
147 .is_some_and(|v| v == "true");
148 Ok(CdcProperties {
149 properties,
150 table_schema: Default::default(),
151 is_cdc_source_job: is_share_source,
153 is_backfill_table: false,
154 _phantom: PhantomData,
155 })
156 }
157}
158
159impl<T: CdcSourceTypeTrait> EnforceSecret for CdcProperties<T> {} impl<T: CdcSourceTypeTrait> SourceProperties for CdcProperties<T>
162where
163 DebeziumCdcSplit<T>: TryFrom<SplitImpl, Error = crate::error::ConnectorError> + Into<SplitImpl>,
164 DebeziumSplitEnumerator<T>: ListCdcSplits<CdcSourceType = T> + enumerator::CdcMonitor,
165{
166 type Split = DebeziumCdcSplit<T>;
167 type SplitEnumerator = DebeziumSplitEnumerator<T>;
168 type SplitReader = CdcSplitReader<T>;
169
170 const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME;
171
172 fn init_from_pb_source(&mut self, source: &PbSource) {
173 let pk_indices = source
174 .pk_column_ids
175 .iter()
176 .map(|&id| {
177 source
178 .columns
179 .iter()
180 .position(|col| col.column_desc.as_ref().unwrap().column_id == id)
181 .unwrap() as u32
182 })
183 .collect_vec();
184
185 let table_schema = PbTableSchema {
186 columns: source
187 .columns
188 .iter()
189 .flat_map(|col| &col.column_desc)
190 .filter(|col| {
191 !matches!(
192 col.generated_or_default_column,
193 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
194 )
195 })
196 .cloned()
197 .collect(),
198 pk_indices,
199 };
200 self.table_schema = table_schema;
201 if let Some(info) = source.info.as_ref() {
202 self.is_cdc_source_job = info.is_shared();
203 }
204 }
205
206 fn init_from_pb_cdc_table_desc(&mut self, table_desc: &ExternalTableDesc) {
207 let table_schema = TableSchema {
208 columns: table_desc
209 .columns
210 .iter()
211 .filter(|col| {
212 !matches!(
213 col.generated_or_default_column,
214 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
215 )
216 })
217 .cloned()
218 .collect(),
219 pk_indices: table_desc.stream_key.clone(),
220 };
221
222 self.table_schema = table_schema;
223 self.is_cdc_source_job = false;
224 self.is_backfill_table = true;
225 }
226}
227
228impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
229 fn unknown_fields(&self) -> HashMap<String, String> {
230 HashMap::new()
232 }
233}
234
235impl<T: CdcSourceTypeTrait> CdcProperties<T> {
236 pub fn get_source_type_pb(&self) -> SourceType {
237 SourceType::from(T::source_type())
238 }
239}
240
241pub type CdcTableSnapshotSplitAssignment = HashMap<ActorId, Vec<CdcTableSnapshotSplitRaw>>;
242
243pub const INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 0;
244pub const INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 1;
245
246#[derive(Clone, Debug, PartialEq, Default)]
247pub struct CdcTableSnapshotSplitAssignmentWithGeneration {
248 pub splits: HashMap<ActorId, Vec<CdcTableSnapshotSplitRaw>>,
249 pub generation: u64,
250}
251
252impl CdcTableSnapshotSplitAssignmentWithGeneration {
253 pub fn new(splits: HashMap<ActorId, Vec<CdcTableSnapshotSplitRaw>>, generation: u64) -> Self {
254 Self { splits, generation }
255 }
256
257 pub fn empty() -> Self {
258 Self {
259 splits: HashMap::default(),
260 generation: INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
261 }
262 }
263}
264
265pub fn build_pb_actor_cdc_table_snapshot_splits_with_generation(
266 cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
267) -> PbCdcTableSnapshotSplitsWithGeneration {
268 let splits =
269 build_pb_actor_cdc_table_snapshot_splits(cdc_table_snapshot_split_assignment.splits);
270 PbCdcTableSnapshotSplitsWithGeneration {
271 splits,
272 generation: cdc_table_snapshot_split_assignment.generation,
273 }
274}
275
276pub fn build_pb_actor_cdc_table_snapshot_splits(
277 cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
278) -> HashMap<ActorId, PbCdcTableSnapshotSplits> {
279 cdc_table_snapshot_split_assignment
280 .into_iter()
281 .map(|(actor_id, splits)| {
282 let splits = PbCdcTableSnapshotSplits {
283 splits: splits
284 .into_iter()
285 .map(|s| PbCdcTableSnapshotSplit {
286 split_id: s.split_id,
287 left_bound_inclusive: s.left_bound_inclusive,
288 right_bound_exclusive: s.right_bound_exclusive,
289 })
290 .collect(),
291 };
292 (actor_id, splits)
293 })
294 .collect()
295}
296
297pub fn build_actor_cdc_table_snapshot_splits_with_generation(
298 pb_cdc_table_snapshot_split_assignment: PbCdcTableSnapshotSplitsWithGeneration,
299) -> CdcTableSnapshotSplitAssignmentWithGeneration {
300 let splits = pb_cdc_table_snapshot_split_assignment
301 .splits
302 .into_iter()
303 .map(|(actor_id, splits)| {
304 let splits = splits
305 .splits
306 .into_iter()
307 .map(|s| CdcTableSnapshotSplitRaw {
308 split_id: s.split_id,
309 left_bound_inclusive: s.left_bound_inclusive,
310 right_bound_exclusive: s.right_bound_exclusive,
311 })
312 .collect();
313 (actor_id, splits)
314 })
315 .collect();
316 let generation = pb_cdc_table_snapshot_split_assignment.generation;
317 CdcTableSnapshotSplitAssignmentWithGeneration { splits, generation }
318}
319
/// Options controlling how a CDC table scan performs its snapshot backfill.
///
/// `Eq` is derived alongside `Hash` (every field is `Eq`), so the options can be used
/// as `HashMap`/`HashSet` keys. `Hash` alone is not enough for that.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CdcScanOptions {
    /// Disables backfill; `is_parallelized_backfill` returns `false` when set.
    pub disable_backfill: bool,
    /// Snapshot barrier interval (presumably configured via `snapshot.interval`; confirm).
    pub snapshot_barrier_interval: u32,
    /// Snapshot read batch size (presumably configured via `snapshot.batch_size`; confirm).
    pub snapshot_batch_size: u32,
    /// Backfill parallelism; `0` means backfill is not parallelized.
    pub backfill_parallelism: u32,
    /// Rows per backfill split; `0` means backfill is not parallelized.
    pub backfill_num_rows_per_split: u64,
    /// Whether backfill splits are even (semantics defined by the backfill executor; confirm).
    pub backfill_as_even_splits: bool,
    /// Index of the primary-key column used to split backfill (presumably within the pk).
    pub backfill_split_pk_column_index: u32,
}
337
338impl Default for CdcScanOptions {
339 fn default() -> Self {
340 Self {
341 disable_backfill: false,
342 snapshot_barrier_interval: 1,
343 snapshot_batch_size: 1000,
344 backfill_parallelism: 0,
346 backfill_num_rows_per_split: 100_000,
347 backfill_as_even_splits: true,
348 backfill_split_pk_column_index: 0,
349 }
350 }
351}
352
353impl CdcScanOptions {
354 pub fn to_proto(&self) -> StreamCdcScanOptions {
355 StreamCdcScanOptions {
356 disable_backfill: self.disable_backfill,
357 snapshot_barrier_interval: self.snapshot_barrier_interval,
358 snapshot_batch_size: self.snapshot_batch_size,
359 backfill_parallelism: self.backfill_parallelism,
360 backfill_num_rows_per_split: self.backfill_num_rows_per_split,
361 backfill_as_even_splits: self.backfill_as_even_splits,
362 backfill_split_pk_column_index: self.backfill_split_pk_column_index,
363 }
364 }
365
366 pub fn from_proto(proto: &StreamCdcScanOptions) -> Self {
367 Self {
368 disable_backfill: proto.disable_backfill,
369 snapshot_barrier_interval: proto.snapshot_barrier_interval,
370 snapshot_batch_size: proto.snapshot_batch_size,
371 backfill_parallelism: proto.backfill_parallelism,
372 backfill_num_rows_per_split: proto.backfill_num_rows_per_split,
373 backfill_as_even_splits: proto.backfill_as_even_splits,
374 backfill_split_pk_column_index: proto.backfill_split_pk_column_index,
375 }
376 }
377
378 pub fn is_parallelized_backfill(&self) -> bool {
379 !self.disable_backfill
380 && self.backfill_num_rows_per_split > 0
381 && self.backfill_parallelism > 0
382 }
383}