1use risingwave_common::bail;
16use thiserror_ext::AsReport;
17
18use super::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling};
19use super::unified::kv_event::KvEvent;
20use super::{
21 AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
22 SpecificParserConfig,
23};
24use crate::error::ConnectorResult;
25use crate::parser::bytes_parser::BytesAccessBuilder;
26use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
27use crate::parser::unified::AccessImpl;
28use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
29use crate::parser::upsert_parser::get_key_column_name;
30use crate::parser::{BytesProperties, ParseResult, ParserFormat};
31use crate::source::cdc::CdcMessageType;
32use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
33
/// Parser for sources using the plain format.
///
/// Ordinary messages are turned into rows through `payload_builder` (and `key_builder`
/// when present). Messages whose metadata is [`SourceMeta::DebeziumCdc`] are dispatched
/// on their CDC message type in `parse_inner`, which is what the two Debezium builders
/// below are used for.
#[derive(Debug)]
pub struct PlainParser {
    /// Accessor builder for the raw message key; `None` when `get_key_column_name`
    /// reports no key column for `rw_columns`.
    pub key_builder: Option<AccessBuilderImpl>,
    /// Accessor builder for the message payload, chosen from the encoding config in `new`.
    pub payload_builder: AccessBuilderImpl,
    /// Column descriptors passed to `new`; exposed through `columns()`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Shared source context (source id/name, connector properties, failure reporting).
    pub source_ctx: SourceContextRef,
    /// Debezium JSON accessor builder used for `CdcMessageType::TransactionMeta` messages.
    /// `new` always sets it to `Some`.
    pub transaction_meta_builder: Option<AccessBuilderImpl>,
    /// Debezium JSON accessor builder used for `CdcMessageType::SchemaChange` messages.
    /// `new` always sets it to `Some`.
    pub schema_change_builder: Option<AccessBuilderImpl>,
}
45
46impl PlainParser {
47 pub async fn new(
48 props: SpecificParserConfig,
49 rw_columns: Vec<SourceColumnDesc>,
50 source_ctx: SourceContextRef,
51 ) -> ConnectorResult<Self> {
52 let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
53 Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
54 EncodingProperties::Bytes(BytesProperties {
55 column_name: Some(key_column_name),
56 }),
57 )?))
58 } else {
59 None
60 };
61
62 let payload_builder = match props.encoding_config {
63 EncodingProperties::Json(_)
64 | EncodingProperties::Protobuf(_)
65 | EncodingProperties::Avro(_)
66 | EncodingProperties::Bytes(_) => {
67 AccessBuilderImpl::new_default(props.encoding_config).await?
68 }
69 _ => bail!("Unsupported encoding for Plain"),
70 };
71
72 let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
73 DebeziumJsonAccessBuilder::new(
74 TimestamptzHandling::GuessNumberUnit,
75 TimestampHandling::GuessNumberUnit,
76 TimeHandling::Micro,
77 false,
78 )?,
79 ));
80
81 let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
82 DebeziumJsonAccessBuilder::new_for_schema_event()?,
83 ));
84
85 Ok(Self {
86 key_builder,
87 payload_builder,
88 rw_columns,
89 source_ctx,
90 transaction_meta_builder,
91 schema_change_builder,
92 })
93 }
94
95 pub async fn parse_inner(
96 &mut self,
97 key: Option<Vec<u8>>,
98 payload: Option<Vec<u8>>,
99 writer: SourceStreamChunkRowWriter<'_>,
100 ) -> ConnectorResult<ParseResult> {
101 if let Some(msg_meta) = writer.row_meta()
104 && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
105 && let Some(data) = payload
106 {
107 match cdc_meta.msg_type {
108 CdcMessageType::Data | CdcMessageType::Heartbeat => {
109 return self.parse_rows(key, Some(data), writer).await;
110 }
111 CdcMessageType::TransactionMeta => {
112 let accessor = self
113 .transaction_meta_builder
114 .as_mut()
115 .expect("expect transaction metadata access builder")
116 .generate_accessor(data, writer.source_meta())
117 .await?;
118 return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
119 {
120 Ok(transaction_control) => {
121 Ok(ParseResult::TransactionControl(transaction_control))
122 }
123 Err(err) => Err(err)?,
124 };
125 }
126 CdcMessageType::SchemaChange => {
127 let accessor = self
128 .schema_change_builder
129 .as_mut()
130 .expect("expect schema change access builder")
131 .generate_accessor(data, writer.source_meta())
132 .await?;
133
134 return match parse_schema_change(
135 &accessor,
136 self.source_ctx.source_id.into(),
137 &self.source_ctx.source_name,
138 &self.source_ctx.connector_props,
139 ) {
140 Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
141 Err(err) => {
142 let (fail_info, table_name, cdc_table_id) = match &err {
144 crate::parser::AccessError::CdcAutoSchemaChangeError {
145 ty,
146 table_name,
147 ..
148 } => {
149 let clean_table_name =
151 table_name.trim_matches('"').replace("\".\"", ".");
152 let fail_info = format!(
153 "Unsupported data type '{}' in source '{}' table '{}'",
154 ty, self.source_ctx.source_name, clean_table_name
155 );
156 let cdc_table_id = format!(
158 "{}.{}",
159 self.source_ctx.source_name, clean_table_name
160 );
161
162 (fail_info, clean_table_name, cdc_table_id)
163 }
164 _ => {
165 let fail_info = format!(
166 "Failed to parse schema change: {:?}, source: {}",
167 err.as_report(),
168 self.source_ctx.source_name
169 );
170 (fail_info, "".to_owned(), "".to_owned())
171 }
172 };
173 self.source_ctx.on_cdc_auto_schema_change_failure(
174 self.source_ctx.source_id.table_id,
175 table_name,
176 cdc_table_id,
177 "".to_owned(), fail_info,
179 );
180
181 Err(err)?
182 }
183 };
184 }
185 CdcMessageType::Unspecified => {
186 unreachable!()
187 }
188 }
189 }
190
191 self.parse_rows(key, payload, writer).await
193 }
194
195 async fn parse_rows(
196 &mut self,
197 key: Option<Vec<u8>>,
198 payload: Option<Vec<u8>>,
199 mut writer: SourceStreamChunkRowWriter<'_>,
200 ) -> ConnectorResult<ParseResult> {
201 let meta = writer.source_meta();
202 let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
203
204 if let Some(data) = key
205 && let Some(key_builder) = self.key_builder.as_mut()
206 {
207 row_op.with_key(key_builder.generate_accessor(data, meta).await?);
209 }
210 if let Some(data) = payload {
211 row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
213 }
214
215 writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
216
217 Ok(ParseResult::Rows)
218 }
219}
220
221impl ByteStreamSourceParser for PlainParser {
222 fn columns(&self) -> &[SourceColumnDesc] {
223 &self.rw_columns
224 }
225
226 fn source_ctx(&self) -> &SourceContext {
227 &self.source_ctx
228 }
229
230 fn parser_format(&self) -> ParserFormat {
231 ParserFormat::Plain
232 }
233
234 async fn parse_one<'a>(
235 &'a mut self,
236 _key: Option<Vec<u8>>,
237 _payload: Option<Vec<u8>>,
238 _writer: SourceStreamChunkRowWriter<'a>,
239 ) -> ConnectorResult<()> {
240 unreachable!("should call `parse_one_with_txn` instead")
241 }
242
243 async fn parse_one_with_txn<'a>(
244 &'a mut self,
245 key: Option<Vec<u8>>,
246 payload: Option<Vec<u8>>,
247 writer: SourceStreamChunkRowWriter<'a>,
248 ) -> ConnectorResult<ParseResult> {
249 self.parse_inner(key, payload, writer).await
250 }
251}
252
253#[cfg(test)]
254mod tests {
255 use std::ops::Deref;
256 use std::sync::Arc;
257
258 use expect_test::expect;
259 use futures::StreamExt;
260 use futures::executor::block_on;
261 use futures_async_stream::try_stream;
262 use itertools::Itertools;
263 use risingwave_common::catalog::ColumnCatalog;
264 use risingwave_pb::connector_service::cdc_message;
265
266 use super::*;
267 use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
268 use crate::source::cdc::DebeziumCdcMeta;
269 use crate::source::{ConnectorProperties, SourceCtrlOpts, SourceMessage, SplitId};
270
271 #[tokio::test]
272 async fn test_emit_transactional_chunk() {
273 let schema = ColumnCatalog::debezium_cdc_source_cols();
274
275 let columns = schema
276 .iter()
277 .map(|c| SourceColumnDesc::from(&c.column_desc))
278 .collect::<Vec<_>>();
279
280 let source_ctx = SourceContext {
281 connector_props: ConnectorProperties::PostgresCdc(Box::default()),
282 ..SourceContext::dummy()
283 };
284 let source_ctx = Arc::new(source_ctx);
285 let parser = PlainParser::new(
287 SpecificParserConfig::DEFAULT_PLAIN_JSON,
288 columns.clone(),
289 source_ctx.clone(),
290 )
291 .await
292 .unwrap();
293
294 let mut transactional = false;
295 let message_stream = source_message_stream(transactional);
297 let chunk_stream = crate::parser::parse_message_stream(
298 parser,
299 message_stream.boxed(),
300 SourceCtrlOpts::for_test(),
301 );
302 let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
303 .into_iter()
304 .collect();
305 let output = output
306 .unwrap()
307 .into_iter()
308 .filter(|c| c.cardinality() > 0)
309 .enumerate()
310 .map(|(i, c)| {
311 if i == 0 {
312 assert_eq!(4, c.cardinality());
314 }
315 if i == 1 {
316 assert_eq!(3, c.cardinality());
318 }
319 c
320 })
321 .collect_vec();
322
323 assert_eq!(2, output.len());
325
326 let parser = PlainParser::new(
328 SpecificParserConfig::DEFAULT_PLAIN_JSON,
329 columns.clone(),
330 source_ctx,
331 )
332 .await
333 .unwrap();
334
335 transactional = true;
337 let message_stream = source_message_stream(transactional);
338 let chunk_stream = crate::parser::parse_message_stream(
339 parser,
340 message_stream.boxed(),
341 SourceCtrlOpts::for_test(),
342 );
343 let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
344 .into_iter()
345 .collect();
346 let output = output
347 .unwrap()
348 .into_iter()
349 .filter(|c| c.cardinality() > 0)
350 .inspect(|c| {
351 assert_eq!(5, c.cardinality());
353 })
354 .collect_vec();
355
356 assert_eq!(1, output.len());
358 }
359
360 #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
361 async fn source_message_stream(transactional: bool) {
362 let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
363 let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
364 let data_batches = [
365 vec![
366 r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
367 r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
368 r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
369 ],
370 vec![
371 r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
372 r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
373 ],
374 ];
375 for (i, batch) in data_batches.iter().enumerate() {
376 let mut source_msg_batch = vec![];
377 if i == 0 {
378 source_msg_batch.push(SourceMessage {
380 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
381 "orders".to_owned(),
382 0,
383 if transactional {
384 cdc_message::CdcMessageType::TransactionMeta
385 } else {
386 cdc_message::CdcMessageType::Data
387 },
388 )),
389 split_id: SplitId::from("1001"),
390 offset: "0".into(),
391 key: None,
392 payload: Some(begin_msg.as_bytes().to_vec()),
393 });
394 }
395 for data_msg in batch {
397 source_msg_batch.push(SourceMessage {
398 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
399 "orders".to_owned(),
400 0,
401 cdc_message::CdcMessageType::Data,
402 )),
403 split_id: SplitId::from("1001"),
404 offset: "0".into(),
405 key: None,
406 payload: Some(data_msg.as_bytes().to_vec()),
407 });
408 }
409 if i == data_batches.len() - 1 {
410 source_msg_batch.push(SourceMessage {
412 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
413 "orders".to_owned(),
414 0,
415 if transactional {
416 cdc_message::CdcMessageType::TransactionMeta
417 } else {
418 cdc_message::CdcMessageType::Data
419 },
420 )),
421 split_id: SplitId::from("1001"),
422 offset: "0".into(),
423 key: None,
424 payload: Some(commit_msg.as_bytes().to_vec()),
425 });
426 }
427 yield source_msg_batch;
428 }
429 }
430
431 #[tokio::test]
432 async fn test_parse_transaction_metadata() {
433 let schema = ColumnCatalog::debezium_cdc_source_cols();
434
435 let columns = schema
436 .iter()
437 .map(|c| SourceColumnDesc::from(&c.column_desc))
438 .collect::<Vec<_>>();
439
440 let source_ctx = SourceContext {
442 connector_props: ConnectorProperties::MysqlCdc(Box::default()),
443 ..SourceContext::dummy()
444 };
445 let mut parser = PlainParser::new(
446 SpecificParserConfig::DEFAULT_PLAIN_JSON,
447 columns.clone(),
448 Arc::new(source_ctx),
449 )
450 .await
451 .unwrap();
452 let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
453
454 let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
456 let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
457
458 let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
459 "orders".to_owned(),
460 0,
461 cdc_message::CdcMessageType::TransactionMeta,
462 ));
463 let msg_meta = MessageMeta {
464 source_meta: &cdc_meta,
465 split_id: "1001",
466 offset: "",
467 };
468
469 let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
470 let res = parser
471 .parse_one_with_txn(
472 None,
473 Some(begin_msg.as_bytes().to_vec()),
474 builder.row_writer().with_meta(msg_meta),
475 )
476 .await;
477 match res {
478 Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
479 assert_eq!(id.deref(), expect_tx_id);
480 }
481 _ => panic!("unexpected parse result: {:?}", res),
482 }
483 let res = parser
484 .parse_one_with_txn(
485 None,
486 Some(commit_msg.as_bytes().to_vec()),
487 builder.row_writer().with_meta(msg_meta),
488 )
489 .await;
490 match res {
491 Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
492 assert_eq!(id.deref(), expect_tx_id);
493 }
494 _ => panic!("unexpected parse result: {:?}", res),
495 }
496
497 builder.finish_current_chunk();
498 assert!(builder.consume_ready_chunks().next().is_none());
499 }
500
501 #[tokio::test]
502 async fn test_parse_schema_change() {
503 let schema = ColumnCatalog::debezium_cdc_source_cols();
504
505 let columns = schema
506 .iter()
507 .map(|c| SourceColumnDesc::from(&c.column_desc))
508 .collect::<Vec<_>>();
509
510 let source_ctx = SourceContext {
512 connector_props: ConnectorProperties::MysqlCdc(Box::default()),
513 ..SourceContext::dummy()
514 };
515 let mut parser = PlainParser::new(
516 SpecificParserConfig::DEFAULT_PLAIN_JSON,
517 columns.clone(),
518 Arc::new(source_ctx),
519 )
520 .await
521 .unwrap();
522 let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
523
524 let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
525 let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
526 "mydb.test".to_owned(),
527 0,
528 cdc_message::CdcMessageType::SchemaChange,
529 ));
530 let msg_meta = MessageMeta {
531 source_meta: &cdc_meta,
532 split_id: "1001",
533 offset: "",
534 };
535
536 let res = parser
537 .parse_one_with_txn(
538 None,
539 Some(msg.as_bytes().to_vec()),
540 dummy_builder.row_writer().with_meta(msg_meta),
541 )
542 .await;
543
544 let res = res.unwrap();
545 expect![[r#"
546 SchemaChange(
547 SchemaChangeEnvelope {
548 table_changes: [
549 TableSchemaChange {
550 cdc_table_id: "0.mydb.test",
551 columns: [
552 ColumnCatalog {
553 column_desc: ColumnDesc {
554 data_type: Int32,
555 column_id: #2147483646,
556 name: "id",
557 generated_or_default_column: None,
558 description: None,
559 additional_column: AdditionalColumn {
560 column_type: None,
561 },
562 version: Pr13707,
563 system_column: None,
564 nullable: true,
565 },
566 is_hidden: false,
567 },
568 ColumnCatalog {
569 column_desc: ColumnDesc {
570 data_type: Timestamptz,
571 column_id: #2147483646,
572 name: "v1",
573 generated_or_default_column: None,
574 description: None,
575 additional_column: AdditionalColumn {
576 column_type: None,
577 },
578 version: Pr13707,
579 system_column: None,
580 nullable: true,
581 },
582 is_hidden: false,
583 },
584 ColumnCatalog {
585 column_desc: ColumnDesc {
586 data_type: Varchar,
587 column_id: #2147483646,
588 name: "v2",
589 generated_or_default_column: None,
590 description: None,
591 additional_column: AdditionalColumn {
592 column_type: None,
593 },
594 version: Pr13707,
595 system_column: None,
596 nullable: true,
597 },
598 is_hidden: false,
599 },
600 ],
601 change_type: Alter,
602 upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
603 },
604 ],
605 },
606 )
607 "#]]
608 .assert_debug_eq(&res);
609 }
610}