risingwave_connector/source/cdc/
split.rs1use std::marker::PhantomData;
16
17use anyhow::Context;
18use risingwave_common::types::JsonbVal;
19use serde::{Deserialize, Serialize};
20
21use crate::error::ConnectorResult;
22use crate::source::cdc::external::DebeziumOffset;
23use crate::source::cdc::{CdcSourceType, CdcSourceTypeTrait, Mysql, Postgres, SqlServer};
24use crate::source::{SplitId, SplitMetaData};
25
/// Source-agnostic state shared by every per-database CDC split type in this module.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct CdcSplitBase {
    /// Identifier of this split.
    pub split_id: u32,
    /// Last recorded offset as a Debezium offset JSON string; `None` when no offset is known yet.
    pub start_offset: Option<String>,
    /// Whether the initial snapshot is known to be finished. The offset-update logic in this
    /// module never resets it back to `false` once it is `true`.
    pub snapshot_done: bool,
}
34
35impl CdcSplitBase {
36 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
37 Self {
38 split_id,
39 start_offset,
40 snapshot_done: false,
41 }
42 }
43}
44
45trait CdcSplitTrait: Send + Sync {
46 fn split_id(&self) -> u32;
47 fn start_offset(&self) -> &Option<String>;
48 fn is_snapshot_done(&self) -> bool;
49 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()>;
50
51 fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
53 let mut snapshot_done = self.is_snapshot_done();
55 if snapshot_done {
56 return Ok(snapshot_done);
57 }
58
59 let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
60 format!(
61 "invalid cdc offset: {}, split: {}",
62 start_offset,
63 self.split_id()
64 )
65 })?;
66
67 if !dbz_offset.is_heartbeat {
69 snapshot_done = match dbz_offset.source_offset.snapshot {
70 Some(val) => !val,
71 None => true,
72 };
73 }
74 Ok(snapshot_done)
75 }
76}
77
/// CDC split state for a MySQL source.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct MySqlCdcSplit {
    /// Shared split id / offset / snapshot state.
    pub inner: CdcSplitBase,
}
82
/// CDC split state for a Postgres source; also used for Citus splits.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct PostgresCdcSplit {
    /// Shared split id / offset / snapshot state.
    pub inner: CdcSplitBase,
    /// Server address for this split. `DebeziumCdcSplit::new` only fills this in for Citus;
    /// plain Postgres splits get `None`.
    pub server_addr: Option<String>,
}
89
/// CDC split state for a MongoDB source.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct MongoDbCdcSplit {
    /// Shared split id / offset / snapshot state.
    pub inner: CdcSplitBase,
}
94
/// CDC split state for a SQL Server source.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct SqlServerCdcSplit {
    /// Shared split id / offset / snapshot state.
    pub inner: CdcSplitBase,
}
99
100impl MySqlCdcSplit {
101 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
102 let split = CdcSplitBase {
103 split_id,
104 start_offset,
105 snapshot_done: false,
106 };
107 Self { inner: split }
108 }
109
110 pub fn mysql_binlog_offset(&self) -> Option<(u64, u64)> {
128 let offset_str = self.inner.start_offset.as_ref()?;
129 let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
130 let source_offset = offset.get("sourceOffset")?;
131
132 let file = source_offset.get("file")?.as_str()?;
133 let pos = source_offset.get("pos")?.as_u64()?;
134
135 let file_seq = file.strip_prefix("binlog.")?.parse::<u64>().ok()?;
137
138 Some((file_seq, pos))
139 }
140}
141
142impl CdcSplitTrait for MySqlCdcSplit {
143 fn split_id(&self) -> u32 {
144 self.inner.split_id
145 }
146
147 fn start_offset(&self) -> &Option<String> {
148 &self.inner.start_offset
149 }
150
151 fn is_snapshot_done(&self) -> bool {
152 self.inner.snapshot_done
153 }
154
155 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
156 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
158 self.inner.start_offset = Some(last_seen_offset);
159 Ok(())
160 }
161}
162
163impl PostgresCdcSplit {
164 pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
165 let split = CdcSplitBase {
166 split_id,
167 start_offset,
168 snapshot_done: false,
169 };
170 Self {
171 inner: split,
172 server_addr,
173 }
174 }
175
176 pub fn pg_lsn(&self) -> Option<u64> {
181 let offset_str = self.inner.start_offset.as_ref()?;
182 let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
183 let source_offset = offset.get("sourceOffset")?;
184 let lsn = source_offset.get("lsn")?;
185 lsn.as_u64()
186 }
187}
188
189impl CdcSplitTrait for PostgresCdcSplit {
190 fn split_id(&self) -> u32 {
191 self.inner.split_id
192 }
193
194 fn start_offset(&self) -> &Option<String> {
195 &self.inner.start_offset
196 }
197
198 fn is_snapshot_done(&self) -> bool {
199 self.inner.snapshot_done
200 }
201
202 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
203 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
204 self.inner.start_offset = Some(last_seen_offset);
205 Ok(())
206 }
207
208 fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
209 let mut snapshot_done = self.is_snapshot_done();
211 if snapshot_done {
212 return Ok(snapshot_done);
213 }
214
215 let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
216 format!(
217 "invalid postgres offset: {}, split: {}",
218 start_offset, self.inner.split_id
219 )
220 })?;
221
222 if !dbz_offset.is_heartbeat {
224 snapshot_done = dbz_offset
225 .source_offset
226 .last_snapshot_record
227 .unwrap_or(false);
228 }
229 Ok(snapshot_done)
230 }
231}
232
233impl MongoDbCdcSplit {
234 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
235 let split = CdcSplitBase {
236 split_id,
237 start_offset,
238 snapshot_done: false,
239 };
240 Self { inner: split }
241 }
242}
243
244impl CdcSplitTrait for MongoDbCdcSplit {
245 fn split_id(&self) -> u32 {
246 self.inner.split_id
247 }
248
249 fn start_offset(&self) -> &Option<String> {
250 &self.inner.start_offset
251 }
252
253 fn is_snapshot_done(&self) -> bool {
254 self.inner.snapshot_done
255 }
256
257 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
258 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
260 self.inner.start_offset = Some(last_seen_offset);
261 Ok(())
262 }
263}
264
265impl SqlServerCdcSplit {
266 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
267 let split = CdcSplitBase {
268 split_id,
269 start_offset,
270 snapshot_done: false,
271 };
272 Self { inner: split }
273 }
274
275 pub fn sql_server_change_lsn(&self) -> Option<u128> {
277 let offset_str = self.inner.start_offset.as_ref()?;
278 extract_sql_server_change_lsn_from_offset_str(offset_str)
279 }
280
281 pub fn sql_server_commit_lsn(&self) -> Option<u128> {
283 let offset_str = self.inner.start_offset.as_ref()?;
284 extract_sql_server_commit_lsn_from_offset_str(offset_str)
285 }
286}
287
288impl CdcSplitTrait for SqlServerCdcSplit {
289 fn split_id(&self) -> u32 {
290 self.inner.split_id
291 }
292
293 fn start_offset(&self) -> &Option<String> {
294 &self.inner.start_offset
295 }
296
297 fn is_snapshot_done(&self) -> bool {
298 self.inner.snapshot_done
299 }
300
301 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
302 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
304 self.inner.start_offset = Some(last_seen_offset);
305 Ok(())
306 }
307}
308
/// Serializable CDC split for the source type `T`.
///
/// `DebeziumCdcSplit::new` fills in only the per-database field selected by
/// `T::source_type()`; the dispatching accessors panic if that field is `None`.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
pub struct DebeziumCdcSplit<T: CdcSourceTypeTrait> {
    pub mysql_split: Option<MySqlCdcSplit>,

