risingwave_connector/parser/
json_parser.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Note on this file:
16//
17// There's no struct named `JsonParser` anymore since #13707. `ENCODE JSON` will be
18// dispatched to `PlainParser` or `UpsertParser` with `JsonAccessBuilder` instead.
19//
// This file now only contains utilities and tests for JSON parsing. Also, to avoid
// relying on the internal implementation (and to allow that implementation to change),
// the tests use `ByteStreamSourceParserImpl` to create a parser instance.
23
24use std::collections::BTreeMap;
25
26use anyhow::Context as _;
27use risingwave_common::catalog::Field;
28use risingwave_connector_codec::JsonSchema;
29
30use super::utils::{bytes_from_url, get_kafka_topic};
31use super::{JsonProperties, SchemaRegistryConfig};
32use crate::error::ConnectorResult;
33use crate::parser::AccessBuilder;
34use crate::parser::unified::AccessImpl;
35use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
36use crate::schema::schema_registry::{Client, handle_sr_list};
37
/// Builds [`JsonAccess`] values over raw JSON message payloads.
///
/// The builder keeps ownership of the latest payload bytes so that the
/// returned accessor can borrow from them.
#[derive(Debug)]
pub struct JsonAccessBuilder {
    /// Buffer holding the most recently supplied payload. The `JsonAccess`
    /// returned by `generate_json_access` borrows from this buffer.
    value: Option<Vec<u8>>,
    /// Number of leading payload bytes skipped before JSON parsing starts
    /// (set to 5 in `new` when the schema registry is used, otherwise 0).
    payload_start_idx: usize,
    /// Parse options handed to every `JsonAccess` this builder creates.
    json_parse_options: JsonParseOptions,
}
44
45impl JsonAccessBuilder {
46    pub fn generate_json_access(&mut self, payload: Vec<u8>) -> ConnectorResult<JsonAccess<'_>> {
47        // XXX: When will we enter this branch?
48        if payload.is_empty() {
49            self.value = Some("{}".into());
50        } else {
51            self.value = Some(payload);
52        }
53        let value = simd_json::to_borrowed_value(
54            &mut self.value.as_mut().unwrap()[self.payload_start_idx..],
55        )
56        .context("failed to parse json payload")?;
57        Ok(JsonAccess::new_with_options(
58            value,
59            // Debezium and Canal have their special json access builder and will not
60            // use this
61            &self.json_parse_options,
62        ))
63    }
64}
65
66impl AccessBuilder for JsonAccessBuilder {
67    async fn generate_accessor(
68        &mut self,
69        payload: Vec<u8>,
70        _: &crate::source::SourceMeta,
71    ) -> ConnectorResult<AccessImpl<'_>> {
72        Ok(AccessImpl::Json(self.generate_json_access(payload)?))
73    }
74}
75
76impl JsonAccessBuilder {
77    pub fn new(config: JsonProperties) -> ConnectorResult<Self> {
78        let mut json_parse_options = JsonParseOptions::DEFAULT;
79        if let Some(mode) = config.timestamp_handling {
80            json_parse_options.timestamp_handling = mode;
81        }
82        if let Some(mode) = config.timestamptz_handling {
83            json_parse_options.timestamptz_handling = mode;
84        }
85        if let Some(mode) = config.time_handling {
86            json_parse_options.time_handling = mode;
87        }
88        if let Some(mode) = config.bigint_unsigned_handling {
89            json_parse_options.bigint_unsigned_handling = mode;
90        }
91        json_parse_options.handle_toast_columns = config.handle_toast_columns;
92        Ok(Self {
93            value: None,
94            payload_start_idx: if config.use_schema_registry { 5 } else { 0 },
95            json_parse_options,
96        })
97    }
98}
99
100pub async fn fetch_json_schema_and_map_to_columns(
101    schema_location: &str,
102    schema_registry_auth: Option<SchemaRegistryConfig>,
103    props: &BTreeMap<String, String>,
104) -> ConnectorResult<Vec<Field>> {
105    let url = handle_sr_list(schema_location)?;
106    let mut json_schema = if let Some(schema_registry_auth) = schema_registry_auth {
107        let client = Client::new(url.clone(), &schema_registry_auth)?;
108        let topic = get_kafka_topic(props)?;
109        let schema = client
110            .get_schema_by_subject(&format!("{}-value", topic))
111            .await?;
112        JsonSchema::parse_str(&schema.content)?
113    } else {
114        let url = url.first().unwrap();
115        let bytes = bytes_from_url(url, None).await?;
116        JsonSchema::parse_bytes(&bytes)?
117    };
118    json_schema
119        .json_schema_to_columns(url.first().unwrap().clone())
120        .await
121        .map_err(Into::into)
122}
123
#[cfg(test)]
mod tests {
    use std::vec;

