risingwave_connector/source/cdc/
split.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::Context;
18use risingwave_common::types::JsonbVal;
19use serde::{Deserialize, Serialize};
20
21use crate::error::ConnectorResult;
22use crate::source::cdc::external::DebeziumOffset;
23use crate::source::cdc::{CdcSourceType, CdcSourceTypeTrait, Mysql, Postgres, SqlServer};
24use crate::source::{SplitId, SplitMetaData};
25
/// The base states of a CDC split, which are persisted to the checkpoint.
/// A CDC source only has a single split, so the `source_id` is used to identify the split.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct CdcSplitBase {
    /// Identifier of the split.
    pub split_id: u32,
    /// Debezium offset (JSON string) the split resumes from; `None` if no offset is known.
    pub start_offset: Option<String>,
    /// Whether the initial snapshot has been observed as completed.
    pub snapshot_done: bool,
}
34
35impl CdcSplitBase {
36    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
37        Self {
38            split_id,
39            start_offset,
40            snapshot_done: false,
41        }
42    }
43}
44
45trait CdcSplitTrait: Send + Sync {
46    fn split_id(&self) -> u32;
47    fn start_offset(&self) -> &Option<String>;
48    fn is_snapshot_done(&self) -> bool;
49    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()>;
50
51    // MySQL and MongoDB shares the same logic to extract the snapshot flag
52    fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
53        // if snapshot_done is already true, it won't be changed
54        let mut snapshot_done = self.is_snapshot_done();
55        if snapshot_done {
56            return Ok(snapshot_done);
57        }
58
59        let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
60            format!(
61                "invalid cdc offset: {}, split: {}",
62                start_offset,
63                self.split_id()
64            )
65        })?;
66
67        // heartbeat event should not update the `snapshot_done` flag
68        if !dbz_offset.is_heartbeat {
69            snapshot_done = match dbz_offset.source_offset.snapshot {
70                Some(val) => !val,
71                None => true,
72            };
73        }
74        Ok(snapshot_done)
75    }
76}
77
/// CDC split state for a MySQL source.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct MySqlCdcSplit {
    /// Shared split state (id, start offset, snapshot flag).
    pub inner: CdcSplitBase,
}
82
/// CDC split state for a PostgreSQL source; the same type is also used for Citus splits.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct PostgresCdcSplit {
    /// Shared split state (id, start offset, snapshot flag).
    pub inner: CdcSplitBase,
    // The hostname and port of a node that holds shard tables (for Citus).
    pub server_addr: Option<String>,
}
89
/// CDC split state for a MongoDB source.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct MongoDbCdcSplit {
    /// Shared split state (id, start offset, snapshot flag).
    pub inner: CdcSplitBase,
}
94
/// CDC split state for a SQL Server source.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct SqlServerCdcSplit {
    /// Shared split state (id, start offset, snapshot flag).
    pub inner: CdcSplitBase,
}
99
100impl MySqlCdcSplit {
101    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
102        let split = CdcSplitBase {
103            split_id,
104            start_offset,
105            snapshot_done: false,
106        };
107        Self { inner: split }
108    }
109
110    /// Extract MySQL CDC binlog offset (file sequence and position) from the offset JSON string.
111    ///
112    /// MySQL binlog offset format:
113    /// ```json
114    /// {
115    ///   "sourcePartition": { "server": "..." },
116    ///   "sourceOffset": {
117    ///     "file": "binlog.000123",
118    ///     "pos": 456789,
119    ///     ...
120    ///   }
121    /// }
122    /// ```
123    ///
124    /// Returns `Some((file_seq, position))` where:
125    /// - `file_seq`: the numeric part of binlog filename (e.g., 123 from "binlog.000123")
126    /// - `position`: the byte offset within the binlog file
127    pub fn mysql_binlog_offset(&self) -> Option<(u64, u64)> {
128        let offset_str = self.inner.start_offset.as_ref()?;
129        let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
130        let source_offset = offset.get("sourceOffset")?;
131
132        let file = source_offset.get("file")?.as_str()?;
133        let pos = source_offset.get("pos")?.as_u64()?;
134
135        // Extract numeric sequence from "binlog.NNNNNN"
136        let file_seq = file.strip_prefix("binlog.")?.parse::<u64>().ok()?;
137
138        Some((file_seq, pos))
139    }
140}
141
142impl CdcSplitTrait for MySqlCdcSplit {
143    fn split_id(&self) -> u32 {
144        self.inner.split_id
145    }
146
147    fn start_offset(&self) -> &Option<String> {
148        &self.inner.start_offset
149    }
150
151    fn is_snapshot_done(&self) -> bool {
152        self.inner.snapshot_done
153    }
154
155    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
156        // if snapshot_done is already true, it won't be updated
157        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
158        self.inner.start_offset = Some(last_seen_offset);
159        Ok(())
160    }
161}
162
163impl PostgresCdcSplit {
164    pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
165        let split = CdcSplitBase {
166            split_id,
167            start_offset,
168            snapshot_done: false,
169        };
170        Self {
171            inner: split,
172            server_addr,
173        }
174    }
175
176    /// Extract PostgreSQL LSN value from the offset JSON string.
177    ///
178    /// This function parses the offset JSON and extracts the LSN value from the sourceOffset.lsn field.
179    /// Returns Some(lsn) if the LSN is found and can be parsed as u64, None otherwise.
180    pub fn pg_lsn(&self) -> Option<u64> {
181        let offset_str = self.inner.start_offset.as_ref()?;
182        let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
183        let source_offset = offset.get("sourceOffset")?;
184        let lsn = source_offset.get("lsn")?;
185        lsn.as_u64()
186    }
187}
188
189impl CdcSplitTrait for PostgresCdcSplit {
190    fn split_id(&self) -> u32 {
191        self.inner.split_id
192    }
193
194    fn start_offset(&self) -> &Option<String> {
195        &self.inner.start_offset
196    }
197
198    fn is_snapshot_done(&self) -> bool {
199        self.inner.snapshot_done
200    }
201
202    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
203        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
204        self.inner.start_offset = Some(last_seen_offset);
205        Ok(())
206    }
207
208    fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
209        // if snapshot_done is already true, it won't be changed
210        let mut snapshot_done = self.is_snapshot_done();
211        if snapshot_done {
212            return Ok(snapshot_done);
213        }
214
215        let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
216            format!(
217                "invalid postgres offset: {}, split: {}",
218                start_offset, self.inner.split_id
219            )
220        })?;
221
222        // heartbeat event should not update the `snapshot_done` flag
223        if !dbz_offset.is_heartbeat {
224            snapshot_done = dbz_offset
225                .source_offset
226                .last_snapshot_record
227                .unwrap_or(false);
228        }
229        Ok(snapshot_done)
230    }
231}
232
233impl MongoDbCdcSplit {
234    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
235        let split = CdcSplitBase {
236            split_id,
237            start_offset,
238            snapshot_done: false,
239        };
240        Self { inner: split }
241    }
242}
243
244impl CdcSplitTrait for MongoDbCdcSplit {
245    fn split_id(&self) -> u32 {
246        self.inner.split_id
247    }
248
249    fn start_offset(&self) -> &Option<String> {
250        &self.inner.start_offset
251    }
252
253    fn is_snapshot_done(&self) -> bool {
254        self.inner.snapshot_done
255    }
256
257    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
258        // if snapshot_done is already true, it will remain true
259        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
260        self.inner.start_offset = Some(last_seen_offset);
261        Ok(())
262    }
263}
264
265impl SqlServerCdcSplit {
266    pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
267        let split = CdcSplitBase {
268            split_id,
269            start_offset,
270            snapshot_done: false,
271        };
272        Self { inner: split }
273    }
274
275    /// Extract SQL Server `change_lsn` value from the offset JSON string.
276    pub fn sql_server_change_lsn(&self) -> Option<u128> {
277        let offset_str = self.inner.start_offset.as_ref()?;
278        extract_sql_server_change_lsn_from_offset_str(offset_str)
279    }
280
281    /// Extract SQL Server `commit_lsn` value from the offset JSON string.
282    pub fn sql_server_commit_lsn(&self) -> Option<u128> {
283        let offset_str = self.inner.start_offset.as_ref()?;
284        extract_sql_server_commit_lsn_from_offset_str(offset_str)
285    }
286}
287
288impl CdcSplitTrait for SqlServerCdcSplit {
289    fn split_id(&self) -> u32 {
290        self.inner.split_id
291    }
292
293    fn start_offset(&self) -> &Option<String> {
294        &self.inner.start_offset
295    }
296
297    fn is_snapshot_done(&self) -> bool {
298        self.inner.snapshot_done
299    }
300
301    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
302        // if snapshot_done is already true, it will remain true
303        self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
304        self.inner.start_offset = Some(last_seen_offset);
305        Ok(())
306    }
307}
308
/// We use this struct to wrap the specific split, which acts as an interface to other modules.
///
/// `DebeziumCdcSplit::new` fills exactly one of the `*_split` fields, chosen by
/// `T::source_type()`, and leaves the others `None`.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct DebeziumCdcSplit<T: CdcSourceTypeTrait> {
    /// Set for MySQL sources.
    pub mysql_split: Option<MySqlCdcSplit>,

    /// Set for PostgreSQL sources; serialized under the name `pg_split`.
    #[serde(rename = "pg_split")] // backward compatibility
    pub postgres_split: Option<PostgresCdcSplit>,
    /// Set for Citus sources (reuses the PostgreSQL split type).
    pub citus_split: Option<PostgresCdcSplit>,
    /// Set for MongoDB sources.
    pub mongodb_split: Option<MongoDbCdcSplit>,
    /// Set for SQL Server sources.
    pub sql_server_split: Option<SqlServerCdcSplit>,

    /// Marker tying the split to its source type `T`; skipped by serde.
    #[serde(skip)]
    pub _phantom: PhantomData<T>,
}
323
// Dispatches `$body` to the per-database split selected by `T::source_type()`.
//
// For every `{SourceType, field}` pair this expands to one match arm that:
//   1. borrows `$dbz_split.field` via `as_ref()` or `as_mut()` (`$as_type` is `ref` or `mut`),
//   2. panics with "<sourcetype> split must exist" if that field is `None`,
//   3. calls `$body` as a method on the inner split.
// `CdcSourceType::Unspecified` is treated as unreachable. The expansion refers to `T`, so it must
// be used where a type parameter `T: CdcSourceTypeTrait` is in scope.
macro_rules! dispatch_cdc_split_inner {
    ($dbz_split:expr, $as_type:tt, {$({$cdc_source_type:tt, $cdc_source_split:tt}),*}, $body:expr) => {
        match T::source_type() {
            $(
                CdcSourceType::$cdc_source_type => {
                    // `paste!` builds the `as_ref`/`as_mut` identifier and the lowercase source
                    // name used in the panic message.
                    $crate::paste! {
                        $dbz_split.[<$cdc_source_split>]
                            .[<as_ $as_type>]()
                            .expect(concat!(stringify!([<$cdc_source_type:lower>]), " split must exist"))
                            .$body
                    }
                }
            )*
            CdcSourceType::Unspecified => {
                unreachable!("invalid debezium split");
            }
        }
    }
}
343
// Calls the corresponding split method of the specific CDC source type.
//
// Supplies the fixed source-type -> field mapping to `dispatch_cdc_split_inner!`; `$as_type`
// is `ref` for read-only calls and `mut` for mutating calls.
macro_rules! dispatch_cdc_split {
    ($dbz_split:expr, $as_type:tt, $body:expr) => {
        dispatch_cdc_split_inner!($dbz_split, $as_type, {
            {Mysql, mysql_split},
            {Postgres, postgres_split},
            {Citus, citus_split},
            {Mongodb, mongodb_split},
            {SqlServer, sql_server_split}
        }, $body)
    }
}
356
357impl<T: CdcSourceTypeTrait> SplitMetaData for DebeziumCdcSplit<T> {
358    fn id(&self) -> SplitId {
359        format!("{}", self.split_id()).into()
360    }
361
362    fn encode_to_json(&self) -> JsonbVal {
363        serde_json::to_value(self.clone()).unwrap().into()
364    }
365
366    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
367        serde_json::from_value(value.take()).map_err(Into::into)
368    }
369
370    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
371        self.update_offset_inner(last_seen_offset)
372    }
373}
374
375impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
376    pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
377        let mut ret = Self {
378            mysql_split: None,
379            postgres_split: None,
380            citus_split: None,
381            mongodb_split: None,
382            sql_server_split: None,
383            _phantom: PhantomData,
384        };
385        match T::source_type() {
386            CdcSourceType::Mysql => {
387                let split = MySqlCdcSplit::new(split_id, start_offset);
388                ret.mysql_split = Some(split);
389            }
390            CdcSourceType::Postgres => {
391                let split = PostgresCdcSplit::new(split_id, start_offset, None);
392                ret.postgres_split = Some(split);
393            }
394            CdcSourceType::Citus => {
395                let split = PostgresCdcSplit::new(split_id, start_offset, server_addr);
396                ret.citus_split = Some(split);
397            }
398            CdcSourceType::Mongodb => {
399                let split = MongoDbCdcSplit::new(split_id, start_offset);
400                ret.mongodb_split = Some(split);
401            }
402            CdcSourceType::SqlServer => {
403                let split = SqlServerCdcSplit::new(split_id, start_offset);
404                ret.sql_server_split = Some(split);
405            }
406            CdcSourceType::Unspecified => {
407                unreachable!("invalid debezium split")
408            }
409        }
410        ret
411    }
412
413    pub fn split_id(&self) -> u32 {
414        dispatch_cdc_split!(self, ref, split_id())
415    }
416
417    pub fn start_offset(&self) -> &Option<String> {
418        dispatch_cdc_split!(self, ref, start_offset())
419    }
420
421    pub fn snapshot_done(&self) -> bool {
422        dispatch_cdc_split!(self, ref, is_snapshot_done())
423    }
424
425    pub fn update_offset_inner(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
426        dispatch_cdc_split!(self, mut, update_offset(last_seen_offset)?);
427        Ok(())
428    }
429}
430
431impl DebeziumCdcSplit<Postgres> {
432    /// Extract PostgreSQL LSN value from the current split offset.
433    ///
434    /// Returns Some(lsn) if the LSN is found and can be parsed as u64, None otherwise.
435    pub fn pg_lsn(&self) -> Option<u64> {
436        self.postgres_split.as_ref()?.pg_lsn()
437    }
438}
439
440impl DebeziumCdcSplit<Mysql> {
441    /// Extract MySQL CDC binlog offset (file sequence and position) from the current split offset.
442    ///
443    /// Returns `Some((file_seq, position))` where:
444    /// - `file_seq`: the numeric part of binlog filename (e.g., 123 from "binlog.000123")
445    /// - `position`: the byte offset within the binlog file
446    pub fn mysql_binlog_offset(&self) -> Option<(u64, u64)> {
447        self.mysql_split.as_ref()?.mysql_binlog_offset()
448    }
449}
450
451impl DebeziumCdcSplit<SqlServer> {
452    /// Extract SQL Server CDC `change_lsn` from the current split offset.
453    pub fn sql_server_change_lsn(&self) -> Option<u128> {
454        self.sql_server_split.as_ref()?.sql_server_change_lsn()
455    }
456
457    /// Extract SQL Server CDC `commit_lsn` from the current split offset.
458    pub fn sql_server_commit_lsn(&self) -> Option<u128> {
459        self.sql_server_split.as_ref()?.sql_server_commit_lsn()
460    }
461}
462
463/// Extract PostgreSQL LSN value from a CDC offset JSON string.
464///
465/// This is a standalone helper function that can be used when you only have the offset string
466/// (e.g., in callbacks) and don't have access to the Split object.
467///
468/// Returns Some(lsn) if the LSN is found and can be parsed as u64, None otherwise.
469pub fn extract_postgres_lsn_from_offset_str(offset_str: &str) -> Option<u64> {
470    let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
471    let source_offset = offset.get("sourceOffset")?;
472    let lsn = source_offset.get("lsn")?;
473    lsn.as_u64()
474}
475
/// Parse SQL Server LSN string (`XXXXXXXX:XXXXXXXX:XXXX`) into a comparable integer.
///
/// The three hexadecimal segments (32, 32 and 16 bits wide) are packed most-significant first
/// into non-overlapping bit ranges. Comparing the results therefore orders LSNs the same way as
/// comparing their segments in sequence.
///
/// Returns `None` unless there are exactly three segments and each one parses as hex within
/// its width.
pub fn parse_sql_server_lsn_str(lsn: &str) -> Option<u128> {
    let mut segments = lsn.split(':');
    let (Some(high), Some(mid), Some(low), None) = (
        segments.next(),
        segments.next(),
        segments.next(),
        segments.next(),
    ) else {
        return None;
    };

    let high = u128::from(u32::from_str_radix(high, 16).ok()?);
    let mid = u128::from(u32::from_str_radix(mid, 16).ok()?);
    let low = u128::from(u16::from_str_radix(low, 16).ok()?);
    Some((high << 48) | (mid << 16) | low)
}
488
489/// Extract SQL Server `change_lsn` from a CDC offset JSON string.
490pub fn extract_sql_server_change_lsn_from_offset_str(offset_str: &str) -> Option<u128> {
491    let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
492    let source_offset = offset.get("sourceOffset")?;
493    let lsn = source_offset.get("change_lsn")?.as_str()?;
494    parse_sql_server_lsn_str(lsn)
495}
496
497/// Extract SQL Server `commit_lsn` from a CDC offset JSON string.
498pub fn extract_sql_server_commit_lsn_from_offset_str(offset_str: &str) -> Option<u128> {
499    let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
500    let source_offset = offset.get("sourceOffset")?;
501    let lsn = source_offset.get("commit_lsn")?.as_str()?;
502    parse_sql_server_lsn_str(lsn)
503}
504
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_sql_server_lsn_str() {
        let lsn = "00000027:00000ac0:0002";
        let parsed = parse_sql_server_lsn_str(lsn).unwrap();
        let expected = ((0x00000027_u128) << 48) | ((0x00000ac0_u128) << 16) | (0x0002_u128);
        assert_eq!(parsed, expected);
    }

    #[test]
    fn test_parse_sql_server_lsn_str_rejects_malformed() {
        // Wrong number of segments.
        assert_eq!(parse_sql_server_lsn_str(""), None);
        assert_eq!(parse_sql_server_lsn_str("00000027:00000ac0"), None);
        assert_eq!(parse_sql_server_lsn_str("00000027:00000ac0:0002:0001"), None);
        // Non-hex digit.
        assert_eq!(parse_sql_server_lsn_str("0000002g:00000ac0:0002"), None);
        // The last segment must fit in 16 bits.
        assert_eq!(parse_sql_server_lsn_str("00000027:00000ac0:10000"), None);
    }

    #[test]
    fn test_parse_sql_server_lsn_str_preserves_ordering() {
        // A more significant segment dominates all less significant ones.
        let lower = parse_sql_server_lsn_str("00000026:ffffffff:ffff").unwrap();
        let higher = parse_sql_server_lsn_str("00000027:00000000:0000").unwrap();
        assert!(lower < higher);
    }

    #[test]
    fn test_extract_sql_server_lsn_from_offset_str() {
        let offset = r#"{
            "sourcePartition": {"server":"RW_CDC_1001"},
            "sourceOffset": {
                "change_lsn":"00000027:00000ac0:0001",
                "commit_lsn":"00000027:00000ac0:0002"
            },
            "isHeartbeat": false
        }"#;

        let change_lsn = extract_sql_server_change_lsn_from_offset_str(offset).unwrap();
        let commit_lsn = extract_sql_server_commit_lsn_from_offset_str(offset).unwrap();
        assert!(change_lsn < commit_lsn);

        // The split accessors must agree with the standalone helpers.
        let split = SqlServerCdcSplit::new(1001, Some(offset.to_owned()));
        assert_eq!(split.sql_server_change_lsn(), Some(change_lsn));
        assert_eq!(split.sql_server_commit_lsn(), Some(commit_lsn));
    }

    #[test]
    fn test_extract_sql_server_lsn_missing_or_invalid() {
        assert_eq!(extract_sql_server_change_lsn_from_offset_str("not json"), None);
        assert_eq!(
            extract_sql_server_commit_lsn_from_offset_str(
                r#"{"sourceOffset": {"change_lsn": "00000027:00000ac0:0001"}}"#
            ),
            None
        );
        assert_eq!(SqlServerCdcSplit::new(1001, None).sql_server_change_lsn(), None);
    }

    #[test]
    fn test_extract_postgres_lsn_from_offset_str() {
        let offset = r#"{
            "sourcePartition": {"server":"RW_CDC_1001"},
            "sourceOffset": {"lsn": 29163512, "txId": 1001},
            "isHeartbeat": false
        }"#;
        assert_eq!(extract_postgres_lsn_from_offset_str(offset), Some(29163512));
        assert_eq!(
            PostgresCdcSplit::new(1001, Some(offset.to_owned()), None).pg_lsn(),
            Some(29163512)
        );

        assert_eq!(extract_postgres_lsn_from_offset_str(r#"{"sourceOffset": {}}"#), None);
        assert_eq!(extract_postgres_lsn_from_offset_str(r#"{"sourceOffset": {"lsn": -1}}"#), None);
        assert_eq!(extract_postgres_lsn_from_offset_str("{"), None);
        assert_eq!(PostgresCdcSplit::new(1001, None, None).pg_lsn(), None);
    }

    #[test]
    fn test_mysql_binlog_offset() {
        let offset = r#"{
            "sourcePartition": {"server":"RW_CDC_1001"},
            "sourceOffset": {"file": "binlog.000123", "pos": 456789},
            "isHeartbeat": false
        }"#;
        let split = MySqlCdcSplit::new(1001, Some(offset.to_owned()));
        assert_eq!(split.mysql_binlog_offset(), Some((123, 456789)));
        assert_eq!(MySqlCdcSplit::new(1001, None).mysql_binlog_offset(), None);
    }
}