1use risingwave_common::bail;
16use thiserror_ext::AsReport;
17
18use super::unified::json::{
19 BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
20};
21use super::unified::kv_event::KvEvent;
22use super::{
23 AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, SourceStreamChunkRowWriter,
24 SpecificParserConfig,
25};
26use crate::error::ConnectorResult;
27use crate::parser::bytes_parser::BytesAccessBuilder;
28use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
29use crate::parser::unified::AccessImpl;
30use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta};
31use crate::parser::upsert_parser::get_key_column_name;
32use crate::parser::{BytesProperties, ParseResult, ParserFormat};
33use crate::source::cdc::CdcMessageType;
34use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta};
35
#[derive(Debug)]
pub struct PlainParser {
    /// Accessor builder for the message key; present only when a key column
    /// is declared among `rw_columns` (see `get_key_column_name`).
    pub key_builder: Option<AccessBuilderImpl>,
    /// Accessor builder for the message payload (JSON/Protobuf/Avro/Bytes).
    pub payload_builder: AccessBuilderImpl,
    /// Columns this parser writes into the output chunk.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    pub source_ctx: SourceContextRef,
    /// Parses Debezium transaction-metadata messages (BEGIN/END markers)
    /// when the source is a CDC source.
    pub transaction_meta_builder: Option<AccessBuilderImpl>,
    /// Parses Debezium schema-change messages when the source is a CDC source.
    pub schema_change_builder: Option<AccessBuilderImpl>,
}
47
48impl PlainParser {
49 pub async fn new(
50 props: SpecificParserConfig,
51 rw_columns: Vec<SourceColumnDesc>,
52 source_ctx: SourceContextRef,
53 ) -> ConnectorResult<Self> {
54 let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
55 Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new(
56 EncodingProperties::Bytes(BytesProperties {
57 column_name: Some(key_column_name),
58 }),
59 )?))
60 } else {
61 None
62 };
63
64 let payload_builder = match props.encoding_config {
65 EncodingProperties::Json(_)
66 | EncodingProperties::Protobuf(_)
67 | EncodingProperties::Avro(_)
68 | EncodingProperties::Bytes(_) => {
69 AccessBuilderImpl::new_default(props.encoding_config).await?
70 }
71 _ => bail!("Unsupported encoding for Plain"),
72 };
73
74 let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson(
75 DebeziumJsonAccessBuilder::new(
76 TimestamptzHandling::GuessNumberUnit,
77 TimestampHandling::GuessNumberUnit,
78 TimeHandling::Micro,
79 BigintUnsignedHandlingMode::Long,
80 false,
81 )?,
82 ));
83
84 let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson(
85 DebeziumJsonAccessBuilder::new_for_schema_event()?,
86 ));
87
88 Ok(Self {
89 key_builder,
90 payload_builder,
91 rw_columns,
92 source_ctx,
93 transaction_meta_builder,
94 schema_change_builder,
95 })
96 }
97
    /// Parses one message, dispatching on the CDC message type when the row
    /// metadata carries Debezium CDC info.
    ///
    /// - `Data` / `Heartbeat` payloads go through the regular row path.
    /// - `TransactionMeta` yields a `ParseResult::TransactionControl`.
    /// - `SchemaChange` yields a `ParseResult::SchemaChange`; on failure a
    ///   failure event is reported to the source context before the error
    ///   is propagated.
    ///
    /// Messages without CDC metadata or without a payload fall through to
    /// `parse_rows` at the bottom.
    pub async fn parse_inner(
        &mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'_>,
    ) -> ConnectorResult<ParseResult> {
        // Only take the CDC branch when all three are present: row meta,
        // Debezium CDC source meta, and a non-empty payload.
        if let Some(msg_meta) = writer.row_meta()
            && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.source_meta
            && let Some(data) = payload
        {
            match cdc_meta.msg_type {
                CdcMessageType::Data | CdcMessageType::Heartbeat => {
                    return self.parse_rows(key, Some(data), writer).await;
                }
                CdcMessageType::TransactionMeta => {
                    // Builder is always constructed in `new`, so `expect` only
                    // fires on a broken invariant.
                    let accessor = self
                        .transaction_meta_builder
                        .as_mut()
                        .expect("expect transaction metadata access builder")
                        .generate_accessor(data, writer.source_meta())
                        .await?;
                    return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props)
                    {
                        Ok(transaction_control) => {
                            Ok(ParseResult::TransactionControl(transaction_control))
                        }
                        Err(err) => Err(err)?,
                    };
                }
                CdcMessageType::SchemaChange => {
                    let accessor = self
                        .schema_change_builder
                        .as_mut()
                        .expect("expect schema change access builder")
                        .generate_accessor(data, writer.source_meta())
                        .await?;

                    return match parse_schema_change(
                        &accessor,
                        self.source_ctx.source_id,
                        &self.source_ctx.source_name,
                        &self.source_ctx.connector_props,
                    )
                    .await
                    {
                        Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
                        Err(err) => {
                            // Build a human-readable failure description; for
                            // auto-schema-change errors we can also recover the
                            // table name and derive a cdc_table_id.
                            let (fail_info, table_name, cdc_table_id) = match &err {
                                crate::parser::AccessError::CdcAutoSchemaChangeError {
                                    ty,
                                    table_name,
                                    ..
                                } => {
                                    // Strip Debezium-style quoting, e.g.
                                    // `"db"."tbl"` -> `db.tbl`.
                                    let clean_table_name =
                                        table_name.trim_matches('"').replace("\".\"", ".");
                                    let fail_info = format!(
                                        "Unsupported data type '{}' in source '{}' table '{}'",
                                        ty, self.source_ctx.source_name, clean_table_name
                                    );
                                    let cdc_table_id = format!(
                                        "{}.{}",
                                        self.source_ctx.source_name, clean_table_name
                                    );

                                    (fail_info, clean_table_name, cdc_table_id)
                                }
                                _ => {
                                    // Generic parse failure: no table info available.
                                    let fail_info = format!(
                                        "Failed to parse schema change: {:?}, source: {}",
                                        err.as_report(),
                                        self.source_ctx.source_name
                                    );
                                    (fail_info, "".to_owned(), "".to_owned())
                                }
                            };
                            // Report the failure before propagating the error.
                            // NOTE(review): the meaning of the empty-string
                            // argument isn't visible here — confirm against the
                            // `on_cdc_auto_schema_change_failure` signature.
                            self.source_ctx.on_cdc_auto_schema_change_failure(
                                self.source_ctx.source_id,
                                table_name,
                                cdc_table_id,
                                "".to_owned(),
                                fail_info,
                            );

                            Err(err)?
                        }
                    };
                }
                CdcMessageType::Unspecified => {
                    unreachable!()
                }
            }
        }

        // Non-CDC message (or no payload): plain row parsing.
        self.parse_rows(key, payload, writer).await
    }
