// risingwave_connector/parser/mod.rs

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

use auto_enums::auto_enum;
pub use avro::AvroParserConfig;
pub use canal::*;
pub use chunk_builder::{SourceStreamChunkBuilder, SourceStreamChunkRowWriter};
use csv_parser::CsvParser;
pub use debezium::*;
use futures::{Future, TryFutureExt};
use futures_async_stream::try_stream;
pub use json_parser::*;
pub use parquet_parser::ParquetParser;
pub use protobuf::*;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{CDC_TABLE_NAME_COLUMN_NAME, KAFKA_TIMESTAMP_COLUMN_NAME};
use risingwave_common::log::LogSuppressor;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{DatumCow, DatumRef};
use risingwave_common::util::tracing::InstrumentStream;
use risingwave_connector_codec::decoder::avro::MapHandling;
use thiserror_ext::AsReport;

pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row};
use self::plain_parser::PlainParser;
pub use self::postgres::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row};
pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row};
pub use self::unified::Access;
pub use self::unified::json::{
    BigintUnsignedHandlingMode, JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use self::upsert_parser::UpsertParser;
use crate::error::ConnectorResult;
use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryConfig;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
    BoxSourceMessageStream, SourceChunkStream, SourceColumnDesc, SourceColumnType, SourceContext,
    SourceContextRef, SourceCtrlOpts, SourceMeta,
};

mod access_builder;
pub mod additional_columns;
mod avro;
mod bytes_parser;
mod canal;
mod chunk_builder;
mod config;
mod csv_parser;
mod debezium;
mod json_parser;
mod maxwell;
mod mysql;
pub mod parquet_parser;
pub mod plain_parser;
mod postgres;
mod protobuf;
pub mod scalar_adapter;
mod sql_server;
mod unified;
mod upsert_parser;
mod utils;

use access_builder::{AccessBuilder, AccessBuilderImpl};
pub use config::*;
pub use debezium::DEBEZIUM_IGNORE_KEY;
use debezium::schema_change::SchemaChangeEnvelope;
pub use unified::{AccessError, AccessResult};

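/// Meta information of a source message, used to fill the hidden meta columns
/// (e.g. offset, Kafka timestamp, CDC table name) when writing a row.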
#[derive(Clone, Copy, Debug)]
pub struct MessageMeta<'a> {
    source_meta: &'a SourceMeta,
    split_id: &'a str,
    offset: &'a str,
}

impl<'a> MessageMeta<'a> {
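    /// Extract the value for the given meta column from this message.
    ///
    /// Returns `None` for columns that are not filled from the message meta,
    /// i.e. the row-id column (generated later in the pipeline) and normal
    /// payload columns.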
    fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
        let datum: DatumRef<'_> = match desc.column_type {
            // Row id is generated later in the pipeline, not here.
            SourceColumnType::RowId => None,
            // The offset column is filled for every message.
            SourceColumnType::Offset => Some(self.offset.into()),
            // Kafka meta column: the message timestamp.
            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    KAFKA_TIMESTAMP_COLUMN_NAME,
                    "unexpected kafka meta column name"
                );
                kafka_meta.extract_timestamp()
            }
            // CDC meta column: the fully qualified name of the upstream table.
            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    CDC_TABLE_NAME_COLUMN_NAME,
                    "unexpected cdc meta column name"
                );
                Some(cdc_meta.full_table_name.as_str().into())
            }

            // Other meta columns and normal columns are not filled from the meta.
            SourceColumnType::Meta | SourceColumnType::Normal => return None,
        };

        datum
    }
}

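/// A transaction control message, used by CDC sources (e.g. Debezium) to mark
/// the boundaries of a transaction in the upstream database.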
#[derive(Debug)]
pub enum TransactionControl {
    Begin { id: Box<str> },
    Commit { id: Box<str> },
}

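/// The result of parsing a single message with [`ByteStreamSourceParser::parse_one_with_txn`].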
#[derive(Debug)]
pub enum ParseResult {
    /// The message is parsed into rows, which are written through the row writer.
    Rows,
    /// The message marks a transaction boundary.
    TransactionControl(TransactionControl),

    /// The message indicates a schema change in the upstream database.
    SchemaChange(SchemaChangeEnvelope),
}

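/// The format a parser handles, used for monitoring and diagnostics.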
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParserFormat {
    CanalJson,
    Csv,
    Json,
    Maxwell,
    Debezium,
    DebeziumMongo,
    Upsert,
    Plain,
}

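/// A parser that turns the key and payload bytes of a single source message
/// into rows, written through a [`SourceStreamChunkRowWriter`].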
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
    /// The column descriptors of the output chunk.
    fn columns(&self) -> &[SourceColumnDesc];

    /// The context of the source.
    fn source_ctx(&self) -> &SourceContext;

    /// The format this parser handles.
    fn parser_format(&self) -> ParserFormat;

    /// Parse one record from the given `key` and `payload`, and write rows to
    /// the `writer`.
    fn parse_one<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<()>> + Send + 'a;

    /// Parse one record from the given `key` and `payload`: either write rows
    /// to the `writer`, or report a transaction control or schema change message.
    ///
    /// By default, this delegates to [`Self::parse_one`] and always produces
    /// [`ParseResult::Rows`]. Parsers that understand transactions or schema
    /// changes (e.g. Debezium) override this method.
    fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<ParseResult>> + Send + 'a {
        self.parse_one(key, payload, writer)
            .map_ok(|_| ParseResult::Rows)
    }
}

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
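    /// Parse a stream of batched source messages into a stream of [`StreamChunk`]s.
    ///
    /// A minimal consumption sketch (not a doctest; assumes `parser` and
    /// `msg_stream` are constructed elsewhere):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let chunk_stream = parser.parse_stream(msg_stream);
    /// futures::pin_mut!(chunk_stream);
    /// while let Some(chunk) = chunk_stream.next().await {
    ///     let chunk: StreamChunk = chunk?;
    ///     // ... feed the chunk to the downstream executor ...
    /// }
    /// ```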
    pub fn parse_stream(self, msg_stream: BoxSourceMessageStream) -> impl SourceChunkStream {
        let actor_id = self.source_ctx().actor_id;
        let source_id = self.source_ctx().source_id.as_raw_id();

        let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
        parse_message_stream(self, msg_stream, source_ctrl_opts)
            .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
    }
}

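/// The internal parsing loop: consumes batches of source messages, parses them
/// into stream chunks, and handles CDC heartbeats, transaction boundaries, and
/// schema change events along the way.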
#[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
async fn parse_message_stream<P: ByteStreamSourceParser>(
    mut parser: P,
    msg_stream: BoxSourceMessageStream,
    source_ctrl_opts: SourceCtrlOpts,
) {
    let mut chunk_builder =
        SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts);

    let mut direct_cdc_event_lag_latency_metrics = HashMap::new();

    #[for_await]
    for batch in msg_stream {
        let batch = batch?;
        let batch_len = batch.len();

        if batch_len == 0 {
            continue;
        }

        if batch.iter().all(|msg| msg.is_cdc_heartbeat()) {
            // If the whole batch consists of CDC heartbeats, only record the
            // offset of the last one, without producing any data rows.
            let heartbeat_msg = batch.last().unwrap();
            tracing::debug!(
                offset = heartbeat_msg.offset,
                "handling a heartbeat message"
            );
            chunk_builder.heartbeat(MessageMeta {
                source_meta: &heartbeat_msg.meta,
                split_id: &heartbeat_msg.split_id,
                offset: &heartbeat_msg.offset,
            });

            for chunk in chunk_builder.consume_ready_chunks() {
                yield chunk;
            }
            continue;
        }

        // Whether the transaction currently open in the chunk builder (if any)
        // was started in a previous batch.
        let mut txn_started_in_last_batch = chunk_builder.is_in_transaction();
        let process_time_ms = chrono::Utc::now().timestamp_millis();

        for msg in batch {
            if msg.is_cdc_heartbeat() {
                continue;
            }

            // Report the lag between the CDC event time and the processing time.
            if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
                let lag_ms = process_time_ms - msg_meta.source_ts_ms;
                let full_table_name = msg_meta.full_table_name.clone();
                let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
                    .entry(full_table_name)
                    .or_insert_with(|| {
                        GLOBAL_SOURCE_METRICS
                            .direct_cdc_event_lag_latency
                            .with_guarded_label_values(&[&msg_meta.full_table_name])
                    });
                direct_cdc_event_lag_latency.observe(lag_ms as f64);
            }

            // Parse the message and write rows to the chunk builder.
            match parser
                .parse_one_with_txn(
                    msg.key,
                    msg.payload,
                    chunk_builder.row_writer().with_meta(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    }),
                )
                .await
            {
                // Rows were written to the chunk builder, or the message failed
                // to parse, in which case we log the error and skip the message.
                res @ (Ok(ParseResult::Rows) | Err(_)) => {
                    if let Err(error) = res {
                        // Suppress the error logs to avoid flooding.
                        static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                            LazyLock::new(LogSuppressor::default);
                        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                            tracing::error!(
                                error = %error.as_report(),
                                split_id = &*msg.split_id,
                                offset = msg.offset,
                                suppressed_count,
                                "failed to parse message, skipping"
                            );
                        }

                        let context = parser.source_ctx();
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            error.variant_name().to_owned(),
                            context.source_id.to_string(),
                            context.source_name.clone(),
                            context.fragment_id.to_string(),
                        ]);
                    }

                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield chunk;
                    }
                }

                Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl {
                    TransactionControl::Begin { id } => {
                        chunk_builder.begin_transaction(id);
                    }
                    TransactionControl::Commit { id } => {
                        chunk_builder.commit_transaction(id);
                        assert!(!chunk_builder.is_in_transaction());

                        if txn_started_in_last_batch {
                            // The transaction spanned multiple batches; finish the
                            // current chunk immediately so its rows are emitted.
                            chunk_builder.finish_current_chunk();
                            txn_started_in_last_batch = false;
                        }

                        for chunk in chunk_builder.consume_ready_chunks() {
                            yield chunk;
                        }
                    }
                },

                Ok(ParseResult::SchemaChange(schema_change)) => {
                    if schema_change.is_empty() {
                        continue;
                    }

                    let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
                    // Notify the downstream executor of the schema change, and
                    // wait for the acknowledgement before parsing more data.
                    if let Some(ref tx) = parser.source_ctx().schema_change_tx {
                        tx.send((schema_change, oneshot_tx))
                            .await
                            .expect("send schema change to executor");
                        match oneshot_rx.await {
                            Ok(()) => {}
                            Err(e) => {
                                tracing::error!(error = %e.as_report(), "failed to wait for schema change");
                            }
                        }
                    }
                }
            }
        }

        // Finish the current chunk at the end of the batch, unless we're in the
        // middle of a transaction that may span multiple batches.
        if !chunk_builder.is_in_transaction() {
            chunk_builder.finish_current_chunk();
        }
        for chunk in chunk_builder.consume_ready_chunks() {
            yield chunk;
        }
    }
}

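/// Whether an encoding applies to the key or the value of a message.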
#[derive(Debug)]
pub enum EncodingType {
    Key,
    Value,
}

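/// The concrete parser implementation, dispatched by the protocol and encoding
/// properties in [`ByteStreamSourceParserImpl::create`].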
#[derive(Debug)]
pub enum ByteStreamSourceParserImpl {
    Csv(CsvParser),
    Debezium(DebeziumParser),
    Plain(PlainParser),
    Upsert(UpsertParser),
    DebeziumMongoJson(DebeziumMongoJsonParser),
    Maxwell(MaxwellParser),
    CanalJson(CanalJsonParser),
}

impl ByteStreamSourceParserImpl {
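    /// Parse a stream of batched source messages into a stream of [`StreamChunk`]s,
    /// dispatching to the wrapped parser.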
    pub fn parse_stream(
        self,
        msg_stream: BoxSourceMessageStream,
    ) -> impl SourceChunkStream + Unpin {
        #[auto_enum(futures03::Stream)]
        let stream = match self {
            Self::Csv(parser) => parser.parse_stream(msg_stream),
            Self::Debezium(parser) => parser.parse_stream(msg_stream),
            Self::DebeziumMongoJson(parser) => parser.parse_stream(msg_stream),
            Self::Maxwell(parser) => parser.parse_stream(msg_stream),
            Self::CanalJson(parser) => parser.parse_stream(msg_stream),
            Self::Plain(parser) => parser.parse_stream(msg_stream),
            Self::Upsert(parser) => parser.parse_stream(msg_stream),
        };
        Box::pin(stream)
    }
}

impl ByteStreamSourceParserImpl {
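    /// Create a parser from the given [`ParserConfig`], dispatching on the
    /// protocol and encoding properties.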
    pub async fn create(
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
    ) -> ConnectorResult<Self> {
        let CommonParserConfig { rw_columns } = parser_config.common;
        let protocol = &parser_config.specific.protocol_config;
        let encode = &parser_config.specific.encoding_config;
        match (protocol, encode) {
            (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => {
                CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv)
            }
            (ProtocolProperties::DebeziumMongo, EncodingProperties::MongoJson(props)) => {
                DebeziumMongoJsonParser::new(rw_columns, source_ctx, props.clone())
                    .map(Self::DebeziumMongoJson)
            }
            (ProtocolProperties::Canal, EncodingProperties::Json(config)) => {
                CanalJsonParser::new(rw_columns, source_ctx, config).map(Self::CanalJson)
            }
            (ProtocolProperties::Native, _) => unreachable!("Native parser should not be created"),
            (ProtocolProperties::Upsert, _) => {
                let parser =
                    UpsertParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Upsert(parser))
            }
            (ProtocolProperties::Plain, _) => {
                let parser =
                    PlainParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Plain(parser))
            }
            (ProtocolProperties::Debezium(_), _) => {
                let parser =
                    DebeziumParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Debezium(parser))
            }
            (ProtocolProperties::Maxwell, _) => {
                let parser =
                    MaxwellParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Maxwell(parser))
            }
            _ => unreachable!(),
        }
    }

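    /// Create a parser for testing, blocking on the async [`Self::create`]
    /// with a dummy [`SourceContext`].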
    pub fn create_for_test(parser_config: ParserConfig) -> ConnectorResult<Self> {
        futures::executor::block_on(Self::create(parser_config, SourceContext::dummy().into()))
    }
}

#[cfg(test)]
pub mod test_utils {
    use futures::StreamExt;
    use itertools::Itertools;

    use super::*;
    use crate::source::SourceMessage;

    #[easy_ext::ext(ByteStreamSourceParserImplTestExt)]
    pub(crate) impl ByteStreamSourceParserImpl {
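        /// Parse the given payloads into a [`StreamChunk`], taking the first
        /// chunk produced.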
        async fn parse(self, payloads: Vec<Vec<u8>>) -> StreamChunk {
            let source_messages = payloads
                .into_iter()
                .map(|p| SourceMessage {
                    payload: (!p.is_empty()).then_some(p),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }

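        /// Parse the given key-value pairs (for upsert formats) into a
        /// [`StreamChunk`], taking the first chunk produced.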
        async fn parse_upsert(self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> StreamChunk {
            let source_messages = kvs
                .into_iter()
                .map(|(k, v)| SourceMessage {
                    key: (!k.is_empty()).then_some(k),
                    payload: (!v.is_empty()).then_some(v),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }
    }
}