    use itertools::Itertools;
    use risingwave_common::array::{Op, StructValue};
    use risingwave_common::catalog::ColumnDesc;
    use risingwave_common::row::Row;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::{DataType, ScalarImpl, StructType, ToOwnedDatum};
    use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
    use risingwave_pb::plan_common::{AdditionalColumn, AdditionalColumnKey};

    use crate::parser::test_utils::ByteStreamSourceParserImplTestExt as _;
    use crate::parser::{
        ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig, ProtocolProperties,
        SourceColumnDesc, SpecificParserConfig,
    };
    use crate::source::SourceColumnType;

    /// Builds a parser with the default plain-JSON config producing `rw_columns`.
    fn make_parser(rw_columns: Vec<SourceColumnDesc>) -> ByteStreamSourceParserImpl {
        ByteStreamSourceParserImpl::create_for_test(ParserConfig {
            common: CommonParserConfig { rw_columns },
            specific: SpecificParserConfig::DEFAULT_PLAIN_JSON,
        })
        .unwrap()
    }

    /// Same as [`make_parser`], but with the upsert protocol.
    fn make_upsert_parser(rw_columns: Vec<SourceColumnDesc>) -> ByteStreamSourceParserImpl {
        ByteStreamSourceParserImpl::create_for_test(ParserConfig {
            common: CommonParserConfig { rw_columns },
            specific: SpecificParserConfig {
                protocol_config: ProtocolProperties::Upsert,
                ..SpecificParserConfig::DEFAULT_PLAIN_JSON
            },
        })
        .unwrap()
    }

    /// Two messages, each a top-level JSON object.
    fn get_payload() -> Vec<Vec<u8>> {
        vec![
            br#"{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890,"interval":"P1Y2M3DT0H5M0S"}"#.to_vec(),
            br#"{"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345,"interval":"1 day"}"#.to_vec(),
        ]
    }

    /// One message holding the same two objects as [`get_payload`] inside a
    /// top-level JSON array. The objects must carry exactly the fields that
    /// `test_json_parser` asserts on, including `interval`.
    fn get_array_top_level_payload() -> Vec<Vec<u8>> {
        vec![
            br#"[{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890,"interval":"P1Y2M3DT0H5M0S"}, {"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345,"interval":"1 day"}]"#.to_vec()
        ]
    }

    /// Parses the payload returned by `get_payload` and checks the two rows it
    /// should produce.
    async fn test_json_parser(get_payload: fn() -> Vec<Vec<u8>>) {
        let descs = vec![
            SourceColumnDesc::simple("i32", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("bool", DataType::Boolean, 2.into()),
            SourceColumnDesc::simple("i16", DataType::Int16, 3.into()),
            SourceColumnDesc::simple("i64", DataType::Int64, 4.into()),
            SourceColumnDesc::simple("f32", DataType::Float32, 5.into()),
            SourceColumnDesc::simple("f64", DataType::Float64, 6.into()),
            SourceColumnDesc::simple("varchar", DataType::Varchar, 7.into()),
            SourceColumnDesc::simple("date", DataType::Date, 8.into()),
            SourceColumnDesc::simple("timestamp", DataType::Timestamp, 9.into()),
            SourceColumnDesc::simple("decimal", DataType::Decimal, 10.into()),
            SourceColumnDesc::simple("interval", DataType::Interval, 11.into()),
        ];

        let parser = make_parser(descs);
        let chunk = parser.parse(get_payload()).await;

        let mut rows = chunk.rows();

        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
            assert_eq!(row.datum_at(1).to_owned_datum(), Some(ScalarImpl::Bool(true)));
            assert_eq!(row.datum_at(2).to_owned_datum(), Some(ScalarImpl::Int16(1)));
            assert_eq!(
                row.datum_at(3).to_owned_datum(),
                Some(ScalarImpl::Int64(12345678))
            );
            assert_eq!(
                row.datum_at(4).to_owned_datum(),
                Some(ScalarImpl::Float32(1.23.into()))
            );
            assert_eq!(
                row.datum_at(5).to_owned_datum(),
                Some(ScalarImpl::Float64(1.2345.into()))
            );
            assert_eq!(
                row.datum_at(6).to_owned_datum(),
                Some(ScalarImpl::Utf8("varchar".into()))
            );
            assert_eq!(
                row.datum_at(7).to_owned_datum(),
                Some(ScalarImpl::Date("2021-01-01".parse().unwrap()))
            );
            assert_eq!(
                row.datum_at(8).to_owned_datum(),
                Some(ScalarImpl::Timestamp(
                    "2021-01-01 16:06:12.269".parse().unwrap()
                ))
            );
            assert_eq!(
                row.datum_at(9).to_owned_datum(),
                Some(ScalarImpl::Decimal("12345.67890".parse().unwrap()))
            );
            assert_eq!(
                row.datum_at(10).to_owned_datum(),
                Some(ScalarImpl::Interval("P1Y2M3DT0H5M0S".parse().unwrap()))
            );
        }

        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
            // `bool` is absent from the second message.
            assert_eq!(row.datum_at(1).to_owned_datum(), None);
            assert_eq!(
                row.datum_at(4).to_owned_datum(),
                Some(ScalarImpl::Float32(12345e+10.into()))
            );
            assert_eq!(
                row.datum_at(5).to_owned_datum(),
                Some(ScalarImpl::Float64(12345.into()))
            );
            assert_eq!(
                row.datum_at(9).to_owned_datum(),
                Some(ScalarImpl::Decimal(12345.into()))
            );
            assert_eq!(
                row.datum_at(10).to_owned_datum(),
                Some(ScalarImpl::Interval("1 day".parse().unwrap()))
            );
        }

