1use std::collections::BTreeMap;
25
26use anyhow::Context as _;
27use risingwave_common::catalog::Field;
28use risingwave_connector_codec::JsonSchema;
29
30use super::utils::{bytes_from_url, get_kafka_topic};
31use super::{JsonProperties, SchemaRegistryConfig};
32use crate::error::ConnectorResult;
33use crate::parser::AccessBuilder;
34use crate::parser::unified::AccessImpl;
35use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
36use crate::schema::schema_registry::{Client, handle_sr_list};
37
/// Accessor builder that parses a raw JSON payload in place with `simd_json`
/// and hands out a [`JsonAccess`] borrowing the parsed value.
#[derive(Debug)]
pub struct JsonAccessBuilder {
    /// Owned copy of the most recent payload. `simd_json` parses the buffer
    /// in place (mutably), so the bytes must be owned by `self` and outlive
    /// the returned [`JsonAccess`].
    value: Option<Vec<u8>>,
    /// Number of leading bytes to skip before parsing — 5 when the payload
    /// carries a schema-registry wire header (see `new`), otherwise 0.
    payload_start_idx: usize,
    /// Type-conversion options (timestamp/time/bigint handling, TOAST
    /// columns, …) applied when reading values out of the parsed JSON.
    json_parse_options: JsonParseOptions,
}
44
45impl JsonAccessBuilder {
46 pub fn generate_json_access(&mut self, payload: Vec<u8>) -> ConnectorResult<JsonAccess<'_>> {
47 if payload.is_empty() {
49 self.value = Some("{}".into());
50 } else {
51 self.value = Some(payload);
52 }
53 let value = simd_json::to_borrowed_value(
54 &mut self.value.as_mut().unwrap()[self.payload_start_idx..],
55 )
56 .context("failed to parse json payload")?;
57 Ok(JsonAccess::new_with_options(
58 value,
59 &self.json_parse_options,
62 ))
63 }
64}
65
impl AccessBuilder for JsonAccessBuilder {
    /// Parses `payload` as JSON and wraps the result in [`AccessImpl::Json`].
    /// The source-metadata argument is unused for plain JSON.
    async fn generate_accessor(
        &mut self,
        payload: Vec<u8>,
        _: &crate::source::SourceMeta,
    ) -> ConnectorResult<AccessImpl<'_>> {
        Ok(AccessImpl::Json(self.generate_json_access(payload)?))
    }
}
75
76impl JsonAccessBuilder {
77 pub fn new(config: JsonProperties) -> ConnectorResult<Self> {
78 let mut json_parse_options = JsonParseOptions::DEFAULT;
79 if let Some(mode) = config.timestamp_handling {
80 json_parse_options.timestamp_handling = mode;
81 }
82 if let Some(mode) = config.timestamptz_handling {
83 json_parse_options.timestamptz_handling = mode;
84 }
85 if let Some(mode) = config.time_handling {
86 json_parse_options.time_handling = mode;
87 }
88 if let Some(mode) = config.bigint_unsigned_handling {
89 json_parse_options.bigint_unsigned_handling = mode;
90 }
91 json_parse_options.handle_toast_columns = config.handle_toast_columns;
92 Ok(Self {
93 value: None,
94 payload_start_idx: if config.use_schema_registry { 5 } else { 0 },
95 json_parse_options,
96 })
97 }
98}
99
100pub async fn fetch_json_schema_and_map_to_columns(
101 schema_location: &str,
102 schema_registry_auth: Option<SchemaRegistryConfig>,
103 props: &BTreeMap<String, String>,
104) -> ConnectorResult<Vec<Field>> {
105 let url = handle_sr_list(schema_location)?;
106 let mut json_schema = if let Some(schema_registry_auth) = schema_registry_auth {
107 let client = Client::new(url.clone(), &schema_registry_auth)?;
108 let topic = get_kafka_topic(props)?;
109 let schema = client
110 .get_schema_by_subject(&format!("{}-value", topic))
111 .await?;
112 JsonSchema::parse_str(&schema.content)?
113 } else {
114 let url = url.first().unwrap();
115 let bytes = bytes_from_url(url, None).await?;
116 JsonSchema::parse_bytes(&bytes)?
117 };
118 json_schema
119 .json_schema_to_columns(url.first().unwrap().clone())
120 .await
121 .map_err(Into::into)
122}
123
124#[cfg(test)]
125mod tests {
126 use std::vec;
127
128 use itertools::Itertools;
129 use risingwave_common::array::{Op, StructValue};
130 use risingwave_common::catalog::ColumnDesc;
131 use risingwave_common::row::Row;
132 use risingwave_common::test_prelude::StreamChunkTestExt;
133 use risingwave_common::types::{DataType, ScalarImpl, StructType, ToOwnedDatum};
134 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
135 use risingwave_pb::plan_common::{AdditionalColumn, AdditionalColumnKey};
136
137 use crate::parser::test_utils::ByteStreamSourceParserImplTestExt as _;
138 use crate::parser::{
139 ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig, ProtocolProperties,
140 SourceColumnDesc, SpecificParserConfig,
141 };
142 use crate::source::SourceColumnType;
143
144 fn make_parser(rw_columns: Vec<SourceColumnDesc>) -> ByteStreamSourceParserImpl {
145 ByteStreamSourceParserImpl::create_for_test(ParserConfig {
146 common: CommonParserConfig { rw_columns },
147 specific: SpecificParserConfig::DEFAULT_PLAIN_JSON,
148 })
149 .unwrap()
150 }
151
152 fn make_upsert_parser(rw_columns: Vec<SourceColumnDesc>) -> ByteStreamSourceParserImpl {
153 ByteStreamSourceParserImpl::create_for_test(ParserConfig {
154 common: CommonParserConfig { rw_columns },
155 specific: SpecificParserConfig {
156 protocol_config: ProtocolProperties::Upsert,
157 ..SpecificParserConfig::DEFAULT_PLAIN_JSON
158 },
159 })
160 .unwrap()
161 }
162
163 fn get_payload() -> Vec<Vec<u8>> {
164 vec![
165 br#"{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890,"interval":"P1Y2M3DT0H5M0S"}"#.to_vec(),
166 br#"{"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345,"interval":"1 day"}"#.to_vec(),
167 ]
168 }
169
170 fn get_array_top_level_payload() -> Vec<Vec<u8>> {
171 vec![
172 br#"[{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890}, {"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345}]"#.to_vec()
173 ]
174 }
175
    /// Shared driver for the top-level object/array tests: parses the payloads
    /// produced by `get_payload` against a schema covering all primitive
    /// column types and verifies every decoded datum.
    async fn test_json_parser(get_payload: fn() -> Vec<Vec<u8>>) {
        let descs = vec![
            SourceColumnDesc::simple("i32", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("bool", DataType::Boolean, 2.into()),
            SourceColumnDesc::simple("i16", DataType::Int16, 3.into()),
            SourceColumnDesc::simple("i64", DataType::Int64, 4.into()),
            SourceColumnDesc::simple("f32", DataType::Float32, 5.into()),
            SourceColumnDesc::simple("f64", DataType::Float64, 6.into()),
            SourceColumnDesc::simple("varchar", DataType::Varchar, 7.into()),
            SourceColumnDesc::simple("date", DataType::Date, 8.into()),
            SourceColumnDesc::simple("timestamp", DataType::Timestamp, 9.into()),
            SourceColumnDesc::simple("decimal", DataType::Decimal, 10.into()),
            SourceColumnDesc::simple("interval", DataType::Interval, 11.into()),
        ];

        let parser = make_parser(descs);
        let chunk = parser.parse(get_payload()).await;

        let mut rows = chunk.rows();

        // First record: every field present, each decoded to its column type.
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
            assert_eq!(
                row.datum_at(1).to_owned_datum(),
                (Some(ScalarImpl::Bool(true)))
            );
            assert_eq!(
                row.datum_at(2).to_owned_datum(),
                (Some(ScalarImpl::Int16(1)))
            );
            assert_eq!(
                row.datum_at(3).to_owned_datum(),
                (Some(ScalarImpl::Int64(12345678)))
            );
            assert_eq!(
                row.datum_at(4).to_owned_datum(),
                (Some(ScalarImpl::Float32(1.23.into())))
            );
            assert_eq!(
                row.datum_at(5).to_owned_datum(),
                (Some(ScalarImpl::Float64(1.2345.into())))
            );
            assert_eq!(
                row.datum_at(6).to_owned_datum(),
                (Some(ScalarImpl::Utf8("varchar".into())))
            );
            assert_eq!(
                row.datum_at(7).to_owned_datum(),
                (Some(ScalarImpl::Date("2021-01-01".parse().unwrap())))
            );
            assert_eq!(
                row.datum_at(8).to_owned_datum(),
                (Some(ScalarImpl::Timestamp(
                    "2021-01-01 16:06:12.269".parse().unwrap()
                )))
            );
            assert_eq!(
                row.datum_at(9).to_owned_datum(),
                (Some(ScalarImpl::Decimal("12345.67890".parse().unwrap())))
            );
            assert_eq!(
                row.datum_at(10).to_owned_datum(),
                (Some(ScalarImpl::Interval("P1Y2M3DT0H5M0S".parse().unwrap())))
            );
        }

        // Second record: only a subset of fields — a missing field (`bool`)
        // decodes as NULL; numeric fields accept scientific notation and
        // integer literals for float/decimal columns.
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(1)))
            );
            assert_eq!(row.datum_at(1).to_owned_datum(), None);
            assert_eq!(
                row.datum_at(4).to_owned_datum(),
                (Some(ScalarImpl::Float32(12345e+10.into())))
            );
            assert_eq!(
                row.datum_at(5).to_owned_datum(),
                (Some(ScalarImpl::Float64(12345.into())))
            );
            assert_eq!(
                row.datum_at(9).to_owned_datum(),
                (Some(ScalarImpl::Decimal(12345.into())))
            );
            assert_eq!(
                row.datum_at(10).to_owned_datum(),
                (Some(ScalarImpl::Interval("1 day".parse().unwrap())))
            );
        }
    }
