1use risingwave_common::bail;
16use thiserror_ext::AsReport;
17
18use super::unified::json::{
19 BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
20};
21use super::unified::kv_event::KvEvent;
22use super::{
23 AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
24 SpecificParserConfig,
25};
26use crate::error::ConnectorResult;
27use crate::parser::bytes_parser::BytesAccessBuilder;
28use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
29use crate::parser::unified::AccessImpl;
30use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
31use crate::parser::upsert_parser::get_key_column_name;
32use crate::parser::{BytesProperties, ParseResult, ParserFormat};
33use crate::source::cdc::CdcMessageType;
34use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
35
/// Parser whose `parser_format()` is `ParserFormat::Plain`.
///
/// Besides decoding ordinary rows from a key/payload pair, it owns two Debezium-JSON
/// builders that `parse_inner` uses for CDC transaction-metadata and schema-change
/// messages.
#[derive(Debug)]
pub struct PlainParser {
    /// Builder for the message key; `None` when `get_key_column_name` finds no key
    /// column among the parser's columns.
    pub key_builder: Option<AccessBuilderImpl>,
    /// Builder for the message payload, created from the configured encoding.
    pub payload_builder: AccessBuilderImpl,
    /// Column descriptors handed to `new`; exposed through `columns()`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Shared source context; `parse_inner` reads `connector_props`, `source_id` and
    /// `source_name` from it and reports schema-change failures through it.
    pub source_ctx: SourceContextRef,
    /// Builder used for `CdcMessageType::TransactionMeta` payloads; `new` always sets it
    /// to `Some`.
    pub transaction_meta_builder: Option<AccessBuilderImpl>,
    /// Builder used for `CdcMessageType::SchemaChange` payloads; `new` always sets it
    /// to `Some`.
    pub schema_change_builder: Option<AccessBuilderImpl>,
}
47
48impl PlainParser {
49 pub async fn new(
50 props: SpecificParserConfig,
51 rw_columns: Vec<SourceColumnDesc>,
52 source_ctx: SourceContextRef,
53 ) -> ConnectorResult<Self> {
54 let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
55 Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
56 EncodingProperties::Bytes(BytesProperties {
57 column_name: Some(key_column_name),
58 }),
59 )?))
60 } else {
61 None
62 };
63
64 let payload_builder = match props.encoding_config {
65 EncodingProperties::Json(_)
66 | EncodingProperties::Protobuf(_)
67 | EncodingProperties::Avro(_)
68 | EncodingProperties::Bytes(_) => {
69 AccessBuilderImpl::new_default(props.encoding_config).await?
70 }
71 _ => bail!("Unsupported encoding for Plain"),
72 };
73
74 let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
75 DebeziumJsonAccessBuilder::new(
76 TimestamptzHandling::GuessNumberUnit,
77 TimestampHandling::GuessNumberUnit,
78 TimeHandling::Micro,
79 BigintUnsignedHandlingMode::Long,
80 false,
81 )?,
82 ));
83
84 let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
85 DebeziumJsonAccessBuilder::new_for_schema_event()?,
86 ));
87
88 Ok(Self {
89 key_builder,
90 payload_builder,
91 rw_columns,
92 source_ctx,
93 transaction_meta_builder,
94 schema_change_builder,
95 })
96 }
97
98 pub async fn parse_inner(
99 &mut self,
100 key: Option<Vec<u8>>,
101 payload: Option<Vec<u8>>,
102 writer: SourceStreamChunkRowWriter<'_>,
103 ) -> ConnectorResult<ParseResult> {
104 if let Some(msg_meta) = writer.row_meta()
107 && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
108 && let Some(data) = payload
109 {
110 match cdc_meta.msg_type {
111 CdcMessageType::Data | CdcMessageType::Heartbeat => {
112 return self.parse_rows(key, Some(data), writer).await;
113 }
114 CdcMessageType::TransactionMeta => {
115 let accessor = self
116 .transaction_meta_builder
117 .as_mut()
118 .expect("expect transaction metadata access builder")
119 .generate_accessor(data, writer.source_meta())
120 .await?;
121 return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
122 {
123 Ok(transaction_control) => {
124 Ok(ParseResult::TransactionControl(transaction_control))
125 }
126 Err(err) => Err(err)?,
127 };
128 }
129 CdcMessageType::SchemaChange => {
130 let accessor = self
131 .schema_change_builder
132 .as_mut()
133 .expect("expect schema change access builder")
134 .generate_accessor(data, writer.source_meta())
135 .await?;
136
137 return match parse_schema_change(
138 &accessor,
139 self.source_ctx.source_id.into(),
140 &self.source_ctx.source_name,
141 &self.source_ctx.connector_props,
142 ) {
143 Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
144 Err(err) => {
145 let (fail_info, table_name, cdc_table_id) = match &err {
147 crate::parser::AccessError::CdcAutoSchemaChangeError {
148 ty,
149 table_name,
150 ..
151 } => {
152 let clean_table_name =
154 table_name.trim_matches('"').replace("\".\"", ".");
155 let fail_info = format!(
156 "Unsupported data type '{}' in source '{}' table '{}'",
157 ty, self.source_ctx.source_name, clean_table_name
158 );
159 let cdc_table_id = format!(
161 "{}.{}",
162 self.source_ctx.source_name, clean_table_name
163 );
164
165 (fail_info, clean_table_name, cdc_table_id)
166 }
167 _ => {
168 let fail_info = format!(
169 "Failed to parse schema change: {:?}, source: {}",
170 err.as_report(),
171 self.source_ctx.source_name
172 );
173 (fail_info, "".to_owned(), "".to_owned())
174 }
175 };
176 self.source_ctx.on_cdc_auto_schema_change_failure(
177 self.source_ctx.source_id.as_raw_id(),
178 table_name,
179 cdc_table_id,
180 "".to_owned(), fail_info,
182 );
183
184 Err(err)?
185 }
186 };
187 }
188 CdcMessageType::Unspecified => {
189 unreachable!()
190 }
191 }
192 }
193
194 self.parse_rows(key, payload, writer).await
196 }
197
198 async fn parse_rows(
199 &mut self,
200 key: Option<Vec<u8>>,
201 payload: Option<Vec<u8>>,
202 mut writer: SourceStreamChunkRowWriter<'_>,
203 ) -> ConnectorResult<ParseResult> {
204 let meta = writer.source_meta();
205 let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
206
207 if let Some(data) = key
208 && let Some(key_builder) = self.key_builder.as_mut()
209 {
210 row_op.with_key(key_builder.generate_accessor(data, meta).await?);
212 }
213 if let Some(data) = payload {
214 row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
216 }
217
218 writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
219
220 Ok(ParseResult::Rows)
221 }
222}
223
/// `ByteStreamSourceParser` implementation that forwards all parsing to
/// `PlainParser::parse_inner`.
impl ByteStreamSourceParser for PlainParser {
    /// Returns the column descriptors this parser was constructed with.
    fn columns(&self) -> &[SourceColumnDesc] {
        &self.rw_columns
    }

