1use std::collections::BTreeMap;
25
26use anyhow::Context as _;
27use risingwave_common::catalog::Field;
28use risingwave_connector_codec::JsonSchema;
29
30use super::utils::{bytes_from_url, get_kafka_topic};
31use super::{JsonProperties, SchemaRegistryAuth};
32use crate::error::ConnectorResult;
33use crate::parser::AccessBuilder;
34use crate::parser::unified::AccessImpl;
35use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
36use crate::schema::schema_registry::{Client, handle_sr_list};
37
/// [`AccessBuilder`] for the JSON format: turns a raw payload into a borrowed
/// `simd_json` value wrapped in an [`AccessImpl`].
#[derive(Debug)]
pub struct JsonAccessBuilder {
    // Owned copy of the most recent payload. It is stored on the builder so
    // that the borrowed value returned by `generate_accessor` can reference it.
    value: Option<Vec<u8>>,
    // Number of leading payload bytes to skip before parsing: 5 when
    // `use_schema_registry` is set (the schema-registry wire header),
    // otherwise 0. See `JsonAccessBuilder::new`.
    payload_start_idx: usize,
    // Options controlling JSON-to-datum conversion (e.g. timestamptz handling).
    json_parse_options: JsonParseOptions,
}
44
45impl AccessBuilder for JsonAccessBuilder {
46 #[allow(clippy::unused_async)]
47 async fn generate_accessor(
48 &mut self,
49 payload: Vec<u8>,
50 _: &crate::source::SourceMeta,
51 ) -> ConnectorResult<AccessImpl<'_>> {
52 if payload.is_empty() {
54 self.value = Some("{}".into());
55 } else {
56 self.value = Some(payload);
57 }
58 let value = simd_json::to_borrowed_value(
59 &mut self.value.as_mut().unwrap()[self.payload_start_idx..],
60 )
61 .context("failed to parse json payload")?;
62 Ok(AccessImpl::Json(JsonAccess::new_with_options(
63 value,
64 &self.json_parse_options,
67 )))
68 }
69}
70
71impl JsonAccessBuilder {
72 pub fn new(config: JsonProperties) -> ConnectorResult<Self> {
73 let mut json_parse_options = JsonParseOptions::DEFAULT;
74 if let Some(mode) = config.timestamptz_handling {
75 json_parse_options.timestamptz_handling = mode;
76 }
77 Ok(Self {
78 value: None,
79 payload_start_idx: if config.use_schema_registry { 5 } else { 0 },
80 json_parse_options,
81 })
82 }
83}
84
85pub async fn fetch_json_schema_and_map_to_columns(
86 schema_location: &str,
87 schema_registry_auth: Option<SchemaRegistryAuth>,
88 props: &BTreeMap<String, String>,
89) -> ConnectorResult<Vec<Field>> {
90 let url = handle_sr_list(schema_location)?;
91 let mut json_schema = if let Some(schema_registry_auth) = schema_registry_auth {
92 let client = Client::new(url.clone(), &schema_registry_auth)?;
93 let topic = get_kafka_topic(props)?;
94 let schema = client
95 .get_schema_by_subject(&format!("{}-value", topic))
96 .await?;
97 JsonSchema::parse_str(&schema.content)?
98 } else {
99 let url = url.first().unwrap();
100 let bytes = bytes_from_url(url, None).await?;
101 JsonSchema::parse_bytes(&bytes)?
102 };
103 json_schema
104 .json_schema_to_columns(url.first().unwrap().clone())
105 .await
106 .map_err(Into::into)
107}
108
109#[cfg(test)]
110mod tests {
111 use std::vec;
112
113 use itertools::Itertools;
114 use risingwave_common::array::{Op, StructValue};
115 use risingwave_common::catalog::ColumnDesc;
116 use risingwave_common::row::Row;
117 use risingwave_common::test_prelude::StreamChunkTestExt;
118 use risingwave_common::types::{DataType, ScalarImpl, StructType, ToOwnedDatum};
119 use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
120 use risingwave_pb::plan_common::{AdditionalColumn, AdditionalColumnKey};
121
122 use crate::parser::test_utils::ByteStreamSourceParserImplTestExt as _;
123 use crate::parser::{
124 ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig, ProtocolProperties,
125 SourceColumnDesc, SpecificParserConfig,
126 };
127 use crate::source::SourceColumnType;
128
129 fn make_parser(rw_columns: Vec<SourceColumnDesc>) -> ByteStreamSourceParserImpl {
130 ByteStreamSourceParserImpl::create_for_test(ParserConfig {
131 common: CommonParserConfig { rw_columns },
132 specific: SpecificParserConfig::DEFAULT_PLAIN_JSON,
133 })
134 .unwrap()
135 }
136
137 fn make_upsert_parser(rw_columns: Vec<SourceColumnDesc>) -> ByteStreamSourceParserImpl {
138 ByteStreamSourceParserImpl::create_for_test(ParserConfig {
139 common: CommonParserConfig { rw_columns },
140 specific: SpecificParserConfig {
141 protocol_config: ProtocolProperties::Upsert,
142 ..SpecificParserConfig::DEFAULT_PLAIN_JSON
143 },
144 })
145 .unwrap()
146 }
147
    /// Two JSON object payloads: the first fills every column used by
    /// `test_json_parser`, the second omits most fields (expected to become
    /// `NULL`) and exercises alternative numeric/interval notations.
    fn get_payload() -> Vec<Vec<u8>> {
        vec![
            br#"{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890,"interval":"P1Y2M3DT0H5M0S"}"#.to_vec(),
            br#"{"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345,"interval":"1 day"}"#.to_vec(),
        ]
    }
154
    /// The same two records as `get_payload` (minus the interval fields),
    /// but wrapped in a single top-level JSON array payload.
    fn get_array_top_level_payload() -> Vec<Vec<u8>> {
        vec![
            br#"[{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890}, {"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345}]"#.to_vec()
        ]
    }
160
    /// Shared driver for the happy-path tests: builds a parser over all the
    /// scalar column types used by the fixtures, parses the payloads produced
    /// by `get_payload`, and checks every resulting datum.
    async fn test_json_parser(get_payload: fn() -> Vec<Vec<u8>>) {
        let descs = vec![
            SourceColumnDesc::simple("i32", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("bool", DataType::Boolean, 2.into()),
            SourceColumnDesc::simple("i16", DataType::Int16, 3.into()),
            SourceColumnDesc::simple("i64", DataType::Int64, 4.into()),
            SourceColumnDesc::simple("f32", DataType::Float32, 5.into()),
            SourceColumnDesc::simple("f64", DataType::Float64, 6.into()),
            SourceColumnDesc::simple("varchar", DataType::Varchar, 7.into()),
            SourceColumnDesc::simple("date", DataType::Date, 8.into()),
            SourceColumnDesc::simple("timestamp", DataType::Timestamp, 9.into()),
            SourceColumnDesc::simple("decimal", DataType::Decimal, 10.into()),
            SourceColumnDesc::simple("interval", DataType::Interval, 11.into()),
        ];

        let parser = make_parser(descs);
        let chunk = parser.parse(get_payload()).await;

        let mut rows = chunk.rows();

        // First record: every field is present in the payload.
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
            assert_eq!(
                row.datum_at(1).to_owned_datum(),
                (Some(ScalarImpl::Bool(true)))
            );
            assert_eq!(
                row.datum_at(2).to_owned_datum(),
                (Some(ScalarImpl::Int16(1)))
            );
            assert_eq!(
                row.datum_at(3).to_owned_datum(),
                (Some(ScalarImpl::Int64(12345678)))
            );
            assert_eq!(
                row.datum_at(4).to_owned_datum(),
                (Some(ScalarImpl::Float32(1.23.into())))
            );
            assert_eq!(
                row.datum_at(5).to_owned_datum(),
                (Some(ScalarImpl::Float64(1.2345.into())))
            );
            assert_eq!(
                row.datum_at(6).to_owned_datum(),
                (Some(ScalarImpl::Utf8("varchar".into())))
            );
            assert_eq!(
                row.datum_at(7).to_owned_datum(),
                (Some(ScalarImpl::Date("2021-01-01".parse().unwrap())))
            );
            assert_eq!(
                row.datum_at(8).to_owned_datum(),
                (Some(ScalarImpl::Timestamp(
                    "2021-01-01 16:06:12.269".parse().unwrap()
                )))
            );
            assert_eq!(
                row.datum_at(9).to_owned_datum(),
                (Some(ScalarImpl::Decimal("12345.67890".parse().unwrap())))
            );
            assert_eq!(
                row.datum_at(10).to_owned_datum(),
                (Some(ScalarImpl::Interval("P1Y2M3DT0H5M0S".parse().unwrap())))
            );
        }

        // Second record: fields absent from the payload become NULL, and
        // alternative numeric notations (scientific `f32`, integer-valued
        // `f64`/`decimal`) plus the verbose interval form are accepted.
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(1)))
            );
            assert_eq!(row.datum_at(1).to_owned_datum(), None);
            assert_eq!(
                row.datum_at(4).to_owned_datum(),
                (Some(ScalarImpl::Float32(12345e+10.into())))
            );
            assert_eq!(
                row.datum_at(5).to_owned_datum(),
                (Some(ScalarImpl::Float64(12345.into())))
            );
            assert_eq!(
                row.datum_at(9).to_owned_datum(),
                (Some(ScalarImpl::Decimal(12345.into())))
            );
            assert_eq!(
                row.datum_at(10).to_owned_datum(),
                (Some(ScalarImpl::Interval("1 day".parse().unwrap())))
            );
        }
    }
