risingwave_connector/source/cdc/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod enumerator;
16pub mod external;
17pub mod jni_source;
18pub mod source;
19pub mod split;
20
21use std::collections::{BTreeMap, HashMap};
22use std::marker::PhantomData;
23
24pub use enumerator::*;
25use itertools::Itertools;
26use risingwave_common::id::{ActorId, SourceId};
27use risingwave_pb::catalog::PbSource;
28use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
29use risingwave_pb::plan_common::ExternalTableDesc;
30use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
31use risingwave_pb::source::{
32    PbCdcTableSnapshotSplit, PbCdcTableSnapshotSplits, PbCdcTableSnapshotSplitsWithGeneration,
33};
34use risingwave_pb::stream_plan::StreamCdcScanOptions;
35use simd_json::prelude::ArrayTrait;
36pub use source::*;
37
38use crate::enforce_secret::EnforceSecret;
39use crate::error::ConnectorResult;
40use crate::source::{CdcTableSnapshotSplitRaw, SourceProperties, SplitImpl, TryFromBTreeMap};
41use crate::{for_all_classified_sources, impl_cdc_source_type};
42
/// Suffix of CDC connector names.
pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
/// Property key for the Debezium snapshot mode.
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
/// NOTE(review): presumably the snapshot-mode value used when RisingWave itself performs the
/// CDC backfill; confirm at the use sites.
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
/// Property key whose value `"true"` marks the properties as belonging to a shared CDC source
/// job (read by `CdcProperties::try_from_btreemap`).
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
/// Option key to toggle CDC backfill: users can set `snapshot='false'` to disable it.
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
// Option keys for tuning CDC backfill. NOTE(review): these presumably populate the
// correspondingly named fields of `CdcScanOptions`; they are not parsed in this file.
pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
pub const CDC_BACKFILL_PARALLELISM: &str = "backfill.parallelism";
pub const CDC_BACKFILL_NUM_ROWS_PER_SPLIT: &str = "backfill.num_rows_per_split";
pub const CDC_BACKFILL_AS_EVEN_SPLITS: &str = "backfill.as_even_splits";
pub const CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX: &str = "backfill.split_pk_column_index";
/// Option key for transactional CDC; transactions are enabled for shared CDC sources by default.
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
/// Option key for the timeout while waiting for CDC streaming to start.
pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";
/// Maximum parallelism for CDC backfill.
pub const CDC_BACKFILL_MAX_PARALLELISM: u32 = 256;

/// Option key for MongoDB CDC sources: users can set `strong_schema='true'` to enable a strong
/// schema.
pub const CDC_MONGODB_STRONG_SCHEMA_KEY: &str = "strong_schema";

// Connector names of the supported CDC sources, taken from each marker type's
// `CdcSourceTypeTrait::CDC_CONNECTOR_NAME`.
pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME;
pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME;
68
69/// Build a unique CDC table identifier from a source ID and external table name
70pub fn build_cdc_table_id(source_id: SourceId, external_table_name: &str) -> String {
71    format!("{}.{}", source_id, external_table_name)
72}
73
/// Static description of a CDC source type. It is implemented by the per-database marker
/// types (e.g. `Mysql`, `Postgres`) that parameterize `CdcProperties`.
pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
    /// Connector name of this CDC source type. It is also used as
    /// `SourceProperties::SOURCE_NAME` of `CdcProperties<Self>`.
    const CDC_CONNECTOR_NAME: &'static str;
    /// The `CdcSourceType` value corresponding to this marker type.
    fn source_type() -> CdcSourceType;
}

