risingwave_connector/parser/
upsert_parser.rs1use risingwave_common::bail;
16use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
17
18use super::bytes_parser::BytesAccessBuilder;
19use super::unified::{AccessImpl, ChangeEventOperation};
20use super::{
21 AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties,
22 SourceStreamChunkRowWriter, SpecificParserConfig,
23};
24use crate::error::ConnectorResult;
25use crate::parser::ParserFormat;
26use crate::parser::unified::kv_event::KvEvent;
27use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
28
/// Parser for the upsert format (`ParserFormat::Upsert`).
///
/// Each message is split into a key, decoded by `key_builder`, and a payload, decoded by
/// `payload_builder`. The pair is then written as an insert or a delete.
#[derive(Debug)]
pub struct UpsertParser {
    /// Builds accessors over the raw message key (a bytes accessor bound to the key column).
    key_builder: AccessBuilderImpl,
    /// Builds accessors over the message payload using the source's configured encoding.
    payload_builder: AccessBuilderImpl,
    /// Columns of the source that this parser fills.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Shared source context, handed out via `ByteStreamSourceParser::source_ctx`.
    source_ctx: SourceContextRef,
}
36
37async fn build_accessor_builder(config: EncodingProperties) -> ConnectorResult<AccessBuilderImpl> {
38 match config {
39 EncodingProperties::Json(_)
40 | EncodingProperties::Protobuf(_)
41 | EncodingProperties::Avro(_) => Ok(AccessBuilderImpl::new_default(config).await?),
42 _ => bail!("unsupported encoding for Upsert"),
43 }
44}
45
46pub fn get_key_column_name(columns: &[SourceColumnDesc]) -> Option<String> {
47 columns.iter().find_map(|column| {
48 if matches!(
49 column.additional_column.column_type,
50 Some(AdditionalColumnType::Key(_))
51 ) {
52 Some(column.name.clone())
53 } else {
54 None
55 }
56 })
57}
58
59impl UpsertParser {
60 pub async fn new(
61 props: SpecificParserConfig,
62 rw_columns: Vec<SourceColumnDesc>,
63 source_ctx: SourceContextRef,
64 ) -> ConnectorResult<Self> {
65 let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) {
68 AccessBuilderImpl::Bytes(BytesAccessBuilder::new(EncodingProperties::Bytes(
71 BytesProperties {
72 column_name: Some(key_column_name),
73 },
74 ))?)
75 } else {
76 unreachable!("format upsert must have key column")
77 };
78 let payload_builder = build_accessor_builder(props.encoding_config).await?;
79 Ok(Self {
80 key_builder,
81 payload_builder,
82 rw_columns,
83 source_ctx,
84 })
85 }
86
87 pub async fn parse_inner(
88 &mut self,
89 key: Option<Vec<u8>>,
90 payload: Option<Vec<u8>>,
91 mut writer: SourceStreamChunkRowWriter<'_>,
92 ) -> ConnectorResult<()> {
93 let meta = writer.source_meta();
94 let mut row_op: KvEvent<AccessImpl<'_>, AccessImpl<'_>> = KvEvent::default();
95 if let Some(data) = key {
96 row_op.with_key(self.key_builder.generate_accessor(data, meta).await?);
97 }
98 let change_event_op;
100 if let Some(data) = payload
101 && !data.is_empty()
102 {
103 row_op.with_value(self.payload_builder.generate_accessor(data, meta).await?);
104 change_event_op = ChangeEventOperation::Upsert;
105 } else {
106 change_event_op = ChangeEventOperation::Delete;
107 }
108
109 match change_event_op {
110 ChangeEventOperation::Upsert => {
111 let f = |column: &SourceColumnDesc| row_op.access_field::<false>(column);
112 writer.do_insert(f)?
113 }
114 ChangeEventOperation::Delete => {
115 let f = |column: &SourceColumnDesc| row_op.access_field::<true>(column);
116 writer.do_delete(f)?
117 }
118 }
119 Ok(())
120 }
121}
122
123impl ByteStreamSourceParser for UpsertParser {
124 fn columns(&self) -> &[SourceColumnDesc] {
125 &self.rw_columns
126 }
127
128 fn source_ctx(&self) -> &SourceContext {
129 &self.source_ctx
130 }
131
132 fn parser_format(&self) -> ParserFormat {
133 ParserFormat::Upsert
134 }
135
136 async fn parse_one<'a>(
137 &'a mut self,
138 key: Option<Vec<u8>>,
139 payload: Option<Vec<u8>>,
140 writer: SourceStreamChunkRowWriter<'a>,
141 ) -> ConnectorResult<()> {
142 self.parse_inner(key, payload, writer).await
143 }
144}