// risingwave_connector/parser/debezium/avro_parser.rs

use std::fmt::Debug;
use std::sync::Arc;

use anyhow::Context;
use apache_avro::types::Value;
use apache_avro::{Schema, from_avro_datum};
use risingwave_common::catalog::Field;
use risingwave_common::try_match_expand;
use risingwave_connector_codec::decoder::avro::{
    AvroAccess, AvroParseOptions, ResolvedAvroSchema, avro_schema_to_fields,
    get_nullable_union_inner,
};

use crate::error::ConnectorResult;
use crate::parser::avro::ConfluentSchemaCache;
use crate::parser::unified::AccessImpl;
use crate::parser::{AccessBuilder, EncodingProperties, EncodingType, SchemaLocation};
use crate::schema::schema_registry::{
    Client, extract_schema_id, get_subject_by_strategy, handle_sr_list,
};
/// Builds [`AvroAccess`]es over Debezium messages whose writer schema is
/// resolved from a Confluent schema registry by the id embedded in each payload.
#[derive(Debug)]
pub struct DebeziumAvroAccessBuilder {
    // Reader schema every decoded datum is resolved against (key or value,
    // chosen at construction time).
    reader_schema: ResolvedAvroSchema,
    // Cache of writer schemas keyed by Confluent schema id.
    schema_resolver: Arc<ConfluentSchemaCache>,
    // Owns the most recently decoded datum so the returned `AccessImpl`
    // can borrow from `self`.
    value: Option<Value>,
}

43impl AccessBuilder for DebeziumAvroAccessBuilder {
44 async fn generate_accessor(
45 &mut self,
46 payload: Vec<u8>,
47 _: &crate::source::SourceMeta,
48 ) -> ConnectorResult<AccessImpl<'_>> {
49 let (schema_id, mut raw_payload) = extract_schema_id(&payload)?;
50 let writer_schema = self.schema_resolver.get_by_id(schema_id).await?;
51 self.value = Some(from_avro_datum(
52 writer_schema.as_ref(),
53 &mut raw_payload,
54 Some(&self.reader_schema.original_schema),
55 )?);
56 Ok(AccessImpl::Avro(AvroAccess::new(
57 self.value.as_ref().unwrap(),
58 AvroParseOptions::create(&self.reader_schema.original_schema),
59 )))
60 }
61}
62
63impl DebeziumAvroAccessBuilder {
64 pub fn new(
65 config: DebeziumAvroParserConfig,
66 encoding_type: EncodingType,
67 ) -> ConnectorResult<Self> {
68 let DebeziumAvroParserConfig {
69 key_schema,
70 outer_schema,
71 schema_resolver,
72 } = config;
73
74 Ok(Self {
75 reader_schema: ResolvedAvroSchema::create(match encoding_type {
76 EncodingType::Key => key_schema,
77 EncodingType::Value => outer_schema,
78 })?,
79 schema_resolver,
80 value: None,
81 })
82 }
83}
84
/// Schemas and registry client shared by Debezium-Avro access builders.
#[derive(Debug, Clone)]
pub struct DebeziumAvroParserConfig {
    // Avro schema of the Kafka message key.
    pub key_schema: Arc<Schema>,
    // Avro schema of the Debezium envelope (the Kafka message value).
    pub outer_schema: Arc<Schema>,
    // Shared writer-schema cache backed by the Confluent schema registry.
    pub schema_resolver: Arc<ConfluentSchemaCache>,
}