        // Exactly one row per JSON object.
        assert!(rows.next().is_none());
    }

    #[tokio::test]
    async fn test_json_parse_object_top_level() {
        test_json_parser(get_payload).await;
    }

    // NOTE(review): presumably ignored because a top-level JSON array is not
    // expanded into one row per element by the current parser — confirm.
    #[ignore]
    #[tokio::test]
    async fn test_json_parse_array_top_level() {
        test_json_parser(get_array_top_level_payload).await;
    }

    #[tokio::test]
    async fn test_json_parser_failed() {
        let descs = vec![
            SourceColumnDesc::simple("v1", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("v2", DataType::Int16, 1.into()),
            SourceColumnDesc::simple("v3", DataType::Varchar, 2.into()),
        ];

        let parser = make_parser(descs);
        let payloads = vec![
            // Parse a correct record.
            br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(),
            // Parse an incorrect record: `v2` overflows `Int16`.
            // The error is ignored and `NULL` is filled in for `v2`.
            br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec(),
            // Parse a correct record.
            br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(),
        ];
        let chunk = parser.parse(payloads).await;

        assert!(chunk.valid());
        assert_eq!(chunk.cardinality(), 3);

        let row_vec = chunk.rows().collect_vec();
        assert_eq!(row_vec[1].1.datum_at(1), None);
    }

    #[tokio::test]
    async fn test_json_parse_struct() {
        let descs = [
            ColumnDesc::named(
                "data",
                0.into(),
                DataType::from(StructType::new([
                    ("created_at", DataType::Timestamp),
                    ("id", DataType::Varchar),
                    ("text", DataType::Varchar),
                    ("lang", DataType::Varchar),
                ])),
            ),
            ColumnDesc::named(
                "author",
                5.into(),
                DataType::from(StructType::new([
                    ("created_at", DataType::Timestamp),
                    ("id", DataType::Varchar),
                    ("name", DataType::Varchar),
                    ("username", DataType::Varchar),
                ])),
            ),
            ColumnDesc::named("I64CastToVarchar", 10.into(), DataType::Varchar),
            ColumnDesc::named("VarcharCastToI64", 11.into(), DataType::Int64),
        ]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        let payload = br#"
        {
            "data": {
                "created_at": "2022-07-13 20:48:37.07",
                "id": "1732524418112319151",
                "text": "Here man favor ourselves mysteriously most her sigh in straightaway for afterwards.",
                "lang": "English"
            },
            "author": {
                "created_at": "2018-01-29 12:19:11.07",
                "id": "7772634297",
                "name": "Lily Frami yet",
                "username": "Dooley5659"
            },
            "I64CastToVarchar": 1598197865760800768,
            "VarcharCastToI64": "1598197865760800768"
        }
        "#.to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        let expected = vec![
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Timestamp(
                    "2022-07-13 20:48:37.07".parse().unwrap()
                )),
                Some(ScalarImpl::Utf8("1732524418112319151".into())),
                Some(ScalarImpl::Utf8("Here man favor ourselves mysteriously most her sigh in straightaway for afterwards.".into())),
                Some(ScalarImpl::Utf8("English".into())),
            ]))),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Timestamp(
                    "2018-01-29 12:19:11.07".parse().unwrap()
                )),
                Some(ScalarImpl::Utf8("7772634297".into())),
                Some(ScalarImpl::Utf8("Lily Frami yet".into())),
                Some(ScalarImpl::Utf8("Dooley5659".into())),
            ]))),
            Some(ScalarImpl::Utf8("1598197865760800768".into())),
            Some(ScalarImpl::Int64(1598197865760800768)),
        ];
        assert_eq!(row, expected.into());
    }

    #[tokio::test]
    async fn test_json_parse_struct_from_string() {
        let descs = [ColumnDesc::named(
            "struct",
            0.into(),
            DataType::from(StructType::new([
                ("varchar", DataType::Varchar),
                ("boolean", DataType::Boolean),
            ])),
        )]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        // The struct is encoded as a JSON string containing a JSON object.
        let payload = br#"
        {
            "struct": "{\"varchar\": \"varchar\", \"boolean\": true}"
        }
        "#
        .to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        let expected = vec![Some(ScalarImpl::Struct(StructValue::new(vec![
            Some(ScalarImpl::Utf8("varchar".into())),
            Some(ScalarImpl::Bool(true)),
        ])))];
        assert_eq!(row, expected.into());
    }

    #[cfg(not(madsim))] // Traced test does not work with madsim
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_json_parse_struct_missing_field_warning() {
        let descs = [ColumnDesc::named(
            "struct",
            0.into(),
            DataType::from(StructType::new([
                ("varchar", DataType::Varchar),
                ("boolean", DataType::Boolean),
            ])),
        )]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        // `boolean` is missing from the nested object.
        let payload = br#"
        {
            "struct": {
                "varchar": "varchar"
            }
        }
        "#
        .to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        let expected = vec![Some(ScalarImpl::Struct(StructValue::new(vec![
            Some(ScalarImpl::Utf8("varchar".into())),
            None,
        ])))];
        assert_eq!(row, expected.into());

        assert!(logs_contain("undefined nested field, padding with `NULL`"));
    }

    #[tokio::test]
    async fn test_json_upsert_parser() {
        let items = [
            (r#"{"a":1}"#, r#"{"a":1,"b":2}"#),
            (r#"{"a":1}"#, r#"{"a":1,"b":3}"#),
            (r#"{"a":2}"#, r#"{"a":2,"b":2}"#),
            // An empty value is a deletion of the key.
            (r#"{"a":2}"#, r#""#),
        ]
        .into_iter()
        .map(|(k, v)| (k.as_bytes().to_vec(), v.as_bytes().to_vec()))
        .collect_vec();

        let key_column_desc = SourceColumnDesc {
            name: "rw_key".into(),
            data_type: DataType::Bytea,
            column_id: 2.into(),
            column_type: SourceColumnType::Normal,
            is_pk: true,
            is_hidden_addition_col: false,
            additional_column: AdditionalColumn {
                column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
            },
        };
        let descs = vec![
            SourceColumnDesc::simple("a", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("b", DataType::Int32, 1.into()),
            key_column_desc,
        ];

        let parser = make_upsert_parser(descs);
        let chunk = parser.parse_upsert(items).await;

        // expected chunk
        // +---+---+---+------------------+
        // | + | 1 | 2 | \x7b2261223a317d |
        // | + | 1 | 3 | \x7b2261223a317d |
        // | + | 2 | 2 | \x7b2261223a327d |
        // | - |   |   | \x7b2261223a327d |
        // +---+---+---+------------------+

        let mut rows = chunk.rows();
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
            assert_eq!(row.datum_at(1).to_owned_datum(), Some(ScalarImpl::Int32(2)));
        }
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
            assert_eq!(row.datum_at(1).to_owned_datum(), Some(ScalarImpl::Int32(3)));
        }
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(2)));
            assert_eq!(row.datum_at(1).to_owned_datum(), Some(ScalarImpl::Int32(2)));
        }
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Delete);
            assert_eq!(row.datum_at(0).to_owned_datum(), None);
            assert_eq!(row.datum_at(1).to_owned_datum(), None);
        }
        assert!(rows.next().is_none());
    }
}