risingwave_connector/source/cdc/split.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::Context;
18use risingwave_common::types::JsonbVal;
19use serde::{Deserialize, Serialize};
20
21use crate::error::ConnectorResult;
22use crate::source::cdc::external::DebeziumOffset;
23use crate::source::cdc::{CdcSourceType, CdcSourceTypeTrait};
24use crate::source::{SplitId, SplitMetaData};
25
/// The base states of a CDC split, which will be persisted to checkpoint.
///
/// Per the original design note, a CDC source only has a single split, so the source id is
/// used as `split_id`. NOTE(review): Citus splits additionally carry a `server_addr`
/// (see `PostgresCdcSplit`), so confirm how ids are assigned when there are several.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct CdcSplitBase {
    /// Identifier of the split; `DebeziumCdcSplit::id` renders it as a decimal string.
    pub split_id: u32,
    /// Last recorded offset: a JSON-encoded `DebeziumOffset`, or `None` if none was given yet.
    pub start_offset: Option<String>,
    /// Whether the snapshot phase has been observed to finish. Once `true`, offset updates
    /// never reset it back to `false`.
    pub snapshot_done: bool,
}
34
35impl CdcSplitBase {
36    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
37        Self {
38            split_id,
39            start_offset,
40            snapshot_done: false,
41        }
42    }
43}
44
45trait CdcSplitTrait: Send + Sync {
46    fn split_id(&self) -> u32;
47    fn start_offset(&self) -> &Option<String>;
48    fn is_snapshot_done(&self) -> bool;
49    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()>;
50
51    // MySQL and MongoDB shares the same logic to extract the snapshot flag
52    fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
53        // if snapshot_done is already true, it won't be changed
54        let mut snapshot_done = self.is_snapshot_done();
55        if snapshot_done {
56            return Ok(snapshot_done);
57        }
58
59        let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
60            format!(
61                "invalid cdc offset: {}, split: {}",
62                start_offset,
63                self.split_id()
64            )
65        })?;
66
67        // heartbeat event should not update the `snapshot_done` flag
68        if !dbz_offset.is_heartbeat {
69            snapshot_done = match dbz_offset.source_offset.snapshot {
70                Some(val) => !val,
71                None => true,
72            };
73        }
74        Ok(snapshot_done)
75    }
76}
77
/// Split state of a MySQL CDC source.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct MySqlCdcSplit {
    pub inner: CdcSplitBase,
}

/// Split state of a PostgreSQL CDC source; the same type is also used for Citus sources.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct PostgresCdcSplit {
    pub inner: CdcSplitBase,
    // The hostname and port of the node that holds the shard tables (Citus only;
    // plain PostgreSQL splits are created with `None`).
    pub server_addr: Option<String>,
}

/// Split state of a MongoDB CDC source.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct MongoDbCdcSplit {
    pub inner: CdcSplitBase,
}

