risingwave_connector_codec/decoder/protobuf/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod parser;
16use std::borrow::Cow;
17use std::collections::HashSet;
18use std::sync::LazyLock;
19
20use parser::from_protobuf_value;
21use prost_reflect::{DynamicMessage, ReflectMessage};
22use risingwave_common::log::LogSuppresser;
23use risingwave_common::types::{DataType, DatumCow, ToOwnedDatum};
24use thiserror_ext::AsReport;
25
26use super::{Access, AccessResult, uncategorized};
27
28pub struct ProtobufAccess<'a> {
29    message: DynamicMessage,
30    messages_as_jsonb: &'a HashSet<String>,
31}
32
33impl<'a> ProtobufAccess<'a> {
34    pub fn new(message: DynamicMessage, messages_as_jsonb: &'a HashSet<String>) -> Self {
35        Self {
36            message,
37            messages_as_jsonb,
38        }
39    }
40
41    #[cfg(test)]
42    pub fn descriptor(&self) -> prost_reflect::MessageDescriptor {
43        self.message.descriptor()
44    }
45}
46
47impl Access for ProtobufAccess<'_> {
48    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
49        debug_assert_eq!(1, path.len());
50        let field_desc = self
51            .message
52            .descriptor()
53            .get_field_by_name(path[0])
54            .ok_or_else(|| uncategorized!("protobuf schema don't have field {}", path[0]))
55            .inspect_err(|e| {
56                static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
57                    LazyLock::new(LogSuppresser::default);
58                if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
59                    tracing::error!(suppressed_count, "{}", e.as_report());
60                }
61            })?;
62
63        match self.message.get_field(&field_desc) {
64            Cow::Borrowed(value) => {
65                from_protobuf_value(&field_desc, value, type_expected, self.messages_as_jsonb)
66            }
67
68            // `Owned` variant occurs only if there's no such field and the default value is returned.
69            Cow::Owned(value) => {
70                from_protobuf_value(&field_desc, &value, type_expected, self.messages_as_jsonb)
71                    .map(|d| d.to_owned_datum().into())
72            }
73        }
74    }
75}