270
    /// Each payload is a single top-level JSON object.
    #[tokio::test]
    async fn test_json_parse_object_top_level() {
        test_json_parser(get_payload).await;
    }
    /// One payload containing a top-level JSON array of records.
    // NOTE(review): marked `#[ignore]` — presumably top-level arrays are not
    // (yet) split into rows by this parser path; confirm before enabling.
    #[ignore]
    #[tokio::test]
    async fn test_json_parse_array_top_level() {
        test_json_parser(get_array_top_level_payload).await;
    }
280
    /// A field that fails to decode (here: `v2 = 65536` overflows i16) must
    /// not drop the row or abort the chunk — the bad field becomes NULL.
    #[tokio::test]
    async fn test_json_parser_failed() {
        let descs = vec![
            SourceColumnDesc::simple("v1", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("v2", DataType::Int16, 1.into()),
            SourceColumnDesc::simple("v3", DataType::Varchar, 2.into()),
        ];

        let parser = make_parser(descs);
        let payloads = vec![
            br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(),
            // `v2` exceeds i16::MAX (32767), so it cannot be decoded.
            br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec(),
            br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(),
        ];
        let chunk = parser.parse(payloads).await;

        // All three rows still make it into the chunk...
        assert!(chunk.valid());
        assert_eq!(chunk.cardinality(), 3);

        // ...and only the overflowing field of the middle row is NULL.
        let row_vec = chunk.rows().collect_vec();
        assert_eq!(row_vec[1].1.datum_at(1), None);
    }
308
    /// Nested JSON objects populate struct columns field by field; scalar
    /// columns are also cross-cast (JSON number -> varchar column, and JSON
    /// string -> i64 column).
    #[tokio::test]
    async fn test_json_parse_struct() {
        let descs = [
            ColumnDesc::named(
                "data",
                0.into(),
                DataType::from(StructType::new([
                    ("created_at", DataType::Timestamp),
                    ("id", DataType::Varchar),
                    ("text", DataType::Varchar),
                    ("lang", DataType::Varchar),
                ])),
            ),
            ColumnDesc::named(
                "author",
                5.into(),
                DataType::from(StructType::new([
                    ("created_at", DataType::Timestamp),
                    ("id", DataType::Varchar),
                    ("name", DataType::Varchar),
                    ("username", DataType::Varchar),
                ])),
            ),
            // JSON number into a varchar column, JSON string into an i64 one.
            ColumnDesc::named("I64CastToVarchar", 10.into(), DataType::Varchar),
            ColumnDesc::named("VarcharCastToI64", 11.into(), DataType::Int64),
        ]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        let payload = br#"
        {
            "data": {
                "created_at": "2022-07-13 20:48:37.07",
                "id": "1732524418112319151",
                "text": "Here man favor ourselves mysteriously most her sigh in straightaway for afterwards.",
                "lang": "English"
            },
            "author": {
                "created_at": "2018-01-29 12:19:11.07",
                "id": "7772634297",
                "name": "Lily Frami yet",
                "username": "Dooley5659"
            },
            "I64CastToVarchar": 1598197865760800768,
            "VarcharCastToI64": "1598197865760800768"
        }
        "#.to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        // Expected datums mirror the payload in schema order.
        let expected = vec![
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Timestamp(
                    "2022-07-13 20:48:37.07".parse().unwrap()
                )),
                Some(ScalarImpl::Utf8("1732524418112319151".into())),
                Some(ScalarImpl::Utf8("Here man favor ourselves mysteriously most her sigh in straightaway for afterwards.".into())),
                Some(ScalarImpl::Utf8("English".into())),
            ]))),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Timestamp(
                    "2018-01-29 12:19:11.07".parse().unwrap()
                )),
                Some(ScalarImpl::Utf8("7772634297".into())),
                Some(ScalarImpl::Utf8("Lily Frami yet".into())),
                Some(ScalarImpl::Utf8("Dooley5659".into())),
            ]))),
            Some(ScalarImpl::Utf8("1598197865760800768".into())),
            Some(ScalarImpl::Int64(1598197865760800768)),
        ];
        assert_eq!(row, expected.into());
    }
