1use std::fmt::Debug;
16
17use anyhow::Context;
18use simd_json::BorrowedValue;
19use simd_json::prelude::MutableObject;
20
21use crate::error::ConnectorResult;
22use crate::parser::unified::AccessImpl;
23use crate::parser::unified::debezium::MongoJsonAccess;
24use crate::parser::unified::json::{
25 JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling, TimestamptzHandling,
26};
27use crate::parser::{AccessBuilder, MongoProperties};
28
/// [`AccessBuilder`] for Debezium change events encoded as JSON.
///
/// The raw message bytes are kept in `value` so that the accessor returned by
/// `generate_accessor` can borrow from them: the JSON is parsed from a mutable
/// borrow of this buffer into a `BorrowedValue`.
#[derive(Debug)]
pub struct DebeziumJsonAccessBuilder {
    /// Buffer holding the most recent message; overwritten on every `generate_accessor` call.
    value: Option<Vec<u8>>,
    /// Options forwarded to every [`JsonAccess`] this builder creates.
    json_parse_options: JsonParseOptions,
}
34
35impl DebeziumJsonAccessBuilder {
36 pub fn new(
37 timestamptz_handling: TimestamptzHandling,
38 timestamp_handling: TimestampHandling,
39 time_handling: TimeHandling,
40 handle_toast_columns: bool,
41 ) -> ConnectorResult<Self> {
42 Ok(Self {
43 value: None,
44 json_parse_options: JsonParseOptions::new_for_debezium(
45 timestamptz_handling,
46 timestamp_handling,
47 time_handling,
48 handle_toast_columns,
49 ),
50 })
51 }
52
53 pub fn new_for_schema_event() -> ConnectorResult<Self> {
54 Ok(Self {
55 value: None,
56 json_parse_options: JsonParseOptions::default(),
57 })
58 }
59}
60
61impl AccessBuilder for DebeziumJsonAccessBuilder {
62 #[allow(clippy::unused_async)]
63 async fn generate_accessor(
64 &mut self,
65 payload: Vec<u8>,
66 _: &crate::source::SourceMeta,
67 ) -> ConnectorResult<AccessImpl<'_>> {
68 self.value = Some(payload);
69 let mut event: BorrowedValue<'_> =
70 simd_json::to_borrowed_value(self.value.as_mut().unwrap())
71 .context("failed to parse debezium json payload")?;
72
73 let payload = if let Some(payload) = event.get_mut("payload") {
74 std::mem::take(payload)
75 } else {
76 event
77 };
78
79 Ok(AccessImpl::Json(JsonAccess::new_with_options(
80 payload,
81 &self.json_parse_options,
82 )))
83 }
84}
85
/// [`AccessBuilder`] for Debezium MongoDB change events encoded as JSON.
///
/// Produces [`MongoJsonAccess`] accessors that wrap a [`JsonAccess`] over the
/// parsed message.
#[derive(Debug)]
pub struct DebeziumMongoJsonAccessBuilder {
    /// Buffer holding the most recent message; the parsed JSON borrows from it.
    value: Option<Vec<u8>>,
    /// Options forwarded to the inner [`JsonAccess`].
    json_parse_options: JsonParseOptions,
    /// Copied from `MongoProperties::strong_schema` and passed to [`MongoJsonAccess::new`].
    strong_schema: bool,
}
92
93impl DebeziumMongoJsonAccessBuilder {
94 pub fn new(props: MongoProperties) -> anyhow::Result<Self> {
95 Ok(Self {
96 value: None,
97 json_parse_options: JsonParseOptions::new_for_debezium(
98 TimestamptzHandling::GuessNumberUnit,
99 TimestampHandling::GuessNumberUnit,
100 TimeHandling::Micro,
101 false,
102 ),
103 strong_schema: props.strong_schema,
104 })
105 }
106}
107
108impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
109 #[allow(clippy::unused_async)]
110 async fn generate_accessor(
111 &mut self,
112 payload: Vec<u8>,
113 _: &crate::source::SourceMeta,
114 ) -> ConnectorResult<AccessImpl<'_>> {
115 self.value = Some(payload);
116 let mut event: BorrowedValue<'_> =
117 simd_json::to_borrowed_value(self.value.as_mut().unwrap())
118 .context("failed to parse debezium mongo json payload")?;
119
120 let payload = if let Some(payload) = event.get_mut("payload") {
121 std::mem::take(payload)
122 } else {
123 event
124 };
125
126 Ok(AccessImpl::MongoJson(MongoJsonAccess::new(
127 JsonAccess::new_with_options(payload, &self.json_parse_options),
128 self.strong_schema,
129 )))
130 }
131}
132
133#[cfg(test)]
134mod tests {
135 use chrono::{NaiveDate, NaiveTime};
136 use risingwave_common::array::{Op, StructValue};
137 use risingwave_common::catalog::ColumnId;
138 use risingwave_common::row::{OwnedRow, Row};
139 use risingwave_common::types::{
140 DataType, Date, Interval, Scalar, ScalarImpl, StructType, Time, Timestamp,
141 };
142 use serde_json::Value;
143 use thiserror_ext::AsReport;
144
145 use crate::parser::{
146 DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
147 SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig,
148 };
149 use crate::source::{SourceContext, SourceCtrlOpts};
150
151 fn assert_json_eq(parse_result: &Option<ScalarImpl>, json_str: &str) {
152 if let Some(ScalarImpl::Jsonb(json_val)) = parse_result {
153 let mut json_string = String::new();
154 json_val
155 .as_scalar_ref()
156 .force_str(&mut json_string)
157 .unwrap();
158 let val1: Value = serde_json::from_str(json_string.as_str()).unwrap();
159 let val2: Value = serde_json::from_str(json_str).unwrap();
160 assert_eq!(val1, val2);
161 }
162 }
163
164 async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
165 let props = SpecificParserConfig {
166 encoding_config: EncodingProperties::Json(JsonProperties {
167 use_schema_registry: false,
168 timestamptz_handling: None,
169 timestamp_handling: None,
170 time_handling: None,
171 handle_toast_columns: false,
172 }),
173 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
174 };
175 DebeziumParser::new(props, rw_columns, SourceContext::dummy().into())
176 .await
177 .unwrap()
178 }
179
180 async fn parse_one(
181 mut parser: DebeziumParser,
182 columns: Vec<SourceColumnDesc>,
183 payload: Vec<u8>,
184 ) -> Vec<(Op, OwnedRow)> {
185 let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
186 parser
187 .parse_inner(None, Some(payload), builder.row_writer())
188 .await
189 .unwrap();
190 builder.finish_current_chunk();
191 let chunk = builder.consume_ready_chunks().next().unwrap();
192 chunk
193 .rows()
194 .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
195 .collect::<Vec<_>>()
196 }
197
198 mod test1_basic {
199 use super::*;
200
201 fn get_test1_columns() -> Vec<SourceColumnDesc> {
202 vec![
203 SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)),
204 SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(1)),
205 SourceColumnDesc::simple("description", DataType::Varchar, ColumnId::from(2)),
206 SourceColumnDesc::simple("weight", DataType::Float64, ColumnId::from(3)),
207 ]
208 }
209
210 #[tokio::test]
211 async fn test1_debezium_json_parser_read() {
212 let input = vec![
220 br#"{"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}}"#.to_vec(),
222 br#"{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}"#.to_vec()];
224
225 let columns = get_test1_columns();
226
227 for data in input {
228 let parser = build_parser(columns.clone()).await;
229 let [(_op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
230 .await
231 .try_into()
232 .unwrap();
233
234 assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
235 assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
236 assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
237 assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
238 }
239 }
240
241 #[tokio::test]
242 async fn test1_debezium_json_parser_insert() {
243 let input = vec![
251 br#"{"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}}"#.to_vec(),
253 br#"{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}"#.to_vec()];
255
256 let columns = get_test1_columns();
257
258 for data in input {
259 let parser = build_parser(columns.clone()).await;
260 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
261 .await
262 .try_into()
263 .unwrap();
264 assert_eq!(op, Op::Insert);
265
266 assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
267 assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
268 assert!(row[2].eq(&Some(ScalarImpl::Utf8("12V car battery".into()))));
269 assert!(row[3].eq(&Some(ScalarImpl::Float64(8.1.into()))));
270 }
271 }
272
273 #[tokio::test]
274 async fn test1_debezium_json_parser_delete() {
275 let input = vec![
283 br#"{"payload":{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}}"#.to_vec(),
285 br#"{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}"#.to_vec()];
287
288 for data in input {
289 let columns = get_test1_columns();
290 let parser = build_parser(columns.clone()).await;
291 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
292 .await
293 .try_into()
294 .unwrap();
295
296 assert_eq!(op, Op::Delete);
297
298 assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
299 assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
300 assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
301 assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
302 }
303 }
304
305 #[tokio::test]
306 async fn test1_debezium_json_parser_update() {
307 let input = vec![
320 br#"{"payload":{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}}"#.to_vec(),
322 br#"{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}"#.to_vec()];
324
325 let columns = get_test1_columns();
326
327 for data in input {
328 let parser = build_parser(columns.clone()).await;
329 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
330 .await
331 .try_into()
332 .unwrap();
333
334 assert_eq!(op, Op::Insert);
335
336 assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
337 assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
338 assert!(row[2].eq(&Some(ScalarImpl::Utf8("24V car battery".into()))));
339 assert!(row[3].eq(&Some(ScalarImpl::Float64(9.1.into()))));
340 }
341 }
342 }
343 mod test2_mysql {
361 use super::*;
362
363 fn get_test2_columns() -> Vec<SourceColumnDesc> {
364 vec![
365 SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
366 SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
367 SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
368 SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
369 SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
370 SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
371 SourceColumnDesc::simple("O_DECIMAL", DataType::Decimal, ColumnId::from(6)),
372 SourceColumnDesc::simple("O_CHAR", DataType::Varchar, ColumnId::from(7)),
373 SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)),
374 SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)),
375 SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)),
376 SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)),
377 SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)),
378 ]
379 }
380
381 #[tokio::test]
382 async fn test2_debezium_json_parser_read() {
383 let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090651000,"snapshot":"last","db":"test","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":951,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1678090651640,"transaction":null}}"#;
384
385 let columns = get_test2_columns();
386
387 let parser = build_parser(columns.clone()).await;
388
389 let [(_op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
390 .await
391 .try_into()
392 .unwrap();
393
394 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
395 assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
396 assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
397 assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
398 assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
399 assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
400 assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
401 assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
402 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
403 NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
404 )))));
405 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
406 NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
407 )))));
408 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
409 "1970-01-01T00:00:00".parse().unwrap()
410 )))));
411 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
412 "1970-01-01T00:00:01Z".parse().unwrap()
413 ))));
414 assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
415 }
416
417 #[tokio::test]
418 async fn test2_debezium_json_parser_insert() {
419 let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#;
420
421 let columns = get_test2_columns();
422 let parser = build_parser(columns.clone()).await;
423 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
424 .await
425 .try_into()
426 .unwrap();
427 assert_eq!(op, Op::Insert);
428
429 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
430 assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
431 assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
432 assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
433 assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
434 assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
435 assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
436 assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
437 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
438 NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
439 )))));
440 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
441 NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
442 )))));
443 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
444 "1970-01-01T00:00:00".parse().unwrap()
445 )))));
446 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
447 "1970-01-01T00:00:01Z".parse().unwrap()
448 ))));
449 assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
450 }
451
452 #[tokio::test]
453 async fn test2_debezium_json_parser_delete() {
454 let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#;
455
456 let columns = get_test2_columns();
457 let parser = build_parser(columns.clone()).await;
458 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
459 .await
460 .try_into()
461 .unwrap();
462
463 assert_eq!(op, Op::Delete);
464
465 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
466 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
467 assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
468 assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
469 assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
470 assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
471 assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
472 assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
473 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
474 NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
475 )))));
476 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
477 NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
478 )))));
479 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
480 "5138-11-16T09:46:39".parse().unwrap()
481 )))));
482 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
483 "2038-01-09T03:14:07Z".parse().unwrap()
484 ))));
485 assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}");
486 }
487
488 #[tokio::test]
489 async fn test2_debezium_json_parser_update() {
490 let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"after":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\": \"v1_updated\", \"k2\": 33}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678089331000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1168,"row":0,"thread":4,"query":null},"op":"u","ts_ms":1678089331464,"transaction":null}}"#;
491
492 let columns = get_test2_columns();
493
494 let parser = build_parser(columns.clone()).await;
495 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
496 .await
497 .try_into()
498 .unwrap();
499
500 assert_eq!(op, Op::Insert);
501
502 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
503 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
504 assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
505 assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
506 assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
507 assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
508 assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
509 assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
510 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
511 NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
512 )))));
513 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
514 NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
515 )))));
516 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
517 "5138-11-16T09:46:39".parse().unwrap()
518 )))));
519 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
520 "2038-01-09T03:14:07Z".parse().unwrap()
521 ))));
522 assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
523 }
524
525 #[cfg(not(madsim))] #[tokio::test]
527 #[tracing_test::traced_test]
528 async fn test2_debezium_json_parser_overflow() {
529 let columns = vec![
530 SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
531 SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
532 SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
533 SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
534 SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
535 SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
536 ];
537 let mut parser = build_parser(columns.clone()).await;
538
539 let mut dummy_builder =
540 SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
541
542 let normal_values = ["111", "1", "33", "444", "555.0", "666.0"];
543 let overflow_values = [
544 "9223372036854775808",
545 "2",
546 "32768",
547 "2147483648",
548 "3.80282347E38",
549 "1.797695E308",
550 ];
551
552 for i in 0..6 {
553 let mut values = normal_values;
554 values[i] = overflow_values[i];
555 let data = format!(
556 r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
557 values[0], values[1], values[2], values[3], values[4], values[5]
558 ).as_bytes().to_vec();
559
560 let res = parser
561 .parse_inner(None, Some(data), dummy_builder.row_writer())
562 .await;
563 if i < 5 {
564 res.unwrap();
567 assert!(logs_contain("expected type"), "{i}");
568 } else {
569 let e = res.unwrap_err();
571 assert!(e.to_report_string().contains("InvalidNumber"), "{i}: {e}");
572 }
573 }
574 }
575 }
576
577 mod test3_postgres {
579 use risingwave_pb::plan_common::AdditionalColumn;
580
581 use super::*;
582 use crate::source::SourceColumnType;
583
584 fn get_temporal_test_columns() -> Vec<SourceColumnDesc> {
586 vec![
587 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
588 SourceColumnDesc::simple("o_time_0", DataType::Time, ColumnId::from(1)),
589 SourceColumnDesc::simple("o_time_6", DataType::Time, ColumnId::from(2)),
590 SourceColumnDesc::simple("o_timez_0", DataType::Time, ColumnId::from(3)),
591 SourceColumnDesc::simple("o_timez_6", DataType::Time, ColumnId::from(4)),
592 SourceColumnDesc::simple("o_timestamp_0", DataType::Timestamp, ColumnId::from(5)),
593 SourceColumnDesc::simple("o_timestamp_6", DataType::Timestamp, ColumnId::from(6)),
594 SourceColumnDesc::simple(
595 "o_timestampz_0",
596 DataType::Timestamptz,
597 ColumnId::from(7),
598 ),
599 SourceColumnDesc::simple(
600 "o_timestampz_6",
601 DataType::Timestamptz,
602 ColumnId::from(8),
603 ),
604 SourceColumnDesc::simple("o_interval", DataType::Interval, ColumnId::from(9)),
605 SourceColumnDesc::simple("o_date", DataType::Date, ColumnId::from(10)),
606 ]
607 }
608
609 fn get_numeric_test_columns() -> Vec<SourceColumnDesc> {
611 vec![
612 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
613 SourceColumnDesc::simple("o_smallint", DataType::Int16, ColumnId::from(1)),
614 SourceColumnDesc::simple("o_integer", DataType::Int32, ColumnId::from(2)),
615 SourceColumnDesc::simple("o_bigint", DataType::Int64, ColumnId::from(3)),
616 SourceColumnDesc::simple("o_real", DataType::Float32, ColumnId::from(4)),
617 SourceColumnDesc::simple("o_double", DataType::Float64, ColumnId::from(5)),
618 SourceColumnDesc::simple("o_numeric", DataType::Decimal, ColumnId::from(6)),
619 SourceColumnDesc::simple("o_numeric_6_3", DataType::Decimal, ColumnId::from(7)),
620 SourceColumnDesc::simple("o_money", DataType::Decimal, ColumnId::from(8)),
621 ]
622 }
623
624 fn get_other_types_test_columns() -> Vec<SourceColumnDesc> {
626 vec![
627 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
628 SourceColumnDesc::simple("o_boolean", DataType::Boolean, ColumnId::from(1)),
629 SourceColumnDesc::simple("o_bit", DataType::Boolean, ColumnId::from(2)),
630 SourceColumnDesc::simple("o_bytea", DataType::Bytea, ColumnId::from(3)),
631 SourceColumnDesc::simple("o_json", DataType::Jsonb, ColumnId::from(4)),
632 SourceColumnDesc::simple("o_xml", DataType::Varchar, ColumnId::from(5)),
633 SourceColumnDesc::simple("o_uuid", DataType::Varchar, ColumnId::from(6)),
634 SourceColumnDesc {
635 name: "o_point".to_owned(),
636 data_type: DataType::Struct(StructType::new(vec![
637 ("x", DataType::Float32),
638 ("y", DataType::Float32),
639 ])),
640 column_id: 7.into(),
641 column_type: SourceColumnType::Normal,
642 is_pk: false,
643 is_hidden_addition_col: false,
644 additional_column: AdditionalColumn { column_type: None },
645 },
646 SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
647 SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)),
648 SourceColumnDesc::simple("o_varchar", DataType::Varchar, ColumnId::from(10)),
649 SourceColumnDesc::simple("o_character", DataType::Varchar, ColumnId::from(11)),
650 SourceColumnDesc::simple(
651 "o_character_varying",
652 DataType::Varchar,
653 ColumnId::from(12),
654 ),
655 ]
656 }
657
658 #[tokio::test]
659 async fn test_temporal_types() {
660 let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_time_0":40271000000,"o_time_6":40271000010,"o_timez_0":"11:11:11Z","o_timez_6":"11:11:11.00001Z","o_timestamp_0":1321009871000,"o_timestamp_6":1321009871123456,"o_timestampz_0":"2011-11-11T03:11:11Z","o_timestampz_6":"2011-11-11T03:11:11.123456Z","o_interval":"P1Y2M3DT4H5M6.78S","o_date":"1999-09-09"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684733351963,"snapshot":"last","db":"test","sequence":"[null,\"26505352\"]","schema":"public","table":"orders","txId":729,"lsn":26505352,"xmin":null},"op":"r","ts_ms":1684733352110,"transaction":null}}"#;
677 let columns = get_temporal_test_columns();
678 let parser = build_parser(columns.clone()).await;
679 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
680 .await
681 .try_into()
682 .unwrap();
683 assert_eq!(op, Op::Insert);
684 assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
685 assert!(row[1].eq(&Some(ScalarImpl::Time(Time::new(
686 NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
687 )))));
688 assert!(row[2].eq(&Some(ScalarImpl::Time(Time::new(
689 NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
690 )))));
691 assert!(row[3].eq(&Some(ScalarImpl::Time(Time::new(
692 NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
693 )))));
694 assert!(row[4].eq(&Some(ScalarImpl::Time(Time::new(
695 NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
696 )))));
697 assert!(row[5].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
698 "2011-11-11T11:11:11".parse().unwrap()
699 )))));
700 assert!(row[6].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
701 "2011-11-11T11:11:11.123456".parse().unwrap()
702 )))));
703 assert!(
704 row[9].eq(&Some(ScalarImpl::Interval(Interval::from_month_day_usec(
705 14,
706 3,
707 14706780000
708 ))))
709 );
710 assert!(row[10].eq(&Some(ScalarImpl::Date(Date::new(
711 NaiveDate::from_ymd_opt(1999, 9, 9).unwrap()
712 )))));
713 }
714
715 #[tokio::test]
716 async fn test_numeric_types() {
717 let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_smallint":32767,"o_integer":2147483647,"o_bigint":9223372036854775807,"o_real":9.999,"o_double":9.999999,"o_numeric":123456.789,"o_numeric_6_3":123.456,"o_money":123.12},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684404343201,"snapshot":"last","db":"test","sequence":"[null,\"26519216\"]","schema":"public","table":"orders","txId":729,"lsn":26519216,"xmin":null},"op":"r","ts_ms":1684404343349,"transaction":null}}"#;
732 let columns = get_numeric_test_columns();
733 let parser = build_parser(columns.clone()).await;
734 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
735 .await
736 .try_into()
737 .unwrap();
738 assert_eq!(op, Op::Insert);
739 assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
740 assert!(row[1].eq(&Some(ScalarImpl::Int16(32767))));
741 assert!(row[2].eq(&Some(ScalarImpl::Int32(2147483647))));
742 assert!(row[3].eq(&Some(ScalarImpl::Int64(9223372036854775807))));
743 assert!(row[4].eq(&Some(ScalarImpl::Float32((9.999).into()))));
744 assert!(row[5].eq(&Some(ScalarImpl::Float64((9.999999).into()))));
745 assert!(row[6].eq(&Some(ScalarImpl::Decimal("123456.7890".parse().unwrap()))));
746 assert!(row[7].eq(&Some(ScalarImpl::Decimal("123.456".parse().unwrap()))));
747 assert!(row[8].eq(&Some(ScalarImpl::Decimal("123.12".parse().unwrap()))));
748 }
749
750 #[tokio::test]
751 async fn test_other_types() {
752 let data = br#"{"payload":{"before":null,"after":{"o_key":1,"o_boolean":false,"o_bit":true,"o_bytea":"ASNFZ4mrze8=","o_json":"{\"k1\": \"v1\", \"k2\": 11}","o_xml":"<!--hahaha-->","o_uuid":"60f14fe2-f857-404a-b586-3b5375b3259f","o_point":{"x":1.0,"y":2.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAABA","srid":null},"o_enum":"polar","o_char":"h","o_varchar":"ha","o_character":"h","o_character_varying":"hahaha"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684743927178,"snapshot":"last","db":"test","sequence":"[null,\"26524528\"]","schema":"public","table":"orders","txId":730,"lsn":26524528,"xmin":null},"op":"r","ts_ms":1684743927343,"transaction":null}}"#;
771 let columns = get_other_types_test_columns();
772 let parser = build_parser(columns.clone()).await;
773 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
774 .await
775 .try_into()
776 .unwrap();
777 assert_eq!(op, Op::Insert);
778 assert!(row[0].eq(&Some(ScalarImpl::Int32(1))));
779 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
780 assert!(row[2].eq(&Some(ScalarImpl::Bool(true))));
781 assert!(row[3].eq(&Some(ScalarImpl::Bytea(Box::new([
782 u8::from_str_radix("01", 16).unwrap(),
783 u8::from_str_radix("23", 16).unwrap(),
784 u8::from_str_radix("45", 16).unwrap(),
785 u8::from_str_radix("67", 16).unwrap(),
786 u8::from_str_radix("89", 16).unwrap(),
787 u8::from_str_radix("AB", 16).unwrap(),
788 u8::from_str_radix("CD", 16).unwrap(),
789 u8::from_str_radix("EF", 16).unwrap()
790 ])))));
791 assert_json_eq(&row[4], "{\"k1\": \"v1\", \"k2\": 11}");
792 assert!(row[5].eq(&Some(ScalarImpl::Utf8("<!--hahaha-->".into()))));
793 assert!(row[6].eq(&Some(ScalarImpl::Utf8(
794 "60f14fe2-f857-404a-b586-3b5375b3259f".into()
795 ))));
796 assert!(row[7].eq(&Some(ScalarImpl::Struct(StructValue::new(vec![
797 Some(ScalarImpl::Float32(1.into())),
798 Some(ScalarImpl::Float32(2.into()))
799 ])))));
800 assert!(row[8].eq(&Some(ScalarImpl::Utf8("polar".into()))));
801 assert!(row[9].eq(&Some(ScalarImpl::Utf8("h".into()))));
802 assert!(row[10].eq(&Some(ScalarImpl::Utf8("ha".into()))));
803 assert!(row[11].eq(&Some(ScalarImpl::Utf8("h".into()))));
804 assert!(row[12].eq(&Some(ScalarImpl::Utf8("hahaha".into()))));
805 }
806 }
807}