risingwave_frontend/handler/create_source/external_schema/
avro.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
17/// Map an Avro schema to a relational schema.
18pub async fn extract_avro_table_schema(
19    info: &StreamSourceInfo,
20    with_properties: &WithOptionsSecResolved,
21    format_encode_options: &mut BTreeMap<String, String>,
22    is_debezium: bool,
23) -> Result<Vec<ColumnCatalog>> {
24    let parser_config = SpecificParserConfig::new(info, with_properties)?;
25    try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
26    try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
27    consume_aws_config_from_options(format_encode_options);
28
29    let vec_column_desc = if is_debezium {
30        let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?;
31        conf.map_to_columns()?
32    } else {
33        if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) =
34            &parser_config.encoding_config
35            && matches!(avro_props.schema_location, SchemaLocation::File { .. })
36            && format_encode_options
37                .get("with_deprecated_file_header")
38                .is_none_or(|v| v != "true")
39        {
40            bail_not_implemented!(issue = 12871, "avro without schema registry");
41        }
42        let conf = AvroParserConfig::new(parser_config.encoding_config).await?;
43        conf.map_to_columns()?
44    };
45    Ok(vec_column_desc
46        .into_iter()
47        .map(|col| ColumnCatalog {
48            column_desc: ColumnDesc::from_field_without_column_id(&col),
49            is_hidden: false,
50        })
51        .collect_vec())
52}