// Expands `impl_cdc_source_type` over the classified sources.
// NOTE(review): presumably this defines `CdcSourceType` and the marker types (`Mysql`,
// `Postgres`, ...) together with their `CdcSourceTypeTrait` impls; confirm in the macro.
for_all_classified_sources!(impl_cdc_source_type);
80
81impl<'a> From<&'a str> for CdcSourceType {
82    fn from(name: &'a str) -> Self {
83        match name {
84            MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
85            POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
86            CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
87            MONGODB_CDC_CONNECTOR => CdcSourceType::Mongodb,
88            SQL_SERVER_CDC_CONNECTOR => CdcSourceType::SqlServer,
89            _ => CdcSourceType::Unspecified,
90        }
91    }
92}
93
94impl CdcSourceType {
95    pub fn as_str_name(&self) -> &str {
96        match self {
97            CdcSourceType::Mysql => "MySQL",
98            CdcSourceType::Postgres => "Postgres",
99            CdcSourceType::Citus => "Citus",
100            CdcSourceType::Mongodb => "MongoDB",
101            CdcSourceType::SqlServer => "SQL Server",
102            CdcSourceType::Unspecified => "Unspecified",
103        }
104    }
105}
106
/// Connector properties of a CDC source or CDC table whose upstream database type is `T`.
#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
    /// Properties specified in the WITH clause by user
    pub properties: BTreeMap<String, String>,

    /// Schema of the source specified by users. When populated by `init_from_pb_source` or
    /// `init_from_pb_cdc_table_desc`, generated columns are left out.
    pub table_schema: TableSchema,

    /// Whether it is created by a CDC source job (i.e. a shared CDC source).
    pub is_cdc_source_job: bool,

    /// For validation purposes: whether this is a backfill CDC table.
    pub is_backfill_table: bool,

    /// Binds the properties to the CDC source type `T` without storing a value of it.
    pub _phantom: PhantomData<T>,
}
123
124pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
125    TableSchema {
126        columns: table_schema
127            .columns
128            .iter()
129            .filter(|col| {
130                col.additional_column
131                    .as_ref()
132                    .is_some_and(|val| val.column_type.is_none())
133            })
134            .cloned()
135            .collect(),
136        pk_indices: table_schema.pk_indices.clone(),
137    }
138}
139
140impl<T: CdcSourceTypeTrait> TryFromBTreeMap for CdcProperties<T> {
141    fn try_from_btreemap(
142        properties: BTreeMap<String, String>,
143        _deny_unknown_fields: bool,
144    ) -> ConnectorResult<Self> {
145        let is_share_source: bool = properties
146            .get(CDC_SHARING_MODE_KEY)
147            .is_some_and(|v| v == "true");
148        Ok(CdcProperties {
149            properties,
150            table_schema: Default::default(),
151            // TODO(siyuan): use serde to deserialize input hashmap
152            is_cdc_source_job: is_share_source,
153            is_backfill_table: false,
154            _phantom: PhantomData,
155        })
156    }
157}
158
// Secret enforcement is not customized for CDC properties: the impl only uses the trait's
// default methods.
// TODO: enforce secrets for JDBC-like properties.
impl<T: CdcSourceTypeTrait> EnforceSecret for CdcProperties<T> {}
160
161impl<T: CdcSourceTypeTrait> SourceProperties for CdcProperties<T>
162where
163    DebeziumCdcSplit<T>: TryFrom<SplitImpl, Error = crate::error::ConnectorError> + Into<SplitImpl>,
164    DebeziumSplitEnumerator<T>: ListCdcSplits<CdcSourceType = T> + enumerator::CdcMonitor,
165{
166    type Split = DebeziumCdcSplit<T>;
167    type SplitEnumerator = DebeziumSplitEnumerator<T>;
168    type SplitReader = CdcSplitReader<T>;
169
170    const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME;
171
172    fn init_from_pb_source(&mut self, source: &PbSource) {
173        let pk_indices = source
174            .pk_column_ids
175            .iter()
176            .map(|&id| {
177                source
178                    .columns
179                    .iter()
180                    .position(|col| col.column_desc.as_ref().unwrap().column_id == id)
181                    .unwrap() as u32
182            })
183            .collect_vec();
184
185        let table_schema = PbTableSchema {
186            columns: source
187                .columns
188                .iter()
189                .flat_map(|col| &col.column_desc)
190                .filter(|col| {
191                    !matches!(
192                        col.generated_or_default_column,
193                        Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
194                    )
195                })
196                .cloned()
197                .collect(),
198            pk_indices,
199        };
200        self.table_schema = table_schema;
201        if let Some(info) = source.info.as_ref() {
202            self.is_cdc_source_job = info.is_shared();
203        }
204    }
205
206    fn init_from_pb_cdc_table_desc(&mut self, table_desc: &ExternalTableDesc) {
207        let table_schema = TableSchema {
208            columns: table_desc
209                .columns
210                .iter()
211                .filter(|col| {
212                    !matches!(
213                        col.generated_or_default_column,
214                        Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
215                    )
216                })
217                .cloned()
218                .collect(),
219            pk_indices: table_desc.stream_key.clone(),
220        };
221
222        self.table_schema = table_schema;
223        self.is_cdc_source_job = false;
224        self.is_backfill_table = true;
225    }
226}
227
228impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
229    fn unknown_fields(&self) -> HashMap<String, String> {
230        // FIXME: CDC does not handle unknown fields yet
231        HashMap::new()
232    }
233}
234
235impl<T: CdcSourceTypeTrait> CdcProperties<T> {
236    pub fn get_source_type_pb(&self) -> SourceType {
237        SourceType::from(T::source_type())
238    }
239}
240
/// CDC table snapshot splits assigned to each actor.
pub type CdcTableSnapshotSplitAssignment = HashMap<ActorId, Vec<CdcTableSnapshotSplitRaw>>;

