// risingwave_connector/parser/access_builder.rs
use super::avro::AvroAccessBuilder;
use super::bytes_parser::BytesAccessBuilder;
use super::simd_json_parser::{DebeziumJsonAccessBuilder, DebeziumMongoJsonAccessBuilder};
use super::unified::AccessImpl;
use super::{
AvroParserConfig, DebeziumAvroAccessBuilder, EncodingProperties, JsonAccessBuilder,
ProtobufAccessBuilder, ProtobufParserConfig,
};
use crate::error::ConnectorResult;
/// Common interface for types that turn a raw payload into an [`AccessImpl`].
pub trait AccessBuilder {
/// Consumes `payload` and produces an accessor for it.
///
/// The builder is borrowed mutably, and the returned [`AccessImpl`] carries
/// a lifetime tied to that borrow, so the builder cannot be used again
/// until the accessor is dropped.
///
/// # Errors
///
/// Returns a connector error when an accessor cannot be produced; the exact
/// failure conditions are defined by each implementor.
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>>;
}
/// Closed set of concrete access builders, one variant per wrapped builder
/// type, so callers can hold any of them behind a single type.
#[derive(Debug)]
pub enum AccessBuilderImpl {
/// Wraps an [`AvroAccessBuilder`].
Avro(AvroAccessBuilder),
/// Wraps a [`ProtobufAccessBuilder`].
Protobuf(ProtobufAccessBuilder),
/// Wraps a [`JsonAccessBuilder`].
Json(JsonAccessBuilder),
/// Wraps a [`BytesAccessBuilder`].
Bytes(BytesAccessBuilder),
/// Wraps a [`DebeziumAvroAccessBuilder`].
DebeziumAvro(DebeziumAvroAccessBuilder),
/// Wraps a [`DebeziumJsonAccessBuilder`].
DebeziumJson(DebeziumJsonAccessBuilder),
/// Wraps a [`DebeziumMongoJsonAccessBuilder`].
DebeziumMongoJson(DebeziumMongoJsonAccessBuilder),
}
impl AccessBuilderImpl {
    /// Creates the access builder that corresponds to the given encoding
    /// properties.
    ///
    /// Only the `Avro`, `Protobuf`, `Bytes` and `Json` encodings are handled
    /// here; the Debezium variants of this enum are never produced by this
    /// constructor.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while building the Avro/Protobuf parser
    /// configuration or while constructing the selected builder.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) when `config` is any encoding other than
    /// the four listed above.
    pub async fn new_default(config: EncodingProperties) -> ConnectorResult<Self> {
        Ok(match config {
            EncodingProperties::Avro(_) => {
                // The pattern binds nothing, so `config` is still whole and can
                // be handed to the parser-config constructor.
                let avro_config = AvroParserConfig::new(config).await?;
                Self::Avro(AvroAccessBuilder::new(avro_config)?)
            }
            EncodingProperties::Protobuf(_) => {
                let protobuf_config = ProtobufParserConfig::new(config).await?;
                Self::Protobuf(ProtobufAccessBuilder::new(protobuf_config)?)
            }
            EncodingProperties::Bytes(_) => Self::Bytes(BytesAccessBuilder::new(config)?),
            EncodingProperties::Json(json_props) => {
                // Unlike the other arms, the JSON builder takes the inner
                // properties rather than the whole `EncodingProperties`.
                Self::Json(JsonAccessBuilder::new(json_props)?)
            }
            _ => unreachable!(),
        })
    }

    /// Forwards `payload` to the wrapped builder's `generate_accessor` and
    /// returns the accessor it produces.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the wrapped builder reports.
    pub async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
        Ok(match self {
            Self::Avro(inner) => inner.generate_accessor(payload).await?,
            Self::Protobuf(inner) => inner.generate_accessor(payload).await?,
            Self::Json(inner) => inner.generate_accessor(payload).await?,
            Self::Bytes(inner) => inner.generate_accessor(payload).await?,
            Self::DebeziumAvro(inner) => inner.generate_accessor(payload).await?,
            Self::DebeziumJson(inner) => inner.generate_accessor(payload).await?,
            Self::DebeziumMongoJson(inner) => inner.generate_accessor(payload).await?,
        })
    }
}