risingwave_connector/parser/maxwell/
simd_json_parser.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
#[cfg(test)]
mod tests {
    use risingwave_common::array::Op;
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};

    use crate::parser::maxwell::MaxwellParser;
    use crate::parser::{
        EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc,
        SourceStreamChunkBuilder, SpecificParserConfig,
    };
    use crate::source::{SourceContext, SourceCtrlOpts};

    /// Feeds two Maxwell `insert` events and one `update` event through
    /// [`MaxwellParser`] and checks the rows of the resulting chunk.
    ///
    /// Every event, including the `update`, is expected to come out as
    /// [`Op::Insert`] carrying the values found in the event's `data` object.
    #[tokio::test]
    async fn test_json_parser() {
        // Column names deliberately differ in case from the JSON keys
        // ("ID" vs "id", "NAME" vs "name") in the payloads below.
        let columns = vec![
            SourceColumnDesc::simple("ID", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("NAME", DataType::Varchar, 1.into()),
            SourceColumnDesc::simple("is_adult", DataType::Int16, 2.into()),
            SourceColumnDesc::simple("birthday", DataType::Timestamp, 3.into()),
        ];

        // Plain JSON encoding (no schema registry) wrapped in the Maxwell protocol.
        let json_props = JsonProperties {
            use_schema_registry: false,
            timestamptz_handling: None,
            timestamp_handling: None,
            time_handling: None,
            bigint_unsigned_handling: None,
            handle_toast_columns: false,
        };
        let config = SpecificParserConfig {
            encoding_config: EncodingProperties::Json(json_props),
            protocol_config: ProtocolProperties::Maxwell,
        };

        let mut parser =
            MaxwellParser::new(config, columns.clone(), SourceContext::dummy().into())
                .await
                .unwrap();
        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        // Raw Maxwell events: two inserts followed by an update of the second row.
        let messages: [&[u8]; 3] = [
            br#"{"database":"test","table":"t","type":"insert","ts":1666937996,"xid":1171,"commit":true,"data":{"id":1,"name":"tom","is_adult":0,"birthday":"2017-12-31 16:00:01"}}"#,
            br#"{"database":"test","table":"t","type":"insert","ts":1666938023,"xid":1254,"commit":true,"data":{"id":2,"name":"alex","is_adult":1,"birthday":"1999-12-31 16:00:01"}}"#,
            br#"{"database":"test","table":"t","type":"update","ts":1666938068,"xid":1373,"commit":true,"data":{"id":2,"name":"chi","is_adult":1,"birthday":"1999-12-31 16:00:01"},"old":{"name":"alex"}}"#,
        ];

        for message in messages {
            parser
                .parse_inner(message.to_vec(), builder.row_writer())
                .await
                .unwrap();
        }

        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();

        // Expected (id, name, is_adult, birthday) per emitted row, in payload order.
        let expected_rows: [(i32, &str, i16, &str); 3] = [
            (1, "tom", 0, "2017-12-31 16:00:01"),
            (2, "alex", 1, "1999-12-31 16:00:01"),
            (2, "chi", 1, "1999-12-31 16:00:01"),
        ];

        let mut rows = chunk.rows();
        for (id, name, is_adult, birthday) in expected_rows {
            // A missing row must fail the test, hence the unwrap.
            let (op, row) = rows.next().unwrap();

            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                Some(ScalarImpl::Int32(id))
            );
            assert_eq!(
                row.datum_at(1).to_owned_datum(),
                Some(ScalarImpl::Utf8(name.into()))
            );
            assert_eq!(
                row.datum_at(2).to_owned_datum(),
                Some(ScalarImpl::Int16(is_adult))
            );
            assert_eq!(
                row.datum_at(3).to_owned_datum(),
                Some(ScalarImpl::Timestamp(birthday.parse().unwrap()))
            );
        }
    }
}