risingwave_connector/parser/debezium/
debezium_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_common::bail;
18
19use super::simd_json_parser::DebeziumJsonAccessBuilder;
20use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
21use crate::error::ConnectorResult;
22use crate::parser::unified::debezium::DebeziumChangeEvent;
23use crate::parser::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling};
24use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
25use crate::parser::{
26    AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParseResult,
27    ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
28};
29use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
30
31#[derive(Debug)]
32pub struct DebeziumParser {
33    key_builder: AccessBuilderImpl,
34    payload_builder: AccessBuilderImpl,
35    pub(crate) rw_columns: Vec<SourceColumnDesc>,
36    source_ctx: SourceContextRef,
37
38    props: DebeziumProps,
39}
40
/// Option key that controls [`DebeziumProps::ignore_key`].
pub const DEBEZIUM_IGNORE_KEY: &str = "ignore_key";

/// Debezium-specific parser options.
#[derive(Debug, Clone, Default)]
pub struct DebeziumProps {
    /// Ignore the key part of the message.
    /// If enabled, we don't take the key part into message accessor.
    pub ignore_key: bool,
}

impl DebeziumProps {
    /// Reads the options from a string-to-string property map.
    ///
    /// `ignore_key` is `true` only when [`DEBEZIUM_IGNORE_KEY`] is present and its value equals
    /// `"true"` ignoring ASCII case. A missing entry or any other value yields `false`.
    pub fn from(props: &BTreeMap<String, String>) -> Self {
        let ignore_key = props
            .get(DEBEZIUM_IGNORE_KEY)
            .is_some_and(|flag| flag.eq_ignore_ascii_case("true"));
        Self { ignore_key }
    }
}
59
60async fn build_accessor_builder(
61    config: EncodingProperties,
62    encoding_type: EncodingType,
63) -> ConnectorResult<AccessBuilderImpl> {
64    match config {
65        EncodingProperties::Avro(_) => {
66            let config = DebeziumAvroParserConfig::new(config).await?;
67            Ok(AccessBuilderImpl::DebeziumAvro(
68                DebeziumAvroAccessBuilder::new(config, encoding_type)?,
69            ))
70        }
71        EncodingProperties::Json(json_config) => Ok(AccessBuilderImpl::DebeziumJson(
72            DebeziumJsonAccessBuilder::new(
73                json_config
74                    .timestamptz_handling
75                    .unwrap_or(TimestamptzHandling::GuessNumberUnit),
76                json_config
77                    .timestamp_handling
78                    .unwrap_or(TimestampHandling::GuessNumberUnit),
79                json_config.time_handling.unwrap_or(TimeHandling::Micro),
80                json_config.handle_toast_columns,
81            )?,
82        )),
83        _ => bail!("unsupported encoding for Debezium"),
84    }
85}
86
87impl DebeziumParser {
88    pub async fn new(
89        props: SpecificParserConfig,
90        rw_columns: Vec<SourceColumnDesc>,
91        source_ctx: SourceContextRef,
92    ) -> ConnectorResult<Self> {
93        let key_builder =
94            build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?;
95        let payload_builder =
96            build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
97        let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config {
98            props
99        } else {
100            unreachable!(
101                "expecting Debezium protocol properties but got {:?}",
102                props.protocol_config
103            )
104        };
105        Ok(Self {
106            key_builder,
107            payload_builder,
108            rw_columns,
109            source_ctx,
110            props: debezium_props,
111        })
112    }
113
114    pub async fn new_for_test(rw_columns: Vec<SourceColumnDesc>) -> ConnectorResult<Self> {
115        use crate::parser::JsonProperties;
116
117        let props = SpecificParserConfig {
118            encoding_config: EncodingProperties::Json(JsonProperties {
119                use_schema_registry: false,
120                timestamptz_handling: None,
121                timestamp_handling: None,
122                time_handling: None,
123                handle_toast_columns: false,
124            }),
125            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
126        };
127        Self::new(props, rw_columns, SourceContext::dummy().into()).await
128    }
129
130    pub async fn parse_inner(
131        &mut self,
132        key: Option<Vec<u8>>,
133        payload: Option<Vec<u8>>,
134        mut writer: SourceStreamChunkRowWriter<'_>,
135    ) -> ConnectorResult<ParseResult> {
136        let meta = writer.source_meta();
137        // tombetone messages are handled implicitly by these accessors
138        let key_accessor = match (key, self.props.ignore_key) {
139            (None, false) => None,
140            (Some(data), false) => Some(self.key_builder.generate_accessor(data, meta).await?),
141            (_, true) => None,
142        };
143        let payload_accessor = match payload {
144            None => None,
145            Some(data) => Some(self.payload_builder.generate_accessor(data, meta).await?),
146        };
147        let row_op = DebeziumChangeEvent::new(key_accessor, payload_accessor);
148
149        match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) {
150            Ok(_) => Ok(ParseResult::Rows),
151            Err(err) => {
152                // Only try to access transaction control message if the row operation access failed
153                // to make it a fast path.
154                if let Some(transaction_control) =
155                    row_op.transaction_control(&self.source_ctx.connector_props)
156                {
157                    Ok(ParseResult::TransactionControl(transaction_control))
158                } else {
159                    Err(err)?
160                }
161            }
162        }
163    }
164}
165
166impl ByteStreamSourceParser for DebeziumParser {
167    fn columns(&self) -> &[SourceColumnDesc] {
168        &self.rw_columns
169    }
170
171    fn source_ctx(&self) -> &SourceContext {
172        &self.source_ctx
173    }
174
175    fn parser_format(&self) -> ParserFormat {
176        ParserFormat::Debezium
177    }
178
179    #[allow(clippy::unused_async)] // false positive for `async_trait`
180    async fn parse_one<'a>(
181        &'a mut self,
182        _key: Option<Vec<u8>>,
183        _payload: Option<Vec<u8>>,
184        _writer: SourceStreamChunkRowWriter<'a>,
185    ) -> ConnectorResult<()> {
186        unreachable!("should call `parse_one_with_txn` instead")
187    }
188
189    async fn parse_one_with_txn<'a>(
190        &'a mut self,
191        key: Option<Vec<u8>>,
192        payload: Option<Vec<u8>>,
193        writer: SourceStreamChunkRowWriter<'a>,
194    ) -> ConnectorResult<ParseResult> {
195        self.parse_inner(key, payload, writer).await
196    }
197}
198
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::sync::Arc;

    use risingwave_common::catalog::{CDC_SOURCE_COLUMN_NUM, ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, Timestamptz};
    use risingwave_pb::plan_common::{
        AdditionalColumn, AdditionalColumnTimestamp, additional_column,
    };

    use super::*;
    use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl};
    use crate::source::{ConnectorProperties, SourceCtrlOpts};

    /// Builds a JSON-encoded Debezium parser with default Debezium options and a source context
    /// whose connector properties are the default Postgres CDC ones.
    async fn postgres_cdc_json_parser(columns: Vec<SourceColumnDesc>) -> DebeziumParser {
        let config = SpecificParserConfig {
            encoding_config: EncodingProperties::Json(JsonProperties {
                use_schema_registry: false,
                timestamptz_handling: None,
                timestamp_handling: None,
                time_handling: None,
                handle_toast_columns: false,
            }),
            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
        };
        let source_ctx = SourceContext {
            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
            ..SourceContext::dummy()
        };
        DebeziumParser::new(config, columns, Arc::new(source_ctx))
            .await
            .unwrap()
    }

    /// BEGIN/END transaction metadata messages must surface as transaction control results
    /// carrying only the transaction id part of the Debezium `id` field.
    #[tokio::test]
    async fn test_parse_transaction_metadata() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();
        let columns: Vec<_> = schema
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect();

        let mut parser = postgres_cdc_json_parser(columns.clone()).await;
        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        // In "id":"35352:3962948040" the Postgres transaction ID and the LSN of the given
        // operation are separated by a colon, i.e. the format is txID:LSN.
        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;

        let begin_res = parser
            .parse_one_with_txn(
                None,
                Some(begin_msg.as_bytes().to_vec()),
                dummy_builder.row_writer(),
            )
            .await;
        match begin_res {
            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
                assert_eq!(id.deref(), "35352");
            }
            _ => panic!("unexpected parse result: {:?}", begin_res),
        }

        let commit_res = parser
            .parse_one_with_txn(
                None,
                Some(commit_msg.as_bytes().to_vec()),
                dummy_builder.row_writer(),
            )
            .await;
        match commit_res {
            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
                assert_eq!(id.deref(), "35352");
            }
            _ => panic!("unexpected parse result: {:?}", commit_res),
        }
    }

    /// The additional timestamp column must be filled from the `source.ts_ms` field of the
    /// Debezium message.
    #[tokio::test]
    async fn test_parse_additional_columns() {
        let column_descs = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named_with_additional_column(
                "commit_ts",
                ColumnId::new(6),
                DataType::Timestamptz,
                AdditionalColumn {
                    column_type: Some(additional_column::ColumnType::Timestamp(
                        AdditionalColumnTimestamp {},
                    )),
                },
            ),
        ];
        let columns: Vec<_> = column_descs.iter().map(SourceColumnDesc::from).collect();

        let mut parser = postgres_cdc_json_parser(columns.clone()).await;
        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 1695277757017, "transaction": null } }"#;

        let res = parser
            .parse_one_with_txn(
                None,
                Some(payload.as_bytes().to_vec()),
                builder.row_writer(),
            )
            .await;
        match res {
            Ok(ParseResult::Rows) => {
                builder.finish_current_chunk();
                let chunk = builder.consume_ready_chunks().next().unwrap();
                let expected_ts = Timestamptz::from_millis(1695277757000).unwrap();
                for (_, row) in chunk.rows() {
                    // Index 5 is the `commit_ts` additional column declared above.
                    let commit_ts = row.datum_at(5).unwrap().into_timestamptz();
                    assert_eq!(commit_ts, expected_ts);
                }
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
    }

    /// Guards against future changes that alter the CDC source column set without updating
    /// `CDC_SOURCE_COLUMN_NUM`.
    #[tokio::test]
    async fn test_cdc_source_job_schema() {
        let cdc_columns = ColumnCatalog::debezium_cdc_source_cols();
        assert_eq!(CDC_SOURCE_COLUMN_NUM, cdc_columns.len() as u32);
    }
}
346}