risingwave_connector/parser/unified/
mod.rs1use auto_impl::auto_impl;
18use risingwave_common::types::{DataType, DatumCow};
19use risingwave_connector_codec::decoder::avro::AvroAccess;
20use risingwave_connector_codec::decoder::protobuf::ProtobufAccess;
21pub use risingwave_connector_codec::decoder::{Access, AccessError, AccessResult};
22
23use self::bytes::BytesAccess;
24use self::json::JsonAccess;
25use crate::parser::unified::debezium::MongoJsonAccess;
26use crate::source::SourceColumnDesc;
27
28pub mod bytes;
29pub mod debezium;
30pub mod json;
31pub mod kv_event;
32pub mod maxwell;
33pub mod util;
34
35pub enum AccessImpl<'a> {
36 Avro(AvroAccess<'a>),
37 Bytes(BytesAccess<'a>),
38 Protobuf(ProtobufAccess<'a>),
39 Json(JsonAccess<'a>),
40 MongoJson(MongoJsonAccess<JsonAccess<'a>>),
41}
42
43impl Access for AccessImpl<'_> {
44 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
45 match self {
46 Self::Avro(accessor) => accessor.access(path, type_expected),
47 Self::Bytes(accessor) => accessor.access(path, type_expected),
48 Self::Protobuf(accessor) => accessor.access(path, type_expected),
49 Self::Json(accessor) => accessor.access(path, type_expected),
50 Self::MongoJson(accessor) => accessor.access(path, type_expected),
51 }
52 }
53}
54
55#[derive(Debug, Clone, Copy)]
56pub enum ChangeEventOperation {
57 Upsert, Delete,
59}
60
61#[auto_impl(&)]
63pub trait ChangeEvent {
64 fn op(&self) -> AccessResult<ChangeEventOperation>;
66 fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult<DatumCow<'_>>;
68}
69
70impl<A> ChangeEvent for (ChangeEventOperation, A)
71where
72 A: Access,
73{
74 fn op(&self) -> AccessResult<ChangeEventOperation> {
75 Ok(self.0)
76 }
77
78 fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult<DatumCow<'_>> {
79 self.1.access(&[desc.name.as_str()], &desc.data_type)
80 }
81}