risingwave_frontend/handler/create_source/external_schema/
json.rs1use super::*;
16
17pub fn json_schema_infer_use_schema_registry(schema_config: &Option<(AstString, bool)>) -> bool {
18 match schema_config {
19 None => false,
20 Some((_, use_registry)) => *use_registry,
21 }
22}
23
24pub async fn extract_json_table_schema(
26 schema_config: &Option<(AstString, bool)>,
27 with_properties: &BTreeMap<String, String>,
28 format_encode_options: &mut BTreeMap<String, String>,
29) -> Result<Option<Vec<ColumnCatalog>>> {
30 match schema_config {
31 None => Ok(None),
32 Some((schema_location, use_schema_registry)) => {
33 let schema_registry_auth = use_schema_registry.then(|| {
34 let auth = SchemaRegistryAuth::from(&*format_encode_options);
35 try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
36 try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
37 auth
38 });
39 Ok(Some(
40 fetch_json_schema_and_map_to_columns(
41 &schema_location.0,
42 schema_registry_auth,
43 with_properties,
44 )
45 .await?
46 .into_iter()
47 .map(|col| ColumnCatalog {
48 column_desc: ColumnDesc::from_field_without_column_id(&col),
49 is_hidden: false,
50 })
51 .collect_vec(),
52 ))
53 }
54 }
55}
56
57pub fn get_json_schema_location(
58 format_encode_options: &mut BTreeMap<String, String>,
59) -> Result<Option<(AstString, bool)>> {
60 let schema_location = try_consume_string_from_options(format_encode_options, "schema.location");
61 let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry");
62 match (schema_location, schema_registry) {
63 (None, None) => Ok(None),
64 (None, Some(schema_registry)) => Ok(Some((schema_registry, true))),
65 (Some(schema_location), None) => Ok(Some((schema_location, false))),
66 (Some(_), Some(_)) => Err(RwError::from(ProtocolError(
67 "only need either the schema location or the schema registry".to_owned(),
68 ))),
69 }
70}