1use risingwave_common::bail;
16
17use super::unified::json::TimestamptzHandling;
18use super::unified::kv_event::KvEvent;
19use super::{
20 AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
21 SpecificParserConfig,
22};
23use crate::error::ConnectorResult;
24use crate::parser::bytes_parser::BytesAccessBuilder;
25use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
26use crate::parser::unified::AccessImpl;
27use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
28use crate::parser::upsert_parser::get_key_column_name;
29use crate::parser::{BytesProperties, ParseResult, ParserFormat};
30use crate::source::cdc::CdcMessageType;
31use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
32
/// Parser for `FORMAT PLAIN` sources.
///
/// Besides ordinary rows, it recognizes Debezium CDC control messages (transaction metadata and
/// schema-change events) when the row metadata is `SourceMeta::DebeziumCdc`.
#[derive(Debug)]
pub struct PlainParser {
    /// Accessor builder that exposes the message key as raw bytes under the key column's name;
    /// `None` when `get_key_column_name` finds no key column among the source columns.
    pub key_builder: Option<AccessBuilderImpl>,
    /// Accessor builder for the message payload, selected from the parser's encoding config.
    pub payload_builder: AccessBuilderImpl,
    /// Columns produced by this parser (returned by `ByteStreamSourceParser::columns`).
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Shared source context; its connector properties (and source id, for schema changes) are
    /// passed to the CDC control-message parsers.
    pub source_ctx: SourceContextRef,
    /// Debezium-JSON accessor builder for CDC transaction-metadata messages.
    /// `PlainParser::new` always populates it.
    pub transaction_meta_builder: Option<AccessBuilderImpl>,
    /// Debezium-JSON accessor builder for CDC schema-change events.
    /// `PlainParser::new` always populates it.
    pub schema_change_builder: Option<AccessBuilderImpl>,
}
44
45impl PlainParser {
46 pub async fn new(
47 props: SpecificParserConfig,
48 rw_columns: Vec<SourceColumnDesc>,
49 source_ctx: SourceContextRef,
50 ) -> ConnectorResult<Self> {
51 let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
52 Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
53 EncodingProperties::Bytes(BytesProperties {
54 column_name: Some(key_column_name),
55 }),
56 )?))
57 } else {
58 None
59 };
60
61 let payload_builder = match props.encoding_config {
62 EncodingProperties::Json(_)
63 | EncodingProperties::Protobuf(_)
64 | EncodingProperties::Avro(_)
65 | EncodingProperties::Bytes(_) => {
66 AccessBuilderImpl::new_default(props.encoding_config).await?
67 }
68 _ => bail!("Unsupported encoding for Plain"),
69 };
70
71 let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
72 DebeziumJsonAccessBuilder::new(TimestamptzHandling::GuessNumberUnit)?,
73 ));
74
75 let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
76 DebeziumJsonAccessBuilder::new_for_schema_event()?,
77 ));
78
79 Ok(Self {
80 key_builder,
81 payload_builder,
82 rw_columns,
83 source_ctx,
84 transaction_meta_builder,
85 schema_change_builder,
86 })
87 }
88
89 pub async fn parse_inner(
90 &mut self,
91 key: Option<Vec<u8>>,
92 payload: Option<Vec<u8>>,
93 writer: SourceStreamChunkRowWriter<'_>,
94 ) -> ConnectorResult<ParseResult> {
95 if let Some(msg_meta) = writer.row_meta()
98 && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
99 && let Some(data) = payload
100 {
101 match cdc_meta.msg_type {
102 CdcMessageType::Data | CdcMessageType::Heartbeat => {
103 return self.parse_rows(key, Some(data), writer).await;
104 }
105 CdcMessageType::TransactionMeta => {
106 let accessor = self
107 .transaction_meta_builder
108 .as_mut()
109 .expect("expect transaction metadata access builder")
110 .generate_accessor(data, writer.source_meta())
111 .await?;
112 return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
113 {
114 Ok(transaction_control) => {
115 Ok(ParseResult::TransactionControl(transaction_control))
116 }
117 Err(err) => Err(err)?,
118 };
119 }
120 CdcMessageType::SchemaChange => {
121 let accessor = self
122 .schema_change_builder
123 .as_mut()
124 .expect("expect schema change access builder")
125 .generate_accessor(data, writer.source_meta())
126 .await?;
127
128 return match parse_schema_change(
129 &accessor,
130 self.source_ctx.source_id.into(),
131 &self.source_ctx.connector_props,
132 ) {
133 Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
134 Err(err) => Err(err)?,
135 };
136 }
137 CdcMessageType::Unspecified => {
138 unreachable!()
139 }
140 }
141 }
142
143 self.parse_rows(key, payload, writer).await
145 }
146
147 async fn parse_rows(
148 &mut self,
149 key: Option<Vec<u8>>,
150 payload: Option<Vec<u8>>,
151 mut writer: SourceStreamChunkRowWriter<'_>,
152 ) -> ConnectorResult<ParseResult> {
153 let meta = writer.source_meta();
154 let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
155
156 if let Some(data) = key
157 && let Some(key_builder) = self.key_builder.as_mut()
158 {
159 row_op.with_key(key_builder.generate_accessor(data, meta).await?);
161 }
162 if let Some(data) = payload {
163 row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
165 }
166
167 writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
168
169 Ok(ParseResult::Rows)
170 }
171}
172
173impl ByteStreamSourceParser for PlainParser {
174 fn columns(&self) -> &[SourceColumnDesc] {
175 &self.rw_columns
176 }
177
178 fn source_ctx(&self) -> &SourceContext {
179 &self.source_ctx
180 }
181
182 fn parser_format(&self) -> ParserFormat {
183 ParserFormat::Plain
184 }
185
186 async fn parse_one<'a>(
187 &'a mut self,
188 _key: Option<Vec<u8>>,
189 _payload: Option<Vec<u8>>,
190 _writer: SourceStreamChunkRowWriter<'a>,
191 ) -> ConnectorResult<()> {
192 unreachable!("should call `parse_one_with_txn` instead")
193 }
194
195 async fn parse_one_with_txn<'a>(
196 &'a mut self,
197 key: Option<Vec<u8>>,
198 payload: Option<Vec<u8>>,
199 writer: SourceStreamChunkRowWriter<'a>,
200 ) -> ConnectorResult<ParseResult> {
201 self.parse_inner(key, payload, writer).await
202 }
203}
204
205#[cfg(test)]
206mod tests {
207 use std::ops::Deref;
208 use std::sync::Arc;
209
210 use expect_test::expect;
211 use futures::StreamExt;
212 use futures::executor::block_on;
213 use futures_async_stream::try_stream;
214 use itertools::Itertools;
215 use risingwave_common::catalog::ColumnCatalog;
216 use risingwave_pb::connector_service::cdc_message;
217
218 use super::*;
219 use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
220 use crate::source::cdc::DebeziumCdcMeta;
221 use crate::source::{ConnectorProperties, SourceCtrlOpts, SourceMessage, SplitId};
222
223 #[tokio::test]
224 async fn test_emit_transactional_chunk() {
225 let schema = ColumnCatalog::debezium_cdc_source_cols();
226
227 let columns = schema
228 .iter()
229 .map(|c| SourceColumnDesc::from(&c.column_desc))
230 .collect::<Vec<_>>();
231
232 let source_ctx = SourceContext {
233 connector_props: ConnectorProperties::PostgresCdc(Box::default()),
234 ..SourceContext::dummy()
235 };
236 let source_ctx = Arc::new(source_ctx);
237 let parser = PlainParser::new(
239 SpecificParserConfig::DEFAULT_PLAIN_JSON,
240 columns.clone(),
241 source_ctx.clone(),
242 )
243 .await
244 .unwrap();
245
246 let mut transactional = false;
247 let message_stream = source_message_stream(transactional);
249 let chunk_stream = crate::parser::parse_message_stream(
250 parser,
251 message_stream.boxed(),
252 SourceCtrlOpts::for_test(),
253 );
254 let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
255 .into_iter()
256 .collect();
257 let output = output
258 .unwrap()
259 .into_iter()
260 .filter(|c| c.cardinality() > 0)
261 .enumerate()
262 .map(|(i, c)| {
263 if i == 0 {
264 assert_eq!(4, c.cardinality());
266 }
267 if i == 1 {
268 assert_eq!(3, c.cardinality());
270 }
271 c
272 })
273 .collect_vec();
274
275 assert_eq!(2, output.len());
277
278 let parser = PlainParser::new(
280 SpecificParserConfig::DEFAULT_PLAIN_JSON,
281 columns.clone(),
282 source_ctx,
283 )
284 .await
285 .unwrap();
286
287 transactional = true;
289 let message_stream = source_message_stream(transactional);
290 let chunk_stream = crate::parser::parse_message_stream(
291 parser,
292 message_stream.boxed(),
293 SourceCtrlOpts::for_test(),
294 );
295 let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
296 .into_iter()
297 .collect();
298 let output = output
299 .unwrap()
300 .into_iter()
301 .filter(|c| c.cardinality() > 0)
302 .inspect(|c| {
303 assert_eq!(5, c.cardinality());
305 })
306 .collect_vec();
307
308 assert_eq!(1, output.len());
310 }
311
312 #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
313 async fn source_message_stream(transactional: bool) {
314 let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
315 let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
316 let data_batches = [
317 vec![
318 r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
319 r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
320 r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
321 ],
322 vec![
323 r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
324 r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
325 ],
326 ];
327 for (i, batch) in data_batches.iter().enumerate() {
328 let mut source_msg_batch = vec![];
329 if i == 0 {
330 source_msg_batch.push(SourceMessage {
332 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
333 "orders".to_owned(),
334 0,
335 if transactional {
336 cdc_message::CdcMessageType::TransactionMeta
337 } else {
338 cdc_message::CdcMessageType::Data
339 },
340 )),
341 split_id: SplitId::from("1001"),
342 offset: "0".into(),
343 key: None,
344 payload: Some(begin_msg.as_bytes().to_vec()),
345 });
346 }
347 for data_msg in batch {
349 source_msg_batch.push(SourceMessage {
350 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
351 "orders".to_owned(),
352 0,
353 cdc_message::CdcMessageType::Data,
354 )),
355 split_id: SplitId::from("1001"),
356 offset: "0".into(),
357 key: None,
358 payload: Some(data_msg.as_bytes().to_vec()),
359 });
360 }
361 if i == data_batches.len() - 1 {
362 source_msg_batch.push(SourceMessage {
364 meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
365 "orders".to_owned(),
366 0,
367 if transactional {
368 cdc_message::CdcMessageType::TransactionMeta
369 } else {
370 cdc_message::CdcMessageType::Data
371 },
372 )),
373 split_id: SplitId::from("1001"),
374 offset: "0".into(),
375 key: None,
376 payload: Some(commit_msg.as_bytes().to_vec()),
377 });
378 }
379 yield source_msg_batch;
380 }
381 }
382
383 #[tokio::test]
384 async fn test_parse_transaction_metadata() {
385 let schema = ColumnCatalog::debezium_cdc_source_cols();
386
387 let columns = schema
388 .iter()
389 .map(|c| SourceColumnDesc::from(&c.column_desc))
390 .collect::<Vec<_>>();
391
392 let source_ctx = SourceContext {
394 connector_props: ConnectorProperties::MysqlCdc(Box::default()),
395 ..SourceContext::dummy()
396 };
397 let mut parser = PlainParser::new(
398 SpecificParserConfig::DEFAULT_PLAIN_JSON,
399 columns.clone(),
400 Arc::new(source_ctx),
401 )
402 .await
403 .unwrap();
404 let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
405
406 let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
408 let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
409
410 let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
411 "orders".to_owned(),
412 0,
413 cdc_message::CdcMessageType::TransactionMeta,
414 ));
415 let msg_meta = MessageMeta {
416 source_meta: &cdc_meta,
417 split_id: "1001",
418 offset: "",
419 };
420
421 let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
422 let res = parser
423 .parse_one_with_txn(
424 None,
425 Some(begin_msg.as_bytes().to_vec()),
426 builder.row_writer().with_meta(msg_meta),
427 )
428 .await;
429 match res {
430 Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
431 assert_eq!(id.deref(), expect_tx_id);
432 }
433 _ => panic!("unexpected parse result: {:?}", res),
434 }
435 let res = parser
436 .parse_one_with_txn(
437 None,
438 Some(commit_msg.as_bytes().to_vec()),
439 builder.row_writer().with_meta(msg_meta),
440 )
441 .await;
442 match res {
443 Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
444 assert_eq!(id.deref(), expect_tx_id);
445 }
446 _ => panic!("unexpected parse result: {:?}", res),
447 }
448
449 builder.finish_current_chunk();
450 assert!(builder.consume_ready_chunks().next().is_none());
451 }
452
453 #[tokio::test]
454 async fn test_parse_schema_change() {
455 let schema = ColumnCatalog::debezium_cdc_source_cols();
456
457 let columns = schema
458 .iter()
459 .map(|c| SourceColumnDesc::from(&c.column_desc))
460 .collect::<Vec<_>>();
461
462 let source_ctx = SourceContext {
464 connector_props: ConnectorProperties::MysqlCdc(Box::default()),
465 ..SourceContext::dummy()
466 };
467 let mut parser = PlainParser::new(
468 SpecificParserConfig::DEFAULT_PLAIN_JSON,
469 columns.clone(),
470 Arc::new(source_ctx),
471 )
472 .await
473 .unwrap();
474 let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
475
476 let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
477 let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
478 "mydb.test".to_owned(),
479 0,
480 cdc_message::CdcMessageType::SchemaChange,
481 ));
482 let msg_meta = MessageMeta {
483 source_meta: &cdc_meta,
484 split_id: "1001",
485 offset: "",
486 };
487
488 let res = parser
489 .parse_one_with_txn(
490 None,
491 Some(msg.as_bytes().to_vec()),
492 dummy_builder.row_writer().with_meta(msg_meta),
493 )
494 .await;
495
496 let res = res.unwrap();
497 expect![[r#"
498 SchemaChange(
499 SchemaChangeEnvelope {
500 table_changes: [
501 TableSchemaChange {
502 cdc_table_id: "0.mydb.test",
503 columns: [
504 ColumnCatalog {
505 column_desc: ColumnDesc {
506 data_type: Int32,
507 column_id: #2147483646,
508 name: "id",
509 generated_or_default_column: None,
510 description: None,
511 additional_column: AdditionalColumn {
512 column_type: None,
513 },
514 version: Pr13707,
515 system_column: None,
516 nullable: true,
517 },
518 is_hidden: false,
519 },
520 ColumnCatalog {
521 column_desc: ColumnDesc {
522 data_type: Timestamptz,
523 column_id: #2147483646,
524 name: "v1",
525 generated_or_default_column: None,
526 description: None,
527 additional_column: AdditionalColumn {
528 column_type: None,
529 },
530 version: Pr13707,
531 system_column: None,
532 nullable: true,
533 },
534 is_hidden: false,
535 },
536 ColumnCatalog {
537 column_desc: ColumnDesc {
538 data_type: Varchar,
539 column_id: #2147483646,
540 name: "v2",
541 generated_or_default_column: None,
542 description: None,
543 additional_column: AdditionalColumn {
544 column_type: None,
545 },
546 version: Pr13707,
547 system_column: None,
548 nullable: true,
549 },
550 is_hidden: false,
551 },
552 ],
553 change_type: Alter,
554 upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
555 },
556 ],
557 },
558 )
559 "#]]
560 .assert_debug_eq(&res);
561 }
562}