risingwave_connector/source/cdc/
mod.rs1pub mod enumerator;
16pub mod external;
17pub mod jni_source;
18pub mod source;
19pub mod split;
20
21use std::collections::{BTreeMap, HashMap};
22use std::marker::PhantomData;
23
24pub use enumerator::*;
25use itertools::Itertools;
26use risingwave_pb::catalog::PbSource;
27use risingwave_pb::connector_service::{PbSourceType, PbTableSchema, SourceType, TableSchema};
28use risingwave_pb::plan_common::ExternalTableDesc;
29use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
30use simd_json::prelude::ArrayTrait;
31pub use source::*;
32
33use crate::enforce_secret::EnforceSecret;
34use crate::error::ConnectorResult;
35use crate::source::{SourceProperties, SplitImpl, TryFromBTreeMap};
36use crate::{for_all_classified_sources, impl_cdc_source_type};
37
38pub const CDC_CONNECTOR_NAME_SUFFIX: &str = "-cdc";
39pub const CDC_SNAPSHOT_MODE_KEY: &str = "debezium.snapshot.mode";
40pub const CDC_SNAPSHOT_BACKFILL: &str = "rw_cdc_backfill";
41pub const CDC_SHARING_MODE_KEY: &str = "rw.sharing.mode.enable";
42pub const CDC_BACKFILL_ENABLE_KEY: &str = "snapshot";
44pub const CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY: &str = "snapshot.interval";
45pub const CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY: &str = "snapshot.batch_size";
46pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
48pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";
49
50pub const CDC_MONGODB_STRONG_SCHEMA_KEY: &str = "strong_schema";
52
53pub const MYSQL_CDC_CONNECTOR: &str = Mysql::CDC_CONNECTOR_NAME;
54pub const POSTGRES_CDC_CONNECTOR: &str = Postgres::CDC_CONNECTOR_NAME;
55pub const CITUS_CDC_CONNECTOR: &str = Citus::CDC_CONNECTOR_NAME;
56pub const MONGODB_CDC_CONNECTOR: &str = Mongodb::CDC_CONNECTOR_NAME;
57pub const SQL_SERVER_CDC_CONNECTOR: &str = SqlServer::CDC_CONNECTOR_NAME;
58
/// Builds a CDC table identifier of the form `<source_id>.<external_table_name>`.
///
/// The table name is appended verbatim, so a name that itself contains dots
/// (for example `schema.table`) yields an identifier with several dots.
pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String {
    format!("{source_id}.{external_table_name}")
}
63
64pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
65 const CDC_CONNECTOR_NAME: &'static str;
66 fn source_type() -> CdcSourceType;
67}
68
69for_all_classified_sources!(impl_cdc_source_type);
70
71impl<'a> From<&'a str> for CdcSourceType {
72 fn from(name: &'a str) -> Self {
73 match name {
74 MYSQL_CDC_CONNECTOR => CdcSourceType::Mysql,
75 POSTGRES_CDC_CONNECTOR => CdcSourceType::Postgres,
76 CITUS_CDC_CONNECTOR => CdcSourceType::Citus,
77 MONGODB_CDC_CONNECTOR => CdcSourceType::Mongodb,
78 SQL_SERVER_CDC_CONNECTOR => CdcSourceType::SqlServer,
79 _ => CdcSourceType::Unspecified,
80 }
81 }
82}
83
84impl CdcSourceType {
85 pub fn as_str_name(&self) -> &str {
86 match self {
87 CdcSourceType::Mysql => "MySQL",
88 CdcSourceType::Postgres => "Postgres",
89 CdcSourceType::Citus => "Citus",
90 CdcSourceType::Mongodb => "MongoDB",
91 CdcSourceType::SqlServer => "SQL Server",
92 CdcSourceType::Unspecified => "Unspecified",
93 }
94 }
95}
96
97#[derive(Clone, Debug, Default)]
98pub struct CdcProperties<T: CdcSourceTypeTrait> {
99 pub properties: BTreeMap<String, String>,
101
102 pub table_schema: TableSchema,
104
105 pub is_cdc_source_job: bool,
107
108 pub is_backfill_table: bool,
110
111 pub _phantom: PhantomData<T>,
112}
113
114pub fn table_schema_exclude_additional_columns(table_schema: &TableSchema) -> TableSchema {
115 TableSchema {
116 columns: table_schema
117 .columns
118 .iter()
119 .filter(|col| {
120 col.additional_column
121 .as_ref()
122 .is_some_and(|val| val.column_type.is_none())
123 })
124 .cloned()
125 .collect(),
126 pk_indices: table_schema.pk_indices.clone(),
127 }
128}
129
130impl<T: CdcSourceTypeTrait> TryFromBTreeMap for CdcProperties<T> {
131 fn try_from_btreemap(
132 properties: BTreeMap<String, String>,
133 _deny_unknown_fields: bool,
134 ) -> ConnectorResult<Self> {
135 let is_share_source: bool = properties
136 .get(CDC_SHARING_MODE_KEY)
137 .is_some_and(|v| v == "true");
138 Ok(CdcProperties {
139 properties,
140 table_schema: Default::default(),
141 is_cdc_source_job: is_share_source,
143 is_backfill_table: false,
144 _phantom: PhantomData,
145 })
146 }
147}
148
149impl<T: CdcSourceTypeTrait> EnforceSecret for CdcProperties<T> {} impl<T: CdcSourceTypeTrait> SourceProperties for CdcProperties<T>
152where
153 DebeziumCdcSplit<T>: TryFrom<SplitImpl, Error = crate::error::ConnectorError> + Into<SplitImpl>,
154 DebeziumSplitEnumerator<T>: ListCdcSplits<CdcSourceType = T>,
155{
156 type Split = DebeziumCdcSplit<T>;
157 type SplitEnumerator = DebeziumSplitEnumerator<T>;
158 type SplitReader = CdcSplitReader<T>;
159
160 const SOURCE_NAME: &'static str = T::CDC_CONNECTOR_NAME;
161
162 fn init_from_pb_source(&mut self, source: &PbSource) {
163 let pk_indices = source
164 .pk_column_ids
165 .iter()
166 .map(|&id| {
167 source
168 .columns
169 .iter()
170 .position(|col| col.column_desc.as_ref().unwrap().column_id == id)
171 .unwrap() as u32
172 })
173 .collect_vec();
174
175 let table_schema = PbTableSchema {
176 columns: source
177 .columns
178 .iter()
179 .flat_map(|col| &col.column_desc)
180 .filter(|col| {
181 !matches!(
182 col.generated_or_default_column,
183 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
184 )
185 })
186 .cloned()
187 .collect(),
188 pk_indices,
189 };
190 self.table_schema = table_schema;
191 if let Some(info) = source.info.as_ref() {
192 self.is_cdc_source_job = info.is_shared();
193 }
194 }
195
196 fn init_from_pb_cdc_table_desc(&mut self, table_desc: &ExternalTableDesc) {
197 let table_schema = TableSchema {
198 columns: table_desc
199 .columns
200 .iter()
201 .filter(|col| {
202 !matches!(
203 col.generated_or_default_column,
204 Some(GeneratedOrDefaultColumn::GeneratedColumn(_))
205 )
206 })
207 .cloned()
208 .collect(),
209 pk_indices: table_desc.stream_key.clone(),
210 };
211
212 self.table_schema = table_schema;
213 self.is_cdc_source_job = false;
214 self.is_backfill_table = true;
215 }
216}
217
218impl<T: CdcSourceTypeTrait> crate::source::UnknownFields for CdcProperties<T> {
219 fn unknown_fields(&self) -> HashMap<String, String> {
220 HashMap::new()
222 }
223}
224
225impl<T: CdcSourceTypeTrait> CdcProperties<T> {
226 pub fn get_source_type_pb(&self) -> SourceType {
227 SourceType::from(T::source_type())
228 }
229}