1use std::collections::HashMap;
16use std::fmt::Debug;
17use std::sync::LazyLock;
18
19use auto_enums::auto_enum;
20pub use avro::AvroParserConfig;
21pub use canal::*;
22pub use chunk_builder::{SourceStreamChunkBuilder, SourceStreamChunkRowWriter};
23use csv_parser::CsvParser;
24pub use debezium::*;
25use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt};
26use futures_async_stream::try_stream;
27pub use json_parser::*;
28pub use parquet_parser::ParquetParser;
29pub use protobuf::*;
30use risingwave_common::catalog::{CDC_TABLE_NAME_COLUMN_NAME, KAFKA_TIMESTAMP_COLUMN_NAME};
31use risingwave_common::log::LogSuppressor;
32use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
33use risingwave_common::types::{DatumCow, DatumRef};
34use risingwave_common::util::tracing::InstrumentStream;
35use risingwave_connector_codec::decoder::avro::MapHandling;
36use thiserror_ext::AsReport;
37
38pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row};
39use self::plain_parser::PlainParser;
40pub use self::postgres::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row};
41pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row};
42pub use self::unified::Access;
43pub use self::unified::json::{
44 BigintUnsignedHandlingMode, JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling,
45};
46use self::upsert_parser::UpsertParser;
47use crate::error::ConnectorResult;
48use crate::parser::maxwell::MaxwellParser;
49use crate::schema::schema_registry::SchemaRegistryConfig;
50use crate::source::monitor::GLOBAL_SOURCE_METRICS;
51use crate::source::{
52 BoxSourceMessageEventStream, SourceChunkStream, SourceColumnDesc, SourceColumnType,
53 SourceContext, SourceContextRef, SourceCtrlOpts, SourceMessageEvent, SourceMeta,
54 SourceReaderEvent,
55};
56
57mod access_builder;
58pub mod additional_columns;
59mod avro;
60mod bytes_parser;
61mod canal;
62mod chunk_builder;
63mod config;
64mod csv_parser;
65mod debezium;
66mod json_parser;
67mod maxwell;
68mod mysql;
69pub mod parquet_parser;
70pub mod plain_parser;
71mod postgres;
72mod protobuf;
73pub mod scalar_adapter;
74mod sql_server;
75mod unified;
76mod upsert_parser;
77mod utils;
78
79use access_builder::{AccessBuilder, AccessBuilderImpl};
80pub use config::*;
81
82pub(crate) fn into_data_chunk_stream(
83 stream: impl Stream<Item = ConnectorResult<SourceReaderEvent>> + Send + 'static,
84) -> impl SourceChunkStream {
85 stream
86 .try_filter_map(|event| async move {
87 Ok(match event {
88 SourceReaderEvent::DataChunk(chunk) => Some(chunk),
89 SourceReaderEvent::SplitProgress(_) => None,
90 })
91 })
92 .boxed()
93}
94pub use debezium::DEBEZIUM_IGNORE_KEY;
95use debezium::schema_change::SchemaChangeEnvelope;
96pub use unified::{AccessError, AccessResult};
97
/// Borrowed metadata of a single source message, attached to rows while they
/// are written into the chunk builder.
#[derive(Clone, Copy, Debug)]
pub struct MessageMeta<'a> {
    // Connector-specific metadata (e.g. Kafka or Debezium CDC metadata).
    source_meta: &'a SourceMeta,
    // The split (partition) this message was read from.
    split_id: &'a str,
    // The offset of this message within its split.
    offset: &'a str,
}
107
108impl<'a> MessageMeta<'a> {
109 fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
113 let datum: DatumRef<'_> = match desc.column_type {
114 SourceColumnType::RowId => None,
117 SourceColumnType::Offset => Some(self.offset.into()),
119 SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
121 assert_eq!(
122 desc.name.as_str(),
123 KAFKA_TIMESTAMP_COLUMN_NAME,
124 "unexpected kafka meta column name"
125 );
126 kafka_meta.extract_timestamp()
127 }
128 SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
129 assert_eq!(
130 desc.name.as_str(),
131 CDC_TABLE_NAME_COLUMN_NAME,
132 "unexpected cdc meta column name"
133 );
134 cdc_meta.extract_table_name()
136 }
137
138 SourceColumnType::Meta | SourceColumnType::Normal => return None,
140 };
141
142 datum
143 }
144}
145
/// Transaction boundary message produced by a parser (e.g. for CDC sources).
#[derive(Debug)]
pub enum TransactionControl {
    /// A transaction with the given id has begun.
    Begin { id: Box<str> },
    /// The transaction with the given id has committed.
    Commit { id: Box<str> },
}
152
/// Outcome of parsing one message via [`ByteStreamSourceParser::parse_one_with_txn`].
#[derive(Debug)]
pub enum ParseResult {
    /// Rows (possibly none) were written through the provided row writer.
    Rows,
    /// The message was a transaction control message rather than data.
    TransactionControl(TransactionControl),

