risingwave_connector/source/cdc/
split.rs1use std::marker::PhantomData;
16
17use anyhow::Context;
18use risingwave_common::types::JsonbVal;
19use serde::{Deserialize, Serialize};
20
21use crate::error::ConnectorResult;
22use crate::source::cdc::external::DebeziumOffset;
23use crate::source::cdc::{CdcSourceType, CdcSourceTypeTrait};
24use crate::source::{SplitId, SplitMetaData};
25
26#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
29pub struct CdcSplitBase {
30 pub split_id: u32,
31 pub start_offset: Option<String>,
32 pub snapshot_done: bool,
34}
35
36impl CdcSplitBase {
37 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
38 Self {
39 split_id,
40 start_offset,
41 snapshot_done: false,
42 }
43 }
44}
45
46trait CdcSplitTrait: Send + Sync {
47 fn split_id(&self) -> u32;
48 fn start_offset(&self) -> &Option<String>;
49 fn is_snapshot_done(&self) -> bool;
50 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()>;
51
52 fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
54 let mut snapshot_done = self.is_snapshot_done();
56 if snapshot_done {
57 return Ok(snapshot_done);
58 }
59
60 let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
61 format!(
62 "invalid cdc offset: {}, split: {}",
63 start_offset,
64 self.split_id()
65 )
66 })?;
67
68 if !dbz_offset.is_heartbeat {
70 snapshot_done = match dbz_offset.source_offset.snapshot {
71 Some(val) => !val,
72 None => true,
73 };
74 }
75 Ok(snapshot_done)
76 }
77}
78
79#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
80pub struct MySqlCdcSplit {
81 pub inner: CdcSplitBase,
82}
83
84#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
85pub struct PostgresCdcSplit {
86 pub inner: CdcSplitBase,
87 pub server_addr: Option<String>,
89}
90
91#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
92pub struct MongoDbCdcSplit {
93 pub inner: CdcSplitBase,
94}
95
96#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
97pub struct SqlServerCdcSplit {
98 pub inner: CdcSplitBase,
99}
100
101impl MySqlCdcSplit {
102 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
103 let split = CdcSplitBase {
104 split_id,
105 start_offset,
106 snapshot_done: false,
107 };
108 Self { inner: split }
109 }
110}
111
112impl CdcSplitTrait for MySqlCdcSplit {
113 fn split_id(&self) -> u32 {
114 self.inner.split_id
115 }
116
117 fn start_offset(&self) -> &Option<String> {
118 &self.inner.start_offset
119 }
120
121 fn is_snapshot_done(&self) -> bool {
122 self.inner.snapshot_done
123 }
124
125 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
126 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
128 self.inner.start_offset = Some(last_seen_offset);
129 Ok(())
130 }
131}
132
133impl PostgresCdcSplit {
134 pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
135 let split = CdcSplitBase {
136 split_id,
137 start_offset,
138 snapshot_done: false,
139 };
140 Self {
141 inner: split,
142 server_addr,
143 }
144 }
145}
146
147impl CdcSplitTrait for PostgresCdcSplit {
148 fn split_id(&self) -> u32 {
149 self.inner.split_id
150 }
151
152 fn start_offset(&self) -> &Option<String> {
153 &self.inner.start_offset
154 }
155
156 fn is_snapshot_done(&self) -> bool {
157 self.inner.snapshot_done
158 }
159
160 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
161 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
162 self.inner.start_offset = Some(last_seen_offset);
163 Ok(())
164 }
165
166 fn extract_snapshot_flag(&self, start_offset: &str) -> ConnectorResult<bool> {
167 let mut snapshot_done = self.is_snapshot_done();
169 if snapshot_done {
170 return Ok(snapshot_done);
171 }
172
173 let dbz_offset: DebeziumOffset = serde_json::from_str(start_offset).with_context(|| {
174 format!(
175 "invalid postgres offset: {}, split: {}",
176 start_offset, self.inner.split_id
177 )
178 })?;
179
180 if !dbz_offset.is_heartbeat {
182 snapshot_done = dbz_offset
183 .source_offset
184 .last_snapshot_record
185 .unwrap_or(false);
186 }
187 Ok(snapshot_done)
188 }
189}
190
191impl MongoDbCdcSplit {
192 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
193 let split = CdcSplitBase {
194 split_id,
195 start_offset,
196 snapshot_done: false,
197 };
198 Self { inner: split }
199 }
200}
201
202impl CdcSplitTrait for MongoDbCdcSplit {
203 fn split_id(&self) -> u32 {
204 self.inner.split_id
205 }
206
207 fn start_offset(&self) -> &Option<String> {
208 &self.inner.start_offset
209 }
210
211 fn is_snapshot_done(&self) -> bool {
212 self.inner.snapshot_done
213 }
214
215 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
216 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
218 self.inner.start_offset = Some(last_seen_offset);
219 Ok(())
220 }
221}
222
223impl SqlServerCdcSplit {
224 pub fn new(split_id: u32, start_offset: Option<String>) -> Self {
225 let split = CdcSplitBase {
226 split_id,
227 start_offset,
228 snapshot_done: false,
229 };
230 Self { inner: split }
231 }
232}
233
234impl CdcSplitTrait for SqlServerCdcSplit {
235 fn split_id(&self) -> u32 {
236 self.inner.split_id
237 }
238
239 fn start_offset(&self) -> &Option<String> {
240 &self.inner.start_offset
241 }
242
243 fn is_snapshot_done(&self) -> bool {
244 self.inner.snapshot_done
245 }
246
247 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
248 self.inner.snapshot_done = self.extract_snapshot_flag(last_seen_offset.as_str())?;
250 self.inner.start_offset = Some(last_seen_offset);
251 Ok(())
252 }
253}
254
255#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)]
257pub struct DebeziumCdcSplit<T: CdcSourceTypeTrait> {
258 pub mysql_split: Option<MySqlCdcSplit>,
259
260 #[serde(rename = "pg_split")] pub postgres_split: Option<PostgresCdcSplit>,
262 pub citus_split: Option<PostgresCdcSplit>,
263 pub mongodb_split: Option<MongoDbCdcSplit>,
264 pub sql_server_split: Option<SqlServerCdcSplit>,
265
266 #[serde(skip)]
267 pub _phantom: PhantomData<T>,
268}
269
// Forwards a method call to the per-database split selected by `T::source_type()`.
//
// Arguments:
// - `$split`: expression evaluating to the `DebeziumCdcSplit` (e.g. `self`).
// - `$access`: `ref` or `mut`; pasted into `as_ref()` / `as_mut()` on the chosen field.
// - `{ {Variant, field}, ... }`: maps each `CdcSourceType` variant to its split field.
// - `$body`: method call applied to the unwrapped split.
//
// Expands to a `match` on `T::source_type()`, so a type parameter `T` must be in scope at the
// call site. Panics if the selected field is `None`, or if the source type is `Unspecified`.
macro_rules! dispatch_cdc_split_inner {
    ($split:expr, $access:tt, {$({$variant:tt, $field:tt}),*}, $body:expr) => {
        match T::source_type() {
            $(
                CdcSourceType::$variant => {
                    $crate::paste! {
                        $split.[<$field>]
                            .[<as_ $access>]()
                            .expect(concat!(stringify!([<$variant:lower>]), " split must exist"))
                            .$body
                    }
                }
            )*
            CdcSourceType::Unspecified => {
                unreachable!("invalid debezium split");
            }
        }
    }
}
289
// Shorthand for `dispatch_cdc_split_inner!` that supplies the full table of supported CDC
// source types, each paired with the `DebeziumCdcSplit` field holding its split.
macro_rules! dispatch_cdc_split {
    ($split:expr, $access:tt, $body:expr) => {
        dispatch_cdc_split_inner!($split, $access, {
            {Mysql, mysql_split},
            {Postgres, postgres_split},
            {Citus, citus_split},
            {Mongodb, mongodb_split},
            {SqlServer, sql_server_split}
        }, $body)
    }
}
302
303impl<T: CdcSourceTypeTrait> SplitMetaData for DebeziumCdcSplit<T> {
304 fn id(&self) -> SplitId {
305 format!("{}", self.split_id()).into()
306 }
307
308 fn encode_to_json(&self) -> JsonbVal {
309 serde_json::to_value(self.clone()).unwrap().into()
310 }
311
312 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
313 serde_json::from_value(value.take()).map_err(Into::into)
314 }
315
316 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
317 self.update_offset_inner(last_seen_offset)
318 }
319}
320
321impl<T: CdcSourceTypeTrait> DebeziumCdcSplit<T> {
322 pub fn new(split_id: u32, start_offset: Option<String>, server_addr: Option<String>) -> Self {
323 let mut ret = Self {
324 mysql_split: None,
325 postgres_split: None,
326 citus_split: None,
327 mongodb_split: None,
328 sql_server_split: None,
329 _phantom: PhantomData,
330 };
331 match T::source_type() {
332 CdcSourceType::Mysql => {
333 let split = MySqlCdcSplit::new(split_id, start_offset);
334 ret.mysql_split = Some(split);
335 }
336 CdcSourceType::Postgres => {
337 let split = PostgresCdcSplit::new(split_id, start_offset, None);
338 ret.postgres_split = Some(split);
339 }
340 CdcSourceType::Citus => {
341 let split = PostgresCdcSplit::new(split_id, start_offset, server_addr);
342 ret.citus_split = Some(split);
343 }
344 CdcSourceType::Mongodb => {
345 let split = MongoDbCdcSplit::new(split_id, start_offset);
346 ret.mongodb_split = Some(split);
347 }
348 CdcSourceType::SqlServer => {
349 let split = SqlServerCdcSplit::new(split_id, start_offset);
350 ret.sql_server_split = Some(split);
351 }
352 CdcSourceType::Unspecified => {
353 unreachable!("invalid debezium split")
354 }
355 }
356 ret
357 }
358
359 pub fn split_id(&self) -> u32 {
360 dispatch_cdc_split!(self, ref, split_id())
361 }
362
363 pub fn start_offset(&self) -> &Option<String> {
364 dispatch_cdc_split!(self, ref, start_offset())
365 }
366
367 pub fn snapshot_done(&self) -> bool {
368 dispatch_cdc_split!(self, ref, is_snapshot_done())
369 }
370
371 pub fn update_offset_inner(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
372 dispatch_cdc_split!(self, mut, update_offset(last_seen_offset)?);
373 Ok(())
374 }
375}