1use std::collections::BTreeMap;
16
17use risingwave_common::bail;
18
19use super::simd_json_parser::DebeziumJsonAccessBuilder;
20use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
21use crate::error::ConnectorResult;
22use crate::parser::unified::debezium::DebeziumChangeEvent;
23use crate::parser::unified::json::{
24 BigintUnsignedHandlingMode, TimeHandling, TimestampHandling, TimestamptzHandling,
25};
26use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
27use crate::parser::{
28 AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParseResult,
29 ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
30};
31use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
32
/// Parser for Debezium-format change events.
///
/// It keeps one access builder for the key part and one for the payload part of a
/// message; both are built from the same encoding configuration in
/// [`DebeziumParser::new`].
#[derive(Debug)]
pub struct DebeziumParser {
    /// Generates the accessor for the key part of a message.
    key_builder: AccessBuilderImpl,
    /// Generates the accessor for the payload part of a message.
    payload_builder: AccessBuilderImpl,
    /// Column descriptors exposed through `ByteStreamSourceParser::columns`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Source context; its `connector_props` are consulted when checking whether a
    /// message is a transaction control message.
    source_ctx: SourceContextRef,

    /// Debezium-specific options (see [`DebeziumProps`]).
    props: DebeziumProps,
}
42
/// Option name that controls [`DebeziumProps::ignore_key`].
pub const DEBEZIUM_IGNORE_KEY: &str = "ignore_key";

/// Debezium-specific parser options.
#[derive(Debug, Clone, Default)]
pub struct DebeziumProps {
    /// When `true`, the key part of a message is never turned into an accessor.
    pub ignore_key: bool,
}

