risingwave_frontend/handler/create_source/external_schema/
protobuf.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_connector::parser::PROTOBUF_MESSAGES_AS_JSONB;
16
17use super::*;
18
19/// Map a protobuf schema to a relational schema.
20pub async fn extract_protobuf_table_schema(
21    info: &StreamSourceInfo,
22    with_properties: &WithOptionsSecResolved,
23    format_encode_options: &mut BTreeMap<String, String>,
24) -> Result<Vec<ColumnCatalog>> {
25    let parser_config = SpecificParserConfig::new(info, with_properties)?;
26    try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_USERNAME);
27    try_consume_string_from_options(format_encode_options, SCHEMA_REGISTRY_PASSWORD);
28    try_consume_string_from_options(format_encode_options, PROTOBUF_MESSAGES_AS_JSONB);
29    consume_aws_config_from_options(format_encode_options);
30
31    let conf = ProtobufParserConfig::new(parser_config.encoding_config).await?;
32
33    let column_descs = conf.map_to_columns()?;
34
35    Ok(column_descs
36        .into_iter()
37        .map(|col| ColumnCatalog {
38            column_desc: ColumnDesc::from_field_without_column_id(&col),
39            is_hidden: false,
40        })
41        .collect_vec())
42}