risingwave_connector/parser/maxwell/
maxwell_parser.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_common::bail;
16
17use crate::error::ConnectorResult;
18use crate::only_parse_payload;
19use crate::parser::unified::maxwell::MaxwellChangeEvent;
20use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
21use crate::parser::{
22    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, ParserFormat,
23    SourceStreamChunkRowWriter, SpecificParserConfig,
24};
25use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
26
27#[derive(Debug)]
28pub struct MaxwellParser {
29    payload_builder: AccessBuilderImpl,
30    pub(crate) rw_columns: Vec<SourceColumnDesc>,
31    source_ctx: SourceContextRef,
32}
33
34impl MaxwellParser {
35    pub async fn new(
36        props: SpecificParserConfig,
37        rw_columns: Vec<SourceColumnDesc>,
38        source_ctx: SourceContextRef,
39    ) -> ConnectorResult<Self> {
40        match props.encoding_config {
41            EncodingProperties::Json(_) => {
42                let payload_builder = AccessBuilderImpl::new_default(props.encoding_config).await?;
43                Ok(Self {
44                    payload_builder,
45                    rw_columns,
46                    source_ctx,
47                })
48            }
49            _ => bail!("unsupported encoding for Maxwell"),
50        }
51    }
52
53    pub async fn parse_inner(
54        &mut self,
55        payload: Vec<u8>,
56        mut writer: SourceStreamChunkRowWriter<'_>,
57    ) -> ConnectorResult<()> {
58        let m = writer.source_meta();
59        let payload_accessor = self.payload_builder.generate_accessor(payload, m).await?;
60        let row_op = MaxwellChangeEvent::new(payload_accessor);
61
62        apply_row_operation_on_stream_chunk_writer(row_op, &mut writer).map_err(Into::into)
63    }
64}
65
66impl ByteStreamSourceParser for MaxwellParser {
67    fn columns(&self) -> &[SourceColumnDesc] {
68        &self.rw_columns
69    }
70
71    fn source_ctx(&self) -> &SourceContext {
72        &self.source_ctx
73    }
74
75    fn parser_format(&self) -> ParserFormat {
76        ParserFormat::Maxwell
77    }
78
79    async fn parse_one<'a>(
80        &'a mut self,
81        _key: Option<Vec<u8>>,
82        payload: Option<Vec<u8>>,
83        writer: SourceStreamChunkRowWriter<'a>,
84    ) -> ConnectorResult<()> {
85        // restrict the behaviours since there is no corresponding
86        // key/value test for maxwell yet.
87        only_parse_payload!(self, payload, writer)
88    }
89}