risingwave_connector/parser/debezium/
simd_json_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16
17use anyhow::Context;
18use simd_json::BorrowedValue;
19use simd_json::prelude::MutableObject;
20
21use crate::error::ConnectorResult;
22use crate::parser::unified::AccessImpl;
23use crate::parser::unified::debezium::MongoJsonAccess;
24use crate::parser::unified::json::{
25    BigintUnsignedHandlingMode, JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling,
26    TimestamptzHandling,
27};
28use crate::parser::{AccessBuilder, MongoProperties};
29
30#[derive(Debug)]
31pub struct DebeziumJsonAccessBuilder {
32    value: Option<Vec<u8>>,
33    json_parse_options: JsonParseOptions,
34}
35
36impl DebeziumJsonAccessBuilder {
37    pub fn new(
38        timestamptz_handling: TimestamptzHandling,
39        timestamp_handling: TimestampHandling,
40        time_handling: TimeHandling,
41        bigint_unsigned_handling: BigintUnsignedHandlingMode,
42        handle_toast_columns: bool,
43    ) -> ConnectorResult<Self> {
44        Ok(Self {
45            value: None,
46            json_parse_options: JsonParseOptions::new_for_debezium(
47                timestamptz_handling,
48                timestamp_handling,
49                time_handling,
50                bigint_unsigned_handling,
51                handle_toast_columns,
52            ),
53        })
54    }
55
56    pub fn new_for_schema_event() -> ConnectorResult<Self> {
57        Ok(Self {
58            value: None,
59            json_parse_options: JsonParseOptions::default(),
60        })
61    }
62}
63
64impl AccessBuilder for DebeziumJsonAccessBuilder {
65    #[allow(clippy::unused_async)]
66    async fn generate_accessor(
67        &mut self,
68        payload: Vec<u8>,
69        _: &crate::source::SourceMeta,
70    ) -> ConnectorResult<AccessImpl<'_>> {
71        self.value = Some(payload);
72        let mut event: BorrowedValue<'_> =
73            simd_json::to_borrowed_value(self.value.as_mut().unwrap())
74                .context("failed to parse debezium json payload")?;
75
76        let payload = if let Some(payload) = event.get_mut("payload") {
77            std::mem::take(payload)
78        } else {
79            event
80        };
81
82        Ok(AccessImpl::Json(JsonAccess::new_with_options(
83            payload,
84            &self.json_parse_options,
85        )))
86    }
87}
88
89#[derive(Debug)]
90pub struct DebeziumMongoJsonAccessBuilder {
91    value: Option<Vec<u8>>,
92    json_parse_options: JsonParseOptions,
93    strong_schema: bool,
94}
95
96impl DebeziumMongoJsonAccessBuilder {
97    pub fn new(props: MongoProperties) -> anyhow::Result<Self> {
98        Ok(Self {
99            value: None,
100            json_parse_options: JsonParseOptions::new_for_debezium(
101                TimestamptzHandling::GuessNumberUnit,
102                TimestampHandling::GuessNumberUnit,
103                TimeHandling::Micro,
104                BigintUnsignedHandlingMode::Long,
105                false,
106            ),
107            strong_schema: props.strong_schema,
108        })
109    }
110}
111
112impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
113    #[allow(clippy::unused_async)]
114    async fn generate_accessor(
115        &mut self,
116        payload: Vec<u8>,
117        _: &crate::source::SourceMeta,
118    ) -> ConnectorResult<AccessImpl<'_>> {
119        self.value = Some(payload);
120        let mut event: BorrowedValue<'_> =
121            simd_json::to_borrowed_value(self.value.as_mut().unwrap())
122                .context("failed to parse debezium mongo json payload")?;
123
124        let payload = if let Some(payload) = event.get_mut("payload") {
125            std::mem::take(payload)
126        } else {
127            event
128        };
129
130        Ok(AccessImpl::MongoJson(MongoJsonAccess::new(
131            JsonAccess::new_with_options(payload, &self.json_parse_options),
132            self.strong_schema,
133        )))
134    }
135}
136
137#[cfg(test)]
138mod tests {
139    use chrono::{NaiveDate, NaiveTime};
140    use risingwave_common::array::{Op, StructValue};
141    use risingwave_common::catalog::ColumnId;
142    use risingwave_common::row::{OwnedRow, Row};
143    use risingwave_common::types::{
144        DataType, Date, Interval, Scalar, ScalarImpl, StructType, Time, Timestamp,
145    };
146    use serde_json::Value;
147    use thiserror_ext::AsReport;
148
149    use crate::parser::{
150        DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
151        SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig,
152    };
153    use crate::source::{SourceContext, SourceCtrlOpts};
154
155    fn assert_json_eq(parse_result: &Option<ScalarImpl>, json_str: &str) {
156        if let Some(ScalarImpl::Jsonb(json_val)) = parse_result {
157            let mut json_string = String::new();
158            json_val
159                .as_scalar_ref()
160                .force_str(&mut json_string)
161                .unwrap();
162            let val1: Value = serde_json::from_str(json_string.as_str()).unwrap();
163            let val2: Value = serde_json::from_str(json_str).unwrap();
164            assert_eq!(val1, val2);
165        }
166    }
167
168    async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
169        let props = SpecificParserConfig {
170            encoding_config: EncodingProperties::Json(JsonProperties {
171                use_schema_registry: false,
172                timestamptz_handling: None,
173                timestamp_handling: None,
174                time_handling: None,
175                bigint_unsigned_handling: None,
176                handle_toast_columns: false,
177            }),
178            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
179        };
180        DebeziumParser::new(props, rw_columns, SourceContext::dummy().into())
181            .await
182            .unwrap()
183    }
184
185    async fn parse_one(
186        mut parser: DebeziumParser,
187        columns: Vec<SourceColumnDesc>,
188        payload: Vec<u8>,
189    ) -> Vec<(Op, OwnedRow)> {
190        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
191        parser
192            .parse_inner(None, Some(payload), builder.row_writer())
193            .await
194            .unwrap();
195        builder.finish_current_chunk();
196        let chunk = builder.consume_ready_chunks().next().unwrap();
197        chunk
198            .rows()
199            .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
200            .collect::<Vec<_>>()
201    }
202
203    mod test1_basic {
204        use super::*;
205
206        fn get_test1_columns() -> Vec<SourceColumnDesc> {
207            vec![
208                SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)),
209                SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(1)),
210                SourceColumnDesc::simple("description", DataType::Varchar, ColumnId::from(2)),
211                SourceColumnDesc::simple("weight", DataType::Float64, ColumnId::from(3)),
212            ]
213        }
214
215        #[tokio::test]
216        async fn test1_debezium_json_parser_read() {
217            //     "before": null,
218            //     "after": {
219            //       "id": 101,
220            //       "name": "scooter",
221            //       "description": "Small 2-wheel scooter",
222            //       "weight": 1.234
223            //     },
224            let input = vec![
225                // data with payload field
226                br#"{"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}}"#.to_vec(),
227                // data without payload field
228                br#"{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}"#.to_vec()];
229
230            let columns = get_test1_columns();
231
232            for data in input {
233                let parser = build_parser(columns.clone()).await;
234                let [(_op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
235                    .await
236                    .try_into()
237                    .unwrap();
238
239                assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
240                assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
241                assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
242                assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
243            }
244        }
245
246        #[tokio::test]
247        async fn test1_debezium_json_parser_insert() {
248            //     "before": null,
249            //     "after": {
250            //       "id": 102,
251            //       "name": "car battery",
252            //       "description": "12V car battery",
253            //       "weight": 8.1
254            //     },
255            let input = vec![
256                // data with payload field
257                br#"{"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}}"#.to_vec(),
258                // data without payload field
259                br#"{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}"#.to_vec()];
260
261            let columns = get_test1_columns();
262
263            for data in input {
264                let parser = build_parser(columns.clone()).await;
265                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
266                    .await
267                    .try_into()
268                    .unwrap();
269                assert_eq!(op, Op::Insert);
270
271                assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
272                assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
273                assert!(row[2].eq(&Some(ScalarImpl::Utf8("12V car battery".into()))));
274                assert!(row[3].eq(&Some(ScalarImpl::Float64(8.1.into()))));
275            }
276        }
277
278        #[tokio::test]
279        async fn test1_debezium_json_parser_delete() {
280            //     "before": {
281            //       "id": 101,
282            //       "name": "scooter",
283            //       "description": "Small 2-wheel scooter",
284            //       "weight": 1.234
285            //     },
286            //     "after": null,
287            let input = vec![
288                // data with payload field
289                br#"{"payload":{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}}"#.to_vec(),
290                // data without payload field
291                br#"{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}"#.to_vec()];
292
293            for data in input {
294                let columns = get_test1_columns();
295                let parser = build_parser(columns.clone()).await;
296                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
297                    .await
298                    .try_into()
299                    .unwrap();
300
301                assert_eq!(op, Op::Delete);
302
303                assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
304                assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
305                assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
306                assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
307            }
308        }
309
310        #[tokio::test]
311        async fn test1_debezium_json_parser_update() {
312            //     "before": {
313            //       "id": 102,
314            //       "name": "car battery",
315            //       "description": "12V car battery",
316            //       "weight": 8.1
317            //     },
318            //     "after": {
319            //       "id": 102,
320            //       "name": "car battery",
321            //       "description": "24V car battery",
322            //       "weight": 9.1
323            //     },
324            let input = vec![
325                // data with payload field
326                br#"{"payload":{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}}"#.to_vec(),
327                // data without payload field
328                br#"{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}"#.to_vec()];
329
330            let columns = get_test1_columns();
331
332            for data in input {
333                let parser = build_parser(columns.clone()).await;
334                let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
335                    .await
336                    .try_into()
337                    .unwrap();
338
339                assert_eq!(op, Op::Insert);
340
341                assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
342                assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
343                assert!(row[2].eq(&Some(ScalarImpl::Utf8("24V car battery".into()))));
344                assert!(row[3].eq(&Some(ScalarImpl::Float64(9.1.into()))));
345            }
346        }
347    }
    // test2 covers read/insert/update/delete events on the following MySQL table for debezium json:
349    // CREATE TABLE IF NOT EXISTS orders (
350    //     O_KEY BIGINT NOT NULL,
351    //     O_BOOL BOOLEAN,
352    //     O_TINY TINYINT,
353    //     O_INT INT,
354    //     O_REAL REAL,
355    //     O_DOUBLE DOUBLE,
356    //     O_DECIMAL DECIMAL(15, 2),
357    //     O_CHAR CHAR(15),
358    //     O_DATE DATE,
359    //     O_TIME TIME,
360    //     O_DATETIME DATETIME,
361    //     O_TIMESTAMP TIMESTAMP,
362    //     O_JSON JSON,
363    //     PRIMARY KEY (O_KEY));
364    // test2 also covers overflow tests on basic types
365    mod test2_mysql {
366        use super::*;
367
368        fn get_test2_columns() -> Vec<SourceColumnDesc> {
369            vec![
370                SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
371                SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
372                SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
373                SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
374                SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
375                SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
376                SourceColumnDesc::simple("O_DECIMAL", DataType::Decimal, ColumnId::from(6)),
377                SourceColumnDesc::simple("O_CHAR", DataType::Varchar, ColumnId::from(7)),
378                SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)),
379                SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)),
380                SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)),
381                SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)),
382                SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)),
383            ]
384        }
385
386        #[tokio::test]
387        async fn test2_debezium_json_parser_read() {
388            let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090651000,"snapshot":"last","db":"test","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":951,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1678090651640,"transaction":null}}"#;
389
390            let columns = get_test2_columns();
391
392            let parser = build_parser(columns.clone()).await;
393
394            let [(_op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
395                .await
396                .try_into()
397                .unwrap();
398
399            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
400            assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
401            assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
402            assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
403            assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
404            assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
405            assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
406            assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
407            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
408                NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
409            )))));
410            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
411                NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
412            )))));
413            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
414                "1970-01-01T00:00:00".parse().unwrap()
415            )))));
416            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
417                "1970-01-01T00:00:01Z".parse().unwrap()
418            ))));
419            assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
420        }
421
422        #[tokio::test]
423        async fn test2_debezium_json_parser_insert() {
424            let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#;
425
426            let columns = get_test2_columns();
427            let parser = build_parser(columns.clone()).await;
428            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
429                .await
430                .try_into()
431                .unwrap();
432            assert_eq!(op, Op::Insert);
433
434            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
435            assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
436            assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
437            assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
438            assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
439            assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
440            assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
441            assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
442            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
443                NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
444            )))));
445            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
446                NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
447            )))));
448            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
449                "1970-01-01T00:00:00".parse().unwrap()
450            )))));
451            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
452                "1970-01-01T00:00:01Z".parse().unwrap()
453            ))));
454            assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
455        }
456
457        #[tokio::test]
458        async fn test2_debezium_json_parser_delete() {
459            let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#;
460
461            let columns = get_test2_columns();
462            let parser = build_parser(columns.clone()).await;
463            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
464                .await
465                .try_into()
466                .unwrap();
467
468            assert_eq!(op, Op::Delete);
469
470            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
471            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
472            assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
473            assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
474            assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
475            assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
476            assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
477            assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
478            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
479                NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
480            )))));
481            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
482                NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
483            )))));
484            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
485                "5138-11-16T09:46:39".parse().unwrap()
486            )))));
487            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
488                "2038-01-09T03:14:07Z".parse().unwrap()
489            ))));
490            assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}");
491        }
492
493        #[tokio::test]
494        async fn test2_debezium_json_parser_update() {
495            let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"after":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\": \"v1_updated\", \"k2\": 33}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678089331000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1168,"row":0,"thread":4,"query":null},"op":"u","ts_ms":1678089331464,"transaction":null}}"#;
496
497            let columns = get_test2_columns();
498
499            let parser = build_parser(columns.clone()).await;
500            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
501                .await
502                .try_into()
503                .unwrap();
504
505            assert_eq!(op, Op::Insert);
506
507            assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
508            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
509            assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
510            assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
511            assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
512            assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
513            assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
514            assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
515            assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
516                NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
517            )))));
518            assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
519                NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
520            )))));
521            assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
522                "5138-11-16T09:46:39".parse().unwrap()
523            )))));
524            assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
525                "2038-01-09T03:14:07Z".parse().unwrap()
526            ))));
527            assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
528        }
529
530        #[cfg(not(madsim))] // Traced test does not work with madsim
531        #[tokio::test]
532        #[tracing_test::traced_test]
533        async fn test2_debezium_json_parser_overflow() {
534            let columns = vec![
535                SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
536                SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
537                SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
538                SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
539                SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
540                SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
541            ];
542            let mut parser = build_parser(columns.clone()).await;
543
544            let mut dummy_builder =
545                SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
546
547            let normal_values = ["111", "1", "33", "444", "555.0", "666.0"];
548            let overflow_values = [
549                "9223372036854775808",
550                "2",
551                "32768",
552                "2147483648",
553                "3.80282347E38",
554                "1.797695E308",
555            ];
556
557            for i in 0..6 {
558                let mut values = normal_values;
559                values[i] = overflow_values[i];
560                let data = format!(
561                    r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
562                    values[0], values[1], values[2], values[3], values[4], values[5]
563                ).as_bytes().to_vec();
564
565                let res = parser
566                    .parse_inner(None, Some(data), dummy_builder.row_writer())
567                    .await;
568                if i < 5 {
569                    // For other overflow, the parsing succeeds but the type conversion fails
570                    // The errors are ignored and logged.
571                    res.unwrap();
572                    assert!(logs_contain("expected type"), "{i}");
573                } else {
574                    // For f64 overflow, the parsing fails
575                    let e = res.unwrap_err();
576                    assert!(e.to_report_string().contains("InvalidNumber"), "{i}: {e}");
577                }
578            }
579        }
580    }
581
582    // postgres-specific data-type mapping tests
583    mod test3_postgres {
584        use risingwave_pb::plan_common::AdditionalColumn;
585
586        use super::*;
587        use crate::source::SourceColumnType;
588
589        // schema for temporal-type test
590        fn get_temporal_test_columns() -> Vec<SourceColumnDesc> {
591            vec![
592                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
593                SourceColumnDesc::simple("o_time_0", DataType::Time, ColumnId::from(1)),
594                SourceColumnDesc::simple("o_time_6", DataType::Time, ColumnId::from(2)),
595                SourceColumnDesc::simple("o_timez_0", DataType::Time, ColumnId::from(3)),
596                SourceColumnDesc::simple("o_timez_6", DataType::Time, ColumnId::from(4)),
597                SourceColumnDesc::simple("o_timestamp_0", DataType::Timestamp, ColumnId::from(5)),
598                SourceColumnDesc::simple("o_timestamp_6", DataType::Timestamp, ColumnId::from(6)),
599                SourceColumnDesc::simple(
600                    "o_timestampz_0",
601                    DataType::Timestamptz,
602                    ColumnId::from(7),
603                ),
604                SourceColumnDesc::simple(
605                    "o_timestampz_6",
606                    DataType::Timestamptz,
607                    ColumnId::from(8),
608                ),
609                SourceColumnDesc::simple("o_interval", DataType::Interval, ColumnId::from(9)),
610                SourceColumnDesc::simple("o_date", DataType::Date, ColumnId::from(10)),
611            ]
612        }
613
614        // schema for numeric-type test
615        fn get_numeric_test_columns() -> Vec<SourceColumnDesc> {
616            vec![
617                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
618                SourceColumnDesc::simple("o_smallint", DataType::Int16, ColumnId::from(1)),
619                SourceColumnDesc::simple("o_integer", DataType::Int32, ColumnId::from(2)),
620                SourceColumnDesc::simple("o_bigint", DataType::Int64, ColumnId::from(3)),
621                SourceColumnDesc::simple("o_real", DataType::Float32, ColumnId::from(4)),
622                SourceColumnDesc::simple("o_double", DataType::Float64, ColumnId::from(5)),
623                SourceColumnDesc::simple("o_numeric", DataType::Decimal, ColumnId::from(6)),
624                SourceColumnDesc::simple("o_numeric_6_3", DataType::Decimal, ColumnId::from(7)),
625                SourceColumnDesc::simple("o_money", DataType::Decimal, ColumnId::from(8)),
626            ]
627        }
628
629        // schema for the remaining types
630        fn get_other_types_test_columns() -> Vec<SourceColumnDesc> {
631            vec![
632                SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
633                SourceColumnDesc::simple("o_boolean", DataType::Boolean, ColumnId::from(1)),
634                SourceColumnDesc::simple("o_bit", DataType::Boolean, ColumnId::from(2)),
635                SourceColumnDesc::simple("o_bytea", DataType::Bytea, ColumnId::from(3)),
636                SourceColumnDesc::simple("o_json", DataType::Jsonb, ColumnId::from(4)),
637                SourceColumnDesc::simple("o_xml", DataType::Varchar, ColumnId::from(5)),
638                SourceColumnDesc::simple("o_uuid", DataType::Varchar, ColumnId::from(6)),
639                SourceColumnDesc {
640                    name: "o_point".to_owned(),
641                    data_type: DataType::Struct(StructType::new(vec![
642                        ("x", DataType::Float32),
643                        ("y", DataType::Float32),
644                    ])),
645                    column_id: 7.into(),
646                    column_type: SourceColumnType::Normal,
647                    is_pk: false,
648                    is_hidden_addition_col: false,
649                    additional_column: AdditionalColumn { column_type: None },
650                },
651                SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
652                SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)),
653                SourceColumnDesc::simple("o_varchar", DataType::Varchar, ColumnId::from(10)),
654                SourceColumnDesc::simple("o_character", DataType::Varchar, ColumnId::from(11)),
655                SourceColumnDesc::simple(
656                    "o_character_varying",
657                    DataType::Varchar,
658                    ColumnId::from(12),
659                ),
660            ]
661        }
662
663        #[tokio::test]
664        async fn test_temporal_types() {
665            // this test includes all supported temporal types, with the schema
666            // CREATE TABLE orders (
667            //     o_key integer,
668            //     o_time_0 time(0),
669            //     o_time_6 time(6),
670            //     o_timez_0 time(0) with time zone,
671            //     o_timez_6 time(6) with time zone,
672            //     o_timestamp_0 timestamp(0),
673            //     o_timestamp_6 timestamp(6),
674            //     o_timestampz_0 timestamp(0) with time zone,
675            //     o_timestampz_6 timestamp(6) with time zone,
676            //     o_interval interval,
677            //     o_date date,
678            //     PRIMARY KEY (o_key)
679            // );
680            // this test covers an insert event on the table above
681            let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_time_0":40271000000,"o_time_6":40271000010,"o_timez_0":"11:11:11Z","o_timez_6":"11:11:11.00001Z","o_timestamp_0":1321009871000,"o_timestamp_6":1321009871123456,"o_timestampz_0":"2011-11-11T03:11:11Z","o_timestampz_6":"2011-11-11T03:11:11.123456Z","o_interval":"P1Y2M3DT4H5M6.78S","o_date":"1999-09-09"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684733351963,"snapshot":"last","db":"test","sequence":"[null,\"26505352\"]","schema":"public","table":"orders","txId":729,"lsn":26505352,"xmin":null},"op":"r","ts_ms":1684733352110,"transaction":null}}"#;
682            let columns = get_temporal_test_columns();
683            let parser = build_parser(columns.clone()).await;
684            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
685                .await
686                .try_into()
687                .unwrap();
688            assert_eq!(op, Op::Insert);
689            assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
690            assert!(row[1].eq(&Some(ScalarImpl::Time(Time::new(
691                NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
692            )))));
693            assert!(row[2].eq(&Some(ScalarImpl::Time(Time::new(
694                NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
695            )))));
696            assert!(row[3].eq(&Some(ScalarImpl::Time(Time::new(
697                NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
698            )))));
699            assert!(row[4].eq(&Some(ScalarImpl::Time(Time::new(
700                NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
701            )))));
702            assert!(row[5].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
703                "2011-11-11T11:11:11".parse().unwrap()
704            )))));
705            assert!(row[6].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
706                "2011-11-11T11:11:11.123456".parse().unwrap()
707            )))));
708            assert!(
709                row[9].eq(&Some(ScalarImpl::Interval(Interval::from_month_day_usec(
710                    14,
711                    3,
712                    14706780000
713                ))))
714            );
715            assert!(row[10].eq(&Some(ScalarImpl::Date(Date::new(
716                NaiveDate::from_ymd_opt(1999, 9, 9).unwrap()
717            )))));
718        }
719
720        #[tokio::test]
721        async fn test_numeric_types() {
722            // this test includes all supported numeric types, with the schema
723            // CREATE TABLE orders (
724            //     o_key integer,
725            //     o_smallint smallint,
726            //     o_integer integer,
727            //     o_bigint bigint,
728            //     o_real real,
729            //     o_double double precision,
730            //     o_numeric numeric,
731            //     o_numeric_6_3 numeric(6,3),
732            //     o_money money,
733            //     PRIMARY KEY (o_key)
734            // );
735            // this test covers an insert event on the table above
736            let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_smallint":32767,"o_integer":2147483647,"o_bigint":9223372036854775807,"o_real":9.999,"o_double":9.999999,"o_numeric":123456.789,"o_numeric_6_3":123.456,"o_money":123.12},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684404343201,"snapshot":"last","db":"test","sequence":"[null,\"26519216\"]","schema":"public","table":"orders","txId":729,"lsn":26519216,"xmin":null},"op":"r","ts_ms":1684404343349,"transaction":null}}"#;
737            let columns = get_numeric_test_columns();
738            let parser = build_parser(columns.clone()).await;
739            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
740                .await
741                .try_into()
742                .unwrap();
743            assert_eq!(op, Op::Insert);
744            assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
745            assert!(row[1].eq(&Some(ScalarImpl::Int16(32767))));
746            assert!(row[2].eq(&Some(ScalarImpl::Int32(2147483647))));
747            assert!(row[3].eq(&Some(ScalarImpl::Int64(9223372036854775807))));
748            assert!(row[4].eq(&Some(ScalarImpl::Float32((9.999).into()))));
749            assert!(row[5].eq(&Some(ScalarImpl::Float64((9.999999).into()))));
750            assert!(row[6].eq(&Some(ScalarImpl::Decimal("123456.7890".parse().unwrap()))));
751            assert!(row[7].eq(&Some(ScalarImpl::Decimal("123.456".parse().unwrap()))));
752            assert!(row[8].eq(&Some(ScalarImpl::Decimal("123.12".parse().unwrap()))));
753        }
754
755        #[tokio::test]
756        async fn test_other_types() {
757            // this test includes the remaining types, with the schema
758            // CREATE TABLE orders (
759            //     o_key integer,
760            //     o_boolean boolean,
761            //     o_bit bit,
762            //     o_bytea bytea,
763            //     o_json jsonb,
764            //     o_xml xml,
765            //     o_uuid uuid,
766            //     o_point point,
767            //     o_enum bear,
768            //     o_char char,
769            //     o_varchar varchar,
770            //     o_character character,
771            //     o_character_varying character varying,
772            //     PRIMARY KEY (o_key)
773            //  );
774            // this test covers an insert event on the table above
775            let data = br#"{"payload":{"before":null,"after":{"o_key":1,"o_boolean":false,"o_bit":true,"o_bytea":"ASNFZ4mrze8=","o_json":"{\"k1\": \"v1\", \"k2\": 11}","o_xml":"<!--hahaha-->","o_uuid":"60f14fe2-f857-404a-b586-3b5375b3259f","o_point":{"x":1.0,"y":2.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAABA","srid":null},"o_enum":"polar","o_char":"h","o_varchar":"ha","o_character":"h","o_character_varying":"hahaha"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684743927178,"snapshot":"last","db":"test","sequence":"[null,\"26524528\"]","schema":"public","table":"orders","txId":730,"lsn":26524528,"xmin":null},"op":"r","ts_ms":1684743927343,"transaction":null}}"#;
776            let columns = get_other_types_test_columns();
777            let parser = build_parser(columns.clone()).await;
778            let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
779                .await
780                .try_into()
781                .unwrap();
782            assert_eq!(op, Op::Insert);
783            assert!(row[0].eq(&Some(ScalarImpl::Int32(1))));
784            assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
785            assert!(row[2].eq(&Some(ScalarImpl::Bool(true))));
786            assert!(row[3].eq(&Some(ScalarImpl::Bytea(Box::new([
787                u8::from_str_radix("01", 16).unwrap(),
788                u8::from_str_radix("23", 16).unwrap(),
789                u8::from_str_radix("45", 16).unwrap(),
790                u8::from_str_radix("67", 16).unwrap(),
791                u8::from_str_radix("89", 16).unwrap(),
792                u8::from_str_radix("AB", 16).unwrap(),
793                u8::from_str_radix("CD", 16).unwrap(),
794                u8::from_str_radix("EF", 16).unwrap()
795            ])))));
796            assert_json_eq(&row[4], "{\"k1\": \"v1\", \"k2\": 11}");
797            assert!(row[5].eq(&Some(ScalarImpl::Utf8("<!--hahaha-->".into()))));
798            assert!(row[6].eq(&Some(ScalarImpl::Utf8(
799                "60f14fe2-f857-404a-b586-3b5375b3259f".into()
800            ))));
801            assert!(row[7].eq(&Some(ScalarImpl::Struct(StructValue::new(vec![
802                Some(ScalarImpl::Float32(1.into())),
803                Some(ScalarImpl::Float32(2.into()))
804            ])))));
805            assert!(row[8].eq(&Some(ScalarImpl::Utf8("polar".into()))));
806            assert!(row[9].eq(&Some(ScalarImpl::Utf8("h".into()))));
807            assert!(row[10].eq(&Some(ScalarImpl::Utf8("ha".into()))));
808            assert!(row[11].eq(&Some(ScalarImpl::Utf8("h".into()))));
809            assert!(row[12].eq(&Some(ScalarImpl::Utf8("hahaha".into()))));
810        }
811    }
812}