1use std::fmt::Debug;
16
17use anyhow::Context;
18use simd_json::BorrowedValue;
19use simd_json::prelude::MutableObject;
20
21use crate::error::ConnectorResult;
22use crate::parser::unified::AccessImpl;
23use crate::parser::unified::debezium::MongoJsonAccess;
24use crate::parser::unified::json::{
25 JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling, TimestamptzHandling,
26};
27use crate::parser::{AccessBuilder, MongoProperties};
28
29#[derive(Debug)]
30pub struct DebeziumJsonAccessBuilder {
31 value: Option<Vec<u8>>,
32 json_parse_options: JsonParseOptions,
33}
34
35impl DebeziumJsonAccessBuilder {
36 pub fn new(
37 timestamptz_handling: TimestamptzHandling,
38 timestamp_handling: TimestampHandling,
39 time_handling: TimeHandling,
40 ) -> ConnectorResult<Self> {
41 Ok(Self {
42 value: None,
43 json_parse_options: JsonParseOptions::new_for_debezium(
44 timestamptz_handling,
45 timestamp_handling,
46 time_handling,
47 ),
48 })
49 }
50
51 pub fn new_for_schema_event() -> ConnectorResult<Self> {
52 Ok(Self {
53 value: None,
54 json_parse_options: JsonParseOptions::default(),
55 })
56 }
57}
58
59impl AccessBuilder for DebeziumJsonAccessBuilder {
60 #[allow(clippy::unused_async)]
61 async fn generate_accessor(
62 &mut self,
63 payload: Vec<u8>,
64 _: &crate::source::SourceMeta,
65 ) -> ConnectorResult<AccessImpl<'_>> {
66 self.value = Some(payload);
67 let mut event: BorrowedValue<'_> =
68 simd_json::to_borrowed_value(self.value.as_mut().unwrap())
69 .context("failed to parse debezium json payload")?;
70
71 let payload = if let Some(payload) = event.get_mut("payload") {
72 std::mem::take(payload)
73 } else {
74 event
75 };
76
77 Ok(AccessImpl::Json(JsonAccess::new_with_options(
78 payload,
79 &self.json_parse_options,
80 )))
81 }
82}
83
84#[derive(Debug)]
85pub struct DebeziumMongoJsonAccessBuilder {
86 value: Option<Vec<u8>>,
87 json_parse_options: JsonParseOptions,
88 strong_schema: bool,
89}
90
91impl DebeziumMongoJsonAccessBuilder {
92 pub fn new(props: MongoProperties) -> anyhow::Result<Self> {
93 Ok(Self {
94 value: None,
95 json_parse_options: JsonParseOptions::new_for_debezium(
96 TimestamptzHandling::GuessNumberUnit,
97 TimestampHandling::GuessNumberUnit,
98 TimeHandling::Micro,
99 ),
100 strong_schema: props.strong_schema,
101 })
102 }
103}
104
105impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
106 #[allow(clippy::unused_async)]
107 async fn generate_accessor(
108 &mut self,
109 payload: Vec<u8>,
110 _: &crate::source::SourceMeta,
111 ) -> ConnectorResult<AccessImpl<'_>> {
112 self.value = Some(payload);
113 let mut event: BorrowedValue<'_> =
114 simd_json::to_borrowed_value(self.value.as_mut().unwrap())
115 .context("failed to parse debezium mongo json payload")?;
116
117 let payload = if let Some(payload) = event.get_mut("payload") {
118 std::mem::take(payload)
119 } else {
120 event
121 };
122
123 Ok(AccessImpl::MongoJson(MongoJsonAccess::new(
124 JsonAccess::new_with_options(payload, &self.json_parse_options),
125 self.strong_schema,
126 )))
127 }
128}
129
130#[cfg(test)]
131mod tests {
132 use chrono::{NaiveDate, NaiveTime};
133 use risingwave_common::array::{Op, StructValue};
134 use risingwave_common::catalog::ColumnId;
135 use risingwave_common::row::{OwnedRow, Row};
136 use risingwave_common::types::{
137 DataType, Date, Interval, Scalar, ScalarImpl, StructType, Time, Timestamp,
138 };
139 use serde_json::Value;
140 use thiserror_ext::AsReport;
141
142 use crate::parser::{
143 DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
144 SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig,
145 };
146 use crate::source::{SourceContext, SourceCtrlOpts};
147
148 fn assert_json_eq(parse_result: &Option<ScalarImpl>, json_str: &str) {
149 if let Some(ScalarImpl::Jsonb(json_val)) = parse_result {
150 let mut json_string = String::new();
151 json_val
152 .as_scalar_ref()
153 .force_str(&mut json_string)
154 .unwrap();
155 let val1: Value = serde_json::from_str(json_string.as_str()).unwrap();
156 let val2: Value = serde_json::from_str(json_str).unwrap();
157 assert_eq!(val1, val2);
158 }
159 }
160
161 async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
162 let props = SpecificParserConfig {
163 encoding_config: EncodingProperties::Json(JsonProperties {
164 use_schema_registry: false,
165 timestamptz_handling: None,
166 timestamp_handling: None,
167 time_handling: None,
168 }),
169 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
170 };
171 DebeziumParser::new(props, rw_columns, SourceContext::dummy().into())
172 .await
173 .unwrap()
174 }
175
176 async fn parse_one(
177 mut parser: DebeziumParser,
178 columns: Vec<SourceColumnDesc>,
179 payload: Vec<u8>,
180 ) -> Vec<(Op, OwnedRow)> {
181 let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
182 parser
183 .parse_inner(None, Some(payload), builder.row_writer())
184 .await
185 .unwrap();
186 builder.finish_current_chunk();
187 let chunk = builder.consume_ready_chunks().next().unwrap();
188 chunk
189 .rows()
190 .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
191 .collect::<Vec<_>>()
192 }
193
194 mod test1_basic {
195 use super::*;
196
197 fn get_test1_columns() -> Vec<SourceColumnDesc> {
198 vec![
199 SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)),
200 SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(1)),
201 SourceColumnDesc::simple("description", DataType::Varchar, ColumnId::from(2)),
202 SourceColumnDesc::simple("weight", DataType::Float64, ColumnId::from(3)),
203 ]
204 }
205
206 #[tokio::test]
207 async fn test1_debezium_json_parser_read() {
208 let input = vec![
216 br#"{"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}}"#.to_vec(),
218 br#"{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}"#.to_vec()];
220
221 let columns = get_test1_columns();
222
223 for data in input {
224 let parser = build_parser(columns.clone()).await;
225 let [(_op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
226 .await
227 .try_into()
228 .unwrap();
229
230 assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
231 assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
232 assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
233 assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
234 }
235 }
236
237 #[tokio::test]
238 async fn test1_debezium_json_parser_insert() {
239 let input = vec![
247 br#"{"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}}"#.to_vec(),
249 br#"{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}"#.to_vec()];
251
252 let columns = get_test1_columns();
253
254 for data in input {
255 let parser = build_parser(columns.clone()).await;
256 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
257 .await
258 .try_into()
259 .unwrap();
260 assert_eq!(op, Op::Insert);
261
262 assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
263 assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
264 assert!(row[2].eq(&Some(ScalarImpl::Utf8("12V car battery".into()))));
265 assert!(row[3].eq(&Some(ScalarImpl::Float64(8.1.into()))));
266 }
267 }
268
269 #[tokio::test]
270 async fn test1_debezium_json_parser_delete() {
271 let input = vec![
279 br#"{"payload":{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}}"#.to_vec(),
281 br#"{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}"#.to_vec()];
283
284 for data in input {
285 let columns = get_test1_columns();
286 let parser = build_parser(columns.clone()).await;
287 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
288 .await
289 .try_into()
290 .unwrap();
291
292 assert_eq!(op, Op::Delete);
293
294 assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
295 assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
296 assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
297 assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
298 }
299 }
300
301 #[tokio::test]
302 async fn test1_debezium_json_parser_update() {
303 let input = vec![
316 br#"{"payload":{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}}"#.to_vec(),
318 br#"{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}"#.to_vec()];
320
321 let columns = get_test1_columns();
322
323 for data in input {
324 let parser = build_parser(columns.clone()).await;
325 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
326 .await
327 .try_into()
328 .unwrap();
329
330 assert_eq!(op, Op::Insert);
331
332 assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
333 assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
334 assert!(row[2].eq(&Some(ScalarImpl::Utf8("24V car battery".into()))));
335 assert!(row[3].eq(&Some(ScalarImpl::Float64(9.1.into()))));
336 }
337 }
338 }
339 mod test2_mysql {
357 use super::*;
358
359 fn get_test2_columns() -> Vec<SourceColumnDesc> {
360 vec![
361 SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
362 SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
363 SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
364 SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
365 SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
366 SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
367 SourceColumnDesc::simple("O_DECIMAL", DataType::Decimal, ColumnId::from(6)),
368 SourceColumnDesc::simple("O_CHAR", DataType::Varchar, ColumnId::from(7)),
369 SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)),
370 SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)),
371 SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)),
372 SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)),
373 SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)),
374 ]
375 }
376
377 #[tokio::test]
378 async fn test2_debezium_json_parser_read() {
379 let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090651000,"snapshot":"last","db":"test","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":951,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1678090651640,"transaction":null}}"#;
380
381 let columns = get_test2_columns();
382
383 let parser = build_parser(columns.clone()).await;
384
385 let [(_op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
386 .await
387 .try_into()
388 .unwrap();
389
390 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
391 assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
392 assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
393 assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
394 assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
395 assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
396 assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
397 assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
398 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
399 NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
400 )))));
401 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
402 NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
403 )))));
404 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
405 "1970-01-01T00:00:00".parse().unwrap()
406 )))));
407 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
408 "1970-01-01T00:00:01Z".parse().unwrap()
409 ))));
410 assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
411 }
412
413 #[tokio::test]
414 async fn test2_debezium_json_parser_insert() {
415 let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#;
416
417 let columns = get_test2_columns();
418 let parser = build_parser(columns.clone()).await;
419 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
420 .await
421 .try_into()
422 .unwrap();
423 assert_eq!(op, Op::Insert);
424
425 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
426 assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
427 assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
428 assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
429 assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
430 assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
431 assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
432 assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
433 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
434 NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
435 )))));
436 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
437 NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
438 )))));
439 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
440 "1970-01-01T00:00:00".parse().unwrap()
441 )))));
442 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
443 "1970-01-01T00:00:01Z".parse().unwrap()
444 ))));
445 assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
446 }
447
448 #[tokio::test]
449 async fn test2_debezium_json_parser_delete() {
450 let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#;
451
452 let columns = get_test2_columns();
453 let parser = build_parser(columns.clone()).await;
454 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
455 .await
456 .try_into()
457 .unwrap();
458
459 assert_eq!(op, Op::Delete);
460
461 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
462 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
463 assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
464 assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
465 assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
466 assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
467 assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
468 assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
469 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
470 NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
471 )))));
472 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
473 NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
474 )))));
475 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
476 "5138-11-16T09:46:39".parse().unwrap()
477 )))));
478 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
479 "2038-01-09T03:14:07Z".parse().unwrap()
480 ))));
481 assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}");
482 }
483
484 #[tokio::test]
485 async fn test2_debezium_json_parser_update() {
486 let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"after":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\": \"v1_updated\", \"k2\": 33}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678089331000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1168,"row":0,"thread":4,"query":null},"op":"u","ts_ms":1678089331464,"transaction":null}}"#;
487
488 let columns = get_test2_columns();
489
490 let parser = build_parser(columns.clone()).await;
491 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
492 .await
493 .try_into()
494 .unwrap();
495
496 assert_eq!(op, Op::Insert);
497
498 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
499 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
500 assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
501 assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
502 assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
503 assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
504 assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
505 assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
506 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
507 NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
508 )))));
509 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
510 NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
511 )))));
512 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
513 "5138-11-16T09:46:39".parse().unwrap()
514 )))));
515 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
516 "2038-01-09T03:14:07Z".parse().unwrap()
517 ))));
518 assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
519 }
520
521 #[cfg(not(madsim))] #[tokio::test]
523 #[tracing_test::traced_test]
524 async fn test2_debezium_json_parser_overflow() {
525 let columns = vec![
526 SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
527 SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
528 SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
529 SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
530 SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
531 SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
532 ];
533 let mut parser = build_parser(columns.clone()).await;
534
535 let mut dummy_builder =
536 SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
537
538 let normal_values = ["111", "1", "33", "444", "555.0", "666.0"];
539 let overflow_values = [
540 "9223372036854775808",
541 "2",
542 "32768",
543 "2147483648",
544 "3.80282347E38",
545 "1.797695E308",
546 ];
547
548 for i in 0..6 {
549 let mut values = normal_values;
550 values[i] = overflow_values[i];
551 let data = format!(
552 r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
553 values[0], values[1], values[2], values[3], values[4], values[5]
554 ).as_bytes().to_vec();
555
556 let res = parser
557 .parse_inner(None, Some(data), dummy_builder.row_writer())
558 .await;
559 if i < 5 {
560 res.unwrap();
563 assert!(logs_contain("expected type"), "{i}");
564 } else {
565 let e = res.unwrap_err();
567 assert!(e.to_report_string().contains("InvalidNumber"), "{i}: {e}");
568 }
569 }
570 }
571 }
572
573 mod test3_postgres {
575 use risingwave_pb::plan_common::AdditionalColumn;
576
577 use super::*;
578 use crate::source::SourceColumnType;
579
580 fn get_temporal_test_columns() -> Vec<SourceColumnDesc> {
582 vec![
583 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
584 SourceColumnDesc::simple("o_time_0", DataType::Time, ColumnId::from(1)),
585 SourceColumnDesc::simple("o_time_6", DataType::Time, ColumnId::from(2)),
586 SourceColumnDesc::simple("o_timez_0", DataType::Time, ColumnId::from(3)),
587 SourceColumnDesc::simple("o_timez_6", DataType::Time, ColumnId::from(4)),
588 SourceColumnDesc::simple("o_timestamp_0", DataType::Timestamp, ColumnId::from(5)),
589 SourceColumnDesc::simple("o_timestamp_6", DataType::Timestamp, ColumnId::from(6)),
590 SourceColumnDesc::simple(
591 "o_timestampz_0",
592 DataType::Timestamptz,
593 ColumnId::from(7),
594 ),
595 SourceColumnDesc::simple(
596 "o_timestampz_6",
597 DataType::Timestamptz,
598 ColumnId::from(8),
599 ),
600 SourceColumnDesc::simple("o_interval", DataType::Interval, ColumnId::from(9)),
601 SourceColumnDesc::simple("o_date", DataType::Date, ColumnId::from(10)),
602 ]
603 }
604
605 fn get_numeric_test_columns() -> Vec<SourceColumnDesc> {
607 vec![
608 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
609 SourceColumnDesc::simple("o_smallint", DataType::Int16, ColumnId::from(1)),
610 SourceColumnDesc::simple("o_integer", DataType::Int32, ColumnId::from(2)),
611 SourceColumnDesc::simple("o_bigint", DataType::Int64, ColumnId::from(3)),
612 SourceColumnDesc::simple("o_real", DataType::Float32, ColumnId::from(4)),
613 SourceColumnDesc::simple("o_double", DataType::Float64, ColumnId::from(5)),
614 SourceColumnDesc::simple("o_numeric", DataType::Decimal, ColumnId::from(6)),
615 SourceColumnDesc::simple("o_numeric_6_3", DataType::Decimal, ColumnId::from(7)),
616 SourceColumnDesc::simple("o_money", DataType::Decimal, ColumnId::from(8)),
617 ]
618 }
619
620 fn get_other_types_test_columns() -> Vec<SourceColumnDesc> {
622 vec![
623 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
624 SourceColumnDesc::simple("o_boolean", DataType::Boolean, ColumnId::from(1)),
625 SourceColumnDesc::simple("o_bit", DataType::Boolean, ColumnId::from(2)),
626 SourceColumnDesc::simple("o_bytea", DataType::Bytea, ColumnId::from(3)),
627 SourceColumnDesc::simple("o_json", DataType::Jsonb, ColumnId::from(4)),
628 SourceColumnDesc::simple("o_xml", DataType::Varchar, ColumnId::from(5)),
629 SourceColumnDesc::simple("o_uuid", DataType::Varchar, ColumnId::from(6)),
630 SourceColumnDesc {
631 name: "o_point".to_owned(),
632 data_type: DataType::Struct(StructType::new(vec![
633 ("x", DataType::Float32),
634 ("y", DataType::Float32),
635 ])),
636 column_id: 7.into(),
637 column_type: SourceColumnType::Normal,
638 is_pk: false,
639 is_hidden_addition_col: false,
640 additional_column: AdditionalColumn { column_type: None },
641 },
642 SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
643 SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)),
644 SourceColumnDesc::simple("o_varchar", DataType::Varchar, ColumnId::from(10)),
645 SourceColumnDesc::simple("o_character", DataType::Varchar, ColumnId::from(11)),
646 SourceColumnDesc::simple(
647 "o_character_varying",
648 DataType::Varchar,
649 ColumnId::from(12),
650 ),
651 ]
652 }
653
654 #[tokio::test]
655 async fn test_temporal_types() {
656 let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_time_0":40271000000,"o_time_6":40271000010,"o_timez_0":"11:11:11Z","o_timez_6":"11:11:11.00001Z","o_timestamp_0":1321009871000,"o_timestamp_6":1321009871123456,"o_timestampz_0":"2011-11-11T03:11:11Z","o_timestampz_6":"2011-11-11T03:11:11.123456Z","o_interval":"P1Y2M3DT4H5M6.78S","o_date":"1999-09-09"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684733351963,"snapshot":"last","db":"test","sequence":"[null,\"26505352\"]","schema":"public","table":"orders","txId":729,"lsn":26505352,"xmin":null},"op":"r","ts_ms":1684733352110,"transaction":null}}"#;
673 let columns = get_temporal_test_columns();
674 let parser = build_parser(columns.clone()).await;
675 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
676 .await
677 .try_into()
678 .unwrap();
679 assert_eq!(op, Op::Insert);
680 assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
681 assert!(row[1].eq(&Some(ScalarImpl::Time(Time::new(
682 NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
683 )))));
684 assert!(row[2].eq(&Some(ScalarImpl::Time(Time::new(
685 NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
686 )))));
687 assert!(row[3].eq(&Some(ScalarImpl::Time(Time::new(
688 NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
689 )))));
690 assert!(row[4].eq(&Some(ScalarImpl::Time(Time::new(
691 NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
692 )))));
693 assert!(row[5].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
694 "2011-11-11T11:11:11".parse().unwrap()
695 )))));
696 assert!(row[6].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
697 "2011-11-11T11:11:11.123456".parse().unwrap()
698 )))));
699 assert!(
700 row[9].eq(&Some(ScalarImpl::Interval(Interval::from_month_day_usec(
701 14,
702 3,
703 14706780000
704 ))))
705 );
706 assert!(row[10].eq(&Some(ScalarImpl::Date(Date::new(
707 NaiveDate::from_ymd_opt(1999, 9, 9).unwrap()
708 )))));
709 }
710
711 #[tokio::test]
712 async fn test_numeric_types() {
713 let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_smallint":32767,"o_integer":2147483647,"o_bigint":9223372036854775807,"o_real":9.999,"o_double":9.999999,"o_numeric":123456.789,"o_numeric_6_3":123.456,"o_money":123.12},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684404343201,"snapshot":"last","db":"test","sequence":"[null,\"26519216\"]","schema":"public","table":"orders","txId":729,"lsn":26519216,"xmin":null},"op":"r","ts_ms":1684404343349,"transaction":null}}"#;
728 let columns = get_numeric_test_columns();
729 let parser = build_parser(columns.clone()).await;
730 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
731 .await
732 .try_into()
733 .unwrap();
734 assert_eq!(op, Op::Insert);
735 assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
736 assert!(row[1].eq(&Some(ScalarImpl::Int16(32767))));
737 assert!(row[2].eq(&Some(ScalarImpl::Int32(2147483647))));
738 assert!(row[3].eq(&Some(ScalarImpl::Int64(9223372036854775807))));
739 assert!(row[4].eq(&Some(ScalarImpl::Float32((9.999).into()))));
740 assert!(row[5].eq(&Some(ScalarImpl::Float64((9.999999).into()))));
741 assert!(row[6].eq(&Some(ScalarImpl::Decimal("123456.7890".parse().unwrap()))));
742 assert!(row[7].eq(&Some(ScalarImpl::Decimal("123.456".parse().unwrap()))));
743 assert!(row[8].eq(&Some(ScalarImpl::Decimal("123.12".parse().unwrap()))));
744 }
745
746 #[tokio::test]
747 async fn test_other_types() {
748 let data = br#"{"payload":{"before":null,"after":{"o_key":1,"o_boolean":false,"o_bit":true,"o_bytea":"ASNFZ4mrze8=","o_json":"{\"k1\": \"v1\", \"k2\": 11}","o_xml":"<!--hahaha-->","o_uuid":"60f14fe2-f857-404a-b586-3b5375b3259f","o_point":{"x":1.0,"y":2.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAABA","srid":null},"o_enum":"polar","o_char":"h","o_varchar":"ha","o_character":"h","o_character_varying":"hahaha"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684743927178,"snapshot":"last","db":"test","sequence":"[null,\"26524528\"]","schema":"public","table":"orders","txId":730,"lsn":26524528,"xmin":null},"op":"r","ts_ms":1684743927343,"transaction":null}}"#;
767 let columns = get_other_types_test_columns();
768 let parser = build_parser(columns.clone()).await;
769 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
770 .await
771 .try_into()
772 .unwrap();
773 assert_eq!(op, Op::Insert);
774 assert!(row[0].eq(&Some(ScalarImpl::Int32(1))));
775 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
776 assert!(row[2].eq(&Some(ScalarImpl::Bool(true))));
777 assert!(row[3].eq(&Some(ScalarImpl::Bytea(Box::new([
778 u8::from_str_radix("01", 16).unwrap(),
779 u8::from_str_radix("23", 16).unwrap(),
780 u8::from_str_radix("45", 16).unwrap(),
781 u8::from_str_radix("67", 16).unwrap(),
782 u8::from_str_radix("89", 16).unwrap(),
783 u8::from_str_radix("AB", 16).unwrap(),
784 u8::from_str_radix("CD", 16).unwrap(),
785 u8::from_str_radix("EF", 16).unwrap()
786 ])))));
787 assert_json_eq(&row[4], "{\"k1\": \"v1\", \"k2\": 11}");
788 assert!(row[5].eq(&Some(ScalarImpl::Utf8("<!--hahaha-->".into()))));
789 assert!(row[6].eq(&Some(ScalarImpl::Utf8(
790 "60f14fe2-f857-404a-b586-3b5375b3259f".into()
791 ))));
792 assert!(row[7].eq(&Some(ScalarImpl::Struct(StructValue::new(vec![
793 Some(ScalarImpl::Float32(1.into())),
794 Some(ScalarImpl::Float32(2.into()))
795 ])))));
796 assert!(row[8].eq(&Some(ScalarImpl::Utf8("polar".into()))));
797 assert!(row[9].eq(&Some(ScalarImpl::Utf8("h".into()))));
798 assert!(row[10].eq(&Some(ScalarImpl::Utf8("ha".into()))));
799 assert!(row[11].eq(&Some(ScalarImpl::Utf8("h".into()))));
800 assert!(row[12].eq(&Some(ScalarImpl::Utf8("hahaha".into()))));
801 }
802 }
803}