risingwave_connector/parser/
upsert_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::bail;
16use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
17
18use super::bytes_parser::BytesAccessBuilder;
19use super::unified::{AccessImpl, ChangeEventOperation};
20use super::{
21    AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties,
22    SourceStreamChunkRowWriter, SpecificParserConfig,
23};
24use crate::error::ConnectorResult;
25use crate::parser::ParserFormat;
26use crate::parser::unified::kv_event::KvEvent;
27use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
28
/// Parser for upsert-format messages.
///
/// Each message is handled as a key plus a payload: a non-empty payload is written as an
/// insert, while a missing or empty payload is written as a delete.
#[derive(Debug)]
pub struct UpsertParser {
    /// Builds the accessor used to read fields from the message key.
    key_builder: AccessBuilderImpl,
    /// Builds the accessor used to read fields from the message payload.
    payload_builder: AccessBuilderImpl,
    /// Column descriptors supplied at construction; returned by `columns()`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Source context supplied at construction; returned by `source_ctx()`.
    source_ctx: SourceContextRef,
}
36
37async fn build_accessor_builder(config: EncodingProperties) -> ConnectorResult<AccessBuilderImpl> {
38    match config {
39        EncodingProperties::Json(_)
40        | EncodingProperties::Protobuf(_)
41        | EncodingProperties::Avro(_) => Ok(AccessBuilderImpl::new_default(config).await?),
42        _ => bail!("unsupported encoding for Upsert"),
43    }
44}
45
46pub fn get_key_column_name(columns: &[SourceColumnDesc]) -> Option<String> {
47    columns.iter().find_map(|column| {
48        if matches!(
49            column.additional_column.column_type,
50            Some(AdditionalColumnType::Key(_))
51        ) {
52            Some(column.name.clone())
53        } else {
54            None
55        }
56    })
57}
58
59impl UpsertParser {
60    pub async fn new(
61        props: SpecificParserConfig,
62        rw_columns: Vec<SourceColumnDesc>,
63        source_ctx: SourceContextRef,
64    ) -> ConnectorResult<Self> {
65        // check whether columns has Key as AdditionalColumnType, if so, the key accessor should be
66        // bytes
67        let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
68            // later: if key column has other type other than bytes, build other accessor.
69            // For now, all key columns are bytes
70            AccessBuilderImpl::Bytes(BytesAccessBuilder::new(EncodingProperties::Bytes(
71                BytesProperties {
72                    column_name: Some(key_column_name),
73                },
74            ))?)
75        } else {
76            unreachable!("format upsert must have key column")
77        };
78        let payload_builder = build_accessor_builder(props.encoding_config).await?;
79        Ok(Self {
80            key_builder,
81            payload_builder,
82            rw_columns,
83            source_ctx,
84        })
85    }
86
87    pub async fn parse_inner(
88        &mut self,
89        key: Option<Vec<u8>>,
90        payload: Option<Vec<u8>>,
91        mut writer: SourceStreamChunkRowWriter<'_>,
92    ) -> ConnectorResult<()> {
93        let meta = writer.source_meta();
94        let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
95        if let Some(data) = key {
96            row_op.with_key(self.key_builder.generate_accessor(data, meta).await?);
97        }
98        // Empty payload of kafka is Some(vec![])
99        let change_event_op;
100        if let Some(data) = payload
101            && !data.is_empty()
102        {
103            row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
104            change_event_op = ChangeEventOperation::Upsert;
105        } else {
106            change_event_op = ChangeEventOperation::Delete;
107        }
108
109        match change_event_op {
110            ChangeEventOperation::Upsert => {
111                let f = |column: &SourceColumnDesc| row_op.access_field::<false>(column);
112                writer.do_insert(f)?
113            }
114            ChangeEventOperation::Delete => {
115                let f = |column: &SourceColumnDesc| row_op.access_field::<true>(column);
116                writer.do_delete(f)?
117            }
118        }
119        Ok(())
120    }
121}
122
/// `ByteStreamSourceParser` implementation: exposes the parser's columns, context and format,
/// and forwards per-message parsing to [`UpsertParser::parse_inner`].
impl ByteStreamSourceParser for UpsertParser {
    /// Returns the column descriptors this parser was constructed with.
    fn columns(&self) -> &[SourceColumnDesc] {
        &self.rw_columns
    }

    /// Returns the source context this parser was constructed with.
    fn source_ctx(&self) -> &SourceContext {
        &self.source_ctx
    }

    /// Identifies this parser as handling the upsert format.
    fn parser_format(&self) -> ParserFormat {
        ParserFormat::Upsert
    }

    /// Parses a single message by delegating to `parse_inner` with the same arguments.
    async fn parse_one<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> ConnectorResult<()> {
        self.parse_inner(key, payload, writer).await
    }
}