risingwave_connector/parser/
plain_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::bail;
16use thiserror_ext::AsReport;
17
18use super::unified::json::{
19    BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
20};
21use super::unified::kv_event::KvEvent;
22use super::{
23    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
24    SpecificParserConfig,
25};
26use crate::error::ConnectorResult;
27use crate::parser::bytes_parser::BytesAccessBuilder;
28use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
29use crate::parser::unified::AccessImpl;
30use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
31use crate::parser::upsert_parser::get_key_column_name;
32use crate::parser::{BytesProperties, ParseResult, ParserFormat};
33use crate::source::cdc::CdcMessageType;
34use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
35
/// Parser for `FORMAT PLAIN`, i.e., append-only source.
///
/// Besides ordinary rows, this parser also handles the transaction metadata and
/// schema change messages of a shared CDC source (see `parse_inner`).
#[derive(Debug)]
pub struct PlainParser {
    /// Builds the accessor over the message key. `PlainParser::new` sets it only
    /// when `get_key_column_name` finds a key column among `rw_columns`.
    pub key_builder: Option<AccessBuilderImpl>,
    /// Builds the accessor over the message payload for the configured encoding.
    pub payload_builder: AccessBuilderImpl,
    /// Column descriptors, exposed through `ByteStreamSourceParser::columns`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Shared source context; `parse_inner` reads the source id, source name and
    /// connector properties from it.
    pub source_ctx: SourceContextRef,
    // parsing transaction metadata for shared cdc source
    /// Builder used for `CdcMessageType::TransactionMeta` messages.
    pub transaction_meta_builder: Option<AccessBuilderImpl>,
    /// Builder used for `CdcMessageType::SchemaChange` messages.
    pub schema_change_builder: Option<AccessBuilderImpl>,
}
47
48impl PlainParser {
49    pub async fn new(
50        props: SpecificParserConfig,
51        rw_columns: Vec<SourceColumnDesc>,
52        source_ctx: SourceContextRef,
53    ) -> ConnectorResult<Self> {
54        let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
55            Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
56                EncodingProperties::Bytes(BytesProperties {
57                    column_name: Some(key_column_name),
58                }),
59            )?))
60        } else {
61            None
62        };
63
64        let payload_builder = match props.encoding_config {
65            EncodingProperties::Json(_)
66            | EncodingProperties::Protobuf(_)
67            | EncodingProperties::Avro(_)
68            | EncodingProperties::Bytes(_) => {
69                AccessBuilderImpl::new_default(props.encoding_config).await?
70            }
71            _ => bail!("Unsupported encoding for Plain"),
72        };
73
74        let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
75            DebeziumJsonAccessBuilder::new(
76                TimestamptzHandling::GuessNumberUnit,
77                TimestampHandling::GuessNumberUnit,
78                TimeHandling::Micro,
79                BigintUnsignedHandlingMode::Long,
80                false,
81            )?,
82        ));
83
84        let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
85            DebeziumJsonAccessBuilder::new_for_schema_event()?,
86        ));
87
88        Ok(Self {
89            key_builder,
90            payload_builder,
91            rw_columns,
92            source_ctx,
93            transaction_meta_builder,
94            schema_change_builder,
95        })
96    }
97
98    pub async fn parse_inner(
99        &mut self,
100        key: Option<Vec<u8>>,
101        payload: Option<Vec<u8>>,
102        writer: SourceStreamChunkRowWriter<'_>,
103    ) -> ConnectorResult<ParseResult> {
104        // plain parser also used in the shared cdc source,
105        // we need to handle transaction metadata and schema change messages here
106        if let Some(msg_meta) = writer.row_meta()
107            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
108            && let Some(data) = payload
109        {
110            match cdc_meta.msg_type {
111                CdcMessageType::Data | CdcMessageType::Heartbeat => {
112                    return self.parse_rows(key, Some(data), writer).await;
113                }
114                CdcMessageType::TransactionMeta => {
115                    let accessor = self
116                        .transaction_meta_builder
117                        .as_mut()
118                        .expect("expect transaction metadata access builder")
119                        .generate_accessor(data, writer.source_meta())
120                        .await?;
121                    return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
122                    {
123                        Ok(transaction_control) => {
124                            Ok(ParseResult::TransactionControl(transaction_control))
125                        }
126                        Err(err) => Err(err)?,
127                    };
128                }
129                CdcMessageType::SchemaChange => {
130                    let accessor = self
131                        .schema_change_builder
132                        .as_mut()
133                        .expect("expect schema change access builder")
134                        .generate_accessor(data, writer.source_meta())
135                        .await?;
136
137                    return match parse_schema_change(
138                        &accessor,
139                        self.source_ctx.source_id.into(),
140                        &self.source_ctx.source_name,
141                        &self.source_ctx.connector_props,
142                    ) {
143                        Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
144                        Err(err) => {
145                            // Report CDC auto schema change fail event
146                            let (fail_info, table_name, cdc_table_id) = match &err {
147                                crate::parser::AccessError::CdcAutoSchemaChangeError {
148                                    ty,
149                                    table_name,
150                                    ..
151                                } => {
152                                    // Parse table_name format: "schema"."table" -> schema.table
153                                    let clean_table_name =
154                                        table_name.trim_matches('"').replace("\".\"", ".");
155                                    let fail_info = format!(
156                                        "Unsupported data type '{}' in source '{}' table '{}'",
157                                        ty, self.source_ctx.source_name, clean_table_name
158                                    );
159                                    // Build cdc_table_id: source_name.schema.table_name
160                                    let cdc_table_id = format!(
161                                        "{}.{}",
162                                        self.source_ctx.source_name, clean_table_name
163                                    );
164
165                                    (fail_info, clean_table_name, cdc_table_id)
166                                }
167                                _ => {
168                                    let fail_info = format!(
169                                        "Failed to parse schema change: {:?}, source: {}",
170                                        err.as_report(),
171                                        self.source_ctx.source_name
172                                    );
173                                    (fail_info, "".to_owned(), "".to_owned())
174                                }
175                            };
176                            self.source_ctx.on_cdc_auto_schema_change_failure(
177                                self.source_ctx.source_id.as_raw_id(),
178                                table_name,
179                                cdc_table_id,
180                                "".to_owned(), // upstream_ddl is not available in this context
181                                fail_info,
182                            );
183
184                            Err(err)?
185                        }
186                    };
187                }
188                CdcMessageType::Unspecified => {
189                    unreachable!()
190                }
191            }
192        }
193
194        // for non-cdc source messages
195        self.parse_rows(key, payload, writer).await
196    }
197
198    async fn parse_rows(
199        &mut self,
200        key: Option<Vec<u8>>,
201        payload: Option<Vec<u8>>,
202        mut writer: SourceStreamChunkRowWriter<'_>,
203    ) -> ConnectorResult<ParseResult> {
204        let meta = writer.source_meta();
205        let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
206
207        if let Some(data) = key
208            && let Some(key_builder) = self.key_builder.as_mut()
209        {
210            // key is optional in format plain
211            row_op.with_key(key_builder.generate_accessor(data, meta).await?);
212        }
213        if let Some(data) = payload {
214            // the data part also can be an empty vec
215            row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
216        }
217
218        writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
219
220        Ok(ParseResult::Rows)
221    }
222}
223
224impl ByteStreamSourceParser for PlainParser {
225    fn columns(&self) -> &[SourceColumnDesc] {
226        &self.rw_columns
227    }
228
229    fn source_ctx(&self) -> &SourceContext {
230        &self.source_ctx
231    }
232
233    fn parser_format(&self) -> ParserFormat {
234        ParserFormat::Plain
235    }
236
237    async fn parse_one<'a>(
238        &'a mut self,
239        _key: Option<Vec<u8>>,
240        _payload: Option<Vec<u8>>,
241        _writer: SourceStreamChunkRowWriter<'a>,
242    ) -> ConnectorResult<()> {
243        unreachable!("should call `parse_one_with_txn` instead")
244    }
245
246    async fn parse_one_with_txn<'a>(
247        &'a mut self,
248        key: Option<Vec<u8>>,
249        payload: Option<Vec<u8>>,
250        writer: SourceStreamChunkRowWriter<'a>,
251    ) -> ConnectorResult<ParseResult> {
252        self.parse_inner(key, payload, writer).await
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use std::ops::Deref;
259    use std::sync::Arc;
260
261    use expect_test::expect;
262    use futures::StreamExt;
263    use futures::executor::block_on;
264    use futures_async_stream::try_stream;
265    use itertools::Itertools;
266    use risingwave_common::catalog::ColumnCatalog;
267    use risingwave_pb::connector_service::cdc_message;
268
269    use super::*;
270    use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
271    use crate::source::cdc::DebeziumCdcMeta;
272    use crate::source::{ConnectorProperties, SourceCtrlOpts, SourceMessage, SplitId};
273
274    #[tokio::test]
275    async fn test_emit_transactional_chunk() {
276        let schema = ColumnCatalog::debezium_cdc_source_cols();
277
278        let columns = schema
279            .iter()
280            .map(|c| SourceColumnDesc::from(&c.column_desc))
281            .collect::<Vec<_>>();
282
283        let source_ctx = SourceContext {
284            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
285            ..SourceContext::dummy()
286        };
287        let source_ctx = Arc::new(source_ctx);
288        // format plain encode json parser
289        let parser = PlainParser::new(
290            SpecificParserConfig::DEFAULT_PLAIN_JSON,
291            columns.clone(),
292            source_ctx.clone(),
293        )
294        .await
295        .unwrap();
296
297        let mut transactional = false;
298        // for untransactional source, we expect emit a chunk for each message batch
299        let message_stream = source_message_stream(transactional);
300        let chunk_stream = crate::parser::parse_message_stream(
301            parser,
302            message_stream.boxed(),
303            SourceCtrlOpts::for_test(),
304        );
305        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
306            .into_iter()
307            .collect();
308        let output = output
309            .unwrap()
310            .into_iter()
311            .filter(|c| c.cardinality() > 0)
312            .enumerate()
313            .map(|(i, c)| {
314                if i == 0 {
315                    // begin + 3 data messages
316                    assert_eq!(4, c.cardinality());
317                }
318                if i == 1 {
319                    // 2 data messages + 1 end
320                    assert_eq!(3, c.cardinality());
321                }
322                c
323            })
324            .collect_vec();
325
326        // 2 chunks for 2 message batches
327        assert_eq!(2, output.len());
328
329        // format plain encode json parser
330        let parser = PlainParser::new(
331            SpecificParserConfig::DEFAULT_PLAIN_JSON,
332            columns.clone(),
333            source_ctx,
334        )
335        .await
336        .unwrap();
337
338        // for transactional source, we expect emit a single chunk for the transaction
339        transactional = true;
340        let message_stream = source_message_stream(transactional);
341        let chunk_stream = crate::parser::parse_message_stream(
342            parser,
343            message_stream.boxed(),
344            SourceCtrlOpts::for_test(),
345        );
346        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
347            .into_iter()
348            .collect();
349        let output = output
350            .unwrap()
351            .into_iter()
352            .filter(|c| c.cardinality() > 0)
353            .inspect(|c| {
354                // 5 data messages in a single chunk
355                assert_eq!(5, c.cardinality());
356            })
357            .collect_vec();
358
359        // a single transactional chunk
360        assert_eq!(1, output.len());
361    }
362
363    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
364    async fn source_message_stream(transactional: bool) {
365        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
366        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
367        let data_batches = [
368            vec![
369                r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
370                r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
371                r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
372            ],
373            vec![
374                r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
375                r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
376            ],
377        ];
378        for (i, batch) in data_batches.iter().enumerate() {
379            let mut source_msg_batch = vec![];
380            if i == 0 {
381                // put begin message at first
382                source_msg_batch.push(SourceMessage {
383                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
384                        "orders".to_owned(),
385                        0,
386                        if transactional {
387                            cdc_message::CdcMessageType::TransactionMeta
388                        } else {
389                            cdc_message::CdcMessageType::Data
390                        },
391                    )),
392                    split_id: SplitId::from("1001"),
393                    offset: "0".into(),
394                    key: None,
395                    payload: Some(begin_msg.as_bytes().to_vec()),
396                });
397            }
398            // put data messages
399            for data_msg in batch {
400                source_msg_batch.push(SourceMessage {
401                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
402                        "orders".to_owned(),
403                        0,
404                        cdc_message::CdcMessageType::Data,
405                    )),
406                    split_id: SplitId::from("1001"),
407                    offset: "0".into(),
408                    key: None,
409                    payload: Some(data_msg.as_bytes().to_vec()),
410                });
411            }
412            if i == data_batches.len() - 1 {
413                // put commit message at last
414                source_msg_batch.push(SourceMessage {
415                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
416                        "orders".to_owned(),
417                        0,
418                        if transactional {
419                            cdc_message::CdcMessageType::TransactionMeta
420                        } else {
421                            cdc_message::CdcMessageType::Data
422                        },
423                    )),
424                    split_id: SplitId::from("1001"),
425                    offset: "0".into(),
426                    key: None,
427                    payload: Some(commit_msg.as_bytes().to_vec()),
428                });
429            }
430            yield source_msg_batch;
431        }
432    }
433
434    #[tokio::test]
435    async fn test_parse_transaction_metadata() {
436        let schema = ColumnCatalog::debezium_cdc_source_cols();
437
438        let columns = schema
439            .iter()
440            .map(|c| SourceColumnDesc::from(&c.column_desc))
441            .collect::<Vec<_>>();
442
443        // format plain encode json parser
444        let source_ctx = SourceContext {
445            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
446            ..SourceContext::dummy()
447        };
448        let mut parser = PlainParser::new(
449            SpecificParserConfig::DEFAULT_PLAIN_JSON,
450            columns.clone(),
451            Arc::new(source_ctx),
452        )
453        .await
454        .unwrap();
455        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
456
457        // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN
458        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
459        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
460
461        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
462            "orders".to_owned(),
463            0,
464            cdc_message::CdcMessageType::TransactionMeta,
465        ));
466        let msg_meta = MessageMeta {
467            source_meta: &cdc_meta,
468            split_id: "1001",
469            offset: "",
470        };
471
472        let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
473        let res = parser
474            .parse_one_with_txn(
475                None,
476                Some(begin_msg.as_bytes().to_vec()),
477                builder.row_writer().with_meta(msg_meta),
478            )
479            .await;
480        match res {
481            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
482                assert_eq!(id.deref(), expect_tx_id);
483            }
484            _ => panic!("unexpected parse result: {:?}", res),
485        }
486        let res = parser
487            .parse_one_with_txn(
488                None,
489                Some(commit_msg.as_bytes().to_vec()),
490                builder.row_writer().with_meta(msg_meta),
491            )
492            .await;
493        match res {
494            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
495                assert_eq!(id.deref(), expect_tx_id);
496            }
497            _ => panic!("unexpected parse result: {:?}", res),
498        }
499
500        builder.finish_current_chunk();
501        assert!(builder.consume_ready_chunks().next().is_none());
502    }
503
504    #[tokio::test]
505    async fn test_parse_schema_change() {
506        let schema = ColumnCatalog::debezium_cdc_source_cols();
507
508        let columns = schema
509            .iter()
510            .map(|c| SourceColumnDesc::from(&c.column_desc))
511            .collect::<Vec<_>>();
512
513        // format plain encode json parser
514        let source_ctx = SourceContext {
515            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
516            ..SourceContext::dummy()
517        };
518        let mut parser = PlainParser::new(
519            SpecificParserConfig::DEFAULT_PLAIN_JSON,
520            columns.clone(),
521            Arc::new(source_ctx),
522        )
523        .await
524        .unwrap();
525        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
526
527        let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
528        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
529            "mydb.test".to_owned(),
530            0,
531            cdc_message::CdcMessageType::SchemaChange,
532        ));
533        let msg_meta = MessageMeta {
534            source_meta: &cdc_meta,
535            split_id: "1001",
536            offset: "",
537        };
538
539        let res = parser
540            .parse_one_with_txn(
541                None,
542                Some(msg.as_bytes().to_vec()),
543                dummy_builder.row_writer().with_meta(msg_meta),
544            )
545            .await;
546
547        let res = res.unwrap();
548        expect![[r#"
549            SchemaChange(
550                SchemaChangeEnvelope {
551                    table_changes: [
552                        TableSchemaChange {
553                            cdc_table_id: "0.mydb.test",
554                            columns: [
555                                ColumnCatalog {
556                                    column_desc: ColumnDesc {
557                                        data_type: Int32,
558                                        column_id: #2147483646,
559                                        name: "id",
560                                        generated_or_default_column: None,
561                                        description: None,
562                                        additional_column: AdditionalColumn {
563                                            column_type: None,
564                                        },
565                                        version: Pr13707,
566                                        system_column: None,
567                                        nullable: true,
568                                    },
569                                    is_hidden: false,
570                                },
571                                ColumnCatalog {
572                                    column_desc: ColumnDesc {
573                                        data_type: Timestamptz,
574                                        column_id: #2147483646,
575                                        name: "v1",
576                                        generated_or_default_column: None,
577                                        description: None,
578                                        additional_column: AdditionalColumn {
579                                            column_type: None,
580                                        },
581                                        version: Pr13707,
582                                        system_column: None,
583                                        nullable: true,
584                                    },
585                                    is_hidden: false,
586                                },
587                                ColumnCatalog {
588                                    column_desc: ColumnDesc {
589                                        data_type: Varchar,
590                                        column_id: #2147483646,
591                                        name: "v2",
592                                        generated_or_default_column: None,
593                                        description: None,
594                                        additional_column: AdditionalColumn {
595                                            column_type: None,
596                                        },
597                                        version: Pr13707,
598                                        system_column: None,
599                                        nullable: true,
600                                    },
601                                    is_hidden: false,
602                                },
603                            ],
604                            change_type: Alter,
605                            upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
606                        },
607                    ],
608                },
609            )
610        "#]]
611        .assert_debug_eq(&res);
612    }
613}