386
    /// A struct column can also be populated from a JSON *string* whose
    /// content is an escaped, serialized object.
    #[tokio::test]
    async fn test_json_parse_struct_from_string() {
        let descs = [ColumnDesc::named(
            "struct",
            0.into(),
            DataType::from(StructType::new([
                ("varchar", DataType::Varchar),
                ("boolean", DataType::Boolean),
            ])),
        )]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        // The value of "struct" is a string, not an object.
        let payload = br#"
        {
            "struct": "{\"varchar\": \"varchar\", \"boolean\": true}"
        }
        "#
        .to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        let expected = vec![Some(ScalarImpl::Struct(StructValue::new(vec![
            Some(ScalarImpl::Utf8("varchar".into())),
            Some(ScalarImpl::Bool(true)),
        ])))];
        assert_eq!(row, expected.into());
    }
420
    /// A struct field missing from the payload is padded with NULL, and the
    /// parser emits a warning log about the undefined nested field.
    // NOTE(review): disabled under madsim — presumably because
    // `tracing_test`'s log capture does not work there; confirm.
    #[cfg(not(madsim))]
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_json_parse_struct_missing_field_warning() {
        let descs = [ColumnDesc::named(
            "struct",
            0.into(),
            DataType::from(StructType::new([
                ("varchar", DataType::Varchar),
                ("boolean", DataType::Boolean),
            ])),
        )]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        // `boolean` is absent from the nested object on purpose.
        let payload = br#"
        {
            "struct": {
                "varchar": "varchar"
            }
        }
        "#
        .to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        // The missing field is padded with NULL...
        let expected = vec![Some(ScalarImpl::Struct(StructValue::new(vec![
            Some(ScalarImpl::Utf8("varchar".into())),
            None,
        ])))];
        assert_eq!(row, expected.into());

        // ...and a warning is logged (captured by `traced_test`).
        assert!(logs_contain("undefined nested field, padding with `NULL`"));
    }