199
200 async fn parse_rows(
201 &mut self,
202 key: Option<Vec<u8>>,
203 payload: Option<Vec<u8>>,
204 mut writer: SourceStreamChunkRowWriter<'_>,
205 ) -> ConnectorResult<ParseResult> {
206 let meta = writer.source_meta();
207 let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
208
209 if let Some(data) = key
210 && let Some(key_builder) = self.key_builder.as_mut()
211 {
212 row_op.with_key(key_builder.generate_accessor(data, meta).await?);
214 }
215 if let Some(data) = payload {
216 row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
218 }
219
220 writer.do_insert(|column: &SourceColumnDesc| row_op.access_field::<false>(column))?;
221
222 Ok(ParseResult::Rows)
223 }
224}
225
impl ByteStreamSourceParser for PlainParser {
    /// Columns this parser writes to.
    fn columns(&self) -> &[SourceColumnDesc] {
        &self.rw_columns
    }

    /// The source context this parser was created with.
    fn source_ctx(&self) -> &SourceContext {
        &self.source_ctx
    }

    /// This parser handles `FORMAT PLAIN`.
    fn parser_format(&self) -> ParserFormat {
        ParserFormat::Plain
    }

    /// Unsupported: `PlainParser` can produce transaction-control and
    /// schema-change results (see `parse_inner`), which the plain
    /// `parse_one` interface cannot convey — callers must use
    /// `parse_one_with_txn` instead.
    async fn parse_one<'a>(
        &'a mut self,
        _key: Option<Vec<u8>>,
        _payload: Option<Vec<u8>>,
        _writer: SourceStreamChunkRowWriter<'a>,
    ) -> ConnectorResult<()> {
        unreachable!("should call `parse_one_with_txn` instead")
    }

    /// Parses one message; may return rows, a transaction-control event,
    /// or a schema-change event.
    async fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> ConnectorResult<ParseResult> {
        self.parse_inner(key, payload, writer).await
    }
}
257
258#[cfg(test)]
259mod tests {
260 use std::ops::Deref;
261 use std::sync::Arc;
262
263 use expect_test::expect;
264 use futures::executor::block_on;
265 use futures::{StreamExt, TryStreamExt};
266 use futures_async_stream::try_stream;
267 use itertools::Itertools;
268 use risingwave_common::catalog::ColumnCatalog;
269 use risingwave_pb::connector_service::{SourceType, cdc_message};
270
271 use super::*;
272 use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl};
273 use crate::source::cdc::DebeziumCdcMeta;
274 use crate::source::{
275 ConnectorProperties, SourceCtrlOpts, SourceMessage, SourceMessageEvent, SourceReaderEvent,
276 SplitId,
277 };
278
    /// Runs the same message stream twice — once without and once with
    /// transaction markers — and checks how rows are grouped into chunks.
    ///
    /// Without markers the stream yields two chunks (4 rows, then 3 — the
    /// BEGIN/END messages are parsed as ordinary data rows); with markers
    /// the 5 data rows between BEGIN and END are emitted as one chunk.
    #[tokio::test]
    async fn test_emit_transactional_chunk() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();

        let columns = schema
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect::<Vec<_>>();

        let source_ctx = SourceContext {
            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
            ..SourceContext::dummy()
        };
        let source_ctx = Arc::new(source_ctx);
        let parser = PlainParser::new(
            SpecificParserConfig::DEFAULT_PLAIN_JSON,
            columns.clone(),
            source_ctx.clone(),
        )
        .await
        .unwrap();

        // Case 1: BEGIN/END messages are tagged as plain Data messages.
        let mut transactional = false;
        let message_stream = source_message_stream(transactional);
        let chunk_stream = crate::parser::parse_message_stream(
            parser,
            message_stream.map_ok(SourceMessageEvent::Data).boxed(),
            SourceCtrlOpts::for_test(),
        );
        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.try_collect::<Vec<_>>());
        let output = output
            .unwrap()
            .into_iter()
            // Keep only non-empty data chunks; drop progress events.
            .filter_map(|event| match event {
                SourceReaderEvent::DataChunk(chunk) if chunk.cardinality() > 0 => Some(chunk),
                SourceReaderEvent::DataChunk(_) | SourceReaderEvent::SplitProgress(_) => None,
            })
            .enumerate()
            .map(|(i, c)| {
                // First batch: BEGIN + 3 data rows; second: 2 data rows + END.
                if i == 0 {
                    assert_eq!(4, c.cardinality());
                }
                if i == 1 {
                    assert_eq!(3, c.cardinality());
                }
                c
            })
            .collect_vec();

        assert_eq!(2, output.len());

        // Case 2: BEGIN/END are real TransactionMeta messages.
        let parser = PlainParser::new(
            SpecificParserConfig::DEFAULT_PLAIN_JSON,
            columns.clone(),
            source_ctx,
        )
        .await
        .unwrap();

        transactional = true;
        let message_stream = source_message_stream(transactional);
        let chunk_stream = crate::parser::parse_message_stream(
            parser,
            message_stream.map_ok(SourceMessageEvent::Data).boxed(),
            SourceCtrlOpts::for_test(),
        );
        let output: std::result::Result<Vec<_>, _> = block_on(chunk_stream.try_collect::<Vec<_>>());
        let output = output
            .unwrap()
            .into_iter()
            .filter_map(|event| match event {
                SourceReaderEvent::DataChunk(chunk) if chunk.cardinality() > 0 => Some(chunk),
                SourceReaderEvent::DataChunk(_) | SourceReaderEvent::SplitProgress(_) => None,
            })
            // All 5 data rows of the transaction land in a single chunk.
            .inspect(|c| {
                assert_eq!(5, c.cardinality());
            })
            .collect_vec();

        assert_eq!(1, output.len());
    }
