// risingwave_frontend/handler/create_source/external_schema/avro.rs
use super::*;
/// Resolves the column catalogs of a table from its Avro schema.
///
/// A [`SpecificParserConfig`] is built from `info` and `with_properties`, and
/// its encoding config is handed to either the Debezium-Avro or the plain Avro
/// parser config (selected by `is_debezium`), whose `map_to_columns` output
/// becomes the result. Every returned column is marked as not hidden.
///
/// Before the schema is resolved, `format_encode_options` is passed to the
/// helpers handling the schema-registry username/password and the AWS config
/// (presumably removing those entries from the map — confirm against the
/// helpers' definitions).
///
/// # Errors
///
/// * Propagates any error from building the parser configs or from
///   `map_to_columns`.
/// * For non-Debezium Avro whose schema location is a file, bails with a
///   "not implemented" error (issue 12871) unless the
///   `with_deprecated_file_header` option is exactly `"true"`.
pub async fn extract_avro_table_schema(
    info: &StreamSourceInfo,
    with_properties: &WithOptionsSecResolved,
    format_encode_options: &mut BTreeMap<String, String>,
    is_debezium: bool,
) -> Result<Vec<ColumnCatalog>> {
    let encoding_config = SpecificParserConfig::new(info, with_properties)?.encoding_config;

    // Credential-related options are handled up front, before schema resolution.
    try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
    try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
    consume_aws_config_from_options(format_encode_options);

    let column_descs = if is_debezium {
        let debezium_config = DebeziumAvroParserConfig::new(encoding_config).await?;
        debezium_config.map_to_columns()?
    } else {
        // A file-based schema location is only accepted when the user has
        // explicitly opted in through `with_deprecated_file_header = true`.
        let schema_from_file = matches!(
            &encoding_config,
            risingwave_connector::parser::EncodingProperties::Avro(avro_props)
                if matches!(avro_props.schema_location, SchemaLocation::File { .. })
        );
        if schema_from_file {
            let opted_in = format_encode_options
                .get("with_deprecated_file_header")
                .is_some_and(|value| value == "true");
            if !opted_in {
                bail_not_implemented!(issue = 12871, "avro without schema registry");
            }
        }

        let avro_config = AvroParserConfig::new(encoding_config).await?;
        avro_config.map_to_columns()?
    };

    let catalogs: Vec<ColumnCatalog> = column_descs
        .into_iter()
        .map(|desc| ColumnCatalog {
            column_desc: desc.into(),
            is_hidden: false,
        })
        .collect();
    Ok(catalogs)
}