risingwave_connector/source/cdc/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod enumerator;
16pub mod external;
17pub mod jni_source;
18pub mod source;
19pub mod split;
20
21use std::collections::{BTreeMap, HashMap};
22use std::marker::PhantomData;
23
24pub use enumerator::*;
25use itertools::Itertools;
26use risingwave_pb::catalog::PbSource;
27use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
28use risingwave_pb::plan_common::ExternalTableDesc;
29use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
30use simd_json::prelude::ArrayTrait;
31pub use source::*;
32
33use crate::enforce_secret::EnforceSecret;
34use crate::error::ConnectorResult;
35use crate::source::{SourceProperties, SplitImpl, TryFromBTreeMap};
36use crate::{for_all_classified_sources, impl_cdc_source_type};
37
// Property keys and well-known values used by CDC sources.

// NOTE(review): presumably the suffix shared by all CDC connector names — confirm against callers.
pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
// When this property equals "true", `CdcProperties::try_from_btreemap` sets `is_cdc_source_job`
pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
// User can set snapshot='false' to disable cdc backfill
pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
// We enable transaction for shared cdc source by default
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";
pub const CDC_AUTO_SCHEMA_CHANGE_KEY: &str = "auto.schema.change";
50
// User can set strong_schema='true' to enable strong schema for mongo cdc source
52pub const CDC_MONGODB_STRONG_SCHEMA_KEY: &str = "strong_schema";
53
// Connector name of each CDC source kind, taken from that kind's `CDC_CONNECTOR_NAME`.
// These are the names that `CdcSourceType::from(&str)` matches against.
pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME;
pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME;
59
/// Builds a CDC table identifier by joining the source ID and the external table
/// name with a `.`; for example `build_cdc_table_id(1, "db.t")` yields `"1.db.t"`.
pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String {
    format!("{source_id}.{external_table_name}")
}
64
65pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
66    const CDC_CONNECTOR_NAME: &'static str;
67    fn source_type() -> CdcSourceType;
68}
69
// Expands `impl_cdc_source_type` through `for_all_classified_sources`.
// NOTE(review): presumably this generates `CdcSourceType`, the per-connector types
// (`Mysql`, `Postgres`, ...) and their `CdcSourceTypeTrait` impls — confirm against the macros.
for_all_classified_sources!(impl_cdc_source_type);
71
72impl<'a> From<&'a str> for CdcSourceType {
73    fn from(name: &'a str) -> Self {
74        match name {
75            MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
76            POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
77            CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
78            MONGODB_CDC_CONNECTOR => CdcSourceType::Mongodb,
79            SQL_SERVER_CDC_CONNECTOR => CdcSourceType::SqlServer,
80            _ => CdcSourceType::Unspecified,
81        }
82    }
83}
84
85impl CdcSourceType {
86    pub fn as_str_name(&self) -> &str {
87        match self {
88            CdcSourceType::Mysql => "MySQL",
89            CdcSourceType::Postgres => "Postgres",
90            CdcSourceType::Citus => "Citus",
91            CdcSourceType::Mongodb => "MongoDB",
92            CdcSourceType::SqlServer => "SQL Server",
93            CdcSourceType::Unspecified => "Unspecified",
94        }
95    }
96}
97
/// Properties of a CDC source, parameterized by the source kind `T`.
#[derive(Clone, Debug, Default)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
    /// Properties specified in the WITH clause by user
    pub properties: BTreeMap<String, String>,

    /// Schema of the source specified by users.
    ///
    /// Left at its default value by `try_from_btreemap`; filled in afterwards by
    /// `init_from_pb_source` or `init_from_pb_cdc_table_desc`.
    pub table_schema: TableSchema,

    /// Whether it is created by a cdc source job.
    ///
    /// Initialized from the `CDC_SHARING_MODE_KEY` property (`"true"` means yes), overwritten by
    /// `init_from_pb_source` when the source carries `info`, and reset to `false` by
    /// `init_from_pb_cdc_table_desc`.
    pub is_cdc_source_job: bool,

    /// For validation purpose, mark if the table is a backfill cdc table.
    ///
    /// Within this file, only `init_from_pb_cdc_table_desc` sets it to `true`.
    pub is_backfill_table: bool,

    /// Zero-sized marker tying the struct to the source kind `T`.
    pub _phantom: PhantomData<T>,
}
114
115pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
116    TableSchema {
117        columns: table_schema
118            .columns
119            .iter()
120            .filter(|col| {
121                col.additional_column
122                    .as_ref()
123                    .is_some_and(|val| val.column_type.is_none())
124            })
125            .cloned()
126            .collect(),
127        pk_indices: table_schema.pk_indices.clone(),
128    }
129}
130
131impl<T: CdcSourceTypeTrait> TryFromBTreeMap for CdcProperties<T> {
132    fn try_from_btreemap(
133        properties: BTreeMap<String, String>,
134        _deny_unknown_fields: bool,
135    ) -> ConnectorResult<Self> {
136        let is_share_source: bool = properties
137            .get(CDC_SHARING_MODE_KEY)
138            .is_some_and(|v| v == "true");
139        Ok(CdcProperties {
140            properties,
141            table_schema: Default::default(),
142            // TODO(siyuan): use serde to deserialize input hashmap
143            is_cdc_source_job: is_share_source,
144            is_backfill_table: false,
145            _phantom: PhantomData,
146        })
147    }
148}
149
// TODO: enforce secrets for JDBC-like properties. Until then the impl body is empty, so
// only whatever defaults the `EnforceSecret` trait itself provides apply.
impl<T: CdcSourceTypeTrait> EnforceSecret for CdcProperties<T> {}
151
152impl<T: CdcSourceTypeTrait> SourceProperties for CdcProperties<T>
153where
154    DebeziumCdcSplit<T>: TryFrom<SplitImpl, Error = crate::error::ConnectorError> + Into<SplitImpl>,
155    DebeziumSplitEnumerator<T>: ListCdcSplits<CdcSourceType = T>,
156{
157    type Split = DebeziumCdcSplit<T>;
158    type SplitEnumerator = DebeziumSplitEnumerator<T>;
159    type SplitReader = CdcSplitReader<T>;
160
161    const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME;
162
163    fn init_from_pb_source(&mut self, source: &PbSource) {
164        let pk_indices = source
165            .pk_column_ids
166            .iter()
167            .map(|&id| {
168                source
169                    .columns
170                    .iter()
171                    .position(|col| col.column_desc.as_ref().unwrap().column_id == id)
172                    .unwrap() as u32
173            })
174            .collect_vec();
175
176        let table_schema = PbTableSchema {
177            columns: source
178                .columns
179                .iter()
180                .flat_map(|col| &col.column_desc)
181                .filter(|col| {
182                    !matches!(
183                        col.generated_or_default_column,
184                        Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
185                    )
186                })
187                .cloned()
188                .collect(),
189            pk_indices,
190        };
191        self.table_schema = table_schema;
192        if let Some(info) = source.info.as_ref() {
193            self.is_cdc_source_job = info.is_shared();
194        }
195    }
196
197    fn init_from_pb_cdc_table_desc(&mut self, table_desc: &ExternalTableDesc) {
198        let table_schema = TableSchema {
199            columns: table_desc
200                .columns
201                .iter()
202                .filter(|col| {
203                    !matches!(
204                        col.generated_or_default_column,
205                        Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
206                    )
207                })
208                .cloned()
209                .collect(),
210            pk_indices: table_desc.stream_key.clone(),
211        };
212
213        self.table_schema = table_schema;
214        self.is_cdc_source_job = false;
215        self.is_backfill_table = true;
216    }
217}
218
219impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
220    fn unknown_fields(&self) -> HashMap<String, String> {
221        // FIXME: CDC does not handle unknown fields yet
222        HashMap::new()
223    }
224}
225
226impl<T: CdcSourceTypeTrait> CdcProperties<T> {
227    pub fn get_source_type_pb(&self) -> SourceType {
228        SourceType::from(T::source_type())
229    }
230}