risingwave_connector/source/cdc/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod enumerator;
16pub mod external;
17pub mod jni_source;
18pub mod source;
19pub mod split;
20
21use std::collections::{BTreeMap, HashMap};
22use std::marker::PhantomData;
23
24pub use enumerator::*;
25use itertools::Itertools;
26use risingwave_pb::catalog::PbSource;
27use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
28use risingwave_pb::plan_common::ExternalTableDesc;
29use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
30use risingwave_pb::source::{PbCdcTableSnapshotSplit, PbCdcTableSnapshotSplits};
31use risingwave_pb::stream_plan::StreamCdcScanOptions;
32use simd_json::prelude::ArrayTrait;
33pub use source::*;
34
35use crate::enforce_secret::EnforceSecret;
36use crate::error::ConnectorResult;
37use crate::source::{CdcTableSnapshotSplitRaw, SourceProperties, SplitImpl, TryFromBTreeMap};
38use crate::{for_all_classified_sources, impl_cdc_source_type};
39
/// The literal suffix `-cdc`; presumably shared by every CDC connector name (confirm against the
/// connector definitions, which are not visible in this file).
pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
/// Property key `debezium.snapshot.mode`.
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
/// The literal `rw_cdc_backfill`; presumably a value used together with
/// [`CDC_SNAPSHOT_MODE_KEY`] — TODO confirm at the call sites.
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
/// Property key `rw.sharing.mode.enable`; `CdcProperties::try_from_btreemap` treats the value
/// `"true"` as marking a CDC source job.
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
/// User can set snapshot='false' to disable cdc backfill
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
/// Property key `snapshot.interval`.
pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
/// Property key `snapshot.batch_size`.
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
/// Property key `backfill.parallelism`.
pub const CDC_BACKFILL_PARALLELISM: &str = "backfill.parallelism";
/// Property key `backfill.num_rows_per_split`.
pub const CDC_BACKFILL_NUM_ROWS_PER_SPLIT: &str = "backfill.num_rows_per_split";
/// Property key `backfill.as_even_splits`.
pub const CDC_BACKFILL_AS_EVEN_SPLITS: &str = "backfill.as_even_splits";
/// Property key `backfill.split_pk_column_index`.
pub const CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX: &str = "backfill.split_pk_column_index";
/// We enable transaction for shared cdc source by default
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
/// Property key `cdc.source.wait.streaming.start.timeout`.
pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";
/// Numeric limit of 256 named for backfill parallelism; where it is enforced is not visible in
/// this file.
pub const CDC_BACKFILL_MAX_PARALLELISM: u32 = 256;
56
/// User can set strong_schema='true' to enable strong schema for mongo cdc source
pub const CDC_MONGODB_STRONG_SCHEMA_KEY: &str = "strong_schema";
59
// Connector-name aliases. Each one re-exports the `CDC_CONNECTOR_NAME` associated constant of the
// corresponding source-type marker, so the names can be used directly as `match` patterns.
/// Connector name of the MySQL CDC source.
pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
/// Connector name of the Postgres CDC source.
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
/// Connector name of the Citus CDC source.
pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
/// Connector name of the MongoDB CDC source.
pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME;
/// Connector name of the SQL Server CDC source.
pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME;
65
/// Compose the CDC table identifier `"<source_id>.<external_table_name>"`.
///
/// The external table name is appended verbatim after a single `.` separator; no escaping or
/// validation is performed.
pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String {
    format!("{source_id}.{external_table_name}")
}
70
/// Type-level tag for one kind of CDC source.
///
/// Implementors are used as the `T` parameter of [`CdcProperties`]. The implementations are
/// presumably generated by the `for_all_classified_sources!(impl_cdc_source_type)` invocation in
/// this module — confirm against the macro definition.
pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
    /// Connector name associated with this source type.
    const CDC_CONNECTOR_NAME: &'static str;
    /// Returns the [`CdcSourceType`] value that corresponds to this tag type.
    fn source_type() -> CdcSourceType;
}
75
76for_all_classified_sources!(impl_cdc_source_type);
77
78impl<'a> From<&'a str> for CdcSourceType {
79    fn from(name: &'a str) -> Self {
80        match name {
81            MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
82            POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
83            CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
84            MONGODB_CDC_CONNECTOR => CdcSourceType::Mongodb,
85            SQL_SERVER_CDC_CONNECTOR => CdcSourceType::SqlServer,
86            _ => CdcSourceType::Unspecified,
87        }
88    }
89}
90
91impl CdcSourceType {
92    pub fn as_str_name(&self) -> &str {
93        match self {
94            CdcSourceType::Mysql => "MySQL",
95            CdcSourceType::Postgres => "Postgres",
96            CdcSourceType::Citus => "Citus",
97            CdcSourceType::Mongodb => "MongoDB",
98            CdcSourceType::SqlServer => "SQL Server",
99            CdcSourceType::Unspecified => "Unspecified",
100        }
101    }
102}
103
/// Connector properties of a CDC source, tagged at the type level with the source kind `T`.
#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
    /// Properties specified in the WITH clause by user
    pub properties: BTreeMap<String, String>,

    /// Schema of the source specified by users
    pub table_schema: TableSchema,

    /// Whether it is created by a cdc source job
    pub is_cdc_source_job: bool,

    /// For validation purpose, mark if the table is a backfill cdc table
    pub is_backfill_table: bool,

    /// Zero-sized marker that ties the value to the source kind `T`; carries no data.
    pub _phantom: PhantomData<T>,
}
120
121pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
122    TableSchema {
123        columns: table_schema
124            .columns
125            .iter()
126            .filter(|col| {
127                col.additional_column
128                    .as_ref()
129                    .is_some_and(|val| val.column_type.is_none())
130            })
131            .cloned()
132            .collect(),
133        pk_indices: table_schema.pk_indices.clone(),
134    }
135}
136
137impl<T: CdcSourceTypeTrait> TryFromBTreeMap for CdcProperties<T> {
138    fn try_from_btreemap(
139        properties: BTreeMap<String, String>,
140        _deny_unknown_fields: bool,
141    ) -> ConnectorResult<Self> {
142        let is_share_source: bool = properties
143            .get(CDC_SHARING_MODE_KEY)
144            .is_some_and(|v| v == "true");
145        Ok(CdcProperties {
146            properties,
147            table_schema: Default::default(),
148            // TODO(siyuan): use serde to deserialize input hashmap
149            is_cdc_source_job: is_share_source,
150            is_backfill_table: false,
151            _phantom: PhantomData,
152        })
153    }
154}
155
// Empty impl: nothing is overridden, so whatever defaults `EnforceSecret` provides apply.
impl<T: CdcSourceTypeTrait> EnforceSecret for CdcProperties<T> {} // todo: enforce jdbc like properties
157
158impl<T: CdcSourceTypeTrait> SourceProperties for CdcProperties<T>
159where
160    DebeziumCdcSplit<T>: TryFrom<SplitImpl, Error = crate::error::ConnectorError> + Into<SplitImpl>,
161    DebeziumSplitEnumerator<T>: ListCdcSplits<CdcSourceType = T>,
162{
163    type Split = DebeziumCdcSplit<T>;
164    type SplitEnumerator = DebeziumSplitEnumerator<T>;
165    type SplitReader = CdcSplitReader<T>;
166
167    const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME;
168
169    fn init_from_pb_source(&mut self, source: &PbSource) {
170        let pk_indices = source
171            .pk_column_ids
172            .iter()
173            .map(|&id| {
174                source
175                    .columns
176                    .iter()
177                    .position(|col| col.column_desc.as_ref().unwrap().column_id == id)
178                    .unwrap() as u32
179            })
180            .collect_vec();
181
182        let table_schema = PbTableSchema {
183            columns: source
184                .columns
185                .iter()
186                .flat_map(|col| &col.column_desc)
187                .filter(|col| {
188                    !matches!(
189                        col.generated_or_default_column,
190                        Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
191                    )
192                })
193                .cloned()
194                .collect(),
195            pk_indices,
196        };
197        self.table_schema = table_schema;
198        if let Some(info) = source.info.as_ref() {
199            self.is_cdc_source_job = info.is_shared();
200        }
201    }
202
203    fn init_from_pb_cdc_table_desc(&mut self, table_desc: &ExternalTableDesc) {
204        let table_schema = TableSchema {
205            columns: table_desc
206                .columns
207                .iter()
208                .filter(|col| {
209                    !matches!(
210                        col.generated_or_default_column,
211                        Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
212                    )
213                })
214                .cloned()
215                .collect(),
216            pk_indices: table_desc.stream_key.clone(),
217        };
218
219        self.table_schema = table_schema;
220        self.is_cdc_source_job = false;
221        self.is_backfill_table = true;
222    }
223}
224
225impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
226    fn unknown_fields(&self) -> HashMap<String, String> {
227        // FIXME: CDC does not handle unknown fields yet
228        HashMap::new()
229    }
230}
231
232impl<T: CdcSourceTypeTrait> CdcProperties<T> {
233    pub fn get_source_type_pb(&self) -> SourceType {
234        SourceType::from(T::source_type())
235    }
236}
237
/// Snapshot splits grouped under a `u32` key; the conversion helpers in this module bind that
/// key as `actor_id`.
pub type CdcTableSnapshotSplitAssignment = HashMap<u32, Vec<CdcTableSnapshotSplitRaw>>;
239
240pub fn build_pb_actor_cdc_table_snapshot_splits(
241    cdc_table_snapshot_split_assignment: CdcTableSnapshotSplitAssignment,
242) -> HashMap<u32, PbCdcTableSnapshotSplits> {
243    cdc_table_snapshot_split_assignment
244        .into_iter()
245        .map(|(actor_id, splits)| {
246            let splits = PbCdcTableSnapshotSplits {
247                splits: splits
248                    .into_iter()
249                    .map(|s| PbCdcTableSnapshotSplit {
250                        split_id: s.split_id,
251                        left_bound_inclusive: s.left_bound_inclusive,
252                        right_bound_exclusive: s.right_bound_exclusive,
253                    })
254                    .collect(),
255            };
256            (actor_id, splits)
257        })
258        .collect()
259}
260
261pub fn build_actor_cdc_table_snapshot_splits(
262    pb_cdc_table_snapshot_split_assignment: HashMap<u32, PbCdcTableSnapshotSplits>,
263) -> CdcTableSnapshotSplitAssignment {
264    pb_cdc_table_snapshot_split_assignment
265        .into_iter()
266        .map(|(actor_id, splits)| {
267            let splits = splits
268                .splits
269                .into_iter()
270                .map(|s| CdcTableSnapshotSplitRaw {
271                    split_id: s.split_id,
272                    left_bound_inclusive: s.left_bound_inclusive,
273                    right_bound_exclusive: s.right_bound_exclusive,
274                })
275                .collect();
276            (actor_id, splits)
277        })
278        .collect()
279}
280
/// Options for scanning/backfilling a CDC table.
///
/// Converted to and from the protobuf `StreamCdcScanOptions` field by field via
/// `to_proto` / `from_proto`.
#[derive(Debug, Clone, Hash, PartialEq)]
pub struct CdcScanOptions {
    /// Used by non-parallelized backfill, i.e. backfill V1.
    pub disable_backfill: bool,
    /// Used by non-parallelized backfill. The frequency of snapshot read resets for consuming the WAL backlog.
    pub snapshot_barrier_interval: u32,
    /// Used by non-parallelized backfill. The number of rows to fetch in a single batch when reading from an external table.
    pub snapshot_batch_size: u32,
    /// Used by parallelized backfill, i.e. backfill V2. The initial parallelism of parallel backfill.
    pub backfill_parallelism: u32,
    /// Used by parallelized backfill. The estimated number of rows per split used in splits generation.
    pub backfill_num_rows_per_split: u64,
    /// Used by parallelized backfill. For supported split column data types, assume a uniform distribution and adopt a much faster splits generation method.
    pub backfill_as_even_splits: bool,
    /// Used by parallelized backfill. Specify the index of primary key column to use as split column.
    pub backfill_split_pk_column_index: u32,
}
298
299impl Default for CdcScanOptions {
300    fn default() -> Self {
301        Self {
302            disable_backfill: false,
303            snapshot_barrier_interval: 1,
304            snapshot_batch_size: 1000,
305            backfill_parallelism: 1,
306            // 0 means disable backfill v2.
307            backfill_num_rows_per_split: 0,
308            backfill_as_even_splits: true,
309            backfill_split_pk_column_index: 0,
310        }
311    }
312}
313
314impl CdcScanOptions {
315    pub fn to_proto(&self) -> StreamCdcScanOptions {
316        StreamCdcScanOptions {
317            disable_backfill: self.disable_backfill,
318            snapshot_barrier_interval: self.snapshot_barrier_interval,
319            snapshot_batch_size: self.snapshot_batch_size,
320            backfill_parallelism: self.backfill_parallelism,
321            backfill_num_rows_per_split: self.backfill_num_rows_per_split,
322            backfill_as_even_splits: self.backfill_as_even_splits,
323            backfill_split_pk_column_index: self.backfill_split_pk_column_index,
324        }
325    }
326
327    pub fn from_proto(proto: &StreamCdcScanOptions) -> Self {
328        Self {
329            disable_backfill: proto.disable_backfill,
330            snapshot_barrier_interval: proto.snapshot_barrier_interval,
331            snapshot_batch_size: proto.snapshot_batch_size,
332            backfill_parallelism: proto.backfill_parallelism,
333            backfill_num_rows_per_split: proto.backfill_num_rows_per_split,
334            backfill_as_even_splits: proto.backfill_as_even_splits,
335            backfill_split_pk_column_index: proto.backfill_split_pk_column_index,
336        }
337    }
338
339    pub fn is_parallelized_backfill(&self) -> bool {
340        !self.disable_backfill
341            && self.backfill_num_rows_per_split > 0
342            && self.backfill_parallelism > 0
343    }
344}