risingwave_connector/schema/schema_registry/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod client;
16mod util;
17pub use client::*;
18use risingwave_pb::catalog::SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy;
19pub(crate) use util::*;
20
21use super::{InvalidOptionError, invalid_option_error};
22
23pub fn name_strategy_from_str(value: &str) -> Option<PbSchemaRegistryNameStrategy> {
24    match value {
25        "topic_name_strategy" => Some(PbSchemaRegistryNameStrategy::Unspecified),
26        "record_name_strategy" => Some(PbSchemaRegistryNameStrategy::RecordNameStrategy),
27        "topic_record_name_strategy" => Some(PbSchemaRegistryNameStrategy::TopicRecordNameStrategy),
28        _ => None,
29    }
30}
31
32pub fn get_subject_by_strategy(
33    name_strategy: &PbSchemaRegistryNameStrategy,
34    topic: &str,
35    record: Option<&str>,
36    is_key: bool,
37) -> Result<String, InvalidOptionError> {
38    let record_option_name = if is_key { "key.message" } else { "message" };
39    let build_error_lack_field = || {
40        invalid_option_error!(
41            "{} expect non-empty field {}",
42            name_strategy.as_str_name(),
43            record_option_name,
44        )
45    };
46    match name_strategy {
47        PbSchemaRegistryNameStrategy::Unspecified => {
48            // default behavior
49            let suffix = if is_key { "key" } else { "value" };
50            Ok(format!("{topic}-{suffix}",))
51        }
52        PbSchemaRegistryNameStrategy::RecordNameStrategy => {
53            let record_name = record.ok_or_else(build_error_lack_field)?;
54            Ok(record_name.to_owned())
55        }
56        PbSchemaRegistryNameStrategy::TopicRecordNameStrategy => {
57            let record_name = record.ok_or_else(build_error_lack_field)?;
58            Ok(format!("{topic}-{record_name}"))
59        }
60    }
61}