risingwave_connector/source/cdc/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod enumerator;
16pub mod external;
17pub mod jni_source;
18pub mod source;
19pub mod split;
20
21use std::collections::{BTreeMap, HashMap};
22use std::marker::PhantomData;
23
24pub use enumerator::*;
25use itertools::Itertools;
26use risingwave_pb::catalog::PbSource;
27use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
28use risingwave_pb::plan_common::ExternalTableDesc;
29use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
30use risingwave_pb::source::{
31    PbCdcTableSnapshotSplit, PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration,
32};
33use risingwave_pb::stream_plan::StreamCdcScanOptions;
34use simd_json::prelude::ArrayTrait;
35pub use source::*;
36
37use crate::enforce_secret::EnforceSecret;
38use crate::error::ConnectorResult;
39use crate::source::{CdcTableSnapshotSplitRaw, SourceProperties, SplitImpl, TryFromBTreeMap};
40use crate::{for_all_classified_sources, impl_cdc_source_type};
41
// Property keys and well-known values used by CDC sources.
// NOTE(review): unless stated otherwise, where each key is consumed is not visible in this file.
pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
// Read by `CdcProperties::try_from_btreemap`: the value "true" marks a shared cdc source job.
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
// User can set snapshot='false' to disable cdc backfill
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
pub const CDC_BACKFILL_PARALLELISM: &str = "backfill.parallelism";
pub const CDC_BACKFILL_NUM_ROWS_PER_SPLIT: &str = "backfill.num_rows_per_split";
pub const CDC_BACKFILL_AS_EVEN_SPLITS: &str = "backfill.as_even_splits";
pub const CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX: &str = "backfill.split_pk_column_index";
// We enable transaction for shared cdc source by default
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";
// NOTE(review): presumably the upper bound accepted for `backfill.parallelism` — confirm at the use site.
pub const CDC_BACKFILL_MAX_PARALLELISM: u32 = 256;

// User can set strong_schema='true' to enable strong schema for mongo cdc source
pub const CDC_MONGODB_STRONG_SCHEMA_KEY: &str = "strong_schema";

