risingwave_connector/parser/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

use auto_enums::auto_enum;
pub use avro::AvroParserConfig;
pub use canal::*;
pub use chunk_builder::{SourceStreamChunkBuilder, SourceStreamChunkRowWriter};
use csv_parser::CsvParser;
pub use debezium::*;
use futures::{Future, TryFutureExt};
use futures_async_stream::try_stream;
pub use json_parser::*;
pub use parquet_parser::ParquetParser;
pub use protobuf::*;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{CDC_TABLE_NAME_COLUMN_NAME, KAFKA_TIMESTAMP_COLUMN_NAME};
use risingwave_common::log::LogSuppresser;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{DatumCow, DatumRef};
use risingwave_common::util::tracing::InstrumentStream;
use risingwave_connector_codec::decoder::avro::MapHandling;
use thiserror_ext::AsReport;

pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row};
use self::plain_parser::PlainParser;
pub use self::postgres::postgres_row_to_owned_row;
pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row};
pub use self::unified::Access;
pub use self::unified::json::{JsonAccess, TimestamptzHandling};
use self::upsert_parser::UpsertParser;
use crate::error::ConnectorResult;
use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
    BoxSourceMessageStream, SourceChunkStream, SourceColumnDesc, SourceColumnType, SourceContext,
    SourceContextRef, SourceCtrlOpts, SourceMeta,
};

mod access_builder;
pub mod additional_columns;
mod avro;
mod bytes_parser;
mod canal;
mod chunk_builder;
mod config;
mod csv_parser;
mod debezium;
mod json_parser;
mod maxwell;
mod mysql;
pub mod parquet_parser;
pub mod plain_parser;
mod postgres;
mod protobuf;
pub mod scalar_adapter;
mod sql_server;
mod unified;
mod upsert_parser;
mod utils;

use access_builder::{AccessBuilder, AccessBuilderImpl};
pub use config::*;
pub use debezium::DEBEZIUM_IGNORE_KEY;
use debezium::schema_change::SchemaChangeEnvelope;
pub use unified::{AccessError, AccessResult};

/// The metadata of the original message for a row writer.
///
/// Extracted from the `SourceMessage`.
#[derive(Clone, Copy, Debug)]
pub struct MessageMeta<'a> {
    source_meta: &'a SourceMeta,
    split_id: &'a str,
    offset: &'a str,
}

impl<'a> MessageMeta<'a> {
    /// Extract the value for the given column.
    ///
    /// Returns `None` if the column is not a meta column.
    fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
        let datum: DatumRef<'_> = match desc.column_type {
            // Row id columns are filled with `NULL` here and will be filled with the real
            // row id generated by `RowIdGenExecutor` later.
            SourceColumnType::RowId => None,
            // Extract the offset from the message metadata.
            SourceColumnType::Offset => Some(self.offset.into()),
            // Extract connector-specific metadata.
            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    KAFKA_TIMESTAMP_COLUMN_NAME,
                    "unexpected kafka meta column name"
                );
                kafka_meta.extract_timestamp()
            }
            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    CDC_TABLE_NAME_COLUMN_NAME,
                    "unexpected cdc meta column name"
                );
                Some(cdc_meta.full_table_name.as_str().into())
            }

            // For other cases, return `None`.
            SourceColumnType::Meta | SourceColumnType::Normal => return None,
        };

        datum
    }
}
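
// Illustrative sketch (not part of the module): for an `Offset` meta column,
// `value_for_column` simply wraps the message offset into a string datum, while a
// `Normal` column yields `None` and is left for the payload parser to fill.
// `offset_desc` and `data_desc` are hypothetical `SourceColumnDesc`s here.
//
//     let meta = MessageMeta { source_meta: &msg.meta, split_id: &msg.split_id, offset: &msg.offset };
//     assert_eq!(meta.value_for_column(&offset_desc), Some(msg.offset.as_str().into()));
//     assert_eq!(meta.value_for_column(&data_desc), None);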

/// Transaction control message. Currently only used by Debezium messages.
#[derive(Debug)]
pub enum TransactionControl {
    Begin { id: Box<str> },
    Commit { id: Box<str> },
}

/// The result of parsing a message.
#[derive(Debug)]
pub enum ParseResult {
    /// Some rows are parsed and written to the [`SourceStreamChunkRowWriter`].
    Rows,
    /// A transaction control message is parsed.
    TransactionControl(TransactionControl),

    /// A schema change message is parsed.
    SchemaChange(SchemaChangeEnvelope),
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParserFormat {
    CanalJson,
    Csv,
    Json,
    Maxwell,
    Debezium,
    DebeziumMongo,
    Upsert,
    Plain,
}

/// `ByteStreamSourceParser` is the entrypoint abstraction for parsing messages.
/// It consumes the bytes of one individual message and produces parsed records.
///
/// It's used by [`ByteStreamSourceParserImpl::parse_stream`]. It is `pub` only for benchmarks.
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
    /// The column descriptors of the output chunk.
    fn columns(&self) -> &[SourceColumnDesc];

    /// The source context, used to report parsing errors.
    fn source_ctx(&self) -> &SourceContext;

    /// The format of the specific parser.
    fn parser_format(&self) -> ParserFormat;

    /// Parse one record from the given `payload` and write rows to the `writer`.
    ///
    /// Returns an error if **any** of the rows in the message fail to parse.
    fn parse_one<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<()>> + Send + 'a;

    /// Parse one record from the given `payload`, and either write rows to the `writer` or
    /// interpret it as a transaction control message.
    ///
    /// The default implementation forwards to [`ByteStreamSourceParser::parse_one`] for
    /// non-transactional sources.
    ///
    /// Returns an error if **any** of the rows in the message fail to parse.
    fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<ParseResult>> + Send + 'a {
        self.parse_one(key, payload, writer)
            .map_ok(|_| ParseResult::Rows)
    }
}
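
// Illustrative sketch (not part of the module): a parser that understands transaction
// markers can override `parse_one_with_txn` to surface them as
// `ParseResult::TransactionControl` instead of writing rows. `try_extract_txn_begin`
// below is a hypothetical helper on the decoded payload.
//
//     fn parse_one_with_txn<'a>(
//         &'a mut self,
//         key: Option<Vec<u8>>,
//         payload: Option<Vec<u8>>,
//         writer: SourceStreamChunkRowWriter<'a>,
//     ) -> impl Future<Output = ConnectorResult<ParseResult>> + Send + 'a {
//         async move {
//             if let Some(id) = try_extract_txn_begin(payload.as_deref()) {
//                 return Ok(ParseResult::TransactionControl(TransactionControl::Begin { id }));
//             }
//             self.parse_one(key, payload, writer).await?;
//             Ok(ParseResult::Rows)
//         }
//     }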

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
    /// Parse a `SourceMessage` stream into a [`StreamChunk`] stream.
    ///
    /// # Arguments
    ///
    /// - `msg_stream`: A stream of batches of `SourceMessage`.
    ///
    /// # Returns
    ///
    /// A [`SourceChunkStream`] of parsed chunks. Each parsed chunk is guaranteed to contain
    /// at most `source_ctrl_opts.chunk_size` rows, unless there's a large transaction and
    /// `source_ctrl_opts.split_txn` is false.
    pub fn parse_stream(self, msg_stream: BoxSourceMessageStream) -> impl SourceChunkStream {
        let actor_id = self.source_ctx().actor_id;
        let source_id = self.source_ctx().source_id.table_id();

        // The stream will be long-lived. We use `instrument_with` here to create
        // a new span for the polling of each chunk.
        let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
        parse_message_stream(self, msg_stream, source_ctrl_opts)
            .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
    }
}
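
// Usage sketch (illustrative only; `parser` and `msg_stream` are assumed to be prepared
// elsewhere, e.g. a concrete `ByteStreamSourceParser` and a connector's message stream):
//
//     use futures::StreamExt;
//
//     let mut chunks = std::pin::pin!(parser.parse_stream(msg_stream));
//     while let Some(chunk) = chunks.next().await {
//         let chunk: StreamChunk = chunk?;
//         // Each chunk holds at most `chunk_size` rows, unless a large transaction is kept
//         // intact because `split_txn` is disabled.
//     }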

// TODO: when upsert is disabled, how should we filter out messages with an empty payload?
// Currently, an error is returned for non-upsert sources with an empty payload.
#[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
async fn parse_message_stream<P: ByteStreamSourceParser>(
    mut parser: P,
    msg_stream: BoxSourceMessageStream,
    source_ctrl_opts: SourceCtrlOpts,
) {
    let mut chunk_builder =
        SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts);

    let mut direct_cdc_event_lag_latency_metrics = HashMap::new();

    #[for_await]
    for batch in msg_stream {
        // It's possible that the split is not active, which means the next batch may arrive
        // much later, so we prefer emitting all records in the current batch before the end
        // of each iteration, instead of merging them with the next batch. The exception is an
        // uncommitted transaction, in which case we only yield after the transaction is committed.

        let batch = batch?;
        let batch_len = batch.len();

        if batch_len == 0 {
            continue;
        }

        if batch.iter().all(|msg| msg.is_cdc_heartbeat()) {
            // This `.iter().all(...)` will short-circuit after seeing the first `false`, so in
            // normal cases, this should only involve a constant-time cost.

            // Now we know that there is no data message in the batch, so let's just emit the latest
            // heartbeat message. Note that all messages in `batch` should belong to the same
            // split, so we don't have to maintain a split-to-heartbeat mapping here.

            let heartbeat_msg = batch.last().unwrap();
            tracing::debug!(
                offset = heartbeat_msg.offset,
                "handling a heartbeat message"
            );
            chunk_builder.heartbeat(MessageMeta {
                source_meta: &heartbeat_msg.meta,
                split_id: &heartbeat_msg.split_id,
                offset: &heartbeat_msg.offset,
            });

            for chunk in chunk_builder.consume_ready_chunks() {
                yield chunk;
            }
            continue; // continue to next batch
        }

        // When we reach here, there is at least one data message in the batch. We should ignore all
        // heartbeat messages.

        let mut txn_started_in_last_batch = chunk_builder.is_in_transaction();
        let process_time_ms = chrono::Utc::now().timestamp_millis();

        for msg in batch {
            if msg.is_cdc_heartbeat() {
                // ignore heartbeat messages
                continue;
            }

            // calculate the process_time - event_time lag
            if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
                let lag_ms = process_time_ms - msg_meta.source_ts_ms;
                // report to Prometheus
                let full_table_name = msg_meta.full_table_name.clone();
                let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
                    .entry(full_table_name)
                    .or_insert_with(|| {
                        GLOBAL_SOURCE_METRICS
                            .direct_cdc_event_lag_latency
                            .with_guarded_label_values(&[&msg_meta.full_table_name])
                    });
                direct_cdc_event_lag_latency.observe(lag_ms as f64);
            }

            // Parse the message and write to the chunk builder; it's possible that the message
            // contains multiple rows. When the chunk size reaches the limit during parsing, the
            // chunk builder may yield the chunk to `ready_chunks` and start a new chunk.
            match parser
                .parse_one_with_txn(
                    msg.key,
                    msg.payload,
                    chunk_builder.row_writer().with_meta(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    }),
                )
                .await
            {
                // It's possible that parsing multiple rows in a single message PARTIALLY failed.
                // We still have to maintain the row number in this case.
                res @ (Ok(ParseResult::Rows) | Err(_)) => {
                    if let Err(error) = res {
                        // TODO: not using tracing span to provide `split_id` and `offset` due to performance concerns,
                        //       see #13105
                        static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
                            LazyLock::new(LogSuppresser::default);
                        if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                            tracing::error!(
                                error = %error.as_report(),
                                split_id = &*msg.split_id,
                                offset = msg.offset,
                                suppressed_count,
                                "failed to parse message, skipping"
                            );
                        }

                        // report to error metrics
                        let context = parser.source_ctx();
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            error.variant_name().to_owned(),
                            context.source_id.to_string(),
                            context.source_name.clone(),
                            context.fragment_id.to_string(),
                        ]);
                    }

                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield chunk;
                    }
                }

                Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl {
                    TransactionControl::Begin { id } => {
                        chunk_builder.begin_transaction(id);
                    }
                    TransactionControl::Commit { id } => {
                        chunk_builder.commit_transaction(id);
                        assert!(!chunk_builder.is_in_transaction());

                        if txn_started_in_last_batch {
                            // If a transaction spans multiple batches, we yield the chunk
                            // immediately after the transaction is committed.
                            chunk_builder.finish_current_chunk();
                            txn_started_in_last_batch = false;
                        }

                        for chunk in chunk_builder.consume_ready_chunks() {
                            yield chunk;
                        }
                    }
                },

                Ok(ParseResult::SchemaChange(schema_change)) => {
                    if schema_change.is_empty() {
                        continue;
                    }

                    let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
                    // We bubble up the schema change event to the source executor via the channel,
                    // and wait for the source executor to finish the schema change process before
                    // parsing the following messages.
                    if let Some(ref tx) = parser.source_ctx().schema_change_tx {
                        tx.send((schema_change, oneshot_tx))
                            .await
                            .expect("send schema change to executor");
                        match oneshot_rx.await {
                            Ok(()) => {}
                            Err(e) => {
                                tracing::error!(error = %e.as_report(), "failed to wait for schema change");
                            }
                        }
                    }
                }
            }
        }

        // Finish the remaining records in the batch.
        if !chunk_builder.is_in_transaction() {
            chunk_builder.finish_current_chunk();
        }
        for chunk in chunk_builder.consume_ready_chunks() {
            yield chunk;
        }
    }
}
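
// Illustrative example of the transaction path above (not part of the module): for a batch
// containing `Begin { id: "1001" }`, two row messages, and `Commit { id: "1001" }`, the calls
// made on `chunk_builder` are roughly:
//
//     chunk_builder.begin_transaction("1001".into());
//     // ... two `parse_one_with_txn` calls write rows via `row_writer()` ...
//     chunk_builder.commit_transaction("1001".into());
//     chunk_builder.finish_current_chunk(); // only if the txn started in an earlier batch
//     for chunk in chunk_builder.consume_ready_chunks() { /* yield */ }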

#[derive(Debug)]
pub enum EncodingType {
    Key,
    Value,
}

/// The entrypoint of parsing. It parses a `SourceMessage` stream (byte stream) into a [`StreamChunk`] stream.
/// Used by [`crate::source::into_chunk_stream`].
#[derive(Debug)]
pub enum ByteStreamSourceParserImpl {
    Csv(CsvParser),
    Debezium(DebeziumParser),
    Plain(PlainParser),
    Upsert(UpsertParser),
    DebeziumMongoJson(DebeziumMongoJsonParser),
    Maxwell(MaxwellParser),
    CanalJson(CanalJsonParser),
}

impl ByteStreamSourceParserImpl {
    /// Converts a stream of `SourceMessage` batches into a [`StreamChunk`] stream.
    pub fn parse_stream(
        self,
        msg_stream: BoxSourceMessageStream,
    ) -> impl SourceChunkStream + Unpin {
        #[auto_enum(futures03::Stream)]
        let stream = match self {
            Self::Csv(parser) => parser.parse_stream(msg_stream),
            Self::Debezium(parser) => parser.parse_stream(msg_stream),
            Self::DebeziumMongoJson(parser) => parser.parse_stream(msg_stream),
            Self::Maxwell(parser) => parser.parse_stream(msg_stream),
            Self::CanalJson(parser) => parser.parse_stream(msg_stream),
            Self::Plain(parser) => parser.parse_stream(msg_stream),
            Self::Upsert(parser) => parser.parse_stream(msg_stream),
        };
        Box::pin(stream)
    }
}

impl ByteStreamSourceParserImpl {
    pub async fn create(
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
    ) -> ConnectorResult<Self> {
        let CommonParserConfig { rw_columns } = parser_config.common;
        let protocol = &parser_config.specific.protocol_config;
        let encode = &parser_config.specific.encoding_config;
        match (protocol, encode) {
            (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => {
                CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv)
            }
            (ProtocolProperties::DebeziumMongo, EncodingProperties::MongoJson(props)) => {
                DebeziumMongoJsonParser::new(rw_columns, source_ctx, props.clone())
                    .map(Self::DebeziumMongoJson)
            }
            (ProtocolProperties::Canal, EncodingProperties::Json(config)) => {
                CanalJsonParser::new(rw_columns, source_ctx, config).map(Self::CanalJson)
            }
            (ProtocolProperties::Native, _) => unreachable!("Native parser should not be created"),
            (ProtocolProperties::Upsert, _) => {
                let parser =
                    UpsertParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Upsert(parser))
            }
            (ProtocolProperties::Plain, _) => {
                let parser =
                    PlainParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Plain(parser))
            }
            (ProtocolProperties::Debezium(_), _) => {
                let parser =
                    DebeziumParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Debezium(parser))
            }
            (ProtocolProperties::Maxwell, _) => {
                let parser =
                    MaxwellParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Maxwell(parser))
            }
            _ => unreachable!(),
        }
    }

    /// Create a parser for testing purposes.
    pub fn create_for_test(parser_config: ParserConfig) -> ConnectorResult<Self> {
        futures::executor::block_on(Self::create(parser_config, SourceContext::dummy().into()))
    }
}
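
// Construction sketch (illustrative only; assumes a `ParserConfig` assembled elsewhere with
// `ProtocolProperties::Plain` and a JSON `EncodingProperties`, plus a real `SourceContextRef`
// and message stream):
//
//     let parser = ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
//     // `Plain` + JSON resolves to `ByteStreamSourceParserImpl::Plain(PlainParser)`;
//     // the parser is then consumed into a chunk stream:
//     let chunk_stream = parser.parse_stream(msg_stream);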

/// Test utilities for [`ByteStreamSourceParserImpl`].
#[cfg(test)]
pub mod test_utils {
    use futures::StreamExt;
    use itertools::Itertools;

    use super::*;
    use crate::source::SourceMessage;

    #[easy_ext::ext(ByteStreamSourceParserImplTestExt)]
    pub(crate) impl ByteStreamSourceParserImpl {
        /// Parse the given payloads into a [`StreamChunk`].
        async fn parse(self, payloads: Vec<Vec<u8>>) -> StreamChunk {
            let source_messages = payloads
                .into_iter()
                .map(|p| SourceMessage {
                    payload: (!p.is_empty()).then_some(p),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }

        /// Parse the given key-value pairs into a [`StreamChunk`].
        async fn parse_upsert(self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> StreamChunk {
            let source_messages = kvs
                .into_iter()
                .map(|(k, v)| SourceMessage {
                    key: (!k.is_empty()).then_some(k),
                    payload: (!v.is_empty()).then_some(v),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }
    }
}