risingwave_frontend/handler/create_source/external_schema/
json.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
17pub fn json_schema_infer_use_schema_registry(schema_config: &Option<(AstString, bool)>) -> bool {
18    match schema_config {
19        None => false,
20        Some((_, use_registry)) => *use_registry,
21    }
22}
23
24/// Map a JSON schema to a relational schema
25pub async fn extract_json_table_schema(
26    schema_config: &Option<(AstString, bool)>,
27    with_properties: &BTreeMap<String, String>,
28    format_encode_options: &mut BTreeMap<String, String>,
29) -> Result<Option<Vec<ColumnCatalog>>> {
30    match schema_config {
31        None => Ok(None),
32        Some((schema_location, use_schema_registry)) => {
33            let schema_registry_auth = use_schema_registry.then(|| {
34                let auth = SchemaRegistryAuth::from(&*format_encode_options);
35                try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
36                try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
37                auth
38            });
39            Ok(Some(
40                fetch_json_schema_and_map_to_columns(
41                    &schema_location.0,
42                    schema_registry_auth,
43                    with_properties,
44                )
45                .await?
46                .into_iter()
47                .map(|col| ColumnCatalog {
48                    column_desc: ColumnDesc::from_field_without_column_id(&col),
49                    is_hidden: false,
50                })
51                .collect_vec(),
52            ))
53        }
54    }
55}
56
57pub fn get_json_schema_location(
58    format_encode_options: &mut BTreeMap<String, String>,
59) -> Result<Option<(AstString, bool)>> {
60    let schema_location = try_consume_string_from_options(format_encode_options, "schema.location");
61    let schema_registry = try_consume_string_from_options(format_encode_options, "schema.registry");
62    match (schema_location, schema_registry) {
63        (None, None) => Ok(None),
64        (None, Some(schema_registry)) => Ok(Some((schema_registry, true))),
65        (Some(schema_location), None) => Ok(Some((schema_location, false))),
66        (Some(_), Some(_)) => Err(RwError::from(ProtocolError(
67            "only need either the schema location or the schema registry".to_owned(),
68        ))),
69    }
70}