93impl DebeziumAvroParserConfig {
94 pub async fn new(encoding_config: EncodingProperties) -> ConnectorResult<Self> {
95 let avro_config = try_match_expand!(encoding_config, EncodingProperties::Avro)?;
96 let SchemaLocation::Confluent {
97 urls: schema_location,
98 client_config,
99 name_strategy,
100 topic: kafka_topic,
101 } = &avro_config.schema_location
102 else {
103 unreachable!()
104 };
105 let url = handle_sr_list(schema_location)?;
106 let client = Client::new(url, client_config)?;
107 let resolver = ConfluentSchemaCache::new(client);
108
109 let key_subject = get_subject_by_strategy(name_strategy, kafka_topic, None, true)?;
110 let val_subject = get_subject_by_strategy(name_strategy, kafka_topic, None, false)?;
111 let key_schema = resolver.get_by_subject(&key_subject).await?;
112 let outer_schema = resolver.get_by_subject(&val_subject).await?;
113
114 Ok(Self {
115 key_schema,
116 outer_schema,
117 schema_resolver: Arc::new(resolver),
118 })
119 }
120
121 pub fn extract_pks(&self) -> ConnectorResult<Vec<Field>> {
122 avro_schema_to_fields(
123 &self.key_schema,
124 None,
126 )
127 .map_err(Into::into)
128 }
129
130 pub fn map_to_columns(&self) -> ConnectorResult<Vec<Field>> {
131 avro_schema_to_fields(
164 extract_debezium_table_schema(&self.outer_schema)?,
167 None,
169 )
170 .map_err(Into::into)
171 }
172}
173
174fn extract_debezium_table_schema(root: &Schema) -> anyhow::Result<&Schema> {
175 let Schema::Record(root_record) = root else {
176 anyhow::bail!("Root schema of debezium shall be a record but got: {root:?}");
177 };
178 let idx = (root_record.lookup.get("before"))
179 .context("Root schema of debezium shall contain \"before\" field.")?;
180 let schema = &root_record.fields[*idx].schema;
181 let Schema::Union(union_schema) = schema else {
183 return Ok(schema);
184 };
185 get_nullable_union_inner(union_schema).context(format!(
186 "illegal avro union schema, expected [null, T], got {:?}",
187 union_schema
188 ))
189}
190
#[cfg(test)]
mod tests {
    use std::io::Read;
    use std::path::PathBuf;

    use itertools::Itertools;
    use maplit::{btreemap, convert_args};
    use risingwave_common::array::Op;
    use risingwave_common::catalog::ColumnDesc as CatColumnDesc;
    use risingwave_common::row::{OwnedRow, Row};
    use risingwave_common::types::{DataType, ScalarImpl};
    use risingwave_pb::catalog::StreamSourceInfo;
    use risingwave_pb::plan_common::{PbEncodeType, PbFormatType};

