1use std::fmt::Debug;
16
17use anyhow::Context;
18use simd_json::BorrowedValue;
19use simd_json::prelude::MutableObject;
20
21use crate::error::ConnectorResult;
22use crate::parser::unified::AccessImpl;
23use crate::parser::unified::debezium::MongoJsonAccess;
24use crate::parser::unified::json::{JsonAccess, JsonParseOptions, TimestamptzHandling};
25use crate::parser::{AccessBuilder, MongoProperties};
26
27#[derive(Debug)]
28pub struct DebeziumJsonAccessBuilder {
29 value: Option<Vec<u8>>,
30 json_parse_options: JsonParseOptions,
31}
32
33impl DebeziumJsonAccessBuilder {
34 pub fn new(timestamptz_handling: TimestamptzHandling) -> ConnectorResult<Self> {
35 Ok(Self {
36 value: None,
37 json_parse_options: JsonParseOptions::new_for_debezium(timestamptz_handling),
38 })
39 }
40
41 pub fn new_for_schema_event() -> ConnectorResult<Self> {
42 Ok(Self {
43 value: None,
44 json_parse_options: JsonParseOptions::default(),
45 })
46 }
47}
48
49impl AccessBuilder for DebeziumJsonAccessBuilder {
50 #[allow(clippy::unused_async)]
51 async fn generate_accessor(
52 &mut self,
53 payload: Vec<u8>,
54 _: &crate::source::SourceMeta,
55 ) -> ConnectorResult<AccessImpl<'_>> {
56 self.value = Some(payload);
57 let mut event: BorrowedValue<'_> =
58 simd_json::to_borrowed_value(self.value.as_mut().unwrap())
59 .context("failed to parse debezium json payload")?;
60
61 let payload = if let Some(payload) = event.get_mut("payload") {
62 std::mem::take(payload)
63 } else {
64 event
65 };
66
67 Ok(AccessImpl::Json(JsonAccess::new_with_options(
68 payload,
69 &self.json_parse_options,
70 )))
71 }
72}
73
74#[derive(Debug)]
75pub struct DebeziumMongoJsonAccessBuilder {
76 value: Option<Vec<u8>>,
77 json_parse_options: JsonParseOptions,
78 strong_schema: bool,
79}
80
81impl DebeziumMongoJsonAccessBuilder {
82 pub fn new(props: MongoProperties) -> anyhow::Result<Self> {
83 Ok(Self {
84 value: None,
85 json_parse_options: JsonParseOptions::new_for_debezium(
86 TimestamptzHandling::GuessNumberUnit,
87 ),
88 strong_schema: props.strong_schema,
89 })
90 }
91}
92
93impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
94 #[allow(clippy::unused_async)]
95 async fn generate_accessor(
96 &mut self,
97 payload: Vec<u8>,
98 _: &crate::source::SourceMeta,
99 ) -> ConnectorResult<AccessImpl<'_>> {
100 self.value = Some(payload);
101 let mut event: BorrowedValue<'_> =
102 simd_json::to_borrowed_value(self.value.as_mut().unwrap())
103 .context("failed to parse debezium mongo json payload")?;
104
105 let payload = if let Some(payload) = event.get_mut("payload") {
106 std::mem::take(payload)
107 } else {
108 event
109 };
110
111 Ok(AccessImpl::MongoJson(MongoJsonAccess::new(
112 JsonAccess::new_with_options(payload, &self.json_parse_options),
113 self.strong_schema,
114 )))
115 }
116}
117
118#[cfg(test)]
119mod tests {
120 use chrono::{NaiveDate, NaiveTime};
121 use risingwave_common::array::{Op, StructValue};
122 use risingwave_common::catalog::ColumnId;
123 use risingwave_common::row::{OwnedRow, Row};
124 use risingwave_common::types::{
125 DataType, Date, Interval, Scalar, ScalarImpl, StructType, Time, Timestamp,
126 };
127 use serde_json::Value;
128 use thiserror_ext::AsReport;
129
130 use crate::parser::{
131 DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
132 SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig,
133 };
134 use crate::source::{SourceContext, SourceCtrlOpts};
135
136 fn assert_json_eq(parse_result: &Option<ScalarImpl>, json_str: &str) {
137 if let Some(ScalarImpl::Jsonb(json_val)) = parse_result {
138 let mut json_string = String::new();
139 json_val
140 .as_scalar_ref()
141 .force_str(&mut json_string)
142 .unwrap();
143 let val1: Value = serde_json::from_str(json_string.as_str()).unwrap();
144 let val2: Value = serde_json::from_str(json_str).unwrap();
145 assert_eq!(val1, val2);
146 }
147 }
148
149 async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
150 let props = SpecificParserConfig {
151 encoding_config: EncodingProperties::Json(JsonProperties {
152 use_schema_registry: false,
153 timestamptz_handling: None,
154 }),
155 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
156 };
157 DebeziumParser::new(props, rw_columns, SourceContext::dummy().into())
158 .await
159 .unwrap()
160 }
161
162 async fn parse_one(
163 mut parser: DebeziumParser,
164 columns: Vec<SourceColumnDesc>,
165 payload: Vec<u8>,
166 ) -> Vec<(Op, OwnedRow)> {
167 let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
168 parser
169 .parse_inner(None, Some(payload), builder.row_writer())
170 .await
171 .unwrap();
172 builder.finish_current_chunk();
173 let chunk = builder.consume_ready_chunks().next().unwrap();
174 chunk
175 .rows()
176 .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
177 .collect::<Vec<_>>()
178 }
179
180 mod test1_basic {
181 use super::*;
182
183 fn get_test1_columns() -> Vec<SourceColumnDesc> {
184 vec![
185 SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)),
186 SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(1)),
187 SourceColumnDesc::simple("description", DataType::Varchar, ColumnId::from(2)),
188 SourceColumnDesc::simple("weight", DataType::Float64, ColumnId::from(3)),
189 ]
190 }
191
192 #[tokio::test]
193 async fn test1_debezium_json_parser_read() {
194 let input = vec![
202 br#"{"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}}"#.to_vec(),
204 br#"{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}"#.to_vec()];
206
207 let columns = get_test1_columns();
208
209 for data in input {
210 let parser = build_parser(columns.clone()).await;
211 let [(_op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
212 .await
213 .try_into()
214 .unwrap();
215
216 assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
217 assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
218 assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
219 assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
220 }
221 }
222
223 #[tokio::test]
224 async fn test1_debezium_json_parser_insert() {
225 let input = vec![
233 br#"{"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}}"#.to_vec(),
235 br#"{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}"#.to_vec()];
237
238 let columns = get_test1_columns();
239
240 for data in input {
241 let parser = build_parser(columns.clone()).await;
242 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
243 .await
244 .try_into()
245 .unwrap();
246 assert_eq!(op, Op::Insert);
247
248 assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
249 assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
250 assert!(row[2].eq(&Some(ScalarImpl::Utf8("12V car battery".into()))));
251 assert!(row[3].eq(&Some(ScalarImpl::Float64(8.1.into()))));
252 }
253 }
254
255 #[tokio::test]
256 async fn test1_debezium_json_parser_delete() {
257 let input = vec![
265 br#"{"payload":{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}}"#.to_vec(),
267 br#"{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}"#.to_vec()];
269
270 for data in input {
271 let columns = get_test1_columns();
272 let parser = build_parser(columns.clone()).await;
273 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
274 .await
275 .try_into()
276 .unwrap();
277
278 assert_eq!(op, Op::Delete);
279
280 assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
281 assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
282 assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
283 assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
284 }
285 }
286
287 #[tokio::test]
288 async fn test1_debezium_json_parser_update() {
289 let input = vec![
302 br#"{"payload":{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}}"#.to_vec(),
304 br#"{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}"#.to_vec()];
306
307 let columns = get_test1_columns();
308
309 for data in input {
310 let parser = build_parser(columns.clone()).await;
311 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
312 .await
313 .try_into()
314 .unwrap();
315
316 assert_eq!(op, Op::Insert);
317
318 assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
319 assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
320 assert!(row[2].eq(&Some(ScalarImpl::Utf8("24V car battery".into()))));
321 assert!(row[3].eq(&Some(ScalarImpl::Float64(9.1.into()))));
322 }
323 }
324 }
325 mod test2_mysql {
343 use super::*;
344
345 fn get_test2_columns() -> Vec<SourceColumnDesc> {
346 vec![
347 SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
348 SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
349 SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
350 SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
351 SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
352 SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
353 SourceColumnDesc::simple("O_DECIMAL", DataType::Decimal, ColumnId::from(6)),
354 SourceColumnDesc::simple("O_CHAR", DataType::Varchar, ColumnId::from(7)),
355 SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)),
356 SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)),
357 SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)),
358 SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)),
359 SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)),
360 ]
361 }
362
363 #[tokio::test]
364 async fn test2_debezium_json_parser_read() {
365 let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090651000,"snapshot":"last","db":"test","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":951,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1678090651640,"transaction":null}}"#;
366
367 let columns = get_test2_columns();
368
369 let parser = build_parser(columns.clone()).await;
370
371 let [(_op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
372 .await
373 .try_into()
374 .unwrap();
375
376 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
377 assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
378 assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
379 assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
380 assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
381 assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
382 assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
383 assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
384 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
385 NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
386 )))));
387 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
388 NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
389 )))));
390 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
391 "1970-01-01T00:00:00".parse().unwrap()
392 )))));
393 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
394 "1970-01-01T00:00:01Z".parse().unwrap()
395 ))));
396 assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
397 }
398
399 #[tokio::test]
400 async fn test2_debezium_json_parser_insert() {
401 let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#;
402
403 let columns = get_test2_columns();
404 let parser = build_parser(columns.clone()).await;
405 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
406 .await
407 .try_into()
408 .unwrap();
409 assert_eq!(op, Op::Insert);
410
411 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
412 assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
413 assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
414 assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
415 assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
416 assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
417 assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
418 assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
419 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
420 NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
421 )))));
422 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
423 NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
424 )))));
425 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
426 "1970-01-01T00:00:00".parse().unwrap()
427 )))));
428 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
429 "1970-01-01T00:00:01Z".parse().unwrap()
430 ))));
431 assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
432 }
433
434 #[tokio::test]
435 async fn test2_debezium_json_parser_delete() {
436 let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#;
437
438 let columns = get_test2_columns();
439 let parser = build_parser(columns.clone()).await;
440 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
441 .await
442 .try_into()
443 .unwrap();
444
445 assert_eq!(op, Op::Delete);
446
447 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
448 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
449 assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
450 assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
451 assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
452 assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
453 assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
454 assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
455 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
456 NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
457 )))));
458 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
459 NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
460 )))));
461 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
462 "5138-11-16T09:46:39".parse().unwrap()
463 )))));
464 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
465 "2038-01-09T03:14:07Z".parse().unwrap()
466 ))));
467 assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}");
468 }
469
470 #[tokio::test]
471 async fn test2_debezium_json_parser_update() {
472 let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"after":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\": \"v1_updated\", \"k2\": 33}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678089331000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1168,"row":0,"thread":4,"query":null},"op":"u","ts_ms":1678089331464,"transaction":null}}"#;
473
474 let columns = get_test2_columns();
475
476 let parser = build_parser(columns.clone()).await;
477 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
478 .await
479 .try_into()
480 .unwrap();
481
482 assert_eq!(op, Op::Insert);
483
484 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
485 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
486 assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
487 assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
488 assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
489 assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
490 assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
491 assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
492 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
493 NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
494 )))));
495 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
496 NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
497 )))));
498 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
499 "5138-11-16T09:46:39".parse().unwrap()
500 )))));
501 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
502 "2038-01-09T03:14:07Z".parse().unwrap()
503 ))));
504 assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
505 }
506
507 #[cfg(not(madsim))] #[tokio::test]
509 #[tracing_test::traced_test]
510 async fn test2_debezium_json_parser_overflow() {
511 let columns = vec![
512 SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
513 SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
514 SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
515 SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
516 SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
517 SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
518 ];
519 let mut parser = build_parser(columns.clone()).await;
520
521 let mut dummy_builder =
522 SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
523
524 let normal_values = ["111", "1", "33", "444", "555.0", "666.0"];
525 let overflow_values = [
526 "9223372036854775808",
527 "2",
528 "32768",
529 "2147483648",
530 "3.80282347E38",
531 "1.797695E308",
532 ];
533
534 for i in 0..6 {
535 let mut values = normal_values;
536 values[i] = overflow_values[i];
537 let data = format!(
538 r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
539 values[0], values[1], values[2], values[3], values[4], values[5]
540 ).as_bytes().to_vec();
541
542 let res = parser
543 .parse_inner(None, Some(data), dummy_builder.row_writer())
544 .await;
545 if i < 5 {
546 res.unwrap();
549 assert!(logs_contain("expected type"), "{i}");
550 } else {
551 let e = res.unwrap_err();
553 assert!(e.to_report_string().contains("InvalidNumber"), "{i}: {e}");
554 }
555 }
556 }
557 }
558
559 mod test3_postgres {
561 use risingwave_pb::plan_common::AdditionalColumn;
562
563 use super::*;
564 use crate::source::SourceColumnType;
565
566 fn get_temporal_test_columns() -> Vec<SourceColumnDesc> {
568 vec![
569 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
570 SourceColumnDesc::simple("o_time_0", DataType::Time, ColumnId::from(1)),
571 SourceColumnDesc::simple("o_time_6", DataType::Time, ColumnId::from(2)),
572 SourceColumnDesc::simple("o_timez_0", DataType::Time, ColumnId::from(3)),
573 SourceColumnDesc::simple("o_timez_6", DataType::Time, ColumnId::from(4)),
574 SourceColumnDesc::simple("o_timestamp_0", DataType::Timestamp, ColumnId::from(5)),
575 SourceColumnDesc::simple("o_timestamp_6", DataType::Timestamp, ColumnId::from(6)),
576 SourceColumnDesc::simple(
577 "o_timestampz_0",
578 DataType::Timestamptz,
579 ColumnId::from(7),
580 ),
581 SourceColumnDesc::simple(
582 "o_timestampz_6",
583 DataType::Timestamptz,
584 ColumnId::from(8),
585 ),
586 SourceColumnDesc::simple("o_interval", DataType::Interval, ColumnId::from(9)),
587 SourceColumnDesc::simple("o_date", DataType::Date, ColumnId::from(10)),
588 ]
589 }
590
591 fn get_numeric_test_columns() -> Vec<SourceColumnDesc> {
593 vec![
594 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
595 SourceColumnDesc::simple("o_smallint", DataType::Int16, ColumnId::from(1)),
596 SourceColumnDesc::simple("o_integer", DataType::Int32, ColumnId::from(2)),
597 SourceColumnDesc::simple("o_bigint", DataType::Int64, ColumnId::from(3)),
598 SourceColumnDesc::simple("o_real", DataType::Float32, ColumnId::from(4)),
599 SourceColumnDesc::simple("o_double", DataType::Float64, ColumnId::from(5)),
600 SourceColumnDesc::simple("o_numeric", DataType::Decimal, ColumnId::from(6)),
601 SourceColumnDesc::simple("o_numeric_6_3", DataType::Decimal, ColumnId::from(7)),
602 SourceColumnDesc::simple("o_money", DataType::Decimal, ColumnId::from(8)),
603 ]
604 }
605
606 fn get_other_types_test_columns() -> Vec<SourceColumnDesc> {
608 vec![
609 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
610 SourceColumnDesc::simple("o_boolean", DataType::Boolean, ColumnId::from(1)),
611 SourceColumnDesc::simple("o_bit", DataType::Boolean, ColumnId::from(2)),
612 SourceColumnDesc::simple("o_bytea", DataType::Bytea, ColumnId::from(3)),
613 SourceColumnDesc::simple("o_json", DataType::Jsonb, ColumnId::from(4)),
614 SourceColumnDesc::simple("o_xml", DataType::Varchar, ColumnId::from(5)),
615 SourceColumnDesc::simple("o_uuid", DataType::Varchar, ColumnId::from(6)),
616 SourceColumnDesc {
617 name: "o_point".to_owned(),
618 data_type: DataType::Struct(StructType::new(vec![
619 ("x", DataType::Float32),
620 ("y", DataType::Float32),
621 ])),
622 column_id: 7.into(),
623 column_type: SourceColumnType::Normal,
624 is_pk: false,
625 is_hidden_addition_col: false,
626 additional_column: AdditionalColumn { column_type: None },
627 },
628 SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
629 SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)),
630 SourceColumnDesc::simple("o_varchar", DataType::Varchar, ColumnId::from(10)),
631 SourceColumnDesc::simple("o_character", DataType::Varchar, ColumnId::from(11)),
632 SourceColumnDesc::simple(
633 "o_character_varying",
634 DataType::Varchar,
635 ColumnId::from(12),
636 ),
637 ]
638 }
639
640 #[tokio::test]
641 async fn test_temporal_types() {
642 let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_time_0":40271000000,"o_time_6":40271000010,"o_timez_0":"11:11:11Z","o_timez_6":"11:11:11.00001Z","o_timestamp_0":1321009871000,"o_timestamp_6":1321009871123456,"o_timestampz_0":"2011-11-11T03:11:11Z","o_timestampz_6":"2011-11-11T03:11:11.123456Z","o_interval":"P1Y2M3DT4H5M6.78S","o_date":"1999-09-09"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684733351963,"snapshot":"last","db":"test","sequence":"[null,\"26505352\"]","schema":"public","table":"orders","txId":729,"lsn":26505352,"xmin":null},"op":"r","ts_ms":1684733352110,"transaction":null}}"#;
659 let columns = get_temporal_test_columns();
660 let parser = build_parser(columns.clone()).await;
661 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
662 .await
663 .try_into()
664 .unwrap();
665 assert_eq!(op, Op::Insert);
666 assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
667 assert!(row[1].eq(&Some(ScalarImpl::Time(Time::new(
668 NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
669 )))));
670 assert!(row[2].eq(&Some(ScalarImpl::Time(Time::new(
671 NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
672 )))));
673 assert!(row[3].eq(&Some(ScalarImpl::Time(Time::new(
674 NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
675 )))));
676 assert!(row[4].eq(&Some(ScalarImpl::Time(Time::new(
677 NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
678 )))));
679 assert!(row[5].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
680 "2011-11-11T11:11:11".parse().unwrap()
681 )))));
682 assert!(row[6].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
683 "2011-11-11T11:11:11.123456".parse().unwrap()
684 )))));
685 assert!(
686 row[9].eq(&Some(ScalarImpl::Interval(Interval::from_month_day_usec(
687 14,
688 3,
689 14706780000
690 ))))
691 );
692 assert!(row[10].eq(&Some(ScalarImpl::Date(Date::new(
693 NaiveDate::from_ymd_opt(1999, 9, 9).unwrap()
694 )))));
695 }
696
697 #[tokio::test]
698 async fn test_numeric_types() {
699 let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_smallint":32767,"o_integer":2147483647,"o_bigint":9223372036854775807,"o_real":9.999,"o_double":9.999999,"o_numeric":123456.789,"o_numeric_6_3":123.456,"o_money":123.12},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684404343201,"snapshot":"last","db":"test","sequence":"[null,\"26519216\"]","schema":"public","table":"orders","txId":729,"lsn":26519216,"xmin":null},"op":"r","ts_ms":1684404343349,"transaction":null}}"#;
714 let columns = get_numeric_test_columns();
715 let parser = build_parser(columns.clone()).await;
716 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
717 .await
718 .try_into()
719 .unwrap();
720 assert_eq!(op, Op::Insert);
721 assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
722 assert!(row[1].eq(&Some(ScalarImpl::Int16(32767))));
723 assert!(row[2].eq(&Some(ScalarImpl::Int32(2147483647))));
724 assert!(row[3].eq(&Some(ScalarImpl::Int64(9223372036854775807))));
725 assert!(row[4].eq(&Some(ScalarImpl::Float32((9.999).into()))));
726 assert!(row[5].eq(&Some(ScalarImpl::Float64((9.999999).into()))));
727 assert!(row[6].eq(&Some(ScalarImpl::Decimal("123456.7890".parse().unwrap()))));
728 assert!(row[7].eq(&Some(ScalarImpl::Decimal("123.456".parse().unwrap()))));
729 assert!(row[8].eq(&Some(ScalarImpl::Decimal("123.12".parse().unwrap()))));
730 }
731
732 #[tokio::test]
733 async fn test_other_types() {
734 let data = br#"{"payload":{"before":null,"after":{"o_key":1,"o_boolean":false,"o_bit":true,"o_bytea":"ASNFZ4mrze8=","o_json":"{\"k1\": \"v1\", \"k2\": 11}","o_xml":"<!--hahaha-->","o_uuid":"60f14fe2-f857-404a-b586-3b5375b3259f","o_point":{"x":1.0,"y":2.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAABA","srid":null},"o_enum":"polar","o_char":"h","o_varchar":"ha","o_character":"h","o_character_varying":"hahaha"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684743927178,"snapshot":"last","db":"test","sequence":"[null,\"26524528\"]","schema":"public","table":"orders","txId":730,"lsn":26524528,"xmin":null},"op":"r","ts_ms":1684743927343,"transaction":null}}"#;
753 let columns = get_other_types_test_columns();
754 let parser = build_parser(columns.clone()).await;
755 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
756 .await
757 .try_into()
758 .unwrap();
759 assert_eq!(op, Op::Insert);
760 assert!(row[0].eq(&Some(ScalarImpl::Int32(1))));
761 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
762 assert!(row[2].eq(&Some(ScalarImpl::Bool(true))));
763 assert!(row[3].eq(&Some(ScalarImpl::Bytea(Box::new([
764 u8::from_str_radix("01", 16).unwrap(),
765 u8::from_str_radix("23", 16).unwrap(),
766 u8::from_str_radix("45", 16).unwrap(),
767 u8::from_str_radix("67", 16).unwrap(),
768 u8::from_str_radix("89", 16).unwrap(),
769 u8::from_str_radix("AB", 16).unwrap(),
770 u8::from_str_radix("CD", 16).unwrap(),
771 u8::from_str_radix("EF", 16).unwrap()
772 ])))));
773 assert_json_eq(&row[4], "{\"k1\": \"v1\", \"k2\": 11}");
774 assert!(row[5].eq(&Some(ScalarImpl::Utf8("<!--hahaha-->".into()))));
775 assert!(row[6].eq(&Some(ScalarImpl::Utf8(
776 "60f14fe2-f857-404a-b586-3b5375b3259f".into()
777 ))));
778 assert!(row[7].eq(&Some(ScalarImpl::Struct(StructValue::new(vec![
779 Some(ScalarImpl::Float32(1.into())),
780 Some(ScalarImpl::Float32(2.into()))
781 ])))));
782 assert!(row[8].eq(&Some(ScalarImpl::Utf8("polar".into()))));
783 assert!(row[9].eq(&Some(ScalarImpl::Utf8("h".into()))));
784 assert!(row[10].eq(&Some(ScalarImpl::Utf8("ha".into()))));
785 assert!(row[11].eq(&Some(ScalarImpl::Utf8("h".into()))));
786 assert!(row[12].eq(&Some(ScalarImpl::Utf8("hahaha".into()))));
787 }
788 }
789}