// risingwave_connector/source/cdc/mod.rs
pub mod enumerator;
16pub mod external;
17pub mod jni_source;
18pub mod source;
19pub mod split;
20
21use std::collections::{BTreeMap, HashMap};
22use std::marker::PhantomData;
23
24pub use enumerator::*;
25use itertools::Itertools;
26use risingwave_common::id::{ActorId, SourceId};
27use risingwave_pb::catalog::PbSource;
28use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
29use risingwave_pb::plan_common::ExternalTableDesc;
30use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
31use risingwave_pb::source::{PbCdcTableSnapshotSplit, PbCdcTableSnapshotSplitsWithGeneration};
32use risingwave_pb::stream_plan::StreamCdcScanOptions;
33use simd_json::prelude::ArrayTrait;
34pub use source::*;
35
36use crate::enforce_secret::EnforceSecret;
37use crate::error::ConnectorResult;
38use crate::source::{CdcTableSnapshotSplitRaw, SourceProperties, SplitImpl, TryFromBTreeMap};
39use crate::{for_all_classified_sources, impl_cdc_source_type};
40
// Option keys and well-known values used by CDC sources.
// NOTE(review): most of these are consumed outside this file; the one-line descriptions below
// are inferred from the identifiers and values — confirm against the call sites.

/// Suffix carried by CDC connector names.
pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";

/// Property key for the Debezium snapshot mode.
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
/// Snapshot-mode value; presumably selects the RisingWave-driven backfill — TODO confirm.
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";

/// Property key read by `CdcProperties::try_from_btreemap`: only the exact value `"true"`
/// marks the properties as belonging to a CDC source job.
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";

// Backfill-related option keys.
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
pub const CDC_BACKFILL_PARALLELISM: &str = "backfill.parallelism";
pub const CDC_BACKFILL_NUM_ROWS_PER_SPLIT: &str = "backfill.num_rows_per_split";
pub const CDC_BACKFILL_AS_EVEN_SPLITS: &str = "backfill.as_even_splits";
pub const CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX: &str = "backfill.split_pk_column_index";
/// Numeric limit for backfill parallelism (the enforcement site is not in this file).
pub const CDC_BACKFILL_MAX_PARALLELISM: u32 = 256;

// Miscellaneous option keys.
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";

