risingwave_connector/parser/mod.rs

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

use auto_enums::auto_enum;
pub use avro::AvroParserConfig;
pub use canal::*;
pub use chunk_builder::{SourceStreamChunkBuilder, SourceStreamChunkRowWriter};
use csv_parser::CsvParser;
pub use debezium::*;
use futures::{Future, TryFutureExt};
use futures_async_stream::try_stream;
pub use json_parser::*;
pub use parquet_parser::ParquetParser;
pub use protobuf::*;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{CDC_TABLE_NAME_COLUMN_NAME, KAFKA_TIMESTAMP_COLUMN_NAME};
use risingwave_common::log::LogSuppressor;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{DatumCow, DatumRef};
use risingwave_common::util::tracing::InstrumentStream;
use risingwave_connector_codec::decoder::avro::MapHandling;
use thiserror_ext::AsReport;

pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row};
use self::plain_parser::PlainParser;
pub use self::postgres::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row};
pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row};
pub use self::unified::Access;
pub use self::unified::json::{
    BigintUnsignedHandlingMode, JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use self::upsert_parser::UpsertParser;
use crate::error::ConnectorResult;
use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryConfig;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
    BoxSourceMessageStream, SourceChunkStream, SourceColumnDesc, SourceColumnType, SourceContext,
    SourceContextRef, SourceCtrlOpts, SourceMeta,
};

mod access_builder;
pub mod additional_columns;
mod avro;
mod bytes_parser;
mod canal;
mod chunk_builder;
mod config;
mod csv_parser;
mod debezium;
mod json_parser;
mod maxwell;
mod mysql;
pub mod parquet_parser;
pub mod plain_parser;
mod postgres;
mod protobuf;
pub mod scalar_adapter;
mod sql_server;
mod unified;
mod upsert_parser;
mod utils;

use access_builder::{AccessBuilder, AccessBuilderImpl};
pub use config::*;
pub use debezium::DEBEZIUM_IGNORE_KEY;
use debezium::schema_change::SchemaChangeEnvelope;
pub use unified::{AccessError, AccessResult};

/// Meta information attached to a source message, borrowed from the message itself.
#[derive(Clone, Copy, Debug)]
pub struct MessageMeta<'a> {
    source_meta: &'a SourceMeta,
    split_id: &'a str,
    offset: &'a str,
}

impl<'a> MessageMeta<'a> {
    /// Extracts the value for the given column from the message meta.
    ///
    /// Returns `None` if the column is not a meta column, i.e., it should be parsed
    /// from the message payload instead.
    fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
        let datum: DatumRef<'_> = match desc.column_type {
            // Row id columns are filled by the row id generator later.
            SourceColumnType::RowId => None,
            // Extract the offset from the message meta.
            SourceColumnType::Offset => Some(self.offset.into()),
            // Extract the Kafka timestamp from the message meta.
            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    KAFKA_TIMESTAMP_COLUMN_NAME,
                    "unexpected kafka meta column name"
                );
                kafka_meta.extract_timestamp()
            }
            // Extract the full table name from the CDC message meta.
            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    CDC_TABLE_NAME_COLUMN_NAME,
                    "unexpected cdc meta column name"
                );
                Some(cdc_meta.full_table_name.as_str().into())
            }

            // Other columns are not meta columns; parse them from the payload.
            SourceColumnType::Meta | SourceColumnType::Normal => return None,
        };

        datum
    }
}

/// A transaction control message, emitted by parsers of CDC formats (e.g. Debezium)
/// that carry transaction metadata.
#[derive(Debug)]
pub enum TransactionControl {
    Begin { id: Box<str> },
    Commit { id: Box<str> },
}

/// The result of parsing a single message.
#[derive(Debug)]
pub enum ParseResult {
    /// Some rows are parsed and written to the [`SourceStreamChunkRowWriter`].
    Rows,
    /// A transaction control message is parsed.
    TransactionControl(TransactionControl),

    /// A schema change message is parsed.
    SchemaChange(SchemaChangeEnvelope),
}

/// The format of the parser.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParserFormat {
    CanalJson,
    Csv,
    Json,
    Maxwell,
    Debezium,
    DebeziumMongo,
    Upsert,
    Plain,
}

/// `ByteStreamSourceParser` is the entrypoint abstraction for parsing messages:
/// it consumes the bytes of one individual message and produces parsed records.
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
    /// The column descriptors of the output chunk.
    fn columns(&self) -> &[SourceColumnDesc];

    /// The source context, used to report parsing errors.
    fn source_ctx(&self) -> &SourceContext;

    /// The format of the specific parser.
    fn parser_format(&self) -> ParserFormat;

    /// Parses one record from the given `payload` and writes the rows to the `writer`.
    ///
    /// Returns an error if **any** of the rows in the message fails to parse.
    fn parse_one<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<()>> + Send + 'a;

    /// Parses one record from the given `payload`, and either writes rows to the
    /// `writer` or interprets the record as a transaction control or schema change
    /// message.
    ///
    /// The default implementation forwards to [`ByteStreamSourceParser::parse_one`]
    /// and reports that rows were written, for parsers without transaction support.
    ///
    /// Returns an error if **any** of the rows in the message fails to parse.
    fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<ParseResult>> + Send + 'a {
        self.parse_one(key, payload, writer)
            .map_ok(|_| ParseResult::Rows)
    }
}

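// Illustrative sketch (not part of the original module): a minimal, hypothetical
// implementation of `ByteStreamSourceParser`. A real parser would decode `payload`
// and write rows through `writer` (e.g. via its `do_insert` method, as the concrete
// parsers in this crate do); this sketch simply drops every message.
#[derive(Debug)]
struct NoopParser {
    rw_columns: Vec<SourceColumnDesc>,
    source_ctx: SourceContextRef,
}

impl ByteStreamSourceParser for NoopParser {
    fn columns(&self) -> &[SourceColumnDesc] {
        &self.rw_columns
    }

    fn source_ctx(&self) -> &SourceContext {
        &self.source_ctx
    }

    fn parser_format(&self) -> ParserFormat {
        ParserFormat::Plain
    }

    // `async fn` satisfies the `impl Future + Send` return type declared by the trait.
    async fn parse_one<'a>(
        &'a mut self,
        _key: Option<Vec<u8>>,
        _payload: Option<Vec<u8>>,
        _writer: SourceStreamChunkRowWriter<'a>,
    ) -> ConnectorResult<()> {
        // Emit no rows: an empty write still lets the chunk builder track the offset.
        Ok(())
    }
}
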
#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
    /// Parses a stream of `SourceMessage` batches into a stream of [`StreamChunk`]s,
    /// instrumented with a tracing span identifying the actor and the source.
    pub fn parse_stream(self, msg_stream: BoxSourceMessageStream) -> impl SourceChunkStream {
        let actor_id = self.source_ctx().actor_id;
        let source_id = self.source_ctx().source_id.as_raw_id();

        // The control options decide how output chunks are sized and split.
        let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
        parse_message_stream(self, msg_stream, source_ctrl_opts).instrument_with(
            move || tracing::info_span!("source_parse_chunk", %actor_id, source_id),
        )
    }
}

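// Illustrative usage sketch (hypothetical; `NoopParser` is the sketch above): drive a
// parser over an upstream message stream and drain the resulting chunks. No `Unpin`
// bound is stated on the returned stream, hence the pinning before consumption.
async fn drive_parser(parser: NoopParser, msg_stream: BoxSourceMessageStream) {
    use futures::StreamExt;

    let mut chunk_stream = std::pin::pin!(parser.parse_stream(msg_stream));
    while let Some(chunk) = chunk_stream.next().await {
        // Each item is a `ConnectorResult<StreamChunk>`; a real consumer would
        // propagate the error instead of panicking.
        let _chunk: StreamChunk = chunk.expect("failed to parse");
    }
}
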
// The internal implementation of `parse_stream`: drives the parser over batches of
// messages and yields chunks as they become ready.
#[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
async fn parse_message_stream<P: ByteStreamSourceParser>(
    mut parser: P,
    msg_stream: BoxSourceMessageStream,
    source_ctrl_opts: SourceCtrlOpts,
) {
    let mut chunk_builder =
        SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts);

    let mut direct_cdc_event_lag_latency_metrics = HashMap::new();

    #[for_await]
    for batch in msg_stream {
        // A batch of messages from the upstream source reader. Empty batches may be
        // emitted to let the reader make progress, so skip them early.
        let batch = batch?;
        let batch_len = batch.len();
        if batch_len == 0 {
            continue;
        }

        let mut txn_started_in_last_batch = chunk_builder.is_in_transaction();
        let process_time_ms = chrono::Utc::now().timestamp_millis();
        let mut is_heartbeat_emitted = false;
        for msg in batch {
            if msg.is_cdc_heartbeat() {
                // Handling the first heartbeat in a batch is sufficient to advance
                // the offset of the split; later ones in the same batch are skipped.
                if !is_heartbeat_emitted {
                    tracing::debug!(offset = msg.offset, "handling a heartbeat message");
                    chunk_builder.heartbeat(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    });
                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield chunk;
                    }
                    is_heartbeat_emitted = true;
                }
                continue;
            }

            if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
                // Report the lag between the CDC event time and the processing time,
                // using a per-table metric.
                let lag_ms = process_time_ms - msg_meta.source_ts_ms;
                let full_table_name = msg_meta.full_table_name.clone();
                let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
                    .entry(full_table_name)
                    .or_insert_with(|| {
                        GLOBAL_SOURCE_METRICS
                            .direct_cdc_event_lag_latency
                            .with_guarded_label_values(&[&msg_meta.full_table_name])
                    });
                direct_cdc_event_lag_latency.observe(lag_ms as f64);
            }

            // Parse the message into rows, a transaction control message, or a schema
            // change, attaching the message meta so that meta columns can be filled.
            match parser
                .parse_one_with_txn(
                    msg.key,
                    msg.payload,
                    chunk_builder.row_writer().with_meta(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    }),
                )
                .await
            {
                // Rows were written to the chunk builder. On failure, the message is
                // skipped after logging and reporting the error.
                res @ (Ok(ParseResult::Rows) | Err(_)) => {
                    if let Err(error) = res {
                        // Rate-limit the error logs to avoid flooding when many
                        // consecutive messages fail to parse.
                        static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                            LazyLock::new(LogSuppressor::default);
                        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                            tracing::error!(
                                error = %error.as_report(),
                                split_id = &*msg.split_id,
                                offset = msg.offset,
                                suppressed_count,
                                "failed to parse message, skipping"
                            );
                        }

                        let context = parser.source_ctx();
                        // Report the parse error to the user-facing error metrics.
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            error.variant_name().to_owned(),
                            context.source_id.to_string(),
                            context.source_name.clone(),
                            context.fragment_id.to_string(),
                        ]);
                    }

                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield chunk;
                    }
                }

                Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl {
                    TransactionControl::Begin { id } => {
                        chunk_builder.begin_transaction(id);
                    }
                    TransactionControl::Commit { id } => {
                        chunk_builder.commit_transaction(id);
                        assert!(!chunk_builder.is_in_transaction());

                        if txn_started_in_last_batch {
                            // If the transaction began in a previous batch, flush it
                            // immediately on commit instead of waiting for the end of
                            // the current batch.
                            chunk_builder.finish_current_chunk();
                            txn_started_in_last_batch = false;
                        }

                        for chunk in chunk_builder.consume_ready_chunks() {
                            yield chunk;
                        }
                    }
                },

                Ok(ParseResult::SchemaChange(schema_change)) => {
                    if schema_change.is_empty() {
                        continue;
                    }

                    let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
                    // Notify the source executor of the schema change, then block
                    // parsing until the change is acknowledged.
                    if let Some(ref tx) = parser.source_ctx().schema_change_tx {
                        tx.send((schema_change, oneshot_tx))
                            .await
                            .expect("send schema change to executor");
                        match oneshot_rx.await {
                            Ok(()) => {}
                            Err(e) => {
                                tracing::error!(error = %e.as_report(), "failed to wait for schema change");
                            }
                        }
                    }
                }
            }
        }

        // Finish the current chunk at the batch boundary, unless we're inside a
        // transaction that spans multiple batches.
        if !chunk_builder.is_in_transaction() {
            chunk_builder.finish_current_chunk();
        }
        for chunk in chunk_builder.consume_ready_chunks() {
            yield chunk;
        }
    }
}

/// Which part of a message an encoding configuration applies to.
#[derive(Debug)]
pub enum EncodingType {
    Key,
    Value,
}

/// The concrete parser, with one variant per supported format. It dispatches a
/// stream of `SourceMessage` batches to the underlying parser implementation.
#[derive(Debug)]
pub enum ByteStreamSourceParserImpl {
    Csv(CsvParser),
    Debezium(DebeziumParser),
    Plain(PlainParser),
    Upsert(UpsertParser),
    DebeziumMongoJson(DebeziumMongoJsonParser),
    Maxwell(MaxwellParser),
    CanalJson(CanalJsonParser),
}

impl ByteStreamSourceParserImpl {
    /// Converts a stream of `SourceMessage` batches into a stream of [`StreamChunk`]s.
    pub fn parse_stream(
        self,
        msg_stream: BoxSourceMessageStream,
    ) -> impl SourceChunkStream + Unpin {
        #[auto_enum(futures03::Stream)]
        let stream = match self {
            Self::Csv(parser) => parser.parse_stream(msg_stream),
            Self::Debezium(parser) => parser.parse_stream(msg_stream),
            Self::DebeziumMongoJson(parser) => parser.parse_stream(msg_stream),
            Self::Maxwell(parser) => parser.parse_stream(msg_stream),
            Self::CanalJson(parser) => parser.parse_stream(msg_stream),
            Self::Plain(parser) => parser.parse_stream(msg_stream),
            Self::Upsert(parser) => parser.parse_stream(msg_stream),
        };
        Box::pin(stream)
    }
}

impl ByteStreamSourceParserImpl {
    /// Creates a parser from the given config, dispatching on the protocol and
    /// encoding properties.
    pub async fn create(
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
    ) -> ConnectorResult<Self> {
        let CommonParserConfig { rw_columns } = parser_config.common;
        let protocol = &parser_config.specific.protocol_config;
        let encode = &parser_config.specific.encoding_config;
        match (protocol, encode) {
            (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => {
                CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv)
            }
            (ProtocolProperties::DebeziumMongo, EncodingProperties::MongoJson(props)) => {
                DebeziumMongoJsonParser::new(rw_columns, source_ctx, props.clone())
                    .map(Self::DebeziumMongoJson)
            }
            (ProtocolProperties::Canal, EncodingProperties::Json(config)) => {
                CanalJsonParser::new(rw_columns, source_ctx, config).map(Self::CanalJson)
            }
            (ProtocolProperties::Native, _) => unreachable!("Native parser should not be created"),
            (ProtocolProperties::Upsert, _) => {
                let parser =
                    UpsertParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Upsert(parser))
            }
            (ProtocolProperties::Plain, _) => {
                let parser =
                    PlainParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Plain(parser))
            }
            (ProtocolProperties::Debezium(_), _) => {
                let parser =
                    DebeziumParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Debezium(parser))
            }
            (ProtocolProperties::Maxwell, _) => {
                let parser =
                    MaxwellParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Maxwell(parser))
            }
            _ => unreachable!(),
        }
    }

    /// Creates a parser for testing purposes, using a dummy [`SourceContext`].
    pub fn create_for_test(parser_config: ParserConfig) -> ConnectorResult<Self> {
        futures::executor::block_on(Self::create(parser_config, SourceContext::dummy().into()))
    }
}
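
// Illustrative sketch (hypothetical helper, not part of the original module): build a
// parser from its config and drain the resulting chunk stream. Populating
// `ParserConfig` is elided here; its fields come from `config.rs` (re-exported above).
async fn build_and_run(
    parser_config: ParserConfig,
    source_ctx: SourceContextRef,
    msg_stream: BoxSourceMessageStream,
) -> ConnectorResult<()> {
    use futures::StreamExt;

    let parser = ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
    // Unlike the extension method on individual parsers, this stream is `Unpin`
    // (boxed internally by `parse_stream`), so no pinning is needed to consume it.
    let mut chunk_stream = parser.parse_stream(msg_stream);
    while let Some(chunk) = chunk_stream.next().await {
        let _chunk: StreamChunk = chunk?;
    }
    Ok(())
}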

/// Test utilities for parsing payloads into chunks directly.
#[cfg(test)]
pub mod test_utils {
    use futures::StreamExt;
    use itertools::Itertools;

    use super::*;
    use crate::source::SourceMessage;

    #[easy_ext::ext(ByteStreamSourceParserImplTestExt)]
    pub(crate) impl ByteStreamSourceParserImpl {
        /// Parses the given payloads into a [`StreamChunk`].
        async fn parse(self, payloads: Vec<Vec<u8>>) -> StreamChunk {
            let source_messages = payloads
                .into_iter()
                .map(|p| SourceMessage {
                    payload: (!p.is_empty()).then_some(p),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }

        /// Parses the given key-value pairs into a [`StreamChunk`].
        async fn parse_upsert(self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> StreamChunk {
            let source_messages = kvs
                .into_iter()
                .map(|(k, v)| SourceMessage {
                    key: (!k.is_empty()).then_some(k),
                    payload: (!v.is_empty()).then_some(v),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }
    }
}