risingwave_connector_codec/decoder/mod.rs
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod avro;
16pub mod json;
17pub mod protobuf;
18pub mod utils;
19
20use risingwave_common::error::NotImplemented;
21use risingwave_common::types::{DataType, Datum, DatumCow, ToOwnedDatum};
22use thiserror::Error;
23use thiserror_ext::Macro;
24
25#[derive(Error, Debug, Macro)]
26#[thiserror_ext(macro(mangle, path = "crate::decoder"))]
27pub enum AccessError {
28 #[error("Undefined field `{name}` at `{path}`")]
29 Undefined { name: String, path: String },
30 #[error("Cannot parse value `{value}` with type `{got}` into expected type `{expected}`")]
31 TypeError {
32 expected: String,
33 got: String,
34 value: String,
35 },
36 #[error("Unsupported data type `{ty}`")]
37 UnsupportedType { ty: String },
38
39 /// CDC auto schema change specific error that may include table context
40 #[error("CDC auto schema change error: unsupported data type `{ty}` in table `{table_name}`")]
41 CdcAutoSchemaChangeError { ty: String, table_name: String },
42
43 #[error("Unsupported additional column `{name}`")]
44 UnsupportedAdditionalColumn { name: String },
45
46 #[error("Fail to convert protobuf Any into jsonb: {0}")]
47 ProtobufAnyToJson(#[source] serde_json::Error),
48
49 /// Parquet parser specific errors
50 #[error("Parquet parser error: {message}")]
51 ParquetParser { message: String },
52
53 /// Errors that are not categorized into variants above.
54 #[error("{message}")]
55 Uncategorized { message: String },
56
57 #[error(transparent)]
58 NotImplemented(#[from] NotImplemented),
59 // NOTE: We intentionally don't embed `anyhow::Error` in `AccessError` since it happens
60 // in record-level and it might be too heavy to capture the backtrace
61 // when creating a new `anyhow::Error`.
62}
63
64pub type AccessResult<T = Datum> = std::result::Result<T, AccessError>;
65
66/// Access to a field in the data structure. Created by `AccessBuilder`.
67///
68/// It's the `ENCODE ...` part in `FORMAT ... ENCODE ...`
69pub trait Access {
70 /// Accesses `path` in the data structure (*parsed* Avro/JSON/Protobuf data),
71 /// and then converts it to RisingWave `Datum`.
72 ///
73 /// `type_expected` might or might not be used during the conversion depending on the implementation.
74 ///
75 /// # Path
76 ///
77 /// We usually expect the data (`Access` instance) is a record (struct), and `path` represents field path.
78 /// The data (or part of the data) represents the whole row (`Vec<Datum>`),
79 /// and we use different `path` to access one column at a time.
80 ///
81 /// TODO: the meaning of `path` is a little confusing and maybe over-abstracted.
82 /// `access` does not need to serve arbitrarily deep `path` access, but just "top-level" access.
83 /// The API creates an illusion that arbitrary access is supported, but it's not.
84 /// Perhaps we should separate out another trait like `ToDatum`,
85 /// which only does type mapping, without caring about the path. And `path` itself is only an `enum` instead of `&[&str]`.
86 ///
87 /// What `path` to access is decided by the CDC layer, i.e., the `FORMAT ...` part (`ChangeEvent`).
88 /// e.g.,
89 /// - `DebeziumChangeEvent` accesses `["before", "col_name"]` for value,
90 /// `["source", "db"]`, `["source", "table"]` etc. for additional columns' values,
91 /// `["op"]` for op type.
92 /// - `MaxwellChangeEvent` accesses `["data", "col_name"]` for value, `["type"]` for op type.
93 /// - In the simplest case, for `FORMAT PLAIN/UPSERT` (`KvEvent`), they just access `["col_name"]` for value, and op type is derived.
94 ///
95 /// # Returns
96 ///
97 /// The implementation should prefer to return a borrowed [`DatumRef`](risingwave_common::types::DatumRef)
98 /// through [`DatumCow::Borrowed`] to avoid unnecessary allocation if possible, especially for fields
99 /// with string or bytes data. If that's not the case, it may return an owned [`Datum`] through
100 /// [`DatumCow::Owned`].
101 fn access<'a>(&'a self, path: &[&str], type_expected: &DataType) -> AccessResult<DatumCow<'a>>;
102}
103
104// Note: made an extension trait to disallow implementing or overriding `access_owned`.
105#[easy_ext::ext(AccessExt)]
106impl<A: Access> A {
107 /// Similar to `access`, but always returns an owned [`Datum`]. See [`Access::access`] for more details.
108 ///
109 /// Always prefer calling `access` directly if possible to avoid unnecessary allocation.
110 pub fn access_owned(&self, path: &[&str], type_expected: &DataType) -> AccessResult<Datum> {
111 self.access(path, type_expected)
112 .map(ToOwnedDatum::to_owned_datum)
113 }
114}