risingwave_frontend/handler/create_source/external_schema/
avro.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::*;
16
17/// Map an Avro schema to a relational schema.
18pub async fn extract_avro_table_schema(
19    info: &StreamSourceInfo,
20    with_properties: &WithOptionsSecResolved,
21    format_encode_options: &mut BTreeMap<String, String>,
22    is_debezium: bool,
23) -> Result<Vec<ColumnCatalog>> {
24    let parser_config = SpecificParserConfig::new(info, with_properties)?;
25    try_consume_schema_registry_config_from_options(format_encode_options);
26    consume_aws_config_from_options(format_encode_options);
27
28    let vec_column_desc = if is_debezium {
29        let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?;
30        conf.map_to_columns()?
31    } else {
32        if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) =
33            &parser_config.encoding_config
34            && matches!(avro_props.schema_location, SchemaLocation::File { .. })
35            && format_encode_options
36                .get("with_deprecated_file_header")
37                .is_none_or(|v| v != "true")
38        {
39            bail_not_implemented!(issue = 12871, "avro without schema registry");
40        }
41        let conf = AvroParserConfig::new(parser_config.encoding_config).await?;
42        conf.map_to_columns()?
43    };
44    Ok(vec_column_desc
45        .into_iter()
46        .map(|col| ColumnCatalog {
47            column_desc: ColumnDesc::from_field_without_column_id(&col),
48            is_hidden: false,
49        })
50        .collect_vec())
51}