risingwave_connector_codec/decoder/protobuf/
mod.rs1pub mod parser;
16use std::collections::HashSet;
17use std::sync::LazyLock;
18
19use prost_reflect::{DynamicMessage, ReflectMessage};
20use risingwave_common::log::LogSuppresser;
21use risingwave_common::types::{DataType, DatumCow};
22use thiserror_ext::AsReport;
23
24use super::{Access, AccessResult, uncategorized};
25
26pub struct ProtobufAccess<'a> {
27 message: DynamicMessage,
28 messages_as_jsonb: &'a HashSet<String>,
29}
30
31impl<'a> ProtobufAccess<'a> {
32 pub fn new(message: DynamicMessage, messages_as_jsonb: &'a HashSet<String>) -> Self {
33 Self {
34 message,
35 messages_as_jsonb,
36 }
37 }
38
39 #[cfg(test)]
40 pub fn descriptor(&self) -> prost_reflect::MessageDescriptor {
41 self.message.descriptor()
42 }
43}
44
45impl Access for ProtobufAccess<'_> {
46 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
47 debug_assert_eq!(1, path.len());
48 let field_desc = self
49 .message
50 .descriptor()
51 .get_field_by_name(path[0])
52 .ok_or_else(|| uncategorized!("protobuf schema don't have field {}", path[0]))
53 .inspect_err(|e| {
54 static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
55 LazyLock::new(LogSuppresser::default);
56 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
57 tracing::error!(suppressed_count, "{}", e.as_report());
58 }
59 })?;
60
61 parser::from_protobuf_message_field(
62 &field_desc,
63 &self.message,
64 type_expected,
65 self.messages_as_jsonb,
66 )
67 }
68}