/// Option key specific to the MongoDB CDC source.
pub const CDC_MONGODB_STRONG_SCHEMA_KEY: &str = "strong_schema";
60
61pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
62pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
63pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
64pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME;
65pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME;
66
67pub fn build_cdc_table_id(source_id: SourceId, external_table_name: &str) -> String {
69 format!("{}.{}", source_id, external_table_name)
70}
71
72pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
73 const CDC_CONNECTOR_NAME: &'static str;
74 fn source_type() -> CdcSourceType;
75}
76
77for_all_classified_sources!(impl_cdc_source_type);
78
79impl<'a> From<&'a str> for CdcSourceType {
80 fn from(name: &'a str) -> Self {
81 match name {
82 MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
83 POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
84 CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
85 MONGODB_CDC_CONNECTOR => CdcSourceType::Mongodb,
86 SQL_SERVER_CDC_CONNECTOR => CdcSourceType::SqlServer,
87 _ => CdcSourceType::Unspecified,
88 }
89 }
90}
91
92impl CdcSourceType {
93 pub fn as_str_name(&self) -> &str {
94 match self {
95 CdcSourceType::Mysql => "MySQL",
96 CdcSourceType::Postgres => "Postgres",
97 CdcSourceType::Citus => "Citus",
98 CdcSourceType::Mongodb => "MongoDB",
99 CdcSourceType::SqlServer => "SQL Server",
100 CdcSourceType::Unspecified => "Unspecified",
101 }
102 }
103}
104
105#[derive(Clone, Debug, Default)]
106pub struct CdcProperties<T: CdcSourceTypeTrait> {
107 pub properties: BTreeMap<String, String>,
109
110 pub table_schema: TableSchema,
112
113 pub is_cdc_source_job: bool,
115
116 pub is_backfill_table: bool,
118
119 pub _phantom: PhantomData<T>,
120}
121
122pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
123 TableSchema {
124 columns: table_schema
125 .columns
126 .iter()
127 .filter(|col| {
128 col.additional_column
129 .as_ref()
130 .is_some_and(|val| val.column_type.is_none())
131 })
132 .cloned()
133 .collect(),
134 pk_indices: table_schema.pk_indices.clone(),
135 }
136}
137
138impl<T: CdcSourceTypeTrait> TryFromBTreeMap for CdcProperties<T> {
139 fn try_from_btreemap(
140 properties: BTreeMap<String, String>,
141 _deny_unknown_fields: bool,
142 ) -> ConnectorResult<Self> {
143 let is_share_source: bool = properties
144 .get(CDC_SHARING_MODE_KEY)
145 .is_some_and(|v| v == "true");
146 Ok(CdcProperties {
147 properties,
148 table_schema: Default::default(),
149 is_cdc_source_job: is_share_source,
151 is_backfill_table: false,
152 _phantom: PhantomData,
153 })
154 }
155}
156
157impl<T: CdcSourceTypeTrait> EnforceSecret for CdcProperties<T> {} impl<T: CdcSourceTypeTrait> SourceProperties for CdcProperties<T>
160where
161 DebeziumCdcSplit<T>: TryFrom<SplitImpl, Error = crate::error::ConnectorError> + Into<SplitImpl>,
162 DebeziumSplitEnumerator<T>: ListCdcSplits<CdcSourceType = T> + enumerator::CdcMonitor,
163{
164 type Split = DebeziumCdcSplit<T>;
165 type SplitEnumerator = DebeziumSplitEnumerator<T>;
166 type SplitReader = CdcSplitReader<T>;
167
168 const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME;
169
170 fn init_from_pb_source(&mut self, source: &PbSource) {
171 let pk_indices = source
172 .pk_column_ids
173 .iter()
174 .map(|&id| {
175 source
176 .columns
177 .iter()
178 .position(|col| col.column_desc.as_ref().unwrap().column_id == id)
179 .unwrap() as u32
180 })
181 .collect_vec();
182
183 let table_schema = PbTableSchema {
184 columns: source
185 .columns
186 .iter()
187 .flat_map(|col| &col.column_desc)
188 .filter(|col| {
189 !matches!(
190 col.generated_or_default_column,
191 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
192 )
193 })
194 .cloned()
195 .collect(),
196 pk_indices,
197 };
198 self.table_schema = table_schema;
199 if let Some(info) = source.info.as_ref() {
200 self.is_cdc_source_job = info.is_shared();
201 }
202 }
203
204 fn init_from_pb_cdc_table_desc(&mut self, table_desc: &ExternalTableDesc) {
205 let table_schema = TableSchema {
206 columns: table_desc
207 .columns
208 .iter()
209 .filter(|col| {
210 !matches!(
211 col.generated_or_default_column,
212 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
213 )
214 })
215 .cloned()
216 .collect(),
217 pk_indices: table_desc.stream_key.clone(),
218 };
219
220 self.table_schema = table_schema;
221 self.is_cdc_source_job = false;
222 self.is_backfill_table = true;
223 }
224}
225
226impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
227 fn unknown_fields(&self) -> HashMap<String, String> {
228 HashMap::new()
230 }
231}
232
233impl<T: CdcSourceTypeTrait> CdcProperties<T> {
234 pub fn get_source_type_pb(&self) -> SourceType {
235 SourceType::from(T::source_type())
236 }
237}
238
/// Generation id `0`; by its name, the value that denotes an invalid split assignment.
pub const INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 0;
/// Generation id `1`; by its name, the first valid split-assignment generation.
pub const INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 1;
241
242#[derive(Clone, Debug, PartialEq, Default)]
243pub struct CdcTableSnapshotSplitAssignmentWithGeneration {
244 pub splits: HashMap<ActorId, (Vec<CdcTableSnapshotSplitRaw>, u64)>,
245}
246
247impl CdcTableSnapshotSplitAssignmentWithGeneration {
248 pub fn new(splits: HashMap<ActorId, (Vec<CdcTableSnapshotSplitRaw>, u64)>) -> Self {
249 Self { splits }
250 }
251
252 pub fn empty() -> Self {
253 Self {
254 splits: HashMap::default(),
255 }
256 }
257}
258
259pub fn build_cdc_table_snapshot_split(s: &CdcTableSnapshotSplitRaw) -> PbCdcTableSnapshotSplit {
260 PbCdcTableSnapshotSplit {
261 split_id: s.split_id,
262 left_bound_inclusive: s.left_bound_inclusive.clone(),
263 right_bound_exclusive: s.right_bound_exclusive.clone(),
264 }
265}
266
267pub fn build_actor_cdc_table_snapshot_splits_with_generation(
268 pb_cdc_table_snapshot_split_assignment: PbCdcTableSnapshotSplitsWithGeneration,
269) -> CdcTableSnapshotSplitAssignmentWithGeneration {
270 let splits = pb_cdc_table_snapshot_split_assignment
271 .splits
272 .into_iter()
273 .map(|(actor_id, splits)| {
274 let generation = splits.generation;
275 let splits = splits
276 .splits
277 .into_iter()
278 .map(|s| CdcTableSnapshotSplitRaw {
279 split_id: s.split_id,
280 left_bound_inclusive: s.left_bound_inclusive,
281 right_bound_exclusive: s.right_bound_exclusive,
282 })
283 .collect();
284 (actor_id, (splits, generation))
285 })
286 .collect();
287 CdcTableSnapshotSplitAssignmentWithGeneration { splits }
288}
289
/// Options controlling the scan/backfill of a CDC table.
///
/// All fields are plain integers or booleans, so equality is total: the type derives `Eq`
/// alongside `PartialEq` and `Hash`, which makes it usable as a `HashMap`/`HashSet` key.
///
/// NOTE(review): field descriptions below are inferred from the field names and from
/// `is_parallelized_backfill`; confirm the exact semantics at the consumers.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CdcScanOptions {
    /// When `true`, backfill is disabled (and `is_parallelized_backfill` is `false`).
    pub disable_backfill: bool,
    /// Snapshot barrier interval.
    pub snapshot_barrier_interval: u32,
    /// Snapshot batch size.
    pub snapshot_batch_size: u32,
    /// Backfill parallelism; must be greater than zero for a parallelized backfill.
    pub backfill_parallelism: u32,
    /// Rows per backfill split; must be greater than zero for a parallelized backfill.
    pub backfill_num_rows_per_split: u64,
    /// Whether backfill splits are generated as even splits.
    pub backfill_as_even_splits: bool,
    /// Index of the primary-key column used for splitting.
    pub backfill_split_pk_column_index: u32,
}
307
308impl Default for CdcScanOptions {
309 fn default() -> Self {
310 Self {
311 disable_backfill: false,
312 snapshot_barrier_interval: 1,
313 snapshot_batch_size: 1000,
314 backfill_parallelism: 0,
316 backfill_num_rows_per_split: 100_000,
317 backfill_as_even_splits: true,
318 backfill_split_pk_column_index: 0,
319 }
320 }
321}
322
323impl CdcScanOptions {
324 pub fn to_proto(&self) -> StreamCdcScanOptions {
325 StreamCdcScanOptions {
326 disable_backfill: self.disable_backfill,
327 snapshot_barrier_interval: self.snapshot_barrier_interval,
328 snapshot_batch_size: self.snapshot_batch_size,
329 backfill_parallelism: self.backfill_parallelism,
330 backfill_num_rows_per_split: self.backfill_num_rows_per_split,
331 backfill_as_even_splits: self.backfill_as_even_splits,
332 backfill_split_pk_column_index: self.backfill_split_pk_column_index,
333 }
334 }
335
336 pub fn from_proto(proto: &StreamCdcScanOptions) -> Self {
337 Self {
338 disable_backfill: proto.disable_backfill,
339 snapshot_barrier_interval: proto.snapshot_barrier_interval,
340 snapshot_batch_size: proto.snapshot_batch_size,
341 backfill_parallelism: proto.backfill_parallelism,
342 backfill_num_rows_per_split: proto.backfill_num_rows_per_split,
343 backfill_as_even_splits: proto.backfill_as_even_splits,
344 backfill_split_pk_column_index: proto.backfill_split_pk_column_index,
345 }
346 }
347
348 pub fn is_parallelized_backfill(&self) -> bool {
349 !self.disable_backfill
350 && self.backfill_num_rows_per_split > 0
351 && self.backfill_parallelism > 0
352 }
353}