255
    /// Happy path: one JSON object per payload.
    #[tokio::test]
    async fn test_json_parse_object_top_level() {
        test_json_parser(get_payload).await;
    }
    /// Same checks as the object-top-level test, but with both records inside
    /// a single top-level JSON array payload.
    ///
    /// NOTE(review): currently `#[ignore]`d — presumably top-level arrays are
    /// not (yet) supported by this parser; confirm before enabling.
    #[ignore]
    #[tokio::test]
    async fn test_json_parse_array_top_level() {
        test_json_parser(get_array_top_level_payload).await;
    }
265
    /// A value that overflows its column type (`65536` for `Int16`) must not
    /// invalidate the chunk: all three rows are kept, with the offending
    /// datum set to NULL.
    #[tokio::test]
    async fn test_json_parser_failed() {
        let descs = vec![
            SourceColumnDesc::simple("v1", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("v2", DataType::Int16, 1.into()),
            SourceColumnDesc::simple("v3", DataType::Varchar, 2.into()),
        ];

        let parser = make_parser(descs);
        let payloads = vec![
            br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(),
            // `v2` overflows the `Int16` column.
            br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec(),
            br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(),
        ];
        let chunk = parser.parse(payloads).await;

        assert!(chunk.valid());
        assert_eq!(chunk.cardinality(), 3);

        let row_vec = chunk.rows().collect_vec();
        // The overflowed `v2` in the second row is padded with NULL.
        assert_eq!(row_vec[1].1.datum_at(1), None);
    }
293
    /// Nested JSON objects map onto struct columns; scalar columns are cast
    /// across JSON types (number -> varchar, numeric string -> int64).
    #[tokio::test]
    async fn test_json_parse_struct() {
        let descs = vec![
            ColumnDesc::named(
                "data",
                0.into(),
                DataType::from(StructType::new([
                    ("created_at", DataType::Timestamp),
                    ("id", DataType::Varchar),
                    ("text", DataType::Varchar),
                    ("lang", DataType::Varchar),
                ])),
            ),
            ColumnDesc::named(
                "author",
                5.into(),
                DataType::from(StructType::new([
                    ("created_at", DataType::Timestamp),
                    ("id", DataType::Varchar),
                    ("name", DataType::Varchar),
                    ("username", DataType::Varchar),
                ])),
            ),
            // Cross-type casts exercised below.
            ColumnDesc::named("I64CastToVarchar", 10.into(), DataType::Varchar),
            ColumnDesc::named("VarcharCastToI64", 11.into(), DataType::Int64),
        ]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        let payload = br#"
        {
            "data": {
                "created_at": "2022-07-13 20:48:37.07",
                "id": "1732524418112319151",
                "text": "Here man favor ourselves mysteriously most her sigh in straightaway for afterwards.",
                "lang": "English"
            },
            "author": {
                "created_at": "2018-01-29 12:19:11.07",
                "id": "7772634297",
                "name": "Lily Frami yet",
                "username": "Dooley5659"
            },
            "I64CastToVarchar": 1598197865760800768,
            "VarcharCastToI64": "1598197865760800768"
        }
        "#.to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        let expected = vec![
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Timestamp(
                    "2022-07-13 20:48:37.07".parse().unwrap()
                )),
                Some(ScalarImpl::Utf8("1732524418112319151".into())),
                Some(ScalarImpl::Utf8("Here man favor ourselves mysteriously most her sigh in straightaway for afterwards.".into())),
                Some(ScalarImpl::Utf8("English".into())),
            ]))),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Timestamp(
                    "2018-01-29 12:19:11.07".parse().unwrap()
                )),
                Some(ScalarImpl::Utf8("7772634297".into())),
                Some(ScalarImpl::Utf8("Lily Frami yet".into())),
                Some(ScalarImpl::Utf8("Dooley5659".into())),
            ]) )),
            Some(ScalarImpl::Utf8("1598197865760800768".into())),
            Some(ScalarImpl::Int64(1598197865760800768)),
        ];
        assert_eq!(row, expected.into());
    }
