risingwave_connector/parser/
mod.rs1use std::collections::HashMap;
16use std::fmt::Debug;
17use std::sync::LazyLock;
18
19use auto_enums::auto_enum;
20pub use avro::AvroParserConfig;
21pub use canal::*;
22pub use chunk_builder::{SourceStreamChunkBuilder, SourceStreamChunkRowWriter};
23use csv_parser::CsvParser;
24pub use debezium::*;
25use futures::{Future, TryFutureExt};
26use futures_async_stream::try_stream;
27pub use json_parser::*;
28pub use parquet_parser::ParquetParser;
29pub use protobuf::*;
30use risingwave_common::array::StreamChunk;
31use risingwave_common::catalog::{CDC_TABLE_NAME_COLUMN_NAME, KAFKA_TIMESTAMP_COLUMN_NAME};
32use risingwave_common::log::LogSuppresser;
33use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
34use risingwave_common::types::{DatumCow, DatumRef};
35use risingwave_common::util::tracing::InstrumentStream;
36use risingwave_connector_codec::decoder::avro::MapHandling;
37use thiserror_ext::AsReport;
38
39pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row};
40use self::plain_parser::PlainParser;
41pub use self::postgres::postgres_row_to_owned_row;
42pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row};
43pub use self::unified::Access;
44pub use self::unified::json::{JsonAccess, TimestamptzHandling};
45use self::upsert_parser::UpsertParser;
46use crate::error::ConnectorResult;
47use crate::parser::maxwell::MaxwellParser;
48use crate::schema::schema_registry::SchemaRegistryAuth;
49use crate::source::monitor::GLOBAL_SOURCE_METRICS;
50use crate::source::{
51 BoxSourceMessageStream, SourceChunkStream, SourceColumnDesc, SourceColumnType, SourceContext,
52 SourceContextRef, SourceCtrlOpts, SourceMeta,
53};
54
55mod access_builder;
56pub mod additional_columns;
57mod avro;
58mod bytes_parser;
59mod canal;
60mod chunk_builder;
61mod config;
62mod csv_parser;
63mod debezium;
64mod json_parser;
65mod maxwell;
66mod mysql;
67pub mod parquet_parser;
68pub mod plain_parser;
69mod postgres;
70mod protobuf;
71pub mod scalar_adapter;
72mod sql_server;
73mod unified;
74mod upsert_parser;
75mod utils;
76
77use access_builder::{AccessBuilder, AccessBuilderImpl};
78pub use config::*;
79pub use debezium::DEBEZIUM_IGNORE_KEY;
80use debezium::schema_change::SchemaChangeEnvelope;
81pub use unified::{AccessError, AccessResult};
82
83#[derive(Clone, Copy, Debug)]
87pub struct MessageMeta<'a> {
88 source_meta: &'a SourceMeta,
89 split_id: &'a str,
90 offset: &'a str,
91}
92
93impl<'a> MessageMeta<'a> {
94 fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
98 let datum: DatumRef<'_> = match desc.column_type {
99 SourceColumnType::RowId => None,
102 SourceColumnType::Offset => Some(self.offset.into()),
104 SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
106 assert_eq!(
107 desc.name.as_str(),
108 KAFKA_TIMESTAMP_COLUMN_NAME,
109 "unexpected kafka meta column name"
110 );
111 kafka_meta.extract_timestamp()
112 }
113 SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
114 assert_eq!(
115 desc.name.as_str(),
116 CDC_TABLE_NAME_COLUMN_NAME,
117 "unexpected cdc meta column name"
118 );
119 Some(cdc_meta.full_table_name.as_str().into())
120 }
121
122 SourceColumnType::Meta | SourceColumnType::Normal => return None,
124 };
125
126 datum
127 }
128}
129
/// A transaction boundary reported by a parser instead of row data.
#[derive(Debug)]
pub enum TransactionControl {
    /// The transaction identified by `id` has begun.
    Begin { id: Box<str> },
    /// The transaction identified by `id` has been committed.
    Commit { id: Box<str> },
}
136
137#[derive(Debug)]
139pub enum ParseResult {
140 Rows,
142 TransactionControl(TransactionControl),
144
145 SchemaChange(SchemaChangeEnvelope),
147}
148
/// The message format understood by a byte-stream parser.
///
/// `Eq` and `Hash` are derived alongside `PartialEq` so formats can be used as keys in
/// sets and maps; equality on this field-less enum is total.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ParserFormat {
    CanalJson,
    Csv,
    Json,
    Maxwell,
    Debezium,
    DebeziumMongo,
    Upsert,
    Plain,
}
160
161pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
166 fn columns(&self) -> &[SourceColumnDesc];
168
169 fn source_ctx(&self) -> &SourceContext;
171
172 fn parser_format(&self) -> ParserFormat;
174
175 fn parse_one<'a>(
179 &'a mut self,
180 key: Option<Vec<u8>>,
181 payload: Option<Vec<u8>>,
182 writer: SourceStreamChunkRowWriter<'a>,
183 ) -> impl Future<Output = ConnectorResult<()>> + Send + 'a;
184
185 fn parse_one_with_txn<'a>(
193 &'a mut self,
194 key: Option<Vec<u8>>,
195 payload: Option<Vec<u8>>,
196 writer: SourceStreamChunkRowWriter<'a>,
197 ) -> impl Future<Output = ConnectorResult<ParseResult>> + Send + 'a {
198 self.parse_one(key, payload, writer)
199 .map_ok(|_| ParseResult::Rows)
200 }
201}
202
203#[easy_ext::ext(SourceParserIntoStreamExt)]
204impl<P: ByteStreamSourceParser> P {
205 pub fn parse_stream(self, msg_stream: BoxSourceMessageStream) -> impl SourceChunkStream {
217 let actor_id = self.source_ctx().actor_id;
218 let source_id = self.source_ctx().source_id.table_id();
219
220 let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
223 parse_message_stream(self, msg_stream, source_ctrl_opts)
224 .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
225 }
226}
227
228#[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
231async fn parse_message_stream<P: ByteStreamSourceParser>(
232 mut parser: P,
233 msg_stream: BoxSourceMessageStream,
234 source_ctrl_opts: SourceCtrlOpts,
235) {
236 let mut chunk_builder =
237 SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts);
238
239 let mut direct_cdc_event_lag_latency_metrics = HashMap::new();
240
241 #[for_await]
242 for batch in msg_stream {
243 let batch = batch?;
249 let batch_len = batch.len();
250
251 if batch_len == 0 {
252 continue;
253 }
254
255 if batch.iter().all(|msg| msg.is_cdc_heartbeat()) {
256 let heartbeat_msg = batch.last().unwrap();
264 tracing::debug!(
265 offset = heartbeat_msg.offset,
266 "handling a heartbeat message"
267 );
268 chunk_builder.heartbeat(MessageMeta {
269 source_meta: &heartbeat_msg.meta,
270 split_id: &heartbeat_msg.split_id,
271 offset: &heartbeat_msg.offset,
272 });
273
274 for chunk in chunk_builder.consume_ready_chunks() {
275 yield chunk;
276 }
277 continue; }
279
280 let mut txn_started_in_last_batch = chunk_builder.is_in_transaction();
284 let process_time_ms = chrono::Utc::now().timestamp_millis();
285
286 for msg in batch {
287 if msg.is_cdc_heartbeat() {
288 continue;
290 }
291
292 if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
294 let lag_ms = process_time_ms - msg_meta.source_ts_ms;
295 let full_table_name = msg_meta.full_table_name.clone();
297 let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
298 .entry(full_table_name)
299 .or_insert_with(|| {
300 GLOBAL_SOURCE_METRICS
301 .direct_cdc_event_lag_latency
302 .with_guarded_label_values(&[&msg_meta.full_table_name])
303 });
304 direct_cdc_event_lag_latency.observe(lag_ms as f64);
305 }
306
307 match parser
311 .parse_one_with_txn(
312 msg.key,
313 msg.payload,
314 chunk_builder.row_writer().with_meta(MessageMeta {
315 source_meta: &msg.meta,
316 split_id: &msg.split_id,
317 offset: &msg.offset,
318 }),
319 )
320 .await
321 {
322 res @ (Ok(ParseResult::Rows) | Err(_)) => {
325 if let Err(error) = res {
326 static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
329 LazyLock::new(LogSuppresser::default);
330 if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
331 tracing::error!(
332 error = %error.as_report(),
333 split_id = &*msg.split_id,
334 offset = msg.offset,
335 suppressed_count,
336 "failed to parse message, skipping"
337 );
338 }
339
340 let context = parser.source_ctx();
342 GLOBAL_ERROR_METRICS.user_source_error.report([
343 error.variant_name().to_owned(),
344 context.source_id.to_string(),
345 context.source_name.clone(),
346 context.fragment_id.to_string(),
347 ]);
348 }
349
350 for chunk in chunk_builder.consume_ready_chunks() {
351 yield chunk;
352 }
353 }
354
355 Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl {
356 TransactionControl::Begin { id } => {
357 chunk_builder.begin_transaction(id);
358 }
359 TransactionControl::Commit { id } => {
360 chunk_builder.commit_transaction(id);
361 assert!(!chunk_builder.is_in_transaction());
362
363 if txn_started_in_last_batch {
364 chunk_builder.finish_current_chunk();
367 txn_started_in_last_batch = false;
368 }
369
370 for chunk in chunk_builder.consume_ready_chunks() {
371 yield chunk;
372 }
373 }
374 },
375
376 Ok(ParseResult::SchemaChange(schema_change)) => {
377 if schema_change.is_empty() {
378 continue;
379 }
380
381 let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
382 if let Some(ref tx) = parser.source_ctx().schema_change_tx {
386 tx.send((schema_change, oneshot_tx))
387 .await
388 .expect("send schema change to executor");
389 match oneshot_rx.await {
390 Ok(()) => {}
391 Err(e) => {
392 tracing::error!(error = %e.as_report(), "failed to wait for schema change");
393 }
394 }
395 }
396 }
397 }
398 }
399
400 if !chunk_builder.is_in_transaction() {
402 chunk_builder.finish_current_chunk();
403 }
404 for chunk in chunk_builder.consume_ready_chunks() {
405 yield chunk;
406 }
407 }
408}
409
/// Distinguishes the key side of a message from its value side.
///
/// NOTE(review): presumably used to select which part an encoding/decoder applies to —
/// confirm at call sites.
#[derive(Debug)]
pub enum EncodingType {
    /// The message key.
    Key,
    /// The message value.
    Value,
}
415
416#[derive(Debug)]
419pub enum ByteStreamSourceParserImpl {
420 Csv(CsvParser),
421 Debezium(DebeziumParser),
422 Plain(PlainParser),
423 Upsert(UpsertParser),
424 DebeziumMongoJson(DebeziumMongoJsonParser),
425 Maxwell(MaxwellParser),
426 CanalJson(CanalJsonParser),
427}
428
429impl ByteStreamSourceParserImpl {
430 pub fn parse_stream(
432 self,
433 msg_stream: BoxSourceMessageStream,
434 ) -> impl SourceChunkStream + Unpin {
435 #[auto_enum(futures03::Stream)]
436 let stream = match self {
437 Self::Csv(parser) => parser.parse_stream(msg_stream),
438 Self::Debezium(parser) => parser.parse_stream(msg_stream),
439 Self::DebeziumMongoJson(parser) => parser.parse_stream(msg_stream),
440 Self::Maxwell(parser) => parser.parse_stream(msg_stream),
441 Self::CanalJson(parser) => parser.parse_stream(msg_stream),
442 Self::Plain(parser) => parser.parse_stream(msg_stream),
443 Self::Upsert(parser) => parser.parse_stream(msg_stream),
444 };
445 Box::pin(stream)
446 }
447}
448
449impl ByteStreamSourceParserImpl {
450 pub async fn create(
451 parser_config: ParserConfig,
452 source_ctx: SourceContextRef,
453 ) -> ConnectorResult<Self> {
454 let CommonParserConfig { rw_columns } = parser_config.common;
455 let protocol = &parser_config.specific.protocol_config;
456 let encode = &parser_config.specific.encoding_config;
457 match (protocol, encode) {
458 (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => {
459 CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv)
460 }
461 (ProtocolProperties::DebeziumMongo, EncodingProperties::MongoJson(props)) => {
462 DebeziumMongoJsonParser::new(rw_columns, source_ctx, props.clone())
463 .map(Self::DebeziumMongoJson)
464 }
465 (ProtocolProperties::Canal, EncodingProperties::Json(config)) => {
466 CanalJsonParser::new(rw_columns, source_ctx, config).map(Self::CanalJson)
467 }
468 (ProtocolProperties::Native, _) => unreachable!("Native parser should not be created"),
469 (ProtocolProperties::Upsert, _) => {
470 let parser =
471 UpsertParser::new(parser_config.specific, rw_columns, source_ctx).await?;
472 Ok(Self::Upsert(parser))
473 }
474 (ProtocolProperties::Plain, _) => {
475 let parser =
476 PlainParser::new(parser_config.specific, rw_columns, source_ctx).await?;
477 Ok(Self::Plain(parser))
478 }
479 (ProtocolProperties::Debezium(_), _) => {
480 let parser =
481 DebeziumParser::new(parser_config.specific, rw_columns, source_ctx).await?;
482 Ok(Self::Debezium(parser))
483 }
484 (ProtocolProperties::Maxwell, _) => {
485 let parser =
486 MaxwellParser::new(parser_config.specific, rw_columns, source_ctx).await?;
487 Ok(Self::Maxwell(parser))
488 }
489 _ => unreachable!(),
490 }
491 }
492
493 pub fn create_for_test(parser_config: ParserConfig) -> ConnectorResult<Self> {
495 futures::executor::block_on(Self::create(parser_config, SourceContext::dummy().into()))
496 }
497}
498
#[cfg(test)]
pub mod test_utils {
    use futures::StreamExt;
    use itertools::Itertools;

