risingwave_connector/source/cdc/
split.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::Context;
18use risingwave_common::types::JsonbVal;
19use serde::{Deserialize, Serialize};
20
21use crate::error::ConnectorResult;
22use crate::source::cdc::external::DebeziumOffset;
23use crate::source::cdc::{CdcSourceType, CdcSourceTypeTrait, Mysql, Postgres};
24use crate::source::{SplitId, SplitMetaData};
25
/// The base states of a CDC split, which will be persisted to checkpoint.
/// A CDC source only has a single split, so we use the `source_id` to identify the split.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct CdcSplitBase {
    /// Identifier of the split.
    pub split_id: u32,
    /// Offset string recorded for the split. The per-source `update_offset`
    /// implementations in this file overwrite it with the last seen offset.
    pub start_offset: Option<String>,
    /// Whether the snapshot phase is marked as finished. `CdcSplitBase::new` starts it at
    /// `false`.
    pub snapshot_done: bool,
}
34
35impl CdcSplitBase {
36    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
37        Self {
38            split_id,
39            start_offset,
40            snapshot_done: false,
41        }
42    }
43}
44
45trait CdcSplitTrait: Send + Sync {
46    fn split_id(&self) -> u32;
47    fn start_offset(&self) -> &Option<String>;
48    fn is_snapshot_done(&self) -> bool;
49    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()>;
50
51    // MySQL and MongoDB shares the same logic to extract the snapshot flag
52    fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
53        // if snapshot_done is already true, it won't be changed
54        let mut snapshot_done = self.is_snapshot_done();
55        if snapshot_done {
56            return Ok(snapshot_done);
57        }
58
59        let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
60            format!(
61                "invalid cdc offset: {}, split: {}",
62                start_offset,
63                self.split_id()
64            )
65        })?;
66
67        // heartbeat event should not update the `snapshot_done` flag
68        if !dbz_offset.is_heartbeat {
69            snapshot_done = match dbz_offset.source_offset.snapshot {
70                Some(val) => !val,
71                None => true,
72            };
73        }
74        Ok(snapshot_done)
75    }
76}
77
/// Split state of a MySQL CDC source; a thin wrapper around [`CdcSplitBase`].
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct MySqlCdcSplit {
    /// Shared split state (id, offset and snapshot flag).
    pub inner: CdcSplitBase,
}
82
/// Split state of a Postgres CDC source. `DebeziumCdcSplit` also uses this type for its
/// `citus_split` field.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct PostgresCdcSplit {
    /// Shared split state (id, offset and snapshot flag).
    pub inner: CdcSplitBase,
    // the hostname and port of a node that holds shard tables (for Citus)
    pub server_addr: Option<String>,
}
89
/// Split state of a MongoDB CDC source; a thin wrapper around [`CdcSplitBase`].
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct MongoDbCdcSplit {
    /// Shared split state (id, offset and snapshot flag).
    pub inner: CdcSplitBase,
}
94
/// Split state of a SQL Server CDC source; a thin wrapper around [`CdcSplitBase`].
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct SqlServerCdcSplit {
    /// Shared split state (id, offset and snapshot flag).
    pub inner: CdcSplitBase,
}
99
100impl MySqlCdcSplit {
101    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
102        let split = CdcSplitBase {
103            split_id,
104            start_offset,
105            snapshot_done: false,
106        };
107        Self { inner: split }
108    }
109
110    /// Extract MySQL CDC binlog offset (file sequence and position) from the offset JSON string.
111    ///
112    /// MySQL binlog offset format:
113    /// ```json
114    /// {
115    ///   "sourcePartition": { "server": "..." },
116    ///   "sourceOffset": {
117    ///     "file": "binlog.000123",
118    ///     "pos": 456789,
119    ///     ...
120    ///   }
121    /// }
122    /// ```
123    ///
124    /// Returns `Some((file_seq, position))` where:
125    /// - `file_seq`: the numeric part of binlog filename (e.g., 123 from "binlog.000123")
126    /// - `position`: the byte offset within the binlog file
127    pub fn mysql_binlog_offset(&self) -> Option<(u64, u64)> {
128        let offset_str = self.inner.start_offset.as_ref()?;
129        let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
130        let source_offset = offset.get("sourceOffset")?;
131
132        let file = source_offset.get("file")?.as_str()?;
133        let pos = source_offset.get("pos")?.as_u64()?;
134
135        // Extract numeric sequence from "binlog.NNNNNN"
136        let file_seq = file.strip_prefix("binlog.")?.parse::<u64>().ok()?;
137
138        Some((file_seq, pos))
139    }
140}
141
142impl CdcSplitTrait for MySqlCdcSplit {
143    fn split_id(&self) -> u32 {
144        self.inner.split_id
145    }
146
147    fn start_offset(&self) -> &Option<String> {
148        &self.inner.start_offset
149    }
150
151    fn is_snapshot_done(&self) -> bool {
152        self.inner.snapshot_done
153    }
154
155    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
156        // if snapshot_done is already true, it won't be updated
157        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
158        self.inner.start_offset = Some(last_seen_offset);
159        Ok(())
160    }
161}
162
163impl PostgresCdcSplit {
164    pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
165        let split = CdcSplitBase {
166            split_id,
167            start_offset,
168            snapshot_done: false,
169        };
170        Self {
171            inner: split,
172            server_addr,
173        }
174    }
175
176    /// Extract PostgreSQL LSN value from the offset JSON string.
177    ///
178    /// This function parses the offset JSON and extracts the LSN value from the sourceOffset.lsn field.
179    /// Returns Some(lsn) if the LSN is found and can be parsed as u64, None otherwise.
180    pub fn pg_lsn(&self) -> Option<u64> {
181        let offset_str = self.inner.start_offset.as_ref()?;
182        let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
183        let source_offset = offset.get("sourceOffset")?;
184        let lsn = source_offset.get("lsn")?;
185        lsn.as_u64()
186    }
187}
188
189impl CdcSplitTrait for PostgresCdcSplit {
190    fn split_id(&self) -> u32 {
191        self.inner.split_id
192    }
193
194    fn start_offset(&self) -> &Option<String> {
195        &self.inner.start_offset
196    }
197
198    fn is_snapshot_done(&self) -> bool {
199        self.inner.snapshot_done
200    }
201
202    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
203        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
204        self.inner.start_offset = Some(last_seen_offset);
205        Ok(())
206    }
207
208    fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
209        // if snapshot_done is already true, it won't be changed
210        let mut snapshot_done = self.is_snapshot_done();
211        if snapshot_done {
212            return Ok(snapshot_done);
213        }
214
215        let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
216            format!(
217                "invalid postgres offset: {}, split: {}",
218                start_offset, self.inner.split_id
219            )
220        })?;
221
222        // heartbeat event should not update the `snapshot_done` flag
223        if !dbz_offset.is_heartbeat {
224            snapshot_done = dbz_offset
225                .source_offset
226                .last_snapshot_record
227                .unwrap_or(false);
228        }
229        Ok(snapshot_done)
230    }
231}
232
233impl MongoDbCdcSplit {
234    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
235        let split = CdcSplitBase {
236            split_id,
237            start_offset,
238            snapshot_done: false,
239        };
240        Self { inner: split }
241    }
242}
243
244impl CdcSplitTrait for MongoDbCdcSplit {
245    fn split_id(&self) -> u32 {
246        self.inner.split_id
247    }
248
249    fn start_offset(&self) -> &Option<String> {
250        &self.inner.start_offset
251    }
252
253    fn is_snapshot_done(&self) -> bool {
254        self.inner.snapshot_done
255    }
256
257    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
258        // if snapshot_done is already true, it will remain true
259        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
260        self.inner.start_offset = Some(last_seen_offset);
261        Ok(())
262    }
263}
264
265impl SqlServerCdcSplit {
266    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
267        let split = CdcSplitBase {
268            split_id,
269            start_offset,
270            snapshot_done: false,
271        };
272        Self { inner: split }
273    }
274}
275
276impl CdcSplitTrait for SqlServerCdcSplit {
277    fn split_id(&self) -> u32 {
278        self.inner.split_id
279    }
280
281    fn start_offset(&self) -> &Option<String> {
282        &self.inner.start_offset
283    }
284
285    fn is_snapshot_done(&self) -> bool {
286        self.inner.snapshot_done
287    }
288
289    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
290        // if snapshot_done is already true, it will remain true
291        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
292        self.inner.start_offset = Some(last_seen_offset);
293        Ok(())
294    }
295}
296
/// We use this struct to wrap the specific split, which acts as an interface to other modules.
///
/// `DebeziumCdcSplit::new` populates only the field matching `T::source_type()` and leaves
/// all the others as `None`.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct DebeziumCdcSplit<T: CdcSourceTypeTrait> {
    /// Populated by `new` for `CdcSourceType::Mysql`.
    pub mysql_split: Option<MySqlCdcSplit>,

    /// Populated by `new` for `CdcSourceType::Postgres`; serialized under the key `pg_split`.
    #[serde(rename = "pg_split")] // backward compatibility
    pub postgres_split: Option<PostgresCdcSplit>,
    /// Populated by `new` for `CdcSourceType::Citus`.
    pub citus_split: Option<PostgresCdcSplit>,
    /// Populated by `new` for `CdcSourceType::Mongodb`.
    pub mongodb_split: Option<MongoDbCdcSplit>,
    /// Populated by `new` for `CdcSourceType::SqlServer`.
    pub sql_server_split: Option<SqlServerCdcSplit>,

    /// Marker tying the split to its source type `T`; excluded from (de)serialization.
    #[serde(skip)]
    pub _phantom: PhantomData<T>,
}
311
// Expands to a `match` on `T::source_type()` (a type parameter named `T` must be in scope at
// the call site).
//
// Arguments:
// - `$dbz_split`: expression whose fields hold the optional per-source splits.
// - `$as_type`: pasted after `as_` to pick the `Option` accessor (the callers in this file
//   pass `ref` or `mut`, yielding `as_ref()` / `as_mut()`).
// - `{ {Variant, field}, ... }`: pairs of a `CdcSourceType` variant and the field to use for it.
// - `$body`: method call appended to the unwrapped split.
//
// Panics when the field selected for the current source type is `None` (via `expect`), and
// hits `unreachable!` for `CdcSourceType::Unspecified`.
macro_rules! dispatch_cdc_split_inner {
    ($dbz_split:expr, $as_type:tt, {$({$cdc_source_type:tt, $cdc_source_split:tt}),*}, $body:expr) => {
        match T::source_type() {
            $(
                CdcSourceType::$cdc_source_type => {
                    // `paste!` assembles the field name, the `as_*` accessor and the
                    // lower-cased variant name used in the panic message.
                    $crate::paste! {
                        $dbz_split.[<$cdc_source_split>]
                            .[<as_ $as_type>]()
                            .expect(concat!(stringify!([<$cdc_source_type:lower>]), " split must exist"))
                            .$body
                    }
                }
            )*
            CdcSourceType::Unspecified => {
                unreachable!("invalid debezium split");
            }
        }
    }
}
331
// Calls the corresponding split method of the specific CDC source type.
//
// Thin front end for `dispatch_cdc_split_inner!` that supplies the fixed list of
// `{CdcSourceType variant, DebeziumCdcSplit field}` pairs, so callers only pass the split
// expression, the accessor kind (`ref` / `mut`) and the method call to perform.
macro_rules! dispatch_cdc_split {
    ($dbz_split:expr, $as_type:tt, $body:expr) => {
        dispatch_cdc_split_inner!($dbz_split, $as_type, {
            {Mysql, mysql_split},
            {Postgres, postgres_split},
            {Citus, citus_split},
            {Mongodb, mongodb_split},
            {SqlServer, sql_server_split}
        }, $body)
    }
}
344
345impl<T: CdcSourceTypeTrait> SplitMetaData for DebeziumCdcSplit<T> {
346    fn id(&self) -> SplitId {
347        format!("{}", self.split_id()).into()
348    }
349
350    fn encode_to_json(&self) -> JsonbVal {
351        serde_json::to_value(self.clone()).unwrap().into()
352    }
353
354    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
355        serde_json::from_value(value.take()).map_err(Into::into)
356    }
357
358    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
359        self.update_offset_inner(last_seen_offset)
360    }
361}
362
363impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
364    pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
365        let mut ret = Self {
366            mysql_split: None,
367            postgres_split: None,
368            citus_split: None,
369            mongodb_split: None,
370            sql_server_split: None,
371            _phantom: PhantomData,
372        };
373        match T::source_type() {
374            CdcSourceType::Mysql => {
375                let split = MySqlCdcSplit::new(split_id, start_offset);
376                ret.mysql_split = Some(split);
377            }
378            CdcSourceType::Postgres => {
379                let split = PostgresCdcSplit::new(split_id, start_offset, None);
380                ret.postgres_split = Some(split);
381            }
382            CdcSourceType::Citus => {
383                let split = PostgresCdcSplit::new(split_id, start_offset, server_addr);
384                ret.citus_split = Some(split);
385            }
386            CdcSourceType::Mongodb => {
387                let split = MongoDbCdcSplit::new(split_id, start_offset);
388                ret.mongodb_split = Some(split);
389            }
390            CdcSourceType::SqlServer => {
391                let split = SqlServerCdcSplit::new(split_id, start_offset);
392                ret.sql_server_split = Some(split);
393            }
394            CdcSourceType::Unspecified => {
395                unreachable!("invalid debezium split")
396            }
397        }
398        ret
399    }
400
401    pub fn split_id(&self) -> u32 {
402        dispatch_cdc_split!(self, ref, split_id())
403    }
404
405    pub fn start_offset(&self) -> &Option<String> {
406        dispatch_cdc_split!(self, ref, start_offset())
407    }
408
409    pub fn snapshot_done(&self) -> bool {
410        dispatch_cdc_split!(self, ref, is_snapshot_done())
411    }
412
413    pub fn update_offset_inner(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
414        dispatch_cdc_split!(self, mut, update_offset(last_seen_offset)?);
415        Ok(())
416    }
417}
418
419impl DebeziumCdcSplit<Postgres> {
420    /// Extract PostgreSQL LSN value from the current split offset.
421    ///
422    /// Returns Some(lsn) if the LSN is found and can be parsed as u64, None otherwise.
423    pub fn pg_lsn(&self) -> Option<u64> {
424        self.postgres_split.as_ref()?.pg_lsn()
425    }
426}
427
428impl DebeziumCdcSplit<Mysql> {
429    /// Extract MySQL CDC binlog offset (file sequence and position) from the current split offset.
430    ///
431    /// Returns `Some((file_seq, position))` where:
432    /// - `file_seq`: the numeric part of binlog filename (e.g., 123 from "binlog.000123")
433    /// - `position`: the byte offset within the binlog file
434    pub fn mysql_binlog_offset(&self) -> Option<(u64, u64)> {
435        self.mysql_split.as_ref()?.mysql_binlog_offset()
436    }
437}
438
439/// Extract PostgreSQL LSN value from a CDC offset JSON string.
440///
441/// This is a standalone helper function that can be used when you only have the offset string
442/// (e.g., in callbacks) and don't have access to the Split object.
443///
444/// Returns Some(lsn) if the LSN is found and can be parsed as u64, None otherwise.
445pub fn extract_postgres_lsn_from_offset_str(offset_str: &str) -> Option<u64> {
446    let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
447    let source_offset = offset.get("sourceOffset")?;
448    let lsn = source_offset.get("lsn")?;
449    lsn.as_u64()
450}