// Connector names, re-exported from the `CDC_CONNECTOR_NAME` of each source kind so that
// they can be used as `match` patterns (see `From<&str> for CdcSourceType`).
pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME;
pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME;
67
/// Builds the identifier of a CDC table as `<source_id>.<external_table_name>`.
///
/// The external table name is appended verbatim, so any dots it already contains are kept.
pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String {
    format!("{source_id}.{external_table_name}")
}
72
/// Compile-time description of one CDC source kind.
///
/// Types implementing this trait are used as the type parameter `T` of [`CdcProperties`].
pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
    /// Connector name of this source kind; `CdcProperties<T>` exposes it as
    /// `SourceProperties::SOURCE_NAME`.
    const CDC_CONNECTOR_NAME: &'static str;
    /// Returns the [`CdcSourceType`] value that corresponds to this type.
    fn source_type() -> CdcSourceType;
}
77
// NOTE(review): both macros are defined outside this file. Presumably this expands to the
// `CdcSourceType` enum and the per-source types (`Mysql`, `Postgres`, ...) used below — confirm.
for_all_classified_sources!(impl_cdc_source_type);
79
80impl<'a> From<&'a str> for CdcSourceType {
81    fn from(name: &'a str) -> Self {
82        match name {
83            MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
84            POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
85            CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
86            MONGODB_CDC_CONNECTOR => CdcSourceType::Mongodb,
87            SQL_SERVER_CDC_CONNECTOR => CdcSourceType::SqlServer,
88            _ => CdcSourceType::Unspecified,
89        }
90    }
91}
92
93impl CdcSourceType {
94    pub fn as_str_name(&self) -> &str {
95        match self {
96            CdcSourceType::Mysql => "MySQL",
97            CdcSourceType::Postgres => "Postgres",
98            CdcSourceType::Citus => "Citus",
99            CdcSourceType::Mongodb => "MongoDB",
100            CdcSourceType::SqlServer => "SQL Server",
101            CdcSourceType::Unspecified => "Unspecified",
102        }
103    }
104}
105
/// Properties of a CDC source whose kind is described by `T`.
#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
    /// Properties specified in the WITH clause by user
    pub properties: BTreeMap<String, String>,

    /// Schema of the source specified by users.
    ///
    /// Starts out as the default value and is filled in by `init_from_pb_source` or
    /// `init_from_pb_cdc_table_desc`, both of which leave out generated columns.
    pub table_schema: TableSchema,

    /// Whether it is created by a cdc source job
    pub is_cdc_source_job: bool,

    /// For validation purpose, mark if the table is a backfill cdc table
    pub is_backfill_table: bool,

    /// Ties the properties to the source kind `T` without storing a value of it.
    pub _phantom: PhantomData<T>,
}
122
123pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
124    TableSchema {
125        columns: table_schema
126            .columns
127            .iter()
128            .filter(|col| {
129                col.additional_column
130                    .as_ref()
131                    .is_some_and(|val| val.column_type.is_none())
132            })
133            .cloned()
134            .collect(),
135        pk_indices: table_schema.pk_indices.clone(),
136    }
137}
138
139impl<T: CdcSourceTypeTrait> TryFromBTreeMap for CdcProperties<T> {
140    fn try_from_btreemap(
141        properties: BTreeMap<String, String>,
142        _deny_unknown_fields: bool,
143    ) -> ConnectorResult<Self> {
144        let is_share_source: bool = properties
145            .get(CDC_SHARING_MODE_KEY)
146            .is_some_and(|v| v == "true");
147        Ok(CdcProperties {
148            properties,
149            table_schema: Default::default(),
150            // TODO(siyuan): use serde to deserialize input hashmap
151            is_cdc_source_job: is_share_source,
152            is_backfill_table: false,
153            _phantom: PhantomData,
154        })
155    }
156}
157
// Empty impl: nothing is overridden here, so whatever `EnforceSecret` provides by default applies.
impl<T: CdcSourceTypeTrait> EnforceSecret for CdcProperties<T> {} // todo: enforce jdbc like properties
159
160impl<T: CdcSourceTypeTrait> SourceProperties for CdcProperties<T>
161where
162    DebeziumCdcSplit<T>: TryFrom<SplitImpl, Error = crate::error::ConnectorError> + Into<SplitImpl>,
163    DebeziumSplitEnumerator<T>: ListCdcSplits<CdcSourceType = T> + enumerator::CdcMonitor,
164{
165    type Split = DebeziumCdcSplit<T>;
166    type SplitEnumerator = DebeziumSplitEnumerator<T>;
167    type SplitReader = CdcSplitReader<T>;
168
169    const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME;
170
171    fn init_from_pb_source(&mut self, source: &PbSource) {
172        let pk_indices = source
173            .pk_column_ids
174            .iter()
175            .map(|&id| {
176                source
177                    .columns
178                    .iter()
179                    .position(|col| col.column_desc.as_ref().unwrap().column_id == id)
180                    .unwrap() as u32
181            })
182            .collect_vec();
183
184        let table_schema = PbTableSchema {
185            columns: source
186                .columns
187                .iter()
188                .flat_map(|col| &col.column_desc)
189                .filter(|col| {
190                    !matches!(
191                        col.generated_or_default_column,
192                        Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
193                    )
194                })
195                .cloned()
196                .collect(),
197            pk_indices,
198        };
199        self.table_schema = table_schema;
200        if let Some(info) = source.info.as_ref() {
201            self.is_cdc_source_job = info.is_shared();
202        }
203    }
204
205    fn init_from_pb_cdc_table_desc(&mut self, table_desc: &ExternalTableDesc) {
206        let table_schema = TableSchema {
207            columns: table_desc
208                .columns
209                .iter()
210                .filter(|col| {
211                    !matches!(
212                        col.generated_or_default_column,
213                        Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
214                    )
215                })
216                .cloned()
217                .collect(),
218            pk_indices: table_desc.stream_key.clone(),
219        };
220
221        self.table_schema = table_schema;
222        self.is_cdc_source_job = false;
223        self.is_backfill_table = true;
224    }
225}
226
227impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
228    fn unknown_fields(&self) -> HashMap<String, String> {
229        // FIXME: CDC does not handle unknown fields yet
230        HashMap::new()
231    }
232}
233
234impl<T: CdcSourceTypeTrait> CdcProperties<T> {
235    pub fn get_source_type_pb(&self) -> SourceType {
236        SourceType::from(T::source_type())
237    }
238}
239
/// Snapshot splits grouped under a `u32` key; the conversion functions in this file name
/// that key `actor_id`.
pub type CdcTableSnapshotSplitAssignment = HashMap<u32, Vec<CdcTableSnapshotSplitRaw>>;

