risingwave_connector_codec/decoder/protobuf/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod parser;
16use std::collections::HashSet;
17use std::sync::LazyLock;
18
19use prost_reflect::{DynamicMessage, ReflectMessage};
20use risingwave_common::log::LogSuppresser;
21use risingwave_common::types::{DataType, DatumCow};
22use thiserror_ext::AsReport;
23
24use super::{Access, AccessResult, uncategorized};
25
26pub struct ProtobufAccess<'a> {
27    message: DynamicMessage,
28    messages_as_jsonb: &'a HashSet<String>,
29}
30
31impl<'a> ProtobufAccess<'a> {
32    pub fn new(message: DynamicMessage, messages_as_jsonb: &'a HashSet<String>) -> Self {
33        Self {
34            message,
35            messages_as_jsonb,
36        }
37    }
38
39    #[cfg(test)]
40    pub fn descriptor(&self) -> prost_reflect::MessageDescriptor {
41        self.message.descriptor()
42    }
43}
44
45impl Access for ProtobufAccess<'_> {
46    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
47        debug_assert_eq!(1, path.len());
48        let field_desc = self
49            .message
50            .descriptor()
51            .get_field_by_name(path[0])
52            .ok_or_else(|| uncategorized!("protobuf schema don't have field {}", path[0]))
53            .inspect_err(|e| {
54                static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
55                    LazyLock::new(LogSuppresser::default);
56                if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
57                    tracing::error!(suppressed_count, "{}", e.as_report());
58                }
59            })?;
60
61        parser::from_protobuf_message_field(
62            &field_desc,
63            &self.message,
64            type_expected,
65            self.messages_as_jsonb,
66        )
67    }
68}