1use risingwave_common::bail;
16
17use super::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling};
18use super::unified::kv_event::KvEvent;
19use super::{
20 AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
21 SpecificParserConfig,
22};
23use crate::error::ConnectorResult;
24use crate::parser::bytes_parser::BytesAccessBuilder;
25use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
26use crate::parser::unified::AccessImpl;
27use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
28use crate::parser::upsert_parser::get_key_column_name;
29use crate::parser::{BytesProperties, ParseResult, ParserFormat};
30use crate::source::cdc::CdcMessageType;
31use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
32
/// Parser for the plain format (`parser_format` reports `ParserFormat::Plain`).
///
/// Besides ordinary rows, `parse_inner` also recognizes Debezium CDC transaction-metadata and
/// schema-change messages and decodes them with dedicated Debezium JSON access builders.
#[derive(Debug)]
pub struct PlainParser {
    /// Builds the accessor for the message key. `new` sets it only when `get_key_column_name`
    /// finds a key column among `rw_columns`; when it is `None`, message keys are not decoded.
    pub key_builder: Option<AccessBuilderImpl>,
    /// Builds the accessor for the message payload. `new` creates it from the configured
    /// encoding, which must be JSON, Protobuf, Avro or bytes.
    pub payload_builder: AccessBuilderImpl,
    /// Column descriptors passed to `new`; exposed through `ByteStreamSourceParser::columns`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Shared source context. Its `connector_props` and `source_id` are read while decoding
    /// CDC transaction-metadata and schema-change messages.
    pub source_ctx: SourceContextRef,
    /// Debezium JSON builder used for `CdcMessageType::TransactionMeta` payloads.
    /// `new` always populates it with `Some`.
    pub transaction_meta_builder: Option<AccessBuilderImpl>,
    /// Debezium JSON builder used for `CdcMessageType::SchemaChange` payloads.
    /// `new` always populates it with `Some`.
    pub schema_change_builder: Option<AccessBuilderImpl>,
}
44
45impl PlainParser {
46 pub async fn new(
47 props: SpecificParserConfig,
48 rw_columns: Vec<SourceColumnDesc>,
49 source_ctx: SourceContextRef,
50 ) -> ConnectorResult<Self> {
51 let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
52 Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
53 EncodingProperties::Bytes(BytesProperties {
54 column_name: Some(key_column_name),
55 }),
56 )?))
57 } else {
58 None
59 };
60
61 let payload_builder = match props.encoding_config {
62 EncodingProperties::Json(_)
63 | EncodingProperties::Protobuf(_)
64 | EncodingProperties::Avro(_)
65 | EncodingProperties::Bytes(_) => {
66 AccessBuilderImpl::new_default(props.encoding_config).await?
67 }
68 _ => bail!("Unsupported encoding for Plain"),
69 };
70
71 let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
72 DebeziumJsonAccessBuilder::new(
73 TimestamptzHandling::GuessNumberUnit,
74 TimestampHandling::GuessNumberUnit,
75 TimeHandling::Micro,
76 )?,
77 ));
78
79 let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
80 DebeziumJsonAccessBuilder::new_for_schema_event()?,
81 ));
82
83 Ok(Self {
84 key_builder,
85 payload_builder,
86 rw_columns,
87 source_ctx,
88 transaction_meta_builder,
89 schema_change_builder,
90 })
91 }
92
93 pub async fn parse_inner(
94 &mut self,
95 key: Option<Vec<u8>>,
96 payload: Option<Vec<u8>>,
97 writer: SourceStreamChunkRowWriter<'_>,
98 ) -> ConnectorResult<ParseResult> {
99 if let Some(msg_meta) = writer.row_meta()
102 && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
103 && let Some(data) = payload
104 {
105 match cdc_meta.msg_type {
106 CdcMessageType::Data | CdcMessageType::Heartbeat => {
107 return self.parse_rows(key, Some(data), writer).await;
108 }
109 CdcMessageType::TransactionMeta => {
110 let accessor = self
111 .transaction_meta_builder
112 .as_mut()
113 .expect("expect transaction metadata access builder")
114 .generate_accessor(data, writer.source_meta())
115 .await?;
116 return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
117 {
118 Ok(transaction_control) => {
119 Ok(ParseResult::TransactionControl(transaction_control))
120 }
121 Err(err) => Err(err)?,
122 };
123 }
124 CdcMessageType::SchemaChange => {
125 let accessor = self
126 .schema_change_builder
127 .as_mut()
128 .expect("expect schema change access builder")
129 .generate_accessor(data, writer.source_meta())
130 .await?;
131
132 return match parse_schema_change(
133 &accessor,
134 self.source_ctx.source_id.into(),
135 &self.source_ctx.connector_props,
136 ) {
137 Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
138 Err(err) => Err(err)?,
139 };
140 }
141 CdcMessageType::Unspecified => {
142 unreachable!()
143 }
144 }
145 }
146
147 self.parse_rows(key, payload, writer).await
149 }
150
151 async fn parse_rows(
152 &mut self,
153 key: Option<Vec<u8>>,
154 payload: Option<Vec<u8>>,
155 mut writer: SourceStreamChunkRowWriter<'_>,
156 ) -> ConnectorResult<ParseResult> {
157 let meta = writer.source_meta();
158 let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
159
160 if let Some(data) = key
161 && let Some(key_builder) = self.key_builder.as_mut()
162 {
163 row_op.with_key(key_builder.generate_accessor(data, meta).await?);
165 }
166 if let Some(data) = payload {
167 row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
169 }
170
171 writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
172
173 Ok(ParseResult::Rows)
174 }
175}
176
/// Exposes `PlainParser` through the `ByteStreamSourceParser` interface.
impl ByteStreamSourceParser for PlainParser {
    /// Returns the column descriptors this parser was constructed with.
    fn columns(&self) -> &[SourceColumnDesc] {
        &self.rw_columns
    }

