// risingwave_connector/parser/maxwell/simd_json_parser.rs

#[cfg(test)]
mod tests {
    use risingwave_common::array::Op;
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};

    use crate::parser::maxwell::MaxwellParser;
    use crate::parser::{
        EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc,
        SourceStreamChunkBuilder, SpecificParserConfig,
    };
    use crate::source::{SourceContext, SourceCtrlOpts};

    /// Feeds three Maxwell JSON change events (two `insert`s and one `update`)
    /// through a `MaxwellParser` and checks the rows of the resulting chunk.
    ///
    /// The column descriptors use upper-case `ID` / `NAME` while the payloads
    /// carry lower-case `id` / `name`; the assertions expect those columns to
    /// be populated regardless. The `update` event is expected to show up as
    /// a single `Op::Insert` row holding the new values from `data`.
    #[tokio::test]
    async fn test_json_parser() {
        let descs = vec![
            SourceColumnDesc::simple("ID", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("NAME", DataType::Varchar, 1.into()),
            SourceColumnDesc::simple("is_adult", DataType::Int16, 2.into()),
            SourceColumnDesc::simple("birthday", DataType::Timestamp, 3.into()),
        ];

        // Plain JSON encoding (no schema registry, default handling modes)
        // combined with the Maxwell protocol.
        let json_props = JsonProperties {
            use_schema_registry: false,
            timestamptz_handling: None,
            timestamp_handling: None,
            time_handling: None,
            bigint_unsigned_handling: None,
            handle_toast_columns: false,
        };
        let props = SpecificParserConfig {
            encoding_config: EncodingProperties::Json(json_props),
            protocol_config: ProtocolProperties::Maxwell,
        };

        let mut parser = MaxwellParser::new(props, descs.clone(), SourceContext::dummy().into())
            .await
            .unwrap();
        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

        // Raw events in arrival order; each one is copied into an owned buffer
        // right before it is handed to the parser.
        let payloads: [&[u8]; 3] = [
            br#"{"database":"test","table":"t","type":"insert","ts":1666937996,"xid":1171,"commit":true,"data":{"id":1,"name":"tom","is_adult":0,"birthday":"2017-12-31 16:00:01"}}"#,
            br#"{"database":"test","table":"t","type":"insert","ts":1666938023,"xid":1254,"commit":true,"data":{"id":2,"name":"alex","is_adult":1,"birthday":"1999-12-31 16:00:01"}}"#,
            br#"{"database":"test","table":"t","type":"update","ts":1666938068,"xid":1373,"commit":true,"data":{"id":2,"name":"chi","is_adult":1,"birthday":"1999-12-31 16:00:01"},"old":{"name":"alex"}}"#,
        ];
        for payload in payloads {
            parser
                .parse_inner(payload.to_vec(), builder.row_writer())
                .await
                .unwrap();
        }

        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();
        let mut rows = chunk.rows();

        // One entry per expected row, in order:
        // (op, ID, NAME, is_adult, birthday).
        let expected_rows = [
            (Op::Insert, 1, "tom", 0, "2017-12-31 16:00:01"),
            (Op::Insert, 2, "alex", 1, "1999-12-31 16:00:01"),
            // Produced by the `update` event: new values, reported as an insert.
            (Op::Insert, 2, "chi", 1, "1999-12-31 16:00:01"),
        ];

        for (expected_op, id, name, is_adult, birthday) in expected_rows {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, expected_op);

            // Expected datums listed in column order, checked one by one.
            let expected_datums = [
                Some(ScalarImpl::Int32(id)),
                Some(ScalarImpl::Utf8(name.into())),
                Some(ScalarImpl::Int16(is_adult)),
                Some(ScalarImpl::Timestamp(birthday.parse().unwrap())),
            ];
            for (idx, expected) in expected_datums.into_iter().enumerate() {
                assert_eq!(row.datum_at(idx).to_owned_datum(), expected);
            }
        }
    }
}