risingwave_connector/parser/
plain_parser.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::bail;
16use thiserror_ext::AsReport;
17
18use super::unified::json::{
19    BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
20};
21use super::unified::kv_event::KvEvent;
22use super::{
23    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
24    SpecificParserConfig,
25};
26use crate::error::ConnectorResult;
27use crate::parser::bytes_parser::BytesAccessBuilder;
28use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
29use crate::parser::unified::AccessImpl;
30use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
31use crate::parser::upsert_parser::get_key_column_name;
32use crate::parser::{BytesProperties, ParseResult, ParserFormat};
33use crate::source::cdc::CdcMessageType;
34use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
35
/// Parser for `FORMAT PLAIN`, i.e., append-only source.
///
/// Besides plain rows, this parser is also used by the shared CDC source, where it
/// additionally decodes Debezium transaction metadata and schema change events.
#[derive(Debug)]
pub struct PlainParser {
    /// Accessor builder over the raw message key; `PlainParser::new` leaves this `None` when
    /// the columns define no key column.
    pub key_builder: Option<AccessBuilderImpl>,
    /// Accessor builder over the message payload, chosen from the configured encoding.
    pub payload_builder: AccessBuilderImpl,
    /// Columns this parser writes into.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Context of the source this parser belongs to.
    pub source_ctx: SourceContextRef,
    // parsing transaction metadata for shared cdc source
    pub transaction_meta_builder: Option<AccessBuilderImpl>,
    // parsing schema change events for shared cdc source
    pub schema_change_builder: Option<AccessBuilderImpl>,
}
47
48impl PlainParser {
49    pub async fn new(
50        props: SpecificParserConfig,
51        rw_columns: Vec<SourceColumnDesc>,
52        source_ctx: SourceContextRef,
53    ) -> ConnectorResult<Self> {
54        let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
55            Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
56                EncodingProperties::Bytes(BytesProperties {
57                    column_name: Some(key_column_name),
58                }),
59            )?))
60        } else {
61            None
62        };
63
64        let payload_builder = match props.encoding_config {
65            EncodingProperties::Json(_)
66            | EncodingProperties::Protobuf(_)
67            | EncodingProperties::Avro(_)
68            | EncodingProperties::Bytes(_) => {
69                AccessBuilderImpl::new_default(props.encoding_config).await?
70            }
71            _ => bail!("Unsupported encoding for Plain"),
72        };
73
74        let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
75            DebeziumJsonAccessBuilder::new(
76                TimestamptzHandling::GuessNumberUnit,
77                TimestampHandling::GuessNumberUnit,
78                TimeHandling::Micro,
79                BigintUnsignedHandlingMode::Long,
80                false,
81            )?,
82        ));
83
84        let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
85            DebeziumJsonAccessBuilder::new_for_schema_event()?,
86        ));
87
88        Ok(Self {
89            key_builder,
90            payload_builder,
91            rw_columns,
92            source_ctx,
93            transaction_meta_builder,
94            schema_change_builder,
95        })
96    }
97
98    pub async fn parse_inner(
99        &mut self,
100        key: Option<Vec<u8>>,
101        payload: Option<Vec<u8>>,
102        writer: SourceStreamChunkRowWriter<'_>,
103    ) -> ConnectorResult<ParseResult> {
104        // plain parser also used in the shared cdc source,
105        // we need to handle transaction metadata and schema change messages here
106        if let Some(msg_meta) = writer.row_meta()
107            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
108            && let Some(data) = payload
109        {
110            match cdc_meta.msg_type {
111                CdcMessageType::Data | CdcMessageType::Heartbeat => {
112                    return self.parse_rows(key, Some(data), writer).await;
113                }
114                CdcMessageType::TransactionMeta => {
115                    let accessor = self
116                        .transaction_meta_builder
117                        .as_mut()
118                        .expect("expect transaction metadata access builder")
119                        .generate_accessor(data, writer.source_meta())
120                        .await?;
121                    return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
122                    {
123                        Ok(transaction_control) => {
124                            Ok(ParseResult::TransactionControl(transaction_control))
125                        }
126                        Err(err) => Err(err)?,
127                    };
128                }
129                CdcMessageType::SchemaChange => {
130                    let accessor = self
131                        .schema_change_builder
132                        .as_mut()
133                        .expect("expect schema change access builder")
134                        .generate_accessor(data, writer.source_meta())
135                        .await?;
136
137                    return match parse_schema_change(
138                        &accessor,
139                        self.source_ctx.source_id,
140                        &self.source_ctx.source_name,
141                        &self.source_ctx.connector_props,
142                    ) {
143                        Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
144                        Err(err) => {
145                            // Report CDC auto schema change fail event
146                            let (fail_info, table_name, cdc_table_id) = match &err {
147                                crate::parser::AccessError::CdcAutoSchemaChangeError {
148                                    ty,
149                                    table_name,
150                                    ..
151                                } => {
152                                    // Parse table_name format: "schema"."table" -> schema.table
153                                    let clean_table_name =
154                                        table_name.trim_matches('"').replace("\".\"", ".");
155                                    let fail_info = format!(
156                                        "Unsupported data type '{}' in source '{}' table '{}'",
157                                        ty, self.source_ctx.source_name, clean_table_name
158                                    );
159                                    // Build cdc_table_id: source_name.schema.table_name
160                                    let cdc_table_id = format!(
161                                        "{}.{}",
162                                        self.source_ctx.source_name, clean_table_name
163                                    );
164
165                                    (fail_info, clean_table_name, cdc_table_id)
166                                }
167                                _ => {
168                                    let fail_info = format!(
169                                        "Failed to parse schema change: {:?}, source: {}",
170                                        err.as_report(),
171                                        self.source_ctx.source_name
172                                    );
173                                    (fail_info, "".to_owned(), "".to_owned())
174                                }
175                            };
176                            self.source_ctx.on_cdc_auto_schema_change_failure(
177                                self.source_ctx.source_id,
178                                table_name,
179                                cdc_table_id,
180                                "".to_owned(), // upstream_ddl is not available in this context
181                                fail_info,
182                            );
183
184                            Err(err)?
185                        }
186                    };
187                }
188                CdcMessageType::Unspecified => {
189                    unreachable!()
190                }
191            }
192        }
193
194        // for non-cdc source messages
195        self.parse_rows(key, payload, writer).await
196    }
197
198    async fn parse_rows(
199        &mut self,
200        key: Option<Vec<u8>>,
201        payload: Option<Vec<u8>>,
202        mut writer: SourceStreamChunkRowWriter<'_>,
203    ) -> ConnectorResult<ParseResult> {
204        let meta = writer.source_meta();
205        let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
206
207        if let Some(data) = key
208            && let Some(key_builder) = self.key_builder.as_mut()
209        {
210            // key is optional in format plain
211            row_op.with_key(key_builder.generate_accessor(data, meta).await?);
212        }
213        if let Some(data) = payload {
214            // the data part also can be an empty vec
215            row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
216        }
217
218        writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
219
220        Ok(ParseResult::Rows)
221    }
222}
223
224impl ByteStreamSourceParser for PlainParser {
225    fn columns(&self) -> &[SourceColumnDesc] {
226        &self.rw_columns
227    }
228
229    fn source_ctx(&self) -> &SourceContext {
230        &self.source_ctx
231    }
232
233    fn parser_format(&self) -> ParserFormat {
234        ParserFormat::Plain
235    }
236
237    async fn parse_one<'a>(
238        &'a mut self,
239        _key: Option<Vec<u8>>,
240        _payload: Option<Vec<u8>>,
241        _writer: SourceStreamChunkRowWriter<'a>,
242    ) -> ConnectorResult<()> {
243        unreachable!("should call `parse_one_with_txn` instead")
244    }
245
246    async fn parse_one_with_txn<'a>(
247        &'a mut self,
248        key: Option<Vec<u8>>,
249        payload: Option<Vec<u8>>,
250        writer: SourceStreamChunkRowWriter<'a>,
251    ) -> ConnectorResult<ParseResult> {
252        self.parse_inner(key, payload, writer).await
253    }
254}
255
// Tests for `PlainParser` on Debezium CDC input: row chunking with and without transaction
// metadata, transaction metadata parsing, and schema change parsing.
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::sync::Arc;

    use expect_test::expect;
    use futures::StreamExt;
    use futures::executor::block_on;
    use futures_async_stream::try_stream;
    use itertools::Itertools;
    use risingwave_common::catalog::ColumnCatalog;
    use risingwave_pb::connector_service::{SourceType, cdc_message};

    use super::*;
    use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
    use crate::source::cdc::DebeziumCdcMeta;
    use crate::source::{ConnectorProperties, SourceCtrlOpts, SourceMessage, SplitId};

    /// Feeds the same two CDC message batches through `parse_message_stream` twice: once with
    /// BEGIN/END tagged as data messages, and once tagged as transaction metadata, and checks
    /// how that changes the emitted (non-empty) chunks.
    #[tokio::test]
    async fn test_emit_transactional_chunk() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();

        let columns = schema
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect::<Vec<_>>();

        let source_ctx = SourceContext {
            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
            ..SourceContext::dummy()
        };
        let source_ctx = Arc::new(source_ctx);
        // format plain encode json parser
        let parser = PlainParser::new(
            SpecificParserConfig::DEFAULT_PLAIN_JSON,
            columns.clone(),
            source_ctx.clone(),
        )
        .await
        .unwrap();

        let mut transactional = false;
        // For a non-transactional source, we expect one chunk per message batch. BEGIN/END are
        // tagged as `Data` messages here, so each of them is emitted as a row too.
        let message_stream = source_message_stream(transactional);
        let chunk_stream = crate::parser::parse_message_stream(
            parser,
            message_stream.boxed(),
            SourceCtrlOpts::for_test(),
        );
        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
            .into_iter()
            .collect();
        let output = output
            .unwrap()
            .into_iter()
            .filter(|c| c.cardinality() > 0)
            .enumerate()
            .map(|(i, c)| {
                if i == 0 {
                    // begin + 3 data messages
                    assert_eq!(4, c.cardinality());
                }
                if i == 1 {
                    // 2 data messages + 1 end
                    assert_eq!(3, c.cardinality());
                }
                c
            })
            .collect_vec();

        // 2 chunks for 2 message batches
        assert_eq!(2, output.len());

        // format plain encode json parser
        let parser = PlainParser::new(
            SpecificParserConfig::DEFAULT_PLAIN_JSON,
            columns.clone(),
            source_ctx,
        )
        .await
        .unwrap();

        // For a transactional source, we expect a single chunk for the whole transaction, even
        // though its messages span two batches.
        transactional = true;
        let message_stream = source_message_stream(transactional);
        let chunk_stream = crate::parser::parse_message_stream(
            parser,
            message_stream.boxed(),
            SourceCtrlOpts::for_test(),
        );
        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
            .into_iter()
            .collect();
        let output = output
            .unwrap()
            .into_iter()
            .filter(|c| c.cardinality() > 0)
            .inspect(|c| {
                // 5 data messages in a single chunk
                assert_eq!(5, c.cardinality());
            })
            .collect_vec();

        // a single transactional chunk
        assert_eq!(1, output.len());
    }

    // Yields two batches of Postgres CDC messages (3 + 2 data rows). The first batch is
    // prefixed with a BEGIN message and the last batch is suffixed with an END message.
    // When `transactional` is false, BEGIN/END are tagged as `Data` instead of
    // `TransactionMeta`, so the parser treats them as ordinary rows.
    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
    async fn source_message_stream(transactional: bool) {
        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
        let data_batches = [
            vec![
                r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
                r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
                r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
            ],
            vec![
                r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
                r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
            ],
        ];
        for (i, batch) in data_batches.iter().enumerate() {
            let mut source_msg_batch = vec![];
            if i == 0 {
                // put begin message at first
                source_msg_batch.push(SourceMessage {
                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
                        "orders".to_owned(),
                        0,
                        if transactional {
                            cdc_message::CdcMessageType::TransactionMeta
                        } else {
                            cdc_message::CdcMessageType::Data
                        },
                        SourceType::Unspecified,
                    )),
                    split_id: SplitId::from("1001"),
                    offset: "0".into(),
                    key: None,
                    payload: Some(begin_msg.as_bytes().to_vec()),
                });
            }
            // put data messages
            for data_msg in batch {
                source_msg_batch.push(SourceMessage {
                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
                        "orders".to_owned(),
                        0,
                        cdc_message::CdcMessageType::Data,
                        SourceType::Unspecified,
                    )),
                    split_id: SplitId::from("1001"),
                    offset: "0".into(),
                    key: None,
                    payload: Some(data_msg.as_bytes().to_vec()),
                });
            }
            if i == data_batches.len() - 1 {
                // put commit message at last
                source_msg_batch.push(SourceMessage {
                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
                        "orders".to_owned(),
                        0,
                        if transactional {
                            cdc_message::CdcMessageType::TransactionMeta
                        } else {
                            cdc_message::CdcMessageType::Data
                        },
                        SourceType::Unspecified,
                    )),
                    split_id: SplitId::from("1001"),
                    offset: "0".into(),
                    key: None,
                    payload: Some(commit_msg.as_bytes().to_vec()),
                });
            }
            yield source_msg_batch;
        }
    }

    /// BEGIN / END transaction metadata messages must be parsed into `TransactionControl`
    /// results carrying the transaction id, and must not write any rows.
    #[tokio::test]
    async fn test_parse_transaction_metadata() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();

        let columns = schema
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect::<Vec<_>>();

        // format plain encode json parser
        let source_ctx = SourceContext {
            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
            ..SourceContext::dummy()
        };
        let mut parser = PlainParser::new(
            SpecificParserConfig::DEFAULT_PLAIN_JSON,
            columns.clone(),
            Arc::new(source_ctx),
        )
        .await
        .unwrap();
        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        // The `id` is the upstream transaction identifier. With the MySQL connector used here it
        // presumably takes the GTID form `server_uuid:transaction_number` (for Postgres it is
        // `txID:LSN` instead, e.g. "35352:3962948040"). TODO: confirm against Debezium docs.
        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;

        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
            "orders".to_owned(),
            0,
            cdc_message::CdcMessageType::TransactionMeta,
            SourceType::Unspecified,
        ));
        let msg_meta = MessageMeta {
            source_meta: &cdc_meta,
            split_id: "1001",
            offset: "",
        };

        let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
        let res = parser
            .parse_one_with_txn(
                None,
                Some(begin_msg.as_bytes().to_vec()),
                builder.row_writer().with_meta(msg_meta),
            )
            .await;
        match res {
            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
                assert_eq!(id.deref(), expect_tx_id);
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
        let res = parser
            .parse_one_with_txn(
                None,
                Some(commit_msg.as_bytes().to_vec()),
                builder.row_writer().with_meta(msg_meta),
            )
            .await;
        match res {
            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
                assert_eq!(id.deref(), expect_tx_id);
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }

        // Transaction metadata must not have produced any rows.
        builder.finish_current_chunk();
        assert!(builder.consume_ready_chunks().next().is_none());
    }

    /// A MySQL `ALTER TABLE` schema change event must be parsed into a `SchemaChange` result
    /// with the table's full column list and the upstream DDL.
    #[tokio::test]
    async fn test_parse_schema_change() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();

        let columns = schema
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect::<Vec<_>>();

        // format plain encode json parser
        let source_ctx = SourceContext {
            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
            ..SourceContext::dummy()
        };
        let mut parser = PlainParser::new(
            SpecificParserConfig::DEFAULT_PLAIN_JSON,
            columns.clone(),
            Arc::new(source_ctx),
        )
        .await
        .unwrap();
        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
            "mydb.test".to_owned(),
            0,
            cdc_message::CdcMessageType::SchemaChange,
            SourceType::Mysql,
        ));
        let msg_meta = MessageMeta {
            source_meta: &cdc_meta,
            split_id: "1001",
            offset: "",
        };

        let res = parser
            .parse_one_with_txn(
                None,
                Some(msg.as_bytes().to_vec()),
                dummy_builder.row_writer().with_meta(msg_meta),
            )
            .await;

        let res = res.unwrap();
        expect![[r#"
            SchemaChange(
                SchemaChangeEnvelope {
                    table_changes: [
                        TableSchemaChange {
                            cdc_table_id: "0.mydb.test",
                            columns: [
                                ColumnCatalog {
                                    column_desc: ColumnDesc {
                                        data_type: Int32,
                                        column_id: #2147483646,
                                        name: "id",
                                        generated_or_default_column: None,
                                        description: None,
                                        additional_column: AdditionalColumn {
                                            column_type: None,
                                        },
                                        version: Pr13707,
                                        system_column: None,
                                        nullable: true,
                                    },
                                    is_hidden: false,
                                },
                                ColumnCatalog {
                                    column_desc: ColumnDesc {
                                        data_type: Timestamptz,
                                        column_id: #2147483646,
                                        name: "v1",
                                        generated_or_default_column: None,
                                        description: None,
                                        additional_column: AdditionalColumn {
                                            column_type: None,
                                        },
                                        version: Pr13707,
                                        system_column: None,
                                        nullable: true,
                                    },
                                    is_hidden: false,
                                },
                                ColumnCatalog {
                                    column_desc: ColumnDesc {
                                        data_type: Varchar,
                                        column_id: #2147483646,
                                        name: "v2",
                                        generated_or_default_column: None,
                                        description: None,
                                        additional_column: AdditionalColumn {
                                            column_type: None,
                                        },
                                        version: Pr13707,
                                        system_column: None,
                                        nullable: true,
                                    },
                                    is_hidden: false,
                                },
                            ],
                            change_type: Alter,
                            upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
                        },
                    ],
                },
            )
        "#]]
        .assert_debug_eq(&res);
    }
}