risingwave_connector/parser/debezium/
debezium_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_common::bail;
18
19use super::simd_json_parser::DebeziumJsonAccessBuilder;
20use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
21use crate::error::ConnectorResult;
22use crate::parser::unified::debezium::DebeziumChangeEvent;
23use crate::parser::unified::json::{
24    BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
25};
26use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
27use crate::parser::{
28    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParseResult,
29    ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
30};
31use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
32
/// Parser for Debezium change events.
///
/// Holds one access builder for the message key and one for the message
/// payload, plus the column descriptors and source context it was created with.
#[derive(Debug)]
pub struct DebeziumParser {
    /// Builds the accessor for the message key; not used for a message when
    /// `props.ignore_key` is set.
    key_builder: AccessBuilderImpl,
    /// Builds the accessor for the message payload.
    payload_builder: AccessBuilderImpl,
    /// Column descriptors exposed through `ByteStreamSourceParser::columns`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Source context; its `connector_props` are consulted when checking
    /// whether a message is a transaction control message.
    source_ctx: SourceContextRef,

    /// Debezium-specific options (currently only `ignore_key`).
    props: DebeziumProps,
}
42
/// Property-map key that [`DebeziumProps::from`] reads to populate
/// [`DebeziumProps::ignore_key`].
pub const DEBEZIUM_IGNORE_KEY: &str = "ignore_key";

/// Debezium-specific parser options.
#[derive(Debug, Clone, Default)]
pub struct DebeziumProps {
    /// Ignore the key part of the message.
    /// If enabled, we don't take the key part into message accessor.
    pub ignore_key: bool,
}

