risingwave_connector/parser/debezium/
avro_parser.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::fmt::Debug;
16use std::sync::Arc;
17
18use anyhow::Context;
19use apache_avro::types::Value;
20use apache_avro::{Schema, from_avro_datum};
21use risingwave_common::catalog::Field;
22use risingwave_common::try_match_expand;
23use risingwave_connector_codec::decoder::avro::{
24    AvroAccess, AvroParseOptions, ResolvedAvroSchema, avro_schema_to_fields,
25    get_nullable_union_inner,
26};
27
28use crate::error::ConnectorResult;
29use crate::parser::avro::ConfluentSchemaCache;
30use crate::parser::unified::AccessImpl;
31use crate::parser::{AccessBuilder, EncodingProperties, EncodingType, SchemaLocation};
32use crate::schema::schema_registry::{
33    Client, extract_schema_id, get_subject_by_strategy, handle_sr_list,
34};
35
/// Accessor builder for Debezium-encoded Avro payloads in the Confluent wire
/// format (magic byte + schema id header followed by the Avro datum).
#[derive(Debug)]
pub struct DebeziumAvroAccessBuilder {
    /// Reader schema (key or outer/value schema, chosen at construction by
    /// `EncodingType`) that writer data is resolved against when decoding.
    reader_schema: ResolvedAvroSchema,
    /// Cache that resolves writer schemas by the id embedded in each payload,
    /// backed by the Confluent schema registry.
    schema_resolver: Arc<ConfluentSchemaCache>,
    /// Most recently decoded Avro value. Stored on `self` so that the
    /// `AccessImpl` returned by `generate_accessor` can borrow from it.
    value: Option<Value>,
}
42
43impl AccessBuilder for DebeziumAvroAccessBuilder {
44    async fn generate_accessor(
45        &mut self,
46        payload: Vec<u8>,
47        _: &crate::source::SourceMeta,
48    ) -> ConnectorResult<AccessImpl<'_>> {
49        let (schema_id, mut raw_payload) = extract_schema_id(&payload)?;
50        let writer_schema = self.schema_resolver.get_by_id(schema_id).await?;
51        self.value = Some(from_avro_datum(
52            writer_schema.as_ref(),
53            &mut raw_payload,
54            Some(&self.reader_schema.original_schema),
55        )?);
56        Ok(AccessImpl::Avro(AvroAccess::new(
57            self.value.as_ref().unwrap(),
58            AvroParseOptions::create(&self.reader_schema.original_schema),
59        )))
60    }
61}
62
63impl DebeziumAvroAccessBuilder {
64    pub fn new(
65        config: DebeziumAvroParserConfig,
66        encoding_type: EncodingType,
67    ) -> ConnectorResult<Self> {
68        let DebeziumAvroParserConfig {
69            key_schema,
70            outer_schema,
71            schema_resolver,
72        } = config;
73
74        Ok(Self {
75            reader_schema: ResolvedAvroSchema::create(match encoding_type {
76                EncodingType::Key => key_schema,
77                EncodingType::Value => outer_schema,
78            })?,
79            schema_resolver,
80            value: None,
81        })
82    }
83}
84
85// TODO: avoid duplicated codes with `AvroParser`
#[derive(Debug, Clone)]
pub struct DebeziumAvroParserConfig {
    /// Avro schema of the message key (carries the primary-key columns).
    pub key_schema: Arc<Schema>,
    /// Avro schema of the whole Debezium envelope (`before`/`after`/`source`/
    /// `op`/...), fetched from the registry's value subject.
    pub outer_schema: Arc<Schema>,
    /// Shared registry cache used later to resolve writer schemas by id.
    pub schema_resolver: Arc<ConfluentSchemaCache>,
}
92
93impl DebeziumAvroParserConfig {
94    pub async fn new(encoding_config: EncodingProperties) -> ConnectorResult<Self> {
95        let avro_config = try_match_expand!(encoding_config, EncodingProperties::Avro)?;
96        let SchemaLocation::Confluent {
97            urls: schema_location,
98            client_config,
99            name_strategy,
100            topic: kafka_topic,
101        } = &avro_config.schema_location
102        else {
103            unreachable!()
104        };
105        let url = handle_sr_list(schema_location)?;
106        let client = Client::new(url, client_config)?;
107        let resolver = ConfluentSchemaCache::new(client);
108
109        let key_subject = get_subject_by_strategy(name_strategy, kafka_topic, None, true)?;
110        let val_subject = get_subject_by_strategy(name_strategy, kafka_topic, None, false)?;
111        let key_schema = resolver.get_by_subject(&key_subject).await?;
112        let outer_schema = resolver.get_by_subject(&val_subject).await?;
113
114        Ok(Self {
115            key_schema,
116            outer_schema,
117            schema_resolver: Arc::new(resolver),
118        })
119    }
120
121    pub fn extract_pks(&self) -> ConnectorResult<Vec<Field>> {
122        avro_schema_to_fields(
123            &self.key_schema,
124            // TODO: do we need to support map type here?
125            None,
126        )
127        .map_err(Into::into)
128    }
129
130    pub fn map_to_columns(&self) -> ConnectorResult<Vec<Field>> {
131        // Refer to debezium_avro_msg_schema.avsc for how the schema looks like:
132
133        // "fields": [
134        // {
135        //     "name": "before",
136        //     "type": [
137        //         "null",
138        //         {
139        //             "type": "record",
140        //             "name": "Value",
141        //             "fields": [...],
142        //         }
143        //     ],
144        //     "default": null
145        // },
146        // {
147        //     "name": "after",
148        //     "type": [
149        //         "null",
150        //         "Value"
151        //     ],
152        //     "default": null
153        // },
154        // ...]
155
156        // Other fields are:
157        // - source: describes the source metadata for the event
158        // - op
159        // - ts_ms
160        // - transaction
161        // See <https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-events>
162
163        avro_schema_to_fields(
164            // This assumes no external `Ref`s (e.g. "before" referring to "after" or "source").
165            // Internal `Ref`s inside the "before" tree are allowed.
166            extract_debezium_table_schema(&self.outer_schema)?,
167            // TODO: do we need to support map type here?
168            None,
169        )
170        .map_err(Into::into)
171    }
172}
173
174fn extract_debezium_table_schema(root: &Schema) -> anyhow::Result<&Schema> {
175    let Schema::Record(root_record) = root else {
176        anyhow::bail!("Root schema of debezium shall be a record but got: {root:?}");
177    };
178    let idx = (root_record.lookup.get("before"))
179        .context("Root schema of debezium shall contain \"before\" field.")?;
180    let schema = &root_record.fields[*idx].schema;
181    // It is wrapped inside a union to allow null, so we look inside.
182    let Schema::Union(union_schema) = schema else {
183        return Ok(schema);
184    };
185    get_nullable_union_inner(union_schema).context(format!(
186        "illegal avro union schema, expected [null, T], got {:?}",
187        union_schema
188    ))
189}
190
#[cfg(test)]
mod tests {
    use std::io::Read;
    use std::path::PathBuf;