369
    /// Yields two batches of CDC source messages: a BEGIN marker, two groups
    /// of data rows (3 + 2), and an END marker.
    ///
    /// When `transactional` is true the BEGIN/END payloads are tagged as
    /// `TransactionMeta` messages; otherwise they are tagged as plain `Data`
    /// so they get parsed like ordinary rows.
    #[try_stream(ok = Vec<SourceMessage>, error = crate::error::ConnectorError)]
    async fn source_message_stream(transactional: bool) {
        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
        let data_batches = [
            vec![
                r#"{ "schema": null, "payload": {"after": {"customer_name": "a1", "order_date": "2020-01-30", "order_id": 10021, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
                r#"{ "schema": null, "payload": {"after": {"customer_name": "a2", "order_date": "2020-02-30", "order_id": 10022, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
                r#"{ "schema": null, "payload": {"after": {"customer_name": "a3", "order_date": "2020-03-30", "order_id": 10023, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
            ],
            vec![
                r#"{ "schema": null, "payload": {"after": {"customer_name": "a4", "order_date": "2020-04-30", "order_id": 10024, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
                r#"{ "schema": null, "payload": {"after": {"customer_name": "a5", "order_date": "2020-05-30", "order_id": 10025, "order_status": false, "price": "50.50", "product_id": 102}, "before": null, "op": "c", "source": {"connector": "postgresql", "db": "mydb", "lsn": 3963199336, "name": "RW_CDC_1001", "schema": "public", "sequence": "[\"3963198512\",\"3963199336\"]", "snapshot": "false", "table": "orders_tx", "ts_ms": 1704355505506, "txId": 35352, "version": "2.4.2.Final", "xmin": null}, "transaction": {"data_collection_order": 1, "id": "35392:3963199336", "total_order": 1}, "ts_ms": 1704355839905} }"#,
            ],
        ];
        for (i, batch) in data_batches.iter().enumerate() {
            let mut source_msg_batch = vec![];
            // Prepend the BEGIN marker to the first batch only.
            if i == 0 {
                source_msg_batch.push(SourceMessage {
                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
                        "orders".to_owned(),
                        0,
                        if transactional {
                            cdc_message::CdcMessageType::TransactionMeta
                        } else {
                            cdc_message::CdcMessageType::Data
                        },
                        SourceType::Unspecified,
                    )),
                    split_id: SplitId::from("1001"),
                    offset: "0".into(),
                    key: None,
                    payload: Some(begin_msg.as_bytes().to_vec()),
                });
            }
            // Data rows are always tagged as `Data`.
            for data_msg in batch {
                source_msg_batch.push(SourceMessage {
                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
                        "orders".to_owned(),
                        0,
                        cdc_message::CdcMessageType::Data,
                        SourceType::Unspecified,
                    )),
                    split_id: SplitId::from("1001"),
                    offset: "0".into(),
                    key: None,
                    payload: Some(data_msg.as_bytes().to_vec()),
                });
            }
            // Append the END marker to the last batch only.
            if i == data_batches.len() - 1 {
                source_msg_batch.push(SourceMessage {
                    meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
                        "orders".to_owned(),
                        0,
                        if transactional {
                            cdc_message::CdcMessageType::TransactionMeta
                        } else {
                            cdc_message::CdcMessageType::Data
                        },
                        SourceType::Unspecified,
                    )),
                    split_id: SplitId::from("1001"),
                    offset: "0".into(),
                    key: None,
                    payload: Some(commit_msg.as_bytes().to_vec()),
                });
            }
            yield source_msg_batch;
        }
    }