    /// The message described a schema change (from CDC sources).
    SchemaChange(SchemaChangeEnvelope),
}
164
/// The logical format a parser implementation handles.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParserFormat {
    CanalJson,
    Csv,
    Json,
    Maxwell,
    Debezium,
    DebeziumMongo,
    Upsert,
    Plain,
}
176
/// A parser that turns raw key/payload bytes from a source into stream-chunk rows.
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
    /// The column descriptors of the chunks this parser produces.
    fn columns(&self) -> &[SourceColumnDesc];

    /// The source context this parser was created with.
    fn source_ctx(&self) -> &SourceContext;

    /// The format this parser handles.
    fn parser_format(&self) -> ParserFormat;

    /// Parse one message (optional key and payload bytes) and write the
    /// resulting rows through `writer`.
    fn parse_one<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<()>> + Send + 'a;

    /// Like [`Self::parse_one`], but can additionally report transaction
    /// control or schema change messages via [`ParseResult`].
    ///
    /// The default implementation delegates to `parse_one` and always reports
    /// [`ParseResult::Rows`]; parsers aware of transactions/schema changes
    /// override this.
    fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<ParseResult>> + Send + 'a {
        self.parse_one(key, payload, writer)
            .map_ok(|_| ParseResult::Rows)
    }
}
218
#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
    /// Consume this parser and a message-event stream, producing a stream of
    /// [`SourceReaderEvent`]s instrumented with a `source_parse_chunk` span.
    pub fn parse_stream_with_events(
        self,
        msg_stream: BoxSourceMessageEventStream,
    ) -> impl Stream<Item = ConnectorResult<SourceReaderEvent>> + Send {
        // Capture span fields before `self` is moved into the stream below.
        let actor_id = self.source_ctx().actor_id;
        let source_id = self.source_ctx().source_id.as_raw_id();

        let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
        parse_message_stream(self, msg_stream, source_ctrl_opts).instrument_with(
            move || tracing::info_span!("source_parse_chunk", %actor_id, source_id),
        )
    }
}
236
/// The core parsing loop: consume batches of source messages from `msg_stream`,
/// feed each message through `parser`, and yield [`SourceReaderEvent`]s — data
/// chunks as they become ready, and split-progress events forwarded unchanged.
#[try_stream(ok = SourceReaderEvent, error = crate::error::ConnectorError)]
async fn parse_message_stream<P: ByteStreamSourceParser>(
    mut parser: P,
    msg_stream: BoxSourceMessageEventStream,
    source_ctrl_opts: SourceCtrlOpts,
) {
    let mut chunk_builder =
        SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts);

    // Cache of per-table CDC lag-latency metric handles, keyed by full table name.
    let mut direct_cdc_event_lag_latency_metrics = HashMap::new();

    #[for_await]
    for event in msg_stream {
        // Only data batches are parsed; progress reports pass straight through.
        let batch = match event? {
            SourceMessageEvent::Data(batch) => batch,
            SourceMessageEvent::SplitProgress(progress) => {
                yield SourceReaderEvent::SplitProgress(progress);
                continue;
            }
        };
        let batch_len = batch.len();
        if batch_len == 0 {
            continue;
        }

        // Whether a transaction was already open when this batch started; used
        // below to cut the pending chunk when that carried-over transaction commits.
        let mut txn_started_in_last_batch = chunk_builder.is_in_transaction();
        let process_time_ms = chrono::Utc::now().timestamp_millis();
        let mut is_heartbeat_emitted = false; // handle at most one heartbeat per batch
        for msg in batch {
            if msg.is_cdc_heartbeat() {
                // Record the heartbeat's offset and flush any ready chunks;
                // subsequent heartbeats in the same batch are skipped.
                if !is_heartbeat_emitted {
                    tracing::debug!(offset = msg.offset, "handling a heartbeat message");
                    chunk_builder.heartbeat(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    });
                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield SourceReaderEvent::DataChunk(chunk);
                    }
                    is_heartbeat_emitted = true;
                }
                continue;
            }

            // Report per-table event lag (now - source timestamp) for CDC messages.
            if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
                let lag_ms = process_time_ms - msg_meta.source_ts_ms;
                let full_table_name = msg_meta.full_table_name.clone();
                let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
                    .entry(full_table_name)
                    .or_insert_with(|| {
                        GLOBAL_SOURCE_METRICS
                            .direct_cdc_event_lag_latency
                            .with_guarded_label_values(&[&msg_meta.full_table_name])
                    });
                direct_cdc_event_lag_latency.observe(lag_ms as f64);
            }

            match parser
                .parse_one_with_txn(
                    msg.key,
                    msg.payload,
                    chunk_builder.row_writer().with_meta(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    }),
                )
                .await
            {
                // Rows were written (or parsing failed): in both cases, flush
                // whatever chunks are ready. A parse error does not abort the
                // stream — the message is skipped after logging and reporting.
                res @ (Ok(ParseResult::Rows) | Err(_)) => {
                    if let Err(error) = res {
                        // Rate-limit error logs to avoid flooding on a bad stream.
                        static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                            LazyLock::new(LogSuppressor::default);
                        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                            tracing::error!(
                                error = %error.as_report(),
                                split_id = &*msg.split_id,
                                offset = msg.offset,
                                suppressed_count,
                                "failed to parse message, skipping"
                            );
                        }

                        // Report the parse error to the user-facing error metrics.
                        let context = parser.source_ctx();
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            error.variant_name().to_owned(),
                            context.source_id.to_string(),
                            context.source_name.clone(),
                            context.fragment_id.to_string(),
                        ]);
                    }

                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield SourceReaderEvent::DataChunk(chunk);
                    }
                }

                Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl {
                    TransactionControl::Begin { id } => {
                        chunk_builder.begin_transaction(id);
                    }
                    TransactionControl::Commit { id } => {
                        chunk_builder.commit_transaction(id);
                        assert!(!chunk_builder.is_in_transaction());

                        // The transaction was carried over from a previous batch:
                        // cut the current chunk eagerly on commit so it is emitted
                        // right away rather than at the end-of-batch flush below.
                        if txn_started_in_last_batch {
                            chunk_builder.finish_current_chunk();
                            txn_started_in_last_batch = false;
                        }

                        for chunk in chunk_builder.consume_ready_chunks() {
                            yield SourceReaderEvent::DataChunk(chunk);
                        }
                    }
                },

                Ok(ParseResult::SchemaChange(schema_change)) => {
                    if schema_change.is_empty() {
                        continue;
                    }

                    // Hand the schema change to the executor (if a channel is
                    // configured) and wait for its acknowledgement before
                    // continuing to parse.
                    let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
                    if let Some(ref tx) = parser.source_ctx().schema_change_tx {
                        tx.send((schema_change, oneshot_tx))
                            .await
                            .expect("send schema change to executor");
                        match oneshot_rx.await {
                            Ok(()) => {}
                            Err(e) => {
                                tracing::error!(error = %e.as_report(), "failed to wait for schema change");
                            }
                        }
                    }
                }
            }
        }

        // End of batch: unless a transaction is still open (it must stay in one
        // chunk until commit), cut the current chunk and emit everything ready.
        if !chunk_builder.is_in_transaction() {
            chunk_builder.finish_current_chunk();
        }
        for chunk in chunk_builder.consume_ready_chunks() {
            yield SourceReaderEvent::DataChunk(chunk);
        }
    }
}
406
/// Which part of a source message an encoding configuration applies to.
// NOTE(review): presumably used to distinguish key vs. value decoding when
// building accessors — confirm against `access_builder` usage.
#[derive(Debug)]
pub enum EncodingType {
    Key,
    Value,
}
412
/// Dispatcher enum over all concrete [`ByteStreamSourceParser`] implementations,
/// selected by protocol/encoding in [`ByteStreamSourceParserImpl::create`].
#[derive(Debug)]
pub enum ByteStreamSourceParserImpl {
    Csv(CsvParser),
    Debezium(DebeziumParser),
    Plain(PlainParser),
    Upsert(UpsertParser),
    DebeziumMongoJson(DebeziumMongoJsonParser),
    Maxwell(MaxwellParser),
    CanalJson(CanalJsonParser),
}
425
impl ByteStreamSourceParserImpl {
    /// Dispatch to the concrete parser's `parse_stream_with_events` and box the
    /// resulting stream.
    pub fn parse_stream_with_events(
        self,
        msg_stream: BoxSourceMessageEventStream,
    ) -> impl Stream<Item = ConnectorResult<SourceReaderEvent>> + Send {
        // `auto_enum` unifies the distinct opaque stream types returned by each
        // arm into a single enum that implements `Stream`.
        #[auto_enum(futures03::Stream)]
        let stream = match self {
            Self::Csv(parser) => parser.parse_stream_with_events(msg_stream),
            Self::Debezium(parser) => parser.parse_stream_with_events(msg_stream),
            Self::DebeziumMongoJson(parser) => parser.parse_stream_with_events(msg_stream),
            Self::Maxwell(parser) => parser.parse_stream_with_events(msg_stream),
            Self::CanalJson(parser) => parser.parse_stream_with_events(msg_stream),
            Self::Plain(parser) => parser.parse_stream_with_events(msg_stream),
            Self::Upsert(parser) => parser.parse_stream_with_events(msg_stream),
        };
        Box::pin(stream)
    }
}
444
445impl ByteStreamSourceParserImpl {
446 pub async fn create(
447 parser_config: ParserConfig,
448 source_ctx: SourceContextRef,
449 ) -> ConnectorResult<Self> {
450 let CommonParserConfig { rw_columns } = parser_config.common;
451 let protocol = &parser_config.specific.protocol_config;
452 let encode = &parser_config.specific.encoding_config;
453 match (protocol, encode) {
454 (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => {
455 CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv)
456 }
457 (ProtocolProperties::DebeziumMongo, EncodingProperties::MongoJson(props)) => {
458 DebeziumMongoJsonParser::new(rw_columns, source_ctx, props.clone())
459 .map(Self::DebeziumMongoJson)
460 }
461 (ProtocolProperties::Canal, EncodingProperties::Json(config)) => {
462 CanalJsonParser::new(rw_columns, source_ctx, config).map(Self::CanalJson)
463 }
464 (ProtocolProperties::Native, _) => unreachable!("Native parser should not be created"),
465 (ProtocolProperties::Upsert, _) => {
466 let parser =
467 UpsertParser::new(parser_config.specific, rw_columns, source_ctx).await?;
468 Ok(Self::Upsert(parser))
469 }
470 (ProtocolProperties::Plain, _) => {
471 let parser =
472 PlainParser::new(parser_config.specific, rw_columns, source_ctx).await?;
473 Ok(Self::Plain(parser))
474 }
475 (ProtocolProperties::Debezium(_), _) => {
476 let parser =
477 DebeziumParser::new(parser_config.specific, rw_columns, source_ctx).await?;
478 Ok(Self::Debezium(parser))
479 }
480 (ProtocolProperties::Maxwell, _) => {
481 let parser =
482 MaxwellParser::new(parser_config.specific, rw_columns, source_ctx).await?;
483 Ok(Self::Maxwell(parser))
484 }
485 _ => unreachable!(),
486 }
487 }
488
489 pub fn create_for_test(parser_config: ParserConfig) -> ConnectorResult<Self> {
491 futures::executor::block_on(Self::create(parser_config, SourceContext::dummy().into()))
492 }
493}
494
#[cfg(test)]
pub mod test_utils {
    use futures::StreamExt;
    use itertools::Itertools;
    use risingwave_common::array::StreamChunk;

