risingwave_connector/parser/
access_builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::avro::AvroAccessBuilder;
16use super::bytes_parser::BytesAccessBuilder;
17use super::simd_json_parser::{DebeziumJsonAccessBuilder, DebeziumMongoJsonAccessBuilder};
18use super::unified::AccessImpl;
19use super::{
20    AvroParserConfig, DebeziumAvroAccessBuilder, EncodingProperties, JsonAccessBuilder,
21    ProtobufAccessBuilder, ProtobufParserConfig,
22};
23use crate::error::ConnectorResult;
24use crate::source::SourceMeta;
25
26/// Parses raw bytes into a specific format (avro, json, protobuf, ...), and then builds an [`Access`](risingwave_connector_codec::decoder::Access) from the parsed data.
27pub trait AccessBuilder {
28    async fn generate_accessor(
29        &mut self,
30        payload: Vec<u8>,
31        source_meta: &SourceMeta,
32    ) -> ConnectorResult<AccessImpl<'_>>;
33}
34
35#[derive(Debug)]
36pub enum AccessBuilderImpl {
37    Avro(AvroAccessBuilder),
38    Protobuf(ProtobufAccessBuilder),
39    Json(JsonAccessBuilder),
40    Bytes(BytesAccessBuilder),
41    DebeziumAvro(DebeziumAvroAccessBuilder),
42    DebeziumJson(DebeziumJsonAccessBuilder),
43    DebeziumMongoJson(DebeziumMongoJsonAccessBuilder),
44}
45
46impl AccessBuilderImpl {
47    pub async fn new_default(config: EncodingProperties) -> ConnectorResult<Self> {
48        let accessor = match config {
49            EncodingProperties::Avro(_) => {
50                let config = AvroParserConfig::new(config).await?;
51                AccessBuilderImpl::Avro(AvroAccessBuilder::new(config)?)
52            }
53            EncodingProperties::Protobuf(_) => {
54                let config = ProtobufParserConfig::new(config).await?;
55                AccessBuilderImpl::Protobuf(ProtobufAccessBuilder::new(config)?)
56            }
57            EncodingProperties::Bytes(_) => {
58                AccessBuilderImpl::Bytes(BytesAccessBuilder::new(config)?)
59            }
60            EncodingProperties::Json(config) => {
61                AccessBuilderImpl::Json(JsonAccessBuilder::new(config)?)
62            }
63            _ => unreachable!(),
64        };
65        Ok(accessor)
66    }
67
68    pub async fn generate_accessor(
69        &mut self,
70        payload: Vec<u8>,
71        source_meta: &SourceMeta,
72    ) -> ConnectorResult<AccessImpl<'_>> {
73        let accessor = match self {
74            Self::Avro(builder) => builder.generate_accessor(payload, source_meta).await?,
75            Self::Protobuf(builder) => builder.generate_accessor(payload, source_meta).await?,
76            Self::Json(builder) => builder.generate_accessor(payload, source_meta).await?,
77            Self::Bytes(builder) => builder.generate_accessor(payload, source_meta).await?,
78            Self::DebeziumAvro(builder) => builder.generate_accessor(payload, source_meta).await?,
79            Self::DebeziumJson(builder) => builder.generate_accessor(payload, source_meta).await?,
80            Self::DebeziumMongoJson(builder) => {
81                builder.generate_accessor(payload, source_meta).await?
82            }
83        };
84        Ok(accessor)
85    }
86}