371
    /// A struct column whose JSON value is a *string* containing JSON is
    /// parsed into the struct as well.
    #[tokio::test]
    async fn test_json_parse_struct_from_string() {
        let descs = vec![ColumnDesc::named(
            "struct",
            0.into(),
            DataType::from(StructType::new([
                ("varchar", DataType::Varchar),
                ("boolean", DataType::Boolean),
            ])),
        )]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        // The struct value is an escaped JSON string, not a nested object.
        let payload = br#"
        {
            "struct": "{\"varchar\": \"varchar\", \"boolean\": true}"
        }
        "#
        .to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        let expected = vec![Some(ScalarImpl::Struct(StructValue::new(vec![
            Some(ScalarImpl::Utf8("varchar".into())),
            Some(ScalarImpl::Bool(true)),
        ])))];
        assert_eq!(row, expected.into());
    }
405
406 #[cfg(not(madsim))] #[tokio::test]
408 #[tracing_test::traced_test]
409 async fn test_json_parse_struct_missing_field_warning() {
410 let descs = vec![ColumnDesc::named(
411 "struct",
412 0.into(),
413 DataType::from(StructType::new([
414 ("varchar", DataType::Varchar),
415 ("boolean", DataType::Boolean),
416 ])),
417 )]
418 .iter()
419 .map(SourceColumnDesc::from)
420 .collect_vec();
421
422 let parser = make_parser(descs);
423 let payload = br#"
424 {
425 "struct": {
426 "varchar": "varchar"
427 }
428 }
429 "#
430 .to_vec();
431 let chunk = parser.parse(vec![payload]).await;
432
433 let (op, row) = chunk.rows().next().unwrap();
434 assert_eq!(op, Op::Insert);
435 let row = row.into_owned_row().into_inner();
436
437 let expected = vec![Some(ScalarImpl::Struct(StructValue::new(vec![
438 Some(ScalarImpl::Utf8("varchar".into())),
439 None,
440 ])))];
441 assert_eq!(row, expected.into());
442
443 assert!(logs_contain("undefined nested field, padding with `NULL`"));
444 }
445
    /// Upsert protocol: key/value pairs produce Inserts, and an empty value
    /// (tombstone) produces a Delete for its key.
    #[tokio::test]
    async fn test_json_upsert_parser() {
        let items = [
            (r#"{"a":1}"#, r#"{"a":1,"b":2}"#),
            (r#"{"a":1}"#, r#"{"a":1,"b":3}"#),
            (r#"{"a":2}"#, r#"{"a":2,"b":2}"#),
            // Empty value: expected to become a Delete below.
            (r#"{"a":2}"#, r#""#),
        ]
        .into_iter()
        .map(|(k, v)| (k.as_bytes().to_vec(), v.as_bytes().to_vec()))
        .collect_vec();

        // Hidden key column carrying the raw message key bytes.
        let key_column_desc = SourceColumnDesc {
            name: "rw_key".into(),
            data_type: DataType::Bytea,
            column_id: 2.into(),
            column_type: SourceColumnType::Normal,
            is_pk: true,
            is_hidden_addition_col: false,
            additional_column: AdditionalColumn {
                column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
            },
        };
        let descs = vec![
            SourceColumnDesc::simple("a", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("b", DataType::Int32, 1.into()),
            key_column_desc,
        ];

        let parser = make_upsert_parser(descs);
        let chunk = parser.parse_upsert(items).await;

        let mut rows = chunk.rows();
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(1)))
            );
        }

        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(1)))
            );
        }
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(2)))
            );
        }
        {
            // Tombstone: Delete op, value columns are NULL.
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Delete);
            assert_eq!(row.datum_at(0).to_owned_datum(), (None));
        }
    }
518}