    /// Returns the source context this parser was constructed with.
    fn source_ctx(&self) -> &SourceContext {
        &self.source_ctx
    }

    /// Identifies this parser as the plain-format parser.
    fn parser_format(&self) -> ParserFormat {
        ParserFormat::Plain
    }

    /// Not supported by this parser; callers must use `parse_one_with_txn`.
    ///
    /// # Panics
    ///
    /// Always panics.
    async fn parse_one<'a>(
        &'a mut self,
        _key: Option<Vec<u8>>,
        _payload: Option<Vec<u8>>,
        _writer: SourceStreamChunkRowWriter<'a>,
    ) -> ConnectorResult<()> {
        unreachable!("should call `parse_one_with_txn` instead")
    }

    /// Parses one message by delegating to `PlainParser::parse_inner`, so the result can be
    /// rows, a transaction control event or a schema change.
    async fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> ConnectorResult<ParseResult> {
        self.parse_inner(key, payload, writer).await
    }
}
208
209#[cfg(test)]
210mod tests {
211 use std::ops::Deref;
212 use std::sync::Arc;
213
214 use expect_test::expect;
215 use futures::StreamExt;
216 use futures::executor::block_on;
217 use futures_async_stream::try_stream;
218 use itertools::Itertools;
219 use risingwave_common::catalog::ColumnCatalog;
220 use risingwave_pb::connector_service::cdc_message;
221
222 use super::*;
223 use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
224 use crate::source::cdc::DebeziumCdcMeta;
225 use crate::source::{ConnectorProperties, SourceCtrlOpts, SourceMessage, SplitId};
226
227 #[tokio::test]
228 async fn test_emit_transactional_chunk() {
229 let schema = ColumnCatalog::debezium_cdc_source_cols();
230
231 let columns = schema
232 .iter()
233 .map(|c| SourceColumnDesc::from(&c.column_desc))
234 .collect::<Vec<_>>();
235
236 let source_ctx = SourceContext {
237 connector_props: ConnectorProperties::PostgresCdc(Box::default()),
238 ..SourceContext::dummy()
239 };
240 let source_ctx = Arc::new(source_ctx);
241 let parser = PlainParser::new(
243 SpecificParserConfig::DEFAULT_PLAIN_JSON,
244 columns.clone(),
245 source_ctx.clone(),
246 )
247 .await
248 .unwrap();
249
250 let mut transactional = false;
251 let message_stream = source_message_stream(transactional);
253 let chunk_stream = crate::parser::parse_message_stream(
254 parser,
255 message_stream.boxed(),
256 SourceCtrlOpts::for_test(),
257 );
258 let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
259 .into_iter()
260 .collect();
261 let output = output
262 .unwrap()
263 .into_iter()
264 .filter(|c| c.cardinality() > 0)
265 .enumerate()
266 .map(|(i, c)| {
267 if i == 0 {
268 assert_eq!(4, c.cardinality());
270 }
271 if i == 1 {
272 assert_eq!(3, c.cardinality());
274 }
275 c
276 })
277 .collect_vec();
278
279 assert_eq!(2, output.len());
281
282 let parser = PlainParser::new(
284 SpecificParserConfig::DEFAULT_PLAIN_JSON,
285 columns.clone(),
286 source_ctx,
287 )
288 .await
289 .unwrap();
290
291 transactional = true;
293 let message_stream = source_message_stream(transactional);
294 let chunk_stream = crate::parser::parse_message_stream(
295 parser,
296 message_stream.boxed(),
297 SourceCtrlOpts::for_test(),
298 );
299 let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
300 .into_iter()
301 .collect();
302 let output = output
303 .unwrap()
304 .into_iter()
305 .filter(|c| c.cardinality() > 0)
306 .inspect(|c| {
307 assert_eq!(5, c.cardinality());
309 })
310 .collect_vec();
311
312 assert_eq!(1, output.len());
314 }
315
316 #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
317 async fn source_message_stream(transactional: bool) {
318 let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
319 let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
320 let data_batches = [
321 vec![
322 r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
323 r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
324 r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
325 ],
326 vec![
327 r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
328 r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
329 ],
330 ];
331 for (i, batch) in data_batches.iter().enumerate() {
332 let mut source_msg_batch = vec![];
333 if i == 0 {
334 source_msg_batch.push(SourceMessage {
336 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
337 "orders".to_owned(),
338 0,
339 if transactional {
340 cdc_message::CdcMessageType::TransactionMeta
341 } else {
342 cdc_message::CdcMessageType::Data
343 },
344 )),
345 split_id: SplitId::from("1001"),
346 offset: "0".into(),
347 key: None,
348 payload: Some(begin_msg.as_bytes().to_vec()),
349 });
350 }
351 for data_msg in batch {
353 source_msg_batch.push(SourceMessage {
354 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
355 "orders".to_owned(),
356 0,
357 cdc_message::CdcMessageType::Data,
358 )),
359 split_id: SplitId::from("1001"),
360 offset: "0".into(),
361 key: None,
362 payload: Some(data_msg.as_bytes().to_vec()),
363 });
364 }
365 if i == data_batches.len() - 1 {
366 source_msg_batch.push(SourceMessage {
368 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
369 "orders".to_owned(),
370 0,
371 if transactional {
372 cdc_message::CdcMessageType::TransactionMeta
373 } else {
374 cdc_message::CdcMessageType::Data
375 },
376 )),
377 split_id: SplitId::from("1001"),
378 offset: "0".into(),
379 key: None,
380 payload: Some(commit_msg.as_bytes().to_vec()),
381 });
382 }
383 yield source_msg_batch;
384 }
385 }
386
387 #[tokio::test]
388 async fn test_parse_transaction_metadata() {
389 let schema = ColumnCatalog::debezium_cdc_source_cols();
390
391 let columns = schema
392 .iter()
393 .map(|c| SourceColumnDesc::from(&c.column_desc))
394 .collect::<Vec<_>>();
395
396 let source_ctx = SourceContext {
398 connector_props: ConnectorProperties::MysqlCdc(Box::default()),
399 ..SourceContext::dummy()
400 };
401 let mut parser = PlainParser::new(
402 SpecificParserConfig::DEFAULT_PLAIN_JSON,
403 columns.clone(),
404 Arc::new(source_ctx),
405 )
406 .await
407 .unwrap();
408 let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
409
410 let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
412 let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
413
414 let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
415 "orders".to_owned(),
416 0,
417 cdc_message::CdcMessageType::TransactionMeta,
418 ));
419 let msg_meta = MessageMeta {
420 source_meta: &cdc_meta,
421 split_id: "1001",
422 offset: "",
423 };
424
425 let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
426 let res = parser
427 .parse_one_with_txn(
428 None,
429 Some(begin_msg.as_bytes().to_vec()),
430 builder.row_writer().with_meta(msg_meta),
431 )
432 .await;
433 match res {
434 Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
435 assert_eq!(id.deref(), expect_tx_id);
436 }
437 _ => panic!("unexpected parse result: {:?}", res),
438 }
439 let res = parser
440 .parse_one_with_txn(
441 None,
442 Some(commit_msg.as_bytes().to_vec()),
443 builder.row_writer().with_meta(msg_meta),
444 )
445 .await;
446 match res {
447 Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
448 assert_eq!(id.deref(), expect_tx_id);
449 }
450 _ => panic!("unexpected parse result: {:?}", res),
451 }
452
453 builder.finish_current_chunk();
454 assert!(builder.consume_ready_chunks().next().is_none());
455 }
456
457 #[tokio::test]
458 async fn test_parse_schema_change() {
459 let schema = ColumnCatalog::debezium_cdc_source_cols();
460
461 let columns = schema
462 .iter()
463 .map(|c| SourceColumnDesc::from(&c.column_desc))
464 .collect::<Vec<_>>();
465
466 let source_ctx = SourceContext {
468 connector_props: ConnectorProperties::MysqlCdc(Box::default()),
469 ..SourceContext::dummy()
470 };
471 let mut parser = PlainParser::new(
472 SpecificParserConfig::DEFAULT_PLAIN_JSON,
473 columns.clone(),
474 Arc::new(source_ctx),
475 )
476 .await
477 .unwrap();
478 let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
479
480 let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
481 let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
482 "mydb.test".to_owned(),
483 0,
484 cdc_message::CdcMessageType::SchemaChange,
485 ));
486 let msg_meta = MessageMeta {
487 source_meta: &cdc_meta,
488 split_id: "1001",
489 offset: "",
490 };
491
492 let res = parser
493 .parse_one_with_txn(
494 None,
495 Some(msg.as_bytes().to_vec()),
496 dummy_builder.row_writer().with_meta(msg_meta),
497 )
498 .await;
499
500 let res = res.unwrap();
501 expect![[r#"
502 SchemaChange(
503 SchemaChangeEnvelope {
504 table_changes: [
505 TableSchemaChange {
506 cdc_table_id: "0.mydb.test",
507 columns: [
508 ColumnCatalog {
509 column_desc: ColumnDesc {
510 data_type: Int32,
511 column_id: #2147483646,
512 name: "id",
513 generated_or_default_column: None,
514 description: None,
515 additional_column: AdditionalColumn {
516 column_type: None,
517 },
518 version: Pr13707,
519 system_column: None,
520 nullable: true,
521 },
522 is_hidden: false,
523 },
524 ColumnCatalog {
525 column_desc: ColumnDesc {
526 data_type: Timestamptz,
527 column_id: #2147483646,
528 name: "v1",
529 generated_or_default_column: None,
530 description: None,
531 additional_column: AdditionalColumn {
532 column_type: None,
533 },
534 version: Pr13707,
535 system_column: None,
536 nullable: true,
537 },
538 is_hidden: false,
539 },
540 ColumnCatalog {
541 column_desc: ColumnDesc {
542 data_type: Varchar,
543 column_id: #2147483646,
544 name: "v2",
545 generated_or_default_column: None,
546 description: None,
547 additional_column: AdditionalColumn {
548 column_type: None,
549 },
550 version: Pr13707,
551 system_column: None,
552 nullable: true,
553 },
554 is_hidden: false,
555 },
556 ],
557 change_type: Alter,
558 upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
559 },
560 ],
561 },
562 )
563 "#]]
564 .assert_debug_eq(&res);
565 }
566}