/// Generation id carried by `CdcTableSnapshotSplitAssignmentWithGeneration::empty()`.
pub const INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 0;
// NOTE(review): presumably the generation given to the first real assignment — confirm at the use site.
pub const INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 1;
244
/// A snapshot split assignment together with a generation number.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct CdcTableSnapshotSplitAssignmentWithGeneration {
    /// Splits grouped under a `u32` key (named `actor_id` by the conversion functions in this file).
    pub splits: HashMap<u32, Vec<CdcTableSnapshotSplitRaw>>,
    /// Generation of this assignment; `empty()` uses `INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID`.
    pub generation: u64,
}
250
251impl CdcTableSnapshotSplitAssignmentWithGeneration {
252    pub fn new(splits: HashMap<u32, Vec<CdcTableSnapshotSplitRaw>>, generation: u64) -> Self {
253        Self { splits, generation }
254    }
255
256    pub fn empty() -> Self {
257        Self {
258            splits: HashMap::default(),
259            generation: INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
260        }
261    }
262}
263
264pub fn build_pb_actor_cdc_table_snapshot_splits_with_generation(
265    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
266) -> PbCdcTableSnapshotSplitsWithGeneration {
267    let splits =
268        build_pb_actor_cdc_table_snapshot_splits(cdc_table_snapshot_split_assignment.splits);
269    PbCdcTableSnapshotSplitsWithGeneration {
270        splits,
271        generation: cdc_table_snapshot_split_assignment.generation,
272    }
273}
274
275pub fn build_pb_actor_cdc_table_snapshot_splits(
276    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
277) -> HashMap<u32, PbCdcTableSnapshotSplits> {
278    cdc_table_snapshot_split_assignment
279        .into_iter()
280        .map(|(actor_id, splits)| {
281            let splits = PbCdcTableSnapshotSplits {
282                splits: splits
283                    .into_iter()
284                    .map(|s| PbCdcTableSnapshotSplit {
285                        split_id: s.split_id,
286                        left_bound_inclusive: s.left_bound_inclusive,
287                        right_bound_exclusive: s.right_bound_exclusive,
288                    })
289                    .collect(),
290            };
291            (actor_id, splits)
292        })
293        .collect()
294}
295
296pub fn build_actor_cdc_table_snapshot_splits_with_generation(
297    pb_cdc_table_snapshot_split_assignment: PbCdcTableSnapshotSplitsWithGeneration,
298) -> CdcTableSnapshotSplitAssignmentWithGeneration {
299    let splits = pb_cdc_table_snapshot_split_assignment
300        .splits
301        .into_iter()
302        .map(|(actor_id, splits)| {
303            let splits = splits
304                .splits
305                .into_iter()
306                .map(|s| CdcTableSnapshotSplitRaw {
307                    split_id: s.split_id,
308                    left_bound_inclusive: s.left_bound_inclusive,
309                    right_bound_exclusive: s.right_bound_exclusive,
310                })
311                .collect();
312            (actor_id, splits)
313        })
314        .collect();
315    let generation = pb_cdc_table_snapshot_split_assignment.generation;
316    CdcTableSnapshotSplitAssignmentWithGeneration { splits, generation }
317}
318
/// Options that control how a CDC table is backfilled.
///
/// All fields are plain integers and booleans, so the type is `Eq` as well as `Hash` and can
/// be used directly as a `HashMap`/`HashSet` key.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CdcScanOptions {
    /// Used by non-parallelized backfill, i.e. backfill V1. When set,
    /// `is_parallelized_backfill` also reports `false`.
    pub disable_backfill: bool,
    /// Used by non-parallelized backfill. The frequency of snapshot read resets for consuming the WAL backlog.
    pub snapshot_barrier_interval: u32,
    /// Used by non-parallelized backfill. The number of rows to fetch in a single batch when reading from an external table.
    pub snapshot_batch_size: u32,
    /// Used by parallelized backfill, i.e. backfill V2. The initial parallelism of parallel backfill.
    pub backfill_parallelism: u32,
    /// Used by parallelized backfill. The estimated number of rows per split used in splits generation.
    pub backfill_num_rows_per_split: u64,
    /// Used by parallelized backfill. For supported split column data types, assume a uniform distribution and adopt a much faster splits generation method.
    pub backfill_as_even_splits: bool,
    /// Used by parallelized backfill. Specify the index of primary key column to use as split column.
    pub backfill_split_pk_column_index: u32,
}
336
337impl Default for CdcScanOptions {
338    fn default() -> Self {
339        Self {
340            disable_backfill: false,
341            snapshot_barrier_interval: 1,
342            snapshot_batch_size: 1000,
343            // 0 means disable backfill v2.
344            backfill_parallelism: 0,
345            backfill_num_rows_per_split: 100_000,
346            backfill_as_even_splits: true,
347            backfill_split_pk_column_index: 0,
348        }
349    }
350}
351
352impl CdcScanOptions {
353    pub fn to_proto(&self) -> StreamCdcScanOptions {
354        StreamCdcScanOptions {
355            disable_backfill: self.disable_backfill,
356            snapshot_barrier_interval: self.snapshot_barrier_interval,
357            snapshot_batch_size: self.snapshot_batch_size,
358            backfill_parallelism: self.backfill_parallelism,
359            backfill_num_rows_per_split: self.backfill_num_rows_per_split,
360            backfill_as_even_splits: self.backfill_as_even_splits,
361            backfill_split_pk_column_index: self.backfill_split_pk_column_index,
362        }
363    }
364
365    pub fn from_proto(proto: &StreamCdcScanOptions) -> Self {
366        Self {
367            disable_backfill: proto.disable_backfill,
368            snapshot_barrier_interval: proto.snapshot_barrier_interval,
369            snapshot_batch_size: proto.snapshot_batch_size,
370            backfill_parallelism: proto.backfill_parallelism,
371            backfill_num_rows_per_split: proto.backfill_num_rows_per_split,
372            backfill_as_even_splits: proto.backfill_as_even_splits,
373            backfill_split_pk_column_index: proto.backfill_split_pk_column_index,
374        }
375    }
376
377    pub fn is_parallelized_backfill(&self) -> bool {
378        !self.disable_backfill
379            && self.backfill_num_rows_per_split > 0
380            && self.backfill_parallelism > 0
381    }
382}