risingwave_connector/parser/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

use auto_enums::auto_enum;
pub use avro::AvroParserConfig;
pub use canal::*;
pub use chunk_builder::{SourceStreamChunkBuilder, SourceStreamChunkRowWriter};
use csv_parser::CsvParser;
pub use debezium::*;
use futures::{Future, TryFutureExt};
use futures_async_stream::try_stream;
pub use json_parser::*;
pub use parquet_parser::ParquetParser;
pub use protobuf::*;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{CDC_TABLE_NAME_COLUMN_NAME, KAFKA_TIMESTAMP_COLUMN_NAME};
use risingwave_common::log::LogSuppressor;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{DatumCow, DatumRef};
use risingwave_common::util::tracing::InstrumentStream;
use risingwave_connector_codec::decoder::avro::MapHandling;
use thiserror_ext::AsReport;

pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row};
use self::plain_parser::PlainParser;
pub use self::postgres::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row};
pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row};
pub use self::unified::Access;
pub use self::unified::json::{
    BigintUnsignedHandlingMode, JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use self::upsert_parser::UpsertParser;
use crate::error::ConnectorResult;
use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryConfig;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
    BoxSourceMessageStream, SourceChunkStream, SourceColumnDesc, SourceColumnType, SourceContext,
    SourceContextRef, SourceCtrlOpts, SourceMeta,
};

mod access_builder;
pub mod additional_columns;
mod avro;
mod bytes_parser;
mod canal;
mod chunk_builder;
mod config;
mod csv_parser;
mod debezium;
mod json_parser;
mod maxwell;
mod mysql;
pub mod parquet_parser;
pub mod plain_parser;
mod postgres;
mod protobuf;
pub mod scalar_adapter;
mod sql_server;
mod unified;
mod upsert_parser;
mod utils;

use access_builder::{AccessBuilder, AccessBuilderImpl};
pub use config::*;
pub use debezium::DEBEZIUM_IGNORE_KEY;
use debezium::schema_change::SchemaChangeEnvelope;
pub use unified::{AccessError, AccessResult};

/// The metadata of the original message for a row writer.
///
/// Extracted from the `SourceMessage`.
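///
/// A minimal sketch (not a doctest) of how this module builds one from a
/// `SourceMessage` named `msg`; the variable name is illustrative only:
///
/// ```ignore
/// let meta = MessageMeta {
///     source_meta: &msg.meta,
///     split_id: &msg.split_id,
///     offset: &msg.offset,
/// };
/// ```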
#[derive(Clone, Copy, Debug)]
pub struct MessageMeta<'a> {
    source_meta: &'a SourceMeta,
    split_id: &'a str,
    offset: &'a str,
}

impl<'a> MessageMeta<'a> {
    /// Extract the value for the given column.
    ///
    /// Returns `None` if the column is not a meta column.
    fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
        let datum: DatumRef<'_> = match desc.column_type {
            // Row id columns are filled with `NULL` here and will be filled with the real
            // row id generated by `RowIdGenExecutor` later.
            SourceColumnType::RowId => None,
            // Extract the offset from the metadata.
            SourceColumnType::Offset => Some(self.offset.into()),
            // Extract custom metadata per connector.
            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    KAFKA_TIMESTAMP_COLUMN_NAME,
                    "unexpected kafka meta column name"
                );
                kafka_meta.extract_timestamp()
            }
            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    CDC_TABLE_NAME_COLUMN_NAME,
                    "unexpected cdc meta column name"
                );
                Some(cdc_meta.full_table_name.as_str().into())
            }

            // For other cases, return `None`.
            SourceColumnType::Meta | SourceColumnType::Normal => return None,
        };

        datum
    }
}

/// Transaction control message. Currently only used by Debezium messages.
#[derive(Debug)]
pub enum TransactionControl {
    Begin { id: Box<str> },
    Commit { id: Box<str> },
}

/// The result of parsing a message.
#[derive(Debug)]
pub enum ParseResult {
    /// Some rows are parsed and written to the [`SourceStreamChunkRowWriter`].
    Rows,
    /// A transaction control message is parsed.
    TransactionControl(TransactionControl),

    /// A schema change message is parsed.
    SchemaChange(SchemaChangeEnvelope),
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParserFormat {
    CanalJson,
    Csv,
    Json,
    Maxwell,
    Debezium,
    DebeziumMongo,
    Upsert,
    Plain,
}

/// `ByteStreamSourceParser` is the entrypoint abstraction for parsing messages.
/// It consumes bytes of one individual message and produces parsed records.
///
/// It's used by [`ByteStreamSourceParserImpl::parse_stream`]. `pub` is for benchmark only.
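///
/// A hedged sketch of what an implementor roughly looks like; `MyParser` and its
/// fields are hypothetical, and the actual parsing logic is elided:
///
/// ```ignore
/// impl ByteStreamSourceParser for MyParser {
///     fn columns(&self) -> &[SourceColumnDesc] {
///         &self.rw_columns
///     }
///
///     fn source_ctx(&self) -> &SourceContext {
///         &self.source_ctx
///     }
///
///     fn parser_format(&self) -> ParserFormat {
///         ParserFormat::Plain
///     }
///
///     async fn parse_one<'a>(
///         &'a mut self,
///         _key: Option<Vec<u8>>,
///         payload: Option<Vec<u8>>,
///         mut writer: SourceStreamChunkRowWriter<'a>,
///     ) -> ConnectorResult<()> {
///         // Decode `payload` and emit one or more rows through `writer`.
///         todo!()
///     }
/// }
/// ```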
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
    /// The column descriptors of the output chunk.
    fn columns(&self) -> &[SourceColumnDesc];

    /// The source context, used to report parsing errors.
    fn source_ctx(&self) -> &SourceContext;

    /// The format of the specific parser.
    fn parser_format(&self) -> ParserFormat;

    /// Parse one record from the given `payload` and write rows to the `writer`.
    ///
    /// Returns an error if **any** of the rows in the message failed to parse.
    fn parse_one<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<()>> + Send + 'a;

    /// Parse one record from the given `payload`, and either write rows to the `writer` or
    /// interpret it as a transaction control message.
    ///
    /// The default implementation forwards to [`ByteStreamSourceParser::parse_one`] for
    /// non-transactional sources.
    ///
    /// Returns an error if **any** of the rows in the message failed to parse.
    fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<ParseResult>> + Send + 'a {
        self.parse_one(key, payload, writer)
            .map_ok(|_| ParseResult::Rows)
    }
}

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
    /// Parse a `SourceMessage` stream into a [`StreamChunk`] stream.
    ///
    /// # Arguments
    ///
    /// - `msg_stream`: A stream of batches of `SourceMessage`.
    ///
    /// # Returns
    ///
    /// A [`SourceChunkStream`] of parsed chunks. Each parsed chunk is guaranteed to have
    /// at most `source_ctrl_opts.chunk_size` rows, unless there's a large transaction and
    /// `source_ctrl_opts.split_txn` is false.
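    ///
    /// # Example
    ///
    /// A hedged sketch of driving the returned stream; `parser` and `msg_stream` are
    /// placeholders for an actual parser and a `BoxSourceMessageStream`:
    ///
    /// ```ignore
    /// let chunk_stream = parser.parse_stream(msg_stream);
    ///
    /// #[for_await]
    /// for chunk in chunk_stream {
    ///     let chunk: StreamChunk = chunk?;
    ///     // ... hand the chunk to the downstream executor ...
    /// }
    /// ```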
    pub fn parse_stream(self, msg_stream: BoxSourceMessageStream) -> impl SourceChunkStream {
        let actor_id = self.source_ctx().actor_id;
        let source_id = self.source_ctx().source_id.as_raw_id();

        // The stream will be long-lived. We use `instrument_with` here to create
        // a new span for the polling of each chunk.
        let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
        parse_message_stream(self, msg_stream, source_ctrl_opts)
            .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
    }
}

// TODO: when upsert is disabled, how should we filter out messages with an empty payload?
// Currently, an error is returned for non-upsert formats with an empty payload.
#[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
async fn parse_message_stream<P: ByteStreamSourceParser>(
    mut parser: P,
    msg_stream: BoxSourceMessageStream,
    source_ctrl_opts: SourceCtrlOpts,
) {
    let mut chunk_builder =
        SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts);

    let mut direct_cdc_event_lag_latency_metrics = HashMap::new();

    #[for_await]
    for batch in msg_stream {
        // It's possible that the split is not active, which means the next batch may arrive
        // very late. So we prefer emitting all records in the current batch before the end
        // of each iteration, instead of merging them with the next batch. An exception is when
        // a transaction is not committed yet, in which case we yield when the transaction is committed.

        let batch = batch?;
        let batch_len = batch.len();

        if batch_len == 0 {
            continue;
        }

        if batch.iter().all(|msg| msg.is_cdc_heartbeat()) {
            // This `.iter().all(...)` will short-circuit after seeing the first `false`, so in
            // normal cases, it should only involve a constant-time cost.

            // Now we know that there is no data message in the batch, so let's just emit the latest
            // heartbeat message. Note that all messages in `batch` should belong to the same
            // split, so we don't have to do a split-to-heartbeat mapping here.

            let heartbeat_msg = batch.last().unwrap();
            tracing::debug!(
                offset = heartbeat_msg.offset,
                "handling a heartbeat message"
            );
            chunk_builder.heartbeat(MessageMeta {
                source_meta: &heartbeat_msg.meta,
                split_id: &heartbeat_msg.split_id,
                offset: &heartbeat_msg.offset,
            });

            for chunk in chunk_builder.consume_ready_chunks() {
                yield chunk;
            }
            continue; // continue to the next batch
        }

        // When we reach here, there is at least one data message in the batch. We should ignore all
        // heartbeat messages.

        let mut txn_started_in_last_batch = chunk_builder.is_in_transaction();
        let process_time_ms = chrono::Utc::now().timestamp_millis();

        for msg in batch {
            if msg.is_cdc_heartbeat() {
                // ignore heartbeat messages
                continue;
            }

            // calculate the process_time - event_time lag
            if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
                let lag_ms = process_time_ms - msg_meta.source_ts_ms;
                // report to Prometheus
                let full_table_name = msg_meta.full_table_name.clone();
                let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
                    .entry(full_table_name)
                    .or_insert_with(|| {
                        GLOBAL_SOURCE_METRICS
                            .direct_cdc_event_lag_latency
                            .with_guarded_label_values(&[&msg_meta.full_table_name])
                    });
                direct_cdc_event_lag_latency.observe(lag_ms as f64);
            }

            // Parse the message and write rows to the chunk builder. It's possible that the message
            // contains multiple rows. When the chunk size reaches the limit during parsing, the
            // chunk builder may yield the chunk to `ready_chunks` and start a new chunk.
            match parser
                .parse_one_with_txn(
                    msg.key,
                    msg.payload,
                    chunk_builder.row_writer().with_meta(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    }),
                )
                .await
            {
                // It's possible that parsing multiple rows in a single message PARTIALLY failed.
                // We still have to maintain the row number in this case.
                res @ (Ok(ParseResult::Rows) | Err(_)) => {
                    if let Err(error) = res {
                        // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
                        //       see #13105
                        static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                            LazyLock::new(LogSuppressor::default);
                        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                            tracing::error!(
                                error = %error.as_report(),
                                split_id = &*msg.split_id,
                                offset = msg.offset,
                                suppressed_count,
                                "failed to parse message, skipping"
                            );
                        }

                        // report to error metrics
                        let context = parser.source_ctx();
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            error.variant_name().to_owned(),
                            context.source_id.to_string(),
                            context.source_name.clone(),
                            context.fragment_id.to_string(),
                        ]);
                    }

                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield chunk;
                    }
                }

                Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl {
                    TransactionControl::Begin { id } => {
                        chunk_builder.begin_transaction(id);
                    }
                    TransactionControl::Commit { id } => {
                        chunk_builder.commit_transaction(id);
                        assert!(!chunk_builder.is_in_transaction());

                        if txn_started_in_last_batch {
                            // If a transaction spans multiple batches, we yield the chunk
                            // immediately after the transaction is committed.
                            chunk_builder.finish_current_chunk();
                            txn_started_in_last_batch = false;
                        }

                        for chunk in chunk_builder.consume_ready_chunks() {
                            yield chunk;
                        }
                    }
                },

                Ok(ParseResult::SchemaChange(schema_change)) => {
                    if schema_change.is_empty() {
                        continue;
                    }

                    let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
                    // We bubble up the schema change event to the source executor via the channel,
                    // and wait for the source executor to finish the schema change process before
                    // parsing subsequent messages.
                    if let Some(ref tx) = parser.source_ctx().schema_change_tx {
                        tx.send((schema_change, oneshot_tx))
                            .await
                            .expect("send schema change to executor");
                        match oneshot_rx.await {
                            Ok(()) => {}
                            Err(e) => {
                                tracing::error!(error = %e.as_report(), "failed to wait for schema change");
                            }
                        }
                    }
                }
            }
        }

        // Finish the remaining records in the batch.
        if !chunk_builder.is_in_transaction() {
            chunk_builder.finish_current_chunk();
        }
        for chunk in chunk_builder.consume_ready_chunks() {
            yield chunk;
        }
    }
}

/// Whether an encoding configuration applies to the message key or the message value.
#[derive(Debug)]
pub enum EncodingType {
    Key,
    Value,
}

/// The entrypoint of parsing. It parses a `SourceMessage` stream (byte stream) into a [`StreamChunk`] stream.
/// Used by [`crate::source::into_chunk_stream`].
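///
/// A hedged usage sketch (assuming `parser_config`, `source_ctx`, and `msg_stream`
/// are already constructed elsewhere):
///
/// ```ignore
/// let parser = ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
/// let chunk_stream = parser.parse_stream(msg_stream);
/// ```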
#[derive(Debug)]
pub enum ByteStreamSourceParserImpl {
    Csv(CsvParser),
    Debezium(DebeziumParser),
    Plain(PlainParser),
    Upsert(UpsertParser),
    DebeziumMongoJson(DebeziumMongoJsonParser),
    Maxwell(MaxwellParser),
    CanalJson(CanalJsonParser),
}

impl ByteStreamSourceParserImpl {
    /// Converts a stream of `SourceMessage` batches into a [`StreamChunk`] stream.
    pub fn parse_stream(
        self,
        msg_stream: BoxSourceMessageStream,
    ) -> impl SourceChunkStream + Unpin {
        #[auto_enum(futures03::Stream)]
        let stream = match self {
            Self::Csv(parser) => parser.parse_stream(msg_stream),
            Self::Debezium(parser) => parser.parse_stream(msg_stream),
            Self::DebeziumMongoJson(parser) => parser.parse_stream(msg_stream),
            Self::Maxwell(parser) => parser.parse_stream(msg_stream),
            Self::CanalJson(parser) => parser.parse_stream(msg_stream),
            Self::Plain(parser) => parser.parse_stream(msg_stream),
            Self::Upsert(parser) => parser.parse_stream(msg_stream),
        };
        Box::pin(stream)
    }
}

impl ByteStreamSourceParserImpl {
    /// Creates a parser from the given [`ParserConfig`], dispatching on its protocol and encoding properties.
    pub async fn create(
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
    ) -> ConnectorResult<Self> {
        let CommonParserConfig { rw_columns } = parser_config.common;
        let protocol = &parser_config.specific.protocol_config;
        let encode = &parser_config.specific.encoding_config;
        match (protocol, encode) {
            (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => {
                CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv)
            }
            (ProtocolProperties::DebeziumMongo, EncodingProperties::MongoJson(props)) => {
                DebeziumMongoJsonParser::new(rw_columns, source_ctx, props.clone())
                    .map(Self::DebeziumMongoJson)
            }
            (ProtocolProperties::Canal, EncodingProperties::Json(config)) => {
                CanalJsonParser::new(rw_columns, source_ctx, config).map(Self::CanalJson)
            }
            (ProtocolProperties::Native, _) => unreachable!("Native parser should not be created"),
            (ProtocolProperties::Upsert, _) => {
                let parser =
                    UpsertParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Upsert(parser))
            }
            (ProtocolProperties::Plain, _) => {
                let parser =
                    PlainParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Plain(parser))
            }
            (ProtocolProperties::Debezium(_), _) => {
                let parser =
                    DebeziumParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Debezium(parser))
            }
            (ProtocolProperties::Maxwell, _) => {
                let parser =
                    MaxwellParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Maxwell(parser))
            }
            _ => unreachable!(),
        }
    }

    /// Create a parser for testing purposes.
    pub fn create_for_test(parser_config: ParserConfig) -> ConnectorResult<Self> {
        futures::executor::block_on(Self::create(parser_config, SourceContext::dummy().into()))
    }
}

/// Test utilities for [`ByteStreamSourceParserImpl`].
#[cfg(test)]
pub mod test_utils {
    use futures::StreamExt;
    use itertools::Itertools;

    use super::*;
    use crate::source::SourceMessage;

    #[easy_ext::ext(ByteStreamSourceParserImplTestExt)]
    pub(crate) impl ByteStreamSourceParserImpl {
        /// Parse the given payloads into a [`StreamChunk`].
        async fn parse(self, payloads: Vec<Vec<u8>>) -> StreamChunk {
            let source_messages = payloads
                .into_iter()
                .map(|p| SourceMessage {
                    payload: (!p.is_empty()).then_some(p),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }

        /// Parse the given key-value pairs into a [`StreamChunk`].
        async fn parse_upsert(self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> StreamChunk {
            let source_messages = kvs
                .into_iter()
                .map(|(k, v)| SourceMessage {
                    key: (!k.is_empty()).then_some(k),
                    payload: (!v.is_empty()).then_some(v),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }
    }
547}