risingwave_connector/parser/
access_builder.rs1use super::avro::AvroAccessBuilder;
16use super::bytes_parser::BytesAccessBuilder;
17use super::simd_json_parser::{DebeziumJsonAccessBuilder, DebeziumMongoJsonAccessBuilder};
18use super::unified::AccessImpl;
19use super::{
20 AvroParserConfig, DebeziumAvroAccessBuilder, EncodingProperties, JsonAccessBuilder,
21 ProtobufAccessBuilder, ProtobufParserConfig,
22};
23use crate::error::ConnectorResult;
24use crate::source::SourceMeta;
25
26pub trait AccessBuilder {
28 async fn generate_accessor(
29 &mut self,
30 payload: Vec<u8>,
31 source_meta: &SourceMeta,
32 ) -> ConnectorResult<AccessImpl<'_>>;
33}
34
35#[derive(Debug)]
36pub enum AccessBuilderImpl {
37 Avro(AvroAccessBuilder),
38 Protobuf(ProtobufAccessBuilder),
39 Json(JsonAccessBuilder),
40 Bytes(BytesAccessBuilder),
41 DebeziumAvro(DebeziumAvroAccessBuilder),
42 DebeziumJson(DebeziumJsonAccessBuilder),
43 DebeziumMongoJson(DebeziumMongoJsonAccessBuilder),
44}
45
46impl AccessBuilderImpl {
47 pub async fn new_default(config: EncodingProperties) -> ConnectorResult<Self> {
48 let accessor = match config {
49 EncodingProperties::Avro(_) => {
50 let config = AvroParserConfig::new(config).await?;
51 AccessBuilderImpl::Avro(AvroAccessBuilder::new(config)?)
52 }
53 EncodingProperties::Protobuf(_) => {
54 let config = ProtobufParserConfig::new(config).await?;
55 AccessBuilderImpl::Protobuf(ProtobufAccessBuilder::new(config)?)
56 }
57 EncodingProperties::Bytes(_) => {
58 AccessBuilderImpl::Bytes(BytesAccessBuilder::new(config)?)
59 }
60 EncodingProperties::Json(config) => {
61 AccessBuilderImpl::Json(JsonAccessBuilder::new(config)?)
62 }
63 _ => unreachable!(),
64 };
65 Ok(accessor)
66 }
67
68 pub async fn generate_accessor(
69 &mut self,
70 payload: Vec<u8>,
71 source_meta: &SourceMeta,
72 ) -> ConnectorResult<AccessImpl<'_>> {
73 let accessor = match self {
74 Self::Avro(builder) => builder.generate_accessor(payload, source_meta).await?,
75 Self::Protobuf(builder) => builder.generate_accessor(payload, source_meta).await?,
76 Self::Json(builder) => builder.generate_accessor(payload, source_meta).await?,
77 Self::Bytes(builder) => builder.generate_accessor(payload, source_meta).await?,
78 Self::DebeziumAvro(builder) => builder.generate_accessor(payload, source_meta).await?,
79 Self::DebeziumJson(builder) => builder.generate_accessor(payload, source_meta).await?,
80 Self::DebeziumMongoJson(builder) => {
81 builder.generate_accessor(payload, source_meta).await?
82 }
83 };
84 Ok(accessor)
85 }
86}