risingwave_connector/schema/schema_registry/
mod.rsmod client;
mod util;
pub use client::*;
use risingwave_pb::catalog::SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy;
pub(crate) use util::*;
use super::{invalid_option_error, InvalidOptionError};
pub fn name_strategy_from_str(value: &str) -> Option<PbSchemaRegistryNameStrategy> {
match value {
"topic_name_strategy" => Some(PbSchemaRegistryNameStrategy::Unspecified),
"record_name_strategy" => Some(PbSchemaRegistryNameStrategy::RecordNameStrategy),
"topic_record_name_strategy" => Some(PbSchemaRegistryNameStrategy::TopicRecordNameStrategy),
_ => None,
}
}
pub fn get_subject_by_strategy(
name_strategy: &PbSchemaRegistryNameStrategy,
topic: &str,
record: Option<&str>,
is_key: bool,
) -> Result<String, InvalidOptionError> {
let record_option_name = if is_key { "key.message" } else { "message" };
let build_error_lack_field = || {
invalid_option_error!(
"{} expect non-empty field {}",
name_strategy.as_str_name(),
record_option_name,
)
};
match name_strategy {
PbSchemaRegistryNameStrategy::Unspecified => {
let suffix = if is_key { "key" } else { "value" };
Ok(format!("{topic}-{suffix}",))
}
PbSchemaRegistryNameStrategy::RecordNameStrategy => {
let record_name = record.ok_or_else(build_error_lack_field)?;
Ok(record_name.to_string())
}
PbSchemaRegistryNameStrategy::TopicRecordNameStrategy => {
let record_name = record.ok_or_else(build_error_lack_field)?;
Ok(format!("{topic}-{record_name}"))
}
}
}