risingwave_connector/parser/unified/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Unified parsers for both normal events or CDC events of multiple message formats
16
17use auto_impl::auto_impl;
18use risingwave_common::types::{DataType, DatumCow};
19use risingwave_connector_codec::decoder::avro::AvroAccess;
20use risingwave_connector_codec::decoder::protobuf::ProtobufAccess;
21pub use risingwave_connector_codec::decoder::{Access, AccessError, AccessResult};
22
23use self::bytes::BytesAccess;
24use self::json::JsonAccess;
25use crate::parser::unified::debezium::MongoJsonAccess;
26use crate::source::SourceColumnDesc;
27
28pub mod bytes;
29pub mod debezium;
30pub mod json;
31pub mod kv_event;
32pub mod maxwell;
33pub mod util;
34
35pub enum AccessImpl<'a> {
36    Avro(AvroAccess<'a>),
37    Bytes(BytesAccess<'a>),
38    Protobuf(ProtobufAccess<'a>),
39    Json(JsonAccess<'a>),
40    MongoJson(MongoJsonAccess<JsonAccess<'a>>),
41}
42
43impl Access for AccessImpl<'_> {
44    fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>> {
45        match self {
46            Self::Avro(accessor) => accessor.access(path, type_expected),
47            Self::Bytes(accessor) => accessor.access(path, type_expected),
48            Self::Protobuf(accessor) => accessor.access(path, type_expected),
49            Self::Json(accessor) => accessor.access(path, type_expected),
50            Self::MongoJson(accessor) => accessor.access(path, type_expected),
51        }
52    }
53}
54
55#[derive(Debug, Clone, Copy)]
56pub enum ChangeEventOperation {
57    Upsert, // Insert or Update
58    Delete,
59}
60
61/// Methods to access a CDC event.
62#[auto_impl(&)]
63pub trait ChangeEvent {
64    /// Access the operation type.
65    fn op(&self) -> AccessResult<ChangeEventOperation>;
66    /// Access the field.
67    fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult<DatumCow<'_>>;
68}
69
70impl<A> ChangeEvent for (ChangeEventOperation, A)
71where
72    A: Access,
73{
74    fn op(&self) -> AccessResult<ChangeEventOperation> {
75        Ok(self.0)
76    }
77
78    fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult<DatumCow<'_>> {
79        self.1.access(&[desc.name.as_str()], &desc.data_type)
80    }
81}