    // Serialized under the key `pg_split`. NOTE(review): presumably kept for compatibility
    // with previously persisted split state — confirm before renaming.
    #[serde(rename = "pg_split")]
    pub postgres_split: Option<PostgresCdcSplit>,
    /// Citus splits reuse the Postgres split type.
    pub citus_split: Option<PostgresCdcSplit>,
    pub mongodb_split: Option<MongoDbCdcSplit>,
    pub sql_server_split: Option<SqlServerCdcSplit>,

    /// Type-level marker for the source type; not serialized.
    #[serde(skip)]
    pub _phantom: PhantomData<T>,
}
323
/// Evaluates `.$body` on the per-database split selected by `T::source_type()`.
///
/// Expands to a `match` on `T::source_type()`. NOTE: `T` is not a macro parameter, so the
/// macro must be invoked where a type parameter `T: CdcSourceTypeTrait` is in scope.
///
/// For each `{Variant, field}` pair, the arm for `CdcSourceType::Variant` does the following:
/// - takes `$dbz_split.field`;
/// - borrows it with `as_ref()` or `as_mut()`, chosen by `$as_type` (`ref` / `mut`);
/// - panics with "<lowercased variant> split must exist" if it is `None`;
/// - calls `.$body` on the inner split.
///
/// `CdcSourceType::Unspecified` panics as unreachable.
macro_rules! dispatch_cdc_split_inner {
    ($dbz_split:expr, $as_type:tt, {$({$cdc_source_type:tt, $cdc_source_split:tt}),*}, $body:expr) => {
        match T::source_type() {
            $(
                CdcSourceType::$cdc_source_type => {
                    // `paste!` builds the field access and the `as_ref` / `as_mut` method name.
                    $crate::paste! {
                        $dbz_split.[<$cdc_source_split>]
                            .[<as_ $as_type>]()
                            .expect(concat!(stringify!([<$cdc_source_type:lower>]), " split must exist"))
                            .$body
                    }
                }
            )*
            CdcSourceType::Unspecified => {
                unreachable!("invalid debezium split");
            }
        }
    }
}
343
/// Applies `$body` to whichever split of `$dbz_split` matches `T::source_type()`.
///
/// Covers MySQL, Postgres, Citus, MongoDB and SQL Server. `$as_type` is `ref` for shared
/// access or `mut` for mutable access. See `dispatch_cdc_split_inner` for the expansion.
macro_rules! dispatch_cdc_split {
    ($dbz_split:expr, $as_type:tt, $body:expr) => {
        dispatch_cdc_split_inner!($dbz_split, $as_type, {
            {Mysql, mysql_split},
            {Postgres, postgres_split},
            {Citus, citus_split},
            {Mongodb, mongodb_split},
            {SqlServer, sql_server_split}
        }, $body)
    }
}
356
357impl<T: CdcSourceTypeTrait> SplitMetaData for DebeziumCdcSplit<T> {
358 fn id(&self) -> SplitId {
359 format!("{}", self.split_id()).into()
360 }
361
362 fn encode_to_json(&self) -> JsonbVal {
363 serde_json::to_value(self.clone()).unwrap().into()
364 }
365
366 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
367 serde_json::from_value(value.take()).map_err(Into::into)
368 }
369
370 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
371 self.update_offset_inner(last_seen_offset)
372 }
373}
374
375impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
376 pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
377 let mut ret = Self {
378 mysql_split: None,
379 postgres_split: None,
380 citus_split: None,
381 mongodb_split: None,
382 sql_server_split: None,
383 _phantom: PhantomData,
384 };
385 match T::source_type() {
386 CdcSourceType::Mysql => {
387 let split = MySqlCdcSplit::new(split_id, start_offset);
388 ret.mysql_split = Some(split);
389 }
390 CdcSourceType::Postgres => {
391 let split = PostgresCdcSplit::new(split_id, start_offset, None);
392 ret.postgres_split = Some(split);
393 }
394 CdcSourceType::Citus => {
395 let split = PostgresCdcSplit::new(split_id, start_offset, server_addr);
396 ret.citus_split = Some(split);
397 }
398 CdcSourceType::Mongodb => {
399 let split = MongoDbCdcSplit::new(split_id, start_offset);
400 ret.mongodb_split = Some(split);
401 }
402 CdcSourceType::SqlServer => {
403 let split = SqlServerCdcSplit::new(split_id, start_offset);
404 ret.sql_server_split = Some(split);
405 }
406 CdcSourceType::Unspecified => {
407 unreachable!("invalid debezium split")
408 }
409 }
410 ret
411 }
412
413 pub fn split_id(&self) -> u32 {
414 dispatch_cdc_split!(self, ref, split_id())
415 }
416
417 pub fn start_offset(&self) -> &Option<String> {
418 dispatch_cdc_split!(self, ref, start_offset())
419 }
420
421 pub fn snapshot_done(&self) -> bool {
422 dispatch_cdc_split!(self, ref, is_snapshot_done())
423 }
424
425 pub fn update_offset_inner(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
426 dispatch_cdc_split!(self, mut, update_offset(last_seen_offset)?);
427 Ok(())
428 }
429}
430
431impl DebeziumCdcSplit<Postgres> {
432 pub fn pg_lsn(&self) -> Option<u64> {
436 self.postgres_split.as_ref()?.pg_lsn()
437 }
438}
439
440impl DebeziumCdcSplit<Mysql> {
441 pub fn mysql_binlog_offset(&self) -> Option<(u64, u64)> {
447 self.mysql_split.as_ref()?.mysql_binlog_offset()
448 }
449}
450
451impl DebeziumCdcSplit<SqlServer> {
452 pub fn sql_server_change_lsn(&self) -> Option<u128> {
454 self.sql_server_split.as_ref()?.sql_server_change_lsn()
455 }
456
457 pub fn sql_server_commit_lsn(&self) -> Option<u128> {
459 self.sql_server_split.as_ref()?.sql_server_commit_lsn()
460 }
461}
462
463pub fn extract_postgres_lsn_from_offset_str(offset_str: &str) -> Option<u64> {
470 let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
471 let source_offset = offset.get("sourceOffset")?;
472 let lsn = source_offset.get("lsn")?;
473 lsn.as_u64()
474}
475
/// Parses a SQL Server LSN string of the form `VVVVVVVV:BBBBBBBB:SSSS` (three hex fields) into
/// a single `u128`.
///
/// The first field fits in a `u32` and lands at bit 48, the second fits in a `u32` at bit 16,
/// and the last fits in a `u16` in the low bits. Returns `None` unless there are exactly three
/// `:`-separated fields that each parse as hex within their width.
pub fn parse_sql_server_lsn_str(lsn: &str) -> Option<u128> {
    let mut fields = lsn.split(':');
    let (high_hex, mid_hex, low_hex) =
        match (fields.next(), fields.next(), fields.next(), fields.next()) {
            (Some(high), Some(mid), Some(low), None) => (high, mid, low),
            _ => return None,
        };

    let high = u128::from(u32::from_str_radix(high_hex, 16).ok()?);
    let mid = u128::from(u32::from_str_radix(mid_hex, 16).ok()?);
    let low = u128::from(u16::from_str_radix(low_hex, 16).ok()?);

    Some((high << 48) | (mid << 16) | low)
}
488
489pub fn extract_sql_server_change_lsn_from_offset_str(offset_str: &str) -> Option<u128> {
491 let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
492 let source_offset = offset.get("sourceOffset")?;
493 let lsn = source_offset.get("change_lsn")?.as_str()?;
494 parse_sql_server_lsn_str(lsn)
495}
496
497pub fn extract_sql_server_commit_lsn_from_offset_str(offset_str: &str) -> Option<u128> {
499 let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
500 let source_offset = offset.get("sourceOffset")?;
501 let lsn = source_offset.get("commit_lsn")?.as_str()?;
502 parse_sql_server_lsn_str(lsn)
503}
504
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_sql_server_lsn_str() {
        let lsn = "00000027:00000ac0:0002";
        let parsed = parse_sql_server_lsn_str(lsn).unwrap();
        let expected = ((0x00000027_u128) << 48) | ((0x00000ac0_u128) << 16) | (0x0002_u128);
        assert_eq!(parsed, expected);
    }

    /// Malformed LSNs must be rejected rather than silently mis-parsed.
    #[test]
    fn test_parse_sql_server_lsn_str_rejects_malformed() {
        // Too few / too many components.
        assert_eq!(parse_sql_server_lsn_str("00000027:00000ac0"), None);
        assert_eq!(parse_sql_server_lsn_str("00000027:00000ac0:0002:0001"), None);
        // Non-hex digit.
        assert_eq!(parse_sql_server_lsn_str("0000002g:00000ac0:0002"), None);
        // Slot component does not fit in 16 bits.
        assert_eq!(parse_sql_server_lsn_str("00000027:00000ac0:10000"), None);
        assert_eq!(parse_sql_server_lsn_str(""), None);
    }

    #[test]
    fn test_extract_sql_server_lsn_from_offset_str() {
        let offset = r#"{
            "sourcePartition": {"server":"RW_CDC_1001"},
            "sourceOffset": {
                "change_lsn":"00000027:00000ac0:0001",
                "commit_lsn":"00000027:00000ac0:0002"
            },
            "isHeartbeat": false
        }"#;

        let change_lsn = extract_sql_server_change_lsn_from_offset_str(offset).unwrap();
        let commit_lsn = extract_sql_server_commit_lsn_from_offset_str(offset).unwrap();
        assert!(change_lsn < commit_lsn);
    }

    /// Missing keys or invalid JSON yield `None` instead of panicking.
    #[test]
    fn test_extract_sql_server_lsn_missing_fields() {
        let no_commit = r#"{"sourceOffset": {"change_lsn":"00000027:00000ac0:0001"}}"#;
        assert!(extract_sql_server_change_lsn_from_offset_str(no_commit).is_some());
        assert_eq!(extract_sql_server_commit_lsn_from_offset_str(no_commit), None);

        let no_source_offset = r#"{"isHeartbeat": true}"#;
        assert_eq!(extract_sql_server_change_lsn_from_offset_str(no_source_offset), None);
        assert_eq!(extract_sql_server_commit_lsn_from_offset_str(no_source_offset), None);

        assert_eq!(extract_sql_server_change_lsn_from_offset_str("not json"), None);
    }
}