risingwave_connector/parser/
plain_parser.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::bail;
16use thiserror_ext::AsReport;
17
18use super::unified::json::{
19    BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
20};
21use super::unified::kv_event::KvEvent;
22use super::{
23    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
24    SpecificParserConfig,
25};
26use crate::error::ConnectorResult;
27use crate::parser::bytes_parser::BytesAccessBuilder;
28use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
29use crate::parser::unified::AccessImpl;
30use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
31use crate::parser::upsert_parser::get_key_column_name;
32use crate::parser::{BytesProperties, ParseResult, ParserFormat};
33use crate::source::cdc::CdcMessageType;
34use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
35
36/// Parser for `FORMAT PLAIN`, i.e., append-only source.
37#[derive(Debug)]
38pub struct PlainParser {
39    pub key_builder: Option<AccessBuilderImpl>,
40    pub payload_builder: AccessBuilderImpl,
41    pub(crate) rw_columns: Vec<SourceColumnDesc>,
42    pub source_ctx: SourceContextRef,
43    // parsing transaction metadata for shared cdc source
44    pub transaction_meta_builder: Option<AccessBuilderImpl>,
45    pub schema_change_builder: Option<AccessBuilderImpl>,
46}
47
48impl PlainParser {
49    pub async fn new(
50        props: SpecificParserConfig,
51        rw_columns: Vec<SourceColumnDesc>,
52        source_ctx: SourceContextRef,
53    ) -> ConnectorResult<Self> {
54        let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
55            Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
56                EncodingProperties::Bytes(BytesProperties {
57                    column_name: Some(key_column_name),
58                }),
59            )?))
60        } else {
61            None
62        };
63
64        let payload_builder = match props.encoding_config {
65            EncodingProperties::Json(_)
66            | EncodingProperties::Protobuf(_)
67            | EncodingProperties::Avro(_)
68            | EncodingProperties::Bytes(_) => {
69                AccessBuilderImpl::new_default(props.encoding_config).await?
70            }
71            _ => bail!("Unsupported encoding for Plain"),
72        };
73
74        let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
75            DebeziumJsonAccessBuilder::new(
76                TimestamptzHandling::GuessNumberUnit,
77                TimestampHandling::GuessNumberUnit,
78                TimeHandling::Micro,
79                BigintUnsignedHandlingMode::Long,
80                false,
81            )?,
82        ));
83
84        let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
85            DebeziumJsonAccessBuilder::new_for_schema_event()?,
86        ));
87
88        Ok(Self {
89            key_builder,
90            payload_builder,
91            rw_columns,
92            source_ctx,
93            transaction_meta_builder,
94            schema_change_builder,
95        })
96    }
97
98    pub async fn parse_inner(
99        &mut self,
100        key: Option<Vec<u8>>,
101        payload: Option<Vec<u8>>,
102        writer: SourceStreamChunkRowWriter<'_>,
103    ) -> ConnectorResult<ParseResult> {
104        // plain parser also used in the shared cdc source,
105        // we need to handle transaction metadata and schema change messages here
106        if let Some(msg_meta) = writer.row_meta()
107            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
108            && let Some(data) = payload
109        {
110            match cdc_meta.msg_type {
111                CdcMessageType::Data | CdcMessageType::Heartbeat => {
112                    return self.parse_rows(key, Some(data), writer).await;
113                }
114                CdcMessageType::TransactionMeta => {
115                    let accessor = self
116                        .transaction_meta_builder
117                        .as_mut()
118                        .expect("expect transaction metadata access builder")
119                        .generate_accessor(data, writer.source_meta())
120                        .await?;
121                    return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
122                    {
123                        Ok(transaction_control) => {
124                            Ok(ParseResult::TransactionControl(transaction_control))
125                        }
126                        Err(err) => Err(err)?,
127                    };
128                }
129                CdcMessageType::SchemaChange => {
130                    let accessor = self
131                        .schema_change_builder
132                        .as_mut()
133                        .expect("expect schema change access builder")
134                        .generate_accessor(data, writer.source_meta())
135                        .await?;
136
137                    return match parse_schema_change(
138                        &accessor,
139                        self.source_ctx.source_id,
140                        &self.source_ctx.source_name,
141                        &self.source_ctx.connector_props,
142                    ) {
143                        Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
144                        Err(err) => {
145                            // Report CDC auto schema change fail event
146                            let (fail_info, table_name, cdc_table_id) = match &err {
147                                crate::parser::AccessError::CdcAutoSchemaChangeError {
148                                    ty,
149                                    table_name,
150                                    ..
151                                } => {
152                                    // Parse table_name format: "schema"."table" -> schema.table
153                                    let clean_table_name =
154                                        table_name.trim_matches('"').replace("\".\"", ".");
155                                    let fail_info = format!(
156                                        "Unsupported data type '{}' in source '{}' table '{}'",
157                                        ty, self.source_ctx.source_name, clean_table_name
158                                    );
159                                    // Build cdc_table_id: source_name.schema.table_name
160                                    let cdc_table_id = format!(
161                                        "{}.{}",
162                                        self.source_ctx.source_name, clean_table_name
163                                    );
164
165                                    (fail_info, clean_table_name, cdc_table_id)
166                                }
167                                _ => {
168                                    let fail_info = format!(
169                                        "Failed to parse schema change: {:?}, source: {}",
170                                        err.as_report(),
171                                        self.source_ctx.source_name
172                                    );
173                                    (fail_info, "".to_owned(), "".to_owned())
174                                }
175                            };
176                            self.source_ctx.on_cdc_auto_schema_change_failure(
177                                self.source_ctx.source_id,
178                                table_name,
179                                cdc_table_id,
180                                "".to_owned(), // upstream_ddl is not available in this context
181                                fail_info,
182                            );
183
184                            Err(err)?
185                        }
186                    };
187                }
188                CdcMessageType::Unspecified => {
189                    unreachable!()
190                }
191            }
192        }
193
194        // for non-cdc source messages
195        self.parse_rows(key, payload, writer).await
196    }
197
198    async fn parse_rows(
199        &mut self,
200        key: Option<Vec<u8>>,
201        payload: Option<Vec<u8>>,
202        mut writer: SourceStreamChunkRowWriter<'_>,
203    ) -> ConnectorResult<ParseResult> {
204        let meta = writer.source_meta();
205        let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
206
207        if let Some(data) = key
208            && let Some(key_builder) = self.key_builder.as_mut()
209        {
210            // key is optional in format plain
211            row_op.with_key(key_builder.generate_accessor(data, meta).await?);
212        }
213        if let Some(data) = payload {
214            // the data part also can be an empty vec
215            row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
216        }
217
218        writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
219
220        Ok(ParseResult::Rows)
221    }
222}
223
224impl ByteStreamSourceParser for PlainParser {
225    fn columns(&self) -> &[SourceColumnDesc] {
226        &self.rw_columns
227    }
228
229    fn source_ctx(&self) -> &SourceContext {
230        &self.source_ctx
231    }
232
233    fn parser_format(&self) -> ParserFormat {
234        ParserFormat::Plain
235    }
236
237    async fn parse_one<'a>(
238        &'a mut self,
239        _key: Option<Vec<u8>>,
240        _payload: Option<Vec<u8>>,
241        _writer: SourceStreamChunkRowWriter<'a>,
242    ) -> ConnectorResult<()> {
243        unreachable!("should call `parse_one_with_txn` instead")
244    }
245
246    async fn parse_one_with_txn<'a>(
247        &'a mut self,
248        key: Option<Vec<u8>>,
249        payload: Option<Vec<u8>>,
250        writer: SourceStreamChunkRowWriter<'a>,
251    ) -> ConnectorResult<ParseResult> {
252        self.parse_inner(key, payload, writer).await
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use std::ops::Deref;
259    use std::sync::Arc;
260
261    use expect_test::expect;
262    use futures::executor::block_on;
263    use futures::{StreamExt, TryStreamExt};
264    use futures_async_stream::try_stream;
265    use itertools::Itertools;
266    use risingwave_common::catalog::ColumnCatalog;
267    use risingwave_pb::connector_service::{SourceType, cdc_message};
268
269    use super::*;
270    use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
271    use crate::source::cdc::DebeziumCdcMeta;
272    use crate::source::{
273        ConnectorProperties, SourceCtrlOpts, SourceMessage, SourceMessageEvent, SourceReaderEvent,
274        SplitId,
275    };
276
277    #[tokio::test]
278    async fn test_emit_transactional_chunk() {
279        let schema = ColumnCatalog::debezium_cdc_source_cols();
280
281        let columns = schema
282            .iter()
283            .map(|c| SourceColumnDesc::from(&c.column_desc))
284            .collect::<Vec<_>>();
285
286        let source_ctx = SourceContext {
287            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
288            ..SourceContext::dummy()
289        };
290        let source_ctx = Arc::new(source_ctx);
291        // format plain encode json parser
292        let parser = PlainParser::new(
293            SpecificParserConfig::DEFAULT_PLAIN_JSON,
294            columns.clone(),
295            source_ctx.clone(),
296        )
297        .await
298        .unwrap();
299
300        let mut transactional = false;
301        // for untransactional source, we expect emit a chunk for each message batch
302        let message_stream = source_message_stream(transactional);
303        let chunk_stream = crate::parser::parse_message_stream(
304            parser,
305            message_stream.map_ok(SourceMessageEvent::Data).boxed(),
306            SourceCtrlOpts::for_test(),
307        );
308        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.try_collect::<Vec<_>>());
309        let output = output
310            .unwrap()
311            .into_iter()
312            .filter_map(|event| match event {
313                SourceReaderEvent::DataChunk(chunk) if chunk.cardinality() > 0 => Some(chunk),
314                SourceReaderEvent::DataChunk(_) | SourceReaderEvent::SplitProgress(_) => None,
315            })
316            .enumerate()
317            .map(|(i, c)| {
318                if i == 0 {
319                    // begin + 3 data messages
320                    assert_eq!(4, c.cardinality());
321                }
322                if i == 1 {
323                    // 2 data messages + 1 end
324                    assert_eq!(3, c.cardinality());
325                }
326                c
327            })
328            .collect_vec();
329
330        // 2 chunks for 2 message batches
331        assert_eq!(2, output.len());
332
333        // format plain encode json parser
334        let parser = PlainParser::new(
335            SpecificParserConfig::DEFAULT_PLAIN_JSON,
336            columns.clone(),
337            source_ctx,
338        )
339        .await
340        .unwrap();
341
342        // for transactional source, we expect emit a single chunk for the transaction
343        transactional = true;
344        let message_stream = source_message_stream(transactional);
345        let chunk_stream = crate::parser::parse_message_stream(
346            parser,
347            message_stream.map_ok(SourceMessageEvent::Data).boxed(),
348            SourceCtrlOpts::for_test(),
349        );
350        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.try_collect::<Vec<_>>());
351        let output = output
352            .unwrap()
353            .into_iter()
354            .filter_map(|event| match event {
355                SourceReaderEvent::DataChunk(chunk) if chunk.cardinality() > 0 => Some(chunk),
356                SourceReaderEvent::DataChunk(_) | SourceReaderEvent::SplitProgress(_) => None,
357            })
358            .inspect(|c| {
359                // 5 data messages in a single chunk
360                assert_eq!(5, c.cardinality());
361            })
362            .collect_vec();
363
364        // a single transactional chunk
365        assert_eq!(1, output.len());
366    }
367
368    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
369    async fn source_message_stream(transactional: bool) {
370        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
371        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
372        let data_batches = [
373            vec![
374                r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
375                r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
376                r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
377            ],
378            vec![
379                r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
380                r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
381            ],
382        ];
383        for (i, batch) in data_batches.iter().enumerate() {
384            let mut source_msg_batch = vec![];
385            if i == 0 {
386                // put begin message at first
387                source_msg_batch.push(SourceMessage {
388                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
389                        "orders".to_owned(),
390                        0,
391                        if transactional {
392                            cdc_message::CdcMessageType::TransactionMeta
393                        } else {
394                            cdc_message::CdcMessageType::Data
395                        },
396                        SourceType::Unspecified,
397                    )),
398                    split_id: SplitId::from("1001"),
399                    offset: "0".into(),
400                    key: None,
401                    payload: Some(begin_msg.as_bytes().to_vec()),
402                });
403            }
404            // put data messages
405            for data_msg in batch {
406                source_msg_batch.push(SourceMessage {
407                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
408                        "orders".to_owned(),
409                        0,
410                        cdc_message::CdcMessageType::Data,
411                        SourceType::Unspecified,
412                    )),
413                    split_id: SplitId::from("1001"),
414                    offset: "0".into(),
415                    key: None,
416                    payload: Some(data_msg.as_bytes().to_vec()),
417                });
418            }
419            if i == data_batches.len() - 1 {
420                // put commit message at last
421                source_msg_batch.push(SourceMessage {
422                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
423                        "orders".to_owned(),
424                        0,
425                        if transactional {
426                            cdc_message::CdcMessageType::TransactionMeta
427                        } else {
428                            cdc_message::CdcMessageType::Data
429                        },
430                        SourceType::Unspecified,
431                    )),
432                    split_id: SplitId::from("1001"),
433                    offset: "0".into(),
434                    key: None,
435                    payload: Some(commit_msg.as_bytes().to_vec()),
436                });
437            }
438            yield source_msg_batch;
439        }
440    }
441
442    #[tokio::test]
443    async fn test_parse_transaction_metadata() {
444        let schema = ColumnCatalog::debezium_cdc_source_cols();
445
446        let columns = schema
447            .iter()
448            .map(|c| SourceColumnDesc::from(&c.column_desc))
449            .collect::<Vec<_>>();
450
451        // format plain encode json parser
452        let source_ctx = SourceContext {
453            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
454            ..SourceContext::dummy()
455        };
456        let mut parser = PlainParser::new(
457            SpecificParserConfig::DEFAULT_PLAIN_JSON,
458            columns.clone(),
459            Arc::new(source_ctx),
460        )
461        .await
462        .unwrap();
463        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
464
465        // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN
466        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
467        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
468
469        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
470            "orders".to_owned(),
471            0,
472            cdc_message::CdcMessageType::TransactionMeta,
473            SourceType::Unspecified,
474        ));
475        let msg_meta = MessageMeta {
476            source_meta: &cdc_meta,
477            split_id: "1001",
478            offset: "",
479        };
480
481        let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
482        let res = parser
483            .parse_one_with_txn(
484                None,
485                Some(begin_msg.as_bytes().to_vec()),
486                builder.row_writer().with_meta(msg_meta),
487            )
488            .await;
489        match res {
490            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
491                assert_eq!(id.deref(), expect_tx_id);
492            }
493            _ => panic!("unexpected parse result: {:?}", res),
494        }
495        let res = parser
496            .parse_one_with_txn(
497                None,
498                Some(commit_msg.as_bytes().to_vec()),
499                builder.row_writer().with_meta(msg_meta),
500            )
501            .await;
502        match res {
503            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
504                assert_eq!(id.deref(), expect_tx_id);
505            }
506            _ => panic!("unexpected parse result: {:?}", res),
507        }
508
509        builder.finish_current_chunk();
510        assert!(builder.consume_ready_chunks().next().is_none());
511    }
512
513    #[tokio::test]
514    async fn test_parse_schema_change() {
515        let schema = ColumnCatalog::debezium_cdc_source_cols();
516
517        let columns = schema
518            .iter()
519            .map(|c| SourceColumnDesc::from(&c.column_desc))
520            .collect::<Vec<_>>();
521
522        // format plain encode json parser
523        let source_ctx = SourceContext {
524            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
525            ..SourceContext::dummy()
526        };
527        let mut parser = PlainParser::new(
528            SpecificParserConfig::DEFAULT_PLAIN_JSON,
529            columns.clone(),
530            Arc::new(source_ctx),
531        )
532        .await
533        .unwrap();
534        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
535
536        let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
537        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
538            "mydb.test".to_owned(),
539            0,
540            cdc_message::CdcMessageType::SchemaChange,
541            SourceType::Mysql,
542        ));
543        let msg_meta = MessageMeta {
544            source_meta: &cdc_meta,
545            split_id: "1001",
546            offset: "",
547        };
548
549        let res = parser
550            .parse_one_with_txn(
551                None,
552                Some(msg.as_bytes().to_vec()),
553                dummy_builder.row_writer().with_meta(msg_meta),
554            )
555            .await;
556
557        let res = res.unwrap();
558        expect![[r#"
559            SchemaChange(
560                SchemaChangeEnvelope {
561                    table_changes: [
562                        TableSchemaChange {
563                            cdc_table_id: "0.mydb.test",
564                            columns: [
565                                ColumnCatalog {
566                                    column_desc: ColumnDesc {
567                                        data_type: Int32,
568                                        column_id: #2147483646,
569                                        name: "id",
570                                        generated_or_default_column: None,
571                                        description: None,
572                                        additional_column: AdditionalColumn {
573                                            column_type: None,
574                                        },
575                                        version: Pr13707,
576                                        system_column: None,
577                                        nullable: true,
578                                    },
579                                    is_hidden: false,
580                                },
581                                ColumnCatalog {
582                                    column_desc: ColumnDesc {
583                                        data_type: Timestamptz,
584                                        column_id: #2147483646,
585                                        name: "v1",
586                                        generated_or_default_column: None,
587                                        description: None,
588                                        additional_column: AdditionalColumn {
589                                            column_type: None,
590                                        },
591                                        version: Pr13707,
592                                        system_column: None,
593                                        nullable: true,
594                                    },
595                                    is_hidden: false,
596                                },
597                                ColumnCatalog {
598                                    column_desc: ColumnDesc {
599                                        data_type: Varchar,
600                                        column_id: #2147483646,
601                                        name: "v2",
602                                        generated_or_default_column: None,
603                                        description: None,
604                                        additional_column: AdditionalColumn {
605                                            column_type: None,
606                                        },
607                                        version: Pr13707,
608                                        system_column: None,
609                                        nullable: true,
610                                    },
611                                    is_hidden: false,
612                                },
613                            ],
614                            change_type: Alter,
615                            upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
616                        },
617                    ],
618                },
619            )
620        "#]]
621        .assert_debug_eq(&res);
622    }
623}