risingwave_connector/parser/
json_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Note on this file:
16//
17// There's no struct named `JsonParser` anymore since #13707. `ENCODE JSON` will be
18// dispatched to `PlainParser` or `UpsertParser` with `JsonAccessBuilder` instead.
19//
20// This file now only contains utilities and tests for JSON parsing. Also, to avoid
21// rely on the internal implementation and allow that to be changed, the tests use
22// `ByteStreamSourceParserImpl` to create a parser instance.
23
24use std::collections::BTreeMap;
25
26use anyhow::Context as _;
27use risingwave_common::catalog::Field;
28use risingwave_connector_codec::JsonSchema;
29
30use super::utils::{bytes_from_url, get_kafka_topic};
31use super::{JsonProperties, SchemaRegistryAuth};
32use crate::error::ConnectorResult;
33use crate::parser::AccessBuilder;
34use crate::parser::unified::AccessImpl;
35use crate::parser::unified::json::{JsonAccess, JsonParseOptions};
36use crate::schema::schema_registry::{Client, handle_sr_list};
37
/// Reusable state for building JSON accessors (`AccessImpl::Json`) from raw
/// payload bytes. Created once per parser and fed one payload at a time via
/// `AccessBuilder::generate_accessor`.
#[derive(Debug)]
pub struct JsonAccessBuilder {
    // Owned copy of the current payload. Kept in `self` so that the
    // borrowed `simd_json` value (and thus the returned `AccessImpl<'_>`)
    // can reference it for the accessor's lifetime.
    value: Option<Vec<u8>>,
    // Number of leading payload bytes to skip before JSON parsing:
    // 5 when the schema registry is in use, 0 otherwise (see `Self::new`).
    payload_start_idx: usize,
    // Options applied when converting JSON values to datums
    // (e.g. timestamptz handling).
    json_parse_options: JsonParseOptions,
}
44
45impl AccessBuilder for JsonAccessBuilder {
46    #[allow(clippy::unused_async)]
47    async fn generate_accessor(
48        &mut self,
49        payload: Vec<u8>,
50        _: &crate::source::SourceMeta,
51    ) -> ConnectorResult<AccessImpl<'_>> {
52        // XXX: When will we enter this branch?
53        if payload.is_empty() {
54            self.value = Some("{}".into());
55        } else {
56            self.value = Some(payload);
57        }
58        let value = simd_json::to_borrowed_value(
59            &mut self.value.as_mut().unwrap()[self.payload_start_idx..],
60        )
61        .context("failed to parse json payload")?;
62        Ok(AccessImpl::Json(JsonAccess::new_with_options(
63            value,
64            // Debezium and Canal have their special json access builder and will not
65            // use this
66            &self.json_parse_options,
67        )))
68    }
69}
70
71impl JsonAccessBuilder {
72    pub fn new(config: JsonProperties) -> ConnectorResult<Self> {
73        let mut json_parse_options = JsonParseOptions::DEFAULT;
74        if let Some(mode) = config.timestamptz_handling {
75            json_parse_options.timestamptz_handling = mode;
76        }
77        Ok(Self {
78            value: None,
79            payload_start_idx: if config.use_schema_registry { 5 } else { 0 },
80            json_parse_options,
81        })
82    }
83}
84
85pub async fn fetch_json_schema_and_map_to_columns(
86    schema_location: &str,
87    schema_registry_auth: Option<SchemaRegistryAuth>,
88    props: &BTreeMap<String, String>,
89) -> ConnectorResult<Vec<Field>> {
90    let url = handle_sr_list(schema_location)?;
91    let mut json_schema = if let Some(schema_registry_auth) = schema_registry_auth {
92        let client = Client::new(url.clone(), &schema_registry_auth)?;
93        let topic = get_kafka_topic(props)?;
94        let schema = client
95            .get_schema_by_subject(&format!("{}-value", topic))
96            .await?;
97        JsonSchema::parse_str(&schema.content)?
98    } else {
99        let url = url.first().unwrap();
100        let bytes = bytes_from_url(url, None).await?;
101        JsonSchema::parse_bytes(&bytes)?
102    };
103    json_schema
104        .json_schema_to_columns(url.first().unwrap().clone())
105        .await
106        .map_err(Into::into)
107}
108
#[cfg(test)]
mod tests {
    use std::vec;

    use itertools::Itertools;
    use risingwave_common::array::{Op, StructValue};
    use risingwave_common::catalog::ColumnDesc;
    use risingwave_common::row::Row;
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::{DataType, ScalarImpl, StructType, ToOwnedDatum};
    use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
    use risingwave_pb::plan_common::{AdditionalColumn, AdditionalColumnKey};

