risingwave_connector_codec/decoder/json/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// The MIT License (MIT)
16//
17// Copyright (c) 2021 David Raznick
18//
19// Permission is hereby granted, free of charge, to any person obtaining a copy
20// of this software and associated documentation files (the "Software"), to deal
21// in the Software without restriction, including without limitation the rights
22// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
23// copies of the Software, and to permit persons to whom the Software is
24// furnished to do so, subject to the following conditions:
25//
26// The above copyright notice and this permission notice shall be included in all
27// copies or substantial portions of the Software.
28//
29// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
30// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
31// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
32// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
33// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
34// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
35// SOFTWARE.
36
37use std::collections::HashMap;
38use std::fs;
39
40use anyhow::Context;
41use risingwave_common::catalog::Field;
42use serde_json::Value;
43use thiserror::Error;
44use url::Url;
45
46use super::avro::{MapHandling, avro_schema_to_fields};
47
/// Errors raised while loading a JSON schema and resolving its `$ref` references.
///
/// NOTE(review): the `into_*` adapters called in `JsonRef::deref` (`into_url_parse`,
/// `into_request`, `into_schema_from_file`, `into_schema_not_json_serde`) are presumably
/// generated by the `ContextInto` derive, one per variant that has a `source` field.
#[derive(Debug, Error, thiserror_ext::ContextInto)]
pub enum Error {
    /// A schema referenced through a `file` URL could not be opened.
    ///
    /// `filename` receives the string form of the URL (without fragment), not a path.
    #[error("could not open schema from {filename}")]
    SchemaFromFile {
        filename: String,
        source: std::io::Error,
    },
    /// A `$ref` string could not be joined onto the current base URL.
    ///
    /// `url` holds the raw `$ref` string that failed to parse.
    #[error("parse error for url {url}")]
    UrlParse {
        url: String,
        source: url::ParseError,
    },
    /// Schema content was not valid JSON, reported through an I/O error.
    ///
    /// Not constructed in the visible part of this file.
    #[error("schema from {url} not valid JSON")]
    SchemaNotJson { url: String, source: std::io::Error },
    /// Fetching a schema over HTTP(S), or decoding the response body as JSON, failed.
    ///
    /// Note that `url` is carried as a field but is not part of the display message.
    #[error("request error")]
    Request { url: String, source: reqwest::Error },
    /// A schema read from a file could not be deserialized as JSON.
    #[error("schema from {url} not valid JSON")]
    SchemaNotJsonSerde {
        url: String,
        source: serde_json::Error,
    },
    /// The fragment of a `$ref` did not address any value in the referenced document.
    #[error(
        "ref `{ref_string}` can not be resolved as pointer, `{ref_fragment}` can not be found in the schema"
    )]
    JsonRefPointerNotFound {
        ref_string: String,
        ref_fragment: String,
    },
    /// A bare I/O error; `#[from]` lets `?` convert `std::io::Error` into this variant.
    #[error("json ref error")]
    JsonRef {
        #[from]
        source: std::io::Error,
    },
    /// The referenced URL uses a scheme other than `http`, `https` or `file`.
    #[error("need url to be a file or a http based, got {url}")]
    UnsupportedUrl { url: String },
    /// Any other failure, wrapped as an `anyhow::Error` and displayed transparently.
    #[error(transparent)]
    Uncategorized(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
}
90
/// Module-local result alias whose error type defaults to this module's `Error`.
type Result<T, E = Error> = std::result::Result<T, E>;
92
/// Resolver that replaces JSON Schema `$ref` entries with the schemas they point to.
#[derive(Debug)]
pub struct JsonRef {
    /// Schema documents seen so far, keyed by the string form of their URL.
    ///
    /// Referenced documents are stored under their URL with the fragment removed; the
    /// root document is stored under the retrieval URL as given.
    schema_cache: HashMap<String, Value>,
}
97
98impl JsonRef {
99    fn new() -> JsonRef {
100        JsonRef {
101            schema_cache: HashMap::new(),
102        }
103    }
104
105    async fn deref_value(&mut self, value: &mut Value, retrieval_url: &Url) -> Result<()> {
106        self.schema_cache
107            .insert(retrieval_url.to_string(), value.clone());
108        self.deref(value, retrieval_url, &vec![]).await?;
109        Ok(())
110    }
111
112    async fn deref(
113        &mut self,
114        value: &mut Value,
115        base_url: &Url,
116        used_refs: &Vec<String>,
117    ) -> Result<()> {
118        if let Some(obj) = value.as_object_mut()
119            && let Some(ref_value) = obj.remove("$ref")
120            && let Some(ref_string) = ref_value.as_str()
121        {
122            let ref_url = base_url.join(ref_string).into_url_parse(ref_string)?;
123            let mut ref_url_no_fragment = ref_url.clone();
124            ref_url_no_fragment.set_fragment(None);
125            let url_schema = ref_url_no_fragment.scheme();
126            let ref_no_fragment = ref_url_no_fragment.to_string();
127
128            let mut schema = match self.schema_cache.get(&ref_no_fragment) {
129                Some(cached_schema) => cached_schema.clone(),
130                None => {
131                    if url_schema == "http" || url_schema == "https" {
132                        reqwest::get(ref_url_no_fragment.clone())
133                            .await
134                            .into_request(&ref_no_fragment)?
135                            .json()
136                            .await
137                            .into_request(&ref_no_fragment)?
138                    } else if url_schema == "file" {
139                        let file_path = ref_url_no_fragment.to_file_path().map_err(|_| {
140                            anyhow::anyhow!(
141                                "could not convert url {} to file path",
142                                &ref_url_no_fragment
143                            )
144                        })?;
145                        let file =
146                            fs::File::open(file_path).into_schema_from_file(&ref_no_fragment)?;
147                        serde_json::from_reader(file)
148                            .into_schema_not_json_serde(ref_no_fragment.clone())?
149                    } else {
150                        return Err(Error::UnsupportedUrl {
151                            url: ref_no_fragment,
152                        });
153                    }
154                }
155            };
156
157            if !self.schema_cache.contains_key(&ref_no_fragment) {
158                self.schema_cache
159                    .insert(ref_no_fragment.clone(), schema.clone());
160            }
161
162            let ref_url_string = ref_url.to_string();
163            if let Some(ref_fragment) = ref_url.fragment() {
164                schema = schema
165                    .pointer(ref_fragment)
166                    .ok_or(Error::JsonRefPointerNotFound {
167                        ref_string: ref_string.to_owned(),
168                        ref_fragment: ref_fragment.to_owned(),
169                    })?
170                    .clone();
171            }
172            // Do not deref a url twice to prevent infinite loops
173            if used_refs.contains(&ref_url_string) {
174                return Ok(());
175            }
176            let mut new_used_refs = used_refs.clone();
177            new_used_refs.push(ref_url_string);
178            Box::pin(self.deref(&mut schema, &ref_url_no_fragment, &new_used_refs)).await?;
179
180            *value = schema;
181        }
182
183        if let Some(obj) = value.as_object_mut() {
184            for obj_value in obj.values_mut() {
185                Box::pin(self.deref(obj_value, base_url, used_refs)).await?
186            }
187        }
188        Ok(())
189    }
190}
191
192impl crate::JsonSchema {
193    /// FIXME: when the JSON schema is invalid, it will panic.
194    ///
195    /// ## Notes on type conversion
196    /// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
197    /// When an object has `properties` and `additionalProperties`, the latter will be ignored.
198    /// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
199    ///
200    /// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
201    pub async fn json_schema_to_columns(
202        &mut self,
203        retrieval_url: Url,
204    ) -> anyhow::Result<Vec<Field>> {
205        JsonRef::new()
206            .deref_value(&mut self.0, &retrieval_url)
207            .await?;
208        let avro_schema = jst::convert_avro(&self.0, jst::Context::default()).to_string();
209        let schema =
210            apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
211        avro_schema_to_fields(&schema, Some(MapHandling::Jsonb))
212    }
213}