1use std::collections::BTreeMap;
16
17use risingwave_common::bail;
18
19use super::simd_json_parser::DebeziumJsonAccessBuilder;
20use super::{DebeziumAvroAccessBuilder, DebeziumAvroParserConfig};
21use crate::error::ConnectorResult;
22use crate::parser::unified::debezium::DebeziumChangeEvent;
23use crate::parser::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling};
24use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer;
25use crate::parser::{
26 AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, ParseResult,
27 ParserFormat, ProtocolProperties, SourceStreamChunkRowWriter, SpecificParserConfig,
28};
29use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
30
/// Parser for Debezium-formatted change events (Avro or JSON encoded).
///
/// Each message is decoded through two accessor builders — one for the key and one for
/// the payload — and is then either written as a row operation or reported as a
/// transaction control marker.
#[derive(Debug)]
pub struct DebeziumParser {
    /// Builds accessors over message keys (constructed with `EncodingType::Key`).
    key_builder: AccessBuilderImpl,
    /// Builds accessors over message payloads (constructed with `EncodingType::Value`).
    payload_builder: AccessBuilderImpl,
    /// Columns produced by this parser; exposed through `ByteStreamSourceParser::columns`.
    pub(crate) rw_columns: Vec<SourceColumnDesc>,
    /// Source context; its `connector_props` are consulted when a message turns out to be
    /// a transaction marker rather than a row.
    source_ctx: SourceContextRef,

    /// Debezium protocol options, e.g. whether message keys are ignored.
    props: DebeziumProps,
}
40
/// Option key controlling whether the Debezium message key is skipped during parsing.
///
/// Only the value `"true"` (compared ignoring ASCII case) enables the option.
pub const DEBEZIUM_IGNORE_KEY: &str = "ignore_key";

/// Options that apply only to the Debezium protocol.
#[derive(Clone, Debug, Default)]
pub struct DebeziumProps {
    /// When `true`, the message key is never decoded and only the payload is used.
    pub ignore_key: bool,
}

