// risingwave_connector/parser/unified/mod.rs
use auto_impl::auto_impl;
use risingwave_common::types::{DataType, DatumCow};
use risingwave_connector_codec::decoder::avro::AvroAccess;
use risingwave_connector_codec::decoder::protobuf::ProtobufAccess;
pub use risingwave_connector_codec::decoder::{Access, AccessError, AccessResult};
use self::bytes::BytesAccess;
use self::json::JsonAccess;
use crate::parser::unified::debezium::MongoJsonAccess;
use crate::source::SourceColumnDesc;
pub mod bytes;
pub mod debezium;
pub mod json;
pub mod kv_event;
pub mod maxwell;
pub mod util;
/// One type that can hold any of the accessor implementations used by this
/// module.
///
/// The lifetime `'a` is the one carried by the Avro, bytes and JSON
/// accessors; the Protobuf accessor takes no lifetime parameter.
pub enum AccessImpl<'a> {
    /// Holds an [`AvroAccess`].
    Avro(AvroAccess<'a>),
    /// Holds a [`BytesAccess`].
    Bytes(BytesAccess<'a>),
    /// Holds a [`ProtobufAccess`].
    Protobuf(ProtobufAccess),
    /// Holds a [`JsonAccess`].
    Json(JsonAccess<'a>),
    /// Holds a [`MongoJsonAccess`] layered over a [`JsonAccess`].
    MongoJson(MongoJsonAccess<JsonAccess<'a>>),
}
impl Access for AccessImpl<'_> {
fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
match self {
Self::Avro(accessor) => accessor.access(path, type_expected),
Self::Bytes(accessor) => accessor.access(path, type_expected),
Self::Protobuf(accessor) => accessor.access(path, type_expected),
Self::Json(accessor) => accessor.access(path, type_expected),
Self::MongoJson(accessor) => accessor.access(path, type_expected),
}
}
}
/// The kind of change a [`ChangeEvent`] reports.
///
/// This is a plain field-less enum, so besides `Debug`/`Clone`/`Copy` it also
/// derives `PartialEq` and `Eq`, which lets callers and tests compare
/// operations directly with `==` or `assert_eq!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeEventOperation {
    /// An insert or an update (upsert).
    Upsert,
    /// A deletion.
    Delete,
}
/// A change event: something that reports an operation and can produce the
/// value of a given source column.
///
/// The `#[auto_impl(&)]` attribute asks the `auto_impl` crate to also provide
/// this trait for shared references to implementors.
#[auto_impl(&)]
pub trait ChangeEvent {
    /// Returns the [`ChangeEventOperation`] of this event.
    fn op(&self) -> AccessResult<ChangeEventOperation>;

    /// Returns the value of the column described by `desc`.
    ///
    /// The returned datum may borrow from `self`.
    fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult<DatumCow<'_>>;
}
/// Lets an `(operation, accessor)` pair act as a [`ChangeEvent`]: the first
/// element is the operation and the second is queried for column values.
impl<A: Access> ChangeEvent for (ChangeEventOperation, A) {
    /// Returns the operation stored in the first tuple element; never fails.
    fn op(&self) -> AccessResult<ChangeEventOperation> {
        let (operation, _) = self;
        Ok(*operation)
    }

    /// Asks the accessor in the second tuple element for the column, using a
    /// single-segment path made of the column's name and the column's
    /// declared data type as the expected type.
    fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult<DatumCow<'_>> {
        let (_, accessor) = self;
        let path = [desc.name.as_str()];
        accessor.access(&path, &desc.data_type)
    }
}