1use std::collections::BTreeMap;
16
17use risingwave_common::bail;
18
19use super::simd_json_parser::DebeziumJsonAccessBuilder;
20use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
21use crate::error::ConnectorResult;
22use crate::parser::unified::debezium::DebeziumChangeEvent;
23use crate::parser::unified::json::{
24 BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
25};
26use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
27use crate::parser::{
28 AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParseResult,
29 ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
30};
31use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
32
/// Parser for Debezium-formatted change events.
///
/// Message keys and payloads are decoded through two separate accessor builders, both created
/// from the same encoding configuration (Avro or JSON) when the parser is constructed.
#[derive(Debug)]
pub struct DebeziumParser {
    /// Accessor builder used to decode the message key.
    key_builder: AccessBuilderImpl,
    /// Accessor builder used to decode the message payload.
    payload_builder: AccessBuilderImpl,
    /// Columns of the RisingWave source that parsed rows are written into.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Source context; its connector properties are consulted when an event is checked for
    /// transaction control.
    source_ctx: SourceContextRef,

    /// Debezium-specific protocol options (e.g. whether to ignore the message key).
    props: DebeziumProps,
}
42
/// Option key controlling whether the Debezium parser discards the message key.
pub const DEBEZIUM_IGNORE_KEY: &str = "ignore_key";

/// Debezium-specific protocol options extracted from a source's property map.
#[derive(Debug, Clone, Default)]
pub struct DebeziumProps {
    /// When `true`, the message key is not decoded and only the payload is used.
    pub ignore_key: bool,
}

