risingwave_connector/parser/unified/
maxwell.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::types::{DataType, DatumCow, ScalarRefImpl, ToDatumRef};
16
17use super::{Access, ChangeEvent};
18use crate::parser::unified::ChangeEventOperation;
19use crate::source::SourceColumnDesc;
20
/// Value of a record's `type` field that marks a row insertion.
pub const MAXWELL_INSERT_OP: &str = "insert";
/// Value of a record's `type` field that marks a row update.
pub const MAXWELL_UPDATE_OP: &str = "update";
/// Value of a record's `type` field that marks a row deletion.
pub const MAXWELL_DELETE_OP: &str = "delete";
24
/// Newtype around an accessor `A` over a single decoded record.
///
/// The wrapper stores the accessor unchanged; all interpretation of the
/// record happens in the trait implementations for this type.
pub struct MaxwellChangeEvent<A>(A);

impl<A> MaxwellChangeEvent<A> {
    /// Wraps `accessor` without inspecting or validating it.
    pub fn new(accessor: A) -> Self {
        MaxwellChangeEvent(accessor)
    }
}
32
33impl<A> ChangeEvent for MaxwellChangeEvent<A>
34where
35    A: Access,
36{
37    fn op(&self) -> std::result::Result<super::ChangeEventOperation, super::AccessError> {
38        const OP: &str = "type";
39        if let Some(ScalarRefImpl::Utf8(op)) =
40            self.0.access(&[OP], &DataType::Varchar)?.to_datum_ref()
41        {
42            match op {
43                MAXWELL_INSERT_OP | MAXWELL_UPDATE_OP => return Ok(ChangeEventOperation::Upsert),
44                MAXWELL_DELETE_OP => return Ok(ChangeEventOperation::Delete),
45                _ => (),
46            }
47        }
48        Err(super::AccessError::Undefined {
49            name: "op".into(),
50            path: Default::default(),
51        })
52    }
53
54    fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult<DatumCow<'_>> {
55        const DATA: &str = "data";
56        self.0.access(&[DATA, &desc.name], &desc.data_type)
57    }
58}