risingwave_connector/schema/schema_registry/
mod.rs1mod client;
16mod util;
17pub use client::*;
18use risingwave_pb::catalog::SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy;
19pub(crate) use util::*;
20
21use super::{InvalidOptionError, invalid_option_error};
22
23pub fn name_strategy_from_str(value: &str) -> Option<PbSchemaRegistryNameStrategy> {
24 match value {
25 "topic_name_strategy" => Some(PbSchemaRegistryNameStrategy::Unspecified),
26 "record_name_strategy" => Some(PbSchemaRegistryNameStrategy::RecordNameStrategy),
27 "topic_record_name_strategy" => Some(PbSchemaRegistryNameStrategy::TopicRecordNameStrategy),
28 _ => None,
29 }
30}
31
32pub fn get_subject_by_strategy(
33 name_strategy: &PbSchemaRegistryNameStrategy,
34 topic: &str,
35 record: Option<&str>,
36 is_key: bool,
37) -> Result<String, InvalidOptionError> {
38 let record_option_name = if is_key { "key.message" } else { "message" };
39 let build_error_lack_field = || {
40 invalid_option_error!(
41 "{} expect non-empty field {}",
42 name_strategy.as_str_name(),
43 record_option_name,
44 )
45 };
46 match name_strategy {
47 PbSchemaRegistryNameStrategy::Unspecified => {
48 let suffix = if is_key { "key" } else { "value" };
50 Ok(format!("{topic}-{suffix}",))
51 }
52 PbSchemaRegistryNameStrategy::RecordNameStrategy => {
53 let record_name = record.ok_or_else(build_error_lack_field)?;
54 Ok(record_name.to_owned())
55 }
56 PbSchemaRegistryNameStrategy::TopicRecordNameStrategy => {
57 let record_name = record.ok_or_else(build_error_lack_field)?;
58 Ok(format!("{topic}-{record_name}"))
59 }
60 }
61}