risingwave_connector/parser/debezium/
simd_json_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16
17use anyhow::Context;
18use simd_json::BorrowedValue;
19use simd_json::prelude::MutableObject;
20
21use crate::error::ConnectorResult;
22use crate::parser::unified::AccessImpl;
23use crate::parser::unified::debezium::MongoJsonAccess;
24use crate::parser::unified::json::{JsonAccess, JsonParseOptions, TimestamptzHandling};
25use crate::parser::{AccessBuilder, MongoProperties};
26
27#[derive(Debug)]
28pub struct DebeziumJsonAccessBuilder {
29    value: Option<Vec<u8>>,
30    json_parse_options: JsonParseOptions,
31}
32
33impl DebeziumJsonAccessBuilder {
34    pub fn new(timestamptz_handling: TimestamptzHandling) -> ConnectorResult<Self> {
35        Ok(Self {
36            value: None,
37            json_parse_options: JsonParseOptions::new_for_debezium(timestamptz_handling),
38        })
39    }
40
41    pub fn new_for_schema_event() -> ConnectorResult<Self> {
42        Ok(Self {
43            value: None,
44            json_parse_options: JsonParseOptions::default(),
45        })
46    }
47}
48
49impl AccessBuilder for DebeziumJsonAccessBuilder {
50    #[allow(clippy::unused_async)]
51    async fn generate_accessor(
52        &mut self,
53        payload: Vec<u8>,
54        _: &crate::source::SourceMeta,
55    ) -> ConnectorResult<AccessImpl<'_>> {
56        self.value = Some(payload);
57        let mut event: BorrowedValue<'_> =
58            simd_json::to_borrowed_value(self.value.as_mut().unwrap())
59                .context("failed to parse debezium json payload")?;
60
61        let payload = if let Some(payload) = event.get_mut("payload") {
62            std::mem::take(payload)
63        } else {
64            event
65        };
66
67        Ok(AccessImpl::Json(JsonAccess::new_with_options(
68            payload,
69            &self.json_parse_options,
70        )))
71    }
72}
73
74#[derive(Debug)]
75pub struct DebeziumMongoJsonAccessBuilder {
76    value: Option<Vec<u8>>,
77    json_parse_options: JsonParseOptions,
78    strong_schema: bool,
79}
80
81impl DebeziumMongoJsonAccessBuilder {
82    pub fn new(props: MongoProperties) -> anyhow::Result<Self> {
83        Ok(Self {
84            value: None,
85            json_parse_options: JsonParseOptions::new_for_debezium(
86                TimestamptzHandling::GuessNumberUnit,
87            ),
88            strong_schema: props.strong_schema,
89        })
90    }
91}
92
93impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
94    #[allow(clippy::unused_async)]
95    async fn generate_accessor(
96        &mut self,
97        payload: Vec<u8>,
98        _: &crate::source::SourceMeta,
99    ) -> ConnectorResult<AccessImpl<'_>> {
100        self.value = Some(payload);
101        let mut event: BorrowedValue<'_> =
102            simd_json::to_borrowed_value(self.value.as_mut().unwrap())
103                .context("failed to parse debezium mongo json payload")?;
104
105        let payload = if let Some(payload) = event.get_mut("payload") {
106            std::mem::take(payload)
107        } else {
108            event
109        };
110
111        Ok(AccessImpl::MongoJson(MongoJsonAccess::new(
112            JsonAccess::new_with_options(payload, &self.json_parse_options),
113            self.strong_schema,
114        )))
115    }
116}
117
118#[cfg(test)]
119mod tests {
120    use chrono::{NaiveDate, NaiveTime};
121    use risingwave_common::array::{Op, StructValue};
122    use risingwave_common::catalog::ColumnId;
123    use risingwave_common::row::{OwnedRow, Row};
124    use risingwave_common::types::{
125        DataType, Date, Interval, Scalar, ScalarImpl, StructType, Time, Timestamp,
126    };
127    use serde_json::Value;
128    use thiserror_ext::AsReport;
129
130    use crate::parser::{
131        DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
132        SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig,
133    };
134    use crate::source::{SourceContext, SourceCtrlOpts};
135
136    fn assert_json_eq(parse_result: &Option<ScalarImpl>, json_str: &str) {
137        if let Some(ScalarImpl::Jsonb(json_val)) = parse_result {
138            let mut json_string = String::new();
139            json_val
140                .as_scalar_ref()
141                .force_str(&mut json_string)
142                .unwrap();
143            let val1: Value = serde_json::from_str(json_string.as_str()).unwrap();
144            let val2: Value = serde_json::from_str(json_str).unwrap();
145            assert_eq!(val1, val2);
146        }
147    }
148
149    async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
150        let props = SpecificParserConfig {
151            encoding_config: EncodingProperties::Json(JsonProperties {
152                use_schema_registry: false,
153                timestamptz_handling: None,
154            }),
155            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
156        };
157        DebeziumParser::new(props, rw_columns, SourceContext::dummy().into())
158            .await
159            .unwrap()
160    }
161
162    async fn parse_one(
163        mut parser: DebeziumParser,
164        columns: Vec<SourceColumnDesc>,
165        payload: Vec<u8>,
166    ) -> Vec<(Op, OwnedRow)> {
167        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
168        parser
169            .parse_inner(None, Some(payload), builder.row_writer())
170            .await
171            .unwrap();
172        builder.finish_current_chunk();
173        let chunk = builder.consume_ready_chunks().next().unwrap();
174        chunk
175            .rows()
176            .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
177            .collect::<Vec<_>>()
178    }
179
180    mod test1_basic {
181        use super::*;
182
183        fn get_test1_columns() -> Vec<SourceColumnDesc> {
184            vec![
185                SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)),
186                SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(1)),
187                SourceColumnDesc::simple("description", DataType::Varchar, ColumnId::from(2)),
188                SourceColumnDesc::simple("weight", DataType::Float64, ColumnId::from(3)),
189            ]
190        }
191
192        #[tokio::test]
193        async fn test1_debezium_json_parser_read() {
194            //     "before": null,
195            //     "after": {
196            //       "id": 101,
197            //       "name": "scooter",
198            //       "description": "Small 2-wheel scooter",
199            //       "weight": 1.234
200            //     },
201            let input = vec![
202                // data with payload field
203                br#"{"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}}"#.to_vec(),
204                // data without payload field
205                br#"{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}"#.to_vec()];
206
207            let columns = get_test1_columns();
208
209            for data in input {
210                let parser = build_parser(columns.clone()).await;
211                let [(_op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
212                    .await
213                    .try_into()
214                    .unwrap();
215
216                assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
217                assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
218                assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
219                assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
220            }
221        }
222
223        #[tokio::test]
224        async fn test1_debezium_json_parser_insert() {
225            //     "before": null,
226            //     "after": {
227            //       "id": 102,
228            //       "name": "car battery",
229            //       "description": "12V car battery",
230            //       "weight": 8.1
231            //     },
232            let input = vec![
233                // data with payload field
234                br#"{"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}}"#.to_vec(),
235                // data without payload field
236                br#"{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}"#.to_vec()];
237
238            let columns = get_test1_columns();
239
240            for data in input {
241                let parser = build_parser(columns.clone()).await;
242                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
243                    .await
244                    .try_into()
245                    .unwrap();
246                assert_eq!(op, Op::Insert);
247
248                assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
249                assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
250                assert!(row[2].eq(&Some(ScalarImpl::Utf8("12V car battery".into()))));
251                assert!(row[3].eq(&Some(ScalarImpl::Float64(8.1.into()))));
252            }
253        }
254
255        #[tokio::test]
256        async fn test1_debezium_json_parser_delete() {
257            //     "before": {
258            //       "id": 101,
259            //       "name": "scooter",
260            //       "description": "Small 2-wheel scooter",
261            //       "weight": 1.234
262            //     },
263            //     "after": null,
264            let input = vec![
265                // data with payload field
266                br#"{"payload":{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}}"#.to_vec(),
267                // data without payload field
268                br#"{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}"#.to_vec()];
269
270            for data in input {
271                let columns = get_test1_columns();
272                let parser = build_parser(columns.clone()).await;
273                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
274                    .await
275                    .try_into()
276                    .unwrap();
277
278                assert_eq!(op, Op::Delete);
279
280                assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
281                assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
282                assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
283                assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
284            }
285        }
286
287        #[tokio::test]
288        async fn test1_debezium_json_parser_update() {
289            //     "before": {
290            //       "id": 102,
291            //       "name": "car battery",
292            //       "description": "12V car battery",
293            //       "weight": 8.1
294            //     },
295            //     "after": {
296            //       "id": 102,
297            //       "name": "car battery",
298            //       "description": "24V car battery",
299            //       "weight": 9.1
300            //     },
301            let input = vec![
302                // data with payload field
303                br#"{"payload":{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}}"#.to_vec(),
304                // data without payload field
305                br#"{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}"#.to_vec()];
306
307            let columns = get_test1_columns();
308
309            for data in input {
310                let parser = build_parser(columns.clone()).await;
311                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
312                    .await
313                    .try_into()
314                    .unwrap();
315
316                assert_eq!(op, Op::Insert);
317
318                assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
319                assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
320                assert!(row[2].eq(&Some(ScalarImpl::Utf8("24V car battery".into()))));
321                assert!(row[3].eq(&Some(ScalarImpl::Float64(9.1.into()))));
322            }
323        }
324    }
    // test2 covers read/insert/update/delete events on the following MySQL table for Debezium JSON:
326    // CREATE TABLE IF NOT EXISTS orders (
327    //     O_KEY BIGINT NOT NULL,
328    //     O_BOOL BOOLEAN,
329    //     O_TINY TINYINT,
330    //     O_INT INT,
331    //     O_REAL REAL,
332    //     O_DOUBLE DOUBLE,
333    //     O_DECIMAL DECIMAL(15, 2),
334    //     O_CHAR CHAR(15),
335    //     O_DATE DATE,
336    //     O_TIME TIME,
337    //     O_DATETIME DATETIME,
338    //     O_TIMESTAMP TIMESTAMP,
339    //     O_JSON JSON,
340    //     PRIMARY KEY (O_KEY));
341    // test2 also covers overflow tests on basic types
342    mod test2_mysql {
343        use super::*;
344
345        fn get_test2_columns() -> Vec<SourceColumnDesc> {
346            vec![
347                SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
348                SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
349                SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
350                SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
351                SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
352                SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
353                SourceColumnDesc::simple("O_DECIMAL", DataType::Decimal, ColumnId::from(6)),
354                SourceColumnDesc::simple("O_CHAR", DataType::Varchar, ColumnId::from(7)),
355                SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)),
356                SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)),
357                SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)),
358                SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)),
359                SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)),
360            ]
361        }
362
363        #[tokio::test]
364        async fn test2_debezium_json_parser_read() {
365            let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090651000,"snapshot":"last","db":"test","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":951,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1678090651640,"transaction":null}}"#;
366
367            let columns = get_test2_columns();
368
369            let parser = build_parser(columns.clone()).await;
370
371            let [(_op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
372                .await
373                .try_into()
374                .unwrap();
375
376            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
377            assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
378            assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
379            assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
380            assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
381            assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
382            assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
383            assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
384            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
385                NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
386            )))));
387            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
388                NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
389            )))));
390            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
391                "1970-01-01T00:00:00".parse().unwrap()
392            )))));
393            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
394                "1970-01-01T00:00:01Z".parse().unwrap()
395            ))));
396            assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
397        }
398
399        #[tokio::test]
400        async fn test2_debezium_json_parser_insert() {
401            let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#;
402
403            let columns = get_test2_columns();
404            let parser = build_parser(columns.clone()).await;
405            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
406                .await
407                .try_into()
408                .unwrap();
409            assert_eq!(op, Op::Insert);
410
411            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
412            assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
413            assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
414            assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
415            assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
416            assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
417            assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
418            assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
419            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
420                NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
421            )))));
422            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
423                NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
424            )))));
425            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
426                "1970-01-01T00:00:00".parse().unwrap()
427            )))));
428            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
429                "1970-01-01T00:00:01Z".parse().unwrap()
430            ))));
431            assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
432        }
433
434        #[tokio::test]
435        async fn test2_debezium_json_parser_delete() {
436            let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#;
437
438            let columns = get_test2_columns();
439            let parser = build_parser(columns.clone()).await;
440            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
441                .await
442                .try_into()
443                .unwrap();
444
445            assert_eq!(op, Op::Delete);
446
447            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
448            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
449            assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
450            assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
451            assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
452            assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
453            assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
454            assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
455            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
456                NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
457            )))));
458            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
459                NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
460            )))));
461            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
462                "5138-11-16T09:46:39".parse().unwrap()
463            )))));
464            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
465                "2038-01-09T03:14:07Z".parse().unwrap()
466            ))));
467            assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}");
468        }
469
470        #[tokio::test]
471        async fn test2_debezium_json_parser_update() {
472            let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"after":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\": \"v1_updated\", \"k2\": 33}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678089331000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1168,"row":0,"thread":4,"query":null},"op":"u","ts_ms":1678089331464,"transaction":null}}"#;
473
474            let columns = get_test2_columns();
475
476            let parser = build_parser(columns.clone()).await;
477            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
478                .await
479                .try_into()
480                .unwrap();
481
482            assert_eq!(op, Op::Insert);
483
484            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
485            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
486            assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
487            assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
488            assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
489            assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
490            assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
491            assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
492            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
493                NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
494            )))));
495            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
496                NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
497            )))));
498            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
499                "5138-11-16T09:46:39".parse().unwrap()
500            )))));
501            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
502                "2038-01-09T03:14:07Z".parse().unwrap()
503            ))));
504            assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
505        }
506
507        #[cfg(not(madsim))] // Traced test does not work with madsim
508        #[tokio::test]
509        #[tracing_test::traced_test]
510        async fn test2_debezium_json_parser_overflow() {
511            let columns = vec![
512                SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
513                SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
514                SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
515                SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
516                SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
517                SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
518            ];
519            let mut parser = build_parser(columns.clone()).await;
520
521            let mut dummy_builder =
522                SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
523
524            let normal_values = ["111", "1", "33", "444", "555.0", "666.0"];
525            let overflow_values = [
526                "9223372036854775808",
527                "2",
528                "32768",
529                "2147483648",
530                "3.80282347E38",
531                "1.797695E308",
532            ];
533
534            for i in 0..6 {
535                let mut values = normal_values;
536                values[i] = overflow_values[i];
537                let data = format!(
538                    r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
539                    values[0], values[1], values[2], values[3], values[4], values[5]
540                ).as_bytes().to_vec();
541
542                let res = parser
543                    .parse_inner(None, Some(data), dummy_builder.row_writer())
544                    .await;
545                if i < 5 {
546                    // For other overflow, the parsing succeeds but the type conversion fails
547                    // The errors are ignored and logged.
548                    res.unwrap();
549                    assert!(logs_contain("expected type"), "{i}");
550                } else {
551                    // For f64 overflow, the parsing fails
552                    let e = res.unwrap_err();
553                    assert!(e.to_report_string().contains("InvalidNumber"), "{i}: {e}");
554                }
555            }
556        }
557    }
558
559    // postgres-specific data-type mapping tests
560    mod test3_postgres {
561        use risingwave_pb::plan_common::AdditionalColumn;
562
563        use super::*;
564        use crate::source::SourceColumnType;
565
566        // schema for temporal-type test
567        fn get_temporal_test_columns() -> Vec<SourceColumnDesc> {
568            vec![
569                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
570                SourceColumnDesc::simple("o_time_0", DataType::Time, ColumnId::from(1)),
571                SourceColumnDesc::simple("o_time_6", DataType::Time, ColumnId::from(2)),
572                SourceColumnDesc::simple("o_timez_0", DataType::Time, ColumnId::from(3)),
573                SourceColumnDesc::simple("o_timez_6", DataType::Time, ColumnId::from(4)),
574                SourceColumnDesc::simple("o_timestamp_0", DataType::Timestamp, ColumnId::from(5)),
575                SourceColumnDesc::simple("o_timestamp_6", DataType::Timestamp, ColumnId::from(6)),
576                SourceColumnDesc::simple(
577                    "o_timestampz_0",
578                    DataType::Timestamptz,
579                    ColumnId::from(7),
580                ),
581                SourceColumnDesc::simple(
582                    "o_timestampz_6",
583                    DataType::Timestamptz,
584                    ColumnId::from(8),
585                ),
586                SourceColumnDesc::simple("o_interval", DataType::Interval, ColumnId::from(9)),
587                SourceColumnDesc::simple("o_date", DataType::Date, ColumnId::from(10)),
588            ]
589        }
590
591        // schema for numeric-type test
592        fn get_numeric_test_columns() -> Vec<SourceColumnDesc> {
593            vec![
594                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
595                SourceColumnDesc::simple("o_smallint", DataType::Int16, ColumnId::from(1)),
596                SourceColumnDesc::simple("o_integer", DataType::Int32, ColumnId::from(2)),
597                SourceColumnDesc::simple("o_bigint", DataType::Int64, ColumnId::from(3)),
598                SourceColumnDesc::simple("o_real", DataType::Float32, ColumnId::from(4)),
599                SourceColumnDesc::simple("o_double", DataType::Float64, ColumnId::from(5)),
600                SourceColumnDesc::simple("o_numeric", DataType::Decimal, ColumnId::from(6)),
601                SourceColumnDesc::simple("o_numeric_6_3", DataType::Decimal, ColumnId::from(7)),
602                SourceColumnDesc::simple("o_money", DataType::Decimal, ColumnId::from(8)),
603            ]
604        }
605
606        // schema for the remaining types
607        fn get_other_types_test_columns() -> Vec<SourceColumnDesc> {
608            vec![
609                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
610                SourceColumnDesc::simple("o_boolean", DataType::Boolean, ColumnId::from(1)),
611                SourceColumnDesc::simple("o_bit", DataType::Boolean, ColumnId::from(2)),
612                SourceColumnDesc::simple("o_bytea", DataType::Bytea, ColumnId::from(3)),
613                SourceColumnDesc::simple("o_json", DataType::Jsonb, ColumnId::from(4)),
614                SourceColumnDesc::simple("o_xml", DataType::Varchar, ColumnId::from(5)),
615                SourceColumnDesc::simple("o_uuid", DataType::Varchar, ColumnId::from(6)),
616                SourceColumnDesc {
617                    name: "o_point".to_owned(),
618                    data_type: DataType::Struct(StructType::new(vec![
619                        ("x", DataType::Float32),
620                        ("y", DataType::Float32),
621                    ])),
622                    column_id: 7.into(),
623                    column_type: SourceColumnType::Normal,
624                    is_pk: false,
625                    is_hidden_addition_col: false,
626                    additional_column: AdditionalColumn { column_type: None },
627                },
628                SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
629                SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)),
630                SourceColumnDesc::simple("o_varchar", DataType::Varchar, ColumnId::from(10)),
631                SourceColumnDesc::simple("o_character", DataType::Varchar, ColumnId::from(11)),
632                SourceColumnDesc::simple(
633                    "o_character_varying",
634                    DataType::Varchar,
635                    ColumnId::from(12),
636                ),
637            ]
638        }
639
640        #[tokio::test]
641        async fn test_temporal_types() {
642            // this test includes all supported temporal types, with the schema
643            // CREATE TABLE orders (
644            //     o_key integer,
645            //     o_time_0 time(0),
646            //     o_time_6 time(6),
647            //     o_timez_0 time(0) with time zone,
648            //     o_timez_6 time(6) with time zone,
649            //     o_timestamp_0 timestamp(0),
650            //     o_timestamp_6 timestamp(6),
651            //     o_timestampz_0 timestamp(0) with time zone,
652            //     o_timestampz_6 timestamp(6) with time zone,
653            //     o_interval interval,
654            //     o_date date,
655            //     PRIMARY KEY (o_key)
656            // );
657            // this test covers an insert event on the table above
658            let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_time_0":40271000000,"o_time_6":40271000010,"o_timez_0":"11:11:11Z","o_timez_6":"11:11:11.00001Z","o_timestamp_0":1321009871000,"o_timestamp_6":1321009871123456,"o_timestampz_0":"2011-11-11T03:11:11Z","o_timestampz_6":"2011-11-11T03:11:11.123456Z","o_interval":"P1Y2M3DT4H5M6.78S","o_date":"1999-09-09"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684733351963,"snapshot":"last","db":"test","sequence":"[null,\"26505352\"]","schema":"public","table":"orders","txId":729,"lsn":26505352,"xmin":null},"op":"r","ts_ms":1684733352110,"transaction":null}}"#;
659            let columns = get_temporal_test_columns();
660            let parser = build_parser(columns.clone()).await;
661            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
662                .await
663                .try_into()
664                .unwrap();
665            assert_eq!(op, Op::Insert);
666            assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
667            assert!(row[1].eq(&Some(ScalarImpl::Time(Time::new(
668                NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
669            )))));
670            assert!(row[2].eq(&Some(ScalarImpl::Time(Time::new(
671                NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
672            )))));
673            assert!(row[3].eq(&Some(ScalarImpl::Time(Time::new(
674                NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
675            )))));
676            assert!(row[4].eq(&Some(ScalarImpl::Time(Time::new(
677                NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
678            )))));
679            assert!(row[5].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
680                "2011-11-11T11:11:11".parse().unwrap()
681            )))));
682            assert!(row[6].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
683                "2011-11-11T11:11:11.123456".parse().unwrap()
684            )))));
685            assert!(
686                row[9].eq(&Some(ScalarImpl::Interval(Interval::from_month_day_usec(
687                    14,
688                    3,
689                    14706780000
690                ))))
691            );
692            assert!(row[10].eq(&Some(ScalarImpl::Date(Date::new(
693                NaiveDate::from_ymd_opt(1999, 9, 9).unwrap()
694            )))));
695        }
696
697        #[tokio::test]
698        async fn test_numeric_types() {
699            // this test includes all supported numeric types, with the schema
700            // CREATE TABLE orders (
701            //     o_key integer,
702            //     o_smallint smallint,
703            //     o_integer integer,
704            //     o_bigint bigint,
705            //     o_real real,
706            //     o_double double precision,
707            //     o_numeric numeric,
708            //     o_numeric_6_3 numeric(6,3),
709            //     o_money money,
710            //     PRIMARY KEY (o_key)
711            // );
712            // this test covers an insert event on the table above
713            let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_smallint":32767,"o_integer":2147483647,"o_bigint":9223372036854775807,"o_real":9.999,"o_double":9.999999,"o_numeric":123456.789,"o_numeric_6_3":123.456,"o_money":123.12},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684404343201,"snapshot":"last","db":"test","sequence":"[null,\"26519216\"]","schema":"public","table":"orders","txId":729,"lsn":26519216,"xmin":null},"op":"r","ts_ms":1684404343349,"transaction":null}}"#;
714            let columns = get_numeric_test_columns();
715            let parser = build_parser(columns.clone()).await;
716            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
717                .await
718                .try_into()
719                .unwrap();
720            assert_eq!(op, Op::Insert);
721            assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
722            assert!(row[1].eq(&Some(ScalarImpl::Int16(32767))));
723            assert!(row[2].eq(&Some(ScalarImpl::Int32(2147483647))));
724            assert!(row[3].eq(&Some(ScalarImpl::Int64(9223372036854775807))));
725            assert!(row[4].eq(&Some(ScalarImpl::Float32((9.999).into()))));
726            assert!(row[5].eq(&Some(ScalarImpl::Float64((9.999999).into()))));
727            assert!(row[6].eq(&Some(ScalarImpl::Decimal("123456.7890".parse().unwrap()))));
728            assert!(row[7].eq(&Some(ScalarImpl::Decimal("123.456".parse().unwrap()))));
729            assert!(row[8].eq(&Some(ScalarImpl::Decimal("123.12".parse().unwrap()))));
730        }
731
732        #[tokio::test]
733        async fn test_other_types() {
734            // this test includes the remaining types, with the schema
735            // CREATE TABLE orders (
736            //     o_key integer,
737            //     o_boolean boolean,
738            //     o_bit bit,
739            //     o_bytea bytea,
740            //     o_json jsonb,
741            //     o_xml xml,
742            //     o_uuid uuid,
743            //     o_point point,
744            //     o_enum bear,
745            //     o_char char,
746            //     o_varchar varchar,
747            //     o_character character,
748            //     o_character_varying character varying,
749            //     PRIMARY KEY (o_key)
750            //  );
751            // this test covers an insert event on the table above
752            let data = br#"{"payload":{"before":null,"after":{"o_key":1,"o_boolean":false,"o_bit":true,"o_bytea":"ASNFZ4mrze8=","o_json":"{\"k1\": \"v1\", \"k2\": 11}","o_xml":"<!--hahaha-->","o_uuid":"60f14fe2-f857-404a-b586-3b5375b3259f","o_point":{"x":1.0,"y":2.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAABA","srid":null},"o_enum":"polar","o_char":"h","o_varchar":"ha","o_character":"h","o_character_varying":"hahaha"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684743927178,"snapshot":"last","db":"test","sequence":"[null,\"26524528\"]","schema":"public","table":"orders","txId":730,"lsn":26524528,"xmin":null},"op":"r","ts_ms":1684743927343,"transaction":null}}"#;
753            let columns = get_other_types_test_columns();
754            let parser = build_parser(columns.clone()).await;
755            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
756                .await
757                .try_into()
758                .unwrap();
759            assert_eq!(op, Op::Insert);
760            assert!(row[0].eq(&Some(ScalarImpl::Int32(1))));
761            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
762            assert!(row[2].eq(&Some(ScalarImpl::Bool(true))));
763            assert!(row[3].eq(&Some(ScalarImpl::Bytea(Box::new([
764                u8::from_str_radix("01", 16).unwrap(),
765                u8::from_str_radix("23", 16).unwrap(),
766                u8::from_str_radix("45", 16).unwrap(),
767                u8::from_str_radix("67", 16).unwrap(),
768                u8::from_str_radix("89", 16).unwrap(),
769                u8::from_str_radix("AB", 16).unwrap(),
770                u8::from_str_radix("CD", 16).unwrap(),
771                u8::from_str_radix("EF", 16).unwrap()
772            ])))));
773            assert_json_eq(&row[4], "{\"k1\": \"v1\", \"k2\": 11}");
774            assert!(row[5].eq(&Some(ScalarImpl::Utf8("<!--hahaha-->".into()))));
775            assert!(row[6].eq(&Some(ScalarImpl::Utf8(
776                "60f14fe2-f857-404a-b586-3b5375b3259f".into()
777            ))));
778            assert!(row[7].eq(&Some(ScalarImpl::Struct(StructValue::new(vec![
779                Some(ScalarImpl::Float32(1.into())),
780                Some(ScalarImpl::Float32(2.into()))
781            ])))));
782            assert!(row[8].eq(&Some(ScalarImpl::Utf8("polar".into()))));
783            assert!(row[9].eq(&Some(ScalarImpl::Utf8("h".into()))));
784            assert!(row[10].eq(&Some(ScalarImpl::Utf8("ha".into()))));
785            assert!(row[11].eq(&Some(ScalarImpl::Utf8("h".into()))));
786            assert!(row[12].eq(&Some(ScalarImpl::Utf8("hahaha".into()))));
787        }
788    }
789}