risingwave_connector/parser/
plain_parser.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::bail;
16use thiserror_ext::AsReport;
17
18use super::unified::json::{
19    BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
20};
21use super::unified::kv_event::KvEvent;
22use super::{
23    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
24    SpecificParserConfig,
25};
26use crate::error::ConnectorResult;
27use crate::parser::bytes_parser::BytesAccessBuilder;
28use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
29use crate::parser::unified::AccessImpl;
30use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
31use crate::parser::upsert_parser::get_key_column_name;
32use crate::parser::{BytesProperties, ParseResult, ParserFormat};
33use crate::source::cdc::CdcMessageType;
34use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
35
/// Parser for `FORMAT PLAIN`, i.e., append-only source.
///
/// Besides ordinary rows, this parser is also used by the shared CDC source, where it has to
/// handle transaction metadata and schema change messages (see `parse_inner`).
#[derive(Debug)]
pub struct PlainParser {
    /// Accessor builder for the message key. `new` leaves this as `None` when
    /// `get_key_column_name` finds no key column; the key is optional in `FORMAT PLAIN`.
    pub key_builder: Option<AccessBuilderImpl>,
    /// Accessor builder for the message payload, created from the encoding config in `new`.
    pub payload_builder: AccessBuilderImpl,
    /// Column descriptors of the source; exposed through `columns()`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Source context; `parse_inner` reads the connector properties, source id and source
    /// name from it and reports schema change failures through it.
    pub source_ctx: SourceContextRef,
    /// Accessor builder for parsing transaction metadata of a shared CDC source.
    /// `new` always populates it with a Debezium JSON builder.
    pub transaction_meta_builder: Option<AccessBuilderImpl>,
    /// Accessor builder for parsing schema change events of a shared CDC source.
    /// `new` always populates it with a Debezium JSON builder.
    pub schema_change_builder: Option<AccessBuilderImpl>,
}
47
48impl PlainParser {
49    pub async fn new(
50        props: SpecificParserConfig,
51        rw_columns: Vec<SourceColumnDesc>,
52        source_ctx: SourceContextRef,
53    ) -> ConnectorResult<Self> {
54        let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
55            Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
56                EncodingProperties::Bytes(BytesProperties {
57                    column_name: Some(key_column_name),
58                }),
59            )?))
60        } else {
61            None
62        };
63
64        let payload_builder = match props.encoding_config {
65            EncodingProperties::Json(_)
66            | EncodingProperties::Protobuf(_)
67            | EncodingProperties::Avro(_)
68            | EncodingProperties::Bytes(_) => {
69                AccessBuilderImpl::new_default(props.encoding_config).await?
70            }
71            _ => bail!("Unsupported encoding for Plain"),
72        };
73
74        let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
75            DebeziumJsonAccessBuilder::new(
76                TimestamptzHandling::GuessNumberUnit,
77                TimestampHandling::GuessNumberUnit,
78                TimeHandling::Micro,
79                BigintUnsignedHandlingMode::Long,
80                false,
81            )?,
82        ));
83
84        let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
85            DebeziumJsonAccessBuilder::new_for_schema_event()?,
86        ));
87
88        Ok(Self {
89            key_builder,
90            payload_builder,
91            rw_columns,
92            source_ctx,
93            transaction_meta_builder,
94            schema_change_builder,
95        })
96    }
97
98    pub async fn parse_inner(
99        &mut self,
100        key: Option<Vec<u8>>,
101        payload: Option<Vec<u8>>,
102        writer: SourceStreamChunkRowWriter<'_>,
103    ) -> ConnectorResult<ParseResult> {
104        // plain parser also used in the shared cdc source,
105        // we need to handle transaction metadata and schema change messages here
106        if let Some(msg_meta) = writer.row_meta()
107            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
108            && let Some(data) = payload
109        {
110            match cdc_meta.msg_type {
111                CdcMessageType::Data | CdcMessageType::Heartbeat => {
112                    return self.parse_rows(key, Some(data), writer).await;
113                }
114                CdcMessageType::TransactionMeta => {
115                    let accessor = self
116                        .transaction_meta_builder
117                        .as_mut()
118                        .expect("expect transaction metadata access builder")
119                        .generate_accessor(data, writer.source_meta())
120                        .await?;
121                    return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
122                    {
123                        Ok(transaction_control) => {
124                            Ok(ParseResult::TransactionControl(transaction_control))
125                        }
126                        Err(err) => Err(err)?,
127                    };
128                }
129                CdcMessageType::SchemaChange => {
130                    let accessor = self
131                        .schema_change_builder
132                        .as_mut()
133                        .expect("expect schema change access builder")
134                        .generate_accessor(data, writer.source_meta())
135                        .await?;
136
137                    return match parse_schema_change(
138                        &accessor,
139                        self.source_ctx.source_id,
140                        &self.source_ctx.source_name,
141                        &self.source_ctx.connector_props,
142                    )
143                    .await
144                    {
145                        Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
146                        Err(err) => {
147                            // Report CDC auto schema change fail event
148                            let (fail_info, table_name, cdc_table_id) = match &err {
149                                crate::parser::AccessError::CdcAutoSchemaChangeError {
150                                    ty,
151                                    table_name,
152                                    ..
153                                } => {
154                                    // Parse table_name format: "schema"."table" -> schema.table
155                                    let clean_table_name =
156                                        table_name.trim_matches('"').replace("\".\"", ".");
157                                    let fail_info = format!(
158                                        "Unsupported data type '{}' in source '{}' table '{}'",
159                                        ty, self.source_ctx.source_name, clean_table_name
160                                    );
161                                    // Build cdc_table_id: source_name.schema.table_name
162                                    let cdc_table_id = format!(
163                                        "{}.{}",
164                                        self.source_ctx.source_name, clean_table_name
165                                    );
166
167                                    (fail_info, clean_table_name, cdc_table_id)
168                                }
169                                _ => {
170                                    let fail_info = format!(
171                                        "Failed to parse schema change: {:?}, source: {}",
172                                        err.as_report(),
173                                        self.source_ctx.source_name
174                                    );
175                                    (fail_info, "".to_owned(), "".to_owned())
176                                }
177                            };
178                            self.source_ctx.on_cdc_auto_schema_change_failure(
179                                self.source_ctx.source_id,
180                                table_name,
181                                cdc_table_id,
182                                "".to_owned(), // upstream_ddl is not available in this context
183                                fail_info,
184                            );
185
186                            Err(err)?
187                        }
188                    };
189                }
190                CdcMessageType::Unspecified => {
191                    unreachable!()
192                }
193            }
194        }
195
196        // for non-cdc source messages
197        self.parse_rows(key, payload, writer).await
198    }
199
200    async fn parse_rows(
201        &mut self,
202        key: Option<Vec<u8>>,
203        payload: Option<Vec<u8>>,
204        mut writer: SourceStreamChunkRowWriter<'_>,
205    ) -> ConnectorResult<ParseResult> {
206        let meta = writer.source_meta();
207        let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
208
209        if let Some(data) = key
210            && let Some(key_builder) = self.key_builder.as_mut()
211        {
212            // key is optional in format plain
213            row_op.with_key(key_builder.generate_accessor(data, meta).await?);
214        }
215        if let Some(data) = payload {
216            // the data part also can be an empty vec
217            row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
218        }
219
220        writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
221
222        Ok(ParseResult::Rows)
223    }
224}
225
226impl ByteStreamSourceParser for PlainParser {
227    fn columns(&self) -> &[SourceColumnDesc] {
228        &self.rw_columns
229    }
230
231    fn source_ctx(&self) -> &SourceContext {
232        &self.source_ctx
233    }
234
235    fn parser_format(&self) -> ParserFormat {
236        ParserFormat::Plain
237    }
238
239    async fn parse_one<'a>(
240        &'a mut self,
241        _key: Option<Vec<u8>>,
242        _payload: Option<Vec<u8>>,
243        _writer: SourceStreamChunkRowWriter<'a>,
244    ) -> ConnectorResult<()> {
245        unreachable!("should call `parse_one_with_txn` instead")
246    }
247
248    async fn parse_one_with_txn<'a>(
249        &'a mut self,
250        key: Option<Vec<u8>>,
251        payload: Option<Vec<u8>>,
252        writer: SourceStreamChunkRowWriter<'a>,
253    ) -> ConnectorResult<ParseResult> {
254        self.parse_inner(key, payload, writer).await
255    }
256}
257
258#[cfg(test)]
259mod tests {
260    use std::ops::Deref;
261    use std::sync::Arc;
262
263    use expect_test::expect;
264    use futures::executor::block_on;
265    use futures::{StreamExt, TryStreamExt};
266    use futures_async_stream::try_stream;
267    use itertools::Itertools;
268    use risingwave_common::catalog::ColumnCatalog;
269    use risingwave_pb::connector_service::{SourceType, cdc_message};
270
271    use super::*;
272    use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
273    use crate::source::cdc::DebeziumCdcMeta;
274    use crate::source::{
275        ConnectorProperties, SourceCtrlOpts, SourceMessage, SourceMessageEvent, SourceReaderEvent,
276        SplitId,
277    };
278
279    #[tokio::test]
280    async fn test_emit_transactional_chunk() {
281        let schema = ColumnCatalog::debezium_cdc_source_cols();
282
283        let columns = schema
284            .iter()
285            .map(|c| SourceColumnDesc::from(&c.column_desc))
286            .collect::<Vec<_>>();
287
288        let source_ctx = SourceContext {
289            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
290            ..SourceContext::dummy()
291        };
292        let source_ctx = Arc::new(source_ctx);
293        // format plain encode json parser
294        let parser = PlainParser::new(
295            SpecificParserConfig::DEFAULT_PLAIN_JSON,
296            columns.clone(),
297            source_ctx.clone(),
298        )
299        .await
300        .unwrap();
301
302        let mut transactional = false;
303        // for untransactional source, we expect emit a chunk for each message batch
304        let message_stream = source_message_stream(transactional);
305        let chunk_stream = crate::parser::parse_message_stream(
306            parser,
307            message_stream.map_ok(SourceMessageEvent::Data).boxed(),
308            SourceCtrlOpts::for_test(),
309        );
310        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.try_collect::<Vec<_>>());
311        let output = output
312            .unwrap()
313            .into_iter()
314            .filter_map(|event| match event {
315                SourceReaderEvent::DataChunk(chunk) if chunk.cardinality() > 0 => Some(chunk),
316                SourceReaderEvent::DataChunk(_) | SourceReaderEvent::SplitProgress(_) => None,
317            })
318            .enumerate()
319            .map(|(i, c)| {
320                if i == 0 {
321                    // begin + 3 data messages
322                    assert_eq!(4, c.cardinality());
323                }
324                if i == 1 {
325                    // 2 data messages + 1 end
326                    assert_eq!(3, c.cardinality());
327                }
328                c
329            })
330            .collect_vec();
331
332        // 2 chunks for 2 message batches
333        assert_eq!(2, output.len());
334
335        // format plain encode json parser
336        let parser = PlainParser::new(
337            SpecificParserConfig::DEFAULT_PLAIN_JSON,
338            columns.clone(),
339            source_ctx,
340        )
341        .await
342        .unwrap();
343
344        // for transactional source, we expect emit a single chunk for the transaction
345        transactional = true;
346        let message_stream = source_message_stream(transactional);
347        let chunk_stream = crate::parser::parse_message_stream(
348            parser,
349            message_stream.map_ok(SourceMessageEvent::Data).boxed(),
350            SourceCtrlOpts::for_test(),
351        );
352        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.try_collect::<Vec<_>>());
353        let output = output
354            .unwrap()
355            .into_iter()
356            .filter_map(|event| match event {
357                SourceReaderEvent::DataChunk(chunk) if chunk.cardinality() > 0 => Some(chunk),
358                SourceReaderEvent::DataChunk(_) | SourceReaderEvent::SplitProgress(_) => None,
359            })
360            .inspect(|c| {
361                // 5 data messages in a single chunk
362                assert_eq!(5, c.cardinality());
363            })
364            .collect_vec();
365
366        // a single transactional chunk
367        assert_eq!(1, output.len());
368    }
369
370    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
371    async fn source_message_stream(transactional: bool) {
372        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
373        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
374        let data_batches = [
375            vec![
376                r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
377                r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
378                r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
379            ],
380            vec![
381                r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
382                r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
383            ],
384        ];
385        for (i, batch) in data_batches.iter().enumerate() {
386            let mut source_msg_batch = vec![];
387            if i == 0 {
388                // put begin message at first
389                source_msg_batch.push(SourceMessage {
390                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
391                        "orders".to_owned(),
392                        0,
393                        if transactional {
394                            cdc_message::CdcMessageType::TransactionMeta
395                        } else {
396                            cdc_message::CdcMessageType::Data
397                        },
398                        SourceType::Unspecified,
399                    )),
400                    split_id: SplitId::from("1001"),
401                    offset: "0".into(),
402                    key: None,
403                    payload: Some(begin_msg.as_bytes().to_vec()),
404                });
405            }
406            // put data messages
407            for data_msg in batch {
408                source_msg_batch.push(SourceMessage {
409                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
410                        "orders".to_owned(),
411                        0,
412                        cdc_message::CdcMessageType::Data,
413                        SourceType::Unspecified,
414                    )),
415                    split_id: SplitId::from("1001"),
416                    offset: "0".into(),
417                    key: None,
418                    payload: Some(data_msg.as_bytes().to_vec()),
419                });
420            }
421            if i == data_batches.len() - 1 {
422                // put commit message at last
423                source_msg_batch.push(SourceMessage {
424                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
425                        "orders".to_owned(),
426                        0,
427                        if transactional {
428                            cdc_message::CdcMessageType::TransactionMeta
429                        } else {
430                            cdc_message::CdcMessageType::Data
431                        },
432                        SourceType::Unspecified,
433                    )),
434                    split_id: SplitId::from("1001"),
435                    offset: "0".into(),
436                    key: None,
437                    payload: Some(commit_msg.as_bytes().to_vec()),
438                });
439            }
440            yield source_msg_batch;
441        }
442    }
443
444    #[tokio::test]
445    async fn test_parse_transaction_metadata() {
446        let schema = ColumnCatalog::debezium_cdc_source_cols();
447
448        let columns = schema
449            .iter()
450            .map(|c| SourceColumnDesc::from(&c.column_desc))
451            .collect::<Vec<_>>();
452
453        // format plain encode json parser
454        let source_ctx = SourceContext {
455            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
456            ..SourceContext::dummy()
457        };
458        let mut parser = PlainParser::new(
459            SpecificParserConfig::DEFAULT_PLAIN_JSON,
460            columns.clone(),
461            Arc::new(source_ctx),
462        )
463        .await
464        .unwrap();
465        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
466
467        // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN
468        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
469        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
470
471        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
472            "orders".to_owned(),
473            0,
474            cdc_message::CdcMessageType::TransactionMeta,
475            SourceType::Unspecified,
476        ));
477        let msg_meta = MessageMeta {
478            source_meta: &cdc_meta,
479            split_id: "1001",
480            offset: "",
481        };
482
483        let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
484        let res = parser
485            .parse_one_with_txn(
486                None,
487                Some(begin_msg.as_bytes().to_vec()),
488                builder.row_writer().with_meta(msg_meta),
489            )
490            .await;
491        match res {
492            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
493                assert_eq!(id.deref(), expect_tx_id);
494            }
495            _ => panic!("unexpected parse result: {:?}", res),
496        }
497        let res = parser
498            .parse_one_with_txn(
499                None,
500                Some(commit_msg.as_bytes().to_vec()),
501                builder.row_writer().with_meta(msg_meta),
502            )
503            .await;
504        match res {
505            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
506                assert_eq!(id.deref(), expect_tx_id);
507            }
508            _ => panic!("unexpected parse result: {:?}", res),
509        }
510
511        builder.finish_current_chunk();
512        assert!(builder.consume_ready_chunks().next().is_none());
513    }
514
515    #[tokio::test]
516    async fn test_parse_schema_change() {
517        let schema = ColumnCatalog::debezium_cdc_source_cols();
518
519        let columns = schema
520            .iter()
521            .map(|c| SourceColumnDesc::from(&c.column_desc))
522            .collect::<Vec<_>>();
523
524        // format plain encode json parser
525        let source_ctx = SourceContext {
526            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
527            ..SourceContext::dummy()
528        };
529        let mut parser = PlainParser::new(
530            SpecificParserConfig::DEFAULT_PLAIN_JSON,
531            columns.clone(),
532            Arc::new(source_ctx),
533        )
534        .await
535        .unwrap();
536        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
537
538        let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
539        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
540            "mydb.test".to_owned(),
541            0,
542            cdc_message::CdcMessageType::SchemaChange,
543            SourceType::Mysql,
544        ));
545        let msg_meta = MessageMeta {
546            source_meta: &cdc_meta,
547            split_id: "1001",
548            offset: "",
549        };
550
551        let res = parser
552            .parse_one_with_txn(
553                None,
554                Some(msg.as_bytes().to_vec()),
555                dummy_builder.row_writer().with_meta(msg_meta),
556            )
557            .await;
558
559        let res = res.unwrap();
560        expect![[r#"
561            SchemaChange(
562                SchemaChangeEnvelope {
563                    table_changes: [
564                        TableSchemaChange {
565                            cdc_table_id: "0.mydb.test",
566                            columns: [
567                                ColumnCatalog {
568                                    column_desc: ColumnDesc {
569                                        data_type: Int32,
570                                        column_id: #2147483646,
571                                        name: "id",
572                                        generated_or_default_column: None,
573                                        description: None,
574                                        additional_column: AdditionalColumn {
575                                            column_type: None,
576                                        },
577                                        version: Pr13707,
578                                        system_column: None,
579                                        nullable: true,
580                                    },
581                                    is_hidden: false,
582                                },
583                                ColumnCatalog {
584                                    column_desc: ColumnDesc {
585                                        data_type: Timestamptz,
586                                        column_id: #2147483646,
587                                        name: "v1",
588                                        generated_or_default_column: None,
589                                        description: None,
590                                        additional_column: AdditionalColumn {
591                                            column_type: None,
592                                        },
593                                        version: Pr13707,
594                                        system_column: None,
595                                        nullable: true,
596                                    },
597                                    is_hidden: false,
598                                },
599                                ColumnCatalog {
600                                    column_desc: ColumnDesc {
601                                        data_type: Varchar,
602                                        column_id: #2147483646,
603                                        name: "v2",
604                                        generated_or_default_column: None,
605                                        description: None,
606                                        additional_column: AdditionalColumn {
607                                            column_type: None,
608                                        },
609                                        version: Pr13707,
610                                        system_column: None,
611                                        nullable: true,
612                                    },
613                                    is_hidden: false,
614                                },
615                            ],
616                            change_type: Alter,
617                            upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
618                        },
619                    ],
620                },
621            )
622        "#]]
623        .assert_debug_eq(&res);
624    }
625}