impl DebeziumProps {
    /// Builds the options from a string-to-string option map.
    ///
    /// `ignore_key` is enabled only when the map holds [`DEBEZIUM_IGNORE_KEY`] with a
    /// value equal to `"true"`, compared ASCII case-insensitively. A missing entry or
    /// any other value leaves it disabled.
    pub fn from(props: &BTreeMap<String, String>) -> Self {
        let ignore_key = matches!(
            props.get(DEBEZIUM_IGNORE_KEY),
            Some(value) if value.eq_ignore_ascii_case("true")
        );
        Self { ignore_key }
    }
}
61
62async fn build_accessor_builder(
63 config: EncodingProperties,
64 encoding_type: EncodingType,
65) -> ConnectorResult<AccessBuilderImpl> {
66 match config {
67 EncodingProperties::Avro(_) => {
68 let config = DebeziumAvroParserConfig::new(config).await?;
69 Ok(AccessBuilderImpl::DebeziumAvro(
70 DebeziumAvroAccessBuilder::new(config, encoding_type)?,
71 ))
72 }
73 EncodingProperties::Json(json_config) => Ok(AccessBuilderImpl::DebeziumJson(
74 DebeziumJsonAccessBuilder::new(
75 json_config
76 .timestamptz_handling
77 .unwrap_or(TimestamptzHandling::GuessNumberUnit),
78 json_config
79 .timestamp_handling
80 .unwrap_or(TimestampHandling::GuessNumberUnit),
81 json_config.time_handling.unwrap_or(TimeHandling::Micro),
82 json_config
83 .bigint_unsigned_handling
84 .unwrap_or(BigintUnsignedHandlingMode::Long),
85 json_config.handle_toast_columns,
86 )?,
87 )),
88 _ => bail!("unsupported encoding for Debezium"),
89 }
90}
91
92impl DebeziumParser {
93 pub async fn new(
94 props: SpecificParserConfig,
95 rw_columns: Vec<SourceColumnDesc>,
96 source_ctx: SourceContextRef,
97 ) -> ConnectorResult<Self> {
98 let key_builder =
99 build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?;
100 let payload_builder =
101 build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
102 let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config {
103 props
104 } else {
105 unreachable!(
106 "expecting Debezium protocol properties but got {:?}",
107 props.protocol_config
108 )
109 };
110 Ok(Self {
111 key_builder,
112 payload_builder,
113 rw_columns,
114 source_ctx,
115 props: debezium_props,
116 })
117 }
118
119 pub async fn new_for_test(rw_columns: Vec<SourceColumnDesc>) -> ConnectorResult<Self> {
120 use crate::parser::JsonProperties;
121
122 let props = SpecificParserConfig {
123 encoding_config: EncodingProperties::Json(JsonProperties {
124 use_schema_registry: false,
125 timestamptz_handling: None,
126 timestamp_handling: None,
127 time_handling: None,
128 bigint_unsigned_handling: None,
129 handle_toast_columns: false,
130 }),
131 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
132 };
133 Self::new(props, rw_columns, SourceContext::dummy().into()).await
134 }
135
136 pub async fn parse_inner(
137 &mut self,
138 key: Option<Vec<u8>>,
139 payload: Option<Vec<u8>>,
140 mut writer: SourceStreamChunkRowWriter<'_>,
141 ) -> ConnectorResult<ParseResult> {
142 let meta = writer.source_meta();
143 let key_accessor = match (key, self.props.ignore_key) {
145 (None, false) => None,
146 (Some(data), false) => Some(self.key_builder.generate_accessor(data, meta).await?),
147 (_, true) => None,
148 };
149 let payload_accessor = match payload {
150 None => None,
151 Some(data) => Some(self.payload_builder.generate_accessor(data, meta).await?),
152 };
153 let row_op = DebeziumChangeEvent::new(key_accessor, payload_accessor);
154
155 match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) {
156 Ok(_) => Ok(ParseResult::Rows),
157 Err(err) => {
158 if let Some(transaction_control) =
161 row_op.transaction_control(&self.source_ctx.connector_props)
162 {
163 Ok(ParseResult::TransactionControl(transaction_control))
164 } else {
165 Err(err)?
166 }
167 }
168 }
169 }
170}
171
172impl ByteStreamSourceParser for DebeziumParser {
173 fn columns(&self) -> &[SourceColumnDesc] {
174 &self.rw_columns
175 }
176
177 fn source_ctx(&self) -> &SourceContext {
178 &self.source_ctx
179 }
180
181 fn parser_format(&self) -> ParserFormat {
182 ParserFormat::Debezium
183 }
184
185 #[allow(clippy::unused_async)] async fn parse_one<'a>(
187 &'a mut self,
188 _key: Option<Vec<u8>>,
189 _payload: Option<Vec<u8>>,
190 _writer: SourceStreamChunkRowWriter<'a>,
191 ) -> ConnectorResult<()> {
192 unreachable!("should call `parse_one_with_txn` instead")
193 }
194
195 async fn parse_one_with_txn<'a>(
196 &'a mut self,
197 key: Option<Vec<u8>>,
198 payload: Option<Vec<u8>>,
199 writer: SourceStreamChunkRowWriter<'a>,
200 ) -> ConnectorResult<ParseResult> {
201 self.parse_inner(key, payload, writer).await
202 }
203}
204
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::sync::Arc;

    use risingwave_common::catalog::{CDC_SOURCE_COLUMN_NUM, ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, Timestamptz};
    use risingwave_pb::plan_common::{
        AdditionalColumn, AdditionalColumnTimestamp, additional_column,
    };

    use super::*;
    use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl};
    use crate::source::{ConnectorProperties, SourceCtrlOpts};

    /// Debezium-over-JSON parser configuration with every handling mode left unset.
    fn json_debezium_config() -> SpecificParserConfig {
        SpecificParserConfig {
            encoding_config: EncodingProperties::Json(JsonProperties {
                use_schema_registry: false,
                timestamptz_handling: None,
                timestamp_handling: None,
                time_handling: None,
                bigint_unsigned_handling: None,
                handle_toast_columns: false,
            }),
            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
        }
    }

    /// Builds a parser over `columns` whose source context carries default Postgres CDC
    /// connector properties.
    async fn postgres_cdc_parser(columns: Vec<SourceColumnDesc>) -> DebeziumParser {
        let config = json_debezium_config();
        let source_ctx = SourceContext {
            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
            ..SourceContext::dummy()
        };
        DebeziumParser::new(config, columns, Arc::new(source_ctx))
            .await
            .unwrap()
    }

    /// BEGIN and END transaction metadata messages must be reported as transaction
    /// control results carrying the part of `id` that precedes the colon.
    #[tokio::test]
    async fn test_parse_transaction_metadata() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();
        let columns: Vec<_> = schema
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect();

        let mut parser = postgres_cdc_parser(columns.clone()).await;
        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
        let begin_result = parser
            .parse_one_with_txn(
                None,
                Some(begin_msg.as_bytes().to_vec()),
                dummy_builder.row_writer(),
            )
            .await;
        match begin_result {
            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
                assert_eq!(id.deref(), "35352");
            }
            _ => panic!("unexpected parse result: {:?}", begin_result),
        }

        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
        let commit_result = parser
            .parse_one_with_txn(
                None,
                Some(commit_msg.as_bytes().to_vec()),
                dummy_builder.row_writer(),
            )
            .await;
        match commit_result {
            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
                assert_eq!(id.deref(), "35352");
            }
            _ => panic!("unexpected parse result: {:?}", commit_result),
        }
    }

    /// The additional timestamp column must be filled with the `source.ts_ms` value of
    /// the message (not the top-level `ts_ms`).
    #[tokio::test]
    async fn test_parse_additional_columns() {
        let commit_ts_column = ColumnDesc::named_with_additional_column(
            "commit_ts",
            ColumnId::new(6),
            DataType::Timestamptz,
            AdditionalColumn {
                column_type: Some(additional_column::ColumnType::Timestamp(
                    AdditionalColumnTimestamp {},
                )),
            },
        );
        let column_descs = [
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            commit_ts_column,
        ];
        let columns: Vec<_> = column_descs.iter().map(SourceColumnDesc::from).collect();

        let mut parser = postgres_cdc_parser(columns.clone()).await;
        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 1695277757017, "transaction": null } }"#;

        let res = parser
            .parse_one_with_txn(
                None,
                Some(payload.as_bytes().to_vec()),
                builder.row_writer(),
            )
            .await;
        match res {
            Ok(ParseResult::Rows) => {
                builder.finish_current_chunk();
                let chunk = builder.consume_ready_chunks().next().unwrap();
                let expected_commit_ts = Timestamptz::from_millis(1695277757000).unwrap();
                for (_, row) in chunk.rows() {
                    // `commit_ts` is the sixth column, hence index 5.
                    let commit_ts = row.datum_at(5).unwrap().into_timestamptz();
                    assert_eq!(commit_ts, expected_commit_ts);
                }
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
    }

    /// The CDC source schema must have exactly `CDC_SOURCE_COLUMN_NUM` columns.
    #[tokio::test]
    async fn test_cdc_source_job_schema() {
        let column_count = ColumnCatalog::debezium_cdc_source_cols().len() as u32;
        assert_eq!(CDC_SOURCE_COLUMN_NUM, column_count);
    }
}