risingwave_connector_codec/decoder/protobuf/
mod.rs1pub mod parser;
16use std::borrow::Cow;
17use std::collections::HashSet;
18use std::sync::LazyLock;
19
20use parser::from_protobuf_value;
21use prost_reflect::{DynamicMessage, ReflectMessage};
22use risingwave_common::log::LogSuppresser;
23use risingwave_common::types::{DataType, DatumCow, ToOwnedDatum};
24use thiserror_ext::AsReport;
25
26use super::{Access, AccessResult, uncategorized};
27
28pub struct ProtobufAccess<'a> {
29 message: DynamicMessage,
30 messages_as_jsonb: &'a HashSet<String>,
31}
32
33impl<'a> ProtobufAccess<'a> {
34 pub fn new(message: DynamicMessage, messages_as_jsonb: &'a HashSet<String>) -> Self {
35 Self {
36 message,
37 messages_as_jsonb,
38 }
39 }
40
41 #[cfg(test)]
42 pub fn descriptor(&self) -> prost_reflect::MessageDescriptor {
43 self.message.descriptor()
44 }
45}
46
47impl Access for ProtobufAccess<'_> {
48 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
49 debug_assert_eq!(1, path.len());
50 let field_desc = self
51 .message
52 .descriptor()
53 .get_field_by_name(path[0])
54 .ok_or_else(|| uncategorized!("protobuf schema don't have field {}", path[0]))
55 .inspect_err(|e| {
56 static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
57 LazyLock::new(LogSuppresser::default);
58 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
59 tracing::error!(suppressed_count, "{}", e.as_report());
60 }
61 })?;
62
63 match self.message.get_field(&field_desc) {
64 Cow::Borrowed(value) => {
65 from_protobuf_value(&field_desc, value, type_expected, self.messages_as_jsonb)
66 }
67
68 Cow::Owned(value) => {
70 from_protobuf_value(&field_desc, &value, type_expected, self.messages_as_jsonb)
71 .map(|d| d.to_owned_datum().into())
72 }
73 }
74 }
75}