    use itertools::Itertools;
    use maplit::{btreemap, convert_args};
    use risingwave_common::array::Op;
    use risingwave_common::catalog::ColumnDesc as CatColumnDesc;
    use risingwave_common::row::{OwnedRow, Row};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};

    use super::*;
    use crate::WithOptionsSecResolved;
    use crate::parser::{DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig};
    use crate::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};

    /// A captured Debezium INSERT event for `dbserver1.inventory.customers`
    /// in the Confluent wire format (magic byte + schema id + Avro datum).
    const DEBEZIUM_AVRO_DATA: &[u8] = b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00";

    /// Absolute path of the on-disk test schema directory.
    fn schema_dir() -> String {
        let dir = PathBuf::from("src/test_data");
        std::fs::canonicalize(dir)
            .unwrap()
            .to_string_lossy()
            .to_string()
    }

    /// Parses a single payload into `(Op, OwnedRow)` pairs via a fresh chunk builder.
    async fn parse_one(
        mut parser: DebeziumParser,
        columns: Vec<SourceColumnDesc>,
        payload: Vec<u8>,
    ) -> Vec<(Op, OwnedRow)> {
        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
        parser
            .parse_inner(None, Some(payload), builder.row_writer())
            .await
            .unwrap();
        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();
        chunk
            .rows()
            .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
            .collect::<Vec<_>>()
    }

    /// Loads and parses the Debezium envelope schema fixture.
    fn get_outer_schema() -> Schema {
        let mut outer_schema_str = String::new();
        let location = schema_dir() + "/debezium_avro_msg_schema.avsc";
        std::fs::File::open(location)
            .unwrap()
            .read_to_string(&mut outer_schema_str)
            .unwrap();
        Schema::parse_str(&outer_schema_str).unwrap()
    }

    #[test]
    fn test_extract_inner_schema() {
        let inner_schema_str = r#"{
    "type": "record",
    "name": "Value",
    "namespace": "dbserver1.inventory.customers",
    "fields": [
        {
            "name": "id",
            "type": "int"
        },
        {
            "name": "first_name",
            "type": "string"
        },
        {
            "name": "last_name",
            "type": "string"
        },
        {
            "name": "email",
            "type": "string"
        }
    ]
}"#;

        let outer_schema = get_outer_schema();
        let expected_inner_schema = Schema::parse_str(inner_schema_str).unwrap();
        let extracted_inner_schema = extract_debezium_table_schema(&outer_schema).unwrap();
        assert_eq!(&expected_inner_schema, extracted_inner_schema);
    }

    #[test]
    fn test_get_pk_column_names() {
        let key_schema_str = r#"{
    "type": "record",
    "name": "Key",
    "namespace": "dbserver1.inventory.customers",
    "fields": [{
        "name": "id",
        "type": "int"
    }],
    "connect.name": "dbserver1.inventory.customers.Key"
}
"#;
        let key_schema = Schema::parse_str(key_schema_str).unwrap();
        let names: Vec<String> = avro_schema_to_fields(&key_schema, None)
            .unwrap()
            .into_iter()
            .map(|d| d.name)
            .collect();
        assert_eq!(names, vec!["id".to_owned()])
    }

    #[test]
    fn test_ref_avro_type() {
        let test_schema_str = r#"{
    "type": "record",
    "name": "Key",
    "namespace": "dbserver1.inventory.customers",
    "fields": [{
        "name": "id",
        "type": "int"
    },
    {
      "name": "unconstrained_decimal",
      "type": [
        "null",
        {
          "type": "record",
          "name": "VariableScaleDecimal",
          "namespace": "io.debezium.data",
          "fields": [
            {
              "name": "scale",
              "type": "int"
            },
            {
              "name": "value",
              "type": "bytes"
            }
          ],
          "connect.doc": "Variable scaled decimal",
          "connect.name": "io.debezium.data.VariableScaleDecimal",
          "connect.version": 1
        }
      ],
      "default": null
    },
    {
      "name": "unconstrained_numeric",
      "type": [
        "null",
        "io.debezium.data.VariableScaleDecimal"
      ],
      "default": null
    }
    ],
    "connect.name": "dbserver1.inventory.customers.Key"
}
"#;
        let schema = Schema::parse_str(test_schema_str).unwrap();
        let columns = avro_schema_to_fields(&schema, None).unwrap();
        for col in &columns {
            println!("name = {}, type = {}", col.name, col.data_type);
            if col.name.contains("unconstrained") {
                assert_eq!(col.data_type, DataType::Decimal);
            }
        }
    }

    #[test]
    fn test_map_to_columns() {
        let outer_schema = get_outer_schema();
        let columns =
            avro_schema_to_fields(extract_debezium_table_schema(&outer_schema).unwrap(), None)
                .unwrap();

        assert_eq!(columns.len(), 4);
        assert_eq!(Field::new("id", DataType::Int32), columns[0]);

        assert_eq!(Field::new("first_name", DataType::Varchar), columns[1]);

        assert_eq!(Field::new("last_name", DataType::Varchar), columns[2]);

        assert_eq!(Field::new("email", DataType::Varchar), columns[3]);
    }

    // Ignored: requires a live Confluent schema registry at 127.0.0.1:8081.
    #[ignore]
    #[tokio::test]
    async fn test_debezium_avro_parser() -> crate::error::ConnectorResult<()> {
        let props = convert_args!(btreemap!(
            "kafka.topic" => "dbserver1.inventory.customers"
        ));
        let info = StreamSourceInfo {
            row_schema_location: "http://127.0.0.1:8081".into(),
            format: PbFormatType::Debezium.into(),
            row_encode: PbEncodeType::Avro.into(),
            ..Default::default()
        };
        let parser_config =
            SpecificParserConfig::new(&info, &WithOptionsSecResolved::without_secrets(props))?;
        let config = DebeziumAvroParserConfig::new(parser_config.clone().encoding_config).await?;
        let columns = config
            .map_to_columns()?
            .iter()
            .map(CatColumnDesc::from_field_without_column_id)
            .map(|c| SourceColumnDesc::from(&c))
            .collect_vec();
        let parser = DebeziumParser::new(
            parser_config,
            columns.clone(),
            SourceContext::dummy().into(),
        )
        .await?;
        let [(op, row)]: [_; 1] = parse_one(parser, columns, DEBEZIUM_AVRO_DATA.to_vec())
            .await
            .try_into()
            .unwrap();
        assert_eq!(op, Op::Insert);
        assert_eq!(row[0], Some(ScalarImpl::Int32(1001)));
        assert_eq!(row[1], Some(ScalarImpl::Utf8("Sally".into())));
        assert_eq!(row[2], Some(ScalarImpl::Utf8("Thomas".into())));
        assert_eq!(
            row[3],
            Some(ScalarImpl::Utf8("sally.thomas@acme.com".into()))
        );
        Ok(())
    }
}
417}