/// Split state of a SQL Server CDC source.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct SqlServerCdcSplit {
    pub inner: CdcSplitBase,
}
99
100impl MySqlCdcSplit {
101    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
102        let split = CdcSplitBase {
103            split_id,
104            start_offset,
105            snapshot_done: false,
106        };
107        Self { inner: split }
108    }
109}
110
111impl CdcSplitTrait for MySqlCdcSplit {
112    fn split_id(&self) -> u32 {
113        self.inner.split_id
114    }
115
116    fn start_offset(&self) -> &Option<String> {
117        &self.inner.start_offset
118    }
119
120    fn is_snapshot_done(&self) -> bool {
121        self.inner.snapshot_done
122    }
123
124    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
125        // if snapshot_done is already true, it won't be updated
126        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
127        self.inner.start_offset = Some(last_seen_offset);
128        Ok(())
129    }
130}
131
132impl PostgresCdcSplit {
133    pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
134        let split = CdcSplitBase {
135            split_id,
136            start_offset,
137            snapshot_done: false,
138        };
139        Self {
140            inner: split,
141            server_addr,
142        }
143    }
144}
145
146impl CdcSplitTrait for PostgresCdcSplit {
147    fn split_id(&self) -> u32 {
148        self.inner.split_id
149    }
150
151    fn start_offset(&self) -> &Option<String> {
152        &self.inner.start_offset
153    }
154
155    fn is_snapshot_done(&self) -> bool {
156        self.inner.snapshot_done
157    }
158
159    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
160        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
161        self.inner.start_offset = Some(last_seen_offset);
162        Ok(())
163    }
164
165    fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
166        // if snapshot_done is already true, it won't be changed
167        let mut snapshot_done = self.is_snapshot_done();
168        if snapshot_done {
169            return Ok(snapshot_done);
170        }
171
172        let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
173            format!(
174                "invalid postgres offset: {}, split: {}",
175                start_offset, self.inner.split_id
176            )
177        })?;
178
179        // heartbeat event should not update the `snapshot_done` flag
180        if !dbz_offset.is_heartbeat {
181            snapshot_done = dbz_offset
182                .source_offset
183                .last_snapshot_record
184                .unwrap_or(false);
185        }
186        Ok(snapshot_done)
187    }
188}
189
190impl MongoDbCdcSplit {
191    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
192        let split = CdcSplitBase {
193            split_id,
194            start_offset,
195            snapshot_done: false,
196        };
197        Self { inner: split }
198    }
199}
200
201impl CdcSplitTrait for MongoDbCdcSplit {
202    fn split_id(&self) -> u32 {
203        self.inner.split_id
204    }
205
206    fn start_offset(&self) -> &Option<String> {
207        &self.inner.start_offset
208    }
209
210    fn is_snapshot_done(&self) -> bool {
211        self.inner.snapshot_done
212    }
213
214    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
215        // if snapshot_done is already true, it will remain true
216        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
217        self.inner.start_offset = Some(last_seen_offset);
218        Ok(())
219    }
220}
221
222impl SqlServerCdcSplit {
223    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
224        let split = CdcSplitBase {
225            split_id,
226            start_offset,
227            snapshot_done: false,
228        };
229        Self { inner: split }
230    }
231}
232
233impl CdcSplitTrait for SqlServerCdcSplit {
234    fn split_id(&self) -> u32 {
235        self.inner.split_id
236    }
237
238    fn start_offset(&self) -> &Option<String> {
239        &self.inner.start_offset
240    }
241
242    fn is_snapshot_done(&self) -> bool {
243        self.inner.snapshot_done
244    }
245
246    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
247        // if snapshot_done is already true, it will remain true
248        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
249        self.inner.start_offset = Some(last_seen_offset);
250        Ok(())
251    }
252}
253
/// We use this struct to wrap the specific split, which acts as an interface to other modules.
///
/// `DebeziumCdcSplit::new` populates exactly one of the `*_split` fields, chosen by
/// `T::source_type()`; the accessors panic if that field is `None`.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct DebeziumCdcSplit<T: CdcSourceTypeTrait> {
    pub mysql_split: Option<MySqlCdcSplit>,

    // Serialized under the key `pg_split` rather than the field name.
    #[serde(rename = "pg_split")] // backward compatibility
    pub postgres_split: Option<PostgresCdcSplit>,
    // Citus reuses the PostgreSQL split type.
    pub citus_split: Option<PostgresCdcSplit>,
    pub mongodb_split: Option<MongoDbCdcSplit>,
    pub sql_server_split: Option<SqlServerCdcSplit>,

    // Type-level marker for the source type; never serialized.
    #[serde(skip)]
    pub _phantom: PhantomData<T>,
}
268
// Expands to a `match T::source_type()` that forwards `$body` to the matching split field.
//
// - `$dbz_split`: the `DebeziumCdcSplit` value (callers pass `self`).
// - `$as_type`: `ref` or `mut`; pasted into `as_ref()` / `as_mut()` on the `Option` field.
// - `{Variant, field}` pairs: map each `CdcSourceType` variant to its split field.
// - `$body`: method call applied to the unwrapped split, e.g. `split_id()`.
//
// The expansion names `T` directly, so it only works inside an item with a generic
// parameter `T: CdcSourceTypeTrait` in scope. It panics when the matching field is `None`
// (message: "<lowercased variant> split must exist") and on `CdcSourceType::Unspecified`.
macro_rules! dispatch_cdc_split_inner {
    ($dbz_split:expr, $as_type:tt, {$({$cdc_source_type:tt, $cdc_source_split:tt}),*}, $body:expr) => {
        match T::source_type() {
            $(
                CdcSourceType::$cdc_source_type => {
                    $crate::paste! {
                        $dbz_split.[<$cdc_source_split>]
                            .[<as_ $as_type>]()
                            .expect(concat!(stringify!([<$cdc_source_type:lower>]), " split must exist"))
                            .$body
                    }
                }
            )*
            CdcSourceType::Unspecified => {
                unreachable!("invalid debezium split");
            }
        }
    }
}
288
// call corresponding split method of the specific cdc source type
//
// Usage: `dispatch_cdc_split!(self, ref, split_id())` or
// `dispatch_cdc_split!(self, mut, update_offset(offset)?)`.
// The variant -> field table below must cover every `CdcSourceType` except `Unspecified`
// (handled by the inner macro); otherwise the generated `match` is non-exhaustive.
macro_rules! dispatch_cdc_split {
    ($dbz_split:expr, $as_type:tt, $body:expr) => {
        dispatch_cdc_split_inner!($dbz_split, $as_type, {
            {Mysql, mysql_split},
            {Postgres, postgres_split},
            {Citus, citus_split},
            {Mongodb, mongodb_split},
            {SqlServer, sql_server_split}
        }, $body)
    }
}
301
302impl<T: CdcSourceTypeTrait> SplitMetaData for DebeziumCdcSplit<T> {
303    fn id(&self) -> SplitId {
304        format!("{}", self.split_id()).into()
305    }
306
307    fn encode_to_json(&self) -> JsonbVal {
308        serde_json::to_value(self.clone()).unwrap().into()
309    }
310
311    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
312        serde_json::from_value(value.take()).map_err(Into::into)
313    }
314
315    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
316        self.update_offset_inner(last_seen_offset)
317    }
318}
319
320impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
321    pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
322        let mut ret = Self {
323            mysql_split: None,
324            postgres_split: None,
325            citus_split: None,
326            mongodb_split: None,
327            sql_server_split: None,
328            _phantom: PhantomData,
329        };
330        match T::source_type() {
331            CdcSourceType::Mysql => {
332                let split = MySqlCdcSplit::new(split_id, start_offset);
333                ret.mysql_split = Some(split);
334            }
335            CdcSourceType::Postgres => {
336                let split = PostgresCdcSplit::new(split_id, start_offset, None);
337                ret.postgres_split = Some(split);
338            }
339            CdcSourceType::Citus => {
340                let split = PostgresCdcSplit::new(split_id, start_offset, server_addr);
341                ret.citus_split = Some(split);
342            }
343            CdcSourceType::Mongodb => {
344                let split = MongoDbCdcSplit::new(split_id, start_offset);
345                ret.mongodb_split = Some(split);
346            }
347            CdcSourceType::SqlServer => {
348                let split = SqlServerCdcSplit::new(split_id, start_offset);
349                ret.sql_server_split = Some(split);
350            }
351            CdcSourceType::Unspecified => {
352                unreachable!("invalid debezium split")
353            }
354        }
355        ret
356    }
357
358    pub fn split_id(&self) -> u32 {
359        dispatch_cdc_split!(self, ref, split_id())
360    }
361
362    pub fn start_offset(&self) -> &Option<String> {
363        dispatch_cdc_split!(self, ref, start_offset())
364    }
365
366    pub fn snapshot_done(&self) -> bool {
367        dispatch_cdc_split!(self, ref, is_snapshot_done())
368    }
369
370    pub fn update_offset_inner(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
371        dispatch_cdc_split!(self, mut, update_offset(last_seen_offset)?);
372        Ok(())
373    }
374}