/// Generation id of an invalid split assignment. It is used by
/// `CdcTableSnapshotSplitAssignmentWithGeneration::empty` and equals the derived `Default`.
pub const INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 0;
/// Initial generation id for CDC split assignments.
pub const INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID: u64 = 1;
245
/// A per-actor CDC table snapshot split assignment tagged with a generation id.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct CdcTableSnapshotSplitAssignmentWithGeneration {
    /// Snapshot splits assigned to each actor.
    pub splits: HashMap<ActorId, Vec<CdcTableSnapshotSplitRaw>>,
    /// Generation id of this assignment. The derived `Default` yields 0, i.e.
    /// `INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID`.
    pub generation: u64,
}
251
252impl CdcTableSnapshotSplitAssignmentWithGeneration {
253    pub fn new(splits: HashMap<ActorId, Vec<CdcTableSnapshotSplitRaw>>, generation: u64) -> Self {
254        Self { splits, generation }
255    }
256
257    pub fn empty() -> Self {
258        Self {
259            splits: HashMap::default(),
260            generation: INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
261        }
262    }
263}
264
265pub fn build_pb_actor_cdc_table_snapshot_splits_with_generation(
266    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignmentWithGeneration,
267) -> PbCdcTableSnapshotSplitsWithGeneration {
268    let splits =
269        build_pb_actor_cdc_table_snapshot_splits(cdc_table_snapshot_split_assignment.splits);
270    PbCdcTableSnapshotSplitsWithGeneration {
271        splits,
272        generation: cdc_table_snapshot_split_assignment.generation,
273    }
274}
275
276pub fn build_pb_actor_cdc_table_snapshot_splits(
277    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
278) -> HashMap<ActorId, PbCdcTableSnapshotSplits> {
279    cdc_table_snapshot_split_assignment
280        .into_iter()
281        .map(|(actor_id, splits)| {
282            let splits = PbCdcTableSnapshotSplits {
283                splits: splits
284                    .into_iter()
285                    .map(|s| PbCdcTableSnapshotSplit {
286                        split_id: s.split_id,
287                        left_bound_inclusive: s.left_bound_inclusive,
288                        right_bound_exclusive: s.right_bound_exclusive,
289                    })
290                    .collect(),
291            };
292            (actor_id, splits)
293        })
294        .collect()
295}
296
297pub fn build_actor_cdc_table_snapshot_splits_with_generation(
298    pb_cdc_table_snapshot_split_assignment: PbCdcTableSnapshotSplitsWithGeneration,
299) -> CdcTableSnapshotSplitAssignmentWithGeneration {
300    let splits = pb_cdc_table_snapshot_split_assignment
301        .splits
302        .into_iter()
303        .map(|(actor_id, splits)| {
304            let splits = splits
305                .splits
306                .into_iter()
307                .map(|s| CdcTableSnapshotSplitRaw {
308                    split_id: s.split_id,
309                    left_bound_inclusive: s.left_bound_inclusive,
310                    right_bound_exclusive: s.right_bound_exclusive,
311                })
312                .collect();
313            (actor_id, splits)
314        })
315        .collect();
316    let generation = pb_cdc_table_snapshot_split_assignment.generation;
317    CdcTableSnapshotSplitAssignmentWithGeneration { splits, generation }
318}
319
/// Options of a CDC table scan. They cover both the non-parallelized backfill (V1) and the
/// parallelized backfill (V2).
///
/// `Eq` is derived alongside `Hash` so the options can key hash-based collections. All fields
/// are integers or booleans, so equality is total.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CdcScanOptions {
    /// Disables backfill. Used by the non-parallelized backfill, i.e. backfill V1. A disabled
    /// backfill also rules out the parallelized backfill (see `is_parallelized_backfill`).
    pub disable_backfill: bool,
    /// Used by non-parallelized backfill. The frequency of snapshot read resets for consuming
    /// the WAL backlog.
    pub snapshot_barrier_interval: u32,
    /// Used by non-parallelized backfill. The number of rows to fetch in a single batch when
    /// reading from an external table.
    pub snapshot_batch_size: u32,
    /// Used by parallelized backfill, i.e. backfill V2. The initial parallelism of parallel
    /// backfill; 0 disables the parallelized backfill.
    pub backfill_parallelism: u32,
    /// Used by parallelized backfill. The estimated number of rows per split used in splits
    /// generation.
    pub backfill_num_rows_per_split: u64,
    /// Used by parallelized backfill. For supported split column data types, assume a uniform
    /// distribution and adopt a much faster splits generation method.
    pub backfill_as_even_splits: bool,
    /// Used by parallelized backfill. The index of the primary key column to use as the split
    /// column.
    pub backfill_split_pk_column_index: u32,
}
337
338impl Default for CdcScanOptions {
339    fn default() -> Self {
340        Self {
341            disable_backfill: false,
342            snapshot_barrier_interval: 1,
343            snapshot_batch_size: 1000,
344            // 0 means disable backfill v2.
345            backfill_parallelism: 0,
346            backfill_num_rows_per_split: 100_000,
347            backfill_as_even_splits: true,
348            backfill_split_pk_column_index: 0,
349        }
350    }
351}
352
353impl CdcScanOptions {
354    pub fn to_proto(&self) -> StreamCdcScanOptions {
355        StreamCdcScanOptions {
356            disable_backfill: self.disable_backfill,
357            snapshot_barrier_interval: self.snapshot_barrier_interval,
358            snapshot_batch_size: self.snapshot_batch_size,
359            backfill_parallelism: self.backfill_parallelism,
360            backfill_num_rows_per_split: self.backfill_num_rows_per_split,
361            backfill_as_even_splits: self.backfill_as_even_splits,
362            backfill_split_pk_column_index: self.backfill_split_pk_column_index,
363        }
364    }
365
366    pub fn from_proto(proto: &StreamCdcScanOptions) -> Self {
367        Self {
368            disable_backfill: proto.disable_backfill,
369            snapshot_barrier_interval: proto.snapshot_barrier_interval,
370            snapshot_batch_size: proto.snapshot_batch_size,
371            backfill_parallelism: proto.backfill_parallelism,
372            backfill_num_rows_per_split: proto.backfill_num_rows_per_split,
373            backfill_as_even_splits: proto.backfill_as_even_splits,
374            backfill_split_pk_column_index: proto.backfill_split_pk_column_index,
375        }
376    }
377
378    pub fn is_parallelized_backfill(&self) -> bool {
379        !self.disable_backfill
380            && self.backfill_num_rows_per_split > 0
381            && self.backfill_parallelism > 0
382    }
383}