    use super::*;
    use crate::WithOptionsSecResolved;
    use crate::parser::{DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig};
    use crate::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts};

    // A captured Confluent-framed Debezium Avro message (magic byte + schema
    // id header, then the Avro datum) — presumably an INSERT on
    // `dbserver1.inventory.customers`; used by `test_debezium_avro_parser`.
    const DEBEZIUM_AVRO_DATA: &[u8] = b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00";

    // Absolute path to the checked-in test schemas directory.
    fn schema_dir() -> String {
        let dir = PathBuf::from("src/test_data");
        std::fs::canonicalize(dir)
            .unwrap()
            .to_string_lossy()
            .to_string()
    }

    // Parses a single value-only payload and returns the resulting
    // (op, row) pairs from the first ready chunk.
    async fn parse_one(
        mut parser: DebeziumParser,
        columns: Vec<SourceColumnDesc>,
        payload: Vec<u8>,
    ) -> Vec<(Op, OwnedRow)> {
        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());
        parser
            .parse_inner(None, Some(payload), builder.row_writer())
            .await
            .unwrap();
        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();
        chunk
            .rows()
            .map(|(op, row_ref)| (op, row_ref.into_owned_row()))
            .collect::<Vec<_>>()
    }

    // Loads the Debezium envelope schema fixture from disk.
    fn get_outer_schema() -> Schema {
        let mut outer_schema_str = String::new();
        let location = schema_dir() + "/debezium_avro_msg_schema.avsc";
        std::fs::File::open(location)
            .unwrap()
            .read_to_string(&mut outer_schema_str)
            .unwrap();
        Schema::parse_str(&outer_schema_str).unwrap()
    }

    // The row schema nested in the envelope's `before` union should be
    // extracted as-is.
    #[test]
    fn test_extract_inner_schema() {
        let inner_shema_str = r#"{
    "type": "record",
    "name": "Value",
    "namespace": "dbserver1.inventory.customers",
    "fields": [
        {
            "name": "id",
            "type": "int"
        },
        {
            "name": "first_name",
            "type": "string"
        },
        {
            "name": "last_name",
            "type": "string"
        },
        {
            "name": "email",
            "type": "string"
        }
    ]
}"#;

        let outer_schema = get_outer_schema();
        let expected_inner_schema = Schema::parse_str(inner_shema_str).unwrap();
        let extracted_inner_schema = extract_debezium_table_schema(&outer_schema).unwrap();
        assert_eq!(&expected_inner_schema, extracted_inner_schema);
    }

    // Key-schema fields map one-to-one to primary-key column names.
    #[test]
    fn test_get_pk_column_names() {
        let key_schema_str = r#"{
    "type": "record",
    "name": "Key",
    "namespace": "dbserver1.inventory.customers",
    "fields": [{
        "name": "id",
        "type": "int"
    }],
    "connect.name": "dbserver1.inventory.customers.Key"
}
"#;
        let key_schema = Schema::parse_str(key_schema_str).unwrap();
        let names: Vec<String> = avro_schema_to_fields(&key_schema, None)
            .unwrap()
            .drain(..)
            .map(|d| d.name)
            .collect();
        assert_eq!(names, vec!["id".to_owned()])
    }

    // Named-type references (here `io.debezium.data.VariableScaleDecimal`,
    // defined once and referenced by name) should resolve, and both
    // occurrences should map to `Decimal`.
    #[test]
    fn test_ref_avro_type() {
        let test_schema_str = r#"{
    "type": "record",
    "name": "Key",
    "namespace": "dbserver1.inventory.customers",
    "fields": [{
        "name": "id",
        "type": "int"
    },
    {
        "name": "unconstrained_decimal",
        "type": [
            "null",
            {
                "type": "record",
                "name": "VariableScaleDecimal",
                "namespace": "io.debezium.data",
                "fields": [
                    {
                        "name": "scale",
                        "type": "int"
                    },
                    {
                        "name": "value",
                        "type": "bytes"
                    }
                ],
                "connect.doc": "Variable scaled decimal",
                "connect.name": "io.debezium.data.VariableScaleDecimal",
                "connect.version": 1
            }
        ],
        "default": null
    },
    {
        "name": "unconstrained_numeric",
        "type": [
            "null",
            "io.debezium.data.VariableScaleDecimal"
        ],
        "default": null
    }
    ],
    "connect.name": "dbserver1.inventory.customers.Key"
}
"#;
        let schema = Schema::parse_str(test_schema_str).unwrap();
        let columns = avro_schema_to_fields(&schema, None).unwrap();
        for col in &columns {
            println!("name = {}, type = {}", col.name, col.data_type);
            if col.name.contains("unconstrained") {
                assert_eq!(col.data_type, DataType::Decimal);
            }
        }
    }

    // End-to-end column derivation from the envelope fixture.
    #[test]
    fn test_map_to_columns() {
        let outer_schema = get_outer_schema();
        let columns =
            avro_schema_to_fields(extract_debezium_table_schema(&outer_schema).unwrap(), None)
                .unwrap();

        assert_eq!(columns.len(), 4);
        assert_eq!(Field::new("id", DataType::Int32), columns[0]);

        assert_eq!(Field::new("first_name", DataType::Varchar), columns[1]);

        assert_eq!(Field::new("last_name", DataType::Varchar), columns[2]);

        assert_eq!(Field::new("email", DataType::Varchar), columns[3]);
    }

    // Ignored: requires a live Confluent schema registry at 127.0.0.1:8081
    // populated with the `dbserver1.inventory.customers` subjects.
    #[ignore]
    #[tokio::test]
    async fn test_debezium_avro_parser() -> crate::error::ConnectorResult<()> {
        let props = convert_args!(btreemap!(
            "kafka.topic" => "dbserver1.inventory.customers"
        ));
        let info = StreamSourceInfo {
            row_schema_location: "http://127.0.0.1:8081".into(),
            format: PbFormatType::Debezium.into(),
            row_encode: PbEncodeType::Avro.into(),
            ..Default::default()
        };
        let parser_config =
            SpecificParserConfig::new(&info, &WithOptionsSecResolved::without_secrets(props))?;
        let config = DebeziumAvroParserConfig::new(parser_config.clone().encoding_config).await?;
        let columns = config
            .map_to_columns()?
            .iter()
            .map(CatColumnDesc::from_field_without_column_id)
            .map(|c| SourceColumnDesc::from(&c))
            .collect_vec();
        let parser = DebeziumParser::new(
            parser_config,
            columns.clone(),
            SourceContext::dummy().into(),
        )
        .await?;
        // Exactly one row is expected from the captured payload.
        let [(op, row)]: [_; 1] = parse_one(parser, columns, DEBEZIUM_AVRO_DATA.to_vec())
            .await
            .try_into()
            .unwrap();
        assert_eq!(op, Op::Insert);
        assert_eq!(row[0], Some(ScalarImpl::Int32(1001)));
        assert_eq!(row[1], Some(ScalarImpl::Utf8("Sally".into())));
        assert_eq!(row[2], Some(ScalarImpl::Utf8("Thomas".into())));
        assert_eq!(
            row[3],
            Some(ScalarImpl::Utf8("sally.thomas@acme.com".into()))
        );
        Ok(())
    }
}