1use std::fmt::Debug;
16
17use anyhow::Context;
18use simd_json::BorrowedValue;
19use simd_json::prelude::MutableObject;
20
21use crate::error::ConnectorResult;
22use crate::parser::unified::AccessImpl;
23use crate::parser::unified::debezium::MongoJsonAccess;
24use crate::parser::unified::json::{
25 BigintUnsignedHandlingMode, JsonAccess, JsonParseOptions, TimeHandling, TimestampHandling,
26 TimestamptzHandling,
27};
28use crate::parser::{AccessBuilder, MongoProperties};
29
/// Builds JSON accessors for Debezium change events.
///
/// The builder keeps the raw bytes of the latest payload itself; the accessor
/// returned by `generate_accessor` is parsed from that stored buffer and is
/// tied to the borrow of the builder.
#[derive(Debug)]
pub struct DebeziumJsonAccessBuilder {
    /// Raw bytes of the payload most recently passed to `generate_accessor`;
    /// `None` until the first call.
    value: Option<Vec<u8>>,
    /// Parse options handed to every `JsonAccess` this builder creates.
    json_parse_options: JsonParseOptions,
}
35
36impl DebeziumJsonAccessBuilder {
37 pub fn new(
38 timestamptz_handling: TimestamptzHandling,
39 timestamp_handling: TimestampHandling,
40 time_handling: TimeHandling,
41 bigint_unsigned_handling: BigintUnsignedHandlingMode,
42 handle_toast_columns: bool,
43 ) -> ConnectorResult<Self> {
44 Ok(Self {
45 value: None,
46 json_parse_options: JsonParseOptions::new_for_debezium(
47 timestamptz_handling,
48 timestamp_handling,
49 time_handling,
50 bigint_unsigned_handling,
51 handle_toast_columns,
52 ),
53 })
54 }
55
56 pub fn new_for_schema_event() -> ConnectorResult<Self> {
57 Ok(Self {
58 value: None,
59 json_parse_options: JsonParseOptions::default(),
60 })
61 }
62}
63
64impl AccessBuilder for DebeziumJsonAccessBuilder {
65 #[allow(clippy::unused_async)]
66 async fn generate_accessor(
67 &mut self,
68 payload: Vec<u8>,
69 _: &crate::source::SourceMeta,
70 ) -> ConnectorResult<AccessImpl<'_>> {
71 self.value = Some(payload);
72 let mut event: BorrowedValue<'_> =
73 simd_json::to_borrowed_value(self.value.as_mut().unwrap())
74 .context("failed to parse debezium json payload")?;
75
76 let payload = if let Some(payload) = event.get_mut("payload") {
77 std::mem::take(payload)
78 } else {
79 event
80 };
81
82 Ok(AccessImpl::Json(JsonAccess::new_with_options(
83 payload,
84 &self.json_parse_options,
85 )))
86 }
87}
88
/// Builds accessors for Debezium MongoDB JSON change events.
///
/// Like the plain JSON builder, it keeps the raw bytes of the latest payload
/// and parses the accessor from that stored buffer.
#[derive(Debug)]
pub struct DebeziumMongoJsonAccessBuilder {
    /// Raw bytes of the payload most recently passed to `generate_accessor`;
    /// `None` until the first call.
    value: Option<Vec<u8>>,
    /// Parse options handed to the inner `JsonAccess`.
    json_parse_options: JsonParseOptions,
    /// Copied from `MongoProperties::strong_schema` and forwarded to
    /// `MongoJsonAccess::new`.
    strong_schema: bool,
}
95
96impl DebeziumMongoJsonAccessBuilder {
97 pub fn new(props: MongoProperties) -> anyhow::Result<Self> {
98 Ok(Self {
99 value: None,
100 json_parse_options: JsonParseOptions::new_for_debezium(
101 TimestamptzHandling::GuessNumberUnit,
102 TimestampHandling::GuessNumberUnit,
103 TimeHandling::Micro,
104 BigintUnsignedHandlingMode::Long,
105 false,
106 ),
107 strong_schema: props.strong_schema,
108 })
109 }
110}
111
112impl AccessBuilder for DebeziumMongoJsonAccessBuilder {
113 #[allow(clippy::unused_async)]
114 async fn generate_accessor(
115 &mut self,
116 payload: Vec<u8>,
117 _: &crate::source::SourceMeta,
118 ) -> ConnectorResult<AccessImpl<'_>> {
119 self.value = Some(payload);
120 let mut event: BorrowedValue<'_> =
121 simd_json::to_borrowed_value(self.value.as_mut().unwrap())
122 .context("failed to parse debezium mongo json payload")?;
123
124 let payload = if let Some(payload) = event.get_mut("payload") {
125 std::mem::take(payload)
126 } else {
127 event
128 };
129
130 Ok(AccessImpl::MongoJson(MongoJsonAccess::new(
131 JsonAccess::new_with_options(payload, &self.json_parse_options),
132 self.strong_schema,
133 )))
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use chrono::{NaiveDate, NaiveTime};
140 use risingwave_common::array::{Op, StructValue};
141 use risingwave_common::catalog::ColumnId;
142 use risingwave_common::row::{OwnedRow, Row};
143 use risingwave_common::types::{
144 DataType, Date, Interval, Scalar, ScalarImpl, StructType, Time, Timestamp,
145 };
146 use serde_json::Value;
147 use thiserror_ext::AsReport;
148
149 use crate::parser::{
150 DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties,
151 SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig,
152 };
153 use crate::source::{SourceContext, SourceCtrlOpts};
154
155 fn assert_json_eq(parse_result: &Option<ScalarImpl>, json_str: &str) {
156 if let Some(ScalarImpl::Jsonb(json_val)) = parse_result {
157 let mut json_string = String::new();
158 json_val
159 .as_scalar_ref()
160 .force_str(&mut json_string)
161 .unwrap();
162 let val1: Value = serde_json::from_str(json_string.as_str()).unwrap();
163 let val2: Value = serde_json::from_str(json_str).unwrap();
164 assert_eq!(val1, val2);
165 }
166 }
167
168 async fn build_parser(rw_columns: Vec<SourceColumnDesc>) -> DebeziumParser {
169 let props = SpecificParserConfig {
170 encoding_config: EncodingProperties::Json(JsonProperties {
171 use_schema_registry: false,
172 timestamptz_handling: None,
173 timestamp_handling: None,
174 time_handling: None,
175 bigint_unsigned_handling: None,
176 handle_toast_columns: false,
177 }),
178 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
179 };
180 DebeziumParser::new(props, rw_columns, SourceContext::dummy().into())
181 .await
182 .unwrap()
183 }
184
185 async fn parse_one(
186 mut parser: DebeziumParser,
187 columns: Vec<SourceColumnDesc>,
188 payload: Vec<u8>,
189 ) -> Vec<(Op, OwnedRow)> {
190 let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
191 parser
192 .parse_inner(None, Some(payload), builder.row_writer())
193 .await
194 .unwrap();
195 builder.finish_current_chunk();
196 let chunk = builder.consume_ready_chunks().next().unwrap();
197 chunk
198 .rows()
199 .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
200 .collect::<Vec<_>>()
201 }
202
203 mod test1_basic {
204 use super::*;
205
206 fn get_test1_columns() -> Vec<SourceColumnDesc> {
207 vec![
208 SourceColumnDesc::simple("id", DataType::Int32, ColumnId::from(0)),
209 SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::from(1)),
210 SourceColumnDesc::simple("description", DataType::Varchar, ColumnId::from(2)),
211 SourceColumnDesc::simple("weight", DataType::Float64, ColumnId::from(3)),
212 ]
213 }
214
215 #[tokio::test]
216 async fn test1_debezium_json_parser_read() {
217 let input = vec![
225 br#"{"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}}"#.to_vec(),
227 br#"{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1639547113602,"transaction":null}"#.to_vec()];
229
230 let columns = get_test1_columns();
231
232 for data in input {
233 let parser = build_parser(columns.clone()).await;
234 let [(_op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
235 .await
236 .try_into()
237 .unwrap();
238
239 assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
240 assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
241 assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
242 assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
243 }
244 }
245
246 #[tokio::test]
247 async fn test1_debezium_json_parser_insert() {
248 let input = vec![
256 br#"{"payload":{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}}"#.to_vec(),
258 br#"{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551564000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1639551564960,"transaction":null}"#.to_vec()];
260
261 let columns = get_test1_columns();
262
263 for data in input {
264 let parser = build_parser(columns.clone()).await;
265 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
266 .await
267 .try_into()
268 .unwrap();
269 assert_eq!(op, Op::Insert);
270
271 assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
272 assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
273 assert!(row[2].eq(&Some(ScalarImpl::Utf8("12V car battery".into()))));
274 assert!(row[3].eq(&Some(ScalarImpl::Float64(8.1.into()))));
275 }
276 }
277
278 #[tokio::test]
279 async fn test1_debezium_json_parser_delete() {
280 let input = vec![
288 br#"{"payload":{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}}"#.to_vec(),
290 br#"{"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":1.234},"after":null,"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551767000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1045,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1639551767775,"transaction":null}"#.to_vec()];
292
293 for data in input {
294 let columns = get_test1_columns();
295 let parser = build_parser(columns.clone()).await;
296 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
297 .await
298 .try_into()
299 .unwrap();
300
301 assert_eq!(op, Op::Delete);
302
303 assert!(row[0].eq(&Some(ScalarImpl::Int32(101))));
304 assert!(row[1].eq(&Some(ScalarImpl::Utf8("scooter".into()))));
305 assert!(row[2].eq(&Some(ScalarImpl::Utf8("Small 2-wheel scooter".into()))));
306 assert!(row[3].eq(&Some(ScalarImpl::Float64(1.234.into()))));
307 }
308 }
309
310 #[tokio::test]
311 async fn test1_debezium_json_parser_update() {
312 let input = vec![
325 br#"{"payload":{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}}"#.to_vec(),
327 br#"{"before":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"after":{"id":102,"name":"car battery","description":"24V car battery","weight":9.1},"source":{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639551901000,"snapshot":"false","db":"inventory","sequence":null,"table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1382,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1639551901165,"transaction":null}"#.to_vec()];
329
330 let columns = get_test1_columns();
331
332 for data in input {
333 let parser = build_parser(columns.clone()).await;
334 let [(op, row)]: [_; 1] = parse_one(parser, columns.clone(), data)
335 .await
336 .try_into()
337 .unwrap();
338
339 assert_eq!(op, Op::Insert);
340
341 assert!(row[0].eq(&Some(ScalarImpl::Int32(102))));
342 assert!(row[1].eq(&Some(ScalarImpl::Utf8("car battery".into()))));
343 assert!(row[2].eq(&Some(ScalarImpl::Utf8("24V car battery".into()))));
344 assert!(row[3].eq(&Some(ScalarImpl::Float64(9.1.into()))));
345 }
346 }
347 }
348 mod test2_mysql {
366 use super::*;
367
368 fn get_test2_columns() -> Vec<SourceColumnDesc> {
369 vec![
370 SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
371 SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
372 SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
373 SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
374 SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
375 SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
376 SourceColumnDesc::simple("O_DECIMAL", DataType::Decimal, ColumnId::from(6)),
377 SourceColumnDesc::simple("O_CHAR", DataType::Varchar, ColumnId::from(7)),
378 SourceColumnDesc::simple("O_DATE", DataType::Date, ColumnId::from(8)),
379 SourceColumnDesc::simple("O_TIME", DataType::Time, ColumnId::from(9)),
380 SourceColumnDesc::simple("O_DATETIME", DataType::Timestamp, ColumnId::from(10)),
381 SourceColumnDesc::simple("O_TIMESTAMP", DataType::Timestamptz, ColumnId::from(11)),
382 SourceColumnDesc::simple("O_JSON", DataType::Jsonb, ColumnId::from(12)),
383 ]
384 }
385
386 #[tokio::test]
387 async fn test2_debezium_json_parser_read() {
388 let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090651000,"snapshot":"last","db":"test","sequence":null,"table":"orders","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":951,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1678090651640,"transaction":null}}"#;
389
390 let columns = get_test2_columns();
391
392 let parser = build_parser(columns.clone()).await;
393
394 let [(_op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
395 .await
396 .try_into()
397 .unwrap();
398
399 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
400 assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
401 assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
402 assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
403 assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
404 assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
405 assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
406 assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
407 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
408 NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
409 )))));
410 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
411 NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
412 )))));
413 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
414 "1970-01-01T00:00:00".parse().unwrap()
415 )))));
416 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
417 "1970-01-01T00:00:01Z".parse().unwrap()
418 ))));
419 assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
420 }
421
422 #[tokio::test]
423 async fn test2_debezium_json_parser_insert() {
424 let data = br#"{"payload":{"before":null,"after":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678088861000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":789,"row":0,"thread":4,"query":null},"op":"c","ts_ms":1678088861249,"transaction":null}}"#;
425
426 let columns = get_test2_columns();
427 let parser = build_parser(columns.clone()).await;
428 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
429 .await
430 .try_into()
431 .unwrap();
432 assert_eq!(op, Op::Insert);
433
434 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
435 assert!(row[1].eq(&Some(ScalarImpl::Bool(true))));
436 assert!(row[2].eq(&Some(ScalarImpl::Int16(-1))));
437 assert!(row[3].eq(&Some(ScalarImpl::Int32(-1111))));
438 assert!(row[4].eq(&Some(ScalarImpl::Float32((-11.11).into()))));
439 assert!(row[5].eq(&Some(ScalarImpl::Float64((-111.11111).into()))));
440 assert!(row[6].eq(&Some(ScalarImpl::Decimal("-111.11".parse().unwrap()))));
441 assert!(row[7].eq(&Some(ScalarImpl::Utf8("yes please".into()))));
442 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
443 NaiveDate::from_ymd_opt(1000, 1, 1).unwrap()
444 )))));
445 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
446 NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap()
447 )))));
448 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
449 "1970-01-01T00:00:00".parse().unwrap()
450 )))));
451 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
452 "1970-01-01T00:00:01Z".parse().unwrap()
453 ))));
454 assert_json_eq(&row[12], "{\"k1\": \"v1\", \"k2\": 11}");
455 }
456
457 #[tokio::test]
458 async fn test2_debezium_json_parser_delete() {
459 let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\":\"v1_updated\",\"k2\":33}"},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678090653000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1643,"row":0,"thread":4,"query":null},"op":"d","ts_ms":1678090653611,"transaction":null}}"#;
460
461 let columns = get_test2_columns();
462 let parser = build_parser(columns.clone()).await;
463 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
464 .await
465 .try_into()
466 .unwrap();
467
468 assert_eq!(op, Op::Delete);
469
470 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
471 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
472 assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
473 assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
474 assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
475 assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
476 assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
477 assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
478 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
479 NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
480 )))));
481 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
482 NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
483 )))));
484 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
485 "5138-11-16T09:46:39".parse().unwrap()
486 )))));
487 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
488 "2038-01-09T03:14:07Z".parse().unwrap()
489 ))));
490 assert_json_eq(&row[12], "{\"k1\":\"v1_updated\",\"k2\":33}");
491 }
492
493 #[tokio::test]
494 async fn test2_debezium_json_parser_update() {
495 let data = br#"{"payload":{"before":{"O_KEY":111,"O_BOOL":1,"O_TINY":-1,"O_INT":-1111,"O_REAL":-11.11,"O_DOUBLE":-111.11111,"O_DECIMAL":-111.11,"O_CHAR":"yes please","O_DATE":"1000-01-01","O_TIME":0,"O_DATETIME":0,"O_TIMESTAMP":"1970-01-01T00:00:01Z","O_JSON":"{\"k1\": \"v1\", \"k2\": 11}"},"after":{"O_KEY":111,"O_BOOL":0,"O_TINY":3,"O_INT":3333,"O_REAL":33.33,"O_DOUBLE":333.33333,"O_DECIMAL":333.33,"O_CHAR":"no thanks","O_DATE":"9999-12-31","O_TIME":86399000000,"O_DATETIME":99999999999000,"O_TIMESTAMP":"2038-01-09T03:14:07Z","O_JSON":"{\"k1\": \"v1_updated\", \"k2\": 33}"},"source":{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678089331000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1168,"row":0,"thread":4,"query":null},"op":"u","ts_ms":1678089331464,"transaction":null}}"#;
496
497 let columns = get_test2_columns();
498
499 let parser = build_parser(columns.clone()).await;
500 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
501 .await
502 .try_into()
503 .unwrap();
504
505 assert_eq!(op, Op::Insert);
506
507 assert!(row[0].eq(&Some(ScalarImpl::Int64(111))));
508 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
509 assert!(row[2].eq(&Some(ScalarImpl::Int16(3))));
510 assert!(row[3].eq(&Some(ScalarImpl::Int32(3333))));
511 assert!(row[4].eq(&Some(ScalarImpl::Float32((33.33).into()))));
512 assert!(row[5].eq(&Some(ScalarImpl::Float64((333.33333).into()))));
513 assert!(row[6].eq(&Some(ScalarImpl::Decimal("333.33".parse().unwrap()))));
514 assert!(row[7].eq(&Some(ScalarImpl::Utf8("no thanks".into()))));
515 assert!(row[8].eq(&Some(ScalarImpl::Date(Date::new(
516 NaiveDate::from_ymd_opt(9999, 12, 31).unwrap()
517 )))));
518 assert!(row[9].eq(&Some(ScalarImpl::Time(Time::new(
519 NaiveTime::from_hms_micro_opt(23, 59, 59, 0).unwrap()
520 )))));
521 assert!(row[10].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
522 "5138-11-16T09:46:39".parse().unwrap()
523 )))));
524 assert!(row[11].eq(&Some(ScalarImpl::Timestamptz(
525 "2038-01-09T03:14:07Z".parse().unwrap()
526 ))));
527 assert_json_eq(&row[12], "{\"k1\": \"v1_updated\", \"k2\": 33}");
528 }
529
530 #[cfg(not(madsim))] #[tokio::test]
532 #[tracing_test::traced_test]
533 async fn test2_debezium_json_parser_overflow() {
534 let columns = vec![
535 SourceColumnDesc::simple("O_KEY", DataType::Int64, ColumnId::from(0)),
536 SourceColumnDesc::simple("O_BOOL", DataType::Boolean, ColumnId::from(1)),
537 SourceColumnDesc::simple("O_TINY", DataType::Int16, ColumnId::from(2)),
538 SourceColumnDesc::simple("O_INT", DataType::Int32, ColumnId::from(3)),
539 SourceColumnDesc::simple("O_REAL", DataType::Float32, ColumnId::from(4)),
540 SourceColumnDesc::simple("O_DOUBLE", DataType::Float64, ColumnId::from(5)),
541 ];
542 let mut parser = build_parser(columns.clone()).await;
543
544 let mut dummy_builder =
545 SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
546
547 let normal_values = ["111", "1", "33", "444", "555.0", "666.0"];
548 let overflow_values = [
549 "9223372036854775808",
550 "2",
551 "32768",
552 "2147483648",
553 "3.80282347E38",
554 "1.797695E308",
555 ];
556
557 for i in 0..6 {
558 let mut values = normal_values;
559 values[i] = overflow_values[i];
560 let data = format!(
561 r#"{{"payload":{{"before":null,"after":{{"O_KEY":{},"O_BOOL":{},"O_TINY":{},"O_INT":{},"O_REAL":{},"O_DOUBLE":{}}},"source":{{"version":"1.9.7.Final","connector":"mysql","name":"RW_CDC_test.orders","ts_ms":1678158055000,"snapshot":"false","db":"test","sequence":null,"table":"orders","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":637,"row":0,"thread":4,"query":null}},"op":"c","ts_ms":1678158055464,"transaction":null}}}}"#,
562 values[0], values[1], values[2], values[3], values[4], values[5]
563 ).as_bytes().to_vec();
564
565 let res = parser
566 .parse_inner(None, Some(data), dummy_builder.row_writer())
567 .await;
568 if i < 5 {
569 res.unwrap();
572 assert!(logs_contain("expected type"), "{i}");
573 } else {
574 let e = res.unwrap_err();
576 assert!(e.to_report_string().contains("InvalidNumber"), "{i}: {e}");
577 }
578 }
579 }
580 }
581
582 mod test3_postgres {
584 use risingwave_pb::plan_common::AdditionalColumn;
585
586 use super::*;
587 use crate::source::SourceColumnType;
588
589 fn get_temporal_test_columns() -> Vec<SourceColumnDesc> {
591 vec![
592 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
593 SourceColumnDesc::simple("o_time_0", DataType::Time, ColumnId::from(1)),
594 SourceColumnDesc::simple("o_time_6", DataType::Time, ColumnId::from(2)),
595 SourceColumnDesc::simple("o_timez_0", DataType::Time, ColumnId::from(3)),
596 SourceColumnDesc::simple("o_timez_6", DataType::Time, ColumnId::from(4)),
597 SourceColumnDesc::simple("o_timestamp_0", DataType::Timestamp, ColumnId::from(5)),
598 SourceColumnDesc::simple("o_timestamp_6", DataType::Timestamp, ColumnId::from(6)),
599 SourceColumnDesc::simple(
600 "o_timestampz_0",
601 DataType::Timestamptz,
602 ColumnId::from(7),
603 ),
604 SourceColumnDesc::simple(
605 "o_timestampz_6",
606 DataType::Timestamptz,
607 ColumnId::from(8),
608 ),
609 SourceColumnDesc::simple("o_interval", DataType::Interval, ColumnId::from(9)),
610 SourceColumnDesc::simple("o_date", DataType::Date, ColumnId::from(10)),
611 ]
612 }
613
614 fn get_numeric_test_columns() -> Vec<SourceColumnDesc> {
616 vec![
617 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
618 SourceColumnDesc::simple("o_smallint", DataType::Int16, ColumnId::from(1)),
619 SourceColumnDesc::simple("o_integer", DataType::Int32, ColumnId::from(2)),
620 SourceColumnDesc::simple("o_bigint", DataType::Int64, ColumnId::from(3)),
621 SourceColumnDesc::simple("o_real", DataType::Float32, ColumnId::from(4)),
622 SourceColumnDesc::simple("o_double", DataType::Float64, ColumnId::from(5)),
623 SourceColumnDesc::simple("o_numeric", DataType::Decimal, ColumnId::from(6)),
624 SourceColumnDesc::simple("o_numeric_6_3", DataType::Decimal, ColumnId::from(7)),
625 SourceColumnDesc::simple("o_money", DataType::Decimal, ColumnId::from(8)),
626 ]
627 }
628
629 fn get_other_types_test_columns() -> Vec<SourceColumnDesc> {
631 vec![
632 SourceColumnDesc::simple("o_key", DataType::Int32, ColumnId::from(0)),
633 SourceColumnDesc::simple("o_boolean", DataType::Boolean, ColumnId::from(1)),
634 SourceColumnDesc::simple("o_bit", DataType::Boolean, ColumnId::from(2)),
635 SourceColumnDesc::simple("o_bytea", DataType::Bytea, ColumnId::from(3)),
636 SourceColumnDesc::simple("o_json", DataType::Jsonb, ColumnId::from(4)),
637 SourceColumnDesc::simple("o_xml", DataType::Varchar, ColumnId::from(5)),
638 SourceColumnDesc::simple("o_uuid", DataType::Varchar, ColumnId::from(6)),
639 SourceColumnDesc {
640 name: "o_point".to_owned(),
641 data_type: DataType::Struct(StructType::new(vec![
642 ("x", DataType::Float32),
643 ("y", DataType::Float32),
644 ])),
645 column_id: 7.into(),
646 column_type: SourceColumnType::Normal,
647 is_pk: false,
648 is_hidden_addition_col: false,
649 additional_column: AdditionalColumn { column_type: None },
650 },
651 SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
652 SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)),
653 SourceColumnDesc::simple("o_varchar", DataType::Varchar, ColumnId::from(10)),
654 SourceColumnDesc::simple("o_character", DataType::Varchar, ColumnId::from(11)),
655 SourceColumnDesc::simple(
656 "o_character_varying",
657 DataType::Varchar,
658 ColumnId::from(12),
659 ),
660 ]
661 }
662
663 #[tokio::test]
664 async fn test_temporal_types() {
665 let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_time_0":40271000000,"o_time_6":40271000010,"o_timez_0":"11:11:11Z","o_timez_6":"11:11:11.00001Z","o_timestamp_0":1321009871000,"o_timestamp_6":1321009871123456,"o_timestampz_0":"2011-11-11T03:11:11Z","o_timestampz_6":"2011-11-11T03:11:11.123456Z","o_interval":"P1Y2M3DT4H5M6.78S","o_date":"1999-09-09"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684733351963,"snapshot":"last","db":"test","sequence":"[null,\"26505352\"]","schema":"public","table":"orders","txId":729,"lsn":26505352,"xmin":null},"op":"r","ts_ms":1684733352110,"transaction":null}}"#;
682 let columns = get_temporal_test_columns();
683 let parser = build_parser(columns.clone()).await;
684 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
685 .await
686 .try_into()
687 .unwrap();
688 assert_eq!(op, Op::Insert);
689 assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
690 assert!(row[1].eq(&Some(ScalarImpl::Time(Time::new(
691 NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
692 )))));
693 assert!(row[2].eq(&Some(ScalarImpl::Time(Time::new(
694 NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
695 )))));
696 assert!(row[3].eq(&Some(ScalarImpl::Time(Time::new(
697 NaiveTime::from_hms_micro_opt(11, 11, 11, 0).unwrap()
698 )))));
699 assert!(row[4].eq(&Some(ScalarImpl::Time(Time::new(
700 NaiveTime::from_hms_micro_opt(11, 11, 11, 10).unwrap()
701 )))));
702 assert!(row[5].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
703 "2011-11-11T11:11:11".parse().unwrap()
704 )))));
705 assert!(row[6].eq(&Some(ScalarImpl::Timestamp(Timestamp::new(
706 "2011-11-11T11:11:11.123456".parse().unwrap()
707 )))));
708 assert!(
709 row[9].eq(&Some(ScalarImpl::Interval(Interval::from_month_day_usec(
710 14,
711 3,
712 14706780000
713 ))))
714 );
715 assert!(row[10].eq(&Some(ScalarImpl::Date(Date::new(
716 NaiveDate::from_ymd_opt(1999, 9, 9).unwrap()
717 )))));
718 }
719
720 #[tokio::test]
721 async fn test_numeric_types() {
722 let data = br#"{"payload":{"before":null,"after":{"o_key":0,"o_smallint":32767,"o_integer":2147483647,"o_bigint":9223372036854775807,"o_real":9.999,"o_double":9.999999,"o_numeric":123456.789,"o_numeric_6_3":123.456,"o_money":123.12},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684404343201,"snapshot":"last","db":"test","sequence":"[null,\"26519216\"]","schema":"public","table":"orders","txId":729,"lsn":26519216,"xmin":null},"op":"r","ts_ms":1684404343349,"transaction":null}}"#;
737 let columns = get_numeric_test_columns();
738 let parser = build_parser(columns.clone()).await;
739 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
740 .await
741 .try_into()
742 .unwrap();
743 assert_eq!(op, Op::Insert);
744 assert!(row[0].eq(&Some(ScalarImpl::Int32(0))));
745 assert!(row[1].eq(&Some(ScalarImpl::Int16(32767))));
746 assert!(row[2].eq(&Some(ScalarImpl::Int32(2147483647))));
747 assert!(row[3].eq(&Some(ScalarImpl::Int64(9223372036854775807))));
748 assert!(row[4].eq(&Some(ScalarImpl::Float32((9.999).into()))));
749 assert!(row[5].eq(&Some(ScalarImpl::Float64((9.999999).into()))));
750 assert!(row[6].eq(&Some(ScalarImpl::Decimal("123456.7890".parse().unwrap()))));
751 assert!(row[7].eq(&Some(ScalarImpl::Decimal("123.456".parse().unwrap()))));
752 assert!(row[8].eq(&Some(ScalarImpl::Decimal("123.12".parse().unwrap()))));
753 }
754
755 #[tokio::test]
756 async fn test_other_types() {
757 let data = br#"{"payload":{"before":null,"after":{"o_key":1,"o_boolean":false,"o_bit":true,"o_bytea":"ASNFZ4mrze8=","o_json":"{\"k1\": \"v1\", \"k2\": 11}","o_xml":"<!--hahaha-->","o_uuid":"60f14fe2-f857-404a-b586-3b5375b3259f","o_point":{"x":1.0,"y":2.0,"wkb":"AQEAAAAAAAAAAADwPwAAAAAAAABA","srid":null},"o_enum":"polar","o_char":"h","o_varchar":"ha","o_character":"h","o_character_varying":"hahaha"},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"RW_CDC_localhost.test.orders","ts_ms":1684743927178,"snapshot":"last","db":"test","sequence":"[null,\"26524528\"]","schema":"public","table":"orders","txId":730,"lsn":26524528,"xmin":null},"op":"r","ts_ms":1684743927343,"transaction":null}}"#;
776 let columns = get_other_types_test_columns();
777 let parser = build_parser(columns.clone()).await;
778 let [(op, row)]: [_; 1] = parse_one(parser, columns, data.to_vec())
779 .await
780 .try_into()
781 .unwrap();
782 assert_eq!(op, Op::Insert);
783 assert!(row[0].eq(&Some(ScalarImpl::Int32(1))));
784 assert!(row[1].eq(&Some(ScalarImpl::Bool(false))));
785 assert!(row[2].eq(&Some(ScalarImpl::Bool(true))));
786 assert!(row[3].eq(&Some(ScalarImpl::Bytea(Box::new([
787 u8::from_str_radix("01", 16).unwrap(),
788 u8::from_str_radix("23", 16).unwrap(),
789 u8::from_str_radix("45", 16).unwrap(),
790 u8::from_str_radix("67", 16).unwrap(),
791 u8::from_str_radix("89", 16).unwrap(),
792 u8::from_str_radix("AB", 16).unwrap(),
793 u8::from_str_radix("CD", 16).unwrap(),
794 u8::from_str_radix("EF", 16).unwrap()
795 ])))));
796 assert_json_eq(&row[4], "{\"k1\": \"v1\", \"k2\": 11}");
797 assert!(row[5].eq(&Some(ScalarImpl::Utf8("<!--hahaha-->".into()))));
798 assert!(row[6].eq(&Some(ScalarImpl::Utf8(
799 "60f14fe2-f857-404a-b586-3b5375b3259f".into()
800 ))));
801 assert!(row[7].eq(&Some(ScalarImpl::Struct(StructValue::new(vec![
802 Some(ScalarImpl::Float32(1.into())),
803 Some(ScalarImpl::Float32(2.into()))
804 ])))));
805 assert!(row[8].eq(&Some(ScalarImpl::Utf8("polar".into()))));
806 assert!(row[9].eq(&Some(ScalarImpl::Utf8("h".into()))));
807 assert!(row[10].eq(&Some(ScalarImpl::Utf8("ha".into()))));
808 assert!(row[11].eq(&Some(ScalarImpl::Utf8("h".into()))));
809 assert!(row[12].eq(&Some(ScalarImpl::Utf8("hahaha".into()))));
810 }
811 }
812}