    /// Returns the source context held by this parser.
    fn source_ctx(&self) -> &SourceContext {
        &self.source_ctx
    }

    /// Identifies this parser as `ParserFormat::Plain`.
    fn parser_format(&self) -> ParserFormat {
        ParserFormat::Plain
    }

    /// Not supported for this parser: always panics via `unreachable!`, whose message
    /// directs callers to `parse_one_with_txn`.
    async fn parse_one<'a>(
        &'a mut self,
        _key: Option<Vec<u8>>,
        _payload: Option<Vec<u8>>,
        _writer: SourceStreamChunkRowWriter<'a>,
    ) -> ConnectorResult<()> {
        unreachable!("should call `parse_one_with_txn` instead")
    }

    /// Parses one message by delegating to `parse_inner`, which may return rows, a
    /// transaction control, or a schema change.
    async fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> ConnectorResult<ParseResult> {
        self.parse_inner(key, payload, writer).await
    }
}
255
256#[cfg(test)]
257mod tests {
258 use std::ops::Deref;
259 use std::sync::Arc;
260
261 use expect_test::expect;
262 use futures::StreamExt;
263 use futures::executor::block_on;
264 use futures_async_stream::try_stream;
265 use itertools::Itertools;
266 use risingwave_common::catalog::ColumnCatalog;
267 use risingwave_pb::connector_service::cdc_message;
268
269 use super::*;
270 use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
271 use crate::source::cdc::DebeziumCdcMeta;
272 use crate::source::{ConnectorProperties, SourceCtrlOpts, SourceMessage, SplitId};
273
274 #[tokio::test]
275 async fn test_emit_transactional_chunk() {
276 let schema = ColumnCatalog::debezium_cdc_source_cols();
277
278 let columns = schema
279 .iter()
280 .map(|c| SourceColumnDesc::from(&c.column_desc))
281 .collect::<Vec<_>>();
282
283 let source_ctx = SourceContext {
284 connector_props: ConnectorProperties::PostgresCdc(Box::default()),
285 ..SourceContext::dummy()
286 };
287 let source_ctx = Arc::new(source_ctx);
288 let parser = PlainParser::new(
290 SpecificParserConfig::DEFAULT_PLAIN_JSON,
291 columns.clone(),
292 source_ctx.clone(),
293 )
294 .await
295 .unwrap();
296
297 let mut transactional = false;
298 let message_stream = source_message_stream(transactional);
300 let chunk_stream = crate::parser::parse_message_stream(
301 parser,
302 message_stream.boxed(),
303 SourceCtrlOpts::for_test(),
304 );
305 let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
306 .into_iter()
307 .collect();
308 let output = output
309 .unwrap()
310 .into_iter()
311 .filter(|c| c.cardinality() > 0)
312 .enumerate()
313 .map(|(i, c)| {
314 if i == 0 {
315 assert_eq!(4, c.cardinality());
317 }
318 if i == 1 {
319 assert_eq!(3, c.cardinality());
321 }
322 c
323 })
324 .collect_vec();
325
326 assert_eq!(2, output.len());
328
329 let parser = PlainParser::new(
331 SpecificParserConfig::DEFAULT_PLAIN_JSON,
332 columns.clone(),
333 source_ctx,
334 )
335 .await
336 .unwrap();
337
338 transactional = true;
340 let message_stream = source_message_stream(transactional);
341 let chunk_stream = crate::parser::parse_message_stream(
342 parser,
343 message_stream.boxed(),
344 SourceCtrlOpts::for_test(),
345 );
346 let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
347 .into_iter()
348 .collect();
349 let output = output
350 .unwrap()
351 .into_iter()
352 .filter(|c| c.cardinality() > 0)
353 .inspect(|c| {
354 assert_eq!(5, c.cardinality());
356 })
357 .collect_vec();
358
359 assert_eq!(1, output.len());
361 }
362
363 #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
364 async fn source_message_stream(transactional: bool) {
365 let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
366 let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
367 let data_batches = [
368 vec![
369 r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
370 r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
371 r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
372 ],
373 vec![
374 r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
375 r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
376 ],
377 ];
378 for (i, batch) in data_batches.iter().enumerate() {
379 let mut source_msg_batch = vec![];
380 if i == 0 {
381 source_msg_batch.push(SourceMessage {
383 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
384 "orders".to_owned(),
385 0,
386 if transactional {
387 cdc_message::CdcMessageType::TransactionMeta
388 } else {
389 cdc_message::CdcMessageType::Data
390 },
391 )),
392 split_id: SplitId::from("1001"),
393 offset: "0".into(),
394 key: None,
395 payload: Some(begin_msg.as_bytes().to_vec()),
396 });
397 }
398 for data_msg in batch {
400 source_msg_batch.push(SourceMessage {
401 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
402 "orders".to_owned(),
403 0,
404 cdc_message::CdcMessageType::Data,
405 )),
406 split_id: SplitId::from("1001"),
407 offset: "0".into(),
408 key: None,
409 payload: Some(data_msg.as_bytes().to_vec()),
410 });
411 }
412 if i == data_batches.len() - 1 {
413 source_msg_batch.push(SourceMessage {
415 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
416 "orders".to_owned(),
417 0,
418 if transactional {
419 cdc_message::CdcMessageType::TransactionMeta
420 } else {
421 cdc_message::CdcMessageType::Data
422 },
423 )),
424 split_id: SplitId::from("1001"),
425 offset: "0".into(),
426 key: None,
427 payload: Some(commit_msg.as_bytes().to_vec()),
428 });
429 }
430 yield source_msg_batch;
431 }
432 }
433
434 #[tokio::test]
435 async fn test_parse_transaction_metadata() {
436 let schema = ColumnCatalog::debezium_cdc_source_cols();
437
438 let columns = schema
439 .iter()
440 .map(|c| SourceColumnDesc::from(&c.column_desc))
441 .collect::<Vec<_>>();
442
443 let source_ctx = SourceContext {
445 connector_props: ConnectorProperties::MysqlCdc(Box::default()),
446 ..SourceContext::dummy()
447 };
448 let mut parser = PlainParser::new(
449 SpecificParserConfig::DEFAULT_PLAIN_JSON,
450 columns.clone(),
451 Arc::new(source_ctx),
452 )
453 .await
454 .unwrap();
455 let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
456
457 let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
459 let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
460
461 let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
462 "orders".to_owned(),
463 0,
464 cdc_message::CdcMessageType::TransactionMeta,
465 ));
466 let msg_meta = MessageMeta {
467 source_meta: &cdc_meta,
468 split_id: "1001",
469 offset: "",
470 };
471
472 let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
473 let res = parser
474 .parse_one_with_txn(
475 None,
476 Some(begin_msg.as_bytes().to_vec()),
477 builder.row_writer().with_meta(msg_meta),
478 )
479 .await;
480 match res {
481 Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
482 assert_eq!(id.deref(), expect_tx_id);
483 }
484 _ => panic!("unexpected parse result: {:?}", res),
485 }
486 let res = parser
487 .parse_one_with_txn(
488 None,
489 Some(commit_msg.as_bytes().to_vec()),
490 builder.row_writer().with_meta(msg_meta),
491 )
492 .await;
493 match res {
494 Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
495 assert_eq!(id.deref(), expect_tx_id);
496 }
497 _ => panic!("unexpected parse result: {:?}", res),
498 }
499
500 builder.finish_current_chunk();
501 assert!(builder.consume_ready_chunks().next().is_none());
502 }
503
/// Parses a MySQL Debezium schema-change event (an `ALTER TABLE` adding column `v2`)
/// through `parse_one_with_txn` and snapshot-compares the pretty `Debug` output of the
/// resulting `ParseResult::SchemaChange`.
#[tokio::test]
async fn test_parse_schema_change() {
    let schema = ColumnCatalog::debezium_cdc_source_cols();

    let columns = schema
        .iter()
        .map(|c| SourceColumnDesc::from(&c.column_desc))
        .collect::<Vec<_>>();

    // Use MySQL CDC connector properties to match the "mysql" connector in the payload.
    let source_ctx = SourceContext {
        connector_props: ConnectorProperties::MysqlCdc(Box::default()),
        ..SourceContext::dummy()
    };
    let mut parser = PlainParser::new(
        SpecificParserConfig::DEFAULT_PLAIN_JSON,
        columns.clone(),
        Arc::new(source_ctx),
    )
    .await
    .unwrap();
    // Only needed to obtain a row writer for the parser call below.
    let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

    // Debezium schema-change payload describing table "mydb"."test" with columns id, v1, v2.
    let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
    // Tag the message as a schema change so `parse_inner` takes the `SchemaChange` arm.
    let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
        "mydb.test".to_owned(),
        0,
        cdc_message::CdcMessageType::SchemaChange,
    ));
    let msg_meta = MessageMeta {
        source_meta: &cdc_meta,
        split_id: "1001",
        offset: "",
    };

    let res = parser
        .parse_one_with_txn(
            None,
            Some(msg.as_bytes().to_vec()),
            dummy_builder.row_writer().with_meta(msg_meta),
        )
        .await;

    let res = res.unwrap();
    // Snapshot of the expected pretty-printed `Debug` output.
    expect![[r#"
        SchemaChange(
            SchemaChangeEnvelope {
                table_changes: [
                    TableSchemaChange {
                        cdc_table_id: "0.mydb.test",
                        columns: [
                            ColumnCatalog {
                                column_desc: ColumnDesc {
                                    data_type: Int32,
                                    column_id: #2147483646,
                                    name: "id",
                                    generated_or_default_column: None,
                                    description: None,
                                    additional_column: AdditionalColumn {
                                        column_type: None,
                                    },
                                    version: Pr13707,
                                    system_column: None,
                                    nullable: true,
                                },
                                is_hidden: false,
                            },
                            ColumnCatalog {
                                column_desc: ColumnDesc {
                                    data_type: Timestamptz,
                                    column_id: #2147483646,
                                    name: "v1",
                                    generated_or_default_column: None,
                                    description: None,
                                    additional_column: AdditionalColumn {
                                        column_type: None,
                                    },
                                    version: Pr13707,
                                    system_column: None,
                                    nullable: true,
                                },
                                is_hidden: false,
                            },
                            ColumnCatalog {
                                column_desc: ColumnDesc {
                                    data_type: Varchar,
                                    column_id: #2147483646,
                                    name: "v2",
                                    generated_or_default_column: None,
                                    description: None,
                                    additional_column: AdditionalColumn {
                                        column_type: None,
                                    },
                                    version: Pr13707,
                                    system_column: None,
                                    nullable: true,
                                },
                                is_hidden: false,
                            },
                        ],
                        change_type: Alter,
                        upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
                    },
                ],
            },
        )
    "#]]
    .assert_debug_eq(&res);
}
613}