risingwave_frontend/handler/create_source/external_schema/
json.rsuse super::*;
/// Reports whether the given JSON schema configuration carries the
/// "use schema registry" flag.
///
/// Returns the boolean stored alongside the schema string, or `false` when no
/// schema configuration is present.
pub fn json_schema_infer_use_schema_registry(schema_config: &Option<(AstString, bool)>) -> bool {
    // Only a present config whose flag is `true` counts; `None` and
    // `Some((_, false))` both yield `false`.
    matches!(schema_config, Some((_, true)))
}
/// Builds the column catalogs for a JSON-encoded source from its schema
/// configuration.
///
/// * `schema_config` - optional pair of schema string and a flag telling
///   whether that string refers to a schema registry.
/// * `with_properties` - passed through unchanged to
///   `fetch_json_schema_and_map_to_columns`.
/// * `format_encode_options` - read to build the registry auth; the registry
///   username/password options are then passed to
///   `try_consume_string_from_options` (only when the registry flag is set).
///
/// Returns `Ok(None)` when no schema configuration is given, otherwise the
/// fetched columns wrapped as visible (`is_hidden: false`) `ColumnCatalog`s.
///
/// # Errors
///
/// Propagates any error returned by `fetch_json_schema_and_map_to_columns`.
pub async fn extract_json_table_schema(
    schema_config: &Option<(AstString, bool)>,
    with_properties: &BTreeMap<String, String>,
    format_encode_options: &mut BTreeMap<String, String>,
) -> Result<Option<Vec<ColumnCatalog>>> {
    // Nothing to infer without a schema configuration.
    let (location, via_registry) = match schema_config {
        Some(config) => config,
        None => return Ok(None),
    };

    let registry_auth = if *via_registry {
        // Build the auth from the options first, then hand the credential
        // keys to the consume helper so they are not treated as leftovers.
        let auth = SchemaRegistryAuth::from(&*format_encode_options);
        try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
        try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
        Some(auth)
    } else {
        None
    };

    let fields =
        fetch_json_schema_and_map_to_columns(&location.0, registry_auth, with_properties).await?;

    let columns: Vec<ColumnCatalog> = fields
        .into_iter()
        .map(|field| ColumnCatalog {
            column_desc: field.into(),
            is_hidden: false,
        })
        .collect();

    Ok(Some(columns))
}
/// Extracts the JSON schema source from the format/encode options.
///
/// Both the `schema.location` and `schema.registry` options are passed to
/// `try_consume_string_from_options`. The result pairs the value found with a
/// flag that is `true` when it came from `schema.registry` and `false` when it
/// came from `schema.location`; `Ok(None)` is returned when neither is set.
///
/// # Errors
///
/// Returns a `ProtocolError` when both options are provided at once.
pub fn get_json_schema_location(
    format_encode_options: &mut BTreeMap<String, String>,
) -> Result<Option<(AstString, bool)>> {
    let location = try_consume_string_from_options(format_encode_options, "schema.location");
    let registry = try_consume_string_from_options(format_encode_options, "schema.registry");

    // The two options are mutually exclusive.
    if location.is_some() && registry.is_some() {
        return Err(RwError::from(ProtocolError(
            "only need either the schema location or the schema registry".to_owned(),
        )));
    }

    // At most one of the two is set here; tag it with its origin.
    let chosen = location
        .map(|url| (url, false))
        .or_else(|| registry.map(|url| (url, true)));
    Ok(chosen)
}