// risingwave_frontend/handler/create_source/external_schema/avro.rs
use super::*;

17pub async fn extract_avro_table_schema(
19 info: &StreamSourceInfo,
20 with_properties: &WithOptionsSecResolved,
21 format_encode_options: &mut BTreeMap<String, String>,
22 is_debezium: bool,
23) -> Result<Vec<ColumnCatalog>> {
24 let parser_config = SpecificParserConfig::new(info, with_properties)?;
25 try_consume_schema_registry_config_from_options(format_encode_options);
26 consume_aws_config_from_options(format_encode_options);
27
28 let vec_column_desc = if is_debezium {
29 let conf = DebeziumAvroParserConfig::new(parser_config.encoding_config).await?;
30 conf.map_to_columns()?
31 } else {
32 if let risingwave_connector::parser::EncodingProperties::Avro(avro_props) =
33 &parser_config.encoding_config
34 && matches!(avro_props.schema_location, SchemaLocation::File { .. })
35 && format_encode_options
36 .get("with_deprecated_file_header")
37 .is_none_or(|v| v != "true")
38 {
39 bail_not_implemented!(issue = 12871, "avro without schema registry");
40 }
41 let conf = AvroParserConfig::new(parser_config.encoding_config).await?;
42 conf.map_to_columns()?
43 };
44 Ok(vec_column_desc
45 .into_iter()
46 .map(|col| ColumnCatalog {
47 column_desc: ColumnDesc::from_field_without_column_id(&col),
48 is_hidden: false,
49 })
50 .collect_vec())
51}