risingwave_connector/source/cdc/
split.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::Context;
18use risingwave_common::types::JsonbVal;
19use serde::{Deserialize, Serialize};
20
21use crate::error::ConnectorResult;
22use crate::source::cdc::external::DebeziumOffset;
23use crate::source::cdc::{CdcSourceType, CdcSourceTypeTrait};
24use crate::source::{SplitId, SplitMetaData};
25
26/// The base states of a CDC split, which will be persisted to checkpoint.
27/// CDC source only has single split, so we use the `source_id` to identify the split.
28#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
29pub struct CdcSplitBase {
30    pub split_id: u32,
31    pub start_offset: Option<String>,
32    /// For a shared sources, this field is always false.
33    pub snapshot_done: bool,
34}
35
36impl CdcSplitBase {
37    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
38        Self {
39            split_id,
40            start_offset,
41            snapshot_done: false,
42        }
43    }
44}
45
46trait CdcSplitTrait: Send + Sync {
47    fn split_id(&self) -> u32;
48    fn start_offset(&self) -> &Option<String>;
49    fn is_snapshot_done(&self) -> bool;
50    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()>;
51
52    // MySQL and MongoDB shares the same logic to extract the snapshot flag
53    fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
54        // if snapshot_done is already true, it won't be changed
55        let mut snapshot_done = self.is_snapshot_done();
56        if snapshot_done {
57            return Ok(snapshot_done);
58        }
59
60        let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
61            format!(
62                "invalid cdc offset: {}, split: {}",
63                start_offset,
64                self.split_id()
65            )
66        })?;
67
68        // heartbeat event should not update the `snapshot_done` flag
69        if !dbz_offset.is_heartbeat {
70            snapshot_done = match dbz_offset.source_offset.snapshot {
71                Some(val) => !val,
72                None => true,
73            };
74        }
75        Ok(snapshot_done)
76    }
77}
78
79#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
80pub struct MySqlCdcSplit {
81    pub inner: CdcSplitBase,
82}
83
84#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
85pub struct PostgresCdcSplit {
86    pub inner: CdcSplitBase,
87    // the hostname and port of a node that holding shard tables (for Citus)
88    pub server_addr: Option<String>,
89}
90
91#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
92pub struct MongoDbCdcSplit {
93    pub inner: CdcSplitBase,
94}
95
96#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
97pub struct SqlServerCdcSplit {
98    pub inner: CdcSplitBase,
99}
100
101impl MySqlCdcSplit {
102    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
103        let split = CdcSplitBase {
104            split_id,
105            start_offset,
106            snapshot_done: false,
107        };
108        Self { inner: split }
109    }
110}
111
112impl CdcSplitTrait for MySqlCdcSplit {
113    fn split_id(&self) -> u32 {
114        self.inner.split_id
115    }
116
117    fn start_offset(&self) -> &Option<String> {
118        &self.inner.start_offset
119    }
120
121    fn is_snapshot_done(&self) -> bool {
122        self.inner.snapshot_done
123    }
124
125    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
126        // if snapshot_done is already true, it won't be updated
127        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
128        self.inner.start_offset = Some(last_seen_offset);
129        Ok(())
130    }
131}
132
133impl PostgresCdcSplit {
134    pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
135        let split = CdcSplitBase {
136            split_id,
137            start_offset,
138            snapshot_done: false,
139        };
140        Self {
141            inner: split,
142            server_addr,
143        }
144    }
145}
146
147impl CdcSplitTrait for PostgresCdcSplit {
148    fn split_id(&self) -> u32 {
149        self.inner.split_id
150    }
151
152    fn start_offset(&self) -> &Option<String> {
153        &self.inner.start_offset
154    }
155
156    fn is_snapshot_done(&self) -> bool {
157        self.inner.snapshot_done
158    }
159
160    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
161        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
162        self.inner.start_offset = Some(last_seen_offset);
163        Ok(())
164    }
165
166    fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
167        // if snapshot_done is already true, it won't be changed
168        let mut snapshot_done = self.is_snapshot_done();
169        if snapshot_done {
170            return Ok(snapshot_done);
171        }
172
173        let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
174            format!(
175                "invalid postgres offset: {}, split: {}",
176                start_offset, self.inner.split_id
177            )
178        })?;
179
180        // heartbeat event should not update the `snapshot_done` flag
181        if !dbz_offset.is_heartbeat {
182            snapshot_done = dbz_offset
183                .source_offset
184                .last_snapshot_record
185                .unwrap_or(false);
186        }
187        Ok(snapshot_done)
188    }
189}
190
191impl MongoDbCdcSplit {
192    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
193        let split = CdcSplitBase {
194            split_id,
195            start_offset,
196            snapshot_done: false,
197        };
198        Self { inner: split }
199    }
200}
201
202impl CdcSplitTrait for MongoDbCdcSplit {
203    fn split_id(&self) -> u32 {
204        self.inner.split_id
205    }
206
207    fn start_offset(&self) -> &Option<String> {
208        &self.inner.start_offset
209    }
210
211    fn is_snapshot_done(&self) -> bool {
212        self.inner.snapshot_done
213    }
214
215    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
216        // if snapshot_done is already true, it will remain true
217        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
218        self.inner.start_offset = Some(last_seen_offset);
219        Ok(())
220    }
221}
222
223impl SqlServerCdcSplit {
224    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
225        let split = CdcSplitBase {
226            split_id,
227            start_offset,
228            snapshot_done: false,
229        };
230        Self { inner: split }
231    }
232}
233
234impl CdcSplitTrait for SqlServerCdcSplit {
235    fn split_id(&self) -> u32 {
236        self.inner.split_id
237    }
238
239    fn start_offset(&self) -> &Option<String> {
240        &self.inner.start_offset
241    }
242
243    fn is_snapshot_done(&self) -> bool {
244        self.inner.snapshot_done
245    }
246
247    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
248        // if snapshot_done is already true, it will remain true
249        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
250        self.inner.start_offset = Some(last_seen_offset);
251        Ok(())
252    }
253}
254
255/// We use this struct to wrap the specific split, which act as an interface to other modules
256#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
257pub struct DebeziumCdcSplit<T: CdcSourceTypeTrait> {
258    pub mysql_split: Option<MySqlCdcSplit>,
259
260    #[serde(rename = "pg_split")] // backward compatibility
261    pub postgres_split: Option<PostgresCdcSplit>,
262    pub citus_split: Option<PostgresCdcSplit>,
263    pub mongodb_split: Option<MongoDbCdcSplit>,
264    pub sql_server_split: Option<SqlServerCdcSplit>,
265
266    #[serde(skip)]
267    pub _phantom: PhantomData<T>,
268}
269
/// Expands to a `match` on `T::source_type()` that forwards a method call to the split
/// field belonging to the matched source type.
///
/// * `$split`   - expression holding the `*_split` fields.
/// * `$borrow`  - `ref` or `mut`; pasted into `as_ref` / `as_mut` to borrow the field.
/// * `{ {Variant, field}, ... }` - pairs of `CdcSourceType` variant and field name.
/// * `$call`    - the method call applied to the borrowed split.
///
/// The expansion refers to a type parameter `T`, so it must be used where `T` is in scope.
/// It panics if the selected field is `None`, or if the source type is `Unspecified`.
macro_rules! dispatch_cdc_split_inner {
    ($split:expr, $borrow:tt, {$({$variant:tt, $field:tt}),*}, $call:expr) => {
        match T::source_type() {
            $(
                CdcSourceType::$variant => {
                    $crate::paste! {
                        $split.[<$field>]
                            .[<as_ $borrow>]()
                            .expect(concat!(stringify!([<$variant:lower>]), " split must exist"))
                            .$call
                    }
                }
            )*
            CdcSourceType::Unspecified => {
                unreachable!("invalid debezium split");
            }
        }
    }
}
289
// Calls the given method on the split that corresponds to the current CDC source type.
//
// This only supplies the fixed table of (source type, field) pairs to
// `dispatch_cdc_split_inner!`.
macro_rules! dispatch_cdc_split {
    ($split:expr, $borrow:tt, $call:expr) => {
        dispatch_cdc_split_inner!($split, $borrow, {
            {Mysql, mysql_split},
            {Postgres, postgres_split},
            {Citus, citus_split},
            {Mongodb, mongodb_split},
            {SqlServer, sql_server_split}
        }, $call)
    }
}
302
303impl<T: CdcSourceTypeTrait> SplitMetaData for DebeziumCdcSplit<T> {
304    fn id(&self) -> SplitId {
305        format!("{}", self.split_id()).into()
306    }
307
308    fn encode_to_json(&self) -> JsonbVal {
309        serde_json::to_value(self.clone()).unwrap().into()
310    }
311
312    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
313        serde_json::from_value(value.take()).map_err(Into::into)
314    }
315
316    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
317        self.update_offset_inner(last_seen_offset)
318    }
319}
320
321impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
322    pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
323        let mut ret = Self {
324            mysql_split: None,
325            postgres_split: None,
326            citus_split: None,
327            mongodb_split: None,
328            sql_server_split: None,
329            _phantom: PhantomData,
330        };
331        match T::source_type() {
332            CdcSourceType::Mysql => {
333                let split = MySqlCdcSplit::new(split_id, start_offset);
334                ret.mysql_split = Some(split);
335            }
336            CdcSourceType::Postgres => {
337                let split = PostgresCdcSplit::new(split_id, start_offset, None);
338                ret.postgres_split = Some(split);
339            }
340            CdcSourceType::Citus => {
341                let split = PostgresCdcSplit::new(split_id, start_offset, server_addr);
342                ret.citus_split = Some(split);
343            }
344            CdcSourceType::Mongodb => {
345                let split = MongoDbCdcSplit::new(split_id, start_offset);
346                ret.mongodb_split = Some(split);
347            }
348            CdcSourceType::SqlServer => {
349                let split = SqlServerCdcSplit::new(split_id, start_offset);
350                ret.sql_server_split = Some(split);
351            }
352            CdcSourceType::Unspecified => {
353                unreachable!("invalid debezium split")
354            }
355        }
356        ret
357    }
358
359    pub fn split_id(&self) -> u32 {
360        dispatch_cdc_split!(self, ref, split_id())
361    }
362
363    pub fn start_offset(&self) -> &Option<String> {
364        dispatch_cdc_split!(self, ref, start_offset())
365    }
366
367    pub fn snapshot_done(&self) -> bool {
368        dispatch_cdc_split!(self, ref, is_snapshot_done())
369    }
370
371    pub fn update_offset_inner(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
372        dispatch_cdc_split!(self, mut, update_offset(last_seen_offset)?);
373        Ok(())
374    }
375}