impl DebeziumProps {
    /// Extracts Debezium options from a string-to-string property map.
    ///
    /// `ignore_key` is enabled only when [`DEBEZIUM_IGNORE_KEY`] is present and its value
    /// equals `"true"` ignoring ASCII case. A missing entry, or any other value, leaves it
    /// disabled.
    pub fn from(props: &BTreeMap<String, String>) -> Self {
        let ignore_key = matches!(
            props.get(DEBEZIUM_IGNORE_KEY),
            Some(flag) if flag.eq_ignore_ascii_case("true")
        );
        Self { ignore_key }
    }
}
59
60async fn build_accessor_builder(
61 config: EncodingProperties,
62 encoding_type: EncodingType,
63) -> ConnectorResult<AccessBuilderImpl> {
64 match config {
65 EncodingProperties::Avro(_) => {
66 let config = DebeziumAvroParserConfig::new(config).await?;
67 Ok(AccessBuilderImpl::DebeziumAvro(
68 DebeziumAvroAccessBuilder::new(config, encoding_type)?,
69 ))
70 }
71 EncodingProperties::Json(json_config) => Ok(AccessBuilderImpl::DebeziumJson(
72 DebeziumJsonAccessBuilder::new(
73 json_config
74 .timestamptz_handling
75 .unwrap_or(TimestamptzHandling::GuessNumberUnit),
76 json_config
77 .timestamp_handling
78 .unwrap_or(TimestampHandling::GuessNumberUnit),
79 json_config.time_handling.unwrap_or(TimeHandling::Micro),
80 json_config.handle_toast_columns,
81 )?,
82 )),
83 _ => bail!("unsupported encoding for Debezium"),
84 }
85}
86
87impl DebeziumParser {
88 pub async fn new(
89 props: SpecificParserConfig,
90 rw_columns: Vec<SourceColumnDesc>,
91 source_ctx: SourceContextRef,
92 ) -> ConnectorResult<Self> {
93 let key_builder =
94 build_accessor_builder(props.encoding_config.clone(), EncodingType::Key).await?;
95 let payload_builder =
96 build_accessor_builder(props.encoding_config, EncodingType::Value).await?;
97 let debezium_props = if let ProtocolProperties::Debezium(props) = props.protocol_config {
98 props
99 } else {
100 unreachable!(
101 "expecting Debezium protocol properties but got {:?}",
102 props.protocol_config
103 )
104 };
105 Ok(Self {
106 key_builder,
107 payload_builder,
108 rw_columns,
109 source_ctx,
110 props: debezium_props,
111 })
112 }
113
114 pub async fn new_for_test(rw_columns: Vec<SourceColumnDesc>) -> ConnectorResult<Self> {
115 use crate::parser::JsonProperties;
116
117 let props = SpecificParserConfig {
118 encoding_config: EncodingProperties::Json(JsonProperties {
119 use_schema_registry: false,
120 timestamptz_handling: None,
121 timestamp_handling: None,
122 time_handling: None,
123 handle_toast_columns: false,
124 }),
125 protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
126 };
127 Self::new(props, rw_columns, SourceContext::dummy().into()).await
128 }
129
130 pub async fn parse_inner(
131 &mut self,
132 key: Option<Vec<u8>>,
133 payload: Option<Vec<u8>>,
134 mut writer: SourceStreamChunkRowWriter<'_>,
135 ) -> ConnectorResult<ParseResult> {
136 let meta = writer.source_meta();
137 let key_accessor = match (key, self.props.ignore_key) {
139 (None, false) => None,
140 (Some(data), false) => Some(self.key_builder.generate_accessor(data, meta).await?),
141 (_, true) => None,
142 };
143 let payload_accessor = match payload {
144 None => None,
145 Some(data) => Some(self.payload_builder.generate_accessor(data, meta).await?),
146 };
147 let row_op = DebeziumChangeEvent::new(key_accessor, payload_accessor);
148
149 match apply_row_operation_on_stream_chunk_writer(&row_op, &mut writer) {
150 Ok(_) => Ok(ParseResult::Rows),
151 Err(err) => {
152 if let Some(transaction_control) =
155 row_op.transaction_control(&self.source_ctx.connector_props)
156 {
157 Ok(ParseResult::TransactionControl(transaction_control))
158 } else {
159 Err(err)?
160 }
161 }
162 }
163 }
164}
165
166impl ByteStreamSourceParser for DebeziumParser {
167 fn columns(&self) -> &[SourceColumnDesc] {
168 &self.rw_columns
169 }
170
171 fn source_ctx(&self) -> &SourceContext {
172 &self.source_ctx
173 }
174
175 fn parser_format(&self) -> ParserFormat {
176 ParserFormat::Debezium
177 }
178
179 #[allow(clippy::unused_async)] async fn parse_one<'a>(
181 &'a mut self,
182 _key: Option<Vec<u8>>,
183 _payload: Option<Vec<u8>>,
184 _writer: SourceStreamChunkRowWriter<'a>,
185 ) -> ConnectorResult<()> {
186 unreachable!("should call `parse_one_with_txn` instead")
187 }
188
189 async fn parse_one_with_txn<'a>(
190 &'a mut self,
191 key: Option<Vec<u8>>,
192 payload: Option<Vec<u8>>,
193 writer: SourceStreamChunkRowWriter<'a>,
194 ) -> ConnectorResult<ParseResult> {
195 self.parse_inner(key, payload, writer).await
196 }
197}
198
#[cfg(test)]
mod tests {
    use std::ops::Deref;
    use std::sync::Arc;

