risingwave_frontend/handler/create_source/external_schema/
avro.rs1use super::*;
16
17pub async fn extract_avro_table_schema(
19 info: &StreamSourceInfo,
20 with_properties: &WithOptionsSecResolved,
21 format_encode_options: &mut BTreeMap<String, String>,
22 is_debezium: bool,
23) -> Result<Vec<ColumnCatalog>> {
24 let parser_config = SpecificParserConfig::new(info, with_properties)?;
25 try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
26 try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
27 consume_aws_config_from_options(format_encode_options);
28
29 let vec_column_desc = if is_debezium {
30 let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?;
31 conf.map_to_columns()?
32 } else {
33 if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) =
34 &parser_config.encoding_config
35 && matches!(avro_props.schema_location, SchemaLocation::File { .. })
36 && format_encode_options
37 .get("with_deprecated_file_header")
38 .is_none_or(|v| v != "true")
39 {
40 bail_not_implemented!(issue = 12871, "avro without schema registry");
41 }
42 let conf = AvroParserConfig::new(parser_config.encoding_config).await?;
43 conf.map_to_columns()?
44 };
45 Ok(vec_column_desc
46 .into_iter()
47 .map(|col| ColumnCatalog {
48 column_desc: ColumnDesc::from_field_without_column_id(&col),
49 is_hidden: false,
50 })
51 .collect_vec())
52}