    use crate::parser::test_utils::ByteStreamSourceParserImplTestExt as _;
    use crate::parser::{
        ByteStreamSourceParserImpl, CommonParserConfig, ParserConfig, ProtocolProperties,
        SourceColumnDesc, SpecificParserConfig,
    };
    use crate::source::SourceColumnType;

    /// Builds a `PLAIN` + `ENCODE JSON` parser over the given columns.
    fn make_parser(rw_columns: Vec<SourceColumnDesc>) -> ByteStreamSourceParserImpl {
        ByteStreamSourceParserImpl::create_for_test(ParserConfig {
            common: CommonParserConfig { rw_columns },
            specific: SpecificParserConfig::DEFAULT_PLAIN_JSON,
        })
        .unwrap()
    }

    /// Builds an `UPSERT` + `ENCODE JSON` parser over the given columns.
    fn make_upsert_parser(rw_columns: Vec<SourceColumnDesc>) -> ByteStreamSourceParserImpl {
        ByteStreamSourceParserImpl::create_for_test(ParserConfig {
            common: CommonParserConfig { rw_columns },
            specific: SpecificParserConfig {
                protocol_config: ProtocolProperties::Upsert,
                ..SpecificParserConfig::DEFAULT_PLAIN_JSON
            },
        })
        .unwrap()
    }

    /// Two object-per-message payloads: the first covers every scalar type
    /// under test; the second omits most fields to exercise NULL padding.
    fn get_payload() -> Vec<Vec<u8>> {
        vec![
            br#"{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890,"interval":"P1Y2M3DT0H5M0S"}"#.to_vec(),
            br#"{"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345,"interval":"1 day"}"#.to_vec(),
        ]
    }

    /// Same two records as `get_payload` (minus `interval`), but delivered as
    /// a single top-level JSON array in one message.
    fn get_array_top_level_payload() -> Vec<Vec<u8>> {
        vec![
            br#"[{"i32":1,"bool":true,"i16":1,"i64":12345678,"f32":1.23,"f64":1.2345,"varchar":"varchar","date":"2021-01-01","timestamp":"2021-01-01 16:06:12.269","decimal":12345.67890}, {"i32":1,"f32":12345e+10,"f64":12345,"decimal":12345}]"#.to_vec()
        ]
    }

