risingwave_connector/parser/maxwell/
maxwell_parser.rs1use risingwave_common::bail;
16
17use crate::error::ConnectorResult;
18use crate::only_parse_payload;
19use crate::parser::unified::maxwell::MaxwellChangeEvent;
20use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
21use crate::parser::{
22 AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, ParserFormat,
23 SourceStreamChunkRowWriter, SpecificParserConfig,
24};
25use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
26
27#[derive(Debug)]
28pub struct MaxwellParser {
29 payload_builder: AccessBuilderImpl,
30 pub(crate) rw_columns: Vec<SourceColumnDesc>,
31 source_ctx: SourceContextRef,
32}
33
34impl MaxwellParser {
35 pub async fn new(
36 props: SpecificParserConfig,
37 rw_columns: Vec<SourceColumnDesc>,
38 source_ctx: SourceContextRef,
39 ) -> ConnectorResult<Self> {
40 match props.encoding_config {
41 EncodingProperties::Json(_) => {
42 let payload_builder = AccessBuilderImpl::new_default(props.encoding_config).await?;
43 Ok(Self {
44 payload_builder,
45 rw_columns,
46 source_ctx,
47 })
48 }
49 _ => bail!("unsupported encoding for Maxwell"),
50 }
51 }
52
53 pub async fn parse_inner(
54 &mut self,
55 payload: Vec<u8>,
56 mut writer: SourceStreamChunkRowWriter<'_>,
57 ) -> ConnectorResult<()> {
58 let m = writer.source_meta();
59 let payload_accessor = self.payload_builder.generate_accessor(payload, m).await?;
60 let row_op = MaxwellChangeEvent::new(payload_accessor);
61
62 apply_row_operation_on_stream_chunk_writer(row_op, &mut writer).map_err(Into::into)
63 }
64}
65
66impl ByteStreamSourceParser for MaxwellParser {
67 fn columns(&self) -> &[SourceColumnDesc] {
68 &self.rw_columns
69 }
70
71 fn source_ctx(&self) -> &SourceContext {
72 &self.source_ctx
73 }
74
75 fn parser_format(&self) -> ParserFormat {
76 ParserFormat::Maxwell
77 }
78
79 async fn parse_one<'a>(
80 &'a mut self,
81 _key: Option<Vec<u8>>,
82 payload: Option<Vec<u8>>,
83 writer: SourceStreamChunkRowWriter<'a>,
84 ) -> ConnectorResult<()> {
85 only_parse_payload!(self, payload, writer)
88 }
89}