risingwave_connector/parser/
plain_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::bail;
16
17use super::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling};
18use super::unified::kv_event::KvEvent;
19use super::{
20    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
21    SpecificParserConfig,
22};
23use crate::error::ConnectorResult;
24use crate::parser::bytes_parser::BytesAccessBuilder;
25use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
26use crate::parser::unified::AccessImpl;
27use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
28use crate::parser::upsert_parser::get_key_column_name;
29use crate::parser::{BytesProperties, ParseResult, ParserFormat};
30use crate::source::cdc::CdcMessageType;
31use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
32
/// Parser for `FORMAT PLAIN`, i.e., append-only source.
///
/// The same parser is also used by the shared CDC source, in which case it additionally
/// decodes Debezium transaction metadata and schema change messages (see `parse_inner`).
#[derive(Debug)]
pub struct PlainParser {
    /// Builder for the accessor over the message key. `PlainParser::new` only sets it when
    /// `get_key_column_name` finds a key column among `rw_columns`; keys are ignored otherwise.
    pub key_builder: Option<AccessBuilderImpl>,
    /// Builder for the accessor over the message payload, chosen from the encoding config.
    pub payload_builder: AccessBuilderImpl,
    /// Source column descriptors; returned by `ByteStreamSourceParser::columns`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Shared source context. Its `connector_props` and `source_id` are consulted when
    /// decoding transaction metadata and schema change messages.
    pub source_ctx: SourceContextRef,
    // parsing transaction metadata for shared cdc source
    /// Debezium JSON builder used for `CdcMessageType::TransactionMeta` messages.
    /// `PlainParser::new` always populates it.
    pub transaction_meta_builder: Option<AccessBuilderImpl>,
    /// Debezium JSON builder used for `CdcMessageType::SchemaChange` messages.
    /// `PlainParser::new` always populates it.
    pub schema_change_builder: Option<AccessBuilderImpl>,
}
44
45impl PlainParser {
46    pub async fn new(
47        props: SpecificParserConfig,
48        rw_columns: Vec<SourceColumnDesc>,
49        source_ctx: SourceContextRef,
50    ) -> ConnectorResult<Self> {
51        let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
52            Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
53                EncodingProperties::Bytes(BytesProperties {
54                    column_name: Some(key_column_name),
55                }),
56            )?))
57        } else {
58            None
59        };
60
61        let payload_builder = match props.encoding_config {
62            EncodingProperties::Json(_)
63            | EncodingProperties::Protobuf(_)
64            | EncodingProperties::Avro(_)
65            | EncodingProperties::Bytes(_) => {
66                AccessBuilderImpl::new_default(props.encoding_config).await?
67            }
68            _ => bail!("Unsupported encoding for Plain"),
69        };
70
71        let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
72            DebeziumJsonAccessBuilder::new(
73                TimestamptzHandling::GuessNumberUnit,
74                TimestampHandling::GuessNumberUnit,
75                TimeHandling::Micro,
76            )?,
77        ));
78
79        let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
80            DebeziumJsonAccessBuilder::new_for_schema_event()?,
81        ));
82
83        Ok(Self {
84            key_builder,
85            payload_builder,
86            rw_columns,
87            source_ctx,
88            transaction_meta_builder,
89            schema_change_builder,
90        })
91    }
92
93    pub async fn parse_inner(
94        &mut self,
95        key: Option<Vec<u8>>,
96        payload: Option<Vec<u8>>,
97        writer: SourceStreamChunkRowWriter<'_>,
98    ) -> ConnectorResult<ParseResult> {
99        // plain parser also used in the shared cdc source,
100        // we need to handle transaction metadata and schema change messages here
101        if let Some(msg_meta) = writer.row_meta()
102            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
103            && let Some(data) = payload
104        {
105            match cdc_meta.msg_type {
106                CdcMessageType::Data | CdcMessageType::Heartbeat => {
107                    return self.parse_rows(key, Some(data), writer).await;
108                }
109                CdcMessageType::TransactionMeta => {
110                    let accessor = self
111                        .transaction_meta_builder
112                        .as_mut()
113                        .expect("expect transaction metadata access builder")
114                        .generate_accessor(data, writer.source_meta())
115                        .await?;
116                    return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
117                    {
118                        Ok(transaction_control) => {
119                            Ok(ParseResult::TransactionControl(transaction_control))
120                        }
121                        Err(err) => Err(err)?,
122                    };
123                }
124                CdcMessageType::SchemaChange => {
125                    let accessor = self
126                        .schema_change_builder
127                        .as_mut()
128                        .expect("expect schema change access builder")
129                        .generate_accessor(data, writer.source_meta())
130                        .await?;
131
132                    return match parse_schema_change(
133                        &accessor,
134                        self.source_ctx.source_id.into(),
135                        &self.source_ctx.connector_props,
136                    ) {
137                        Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
138                        Err(err) => Err(err)?,
139                    };
140                }
141                CdcMessageType::Unspecified => {
142                    unreachable!()
143                }
144            }
145        }
146
147        // for non-cdc source messages
148        self.parse_rows(key, payload, writer).await
149    }
150
151    async fn parse_rows(
152        &mut self,
153        key: Option<Vec<u8>>,
154        payload: Option<Vec<u8>>,
155        mut writer: SourceStreamChunkRowWriter<'_>,
156    ) -> ConnectorResult<ParseResult> {
157        let meta = writer.source_meta();
158        let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
159
160        if let Some(data) = key
161            && let Some(key_builder) = self.key_builder.as_mut()
162        {
163            // key is optional in format plain
164            row_op.with_key(key_builder.generate_accessor(data, meta).await?);
165        }
166        if let Some(data) = payload {
167            // the data part also can be an empty vec
168            row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
169        }
170
171        writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
172
173        Ok(ParseResult::Rows)
174    }
175}
176
177impl ByteStreamSourceParser for PlainParser {
178    fn columns(&self) -> &[SourceColumnDesc] {
179        &self.rw_columns
180    }
181
182    fn source_ctx(&self) -> &SourceContext {
183        &self.source_ctx
184    }
185
186    fn parser_format(&self) -> ParserFormat {
187        ParserFormat::Plain
188    }
189
190    async fn parse_one<'a>(
191        &'a mut self,
192        _key: Option<Vec<u8>>,
193        _payload: Option<Vec<u8>>,
194        _writer: SourceStreamChunkRowWriter<'a>,
195    ) -> ConnectorResult<()> {
196        unreachable!("should call `parse_one_with_txn` instead")
197    }
198
199    async fn parse_one_with_txn<'a>(
200        &'a mut self,
201        key: Option<Vec<u8>>,
202        payload: Option<Vec<u8>>,
203        writer: SourceStreamChunkRowWriter<'a>,
204    ) -> ConnectorResult<ParseResult> {
205        self.parse_inner(key, payload, writer).await
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use std::ops::Deref;
212    use std::sync::Arc;
213
214    use expect_test::expect;
215    use futures::StreamExt;
216    use futures::executor::block_on;
217    use futures_async_stream::try_stream;
218    use itertools::Itertools;
219    use risingwave_common::catalog::ColumnCatalog;
220    use risingwave_pb::connector_service::cdc_message;
221
222    use super::*;
223    use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
224    use crate::source::cdc::DebeziumCdcMeta;
225    use crate::source::{ConnectorProperties, SourceCtrlOpts, SourceMessage, SplitId};
226
227    #[tokio::test]
228    async fn test_emit_transactional_chunk() {
229        let schema = ColumnCatalog::debezium_cdc_source_cols();
230
231        let columns = schema
232            .iter()
233            .map(|c| SourceColumnDesc::from(&c.column_desc))
234            .collect::<Vec<_>>();
235
236        let source_ctx = SourceContext {
237            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
238            ..SourceContext::dummy()
239        };
240        let source_ctx = Arc::new(source_ctx);
241        // format plain encode json parser
242        let parser = PlainParser::new(
243            SpecificParserConfig::DEFAULT_PLAIN_JSON,
244            columns.clone(),
245            source_ctx.clone(),
246        )
247        .await
248        .unwrap();
249
250        let mut transactional = false;
251        // for untransactional source, we expect emit a chunk for each message batch
252        let message_stream = source_message_stream(transactional);
253        let chunk_stream = crate::parser::parse_message_stream(
254            parser,
255            message_stream.boxed(),
256            SourceCtrlOpts::for_test(),
257        );
258        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
259            .into_iter()
260            .collect();
261        let output = output
262            .unwrap()
263            .into_iter()
264            .filter(|c| c.cardinality() > 0)
265            .enumerate()
266            .map(|(i, c)| {
267                if i == 0 {
268                    // begin + 3 data messages
269                    assert_eq!(4, c.cardinality());
270                }
271                if i == 1 {
272                    // 2 data messages + 1 end
273                    assert_eq!(3, c.cardinality());
274                }
275                c
276            })
277            .collect_vec();
278
279        // 2 chunks for 2 message batches
280        assert_eq!(2, output.len());
281
282        // format plain encode json parser
283        let parser = PlainParser::new(
284            SpecificParserConfig::DEFAULT_PLAIN_JSON,
285            columns.clone(),
286            source_ctx,
287        )
288        .await
289        .unwrap();
290
291        // for transactional source, we expect emit a single chunk for the transaction
292        transactional = true;
293        let message_stream = source_message_stream(transactional);
294        let chunk_stream = crate::parser::parse_message_stream(
295            parser,
296            message_stream.boxed(),
297            SourceCtrlOpts::for_test(),
298        );
299        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
300            .into_iter()
301            .collect();
302        let output = output
303            .unwrap()
304            .into_iter()
305            .filter(|c| c.cardinality() > 0)
306            .inspect(|c| {
307                // 5 data messages in a single chunk
308                assert_eq!(5, c.cardinality());
309            })
310            .collect_vec();
311
312        // a single transactional chunk
313        assert_eq!(1, output.len());
314    }
315
316    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
317    async fn source_message_stream(transactional: bool) {
318        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
319        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
320        let data_batches = [
321            vec![
322                r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
323                r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
324                r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
325            ],
326            vec![
327                r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
328                r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
329            ],
330        ];
331        for (i, batch) in data_batches.iter().enumerate() {
332            let mut source_msg_batch = vec![];
333            if i == 0 {
334                // put begin message at first
335                source_msg_batch.push(SourceMessage {
336                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
337                        "orders".to_owned(),
338                        0,
339                        if transactional {
340                            cdc_message::CdcMessageType::TransactionMeta
341                        } else {
342                            cdc_message::CdcMessageType::Data
343                        },
344                    )),
345                    split_id: SplitId::from("1001"),
346                    offset: "0".into(),
347                    key: None,
348                    payload: Some(begin_msg.as_bytes().to_vec()),
349                });
350            }
351            // put data messages
352            for data_msg in batch {
353                source_msg_batch.push(SourceMessage {
354                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
355                        "orders".to_owned(),
356                        0,
357                        cdc_message::CdcMessageType::Data,
358                    )),
359                    split_id: SplitId::from("1001"),
360                    offset: "0".into(),
361                    key: None,
362                    payload: Some(data_msg.as_bytes().to_vec()),
363                });
364            }
365            if i == data_batches.len() - 1 {
366                // put commit message at last
367                source_msg_batch.push(SourceMessage {
368                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
369                        "orders".to_owned(),
370                        0,
371                        if transactional {
372                            cdc_message::CdcMessageType::TransactionMeta
373                        } else {
374                            cdc_message::CdcMessageType::Data
375                        },
376                    )),
377                    split_id: SplitId::from("1001"),
378                    offset: "0".into(),
379                    key: None,
380                    payload: Some(commit_msg.as_bytes().to_vec()),
381                });
382            }
383            yield source_msg_batch;
384        }
385    }
386
387    #[tokio::test]
388    async fn test_parse_transaction_metadata() {
389        let schema = ColumnCatalog::debezium_cdc_source_cols();
390
391        let columns = schema
392            .iter()
393            .map(|c| SourceColumnDesc::from(&c.column_desc))
394            .collect::<Vec<_>>();
395
396        // format plain encode json parser
397        let source_ctx = SourceContext {
398            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
399            ..SourceContext::dummy()
400        };
401        let mut parser = PlainParser::new(
402            SpecificParserConfig::DEFAULT_PLAIN_JSON,
403            columns.clone(),
404            Arc::new(source_ctx),
405        )
406        .await
407        .unwrap();
408        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
409
410        // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN
411        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
412        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
413
414        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
415            "orders".to_owned(),
416            0,
417            cdc_message::CdcMessageType::TransactionMeta,
418        ));
419        let msg_meta = MessageMeta {
420            source_meta: &cdc_meta,
421            split_id: "1001",
422            offset: "",
423        };
424
425        let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
426        let res = parser
427            .parse_one_with_txn(
428                None,
429                Some(begin_msg.as_bytes().to_vec()),
430                builder.row_writer().with_meta(msg_meta),
431            )
432            .await;
433        match res {
434            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
435                assert_eq!(id.deref(), expect_tx_id);
436            }
437            _ => panic!("unexpected parse result: {:?}", res),
438        }
439        let res = parser
440            .parse_one_with_txn(
441                None,
442                Some(commit_msg.as_bytes().to_vec()),
443                builder.row_writer().with_meta(msg_meta),
444            )
445            .await;
446        match res {
447            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
448                assert_eq!(id.deref(), expect_tx_id);
449            }
450            _ => panic!("unexpected parse result: {:?}", res),
451        }
452
453        builder.finish_current_chunk();
454        assert!(builder.consume_ready_chunks().next().is_none());
455    }
456
457    #[tokio::test]
458    async fn test_parse_schema_change() {
459        let schema = ColumnCatalog::debezium_cdc_source_cols();
460
461        let columns = schema
462            .iter()
463            .map(|c| SourceColumnDesc::from(&c.column_desc))
464            .collect::<Vec<_>>();
465
466        // format plain encode json parser
467        let source_ctx = SourceContext {
468            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
469            ..SourceContext::dummy()
470        };
471        let mut parser = PlainParser::new(
472            SpecificParserConfig::DEFAULT_PLAIN_JSON,
473            columns.clone(),
474            Arc::new(source_ctx),
475        )
476        .await
477        .unwrap();
478        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
479
480        let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
481        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
482            "mydb.test".to_owned(),
483            0,
484            cdc_message::CdcMessageType::SchemaChange,
485        ));
486        let msg_meta = MessageMeta {
487            source_meta: &cdc_meta,
488            split_id: "1001",
489            offset: "",
490        };
491
492        let res = parser
493            .parse_one_with_txn(
494                None,
495                Some(msg.as_bytes().to_vec()),
496                dummy_builder.row_writer().with_meta(msg_meta),
497            )
498            .await;
499
500        let res = res.unwrap();
501        expect![[r#"
502            SchemaChange(
503                SchemaChangeEnvelope {
504                    table_changes: [
505                        TableSchemaChange {
506                            cdc_table_id: "0.mydb.test",
507                            columns: [
508                                ColumnCatalog {
509                                    column_desc: ColumnDesc {
510                                        data_type: Int32,
511                                        column_id: #2147483646,
512                                        name: "id",
513                                        generated_or_default_column: None,
514                                        description: None,
515                                        additional_column: AdditionalColumn {
516                                            column_type: None,
517                                        },
518                                        version: Pr13707,
519                                        system_column: None,
520                                        nullable: true,
521                                    },
522                                    is_hidden: false,
523                                },
524                                ColumnCatalog {
525                                    column_desc: ColumnDesc {
526                                        data_type: Timestamptz,
527                                        column_id: #2147483646,
528                                        name: "v1",
529                                        generated_or_default_column: None,
530                                        description: None,
531                                        additional_column: AdditionalColumn {
532                                            column_type: None,
533                                        },
534                                        version: Pr13707,
535                                        system_column: None,
536                                        nullable: true,
537                                    },
538                                    is_hidden: false,
539                                },
540                                ColumnCatalog {
541                                    column_desc: ColumnDesc {
542                                        data_type: Varchar,
543                                        column_id: #2147483646,
544                                        name: "v2",
545                                        generated_or_default_column: None,
546                                        description: None,
547                                        additional_column: AdditionalColumn {
548                                            column_type: None,
549                                        },
550                                        version: Pr13707,
551                                        system_column: None,
552                                        nullable: true,
553                                    },
554                                    is_hidden: false,
555                                },
556                            ],
557                            change_type: Alter,
558                            upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
559                        },
560                    ],
561                },
562            )
563        "#]]
564        .assert_debug_eq(&res);
565    }
566}