    use super::*;
    use crate::source::SourceMessage;

    /// Test-only convenience methods for driving a parser with in-memory data.
    #[easy_ext::ext(ByteStreamSourceParserImplTestExt)]
    pub(crate) impl ByteStreamSourceParserImpl {
        /// Parse the given payloads into a single [`StreamChunk`].
        ///
        /// An empty `Vec<u8>` is mapped to an absent payload. Panics (via
        /// `unwrap`) if the parser yields no chunk or an error.
        async fn parse(self, payloads: Vec<Vec<u8>>) -> StreamChunk {
            let source_messages = payloads
                .into_iter()
                .map(|p| SourceMessage {
                    payload: (!p.is_empty()).then_some(p),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            into_data_chunk_stream(
                self.parse_stream_with_events(
                    futures::stream::once(async { Ok(SourceMessageEvent::Data(source_messages)) })
                        .boxed(),
                ),
            )
            .next()
            .await
            .unwrap()
            .unwrap()
        }

        /// Parse the given key/value pairs into a single [`StreamChunk`], for
        /// upsert-style parsers.
        ///
        /// Empty keys/values are mapped to absent keys/payloads. Panics (via
        /// `unwrap`) if the parser yields no chunk or an error.
        async fn parse_upsert(self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> StreamChunk {
            let source_messages = kvs
                .into_iter()
                .map(|(k, v)| SourceMessage {
                    key: (!k.is_empty()).then_some(k),
                    payload: (!v.is_empty()).then_some(v),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            into_data_chunk_stream(
                self.parse_stream_with_events(
                    futures::stream::once(async { Ok(SourceMessageEvent::Data(source_messages)) })
                        .boxed(),
                ),
            )
            .next()
            .await
            .unwrap()
            .unwrap()
        }
    }
}