443
    /// Feeds Debezium BEGIN/END transaction-metadata payloads through the
    /// parser and asserts they produce `TransactionControl::Begin`/`Commit`
    /// with the transaction id extracted — and that no rows are emitted.
    #[tokio::test]
    async fn test_parse_transaction_metadata() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();

        let columns = schema
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect::<Vec<_>>();

        let source_ctx = SourceContext {
            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
            ..SourceContext::dummy()
        };
        let mut parser = PlainParser::new(
            SpecificParserConfig::DEFAULT_PLAIN_JSON,
            columns.clone(),
            Arc::new(source_ctx),
        )
        .await
        .unwrap();
        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        // Debezium transaction markers: same GTID for BEGIN and END.
        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;

        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
            "orders".to_owned(),
            0,
            cdc_message::CdcMessageType::TransactionMeta,
            SourceType::Unspecified,
        ));
        let msg_meta = MessageMeta {
            source_meta: &cdc_meta,
            split_id: "1001",
            offset: "",
        };

        let expect_tx_id = "3E11FA47-71CA-11E1-9E33-C80AA9429562:23";
        // BEGIN marker -> TransactionControl::Begin with the GTID.
        let res = parser
            .parse_one_with_txn(
                None,
                Some(begin_msg.as_bytes().to_vec()),
                builder.row_writer().with_meta(msg_meta),
            )
            .await;
        match res {
            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
                assert_eq!(id.deref(), expect_tx_id);
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
        // END marker -> TransactionControl::Commit with the same GTID.
        let res = parser
            .parse_one_with_txn(
                None,
                Some(commit_msg.as_bytes().to_vec()),
                builder.row_writer().with_meta(msg_meta),
            )
            .await;
        match res {
            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
                assert_eq!(id.deref(), expect_tx_id);
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }

        // Transaction markers must not produce any data rows.
        builder.finish_current_chunk();
        assert!(builder.consume_ready_chunks().next().is_none());
    }
