risingwave_connector/parser/debezium/
simd_json_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16
17use anyhow::Context;
18use simd_json::BorrowedValue;
19use simd_json::prelude::MutableObject;
20
21use crate::error::ConnectorResult;
22use crate::parser::unified::AccessImpl;
23use crate::parser::unified::debezium::MongoJsonAccess;
24use crate::parser::unified::json::{
25    JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling, TimestamptzHandling,
26};
27use crate::parser::{AccessBuilder, MongoProperties};
28
/// Access builder for Debezium change events encoded as JSON.
///
/// The builder owns the raw payload bytes so that the accessor returned by
/// `generate_accessor` can borrow from them.
#[derive(Debug)]
pub struct DebeziumJsonAccessBuilder {
    /// Bytes of the payload most recently handed to `generate_accessor`;
    /// `None` until the first call.
    value: Option<Vec<u8>>,
    /// Parse options passed by reference to every `JsonAccess` this builder creates.
    json_parse_options: JsonParseOptions,
}
34
35impl DebeziumJsonAccessBuilder {
36    pub fn new(
37        timestamptz_handling: TimestamptzHandling,
38        timestamp_handling: TimestampHandling,
39        time_handling: TimeHandling,
40        handle_toast_columns: bool,
41    ) -> ConnectorResult<Self> {
42        Ok(Self {
43            value: None,
44            json_parse_options: JsonParseOptions::new_for_debezium(
45                timestamptz_handling,
46                timestamp_handling,
47                time_handling,
48                handle_toast_columns,
49            ),
50        })
51    }
52
53    pub fn new_for_schema_event() -> ConnectorResult<Self> {
54        Ok(Self {
55            value: None,
56            json_parse_options: JsonParseOptions::default(),
57        })
58    }
59}
60
61impl AccessBuilder for DebeziumJsonAccessBuilder {
62    #[allow(clippy::unused_async)]
63    async fn generate_accessor(
64        &mut self,
65        payload: Vec<u8>,
66        _: &crate::source::SourceMeta,
67    ) -> ConnectorResult<AccessImpl<'_>> {
68        self.value = Some(payload);
69        let mut event: BorrowedValue<'_> =
70            simd_json::to_borrowed_value(self.value.as_mut().unwrap())
71                .context("failed to parse debezium json payload")?;
72
73        let payload = if let Some(payload) = event.get_mut("payload") {
74            std::mem::take(payload)
75        } else {
76            event
77        };
78
79        Ok(AccessImpl::Json(JsonAccess::new_with_options(
80            payload,
81            &self.json_parse_options,
82        )))
83    }
84}
85
/// Access builder for Debezium MongoDB change events encoded as JSON.
///
/// The builder owns the raw payload bytes so that the accessor returned by
/// `generate_accessor` can borrow from them.
#[derive(Debug)]
pub struct DebeziumMongoJsonAccessBuilder {
    /// Bytes of the payload most recently handed to `generate_accessor`;
    /// `None` until the first call.
    value: Option<Vec<u8>>,
    /// Parse options passed by reference to every `JsonAccess` this builder creates.
    json_parse_options: JsonParseOptions,
    /// Copied from `MongoProperties::strong_schema` and forwarded to
    /// `MongoJsonAccess::new` for every accessor.
    strong_schema: bool,
}
92
93impl DebeziumMongoJsonAccessBuilder {
94    pub fn new(props: MongoProperties) -> anyhow::Result<Self> {
95        Ok(Self {
96            value: None,
97            json_parse_options: JsonParseOptions::new_for_debezium(
98                TimestamptzHandling::GuessNumberUnit,
99                TimestampHandling::GuessNumberUnit,
100                TimeHandling::Micro,
101                false,
102            ),
103            strong_schema: props.strong_schema,
104        })
105    }
106}
107
108impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
109    #[allow(clippy::unused_async)]
110    async fn generate_accessor(
111        &mut self,
112        payload: Vec<u8>,
113        _: &crate::source::SourceMeta,
114    ) -> ConnectorResult<AccessImpl<'_>> {
115        self.value = Some(payload);
116        let mut event: BorrowedValue<'_> =
117            simd_json::to_borrowed_value(self.value.as_mut().unwrap())
118                .context("failed to parse debezium mongo json payload")?;
119
120        let payload = if let Some(payload) = event.get_mut("payload") {
121            std::mem::take(payload)
122        } else {
123            event
124        };
125
126        Ok(AccessImpl::MongoJson(MongoJsonAccess::new(
127            JsonAccess::new_with_options(payload, &self.json_parse_options),
128            self.strong_schema,
129        )))
130    }
131}
132
133#[cfg(test)]
134mod tests {
135    use chrono::{NaiveDate, NaiveTime};
136    use risingwave_common::array::{Op, StructValue};
137    use risingwave_common::catalog::ColumnId;
138    use risingwave_common::row::{OwnedRow, Row};
139    use risingwave_common::types::{
140        DataType, Date, Interval, Scalar, ScalarImpl, StructType, Time, Timestamp,
141    };
142    use serde_json::Value;
143    use thiserror_ext::AsReport;
144
145    use crate::parser::{
146        DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
147        SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig,
148    };
149    use crate::source::{SourceContext, SourceCtrlOpts};
150
151    fn assert_json_eq(parse_result: &Option<ScalarImpl>, json_str: &str) {
152        if let Some(ScalarImpl::Jsonb(json_val)) = parse_result {
153            let mut json_string = String::new();
154            json_val
155                .as_scalar_ref()
156                .force_str(&mut json_string)
157                .unwrap();
158            let val1: Value = serde_json::from_str(json_string.as_str()).unwrap();
159            let val2: Value = serde_json::from_str(json_str).unwrap();
160            assert_eq!(val1, val2);
161        }
162    }
163
164    async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
165        let props = SpecificParserConfig {
166            encoding_config: EncodingProperties::Json(JsonProperties {
167                use_schema_registry: false,
168                timestamptz_handling: None,
169                timestamp_handling: None,
170                time_handling: None,
171                handle_toast_columns: false,
172            }),
173            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
174        };
175        DebeziumParser::new(props, rw_columns, SourceContext::dummy().into())
176            .await
177            .unwrap()
178    }
179
180    async fn parse_one(
181        mut parser: DebeziumParser,
182        columns: Vec<SourceColumnDesc>,
183        payload: Vec<u8>,
184    ) -> Vec<(Op, OwnedRow)> {
185        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
186        parser
187            .parse_inner(None, Some(payload), builder.row_writer())
188            .await
189            .unwrap();
190        builder.finish_current_chunk();
191        let chunk = builder.consume_ready_chunks().next().unwrap();
192        chunk
193            .rows()
194            .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
195            .collect::<Vec<_>>()
196    }
197
198    mod test1_basic {
199        use super::*;
200
201        fn get_test1_columns() -> Vec<SourceColumnDesc> {
202            vec![
203                SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)),
204                SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(1)),
205                SourceColumnDesc::simple("description", DataType::Varchar, ColumnId::from(2)),
206                SourceColumnDesc::simple("weight", DataType::Float64, ColumnId::from(3)),
207            ]
208        }
209
210        #[tokio::test]
211        async fn test1_debezium_json_parser_read() {
212            //     "before": null,
213            //     "after": {
214            //       "id": 101,
215            //       "name": "scooter",
216            //       "description": "Small 2-wheel scooter",
217            //       "weight": 1.234
218            //     },
219            let input = vec![
220                // data with payload field
221                br#"{"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}}"#.to_vec(),
222                // data without payload field
223                br#"{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}"#.to_vec()];
224
225            let columns = get_test1_columns();
226
227            for data in input {
228                let parser = build_parser(columns.clone()).await;
229                let [(_op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
230                    .await
231                    .try_into()
232                    .unwrap();
233
234                assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
235                assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
236                assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
237                assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
238            }
239        }
240
241        #[tokio::test]
242        async fn test1_debezium_json_parser_insert() {
243            //     "before": null,
244            //     "after": {
245            //       "id": 102,
246            //       "name": "car battery",
247            //       "description": "12V car battery",
248            //       "weight": 8.1
249            //     },
250            let input = vec![
251                // data with payload field
252                br#"{"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}}"#.to_vec(),
253                // data without payload field
254                br#"{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}"#.to_vec()];
255
256            let columns = get_test1_columns();
257
258            for data in input {
259                let parser = build_parser(columns.clone()).await;
260                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
261                    .await
262                    .try_into()
263                    .unwrap();
264                assert_eq!(op, Op::Insert);
265
266                assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
267                assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
268                assert!(row[2].eq(&Some(ScalarImpl::Utf8("12V car battery".into()))));
269                assert!(row[3].eq(&Some(ScalarImpl::Float64(8.1.into()))));
270            }
271        }
272
273        #[tokio::test]
274        async fn test1_debezium_json_parser_delete() {
275            //     "before": {
276            //       "id": 101,
277            //       "name": "scooter",
278            //       "description": "Small 2-wheel scooter",
279            //       "weight": 1.234
280            //     },
281            //     "after": null,
282            let input = vec![
283                // data with payload field
284                br#"{"payload":{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}}"#.to_vec(),
285                // data without payload field
286                br#"{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}"#.to_vec()];
287
288            for data in input {
289                let columns = get_test1_columns();
290                let parser = build_parser(columns.clone()).await;
291                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
292                    .await
293                    .try_into()
294                    .unwrap();
295
296                assert_eq!(op, Op::Delete);
297
298                assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
299                assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
300                assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
301                assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
302            }
303        }
304
305        #[tokio::test]
306        async fn test1_debezium_json_parser_update() {
307            //     "before": {
308            //       "id": 102,
309            //       "name": "car battery",
310            //       "description": "12V car battery",
311            //       "weight": 8.1
312            //     },
313            //     "after": {
314            //       "id": 102,
315            //       "name": "car battery",
316            //       "description": "24V car battery",
317            //       "weight": 9.1
318            //     },
319            let input = vec![
320                // data with payload field
321                br#"{"payload":{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}}"#.to_vec(),
322                // data without payload field
323                br#"{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}"#.to_vec()];
324
325            let columns = get_test1_columns();
326
327            for data in input {
328                let parser = build_parser(columns.clone()).await;
329                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
330                    .await
331                    .try_into()
332                    .unwrap();
333
334                assert_eq!(op, Op::Insert);
335
336                assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
337                assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
338                assert!(row[2].eq(&Some(ScalarImpl::Utf8("24V car battery".into()))));
339                assert!(row[3].eq(&Some(ScalarImpl::Float64(9.1.into()))));
340            }
341        }
342    }
    // test2 covers read/insert/update/delete events on the following MySQL table for Debezium JSON:
344    // CREATE TABLE IF NOT EXISTS orders (
345    //     O_KEY BIGINT NOT NULL,
346    //     O_BOOL BOOLEAN,
347    //     O_TINY TINYINT,
348    //     O_INT INT,
349    //     O_REAL REAL,
350    //     O_DOUBLE DOUBLE,
351    //     O_DECIMAL DECIMAL(15, 2),
352    //     O_CHAR CHAR(15),
353    //     O_DATE DATE,
354    //     O_TIME TIME,
355    //     O_DATETIME DATETIME,
356    //     O_TIMESTAMP TIMESTAMP,
357    //     O_JSON JSON,
358    //     PRIMARY KEY (O_KEY));
359    // test2 also covers overflow tests on basic types
360    mod test2_mysql {
361        use super::*;
362
363        fn get_test2_columns() -> Vec<SourceColumnDesc> {
364            vec![
365                SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
366                SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
367                SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
368                SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
369                SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
370                SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
371                SourceColumnDesc::simple("O_DECIMAL", DataType::Decimal, ColumnId::from(6)),
372                SourceColumnDesc::simple("O_CHAR", DataType::Varchar, ColumnId::from(7)),
373                SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)),
374                SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)),
375                SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)),
376                SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)),
377                SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)),
378            ]
379        }
380
381        #[tokio::test]
382        async fn test2_debezium_json_parser_read() {
383            let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090651000,"snapshot":"last","db":"test","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":951,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1678090651640,"transaction":null}}"#;
384
385            let columns = get_test2_columns();
386
387            let parser = build_parser(columns.clone()).await;
388
389            let [(_op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
390                .await
391                .try_into()
392                .unwrap();
393
394            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
395            assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
396            assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
397            assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
398            assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
399            assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
400            assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
401            assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
402            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
403                NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
404            )))));
405            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
406                NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
407            )))));
408            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
409                "1970-01-01T00:00:00".parse().unwrap()
410            )))));
411            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
412                "1970-01-01T00:00:01Z".parse().unwrap()
413            ))));
414            assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
415        }
416
417        #[tokio::test]
418        async fn test2_debezium_json_parser_insert() {
419            let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#;
420
421            let columns = get_test2_columns();
422            let parser = build_parser(columns.clone()).await;
423            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
424                .await
425                .try_into()
426                .unwrap();
427            assert_eq!(op, Op::Insert);
428
429            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
430            assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
431            assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
432            assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
433            assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
434            assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
435            assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
436            assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
437            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
438                NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
439            )))));
440            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
441                NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
442            )))));
443            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
444                "1970-01-01T00:00:00".parse().unwrap()
445            )))));
446            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
447                "1970-01-01T00:00:01Z".parse().unwrap()
448            ))));
449            assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
450        }
451
452        #[tokio::test]
453        async fn test2_debezium_json_parser_delete() {
454            let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#;
455
456            let columns = get_test2_columns();
457            let parser = build_parser(columns.clone()).await;
458            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
459                .await
460                .try_into()
461                .unwrap();
462
463            assert_eq!(op, Op::Delete);
464
465            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
466            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
467            assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
468            assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
469            assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
470            assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
471            assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
472            assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
473            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
474                NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
475            )))));
476            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
477                NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
478            )))));
479            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
480                "5138-11-16T09:46:39".parse().unwrap()
481            )))));
482            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
483                "2038-01-09T03:14:07Z".parse().unwrap()
484            ))));
485            assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}");
486        }
487
488        #[tokio::test]
489        async fn test2_debezium_json_parser_update() {
490            let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"after":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\": \"v1_updated\", \"k2\": 33}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678089331000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1168,"row":0,"thread":4,"query":null},"op":"u","ts_ms":1678089331464,"transaction":null}}"#;
491
492            let columns = get_test2_columns();
493
494            let parser = build_parser(columns.clone()).await;
495            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
496                .await
497                .try_into()
498                .unwrap();
499
500            assert_eq!(op, Op::Insert);
501
502            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
503            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
504            assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
505            assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
506            assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
507            assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
508            assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
509            assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
510            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
511                NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
512            )))));
513            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
514                NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
515            )))));
516            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
517                "5138-11-16T09:46:39".parse().unwrap()
518            )))));
519            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
520                "2038-01-09T03:14:07Z".parse().unwrap()
521            ))));
522            assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
523        }
524
525        #[cfg(not(madsim))] // Traced test does not work with madsim
526        #[tokio::test]
527        #[tracing_test::traced_test]
528        async fn test2_debezium_json_parser_overflow() {
529            let columns = vec![
530                SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
531                SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
532                SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
533                SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
534                SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
535                SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
536            ];
537            let mut parser = build_parser(columns.clone()).await;
538
539            let mut dummy_builder =
540                SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
541
542            let normal_values = ["111", "1", "33", "444", "555.0", "666.0"];
543            let overflow_values = [
544                "9223372036854775808",
545                "2",
546                "32768",
547                "2147483648",
548                "3.80282347E38",
549                "1.797695E308",
550            ];
551
552            for i in 0..6 {
553                let mut values = normal_values;
554                values[i] = overflow_values[i];
555                let data = format!(
556                    r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
557                    values[0], values[1], values[2], values[3], values[4], values[5]
558                ).as_bytes().to_vec();
559
560                let res = parser
561                    .parse_inner(None, Some(data), dummy_builder.row_writer())
562                    .await;
563                if i < 5 {
564                    // For other overflow, the parsing succeeds but the type conversion fails
565                    // The errors are ignored and logged.
566                    res.unwrap();
567                    assert!(logs_contain("expected type"), "{i}");
568                } else {
569                    // For f64 overflow, the parsing fails
570                    let e = res.unwrap_err();
571                    assert!(e.to_report_string().contains("InvalidNumber"), "{i}: {e}");
572                }
573            }
574        }
575    }
576
577    // postgres-specific data-type mapping tests
578    mod test3_postgres {
579        use risingwave_pb::plan_common::AdditionalColumn;
580
581        use super::*;
582        use crate::source::SourceColumnType;
583
584        // schema for temporal-type test
585        fn get_temporal_test_columns() -> Vec<SourceColumnDesc> {
586            vec![
587                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
588                SourceColumnDesc::simple("o_time_0", DataType::Time, ColumnId::from(1)),
589                SourceColumnDesc::simple("o_time_6", DataType::Time, ColumnId::from(2)),
590                SourceColumnDesc::simple("o_timez_0", DataType::Time, ColumnId::from(3)),
591                SourceColumnDesc::simple("o_timez_6", DataType::Time, ColumnId::from(4)),
592                SourceColumnDesc::simple("o_timestamp_0", DataType::Timestamp, ColumnId::from(5)),
593                SourceColumnDesc::simple("o_timestamp_6", DataType::Timestamp, ColumnId::from(6)),
594                SourceColumnDesc::simple(
595                    "o_timestampz_0",
596                    DataType::Timestamptz,
597                    ColumnId::from(7),
598                ),
599                SourceColumnDesc::simple(
600                    "o_timestampz_6",
601                    DataType::Timestamptz,
602                    ColumnId::from(8),
603                ),
604                SourceColumnDesc::simple("o_interval", DataType::Interval, ColumnId::from(9)),
605                SourceColumnDesc::simple("o_date", DataType::Date, ColumnId::from(10)),
606            ]
607        }
608
609        // schema for numeric-type test
610        fn get_numeric_test_columns() -> Vec<SourceColumnDesc> {
611            vec![
612                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
613                SourceColumnDesc::simple("o_smallint", DataType::Int16, ColumnId::from(1)),
614                SourceColumnDesc::simple("o_integer", DataType::Int32, ColumnId::from(2)),
615                SourceColumnDesc::simple("o_bigint", DataType::Int64, ColumnId::from(3)),
616                SourceColumnDesc::simple("o_real", DataType::Float32, ColumnId::from(4)),
617                SourceColumnDesc::simple("o_double", DataType::Float64, ColumnId::from(5)),
618                SourceColumnDesc::simple("o_numeric", DataType::Decimal, ColumnId::from(6)),
619                SourceColumnDesc::simple("o_numeric_6_3", DataType::Decimal, ColumnId::from(7)),
620                SourceColumnDesc::simple("o_money", DataType::Decimal, ColumnId::from(8)),
621            ]
622        }
623
624        // schema for the remaining types
625        fn get_other_types_test_columns() -> Vec<SourceColumnDesc> {
626            vec![
627                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
628                SourceColumnDesc::simple("o_boolean", DataType::Boolean, ColumnId::from(1)),
629                SourceColumnDesc::simple("o_bit", DataType::Boolean, ColumnId::from(2)),
630                SourceColumnDesc::simple("o_bytea", DataType::Bytea, ColumnId::from(3)),
631                SourceColumnDesc::simple("o_json", DataType::Jsonb, ColumnId::from(4)),
632                SourceColumnDesc::simple("o_xml", DataType::Varchar, ColumnId::from(5)),
633                SourceColumnDesc::simple("o_uuid", DataType::Varchar, ColumnId::from(6)),
634                SourceColumnDesc {
635                    name: "o_point".to_owned(),
636                    data_type: DataType::Struct(StructType::new(vec![
637                        ("x", DataType::Float32),
638                        ("y", DataType::Float32),
639                    ])),
640                    column_id: 7.into(),
641                    column_type: SourceColumnType::Normal,
642                    is_pk: false,
643                    is_hidden_addition_col: false,
644                    additional_column: AdditionalColumn { column_type: None },
645                },
646                SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
647                SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)),
648                SourceColumnDesc::simple("o_varchar", DataType::Varchar, ColumnId::from(10)),
649                SourceColumnDesc::simple("o_character", DataType::Varchar, ColumnId::from(11)),
650                SourceColumnDesc::simple(
651                    "o_character_varying",
652                    DataType::Varchar,
653                    ColumnId::from(12),
654                ),
655            ]
656        }
657
658        #[tokio::test]
659        async fn test_temporal_types() {
660            // this test includes all supported temporal types, with the schema
661            // CREATE TABLE orders (
662            //     o_key integer,
663            //     o_time_0 time(0),
664            //     o_time_6 time(6),
665            //     o_timez_0 time(0) with time zone,
666            //     o_timez_6 time(6) with time zone,
667            //     o_timestamp_0 timestamp(0),
668            //     o_timestamp_6 timestamp(6),
669            //     o_timestampz_0 timestamp(0) with time zone,
670            //     o_timestampz_6 timestamp(6) with time zone,
671            //     o_interval interval,
672            //     o_date date,
673            //     PRIMARY KEY (o_key)
674            // );
675            // this test covers an insert event on the table above
676            let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_time_0":40271000000,"o_time_6":40271000010,"o_timez_0":"11:11:11Z","o_timez_6":"11:11:11.00001Z","o_timestamp_0":1321009871000,"o_timestamp_6":1321009871123456,"o_timestampz_0":"2011-11-11T03:11:11Z","o_timestampz_6":"2011-11-11T03:11:11.123456Z","o_interval":"P1Y2M3DT4H5M6.78S","o_date":"1999-09-09"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684733351963,"snapshot":"last","db":"test","sequence":"[null,\"26505352\"]","schema":"public","table":"orders","txId":729,"lsn":26505352,"xmin":null},"op":"r","ts_ms":1684733352110,"transaction":null}}"#;
677            let columns = get_temporal_test_columns();
678            let parser = build_parser(columns.clone()).await;
679            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
680                .await
681                .try_into()
682                .unwrap();
683            assert_eq!(op, Op::Insert);
684            assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
685            assert!(row[1].eq(&Some(ScalarImpl::Time(Time::new(
686                NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
687            )))));
688            assert!(row[2].eq(&Some(ScalarImpl::Time(Time::new(
689                NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
690            )))));
691            assert!(row[3].eq(&Some(ScalarImpl::Time(Time::new(
692                NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
693            )))));
694            assert!(row[4].eq(&Some(ScalarImpl::Time(Time::new(
695                NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
696            )))));
697            assert!(row[5].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
698                "2011-11-11T11:11:11".parse().unwrap()
699            )))));
700            assert!(row[6].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
701                "2011-11-11T11:11:11.123456".parse().unwrap()
702            )))));
703            assert!(
704                row[9].eq(&Some(ScalarImpl::Interval(Interval::from_month_day_usec(
705                    14,
706                    3,
707                    14706780000
708                ))))
709            );
710            assert!(row[10].eq(&Some(ScalarImpl::Date(Date::new(
711                NaiveDate::from_ymd_opt(1999, 9, 9).unwrap()
712            )))));
713        }
714
715        #[tokio::test]
716        async fn test_numeric_types() {
717            // this test includes all supported numeric types, with the schema
718            // CREATE TABLE orders (
719            //     o_key integer,
720            //     o_smallint smallint,
721            //     o_integer integer,
722            //     o_bigint bigint,
723            //     o_real real,
724            //     o_double double precision,
725            //     o_numeric numeric,
726            //     o_numeric_6_3 numeric(6,3),
727            //     o_money money,
728            //     PRIMARY KEY (o_key)
729            // );
730            // this test covers an insert event on the table above
731            let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_smallint":32767,"o_integer":2147483647,"o_bigint":9223372036854775807,"o_real":9.999,"o_double":9.999999,"o_numeric":123456.789,"o_numeric_6_3":123.456,"o_money":123.12},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684404343201,"snapshot":"last","db":"test","sequence":"[null,\"26519216\"]","schema":"public","table":"orders","txId":729,"lsn":26519216,"xmin":null},"op":"r","ts_ms":1684404343349,"transaction":null}}"#;
732            let columns = get_numeric_test_columns();
733            let parser = build_parser(columns.clone()).await;
734            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
735                .await
736                .try_into()
737                .unwrap();
738            assert_eq!(op, Op::Insert);
739            assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
740            assert!(row[1].eq(&Some(ScalarImpl::Int16(32767))));
741            assert!(row[2].eq(&Some(ScalarImpl::Int32(2147483647))));
742            assert!(row[3].eq(&Some(ScalarImpl::Int64(9223372036854775807))));
743            assert!(row[4].eq(&Some(ScalarImpl::Float32((9.999).into()))));
744            assert!(row[5].eq(&Some(ScalarImpl::Float64((9.999999).into()))));
745            assert!(row[6].eq(&Some(ScalarImpl::Decimal("123456.7890".parse().unwrap()))));
746            assert!(row[7].eq(&Some(ScalarImpl::Decimal("123.456".parse().unwrap()))));
747            assert!(row[8].eq(&Some(ScalarImpl::Decimal("123.12".parse().unwrap()))));
748        }
749
750        #[tokio::test]
751        async fn test_other_types() {
752            // this test includes the remaining types, with the schema
753            // CREATE TABLE orders (
754            //     o_key integer,
755            //     o_boolean boolean,
756            //     o_bit bit,
757            //     o_bytea bytea,
758            //     o_json jsonb,
759            //     o_xml xml,
760            //     o_uuid uuid,
761            //     o_point point,
762            //     o_enum bear,
763            //     o_char char,
764            //     o_varchar varchar,
765            //     o_character character,
766            //     o_character_varying character varying,
767            //     PRIMARY KEY (o_key)
768            //  );
769            // this test covers an insert event on the table above
770            let data = br#"{"payload":{"before":null,"after":{"o_key":1,"o_boolean":false,"o_bit":true,"o_bytea":"ASNFZ4mrze8=","o_json":"{\"k1\": \"v1\", \"k2\": 11}","o_xml":"<!--hahaha-->","o_uuid":"60f14fe2-f857-404a-b586-3b5375b3259f","o_point":{"x":1.0,"y":2.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAABA","srid":null},"o_enum":"polar","o_char":"h","o_varchar":"ha","o_character":"h","o_character_varying":"hahaha"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684743927178,"snapshot":"last","db":"test","sequence":"[null,\"26524528\"]","schema":"public","table":"orders","txId":730,"lsn":26524528,"xmin":null},"op":"r","ts_ms":1684743927343,"transaction":null}}"#;
771            let columns = get_other_types_test_columns();
772            let parser = build_parser(columns.clone()).await;
773            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
774                .await
775                .try_into()
776                .unwrap();
777            assert_eq!(op, Op::Insert);
778            assert!(row[0].eq(&Some(ScalarImpl::Int32(1))));
779            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
780            assert!(row[2].eq(&Some(ScalarImpl::Bool(true))));
781            assert!(row[3].eq(&Some(ScalarImpl::Bytea(Box::new([
782                u8::from_str_radix("01", 16).unwrap(),
783                u8::from_str_radix("23", 16).unwrap(),
784                u8::from_str_radix("45", 16).unwrap(),
785                u8::from_str_radix("67", 16).unwrap(),
786                u8::from_str_radix("89", 16).unwrap(),
787                u8::from_str_radix("AB", 16).unwrap(),
788                u8::from_str_radix("CD", 16).unwrap(),
789                u8::from_str_radix("EF", 16).unwrap()
790            ])))));
791            assert_json_eq(&row[4], "{\"k1\": \"v1\", \"k2\": 11}");
792            assert!(row[5].eq(&Some(ScalarImpl::Utf8("<!--hahaha-->".into()))));
793            assert!(row[6].eq(&Some(ScalarImpl::Utf8(
794                "60f14fe2-f857-404a-b586-3b5375b3259f".into()
795            ))));
796            assert!(row[7].eq(&Some(ScalarImpl::Struct(StructValue::new(vec![
797                Some(ScalarImpl::Float32(1.into())),
798                Some(ScalarImpl::Float32(2.into()))
799            ])))));
800            assert!(row[8].eq(&Some(ScalarImpl::Utf8("polar".into()))));
801            assert!(row[9].eq(&Some(ScalarImpl::Utf8("h".into()))));
802            assert!(row[10].eq(&Some(ScalarImpl::Utf8("ha".into()))));
803            assert!(row[11].eq(&Some(ScalarImpl::Utf8("h".into()))));
804            assert!(row[12].eq(&Some(ScalarImpl::Utf8("hahaha".into()))));
805        }
806    }
807}