risingwave_connector/parser/
plain_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::bail;
16use thiserror_ext::AsReport;
17
18use super::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling};
19use super::unified::kv_event::KvEvent;
20use super::{
21    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
22    SpecificParserConfig,
23};
24use crate::error::ConnectorResult;
25use crate::parser::bytes_parser::BytesAccessBuilder;
26use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
27use crate::parser::unified::AccessImpl;
28use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
29use crate::parser::upsert_parser::get_key_column_name;
30use crate::parser::{BytesProperties, ParseResult, ParserFormat};
31use crate::source::cdc::CdcMessageType;
32use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
33
34/// Parser for `FORMAT PLAIN`, i.e., append-only source.
35#[derive(Debug)]
36pub struct PlainParser {
37    pub key_builder: Option<AccessBuilderImpl>,
38    pub payload_builder: AccessBuilderImpl,
39    pub(crate) rw_columns: Vec<SourceColumnDesc>,
40    pub source_ctx: SourceContextRef,
41    // parsing transaction metadata for shared cdc source
42    pub transaction_meta_builder: Option<AccessBuilderImpl>,
43    pub schema_change_builder: Option<AccessBuilderImpl>,
44}
45
46impl PlainParser {
47    pub async fn new(
48        props: SpecificParserConfig,
49        rw_columns: Vec<SourceColumnDesc>,
50        source_ctx: SourceContextRef,
51    ) -> ConnectorResult<Self> {
52        let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
53            Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
54                EncodingProperties::Bytes(BytesProperties {
55                    column_name: Some(key_column_name),
56                }),
57            )?))
58        } else {
59            None
60        };
61
62        let payload_builder = match props.encoding_config {
63            EncodingProperties::Json(_)
64            | EncodingProperties::Protobuf(_)
65            | EncodingProperties::Avro(_)
66            | EncodingProperties::Bytes(_) => {
67                AccessBuilderImpl::new_default(props.encoding_config).await?
68            }
69            _ => bail!("Unsupported encoding for Plain"),
70        };
71
72        let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
73            DebeziumJsonAccessBuilder::new(
74                TimestamptzHandling::GuessNumberUnit,
75                TimestampHandling::GuessNumberUnit,
76                TimeHandling::Micro,
77                false,
78            )?,
79        ));
80
81        let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
82            DebeziumJsonAccessBuilder::new_for_schema_event()?,
83        ));
84
85        Ok(Self {
86            key_builder,
87            payload_builder,
88            rw_columns,
89            source_ctx,
90            transaction_meta_builder,
91            schema_change_builder,
92        })
93    }
94
95    pub async fn parse_inner(
96        &mut self,
97        key: Option<Vec<u8>>,
98        payload: Option<Vec<u8>>,
99        writer: SourceStreamChunkRowWriter<'_>,
100    ) -> ConnectorResult<ParseResult> {
101        // plain parser also used in the shared cdc source,
102        // we need to handle transaction metadata and schema change messages here
103        if let Some(msg_meta) = writer.row_meta()
104            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
105            && let Some(data) = payload
106        {
107            match cdc_meta.msg_type {
108                CdcMessageType::Data | CdcMessageType::Heartbeat => {
109                    return self.parse_rows(key, Some(data), writer).await;
110                }
111                CdcMessageType::TransactionMeta => {
112                    let accessor = self
113                        .transaction_meta_builder
114                        .as_mut()
115                        .expect("expect transaction metadata access builder")
116                        .generate_accessor(data, writer.source_meta())
117                        .await?;
118                    return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
119                    {
120                        Ok(transaction_control) => {
121                            Ok(ParseResult::TransactionControl(transaction_control))
122                        }
123                        Err(err) => Err(err)?,
124                    };
125                }
126                CdcMessageType::SchemaChange => {
127                    let accessor = self
128                        .schema_change_builder
129                        .as_mut()
130                        .expect("expect schema change access builder")
131                        .generate_accessor(data, writer.source_meta())
132                        .await?;
133
134                    return match parse_schema_change(
135                        &accessor,
136                        self.source_ctx.source_id.into(),
137                        &self.source_ctx.source_name,
138                        &self.source_ctx.connector_props,
139                    ) {
140                        Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
141                        Err(err) => {
142                            // Report CDC auto schema change fail event
143                            let (fail_info, table_name, cdc_table_id) = match &err {
144                                crate::parser::AccessError::CdcAutoSchemaChangeError {
145                                    ty,
146                                    table_name,
147                                    ..
148                                } => {
149                                    // Parse table_name format: "schema"."table" -> schema.table
150                                    let clean_table_name =
151                                        table_name.trim_matches('"').replace("\".\"", ".");
152                                    let fail_info = format!(
153                                        "Unsupported data type '{}' in source '{}' table '{}'",
154                                        ty, self.source_ctx.source_name, clean_table_name
155                                    );
156                                    // Build cdc_table_id: source_name.schema.table_name
157                                    let cdc_table_id = format!(
158                                        "{}.{}",
159                                        self.source_ctx.source_name, clean_table_name
160                                    );
161
162                                    (fail_info, clean_table_name, cdc_table_id)
163                                }
164                                _ => {
165                                    let fail_info = format!(
166                                        "Failed to parse schema change: {:?}, source: {}",
167                                        err.as_report(),
168                                        self.source_ctx.source_name
169                                    );
170                                    (fail_info, "".to_owned(), "".to_owned())
171                                }
172                            };
173                            self.source_ctx.on_cdc_auto_schema_change_failure(
174                                self.source_ctx.source_id.table_id,
175                                table_name,
176                                cdc_table_id,
177                                "".to_owned(), // upstream_ddl is not available in this context
178                                fail_info,
179                            );
180
181                            Err(err)?
182                        }
183                    };
184                }
185                CdcMessageType::Unspecified => {
186                    unreachable!()
187                }
188            }
189        }
190
191        // for non-cdc source messages
192        self.parse_rows(key, payload, writer).await
193    }
194
195    async fn parse_rows(
196        &mut self,
197        key: Option<Vec<u8>>,
198        payload: Option<Vec<u8>>,
199        mut writer: SourceStreamChunkRowWriter<'_>,
200    ) -> ConnectorResult<ParseResult> {
201        let meta = writer.source_meta();
202        let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
203
204        if let Some(data) = key
205            && let Some(key_builder) = self.key_builder.as_mut()
206        {
207            // key is optional in format plain
208            row_op.with_key(key_builder.generate_accessor(data, meta).await?);
209        }
210        if let Some(data) = payload {
211            // the data part also can be an empty vec
212            row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
213        }
214
215        writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
216
217        Ok(ParseResult::Rows)
218    }
219}
220
221impl ByteStreamSourceParser for PlainParser {
222    fn columns(&self) -> &[SourceColumnDesc] {
223        &self.rw_columns
224    }
225
226    fn source_ctx(&self) -> &SourceContext {
227        &self.source_ctx
228    }
229
230    fn parser_format(&self) -> ParserFormat {
231        ParserFormat::Plain
232    }
233
234    async fn parse_one<'a>(
235        &'a mut self,
236        _key: Option<Vec<u8>>,
237        _payload: Option<Vec<u8>>,
238        _writer: SourceStreamChunkRowWriter<'a>,
239    ) -> ConnectorResult<()> {
240        unreachable!("should call `parse_one_with_txn` instead")
241    }
242
243    async fn parse_one_with_txn<'a>(
244        &'a mut self,
245        key: Option<Vec<u8>>,
246        payload: Option<Vec<u8>>,
247        writer: SourceStreamChunkRowWriter<'a>,
248    ) -> ConnectorResult<ParseResult> {
249        self.parse_inner(key, payload, writer).await
250    }
251}
252
253#[cfg(test)]
254mod tests {
255    use std::ops::Deref;
256    use std::sync::Arc;
257
258    use expect_test::expect;
259    use futures::StreamExt;
260    use futures::executor::block_on;
261    use futures_async_stream::try_stream;
262    use itertools::Itertools;
263    use risingwave_common::catalog::ColumnCatalog;
264    use risingwave_pb::connector_service::cdc_message;
265
266    use super::*;
267    use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
268    use crate::source::cdc::DebeziumCdcMeta;
269    use crate::source::{ConnectorProperties, SourceCtrlOpts, SourceMessage, SplitId};
270
271    #[tokio::test]
272    async fn test_emit_transactional_chunk() {
273        let schema = ColumnCatalog::debezium_cdc_source_cols();
274
275        let columns = schema
276            .iter()
277            .map(|c| SourceColumnDesc::from(&c.column_desc))
278            .collect::<Vec<_>>();
279
280        let source_ctx = SourceContext {
281            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
282            ..SourceContext::dummy()
283        };
284        let source_ctx = Arc::new(source_ctx);
285        // format plain encode json parser
286        let parser = PlainParser::new(
287            SpecificParserConfig::DEFAULT_PLAIN_JSON,
288            columns.clone(),
289            source_ctx.clone(),
290        )
291        .await
292        .unwrap();
293
294        let mut transactional = false;
295        // for untransactional source, we expect emit a chunk for each message batch
296        let message_stream = source_message_stream(transactional);
297        let chunk_stream = crate::parser::parse_message_stream(
298            parser,
299            message_stream.boxed(),
300            SourceCtrlOpts::for_test(),
301        );
302        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
303            .into_iter()
304            .collect();
305        let output = output
306            .unwrap()
307            .into_iter()
308            .filter(|c| c.cardinality() > 0)
309            .enumerate()
310            .map(|(i, c)| {
311                if i == 0 {
312                    // begin + 3 data messages
313                    assert_eq!(4, c.cardinality());
314                }
315                if i == 1 {
316                    // 2 data messages + 1 end
317                    assert_eq!(3, c.cardinality());
318                }
319                c
320            })
321            .collect_vec();
322
323        // 2 chunks for 2 message batches
324        assert_eq!(2, output.len());
325
326        // format plain encode json parser
327        let parser = PlainParser::new(
328            SpecificParserConfig::DEFAULT_PLAIN_JSON,
329            columns.clone(),
330            source_ctx,
331        )
332        .await
333        .unwrap();
334
335        // for transactional source, we expect emit a single chunk for the transaction
336        transactional = true;
337        let message_stream = source_message_stream(transactional);
338        let chunk_stream = crate::parser::parse_message_stream(
339            parser,
340            message_stream.boxed(),
341            SourceCtrlOpts::for_test(),
342        );
343        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
344            .into_iter()
345            .collect();
346        let output = output
347            .unwrap()
348            .into_iter()
349            .filter(|c| c.cardinality() > 0)
350            .inspect(|c| {
351                // 5 data messages in a single chunk
352                assert_eq!(5, c.cardinality());
353            })
354            .collect_vec();
355
356        // a single transactional chunk
357        assert_eq!(1, output.len());
358    }
359
360    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
361    async fn source_message_stream(transactional: bool) {
362        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
363        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
364        let data_batches = [
365            vec![
366                r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
367                r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
368                r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
369            ],
370            vec![
371                r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
372                r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
373            ],
374        ];
375        for (i, batch) in data_batches.iter().enumerate() {
376            let mut source_msg_batch = vec![];
377            if i == 0 {
378                // put begin message at first
379                source_msg_batch.push(SourceMessage {
380                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
381                        "orders".to_owned(),
382                        0,
383                        if transactional {
384                            cdc_message::CdcMessageType::TransactionMeta
385                        } else {
386                            cdc_message::CdcMessageType::Data
387                        },
388                    )),
389                    split_id: SplitId::from("1001"),
390                    offset: "0".into(),
391                    key: None,
392                    payload: Some(begin_msg.as_bytes().to_vec()),
393                });
394            }
395            // put data messages
396            for data_msg in batch {
397                source_msg_batch.push(SourceMessage {
398                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
399                        "orders".to_owned(),
400                        0,
401                        cdc_message::CdcMessageType::Data,
402                    )),
403                    split_id: SplitId::from("1001"),
404                    offset: "0".into(),
405                    key: None,
406                    payload: Some(data_msg.as_bytes().to_vec()),
407                });
408            }
409            if i == data_batches.len() - 1 {
410                // put commit message at last
411                source_msg_batch.push(SourceMessage {
412                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
413                        "orders".to_owned(),
414                        0,
415                        if transactional {
416                            cdc_message::CdcMessageType::TransactionMeta
417                        } else {
418                            cdc_message::CdcMessageType::Data
419                        },
420                    )),
421                    split_id: SplitId::from("1001"),
422                    offset: "0".into(),
423                    key: None,
424                    payload: Some(commit_msg.as_bytes().to_vec()),
425                });
426            }
427            yield source_msg_batch;
428        }
429    }
430
431    #[tokio::test]
432    async fn test_parse_transaction_metadata() {
433        let schema = ColumnCatalog::debezium_cdc_source_cols();
434
435        let columns = schema
436            .iter()
437            .map(|c| SourceColumnDesc::from(&c.column_desc))
438            .collect::<Vec<_>>();
439
440        // format plain encode json parser
441        let source_ctx = SourceContext {
442            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
443            ..SourceContext::dummy()
444        };
445        let mut parser = PlainParser::new(
446            SpecificParserConfig::DEFAULT_PLAIN_JSON,
447            columns.clone(),
448            Arc::new(source_ctx),
449        )
450        .await
451        .unwrap();
452        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
453
454        // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN
455        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
456        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
457
458        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
459            "orders".to_owned(),
460            0,
461            cdc_message::CdcMessageType::TransactionMeta,
462        ));
463        let msg_meta = MessageMeta {
464            source_meta: &cdc_meta,
465            split_id: "1001",
466            offset: "",
467        };
468
469        let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
470        let res = parser
471            .parse_one_with_txn(
472                None,
473                Some(begin_msg.as_bytes().to_vec()),
474                builder.row_writer().with_meta(msg_meta),
475            )
476            .await;
477        match res {
478            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
479                assert_eq!(id.deref(), expect_tx_id);
480            }
481            _ => panic!("unexpected parse result: {:?}", res),
482        }
483        let res = parser
484            .parse_one_with_txn(
485                None,
486                Some(commit_msg.as_bytes().to_vec()),
487                builder.row_writer().with_meta(msg_meta),
488            )
489            .await;
490        match res {
491            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
492                assert_eq!(id.deref(), expect_tx_id);
493            }
494            _ => panic!("unexpected parse result: {:?}", res),
495        }
496
497        builder.finish_current_chunk();
498        assert!(builder.consume_ready_chunks().next().is_none());
499    }
500
501    #[tokio::test]
502    async fn test_parse_schema_change() {
503        let schema = ColumnCatalog::debezium_cdc_source_cols();
504
505        let columns = schema
506            .iter()
507            .map(|c| SourceColumnDesc::from(&c.column_desc))
508            .collect::<Vec<_>>();
509
510        // format plain encode json parser
511        let source_ctx = SourceContext {
512            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
513            ..SourceContext::dummy()
514        };
515        let mut parser = PlainParser::new(
516            SpecificParserConfig::DEFAULT_PLAIN_JSON,
517            columns.clone(),
518            Arc::new(source_ctx),
519        )
520        .await
521        .unwrap();
522        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
523
524        let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
525        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
526            "mydb.test".to_owned(),
527            0,
528            cdc_message::CdcMessageType::SchemaChange,
529        ));
530        let msg_meta = MessageMeta {
531            source_meta: &cdc_meta,
532            split_id: "1001",
533            offset: "",
534        };
535
536        let res = parser
537            .parse_one_with_txn(
538                None,
539                Some(msg.as_bytes().to_vec()),
540                dummy_builder.row_writer().with_meta(msg_meta),
541            )
542            .await;
543
544        let res = res.unwrap();
545        expect![[r#"
546            SchemaChange(
547                SchemaChangeEnvelope {
548                    table_changes: [
549                        TableSchemaChange {
550                            cdc_table_id: "0.mydb.test",
551                            columns: [
552                                ColumnCatalog {
553                                    column_desc: ColumnDesc {
554                                        data_type: Int32,
555                                        column_id: #2147483646,
556                                        name: "id",
557                                        generated_or_default_column: None,
558                                        description: None,
559                                        additional_column: AdditionalColumn {
560                                            column_type: None,
561                                        },
562                                        version: Pr13707,
563                                        system_column: None,
564                                        nullable: true,
565                                    },
566                                    is_hidden: false,
567                                },
568                                ColumnCatalog {
569                                    column_desc: ColumnDesc {
570                                        data_type: Timestamptz,
571                                        column_id: #2147483646,
572                                        name: "v1",
573                                        generated_or_default_column: None,
574                                        description: None,
575                                        additional_column: AdditionalColumn {
576                                            column_type: None,
577                                        },
578                                        version: Pr13707,
579                                        system_column: None,
580                                        nullable: true,
581                                    },
582                                    is_hidden: false,
583                                },
584                                ColumnCatalog {
585                                    column_desc: ColumnDesc {
586                                        data_type: Varchar,
587                                        column_id: #2147483646,
588                                        name: "v2",
589                                        generated_or_default_column: None,
590                                        description: None,
591                                        additional_column: AdditionalColumn {
592                                            column_type: None,
593                                        },
594                                        version: Pr13707,
595                                        system_column: None,
596                                        nullable: true,
597                                    },
598                                    is_hidden: false,
599                                },
600                            ],
601                            change_type: Alter,
602                            upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
603                        },
604                    ],
605                },
606            )
607        "#]]
608        .assert_debug_eq(&res);
609    }
610}