    use super::*;
    use crate::source::SourceMessage;

    /// Feeds `messages` to `parser` as one batch and returns the first chunk it yields.
    ///
    /// Panics if the stream ends without a chunk or yields an error.
    async fn first_chunk(
        parser: ByteStreamSourceParserImpl,
        messages: Vec<SourceMessage>,
    ) -> StreamChunk {
        parser
            .parse_stream(futures::stream::once(async { Ok(messages) }).boxed())
            .next()
            .await
            .unwrap()
            .unwrap()
    }

    #[easy_ext::ext(ByteStreamSourceParserImplTestExt)]
    pub(crate) impl ByteStreamSourceParserImpl {
        /// Parses each payload as one message and returns the first resulting chunk.
        ///
        /// An empty payload is sent as `None`.
        async fn parse(self, payloads: Vec<Vec<u8>>) -> StreamChunk {
            let messages = payloads
                .into_iter()
                .map(|payload| SourceMessage {
                    payload: Some(payload).filter(|bytes| !bytes.is_empty()),
                    ..SourceMessage::dummy()
                })
                .collect_vec();
            first_chunk(self, messages).await
        }

        /// Parses each `(key, value)` pair as one message and returns the first resulting
        /// chunk.
        ///
        /// An empty key or value is sent as `None`.
        async fn parse_upsert(self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> StreamChunk {
            let messages = kvs
                .into_iter()
                .map(|(key, value)| SourceMessage {
                    key: Some(key).filter(|bytes| !bytes.is_empty()),
                    payload: Some(value).filter(|bytes| !bytes.is_empty()),
                    ..SourceMessage::dummy()
                })
                .collect_vec();
            first_chunk(self, messages).await
        }
    }
}