514
    /// Feeds a Debezium MySQL `ALTER TABLE` schema-change payload through the
    /// parser and snapshot-checks the resulting `SchemaChangeEnvelope`
    /// (table id, column catalog, change type, and the original DDL).
    #[tokio::test]
    async fn test_parse_schema_change() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();

        let columns = schema
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect::<Vec<_>>();

        let source_ctx = SourceContext {
            connector_props: ConnectorProperties::MysqlCdc(Box::default()),
            ..SourceContext::dummy()
        };
        let mut parser = PlainParser::new(
            SpecificParserConfig::DEFAULT_PLAIN_JSON,
            columns.clone(),
            Arc::new(source_ctx),
        )
        .await
        .unwrap();
        // Writer target is unused for schema-change messages; only needed to
        // carry the message meta.
        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#;
        let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new(
            "mydb.test".to_owned(),
            0,
            cdc_message::CdcMessageType::SchemaChange,
            SourceType::Mysql,
        ));
        let msg_meta = MessageMeta {
            source_meta: &cdc_meta,
            split_id: "1001",
            offset: "",
        };

        let res = parser
            .parse_one_with_txn(
                None,
                Some(msg.as_bytes().to_vec()),
                dummy_builder.row_writer().with_meta(msg_meta),
            )
            .await;

        // Snapshot of the parsed envelope (via `expect_test`).
        let res = res.unwrap();
        expect![[r#"
            SchemaChange(
                SchemaChangeEnvelope {
                    table_changes: [
                        TableSchemaChange {
                            cdc_table_id: "0.mydb.test",
                            columns: [
                                ColumnCatalog {
                                    column_desc: ColumnDesc {
                                        data_type: Int32,
                                        column_id: #2147483646,
                                        name: "id",
                                        generated_or_default_column: None,
                                        description: None,
                                        additional_column: AdditionalColumn {
                                            column_type: None,
                                        },
                                        version: Pr13707,
                                        system_column: None,
                                        nullable: true,
                                    },
                                    is_hidden: false,
                                },
                                ColumnCatalog {
                                    column_desc: ColumnDesc {
                                        data_type: Timestamptz,
                                        column_id: #2147483646,
                                        name: "v1",
                                        generated_or_default_column: None,
                                        description: None,
                                        additional_column: AdditionalColumn {
                                            column_type: None,
                                        },
                                        version: Pr13707,
                                        system_column: None,
                                        nullable: true,
                                    },
                                    is_hidden: false,
                                },
                                ColumnCatalog {
                                    column_desc: ColumnDesc {
                                        data_type: Varchar,
                                        column_id: #2147483646,
                                        name: "v2",
                                        generated_or_default_column: None,
                                        description: None,
                                        additional_column: AdditionalColumn {
                                            column_type: None,
                                        },
                                        version: Pr13707,
                                        system_column: None,
                                        nullable: true,
                                    },
                                    is_hidden: false,
                                },
                            ],
                            change_type: Alter,
                            upstream_ddl: "ALTER TABLE test add column v2 varchar(32)",
                        },
                    ],
                },
            )
        "#]]
        .assert_debug_eq(&res);
    }
625}