risingwave_connector/source/cdc/
split.rs1use std::marker::PhantomData;
16
17use anyhow::Context;
18use risingwave_common::types::JsonbVal;
19use serde::{Deserialize, Serialize};
20
21use crate::error::ConnectorResult;
22use crate::source::cdc::external::DebeziumOffset;
23use crate::source::cdc::{CdcSourceType, CdcSourceTypeTrait, Mysql, Postgres};
24use crate::source::{SplitId, SplitMetaData};
25
26#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
29pub struct CdcSplitBase {
30 pub split_id: u32,
31 pub start_offset: Option<String>,
32 pub snapshot_done: bool,
33}
34
35impl CdcSplitBase {
36 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
37 Self {
38 split_id,
39 start_offset,
40 snapshot_done: false,
41 }
42 }
43}
44
45trait CdcSplitTrait: Send + Sync {
46 fn split_id(&self) -> u32;
47 fn start_offset(&self) -> &Option<String>;
48 fn is_snapshot_done(&self) -> bool;
49 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()>;
50
51 fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
53 let mut snapshot_done = self.is_snapshot_done();
55 if snapshot_done {
56 return Ok(snapshot_done);
57 }
58
59 let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
60 format!(
61 "invalid cdc offset: {}, split: {}",
62 start_offset,
63 self.split_id()
64 )
65 })?;
66
67 if !dbz_offset.is_heartbeat {
69 snapshot_done = match dbz_offset.source_offset.snapshot {
70 Some(val) => !val,
71 None => true,
72 };
73 }
74 Ok(snapshot_done)
75 }
76}
77
78#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
79pub struct MySqlCdcSplit {
80 pub inner: CdcSplitBase,
81}
82
83#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
84pub struct PostgresCdcSplit {
85 pub inner: CdcSplitBase,
86 pub server_addr: Option<String>,
88}
89
90#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
91pub struct MongoDbCdcSplit {
92 pub inner: CdcSplitBase,
93}
94
95#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
96pub struct SqlServerCdcSplit {
97 pub inner: CdcSplitBase,
98}
99
100impl MySqlCdcSplit {
101 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
102 let split = CdcSplitBase {
103 split_id,
104 start_offset,
105 snapshot_done: false,
106 };
107 Self { inner: split }
108 }
109
110 pub fn mysql_binlog_offset(&self) -> Option<(u64, u64)> {
128 let offset_str = self.inner.start_offset.as_ref()?;
129 let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
130 let source_offset = offset.get("sourceOffset")?;
131
132 let file = source_offset.get("file")?.as_str()?;
133 let pos = source_offset.get("pos")?.as_u64()?;
134
135 let file_seq = file.strip_prefix("binlog.")?.parse::<u64>().ok()?;
137
138 Some((file_seq, pos))
139 }
140}
141
142impl CdcSplitTrait for MySqlCdcSplit {
143 fn split_id(&self) -> u32 {
144 self.inner.split_id
145 }
146
147 fn start_offset(&self) -> &Option<String> {
148 &self.inner.start_offset
149 }
150
151 fn is_snapshot_done(&self) -> bool {
152 self.inner.snapshot_done
153 }
154
155 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
156 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
158 self.inner.start_offset = Some(last_seen_offset);
159 Ok(())
160 }
161}
162
163impl PostgresCdcSplit {
164 pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
165 let split = CdcSplitBase {
166 split_id,
167 start_offset,
168 snapshot_done: false,
169 };
170 Self {
171 inner: split,
172 server_addr,
173 }
174 }
175
176 pub fn pg_lsn(&self) -> Option<u64> {
181 let offset_str = self.inner.start_offset.as_ref()?;
182 let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
183 let source_offset = offset.get("sourceOffset")?;
184 let lsn = source_offset.get("lsn")?;
185 lsn.as_u64()
186 }
187}
188
189impl CdcSplitTrait for PostgresCdcSplit {
190 fn split_id(&self) -> u32 {
191 self.inner.split_id
192 }
193
194 fn start_offset(&self) -> &Option<String> {
195 &self.inner.start_offset
196 }
197
198 fn is_snapshot_done(&self) -> bool {
199 self.inner.snapshot_done
200 }
201
202 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
203 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
204 self.inner.start_offset = Some(last_seen_offset);
205 Ok(())
206 }
207
208 fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
209 let mut snapshot_done = self.is_snapshot_done();
211 if snapshot_done {
212 return Ok(snapshot_done);
213 }
214
215 let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
216 format!(
217 "invalid postgres offset: {}, split: {}",
218 start_offset, self.inner.split_id
219 )
220 })?;
221
222 if !dbz_offset.is_heartbeat {
224 snapshot_done = dbz_offset
225 .source_offset
226 .last_snapshot_record
227 .unwrap_or(false);
228 }
229 Ok(snapshot_done)
230 }
231}
232
233impl MongoDbCdcSplit {
234 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
235 let split = CdcSplitBase {
236 split_id,
237 start_offset,
238 snapshot_done: false,
239 };
240 Self { inner: split }
241 }
242}
243
244impl CdcSplitTrait for MongoDbCdcSplit {
245 fn split_id(&self) -> u32 {
246 self.inner.split_id
247 }
248
249 fn start_offset(&self) -> &Option<String> {
250 &self.inner.start_offset
251 }
252
253 fn is_snapshot_done(&self) -> bool {
254 self.inner.snapshot_done
255 }
256
257 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
258 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
260 self.inner.start_offset = Some(last_seen_offset);
261 Ok(())
262 }
263}
264
265impl SqlServerCdcSplit {
266 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
267 let split = CdcSplitBase {
268 split_id,
269 start_offset,
270 snapshot_done: false,
271 };
272 Self { inner: split }
273 }
274}
275
276impl CdcSplitTrait for SqlServerCdcSplit {
277 fn split_id(&self) -> u32 {
278 self.inner.split_id
279 }
280
281 fn start_offset(&self) -> &Option<String> {
282 &self.inner.start_offset
283 }
284
285 fn is_snapshot_done(&self) -> bool {
286 self.inner.snapshot_done
287 }
288
289 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
290 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
292 self.inner.start_offset = Some(last_seen_offset);
293 Ok(())
294 }
295}
296
297#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
299pub struct DebeziumCdcSplit<T: CdcSourceTypeTrait> {
300 pub mysql_split: Option<MySqlCdcSplit>,
301
302 #[serde(rename = "pg_split")] pub postgres_split: Option<PostgresCdcSplit>,
304 pub citus_split: Option<PostgresCdcSplit>,
305 pub mongodb_split: Option<MongoDbCdcSplit>,
306 pub sql_server_split: Option<SqlServerCdcSplit>,
307
308 #[serde(skip)]
309 pub _phantom: PhantomData<T>,
310}
311
/// Forward a method call to the per-database split chosen by `T::source_type()`.
///
/// * `$split` — the `DebeziumCdcSplit` value to dispatch on.
/// * `$access` — `ref` or `mut`; pasted into `as_ref()` / `as_mut()` on the chosen field.
/// * the brace list — `{Variant, field}` pairs mapping a `CdcSourceType` variant to a field.
/// * `$call` — method name plus arguments appended to the chosen split.
///
/// Expects a type parameter named `T` implementing `CdcSourceTypeTrait` at the call site.
/// Panics with "<variant> split must exist" if the chosen field is `None`, and reaches
/// `unreachable!` for `CdcSourceType::Unspecified`.
macro_rules! dispatch_cdc_split_inner {
    ($split:expr, $access:tt, {$({$variant:tt, $field:tt}),*}, $call:expr) => {
        match T::source_type() {
            $(
                CdcSourceType::$variant => {
                    $crate::paste! {
                        $split.[<$field>]
                            .[<as_ $access>]()
                            .expect(concat!(stringify!([<$variant:lower>]), " split must exist"))
                            .$call
                    }
                }
            )*
            CdcSourceType::Unspecified => {
                unreachable!("invalid debezium split");
            }
        }
    };
}
331
/// Forward `$call` to whichever split matches `T::source_type()`, covering every supported
/// CDC source type. Pass `ref` as `$access` for read-only calls and `mut` for mutating ones.
macro_rules! dispatch_cdc_split {
    ($split:expr, $access:tt, $call:expr) => {
        dispatch_cdc_split_inner!($split, $access, {
            {Mysql, mysql_split},
            {Postgres, postgres_split},
            {Citus, citus_split},
            {Mongodb, mongodb_split},
            {SqlServer, sql_server_split}
        }, $call)
    };
}
344
345impl<T: CdcSourceTypeTrait> SplitMetaData for DebeziumCdcSplit<T> {
346 fn id(&self) -> SplitId {
347 format!("{}", self.split_id()).into()
348 }
349
350 fn encode_to_json(&self) -> JsonbVal {
351 serde_json::to_value(self.clone()).unwrap().into()
352 }
353
354 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
355 serde_json::from_value(value.take()).map_err(Into::into)
356 }
357
358 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
359 self.update_offset_inner(last_seen_offset)
360 }
361}
362
363impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
364 pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
365 let mut ret = Self {
366 mysql_split: None,
367 postgres_split: None,
368 citus_split: None,
369 mongodb_split: None,
370 sql_server_split: None,
371 _phantom: PhantomData,
372 };
373 match T::source_type() {
374 CdcSourceType::Mysql => {
375 let split = MySqlCdcSplit::new(split_id, start_offset);
376 ret.mysql_split = Some(split);
377 }
378 CdcSourceType::Postgres => {
379 let split = PostgresCdcSplit::new(split_id, start_offset, None);
380 ret.postgres_split = Some(split);
381 }
382 CdcSourceType::Citus => {
383 let split = PostgresCdcSplit::new(split_id, start_offset, server_addr);
384 ret.citus_split = Some(split);
385 }
386 CdcSourceType::Mongodb => {
387 let split = MongoDbCdcSplit::new(split_id, start_offset);
388 ret.mongodb_split = Some(split);
389 }
390 CdcSourceType::SqlServer => {
391 let split = SqlServerCdcSplit::new(split_id, start_offset);
392 ret.sql_server_split = Some(split);
393 }
394 CdcSourceType::Unspecified => {
395 unreachable!("invalid debezium split")
396 }
397 }
398 ret
399 }
400
401 pub fn split_id(&self) -> u32 {
402 dispatch_cdc_split!(self, ref, split_id())
403 }
404
405 pub fn start_offset(&self) -> &Option<String> {
406 dispatch_cdc_split!(self, ref, start_offset())
407 }
408
409 pub fn snapshot_done(&self) -> bool {
410 dispatch_cdc_split!(self, ref, is_snapshot_done())
411 }
412
413 pub fn update_offset_inner(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
414 dispatch_cdc_split!(self, mut, update_offset(last_seen_offset)?);
415 Ok(())
416 }
417}
418
419impl DebeziumCdcSplit<Postgres> {
420 pub fn pg_lsn(&self) -> Option<u64> {
424 self.postgres_split.as_ref()?.pg_lsn()
425 }
426}
427
428impl DebeziumCdcSplit<Mysql> {
429 pub fn mysql_binlog_offset(&self) -> Option<(u64, u64)> {
435 self.mysql_split.as_ref()?.mysql_binlog_offset()
436 }
437}
438
439pub fn extract_postgres_lsn_from_offset_str(offset_str: &str) -> Option<u64> {
446 let offset = serde_json::from_str::<serde_json::Value>(offset_str).ok()?;
447 let source_offset = offset.get("sourceOffset")?;
448 let lsn = source_offset.get("lsn")?;
449 lsn.as_u64()
450}