460
    /// Upsert (key/value) parsing: a non-empty value produces an `Insert`,
    /// while an empty value is a tombstone and produces a `Delete` identified
    /// by the message key.
    #[tokio::test]
    async fn test_json_upsert_parser() {
        let items = [
            (r#"{"a":1}"#, r#"{"a":1,"b":2}"#),
            (r#"{"a":1}"#, r#"{"a":1,"b":3}"#),
            (r#"{"a":2}"#, r#"{"a":2,"b":2}"#),
            // Empty value: tombstone for key {"a":2}.
            (r#"{"a":2}"#, r#""#),
        ]
        .into_iter()
        .map(|(k, v)| (k.as_bytes().to_vec(), v.as_bytes().to_vec()))
        .collect_vec();

        // Extra primary-key column that carries the raw message key bytes.
        let key_column_desc = SourceColumnDesc {
            name: "rw_key".into(),
            data_type: DataType::Bytea,
            column_id: 2.into(),
            column_type: SourceColumnType::Normal,
            is_pk: true,
            is_hidden_addition_col: false,
            additional_column: AdditionalColumn {
                column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
            },
        };
        let descs = vec![
            SourceColumnDesc::simple("a", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("b", DataType::Int32, 1.into()),
            key_column_desc,
        ];

        let parser = make_upsert_parser(descs);
        let chunk = parser.parse_upsert(items).await;

        let mut rows = chunk.rows();
        // The three non-empty values become inserts...
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(1)))
            );
        }

        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(1)))
            );
        }
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(2)))
            );
        }
        // ...and the empty value becomes a delete whose value columns are
        // NULL (only the key is available).
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Delete);
            assert_eq!(row.datum_at(0).to_owned_datum(), (None));
        }
    }
533}