risingwave_connector_codec/decoder/json/
mod.rs1use std::collections::HashMap;
38use std::fs;
39
40use anyhow::Context;
41use risingwave_common::catalog::Field;
42use serde_json::Value;
43use thiserror::Error;
44use url::Url;
45
46use super::avro::{MapHandling, avro_schema_to_fields};
47
/// Errors produced while resolving `$ref`s in a JSON Schema.
///
/// The `ContextInto` derive generates `into_<variant>` extension helpers (e.g.
/// `into_request(url)`, `into_url_parse(ref_string)`) that wrap a foreign error as the
/// variant's `source` while the caller supplies the remaining fields, in declaration order.
#[derive(Debug, Error, thiserror_ext::ContextInto)]
pub enum Error {
    /// A local schema file could not be opened or read.
    ///
    /// In this module `filename` carries the document URL string, not a bare path.
    #[error("could not open schema from {filename}")]
    SchemaFromFile {
        filename: String,
        source: std::io::Error,
    },
    /// A `$ref` string could not be joined onto its base URL; `url` is the raw `$ref` value.
    #[error("parse error for url {url}")]
    UrlParse {
        url: String,
        source: url::ParseError,
    },
    /// An I/O error while loading a schema as JSON.
    ///
    /// NOTE(review): shares its message with `SchemaNotJsonSerde`, and is not constructed
    /// in the visible part of this file — confirm whether it is still used.
    #[error("schema from {url} not valid JSON")]
    SchemaNotJson { url: String, source: std::io::Error },
    /// Fetching a remote (`http`/`https`) schema document failed, or its body could not
    /// be decoded as JSON.
    #[error("request error")]
    Request { url: String, source: reqwest::Error },
    /// A local schema document was read but is not valid JSON.
    #[error("schema from {url} not valid JSON")]
    SchemaNotJsonSerde {
        url: String,
        source: serde_json::Error,
    },
    /// The fragment of a `$ref` (used as a JSON pointer) does not exist in the referenced
    /// document.
    #[error(
        "ref `{ref_string}` can not be resolved as pointer, `{ref_fragment}` can not be found in the schema"
    )]
    JsonRefPointerNotFound {
        ref_string: String,
        ref_fragment: String,
    },
    /// Generic I/O failure; `From<std::io::Error>` converts into this variant.
    #[error("json ref error")]
    JsonRef {
        #[from]
        source: std::io::Error,
    },
    /// A `$ref` resolved to a URL whose scheme is neither `file` nor `http`/`https`.
    #[error("need url to be a file or a http based, got {url}")]
    UnsupportedUrl { url: String },
    /// Any other failure, carried as an `anyhow::Error` whose message is shown as-is and
    /// whose backtrace is exposed through `#[backtrace]`.
    #[error(transparent)]
    Uncategorized(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
}
90
91type Result<T, E = Error> = std::result::Result<T, E>;
92
93#[derive(Debug)]
94pub struct JsonRef {
95 schema_cache: HashMap<String, Value>,
96}
97
98impl JsonRef {
99 fn new() -> JsonRef {
100 JsonRef {
101 schema_cache: HashMap::new(),
102 }
103 }
104
105 async fn deref_value(&mut self, value: &mut Value, retrieval_url: &Url) -> Result<()> {
106 self.schema_cache
107 .insert(retrieval_url.to_string(), value.clone());
108 self.deref(value, retrieval_url, &vec![]).await?;
109 Ok(())
110 }
111
112 async fn deref(
113 &mut self,
114 value: &mut Value,
115 base_url: &Url,
116 used_refs: &Vec<String>,
117 ) -> Result<()> {
118 if let Some(obj) = value.as_object_mut()
119 && let Some(ref_value) = obj.remove("$ref")
120 && let Some(ref_string) = ref_value.as_str()
121 {
122 let ref_url = base_url.join(ref_string).into_url_parse(ref_string)?;
123 let mut ref_url_no_fragment = ref_url.clone();
124 ref_url_no_fragment.set_fragment(None);
125 let url_schema = ref_url_no_fragment.scheme();
126 let ref_no_fragment = ref_url_no_fragment.to_string();
127
128 let mut schema = match self.schema_cache.get(&ref_no_fragment) {
129 Some(cached_schema) => cached_schema.clone(),
130 None => {
131 if url_schema == "http" || url_schema == "https" {
132 reqwest::get(ref_url_no_fragment.clone())
133 .await
134 .into_request(&ref_no_fragment)?
135 .json()
136 .await
137 .into_request(&ref_no_fragment)?
138 } else if url_schema == "file" {
139 let file_path = ref_url_no_fragment.to_file_path().map_err(|_| {
140 anyhow::anyhow!(
141 "could not convert url {} to file path",
142 &ref_url_no_fragment
143 )
144 })?;
145 let file =
146 fs::File::open(file_path).into_schema_from_file(&ref_no_fragment)?;
147 serde_json::from_reader(file)
148 .into_schema_not_json_serde(ref_no_fragment.clone())?
149 } else {
150 return Err(Error::UnsupportedUrl {
151 url: ref_no_fragment,
152 });
153 }
154 }
155 };
156
157 if !self.schema_cache.contains_key(&ref_no_fragment) {
158 self.schema_cache
159 .insert(ref_no_fragment.clone(), schema.clone());
160 }
161
162 let ref_url_string = ref_url.to_string();
163 if let Some(ref_fragment) = ref_url.fragment() {
164 schema = schema
165 .pointer(ref_fragment)
166 .ok_or(Error::JsonRefPointerNotFound {
167 ref_string: ref_string.to_owned(),
168 ref_fragment: ref_fragment.to_owned(),
169 })?
170 .clone();
171 }
172 if used_refs.contains(&ref_url_string) {
174 return Ok(());
175 }
176 let mut new_used_refs = used_refs.clone();
177 new_used_refs.push(ref_url_string);
178 Box::pin(self.deref(&mut schema, &ref_url_no_fragment, &new_used_refs)).await?;
179
180 *value = schema;
181 }
182
183 if let Some(obj) = value.as_object_mut() {
184 for obj_value in obj.values_mut() {
185 Box::pin(self.deref(obj_value, base_url, used_refs)).await?
186 }
187 }
188 Ok(())
189 }
190}
191
192impl crate::JsonSchema {
193 pub async fn json_schema_to_columns(
202 &mut self,
203 retrieval_url: Url,
204 ) -> anyhow::Result<Vec<Field>> {
205 JsonRef::new()
206 .deref_value(&mut self.0, &retrieval_url)
207 .await?;
208 let avro_schema = jst::convert_avro(&self.0, jst::Context::default()).to_string();
209 let schema =
210 apache_avro::Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
211 avro_schema_to_fields(&schema, Some(MapHandling::Jsonb))
212 }
213}