risingwave_connector/parser/mod.rs

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

use auto_enums::auto_enum;
pub use avro::AvroParserConfig;
pub use canal::*;
pub use chunk_builder::{SourceStreamChunkBuilder, SourceStreamChunkRowWriter};
use csv_parser::CsvParser;
pub use debezium::*;
use futures::{Future, TryFutureExt};
use futures_async_stream::try_stream;
pub use json_parser::*;
pub use parquet_parser::ParquetParser;
pub use protobuf::*;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{CDC_TABLE_NAME_COLUMN_NAME, KAFKA_TIMESTAMP_COLUMN_NAME};
use risingwave_common::log::LogSuppressor;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{DatumCow, DatumRef};
use risingwave_common::util::tracing::InstrumentStream;
use risingwave_connector_codec::decoder::avro::MapHandling;
use thiserror_ext::AsReport;

pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row};
use self::plain_parser::PlainParser;
pub use self::postgres::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row};
pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row};
pub use self::unified::Access;
pub use self::unified::json::{
    BigintUnsignedHandlingMode, JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use self::upsert_parser::UpsertParser;
use crate::error::ConnectorResult;
use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryConfig;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
    BoxSourceMessageStream, SourceChunkStream, SourceColumnDesc, SourceColumnType, SourceContext,
    SourceContextRef, SourceCtrlOpts, SourceMeta,
};

mod access_builder;
pub mod additional_columns;
mod avro;
mod bytes_parser;
mod canal;
mod chunk_builder;
mod config;
mod csv_parser;
mod debezium;
mod json_parser;
mod maxwell;
mod mysql;
pub mod parquet_parser;
pub mod plain_parser;
mod postgres;
mod protobuf;
pub mod scalar_adapter;
mod sql_server;
mod unified;
mod upsert_parser;
mod utils;

use access_builder::{AccessBuilder, AccessBuilderImpl};
pub use config::*;
pub use debezium::DEBEZIUM_IGNORE_KEY;
use debezium::schema_change::SchemaChangeEnvelope;
pub use unified::{AccessError, AccessResult};

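/// Meta information attached to a single source message, used by the row writer to fill
/// metadata columns (offset, Kafka timestamp, CDC table name).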
#[derive(Clone, Copy, Debug)]
pub struct MessageMeta<'a> {
    source_meta: &'a SourceMeta,
    split_id: &'a str,
    offset: &'a str,
}

impl<'a> MessageMeta<'a> {
    /// Extract the value for the given metadata column from this message meta.
    fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
        let datum: DatumRef<'_> = match desc.column_type {
            // The row id column is not derived from the message; leave it empty here.
            SourceColumnType::RowId => None,
            // The offset is carried directly on the message.
            SourceColumnType::Offset => Some(self.offset.into()),
            // Connector-specific metadata columns.
            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    KAFKA_TIMESTAMP_COLUMN_NAME,
                    "unexpected kafka meta column name"
                );
                kafka_meta.extract_timestamp()
            }
            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    CDC_TABLE_NAME_COLUMN_NAME,
                    "unexpected cdc meta column name"
                );
                cdc_meta.extract_table_name()
            }

            // Other meta or normal columns are not extracted from the message meta.
            SourceColumnType::Meta | SourceColumnType::Normal => return None,
        };

        datum
    }
}

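/// A transaction control message parsed from the source, e.g. Debezium transaction markers.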
#[derive(Debug)]
pub enum TransactionControl {
    Begin { id: Box<str> },
    Commit { id: Box<str> },
}

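/// The result of parsing one message via [`ByteStreamSourceParser::parse_one_with_txn`].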
#[derive(Debug)]
pub enum ParseResult {
    /// The message is parsed into rows, written through the row writer.
    Rows,
    /// The message is a transaction control message.
    TransactionControl(TransactionControl),

    /// The message is a schema change event.
    SchemaChange(SchemaChangeEnvelope),
}

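/// The format handled by a specific parser implementation.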
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParserFormat {
    CanalJson,
    Csv,
    Json,
    Maxwell,
    Debezium,
    DebeziumMongo,
    Upsert,
    Plain,
}

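/// `ByteStreamSourceParser` is the entrypoint abstraction of parsing: it consumes the key
/// and payload bytes of one message and writes the parsed rows to a chunk builder.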
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
    /// The column descriptors of the output chunk.
    fn columns(&self) -> &[SourceColumnDesc];

    /// The source context, used to report parsing errors.
    fn source_ctx(&self) -> &SourceContext;

    /// The format of the specific parser.
    fn parser_format(&self) -> ParserFormat;

    /// Parse one record from the given `payload` and write rows to the `writer`.
    fn parse_one<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<()>> + Send + 'a;

    /// Parse one record from the given `payload`, either writing rows to the `writer` or
    /// interpreting it as a transaction control or schema change message.
    ///
    /// The default implementation delegates to [`Self::parse_one`], i.e. treats every
    /// message as plain rows.
    fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<ParseResult>> + Send + 'a {
        self.parse_one(key, payload, writer)
            .map_ok(|_| ParseResult::Rows)
    }
}

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
    /// Parse a stream of batches of source messages into a stream of [`StreamChunk`]s.
    pub fn parse_stream(self, msg_stream: BoxSourceMessageStream) -> impl SourceChunkStream {
        let actor_id = self.source_ctx().actor_id;
        let source_id = self.source_ctx().source_id.as_raw_id();

        let source_ctrl_opts = self.source_ctx().source_ctrl_opts;

        // Instrument the long-lived parser stream with a tracing span.
        parse_message_stream(self, msg_stream, source_ctrl_opts).instrument_with(
            move || tracing::info_span!("source_parse_chunk", %actor_id, source_id),
        )
    }
}

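/// Drive the parsing loop: feed each message of every batch into the parser and yield
/// finished [`StreamChunk`]s, handling heartbeats, transaction boundaries, and schema
/// change events along the way.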
#[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
async fn parse_message_stream<P: ByteStreamSourceParser>(
    mut parser: P,
    msg_stream: BoxSourceMessageStream,
    source_ctrl_opts: SourceCtrlOpts,
) {
    let mut chunk_builder =
        SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts);

    let mut direct_cdc_event_lag_latency_metrics = HashMap::new();

    #[for_await]
    for batch in msg_stream {
        let batch = batch?;

        let batch_len = batch.len();
        if batch_len == 0 {
            continue;
        }

        let mut txn_started_in_last_batch = chunk_builder.is_in_transaction();
        let process_time_ms = chrono::Utc::now().timestamp_millis();
        let mut is_heartbeat_emitted = false;
        for msg in batch {
            // CDC heartbeat messages carry no row data; only record the latest offset,
            // at most once per batch.
            if msg.is_cdc_heartbeat() {
                if !is_heartbeat_emitted {
                    tracing::debug!(offset = msg.offset, "handling a heartbeat message");
                    chunk_builder.heartbeat(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    });
                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield chunk;
                    }
                    is_heartbeat_emitted = true;
                }
                continue;
            }

            if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
                // Report the per-table CDC event lag (processing time minus the event's
                // source timestamp) to the source metrics.
                let lag_ms = process_time_ms - msg_meta.source_ts_ms;
                let full_table_name = msg_meta.full_table_name.clone();
                let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
                    .entry(full_table_name)
                    .or_insert_with(|| {
                        GLOBAL_SOURCE_METRICS
                            .direct_cdc_event_lag_latency
                            .with_guarded_label_values(&[&msg_meta.full_table_name])
                    });
                direct_cdc_event_lag_latency.observe(lag_ms as f64);
            }

            match parser
                .parse_one_with_txn(
                    msg.key,
                    msg.payload,
                    chunk_builder.row_writer().with_meta(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    }),
                )
                .await
            {
                res @ (Ok(ParseResult::Rows) | Err(_)) => {
                    // A parse error is logged (rate-limited by the suppressor) and counted
                    // in the user error metrics; the message is skipped rather than failing
                    // the whole stream.
                    if let Err(error) = res {
                        static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                            LazyLock::new(LogSuppressor::default);
                        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                            tracing::error!(
                                error = %error.as_report(),
                                split_id = &*msg.split_id,
                                offset = msg.offset,
                                suppressed_count,
                                "failed to parse message, skipping"
                            );
                        }

                        let context = parser.source_ctx();
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            error.variant_name().to_owned(),
                            context.source_id.to_string(),
                            context.source_name.clone(),
                            context.fragment_id.to_string(),
                        ]);
                    }

                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield chunk;
                    }
                }

                Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl {
                    TransactionControl::Begin { id } => {
                        chunk_builder.begin_transaction(id);
                    }
                    TransactionControl::Commit { id } => {
                        chunk_builder.commit_transaction(id);
                        assert!(!chunk_builder.is_in_transaction());

                        if txn_started_in_last_batch {
                            // The transaction spanned multiple batches: finish the current
                            // chunk as soon as the transaction commits.
                            chunk_builder.finish_current_chunk();
                            txn_started_in_last_batch = false;
                        }

                        for chunk in chunk_builder.consume_ready_chunks() {
                            yield chunk;
                        }
                    }
                },

                Ok(ParseResult::SchemaChange(schema_change)) => {
                    if schema_change.is_empty() {
                        continue;
                    }

                    let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
                    // Bubble the schema change event up to the source executor via the
                    // channel, and wait for it to finish applying the change before
                    // parsing the following messages.
                    if let Some(ref tx) = parser.source_ctx().schema_change_tx {
                        tx.send((schema_change, oneshot_tx))
                            .await
                            .expect("send schema change to executor");
                        match oneshot_rx.await {
                            Ok(()) => {}
                            Err(e) => {
                                tracing::error!(error = %e.as_report(), "failed to wait for schema change");
                            }
                        }
                    }
                }
            }
        }

        // Finish and yield whatever remains at the end of the batch, unless we are still
        // inside an uncommitted transaction.
        if !chunk_builder.is_in_transaction() {
            chunk_builder.finish_current_chunk();
        }
        for chunk in chunk_builder.consume_ready_chunks() {
            yield chunk;
        }
    }
}

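/// Whether a parser or access builder is created for the key or the value part of a message.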
#[derive(Debug)]
pub enum EncodingType {
    Key,
    Value,
}

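/// Dispatch enum over all concrete parser implementations: the entrypoint that turns a
/// byte-level source message stream into a [`StreamChunk`] stream.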
#[derive(Debug)]
pub enum ByteStreamSourceParserImpl {
    Csv(CsvParser),
    Debezium(DebeziumParser),
    Plain(PlainParser),
    Upsert(UpsertParser),
    DebeziumMongoJson(DebeziumMongoJsonParser),
    Maxwell(MaxwellParser),
    CanalJson(CanalJsonParser),
}
414
415impl ByteStreamSourceParserImpl {
416 pub fn parse_stream(
418 self,
419 msg_stream: BoxSourceMessageStream,
420 ) -> impl SourceChunkStream + Unpin {
421 #[auto_enum(futures03::Stream)]
422 let stream = match self {
423 Self::Csv(parser) => parser.parse_stream(msg_stream),
424 Self::Debezium(parser) => parser.parse_stream(msg_stream),
425 Self::DebeziumMongoJson(parser) => parser.parse_stream(msg_stream),
426 Self::Maxwell(parser) => parser.parse_stream(msg_stream),
427 Self::CanalJson(parser) => parser.parse_stream(msg_stream),
428 Self::Plain(parser) => parser.parse_stream(msg_stream),
429 Self::Upsert(parser) => parser.parse_stream(msg_stream),
430 };
431 Box::pin(stream)
432 }
433}
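
// A minimal usage sketch (illustrative only, not part of the module API; assumes
// `futures::StreamExt` is in scope and that `parser_config`, `source_ctx`, and a boxed
// `msg_stream` are available):
//
//     let parser = ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
//     let mut chunks = parser.parse_stream(msg_stream);
//     while let Some(chunk) = chunks.next().await {
//         let chunk: StreamChunk = chunk?;
//         // ... feed the chunk downstream ...
//     }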

impl ByteStreamSourceParserImpl {
    /// Create a parser from the given [`ParserConfig`], dispatching on the
    /// (protocol, encoding) pair.
    pub async fn create(
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
    ) -> ConnectorResult<Self> {
        let CommonParserConfig { rw_columns } = parser_config.common;
        let protocol = &parser_config.specific.protocol_config;
        let encode = &parser_config.specific.encoding_config;
        match (protocol, encode) {
            (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => {
                CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv)
            }
            (ProtocolProperties::DebeziumMongo, EncodingProperties::MongoJson(props)) => {
                DebeziumMongoJsonParser::new(rw_columns, source_ctx, props.clone())
                    .map(Self::DebeziumMongoJson)
            }
            (ProtocolProperties::Canal, EncodingProperties::Json(config)) => {
                CanalJsonParser::new(rw_columns, source_ctx, config).map(Self::CanalJson)
            }
            (ProtocolProperties::Native, _) => unreachable!("Native parser should not be created"),
            (ProtocolProperties::Upsert, _) => {
                let parser =
                    UpsertParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Upsert(parser))
            }
            (ProtocolProperties::Plain, _) => {
                let parser =
                    PlainParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Plain(parser))
            }
            (ProtocolProperties::Debezium(_), _) => {
                let parser =
                    DebeziumParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Debezium(parser))
            }
            (ProtocolProperties::Maxwell, _) => {
                let parser =
                    MaxwellParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Maxwell(parser))
            }
            _ => unreachable!(),
        }
    }

    /// Create a parser with a dummy source context, for testing purposes.
    pub fn create_for_test(parser_config: ParserConfig) -> ConnectorResult<Self> {
        futures::executor::block_on(Self::create(parser_config, SourceContext::dummy().into()))
    }
}

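/// Test utilities that parse raw payloads or key-value pairs into a single [`StreamChunk`].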
#[cfg(test)]
pub mod test_utils {
    use futures::StreamExt;
    use itertools::Itertools;

    use super::*;
    use crate::source::SourceMessage;

    #[easy_ext::ext(ByteStreamSourceParserImplTestExt)]
    pub(crate) impl ByteStreamSourceParserImpl {
        /// Parse the given payloads into a single [`StreamChunk`].
        async fn parse(self, payloads: Vec<Vec<u8>>) -> StreamChunk {
            let source_messages = payloads
                .into_iter()
                .map(|p| SourceMessage {
                    payload: (!p.is_empty()).then_some(p),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }

        /// Parse the given key-value pairs into a single [`StreamChunk`].
        async fn parse_upsert(self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> StreamChunk {
            let source_messages = kvs
                .into_iter()
                .map(|(k, v)| SourceMessage {
                    key: (!k.is_empty()).then_some(k),
                    payload: (!v.is_empty()).then_some(v),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }
    }
}