risingwave_connector/parser/
plain_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::bail;
16
17use super::unified::json::TimestamptzHandling;
18use super::unified::kv_event::KvEvent;
19use super::{
20    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
21    SpecificParserConfig,
22};
23use crate::error::ConnectorResult;
24use crate::parser::bytes_parser::BytesAccessBuilder;
25use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
26use crate::parser::unified::AccessImpl;
27use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
28use crate::parser::upsert_parser::get_key_column_name;
29use crate::parser::{BytesProperties, ParseResult, ParserFormat};
30use crate::source::cdc::CdcMessageType;
31use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
32
33/// Parser for `FORMAT PLAIN`, i.e., append-only source.
34#[derive(Debug)]
35pub struct PlainParser {
36    pub key_builder: Option<AccessBuilderImpl>,
37    pub payload_builder: AccessBuilderImpl,
38    pub(crate) rw_columns: Vec<SourceColumnDesc>,
39    pub source_ctx: SourceContextRef,
40    // parsing transaction metadata for shared cdc source
41    pub transaction_meta_builder: Option<AccessBuilderImpl>,
42    pub schema_change_builder: Option<AccessBuilderImpl>,
43}
44
45impl PlainParser {
46    pub async fn new(
47        props: SpecificParserConfig,
48        rw_columns: Vec<SourceColumnDesc>,
49        source_ctx: SourceContextRef,
50    ) -> ConnectorResult<Self> {
51        let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
52            Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
53                EncodingProperties::Bytes(BytesProperties {
54                    column_name: Some(key_column_name),
55                }),
56            )?))
57        } else {
58            None
59        };
60
61        let payload_builder = match props.encoding_config {
62            EncodingProperties::Json(_)
63            | EncodingProperties::Protobuf(_)
64            | EncodingProperties::Avro(_)
65            | EncodingProperties::Bytes(_) => {
66                AccessBuilderImpl::new_default(props.encoding_config).await?
67            }
68            _ => bail!("Unsupported encoding for Plain"),
69        };
70
71        let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
72            DebeziumJsonAccessBuilder::new(TimestamptzHandling::GuessNumberUnit)?,
73        ));
74
75        let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
76            DebeziumJsonAccessBuilder::new_for_schema_event()?,
77        ));
78
79        Ok(Self {
80            key_builder,
81            payload_builder,
82            rw_columns,
83            source_ctx,
84            transaction_meta_builder,
85            schema_change_builder,
86        })
87    }
88
89    pub async fn parse_inner(
90        &mut self,
91        key: Option<Vec<u8>>,
92        payload: Option<Vec<u8>>,
93        writer: SourceStreamChunkRowWriter<'_>,
94    ) -> ConnectorResult<ParseResult> {
95        // plain parser also used in the shared cdc source,
96        // we need to handle transaction metadata and schema change messages here
97        if let Some(msg_meta) = writer.row_meta()
98            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
99            && let Some(data) = payload
100        {
101            match cdc_meta.msg_type {
102                CdcMessageType::Data | CdcMessageType::Heartbeat => {
103                    return self.parse_rows(key, Some(data), writer).await;
104                }
105                CdcMessageType::TransactionMeta => {
106                    let accessor = self
107                        .transaction_meta_builder
108                        .as_mut()
109                        .expect("expect transaction metadata access builder")
110                        .generate_accessor(data, writer.source_meta())
111                        .await?;
112                    return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
113                    {
114                        Ok(transaction_control) => {
115                            Ok(ParseResult::TransactionControl(transaction_control))
116                        }
117                        Err(err) => Err(err)?,
118                    };
119                }
120                CdcMessageType::SchemaChange => {
121                    let accessor = self
122                        .schema_change_builder
123                        .as_mut()
124                        .expect("expect schema change access builder")
125                        .generate_accessor(data, writer.source_meta())
126                        .await?;
127
128                    return match parse_schema_change(
129                        &accessor,
130                        self.source_ctx.source_id.into(),
131                        &self.source_ctx.connector_props,
132                    ) {
133                        Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
134                        Err(err) => Err(err)?,
135                    };
136                }
137                CdcMessageType::Unspecified => {
138                    unreachable!()
139                }
140            }
141        }
142
143        // for non-cdc source messages
144        self.parse_rows(key, payload, writer).await
145    }
146
147    async fn parse_rows(
148        &mut self,
149        key: Option<Vec<u8>>,
150        payload: Option<Vec<u8>>,
151        mut writer: SourceStreamChunkRowWriter<'_>,
152    ) -> ConnectorResult<ParseResult> {
153        let meta = writer.source_meta();
154        let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
155
156        if let Some(data) = key
157            && let Some(key_builder) = self.key_builder.as_mut()
158        {
159            // key is optional in format plain
160            row_op.with_key(key_builder.generate_accessor(data, meta).await?);
161        }
162        if let Some(data) = payload {
163            // the data part also can be an empty vec
164            row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
165        }
166
167        writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
168
169        Ok(ParseResult::Rows)
170    }
171}
172
173impl ByteStreamSourceParser for PlainParser {
174    fn columns(&self) -> &[SourceColumnDesc] {
175        &self.rw_columns
176    }
177
178    fn source_ctx(&self) -> &SourceContext {
179        &self.source_ctx
180    }
181
182    fn parser_format(&self) -> ParserFormat {
183        ParserFormat::Plain
184    }
185
186    async fn parse_one<'a>(
187        &'a mut self,
188        _key: Option<Vec<u8>>,
189        _payload: Option<Vec<u8>>,
190        _writer: SourceStreamChunkRowWriter<'a>,
191    ) -> ConnectorResult<()> {
192        unreachable!("should call `parse_one_with_txn` instead")
193    }
194
195    async fn parse_one_with_txn<'a>(
196        &'a mut self,
197        key: Option<Vec<u8>>,
198        payload: Option<Vec<u8>>,
199        writer: SourceStreamChunkRowWriter<'a>,
200    ) -> ConnectorResult<ParseResult> {
201        self.parse_inner(key, payload, writer).await
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use std::ops::Deref;
208    use std::sync::Arc;
209
210    use expect_test::expect;
211    use futures::StreamExt;
212    use futures::executor::block_on;
213    use futures_async_stream::try_stream;
214    use itertools::Itertools;
215    use risingwave_common::catalog::ColumnCatalog;
216    use risingwave_pb::connector_service::cdc_message;
217
218    use super::*;
219    use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
220    use crate::source::cdc::DebeziumCdcMeta;
221    use crate::source::{ConnectorProperties, SourceCtrlOpts, SourceMessage, SplitId};
222
223    #[tokio::test]
224    async fn test_emit_transactional_chunk() {
225        let schema = ColumnCatalog::debezium_cdc_source_cols();
226
227        let columns = schema
228            .iter()
229            .map(|c| SourceColumnDesc::from(&c.column_desc))
230            .collect::<Vec<_>>();
231
232        let source_ctx = SourceContext {
233            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
234            ..SourceContext::dummy()
235        };
236        let source_ctx = Arc::new(source_ctx);
237        // format plain encode json parser
238        let parser = PlainParser::new(
239            SpecificParserConfig::DEFAULT_PLAIN_JSON,
240            columns.clone(),
241            source_ctx.clone(),
242        )
243        .await
244        .unwrap();
245
246        let mut transactional = false;
247        // for untransactional source, we expect emit a chunk for each message batch
248        let message_stream = source_message_stream(transactional);
249        let chunk_stream = crate::parser::parse_message_stream(
250            parser,
251            message_stream.boxed(),
252            SourceCtrlOpts::for_test(),
253        );
254        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
255            .into_iter()
256            .collect();
257        let output = output
258            .unwrap()
259            .into_iter()
260            .filter(|c| c.cardinality() > 0)
261            .enumerate()
262            .map(|(i, c)| {
263                if i == 0 {
264                    // begin + 3 data messages
265                    assert_eq!(4, c.cardinality());
266                }
267                if i == 1 {
268                    // 2 data messages + 1 end
269                    assert_eq!(3, c.cardinality());
270                }
271                c
272            })
273            .collect_vec();
274
275        // 2 chunks for 2 message batches
276        assert_eq!(2, output.len());
277
278        // format plain encode json parser
279        let parser = PlainParser::new(
280            SpecificParserConfig::DEFAULT_PLAIN_JSON,
281            columns.clone(),
282            source_ctx,
283        )
284        .await
285        .unwrap();
286
287        // for transactional source, we expect emit a single chunk for the transaction
288        transactional = true;
289        let message_stream = source_message_stream(transactional);
290        let chunk_stream = crate::parser::parse_message_stream(
291            parser,
292            message_stream.boxed(),
293            SourceCtrlOpts::for_test(),
294        );
295        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.collect::<Vec<_>>())
296            .into_iter()
297            .collect();
298        let output = output
299            .unwrap()
300            .into_iter()
301            .filter(|c| c.cardinality() > 0)
302            .inspect(|c| {
303                // 5 data messages in a single chunk
304                assert_eq!(5, c.cardinality());
305            })
306            .collect_vec();
307
308        // a single transactional chunk
309        assert_eq!(1, output.len());
310    }
311
312    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
313    async fn source_message_stream(transactional: bool) {
314        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
315        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
316        let data_batches = [
317            vec![
318                r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
319                r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
320                r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
321            ],
322            vec![
323                r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
324                r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
325            ],
326        ];
327        for (i, batch) in data_batches.iter().enumerate() {
328            let mut source_msg_batch = vec![];
329            if i == 0 {
330                // put begin message at first
331                source_msg_batch.push(SourceMessage {
332                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
333                        "orders".to_owned(),
334                        0,
335                        if transactional {
336                            cdc_message::CdcMessageType::TransactionMeta
337                        } else {
338                            cdc_message::CdcMessageType::Data
339                        },
340                    )),
341                    split_id: SplitId::from("1001"),
342                    offset: "0".into(),
343                    key: None,
344                    payload: Some(begin_msg.as_bytes().to_vec()),
345                });
346            }
347            // put data messages
348            for data_msg in batch {
349                source_msg_batch.push(SourceMessage {
350                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
351                        "orders".to_owned(),
352                        0,
353                        cdc_message::CdcMessageType::Data,
354                    )),
355                    split_id: SplitId::from("1001"),
356                    offset: "0".into(),
357                    key: None,
358                    payload: Some(data_msg.as_bytes().to_vec()),
359                });
360            }
361            if i == data_batches.len() - 1 {
362                // put commit message at last
363                source_msg_batch.push(SourceMessage {
364                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
365                        "orders".to_owned(),
366                        0,
367                        if transactional {
368                            cdc_message::CdcMessageType::TransactionMeta
369                        } else {
370                            cdc_message::CdcMessageType::Data
371                        },
372                    )),
373                    split_id: SplitId::from("1001"),
374                    offset: "0".into(),
375                    key: None,
376                    payload: Some(commit_msg.as_bytes().to_vec()),
377                });
378            }
379            yield source_msg_batch;
380        }
381    }
382
383    #[tokio::test]
384    async fn test_parse_transaction_metadata() {
385        let schema = ColumnCatalog::debezium_cdc_source_cols();
386
387        let columns = schema
388            .iter()
389            .map(|c| SourceColumnDesc::from(&c.column_desc))
390            .collect::<Vec<_>>();
391
392        // format plain encode json parser
393        let source_ctx = SourceContext {
394            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
395            ..SourceContext::dummy()
396        };
397        let mut parser = PlainParser::new(
398            SpecificParserConfig::DEFAULT_PLAIN_JSON,
399            columns.clone(),
400            Arc::new(source_ctx),
401        )
402        .await
403        .unwrap();
404        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
405
406        // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN
407        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
408        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
409
410        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
411            "orders".to_owned(),
412            0,
413            cdc_message::CdcMessageType::TransactionMeta,
414        ));
415        let msg_meta = MessageMeta {
416            source_meta: &cdc_meta,
417            split_id: "1001",
418            offset: "",
419        };
420
421        let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
422        let res = parser
423            .parse_one_with_txn(
424                None,
425                Some(begin_msg.as_bytes().to_vec()),
426                builder.row_writer().with_meta(msg_meta),
427            )
428            .await;
429        match res {
430            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
431                assert_eq!(id.deref(), expect_tx_id);
432            }
433            _ => panic!("unexpected parse result: {:?}", res),
434        }
435        let res = parser
436            .parse_one_with_txn(
437                None,
438                Some(commit_msg.as_bytes().to_vec()),
439                builder.row_writer().with_meta(msg_meta),
440            )
441            .await;
442        match res {
443            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
444                assert_eq!(id.deref(), expect_tx_id);
445            }
446            _ => panic!("unexpected parse result: {:?}", res),
447        }
448
449        builder.finish_current_chunk();
450        assert!(builder.consume_ready_chunks().next().is_none());
451    }
452
453    #[tokio::test]
454    async fn test_parse_schema_change() {
455        let schema = ColumnCatalog::debezium_cdc_source_cols();
456
457        let columns = schema
458            .iter()
459            .map(|c| SourceColumnDesc::from(&c.column_desc))
460            .collect::<Vec<_>>();
461
462        // format plain encode json parser
463        let source_ctx = SourceContext {
464            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
465            ..SourceContext::dummy()
466        };
467        let mut parser = PlainParser::new(
468            SpecificParserConfig::DEFAULT_PLAIN_JSON,
469            columns.clone(),
470            Arc::new(source_ctx),
471        )
472        .await
473        .unwrap();
474        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
475
476        let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
477        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
478            "mydb.test".to_owned(),
479            0,
480            cdc_message::CdcMessageType::SchemaChange,
481        ));
482        let msg_meta = MessageMeta {
483            source_meta: &cdc_meta,
484            split_id: "1001",
485            offset: "",
486        };
487
488        let res = parser
489            .parse_one_with_txn(
490                None,
491                Some(msg.as_bytes().to_vec()),
492                dummy_builder.row_writer().with_meta(msg_meta),
493            )
494            .await;
495
496        let res = res.unwrap();
497        expect![[r#"
498            SchemaChange(
499                SchemaChangeEnvelope {
500                    table_changes: [
501                        TableSchemaChange {
502                            cdc_table_id: "0.mydb.test",
503                            columns: [
504                                ColumnCatalog {
505                                    column_desc: ColumnDesc {
506                                        data_type: Int32,
507                                        column_id: #2147483646,
508                                        name: "id",
509                                        generated_or_default_column: None,
510                                        description: None,
511                                        additional_column: AdditionalColumn {
512                                            column_type: None,
513                                        },
514                                        version: Pr13707,
515                                        system_column: None,
516                                        nullable: true,
517                                    },
518                                    is_hidden: false,
519                                },
520                                ColumnCatalog {
521                                    column_desc: ColumnDesc {
522                                        data_type: Timestamptz,
523                                        column_id: #2147483646,
524                                        name: "v1",
525                                        generated_or_default_column: None,
526                                        description: None,
527                                        additional_column: AdditionalColumn {
528                                            column_type: None,
529                                        },
530                                        version: Pr13707,
531                                        system_column: None,
532                                        nullable: true,
533                                    },
534                                    is_hidden: false,
535                                },
536                                ColumnCatalog {
537                                    column_desc: ColumnDesc {
538                                        data_type: Varchar,
539                                        column_id: #2147483646,
540                                        name: "v2",
541                                        generated_or_default_column: None,
542                                        description: None,
543                                        additional_column: AdditionalColumn {
544                                            column_type: None,
545                                        },
546                                        version: Pr13707,
547                                        system_column: None,
548                                        nullable: true,
549                                    },
550                                    is_hidden: false,
551                                },
552                            ],
553                            change_type: Alter,
554                            upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
555                        },
556                    ],
557                },
558            )
559        "#]]
560        .assert_debug_eq(&res);
561    }
562}