impl DebeziumProps {
    /// Builds the options from a string-to-string property map.
    ///
    /// `ignore_key` is enabled only when the [`DEBEZIUM_IGNORE_KEY`] entry is
    /// present and equals `"true"` ignoring ASCII case. A missing entry or any
    /// other value leaves it disabled.
    pub fn from(props: &BTreeMap<String, String>) -> Self {
        let ignore_key = props
            .get(DEBEZIUM_IGNORE_KEY)
            .is_some_and(|raw| raw.eq_ignore_ascii_case("true"));
        Self { ignore_key }
    }
}
61
62async fn build_accessor_builder(
63    config: EncodingProperties,
64    encoding_type: EncodingType,
65) -> ConnectorResult<AccessBuilderImpl> {
66    match config {
67        EncodingProperties::Avro(_) => {
68            let config = DebeziumAvroParserConfig::new(config).await?;
69            Ok(AccessBuilderImpl::DebeziumAvro(
70                DebeziumAvroAccessBuilder::new(config, encoding_type)?,
71            ))
72        }
73        EncodingProperties::Json(json_config) => Ok(AccessBuilderImpl::DebeziumJson(
74            DebeziumJsonAccessBuilder::new(
75                json_config
76                    .timestamptz_handling
77                    .unwrap_or(TimestamptzHandling::GuessNumberUnit),
78                json_config
79                    .timestamp_handling
80                    .unwrap_or(TimestampHandling::GuessNumberUnit),
81                json_config.time_handling.unwrap_or(TimeHandling::Micro),
82                json_config
83                    .bigint_unsigned_handling
84                    .unwrap_or(BigintUnsignedHandlingMode::Long),
85                json_config.handle_toast_columns,
86            )?,
87        )),
88        _ => bail!("unsupported encoding for Debezium"),
89    }
90}
91
92impl DebeziumParser {
93    pub async fn new(
94        props: SpecificParserConfig,
95        rw_columns: Vec<SourceColumnDesc>,
96        source_ctx: SourceContextRef,
97    ) -> ConnectorResult<Self> {
98        let key_builder =
99            build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?;
100        let payload_builder =
101            build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
102        let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config {
103            props
104        } else {
105            unreachable!(
106                "expecting Debezium protocol properties but got {:?}",
107                props.protocol_config
108            )
109        };
110        Ok(Self {
111            key_builder,
112            payload_builder,
113            rw_columns,
114            source_ctx,
115            props: debezium_props,
116        })
117    }
118
119    pub async fn new_for_test(rw_columns: Vec<SourceColumnDesc>) -> ConnectorResult<Self> {
120        use crate::parser::JsonProperties;
121
122        let props = SpecificParserConfig {
123            encoding_config: EncodingProperties::Json(JsonProperties {
124                use_schema_registry: false,
125                timestamptz_handling: None,
126                timestamp_handling: None,
127                time_handling: None,
128                bigint_unsigned_handling: None,
129                handle_toast_columns: false,
130            }),
131            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
132        };
133        Self::new(props, rw_columns, SourceContext::dummy().into()).await
134    }
135
136    pub async fn parse_inner(
137        &mut self,
138        key: Option<Vec<u8>>,
139        payload: Option<Vec<u8>>,
140        mut writer: SourceStreamChunkRowWriter<'_>,
141    ) -> ConnectorResult<ParseResult> {
142        let meta = writer.source_meta();
143        // tombetone messages are handled implicitly by these accessors
144        let key_accessor = match (key, self.props.ignore_key) {
145            (None, false) => None,
146            (Some(data), false) => Some(self.key_builder.generate_accessor(data, meta).await?),
147            (_, true) => None,
148        };
149        let payload_accessor = match payload {
150            None => None,
151            Some(data) => Some(self.payload_builder.generate_accessor(data, meta).await?),
152        };
153        let row_op = DebeziumChangeEvent::new(key_accessor, payload_accessor);
154
155        match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) {
156            Ok(_) => Ok(ParseResult::Rows),
157            Err(err) => {
158                // Only try to access transaction control message if the row operation access failed
159                // to make it a fast path.
160                if let Some(transaction_control) =
161                    row_op.transaction_control(&self.source_ctx.connector_props)
162                {
163                    Ok(ParseResult::TransactionControl(transaction_control))
164                } else {
165                    Err(err)?
166                }
167            }
168        }
169    }
170}
171
172impl ByteStreamSourceParser for DebeziumParser {
173    fn columns(&self) -> &[SourceColumnDesc] {
174        &self.rw_columns
175    }
176
177    fn source_ctx(&self) -> &SourceContext {
178        &self.source_ctx
179    }
180
181    fn parser_format(&self) -> ParserFormat {
182        ParserFormat::Debezium
183    }
184
185    #[allow(clippy::unused_async)] // false positive for `async_trait`
186    async fn parse_one<'a>(
187        &'a mut self,
188        _key: Option<Vec<u8>>,
189        _payload: Option<Vec<u8>>,
190        _writer: SourceStreamChunkRowWriter<'a>,
191    ) -> ConnectorResult<()> {
192        unreachable!("should call `parse_one_with_txn` instead")
193    }
194
195    async fn parse_one_with_txn<'a>(
196        &'a mut self,
197        key: Option<Vec<u8>>,
198        payload: Option<Vec<u8>>,
199        writer: SourceStreamChunkRowWriter<'a>,
200    ) -> ConnectorResult<ParseResult> {
201        self.parse_inner(key, payload, writer).await
202    }
203}
204
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::sync::Arc;

    use risingwave_common::catalog::{CDC_SOURCE_COLUMN_NUM, ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, Timestamptz};
    use risingwave_pb::plan_common::{
        AdditionalColumn, AdditionalColumnTimestamp, additional_column,
    };

    use super::*;
    use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl};
    use crate::source::{ConnectorProperties, SourceCtrlOpts};

    /// Builds a Debezium JSON parser over `columns` with default options and a
    /// source context whose connector properties are Postgres CDC.
    async fn postgres_cdc_json_parser(columns: Vec<SourceColumnDesc>) -> DebeziumParser {
        let json = JsonProperties {
            use_schema_registry: false,
            timestamptz_handling: None,
            timestamp_handling: None,
            time_handling: None,
            bigint_unsigned_handling: None,
            handle_toast_columns: false,
        };
        let config = SpecificParserConfig {
            encoding_config: EncodingProperties::Json(json),
            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
        };
        let ctx = SourceContext {
            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
            ..SourceContext::dummy()
        };
        DebeziumParser::new(config, columns, Arc::new(ctx))
            .await
            .unwrap()
    }

    /// BEGIN / END transaction metadata messages must be surfaced as
    /// transaction control results carrying the transaction id.
    #[tokio::test]
    async fn test_parse_transaction_metadata() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();
        let columns: Vec<_> = schema
            .iter()
            .map(|catalog| SourceColumnDesc::from(&catalog.column_desc))
            .collect();

        let mut parser = postgres_cdc_json_parser(columns.clone()).await;
        let mut chunk_builder =
            SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN
        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;

        let begin_res = parser
            .parse_one_with_txn(
                None,
                Some(begin_msg.as_bytes().to_owned()),
                chunk_builder.row_writer(),
            )
            .await;
        match begin_res {
            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
                assert_eq!(id.deref(), "35352");
            }
            _ => panic!("unexpected parse result: {:?}", begin_res),
        }

        let commit_res = parser
            .parse_one_with_txn(
                None,
                Some(commit_msg.as_bytes().to_owned()),
                chunk_builder.row_writer(),
            )
            .await;
        match commit_res {
            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
                assert_eq!(id.deref(), "35352");
            }
            _ => panic!("unexpected parse result: {:?}", commit_res),
        }
    }

    /// The additional timestamp column must be filled from the `ts_ms` of the
    /// message's `source` section.
    #[tokio::test]
    async fn test_parse_additional_columns() {
        let timestamp_column = ColumnDesc::named_with_additional_column(
            "commit_ts",
            ColumnId::new(6),
            DataType::Timestamptz,
            AdditionalColumn {
                column_type: Some(additional_column::ColumnType::Timestamp(
                    AdditionalColumnTimestamp {},
                )),
            },
        );
        let column_descs = [
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            timestamp_column,
        ];
        let columns: Vec<_> = column_descs.iter().map(SourceColumnDesc::from).collect();

        let mut parser = postgres_cdc_json_parser(columns.clone()).await;
        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 1695277757017, "transaction": null } }"#;

        let res = parser
            .parse_one_with_txn(
                None,
                Some(payload.as_bytes().to_owned()),
                builder.row_writer(),
            )
            .await;
        match res {
            Ok(ParseResult::Rows) => {
                let expected_ts = Timestamptz::from_millis(1695277757000).unwrap();

                builder.finish_current_chunk();
                let chunk = builder.consume_ready_chunks().next().unwrap();
                for (_, row) in chunk.rows() {
                    // Index 5 is the `commit_ts` additional column declared above.
                    let commit_ts = row.datum_at(5).unwrap().into_timestamptz();
                    assert_eq!(commit_ts, expected_ts);
                }
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
    }

    /// Guards against a future change to the CDC source columns that forgets
    /// to update `CDC_SOURCE_COLUMN_NUM`.
    #[tokio::test]
    async fn test_cdc_source_job_schema() {
        let cdc_columns = ColumnCatalog::debezium_cdc_source_cols();
        assert_eq!(CDC_SOURCE_COLUMN_NUM as usize, cdc_columns.len());
    }
}