risingwave_connector/parser/debezium/
simd_json_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16
17use anyhow::Context;
18use simd_json::BorrowedValue;
19use simd_json::prelude::MutableObject;
20
21use crate::error::ConnectorResult;
22use crate::parser::unified::AccessImpl;
23use crate::parser::unified::debezium::MongoJsonAccess;
24use crate::parser::unified::json::{
25    JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling, TimestamptzHandling,
26};
27use crate::parser::{AccessBuilder, MongoProperties};
28
/// Access builder for Debezium change events encoded as JSON.
///
/// The raw bytes of the most recent payload are stored in the builder so that the
/// accessor returned by `generate_accessor` can borrow from them.
#[derive(Debug)]
pub struct DebeziumJsonAccessBuilder {
    /// Bytes of the payload most recently passed to `generate_accessor`; `None` until
    /// the first call.
    value: Option<Vec<u8>>,
    /// Options handed to every `JsonAccess` created by this builder.
    json_parse_options: JsonParseOptions,
}
34
35impl DebeziumJsonAccessBuilder {
36    pub fn new(
37        timestamptz_handling: TimestamptzHandling,
38        timestamp_handling: TimestampHandling,
39        time_handling: TimeHandling,
40    ) -> ConnectorResult<Self> {
41        Ok(Self {
42            value: None,
43            json_parse_options: JsonParseOptions::new_for_debezium(
44                timestamptz_handling,
45                timestamp_handling,
46                time_handling,
47            ),
48        })
49    }
50
51    pub fn new_for_schema_event() -> ConnectorResult<Self> {
52        Ok(Self {
53            value: None,
54            json_parse_options: JsonParseOptions::default(),
55        })
56    }
57}
58
59impl AccessBuilder for DebeziumJsonAccessBuilder {
60    #[allow(clippy::unused_async)]
61    async fn generate_accessor(
62        &mut self,
63        payload: Vec<u8>,
64        _: &crate::source::SourceMeta,
65    ) -> ConnectorResult<AccessImpl<'_>> {
66        self.value = Some(payload);
67        let mut event: BorrowedValue<'_> =
68            simd_json::to_borrowed_value(self.value.as_mut().unwrap())
69                .context("failed to parse debezium json payload")?;
70
71        let payload = if let Some(payload) = event.get_mut("payload") {
72            std::mem::take(payload)
73        } else {
74            event
75        };
76
77        Ok(AccessImpl::Json(JsonAccess::new_with_options(
78            payload,
79            &self.json_parse_options,
80        )))
81    }
82}
83
/// Access builder for Debezium MongoDB change events encoded as JSON.
///
/// The raw bytes of the most recent payload are stored in the builder so that the
/// accessor returned by `generate_accessor` can borrow from them.
#[derive(Debug)]
pub struct DebeziumMongoJsonAccessBuilder {
    /// Bytes of the payload most recently passed to `generate_accessor`; `None` until
    /// the first call.
    value: Option<Vec<u8>>,
    /// Options handed to every `JsonAccess` created by this builder.
    json_parse_options: JsonParseOptions,
    /// Copied from `MongoProperties::strong_schema` and forwarded to every
    /// `MongoJsonAccess` created by this builder.
    strong_schema: bool,
}
90
91impl DebeziumMongoJsonAccessBuilder {
92    pub fn new(props: MongoProperties) -> anyhow::Result<Self> {
93        Ok(Self {
94            value: None,
95            json_parse_options: JsonParseOptions::new_for_debezium(
96                TimestamptzHandling::GuessNumberUnit,
97                TimestampHandling::GuessNumberUnit,
98                TimeHandling::Micro,
99            ),
100            strong_schema: props.strong_schema,
101        })
102    }
103}
104
105impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
106    #[allow(clippy::unused_async)]
107    async fn generate_accessor(
108        &mut self,
109        payload: Vec<u8>,
110        _: &crate::source::SourceMeta,
111    ) -> ConnectorResult<AccessImpl<'_>> {
112        self.value = Some(payload);
113        let mut event: BorrowedValue<'_> =
114            simd_json::to_borrowed_value(self.value.as_mut().unwrap())
115                .context("failed to parse debezium mongo json payload")?;
116
117        let payload = if let Some(payload) = event.get_mut("payload") {
118            std::mem::take(payload)
119        } else {
120            event
121        };
122
123        Ok(AccessImpl::MongoJson(MongoJsonAccess::new(
124            JsonAccess::new_with_options(payload, &self.json_parse_options),
125            self.strong_schema,
126        )))
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use chrono::{NaiveDate, NaiveTime};
133    use risingwave_common::array::{Op, StructValue};
134    use risingwave_common::catalog::ColumnId;
135    use risingwave_common::row::{OwnedRow, Row};
136    use risingwave_common::types::{
137        DataType, Date, Interval, Scalar, ScalarImpl, StructType, Time, Timestamp,
138    };
139    use serde_json::Value;
140    use thiserror_ext::AsReport;
141
142    use crate::parser::{
143        DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
144        SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig,
145    };
146    use crate::source::{SourceContext, SourceCtrlOpts};
147
148    fn assert_json_eq(parse_result: &Option<ScalarImpl>, json_str: &str) {
149        if let Some(ScalarImpl::Jsonb(json_val)) = parse_result {
150            let mut json_string = String::new();
151            json_val
152                .as_scalar_ref()
153                .force_str(&mut json_string)
154                .unwrap();
155            let val1: Value = serde_json::from_str(json_string.as_str()).unwrap();
156            let val2: Value = serde_json::from_str(json_str).unwrap();
157            assert_eq!(val1, val2);
158        }
159    }
160
161    async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
162        let props = SpecificParserConfig {
163            encoding_config: EncodingProperties::Json(JsonProperties {
164                use_schema_registry: false,
165                timestamptz_handling: None,
166                timestamp_handling: None,
167                time_handling: None,
168            }),
169            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
170        };
171        DebeziumParser::new(props, rw_columns, SourceContext::dummy().into())
172            .await
173            .unwrap()
174    }
175
176    async fn parse_one(
177        mut parser: DebeziumParser,
178        columns: Vec<SourceColumnDesc>,
179        payload: Vec<u8>,
180    ) -> Vec<(Op, OwnedRow)> {
181        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
182        parser
183            .parse_inner(None, Some(payload), builder.row_writer())
184            .await
185            .unwrap();
186        builder.finish_current_chunk();
187        let chunk = builder.consume_ready_chunks().next().unwrap();
188        chunk
189            .rows()
190            .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
191            .collect::<Vec<_>>()
192    }
193
194    mod test1_basic {
195        use super::*;
196
197        fn get_test1_columns() -> Vec<SourceColumnDesc> {
198            vec![
199                SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)),
200                SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(1)),
201                SourceColumnDesc::simple("description", DataType::Varchar, ColumnId::from(2)),
202                SourceColumnDesc::simple("weight", DataType::Float64, ColumnId::from(3)),
203            ]
204        }
205
206        #[tokio::test]
207        async fn test1_debezium_json_parser_read() {
208            //     "before": null,
209            //     "after": {
210            //       "id": 101,
211            //       "name": "scooter",
212            //       "description": "Small 2-wheel scooter",
213            //       "weight": 1.234
214            //     },
215            let input = vec![
216                // data with payload field
217                br#"{"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}}"#.to_vec(),
218                // data without payload field
219                br#"{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}"#.to_vec()];
220
221            let columns = get_test1_columns();
222
223            for data in input {
224                let parser = build_parser(columns.clone()).await;
225                let [(_op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
226                    .await
227                    .try_into()
228                    .unwrap();
229
230                assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
231                assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
232                assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
233                assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
234            }
235        }
236
237        #[tokio::test]
238        async fn test1_debezium_json_parser_insert() {
239            //     "before": null,
240            //     "after": {
241            //       "id": 102,
242            //       "name": "car battery",
243            //       "description": "12V car battery",
244            //       "weight": 8.1
245            //     },
246            let input = vec![
247                // data with payload field
248                br#"{"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}}"#.to_vec(),
249                // data without payload field
250                br#"{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}"#.to_vec()];
251
252            let columns = get_test1_columns();
253
254            for data in input {
255                let parser = build_parser(columns.clone()).await;
256                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
257                    .await
258                    .try_into()
259                    .unwrap();
260                assert_eq!(op, Op::Insert);
261
262                assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
263                assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
264                assert!(row[2].eq(&Some(ScalarImpl::Utf8("12V car battery".into()))));
265                assert!(row[3].eq(&Some(ScalarImpl::Float64(8.1.into()))));
266            }
267        }
268
269        #[tokio::test]
270        async fn test1_debezium_json_parser_delete() {
271            //     "before": {
272            //       "id": 101,
273            //       "name": "scooter",
274            //       "description": "Small 2-wheel scooter",
275            //       "weight": 1.234
276            //     },
277            //     "after": null,
278            let input = vec![
279                // data with payload field
280                br#"{"payload":{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}}"#.to_vec(),
281                // data without payload field
282                br#"{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}"#.to_vec()];
283
284            for data in input {
285                let columns = get_test1_columns();
286                let parser = build_parser(columns.clone()).await;
287                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
288                    .await
289                    .try_into()
290                    .unwrap();
291
292                assert_eq!(op, Op::Delete);
293
294                assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
295                assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
296                assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
297                assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
298            }
299        }
300
301        #[tokio::test]
302        async fn test1_debezium_json_parser_update() {
303            //     "before": {
304            //       "id": 102,
305            //       "name": "car battery",
306            //       "description": "12V car battery",
307            //       "weight": 8.1
308            //     },
309            //     "after": {
310            //       "id": 102,
311            //       "name": "car battery",
312            //       "description": "24V car battery",
313            //       "weight": 9.1
314            //     },
315            let input = vec![
316                // data with payload field
317                br#"{"payload":{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}}"#.to_vec(),
318                // data without payload field
319                br#"{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}"#.to_vec()];
320
321            let columns = get_test1_columns();
322
323            for data in input {
324                let parser = build_parser(columns.clone()).await;
325                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
326                    .await
327                    .try_into()
328                    .unwrap();
329
330                assert_eq!(op, Op::Insert);
331
332                assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
333                assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
334                assert!(row[2].eq(&Some(ScalarImpl::Utf8("24V car battery".into()))));
335                assert!(row[3].eq(&Some(ScalarImpl::Float64(9.1.into()))));
336            }
337        }
338    }
339    // test2 covers read/insert/update/delete event on the following MySQL table for debezium json:
340    // CREATE TABLE IF NOT EXISTS orders (
341    //     O_KEY BIGINT NOT NULL,
342    //     O_BOOL BOOLEAN,
343    //     O_TINY TINYINT,
344    //     O_INT INT,
345    //     O_REAL REAL,
346    //     O_DOUBLE DOUBLE,
347    //     O_DECIMAL DECIMAL(15, 2),
348    //     O_CHAR CHAR(15),
349    //     O_DATE DATE,
350    //     O_TIME TIME,
351    //     O_DATETIME DATETIME,
352    //     O_TIMESTAMP TIMESTAMP,
353    //     O_JSON JSON,
354    //     PRIMARY KEY (O_KEY));
355    // test2 also covers overflow tests on basic types
356    mod test2_mysql {
357        use super::*;
358
359        fn get_test2_columns() -> Vec<SourceColumnDesc> {
360            vec![
361                SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
362                SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
363                SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
364                SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
365                SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
366                SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
367                SourceColumnDesc::simple("O_DECIMAL", DataType::Decimal, ColumnId::from(6)),
368                SourceColumnDesc::simple("O_CHAR", DataType::Varchar, ColumnId::from(7)),
369                SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)),
370                SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)),
371                SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)),
372                SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)),
373                SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)),
374            ]
375        }
376
377        #[tokio::test]
378        async fn test2_debezium_json_parser_read() {
379            let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090651000,"snapshot":"last","db":"test","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":951,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1678090651640,"transaction":null}}"#;
380
381            let columns = get_test2_columns();
382
383            let parser = build_parser(columns.clone()).await;
384
385            let [(_op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
386                .await
387                .try_into()
388                .unwrap();
389
390            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
391            assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
392            assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
393            assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
394            assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
395            assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
396            assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
397            assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
398            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
399                NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
400            )))));
401            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
402                NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
403            )))));
404            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
405                "1970-01-01T00:00:00".parse().unwrap()
406            )))));
407            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
408                "1970-01-01T00:00:01Z".parse().unwrap()
409            ))));
410            assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
411        }
412
413        #[tokio::test]
414        async fn test2_debezium_json_parser_insert() {
415            let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#;
416
417            let columns = get_test2_columns();
418            let parser = build_parser(columns.clone()).await;
419            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
420                .await
421                .try_into()
422                .unwrap();
423            assert_eq!(op, Op::Insert);
424
425            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
426            assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
427            assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
428            assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
429            assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
430            assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
431            assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
432            assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
433            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
434                NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
435            )))));
436            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
437                NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
438            )))));
439            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
440                "1970-01-01T00:00:00".parse().unwrap()
441            )))));
442            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
443                "1970-01-01T00:00:01Z".parse().unwrap()
444            ))));
445            assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
446        }
447
448        #[tokio::test]
449        async fn test2_debezium_json_parser_delete() {
450            let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#;
451
452            let columns = get_test2_columns();
453            let parser = build_parser(columns.clone()).await;
454            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
455                .await
456                .try_into()
457                .unwrap();
458
459            assert_eq!(op, Op::Delete);
460
461            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
462            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
463            assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
464            assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
465            assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
466            assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
467            assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
468            assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
469            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
470                NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
471            )))));
472            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
473                NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
474            )))));
475            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
476                "5138-11-16T09:46:39".parse().unwrap()
477            )))));
478            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
479                "2038-01-09T03:14:07Z".parse().unwrap()
480            ))));
481            assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}");
482        }
483
484        #[tokio::test]
485        async fn test2_debezium_json_parser_update() {
486            let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"after":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\": \"v1_updated\", \"k2\": 33}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678089331000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1168,"row":0,"thread":4,"query":null},"op":"u","ts_ms":1678089331464,"transaction":null}}"#;
487
488            let columns = get_test2_columns();
489
490            let parser = build_parser(columns.clone()).await;
491            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
492                .await
493                .try_into()
494                .unwrap();
495
496            assert_eq!(op, Op::Insert);
497
498            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
499            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
500            assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
501            assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
502            assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
503            assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
504            assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
505            assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
506            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
507                NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
508            )))));
509            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
510                NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
511            )))));
512            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
513                "5138-11-16T09:46:39".parse().unwrap()
514            )))));
515            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
516                "2038-01-09T03:14:07Z".parse().unwrap()
517            ))));
518            assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
519        }
520
521        #[cfg(not(madsim))] // Traced test does not work with madsim
522        #[tokio::test]
523        #[tracing_test::traced_test]
524        async fn test2_debezium_json_parser_overflow() {
525            let columns = vec![
526                SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
527                SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
528                SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
529                SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
530                SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
531                SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
532            ];
533            let mut parser = build_parser(columns.clone()).await;
534
535            let mut dummy_builder =
536                SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
537
538            let normal_values = ["111", "1", "33", "444", "555.0", "666.0"];
539            let overflow_values = [
540                "9223372036854775808",
541                "2",
542                "32768",
543                "2147483648",
544                "3.80282347E38",
545                "1.797695E308",
546            ];
547
548            for i in 0..6 {
549                let mut values = normal_values;
550                values[i] = overflow_values[i];
551                let data = format!(
552                    r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
553                    values[0], values[1], values[2], values[3], values[4], values[5]
554                ).as_bytes().to_vec();
555
556                let res = parser
557                    .parse_inner(None, Some(data), dummy_builder.row_writer())
558                    .await;
559                if i < 5 {
560                    // For other overflow, the parsing succeeds but the type conversion fails
561                    // The errors are ignored and logged.
562                    res.unwrap();
563                    assert!(logs_contain("expected type"), "{i}");
564                } else {
565                    // For f64 overflow, the parsing fails
566                    let e = res.unwrap_err();
567                    assert!(e.to_report_string().contains("InvalidNumber"), "{i}: {e}");
568                }
569            }
570        }
571    }
572
573    // postgres-specific data-type mapping tests
574    mod test3_postgres {
575        use risingwave_pb::plan_common::AdditionalColumn;
576
577        use super::*;
578        use crate::source::SourceColumnType;
579
580        // schema for temporal-type test
581        fn get_temporal_test_columns() -> Vec<SourceColumnDesc> {
582            vec![
583                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
584                SourceColumnDesc::simple("o_time_0", DataType::Time, ColumnId::from(1)),
585                SourceColumnDesc::simple("o_time_6", DataType::Time, ColumnId::from(2)),
586                SourceColumnDesc::simple("o_timez_0", DataType::Time, ColumnId::from(3)),
587                SourceColumnDesc::simple("o_timez_6", DataType::Time, ColumnId::from(4)),
588                SourceColumnDesc::simple("o_timestamp_0", DataType::Timestamp, ColumnId::from(5)),
589                SourceColumnDesc::simple("o_timestamp_6", DataType::Timestamp, ColumnId::from(6)),
590                SourceColumnDesc::simple(
591                    "o_timestampz_0",
592                    DataType::Timestamptz,
593                    ColumnId::from(7),
594                ),
595                SourceColumnDesc::simple(
596                    "o_timestampz_6",
597                    DataType::Timestamptz,
598                    ColumnId::from(8),
599                ),
600                SourceColumnDesc::simple("o_interval", DataType::Interval, ColumnId::from(9)),
601                SourceColumnDesc::simple("o_date", DataType::Date, ColumnId::from(10)),
602            ]
603        }
604
605        // schema for numeric-type test
606        fn get_numeric_test_columns() -> Vec<SourceColumnDesc> {
607            vec![
608                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
609                SourceColumnDesc::simple("o_smallint", DataType::Int16, ColumnId::from(1)),
610                SourceColumnDesc::simple("o_integer", DataType::Int32, ColumnId::from(2)),
611                SourceColumnDesc::simple("o_bigint", DataType::Int64, ColumnId::from(3)),
612                SourceColumnDesc::simple("o_real", DataType::Float32, ColumnId::from(4)),
613                SourceColumnDesc::simple("o_double", DataType::Float64, ColumnId::from(5)),
614                SourceColumnDesc::simple("o_numeric", DataType::Decimal, ColumnId::from(6)),
615                SourceColumnDesc::simple("o_numeric_6_3", DataType::Decimal, ColumnId::from(7)),
616                SourceColumnDesc::simple("o_money", DataType::Decimal, ColumnId::from(8)),
617            ]
618        }
619
620        // schema for the remaining types
621        fn get_other_types_test_columns() -> Vec<SourceColumnDesc> {
622            vec![
623                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
624                SourceColumnDesc::simple("o_boolean", DataType::Boolean, ColumnId::from(1)),
625                SourceColumnDesc::simple("o_bit", DataType::Boolean, ColumnId::from(2)),
626                SourceColumnDesc::simple("o_bytea", DataType::Bytea, ColumnId::from(3)),
627                SourceColumnDesc::simple("o_json", DataType::Jsonb, ColumnId::from(4)),
628                SourceColumnDesc::simple("o_xml", DataType::Varchar, ColumnId::from(5)),
629                SourceColumnDesc::simple("o_uuid", DataType::Varchar, ColumnId::from(6)),
630                SourceColumnDesc {
631                    name: "o_point".to_owned(),
632                    data_type: DataType::Struct(StructType::new(vec![
633                        ("x", DataType::Float32),
634                        ("y", DataType::Float32),
635                    ])),
636                    column_id: 7.into(),
637                    column_type: SourceColumnType::Normal,
638                    is_pk: false,
639                    is_hidden_addition_col: false,
640                    additional_column: AdditionalColumn { column_type: None },
641                },
642                SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
643                SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)),
644                SourceColumnDesc::simple("o_varchar", DataType::Varchar, ColumnId::from(10)),
645                SourceColumnDesc::simple("o_character", DataType::Varchar, ColumnId::from(11)),
646                SourceColumnDesc::simple(
647                    "o_character_varying",
648                    DataType::Varchar,
649                    ColumnId::from(12),
650                ),
651            ]
652        }
653
654        #[tokio::test]
655        async fn test_temporal_types() {
656            // this test includes all supported temporal types, with the schema
657            // CREATE TABLE orders (
658            //     o_key integer,
659            //     o_time_0 time(0),
660            //     o_time_6 time(6),
661            //     o_timez_0 time(0) with time zone,
662            //     o_timez_6 time(6) with time zone,
663            //     o_timestamp_0 timestamp(0),
664            //     o_timestamp_6 timestamp(6),
665            //     o_timestampz_0 timestamp(0) with time zone,
666            //     o_timestampz_6 timestamp(6) with time zone,
667            //     o_interval interval,
668            //     o_date date,
669            //     PRIMARY KEY (o_key)
670            // );
671            // this test covers an insert event on the table above
672            let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_time_0":40271000000,"o_time_6":40271000010,"o_timez_0":"11:11:11Z","o_timez_6":"11:11:11.00001Z","o_timestamp_0":1321009871000,"o_timestamp_6":1321009871123456,"o_timestampz_0":"2011-11-11T03:11:11Z","o_timestampz_6":"2011-11-11T03:11:11.123456Z","o_interval":"P1Y2M3DT4H5M6.78S","o_date":"1999-09-09"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684733351963,"snapshot":"last","db":"test","sequence":"[null,\"26505352\"]","schema":"public","table":"orders","txId":729,"lsn":26505352,"xmin":null},"op":"r","ts_ms":1684733352110,"transaction":null}}"#;
673            let columns = get_temporal_test_columns();
674            let parser = build_parser(columns.clone()).await;
675            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
676                .await
677                .try_into()
678                .unwrap();
679            assert_eq!(op, Op::Insert);
680            assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
681            assert!(row[1].eq(&Some(ScalarImpl::Time(Time::new(
682                NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
683            )))));
684            assert!(row[2].eq(&Some(ScalarImpl::Time(Time::new(
685                NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
686            )))));
687            assert!(row[3].eq(&Some(ScalarImpl::Time(Time::new(
688                NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
689            )))));
690            assert!(row[4].eq(&Some(ScalarImpl::Time(Time::new(
691                NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
692            )))));
693            assert!(row[5].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
694                "2011-11-11T11:11:11".parse().unwrap()
695            )))));
696            assert!(row[6].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
697                "2011-11-11T11:11:11.123456".parse().unwrap()
698            )))));
699            assert!(
700                row[9].eq(&Some(ScalarImpl::Interval(Interval::from_month_day_usec(
701                    14,
702                    3,
703                    14706780000
704                ))))
705            );
706            assert!(row[10].eq(&Some(ScalarImpl::Date(Date::new(
707                NaiveDate::from_ymd_opt(1999, 9, 9).unwrap()
708            )))));
709        }
710
711        #[tokio::test]
712        async fn test_numeric_types() {
713            // this test includes all supported numeric types, with the schema
714            // CREATE TABLE orders (
715            //     o_key integer,
716            //     o_smallint smallint,
717            //     o_integer integer,
718            //     o_bigint bigint,
719            //     o_real real,
720            //     o_double double precision,
721            //     o_numeric numeric,
722            //     o_numeric_6_3 numeric(6,3),
723            //     o_money money,
724            //     PRIMARY KEY (o_key)
725            // );
726            // this test covers an insert event on the table above
727            let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_smallint":32767,"o_integer":2147483647,"o_bigint":9223372036854775807,"o_real":9.999,"o_double":9.999999,"o_numeric":123456.789,"o_numeric_6_3":123.456,"o_money":123.12},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684404343201,"snapshot":"last","db":"test","sequence":"[null,\"26519216\"]","schema":"public","table":"orders","txId":729,"lsn":26519216,"xmin":null},"op":"r","ts_ms":1684404343349,"transaction":null}}"#;
728            let columns = get_numeric_test_columns();
729            let parser = build_parser(columns.clone()).await;
730            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
731                .await
732                .try_into()
733                .unwrap();
734            assert_eq!(op, Op::Insert);
735            assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
736            assert!(row[1].eq(&Some(ScalarImpl::Int16(32767))));
737            assert!(row[2].eq(&Some(ScalarImpl::Int32(2147483647))));
738            assert!(row[3].eq(&Some(ScalarImpl::Int64(9223372036854775807))));
739            assert!(row[4].eq(&Some(ScalarImpl::Float32((9.999).into()))));
740            assert!(row[5].eq(&Some(ScalarImpl::Float64((9.999999).into()))));
741            assert!(row[6].eq(&Some(ScalarImpl::Decimal("123456.7890".parse().unwrap()))));
742            assert!(row[7].eq(&Some(ScalarImpl::Decimal("123.456".parse().unwrap()))));
743            assert!(row[8].eq(&Some(ScalarImpl::Decimal("123.12".parse().unwrap()))));
744        }
745
746        #[tokio::test]
747        async fn test_other_types() {
748            // this test includes the remaining types, with the schema
749            // CREATE TABLE orders (
750            //     o_key integer,
751            //     o_boolean boolean,
752            //     o_bit bit,
753            //     o_bytea bytea,
754            //     o_json jsonb,
755            //     o_xml xml,
756            //     o_uuid uuid,
757            //     o_point point,
758            //     o_enum bear,
759            //     o_char char,
760            //     o_varchar varchar,
761            //     o_character character,
762            //     o_character_varying character varying,
763            //     PRIMARY KEY (o_key)
764            //  );
765            // this test covers an insert event on the table above
766            let data = br#"{"payload":{"before":null,"after":{"o_key":1,"o_boolean":false,"o_bit":true,"o_bytea":"ASNFZ4mrze8=","o_json":"{\"k1\": \"v1\", \"k2\": 11}","o_xml":"<!--hahaha-->","o_uuid":"60f14fe2-f857-404a-b586-3b5375b3259f","o_point":{"x":1.0,"y":2.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAABA","srid":null},"o_enum":"polar","o_char":"h","o_varchar":"ha","o_character":"h","o_character_varying":"hahaha"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684743927178,"snapshot":"last","db":"test","sequence":"[null,\"26524528\"]","schema":"public","table":"orders","txId":730,"lsn":26524528,"xmin":null},"op":"r","ts_ms":1684743927343,"transaction":null}}"#;
767            let columns = get_other_types_test_columns();
768            let parser = build_parser(columns.clone()).await;
769            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
770                .await
771                .try_into()
772                .unwrap();
773            assert_eq!(op, Op::Insert);
774            assert!(row[0].eq(&Some(ScalarImpl::Int32(1))));
775            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
776            assert!(row[2].eq(&Some(ScalarImpl::Bool(true))));
777            assert!(row[3].eq(&Some(ScalarImpl::Bytea(Box::new([
778                u8::from_str_radix("01", 16).unwrap(),
779                u8::from_str_radix("23", 16).unwrap(),
780                u8::from_str_radix("45", 16).unwrap(),
781                u8::from_str_radix("67", 16).unwrap(),
782                u8::from_str_radix("89", 16).unwrap(),
783                u8::from_str_radix("AB", 16).unwrap(),
784                u8::from_str_radix("CD", 16).unwrap(),
785                u8::from_str_radix("EF", 16).unwrap()
786            ])))));
787            assert_json_eq(&row[4], "{\"k1\": \"v1\", \"k2\": 11}");
788            assert!(row[5].eq(&Some(ScalarImpl::Utf8("<!--hahaha-->".into()))));
789            assert!(row[6].eq(&Some(ScalarImpl::Utf8(
790                "60f14fe2-f857-404a-b586-3b5375b3259f".into()
791            ))));
792            assert!(row[7].eq(&Some(ScalarImpl::Struct(StructValue::new(vec![
793                Some(ScalarImpl::Float32(1.into())),
794                Some(ScalarImpl::Float32(2.into()))
795            ])))));
796            assert!(row[8].eq(&Some(ScalarImpl::Utf8("polar".into()))));
797            assert!(row[9].eq(&Some(ScalarImpl::Utf8("h".into()))));
798            assert!(row[10].eq(&Some(ScalarImpl::Utf8("ha".into()))));
799            assert!(row[11].eq(&Some(ScalarImpl::Utf8("h".into()))));
800            assert!(row[12].eq(&Some(ScalarImpl::Utf8("hahaha".into()))));
801        }
802    }
803}