    /// Shared driver: parses the payloads produced by `get_payload` and
    /// asserts both resulting rows datum-by-datum. Fields absent from the
    /// second payload are expected to come back as NULL.
    async fn test_json_parser(get_payload: fn() -> Vec<Vec<u8>>) {
        let descs = vec![
            SourceColumnDesc::simple("i32", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("bool", DataType::Boolean, 2.into()),
            SourceColumnDesc::simple("i16", DataType::Int16, 3.into()),
            SourceColumnDesc::simple("i64", DataType::Int64, 4.into()),
            SourceColumnDesc::simple("f32", DataType::Float32, 5.into()),
            SourceColumnDesc::simple("f64", DataType::Float64, 6.into()),
            SourceColumnDesc::simple("varchar", DataType::Varchar, 7.into()),
            SourceColumnDesc::simple("date", DataType::Date, 8.into()),
            SourceColumnDesc::simple("timestamp", DataType::Timestamp, 9.into()),
            SourceColumnDesc::simple("decimal", DataType::Decimal, 10.into()),
            SourceColumnDesc::simple("interval", DataType::Interval, 11.into()),
        ];

        let parser = make_parser(descs);
        let chunk = parser.parse(get_payload()).await;

        let mut rows = chunk.rows();

        // First record: every column populated.
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
            assert_eq!(
                row.datum_at(1).to_owned_datum(),
                (Some(ScalarImpl::Bool(true)))
            );
            assert_eq!(
                row.datum_at(2).to_owned_datum(),
                (Some(ScalarImpl::Int16(1)))
            );
            assert_eq!(
                row.datum_at(3).to_owned_datum(),
                (Some(ScalarImpl::Int64(12345678)))
            );
            assert_eq!(
                row.datum_at(4).to_owned_datum(),
                (Some(ScalarImpl::Float32(1.23.into())))
            );
            assert_eq!(
                row.datum_at(5).to_owned_datum(),
                (Some(ScalarImpl::Float64(1.2345.into())))
            );
            assert_eq!(
                row.datum_at(6).to_owned_datum(),
                (Some(ScalarImpl::Utf8("varchar".into())))
            );
            assert_eq!(
                row.datum_at(7).to_owned_datum(),
                (Some(ScalarImpl::Date("2021-01-01".parse().unwrap())))
            );
            assert_eq!(
                row.datum_at(8).to_owned_datum(),
                (Some(ScalarImpl::Timestamp(
                    "2021-01-01 16:06:12.269".parse().unwrap()
                )))
            );
            assert_eq!(
                row.datum_at(9).to_owned_datum(),
                (Some(ScalarImpl::Decimal("12345.67890".parse().unwrap())))
            );
            assert_eq!(
                row.datum_at(10).to_owned_datum(),
                (Some(ScalarImpl::Interval("P1Y2M3DT0H5M0S".parse().unwrap())))
            );
        }

        // Second record: missing fields (e.g. `bool`) are padded with NULL.
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(1)))
            );
            assert_eq!(row.datum_at(1).to_owned_datum(), None);
            assert_eq!(
                row.datum_at(4).to_owned_datum(),
                (Some(ScalarImpl::Float32(12345e+10.into())))
            );
            assert_eq!(
                row.datum_at(5).to_owned_datum(),
                (Some(ScalarImpl::Float64(12345.into())))
            );
            assert_eq!(
                row.datum_at(9).to_owned_datum(),
                (Some(ScalarImpl::Decimal(12345.into())))
            );
            assert_eq!(
                row.datum_at(10).to_owned_datum(),
                (Some(ScalarImpl::Interval("1 day".parse().unwrap())))
            );
        }
    }

    #[tokio::test]
    async fn test_json_parse_object_top_level() {
        test_json_parser(get_payload).await;
    }
    // NOTE(review): ignored — top-level JSON arrays do not appear to be
    // handled by the default plain parser; confirm before un-ignoring.
    #[ignore]
    #[tokio::test]
    async fn test_json_parse_array_top_level() {
        test_json_parser(get_array_top_level_payload).await;
    }

    /// A malformed record in the middle of a batch must not abort parsing:
    /// the overflowing field becomes NULL and the surrounding rows survive.
    #[tokio::test]
    async fn test_json_parser_failed() {
        let descs = vec![
            SourceColumnDesc::simple("v1", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("v2", DataType::Int16, 1.into()),
            SourceColumnDesc::simple("v3", DataType::Varchar, 2.into()),
        ];

        let parser = make_parser(descs);
        let payloads = vec![
            // Parse a correct record.
            br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(),
            // Parse an incorrect record.
            // `v2` overflowed.
            // ignored the error, and fill None at v2.
            br#"{"v1": 1, "v2": 65536, "v3": "3"}"#.to_vec(),
            // Parse a correct record.
            br#"{"v1": 1, "v2": 2, "v3": "3"}"#.to_vec(),
        ];
        let chunk = parser.parse(payloads).await;

        // All three rows are emitted; only the bad datum is NULLed out.
        assert!(chunk.valid());
        assert_eq!(chunk.cardinality(), 3);

        let row_vec = chunk.rows().collect_vec();
        assert_eq!(row_vec[1].1.datum_at(1), None);
    }

    /// Nested JSON objects map to struct columns; scalars are cast to the
    /// declared column type (i64 -> varchar and varchar -> i64 round-trips).
    #[tokio::test]
    async fn test_json_parse_struct() {
        let descs = vec![
            ColumnDesc::named(
                "data",
                0.into(),
                DataType::from(StructType::new([
                    ("created_at", DataType::Timestamp),
                    ("id", DataType::Varchar),
                    ("text", DataType::Varchar),
                    ("lang", DataType::Varchar),
                ])),
            ),
            ColumnDesc::named(
                "author",
                5.into(),
                DataType::from(StructType::new([
                    ("created_at", DataType::Timestamp),
                    ("id", DataType::Varchar),
                    ("name", DataType::Varchar),
                    ("username", DataType::Varchar),
                ])),
            ),
            ColumnDesc::named("I64CastToVarchar", 10.into(), DataType::Varchar),
            ColumnDesc::named("VarcharCastToI64", 11.into(), DataType::Int64),
        ]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        let payload = br#"
        {
            "data": {
                "created_at": "2022-07-13 20:48:37.07",
                "id": "1732524418112319151",
                "text": "Here man favor ourselves mysteriously most her sigh in straightaway for afterwards.",
                "lang": "English"
            },
            "author": {
                "created_at": "2018-01-29 12:19:11.07",
                "id": "7772634297",
                "name": "Lily Frami yet",
                "username": "Dooley5659"
            },
            "I64CastToVarchar": 1598197865760800768,
            "VarcharCastToI64": "1598197865760800768"
        }
        "#.to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        let expected = vec![
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Timestamp(
                    "2022-07-13 20:48:37.07".parse().unwrap()
                )),
                Some(ScalarImpl::Utf8("1732524418112319151".into())),
                Some(ScalarImpl::Utf8("Here man favor ourselves mysteriously most her sigh in straightaway for afterwards.".into())),
                Some(ScalarImpl::Utf8("English".into())),
            ]))),
            Some(ScalarImpl::Struct(StructValue::new(vec![
                Some(ScalarImpl::Timestamp(
                    "2018-01-29 12:19:11.07".parse().unwrap()
                )),
                Some(ScalarImpl::Utf8("7772634297".into())),
                Some(ScalarImpl::Utf8("Lily Frami yet".into())),
                Some(ScalarImpl::Utf8("Dooley5659".into())),
            ]) )),
            Some(ScalarImpl::Utf8("1598197865760800768".into())),
            Some(ScalarImpl::Int64(1598197865760800768)),
        ];
        assert_eq!(row, expected.into());
    }

    /// A struct column whose JSON value is a *string* containing JSON is
    /// parsed from that embedded string.
    #[tokio::test]
    async fn test_json_parse_struct_from_string() {
        let descs = vec![ColumnDesc::named(
            "struct",
            0.into(),
            DataType::from(StructType::new([
                ("varchar", DataType::Varchar),
                ("boolean", DataType::Boolean),
            ])),
        )]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        let payload = br#"
        {
            "struct": "{\"varchar\": \"varchar\", \"boolean\": true}"
        }
        "#
        .to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        let expected = vec![Some(ScalarImpl::Struct(StructValue::new(vec![
            Some(ScalarImpl::Utf8("varchar".into())),
            Some(ScalarImpl::Bool(true)),
        ])))];
        assert_eq!(row, expected.into());
    }

    /// A struct field missing from the payload is padded with NULL, and a
    /// warning is logged (checked via `tracing_test`).
    #[cfg(not(madsim))] // Traced test does not work with madsim
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_json_parse_struct_missing_field_warning() {
        let descs = vec![ColumnDesc::named(
            "struct",
            0.into(),
            DataType::from(StructType::new([
                ("varchar", DataType::Varchar),
                ("boolean", DataType::Boolean),
            ])),
        )]
        .iter()
        .map(SourceColumnDesc::from)
        .collect_vec();

        let parser = make_parser(descs);
        let payload = br#"
        {
            "struct": {
                "varchar": "varchar"
            }
        }
        "#
        .to_vec();
        let chunk = parser.parse(vec![payload]).await;

        let (op, row) = chunk.rows().next().unwrap();
        assert_eq!(op, Op::Insert);
        let row = row.into_owned_row().into_inner();

        let expected = vec![Some(ScalarImpl::Struct(StructValue::new(vec![
            Some(ScalarImpl::Utf8("varchar".into())),
            None,
        ])))];
        assert_eq!(row, expected.into());

        assert!(logs_contain("undefined nested field, padding with `NULL`"));
    }

    /// Upsert semantics: a non-empty value is an Insert, an empty value is a
    /// Delete keyed by the hidden `rw_key` column.
    #[tokio::test]
    async fn test_json_upsert_parser() {
        let items = [
            (r#"{"a":1}"#, r#"{"a":1,"b":2}"#),
            (r#"{"a":1}"#, r#"{"a":1,"b":3}"#),
            (r#"{"a":2}"#, r#"{"a":2,"b":2}"#),
            (r#"{"a":2}"#, r#""#),
        ]
        .into_iter()
        .map(|(k, v)| (k.as_bytes().to_vec(), v.as_bytes().to_vec()))
        .collect_vec();

        let key_column_desc = SourceColumnDesc {
            name: "rw_key".into(),
            data_type: DataType::Bytea,
            column_id: 2.into(),
            column_type: SourceColumnType::Normal,
            is_pk: true,
            is_hidden_addition_col: false,
            additional_column: AdditionalColumn {
                column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
            },
        };
        let descs = vec![
            SourceColumnDesc::simple("a", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("b", DataType::Int32, 1.into()),
            key_column_desc,
        ];

        let parser = make_upsert_parser(descs);
        let chunk = parser.parse_upsert(items).await;

        // expected chunk
        // +---+---+---+------------------+
        // | + | 1 | 2 | \x7b2261223a317d |
        // | + | 1 | 3 | \x7b2261223a317d |
        // | + | 2 | 2 | \x7b2261223a327d |
        // | - |   |   | \x7b2261223a327d |
        // +---+---+---+------------------+

        let mut rows = chunk.rows();
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(1)))
            );
        }

        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(1)))
            );
        }
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                (Some(ScalarImpl::Int32(2)))
            );
        }
        {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Delete);
            assert_eq!(row.datum_at(0).to_owned_datum(), (None));
        }
    }
}