    use risingwave_common::catalog::{CDC_SOURCE_COLUMN_NUM, ColumnCatalog, ColumnDesc, ColumnId};
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, Timestamptz};
    use risingwave_pb::plan_common::{
        AdditionalColumn, AdditionalColumnTimestamp, additional_column,
    };

    use super::*;
    use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl};
    use crate::source::{ConnectorProperties, SourceCtrlOpts};

    /// Parser config shared by the tests: JSON encoding with every optional handling left
    /// unset, plus default Debezium protocol options.
    fn debezium_json_parser_config() -> SpecificParserConfig {
        SpecificParserConfig {
            encoding_config: EncodingProperties::Json(JsonProperties {
                use_schema_registry: false,
                timestamptz_handling: None,
                timestamp_handling: None,
                time_handling: None,
                handle_toast_columns: false,
            }),
            protocol_config: ProtocolProperties::Debezium(DebeziumProps::default()),
        }
    }

    /// Builds a [`DebeziumParser`] over `columns` whose source context reports a Postgres
    /// CDC connector, using [`debezium_json_parser_config`].
    async fn new_postgres_cdc_parser(columns: Vec<SourceColumnDesc>) -> DebeziumParser {
        let source_ctx = SourceContext {
            connector_props: ConnectorProperties::PostgresCdc(Box::default()),
            ..SourceContext::dummy()
        };
        DebeziumParser::new(debezium_json_parser_config(), columns, Arc::new(source_ctx))
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_parse_transaction_metadata() {
        let schema = ColumnCatalog::debezium_cdc_source_cols();

        let columns = schema
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect::<Vec<_>>();

        let mut parser = new_postgres_cdc_parser(columns.clone()).await;
        let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#;
        let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"35352:3962950064","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#;
        let res = parser
            .parse_one_with_txn(
                None,
                Some(begin_msg.as_bytes().to_vec()),
                dummy_builder.row_writer(),
            )
            .await;
        match res {
            Ok(ParseResult::TransactionControl(TransactionControl::Begin { id })) => {
                assert_eq!(id.deref(), "35352");
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
        let res = parser
            .parse_one_with_txn(
                None,
                Some(commit_msg.as_bytes().to_vec()),
                dummy_builder.row_writer(),
            )
            .await;
        match res {
            Ok(ParseResult::TransactionControl(TransactionControl::Commit { id })) => {
                assert_eq!(id.deref(), "35352");
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
    }

    #[tokio::test]
    async fn test_parse_additional_columns() {
        let columns = vec![
            ColumnDesc::named("O_ORDERKEY", ColumnId::new(1), DataType::Int64),
            ColumnDesc::named("O_CUSTKEY", ColumnId::new(2), DataType::Int64),
            ColumnDesc::named("O_ORDERSTATUS", ColumnId::new(3), DataType::Varchar),
            ColumnDesc::named("O_TOTALPRICE", ColumnId::new(4), DataType::Decimal),
            ColumnDesc::named("O_ORDERDATE", ColumnId::new(5), DataType::Date),
            ColumnDesc::named_with_additional_column(
                "commit_ts",
                ColumnId::new(6),
                DataType::Timestamptz,
                AdditionalColumn {
                    column_type: Some(additional_column::ColumnType::Timestamp(
                        AdditionalColumnTimestamp {},
                    )),
                },
            ),
        ];

        let columns = columns
            .iter()
            .map(SourceColumnDesc::from)
            .collect::<Vec<_>>();

        let mut parser = new_postgres_cdc_parser(columns.clone()).await;
        let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test());

        let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 1695277757017, "transaction": null } }"#;

        let res = parser
            .parse_one_with_txn(
                None,
                Some(payload.as_bytes().to_vec()),
                builder.row_writer(),
            )
            .await;
        match res {
            Ok(ParseResult::Rows) => {
                builder.finish_current_chunk();
                let chunk = builder.consume_ready_chunks().next().unwrap();
                // One insert event must yield exactly one row; asserting this keeps the
                // check below from passing vacuously on an empty chunk.
                let rows: Vec<_> = chunk.rows().collect();
                assert_eq!(rows.len(), 1, "expected exactly one row from one insert event");
                let (_, row) = &rows[0];
                // Column 5 is the `commit_ts` additional column; the expected value matches
                // `source.ts_ms` in the payload rather than the top-level `ts_ms`.
                let commit_ts = row.datum_at(5).unwrap().into_timestamptz();
                assert_eq!(commit_ts, Timestamptz::from_millis(1695277757000).unwrap());
            }
            _ => panic!("unexpected parse result: {:?}", res),
        }
    }

    #[tokio::test]
    async fn test_cdc_source_job_schema() {
        let columns = ColumnCatalog::debezium_cdc_source_cols();
        assert_eq!(CDC_SOURCE_COLUMN_NUM, columns.len() as u32);
    }
}