risingwave_connector/source/cdc/
split.rs1use std::marker::PhantomData;
16
17use anyhow::Context;
18use risingwave_common::types::JsonbVal;
19use serde::{Deserialize, Serialize};
20
21use crate::error::ConnectorResult;
22use crate::source::cdc::external::DebeziumOffset;
23use crate::source::cdc::{CdcSourceType, CdcSourceTypeTrait};
24use crate::source::{SplitId, SplitMetaData};
25
26#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
29pub struct CdcSplitBase {
30 pub split_id: u32,
31 pub start_offset: Option<String>,
32 pub snapshot_done: bool,
33}
34
35impl CdcSplitBase {
36 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
37 Self {
38 split_id,
39 start_offset,
40 snapshot_done: false,
41 }
42 }
43}
44
45trait CdcSplitTrait: Send + Sync {
46 fn split_id(&self) -> u32;
47 fn start_offset(&self) -> &Option<String>;
48 fn is_snapshot_done(&self) -> bool;
49 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()>;
50
51 fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
53 let mut snapshot_done = self.is_snapshot_done();
55 if snapshot_done {
56 return Ok(snapshot_done);
57 }
58
59 let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
60 format!(
61 "invalid cdc offset: {}, split: {}",
62 start_offset,
63 self.split_id()
64 )
65 })?;
66
67 if !dbz_offset.is_heartbeat {
69 snapshot_done = match dbz_offset.source_offset.snapshot {
70 Some(val) => !val,
71 None => true,
72 };
73 }
74 Ok(snapshot_done)
75 }
76}
77
78#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
79pub struct MySqlCdcSplit {
80 pub inner: CdcSplitBase,
81}
82
83#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
84pub struct PostgresCdcSplit {
85 pub inner: CdcSplitBase,
86 pub server_addr: Option<String>,
88}
89
90#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
91pub struct MongoDbCdcSplit {
92 pub inner: CdcSplitBase,
93}
94
95#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
96pub struct SqlServerCdcSplit {
97 pub inner: CdcSplitBase,
98}
99
100impl MySqlCdcSplit {
101 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
102 let split = CdcSplitBase {
103 split_id,
104 start_offset,
105 snapshot_done: false,
106 };
107 Self { inner: split }
108 }
109}
110
111impl CdcSplitTrait for MySqlCdcSplit {
112 fn split_id(&self) -> u32 {
113 self.inner.split_id
114 }
115
116 fn start_offset(&self) -> &Option<String> {
117 &self.inner.start_offset
118 }
119
120 fn is_snapshot_done(&self) -> bool {
121 self.inner.snapshot_done
122 }
123
124 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
125 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
127 self.inner.start_offset = Some(last_seen_offset);
128 Ok(())
129 }
130}
131
132impl PostgresCdcSplit {
133 pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
134 let split = CdcSplitBase {
135 split_id,
136 start_offset,
137 snapshot_done: false,
138 };
139 Self {
140 inner: split,
141 server_addr,
142 }
143 }
144}
145
146impl CdcSplitTrait for PostgresCdcSplit {
147 fn split_id(&self) -> u32 {
148 self.inner.split_id
149 }
150
151 fn start_offset(&self) -> &Option<String> {
152 &self.inner.start_offset
153 }
154
155 fn is_snapshot_done(&self) -> bool {
156 self.inner.snapshot_done
157 }
158
159 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
160 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
161 self.inner.start_offset = Some(last_seen_offset);
162 Ok(())
163 }
164
165 fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
166 let mut snapshot_done = self.is_snapshot_done();
168 if snapshot_done {
169 return Ok(snapshot_done);
170 }
171
172 let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
173 format!(
174 "invalid postgres offset: {}, split: {}",
175 start_offset, self.inner.split_id
176 )
177 })?;
178
179 if !dbz_offset.is_heartbeat {
181 snapshot_done = dbz_offset
182 .source_offset
183 .last_snapshot_record
184 .unwrap_or(false);
185 }
186 Ok(snapshot_done)
187 }
188}
189
190impl MongoDbCdcSplit {
191 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
192 let split = CdcSplitBase {
193 split_id,
194 start_offset,
195 snapshot_done: false,
196 };
197 Self { inner: split }
198 }
199}
200
201impl CdcSplitTrait for MongoDbCdcSplit {
202 fn split_id(&self) -> u32 {
203 self.inner.split_id
204 }
205
206 fn start_offset(&self) -> &Option<String> {
207 &self.inner.start_offset
208 }
209
210 fn is_snapshot_done(&self) -> bool {
211 self.inner.snapshot_done
212 }
213
214 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
215 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
217 self.inner.start_offset = Some(last_seen_offset);
218 Ok(())
219 }
220}
221
222impl SqlServerCdcSplit {
223 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
224 let split = CdcSplitBase {
225 split_id,
226 start_offset,
227 snapshot_done: false,
228 };
229 Self { inner: split }
230 }
231}
232
233impl CdcSplitTrait for SqlServerCdcSplit {
234 fn split_id(&self) -> u32 {
235 self.inner.split_id
236 }
237
238 fn start_offset(&self) -> &Option<String> {
239 &self.inner.start_offset
240 }
241
242 fn is_snapshot_done(&self) -> bool {
243 self.inner.snapshot_done
244 }
245
246 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
247 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
249 self.inner.start_offset = Some(last_seen_offset);
250 Ok(())
251 }
252}
253
254#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
256pub struct DebeziumCdcSplit<T: CdcSourceTypeTrait> {
257 pub mysql_split: Option<MySqlCdcSplit>,
258
259 #[serde(rename = "pg_split")] pub postgres_split: Option<PostgresCdcSplit>,
261 pub citus_split: Option<PostgresCdcSplit>,
262 pub mongodb_split: Option<MongoDbCdcSplit>,
263 pub sql_server_split: Option<SqlServerCdcSplit>,
264
265 #[serde(skip)]
266 pub _phantom: PhantomData<T>,
267}
268
/// Expands to a `match` on `T::source_type()` (so a type parameter `T` must be
/// in scope at the call site) with one arm per `{Variant, field}` pair.
///
/// Each arm takes `$split.field`, borrows it with `as_ref()` / `as_mut()`
/// (selected by passing `ref` / `mut` as `$borrow`), unwraps it with `expect`,
/// and then applies the method call given as `$call`.
///
/// The expansion panics when the selected field is `None`, and hits
/// `unreachable!` for `CdcSourceType::Unspecified`.
macro_rules! dispatch_cdc_split_inner {
    ($split:expr, $borrow:tt, {$({$variant:tt, $field:tt}),*}, $call:expr) => {
        match T::source_type() {
            $(
                CdcSourceType::$variant => {
                    $crate::paste! {
                        $split.[<$field>]
                            .[<as_ $borrow>]()
                            .expect(concat!(stringify!([<$variant:lower>]), " split must exist"))
                            .$call
                    }
                }
            )*
            CdcSourceType::Unspecified => {
                unreachable!("invalid debezium split");
            }
        }
    }
}
288
/// Convenience front end for `dispatch_cdc_split_inner!` that supplies the
/// fixed table mapping each `CdcSourceType` variant to its split field.
///
/// `$borrow` is `ref` or `mut`; `$call` is the method call applied to the
/// selected split.
macro_rules! dispatch_cdc_split {
    ($split:expr, $borrow:tt, $call:expr) => {
        dispatch_cdc_split_inner!($split, $borrow, {
            {Mysql, mysql_split},
            {Postgres, postgres_split},
            {Citus, citus_split},
            {Mongodb, mongodb_split},
            {SqlServer, sql_server_split}
        }, $call)
    }
}
301
302impl<T: CdcSourceTypeTrait> SplitMetaData for DebeziumCdcSplit<T> {
303 fn id(&self) -> SplitId {
304 format!("{}", self.split_id()).into()
305 }
306
307 fn encode_to_json(&self) -> JsonbVal {
308 serde_json::to_value(self.clone()).unwrap().into()
309 }
310
311 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
312 serde_json::from_value(value.take()).map_err(Into::into)
313 }
314
315 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
316 self.update_offset_inner(last_seen_offset)
317 }
318}
319
320impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
321 pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
322 let mut ret = Self {
323 mysql_split: None,
324 postgres_split: None,
325 citus_split: None,
326 mongodb_split: None,
327 sql_server_split: None,
328 _phantom: PhantomData,
329 };
330 match T::source_type() {
331 CdcSourceType::Mysql => {
332 let split = MySqlCdcSplit::new(split_id, start_offset);
333 ret.mysql_split = Some(split);
334 }
335 CdcSourceType::Postgres => {
336 let split = PostgresCdcSplit::new(split_id, start_offset, None);
337 ret.postgres_split = Some(split);
338 }
339 CdcSourceType::Citus => {
340 let split = PostgresCdcSplit::new(split_id, start_offset, server_addr);
341 ret.citus_split = Some(split);
342 }
343 CdcSourceType::Mongodb => {
344 let split = MongoDbCdcSplit::new(split_id, start_offset);
345 ret.mongodb_split = Some(split);
346 }
347 CdcSourceType::SqlServer => {
348 let split = SqlServerCdcSplit::new(split_id, start_offset);
349 ret.sql_server_split = Some(split);
350 }
351 CdcSourceType::Unspecified => {
352 unreachable!("invalid debezium split")
353 }
354 }
355 ret
356 }
357
358 pub fn split_id(&self) -> u32 {
359 dispatch_cdc_split!(self, ref, split_id())
360 }
361
362 pub fn start_offset(&self) -> &Option<String> {
363 dispatch_cdc_split!(self, ref, start_offset())
364 }
365
366 pub fn snapshot_done(&self) -> bool {
367 dispatch_cdc_split!(self, ref, is_snapshot_done())
368 }
369
370 pub fn update_offset_inner(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
371 dispatch_cdc_split!(self, mut, update_offset(last_seen_offset)?);
372 Ok(())
373 }
374}