impl DebeziumProps {
    /// Extracts [`DebeziumProps`] from a raw option map.
    ///
    /// `ignore_key` is enabled only when the [`DEBEZIUM_IGNORE_KEY`] entry exists and its value
    /// equals `"true"` under ASCII case-insensitive comparison. A missing entry or any other
    /// value leaves it disabled.
    pub fn from(props: &BTreeMap<String, String>) -> Self {
        let ignore_key = matches!(
            props.get(DEBEZIUM_IGNORE_KEY),
            Some(value) if value.eq_ignore_ascii_case("true")
        );
        Self { ignore_key }
    }
}
61
62async fn build_accessor_builder(
63 config: EncodingProperties,
64 encoding_type: EncodingType,
65) -> ConnectorResult<AccessBuilderImpl> {
66 match config {
67 EncodingProperties::Avro(_) => {
68 let config = DebeziumAvroParserConfig::new(config).await?;
69 Ok(AccessBuilderImpl::DebeziumAvro(
70 DebeziumAvroAccessBuilder::new(config, encoding_type)?,
71 ))
72 }
73 EncodingProperties::Json(json_config) => Ok(AccessBuilderImpl::DebeziumJson(
74 DebeziumJsonAccessBuilder::new(
75 json_config
76 .timestamptz_handling
77 .unwrap_or(TimestamptzHandling::GuessNumberUnit),
78 json_config
79 .timestamp_handling
80 .unwrap_or(TimestampHandling::GuessNumberUnit),
81 json_config.time_handling.unwrap_or(TimeHandling::Micro),
82 json_config
83 .bigint_unsigned_handling
84 .unwrap_or(BigintUnsignedHandlingMode::Long),
85 json_config.handle_toast_columns,
86 )?,
87 )),
88 _ => bail!("unsupported encoding for Debezium"),
89 }
90}
91
92impl DebeziumParser {
93 pub async fn new(
94 props: SpecificParserConfig,
95 rw_columns: Vec<SourceColumnDesc>,
96 source_ctx: SourceContextRef,
97 ) -> ConnectorResult<Self> {
98 let key_builder =
99 build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?;
100 let payload_builder =
101 build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
102 let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config {
103 props
104 } else {
105 unreachable!(
106 "expecting Debezium protocol properties but got {:?}",
107 props.protocol_config
108 )
109 };
110 Ok(Self {
111 key_builder,
112 payload_builder,
113 rw_columns,
114 source_ctx,
115 props: debezium_props,
116 })
117 }
118
119 pub async fn new_for_test(rw_columns: Vec<SourceColumnDesc>) -> ConnectorResult<Self> {
120 use crate::parser::JsonProperties;
121
122 let props = SpecificParserConfig {
123 encoding_config: EncodingProperties::Json(JsonProperties {
124 use_schema_registry: false,
125 timestamptz_handling: None,
126 timestamp_handling: None,
127 time_handling: None,
128 bigint_unsigned_handling: None,
129 handle_toast_columns: false,
130 }),
131 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
132 };
133 Self::new(props, rw_columns, SourceContext::dummy().into()).await
134 }
135
136 pub async fn parse_inner(
137 &mut self,
138 key: Option<Vec<u8>>,
139 payload: Option<Vec<u8>>,
140 mut writer: SourceStreamChunkRowWriter<'_>,
141 ) -> ConnectorResult<ParseResult> {
142 let meta = writer.source_meta();
143 let key_accessor = match (key, self.props.ignore_key) {
145 (None, false) => None,
146 (Some(data), false) => Some(self.key_builder.generate_accessor(data, meta).await?),
147 (_, true) => None,
148 };
149 let payload_accessor = match payload {
150 None => None,
151 Some(data) => Some(self.payload_builder.generate_accessor(data, meta).await?),
152 };
153 let row_op = DebeziumChangeEvent::new(key_accessor, payload_accessor);
154
155 match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) {
156 Ok(_) => Ok(ParseResult::Rows),
157 Err(err) => {
158 if let Some(transaction_control) =
161 row_op.transaction_control(&self.source_ctx.connector_props)
162 {
163 Ok(ParseResult::TransactionControl(transaction_control))
164 } else {
165 Err(err)?
166 }
167 }
168 }
169 }
170}
171
172impl ByteStreamSourceParser for DebeziumParser {
173 fn columns(&self) -> &[SourceColumnDesc] {
174 &self.rw_columns
175 }
176
177 fn source_ctx(&self) -> &SourceContext {
178 &self.source_ctx
179 }
180
181 fn parser_format(&self) -> ParserFormat {
182 ParserFormat::Debezium
183 }
184
185 async fn parse_one<'a>(
186 &'a mut self,
187 _key: Option<Vec<u8>>,
188 _payload: Option<Vec<u8>>,
189 _writer: SourceStreamChunkRowWriter<'a>,
190 ) -> ConnectorResult<()> {
191 unreachable!("should call `parse_one_with_txn` instead")
192 }
193
194 async fn parse_one_with_txn<'a>(
195 &'a mut self,
196 key: Option<Vec<u8>>,
197 payload: Option<Vec<u8>>,
198 writer: SourceStreamChunkRowWriter<'a>,
199 ) -> ConnectorResult<ParseResult> {
200 self.parse_inner(key, payload, writer).await
201 }
202}
203
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::sync::Arc;

    use risingwave_common::catalog::{CDC_SOURCE_COLUMN_NUM, ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, Timestamptz};
    use risingwave_pb::plan_common::{
        AdditionalColumn, AdditionalColumnTimestamp, additional_column,
    };

    use super::*;
    use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl};
    use crate::source::{ConnectorProperties, SourceCtrlOpts};

    /// Debezium-over-JSON parser config with every JSON handling option left unset.
    fn debezium_json_config() -> SpecificParserConfig {
        SpecificParserConfig {
            encoding_config: EncodingProperties::Json(JsonProperties {
                use_schema_registry: false,
                timestamptz_handling: None,
                timestamp_handling: None,
                time_handling: None,
                bigint_unsigned_handling: None,
                handle_toast_columns: false,
            }),
            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
        }
    }

    /// Builds a Debezium JSON parser over `columns` whose context reports a Postgres CDC
    /// connector.
    async fn postgres_cdc_parser(columns: Vec<SourceColumnDesc>) -> DebeziumParser {
        let source_ctx = SourceContext {
            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
            ..SourceContext::dummy()
        };
        DebeziumParser::new(debezium_json_config(), columns, Arc::new(source_ctx))
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_parse_transaction_metadata() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();

        let columns = schema
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect::<Vec<_>>();

        let mut parser = postgres_cdc_parser(columns.clone()).await;
        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
        let res = parser
            .parse_one_with_txn(
                None,
                Some(begin_msg.as_bytes().to_vec()),
                dummy_builder.row_writer(),
            )
            .await;
        match res {
            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
                assert_eq!(id.deref(), "35352");
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
        let res = parser
            .parse_one_with_txn(
                None,
                Some(commit_msg.as_bytes().to_vec()),
                dummy_builder.row_writer(),
            )
            .await;
        match res {
            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
                assert_eq!(id.deref(), "35352");
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
    }

    #[tokio::test]
    async fn test_parse_additional_columns() {
        let columns = [
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named_with_additional_column(
                "commit_ts",
                ColumnId::new(6),
                DataType::Timestamptz,
                AdditionalColumn {
                    column_type: Some(additional_column::ColumnType::Timestamp(
                        AdditionalColumnTimestamp {},
                    )),
                },
            ),
        ];

        let columns = columns
            .iter()
            .map(SourceColumnDesc::from)
            .collect::<Vec<_>>();

        let mut parser = postgres_cdc_parser(columns.clone()).await;
        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 1695277757017, "transaction": null } }"#;

        let res = parser
            .parse_one_with_txn(
                None,
                Some(payload.as_bytes().to_vec()),
                builder.row_writer(),
            )
            .await;
        match res {
            Ok(ParseResult::Rows) => {
                builder.finish_current_chunk();
                let chunk = builder.consume_ready_chunks().next().unwrap();
                let rows: Vec<_> = chunk.rows().collect();
                // Without this, an empty chunk would make the loop below assert nothing.
                assert_eq!(rows.len(), 1, "expected exactly one row for one insert event");
                for (_, row) in rows {
                    let commit_ts = row.datum_at(5).unwrap().into_timestamptz();
                    assert_eq!(commit_ts, Timestamptz::from_millis(1695277757000).unwrap());
                }
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
    }

    #[test]
    fn test_cdc_source_job_schema() {
        let columns = ColumnCatalog::debezium_cdc_source_cols();
        assert_eq!(CDC_SOURCE_COLUMN_NUM, columns.len() as u32);
    }
}