risingwave_connector/parser/mod.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

use auto_enums::auto_enum;
pub use avro::AvroParserConfig;
pub use canal::*;
pub use chunk_builder::{SourceStreamChunkBuilder, SourceStreamChunkRowWriter};
use csv_parser::CsvParser;
pub use debezium::*;
use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures_async_stream::try_stream;
pub use json_parser::*;
pub use parquet_parser::ParquetParser;
pub use protobuf::*;
use risingwave_common::catalog::{CDC_TABLE_NAME_COLUMN_NAME, KAFKA_TIMESTAMP_COLUMN_NAME};
use risingwave_common::log::LogSuppressor;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{DatumCow, DatumRef};
use risingwave_common::util::tracing::InstrumentStream;
use risingwave_connector_codec::decoder::avro::MapHandling;
use thiserror_ext::AsReport;

pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row};
use self::plain_parser::PlainParser;
pub use self::postgres::{postgres_cell_to_scalar_impl, postgres_row_to_owned_row};
pub use self::sql_server::{ScalarImplTiberiusWrapper, sql_server_row_to_owned_row};
pub use self::unified::Access;
pub use self::unified::json::{
    BigintUnsignedHandlingMode, JsonAccess, TimeHandling, TimestampHandling, TimestamptzHandling,
};
use self::upsert_parser::UpsertParser;
use crate::error::ConnectorResult;
use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryConfig;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
    BoxSourceMessageEventStream, SourceChunkStream, SourceColumnDesc, SourceColumnType,
    SourceContext, SourceContextRef, SourceCtrlOpts, SourceMessageEvent, SourceMeta,
    SourceReaderEvent,
};

mod access_builder;
pub mod additional_columns;
mod avro;
mod bytes_parser;
mod canal;
mod chunk_builder;
mod config;
mod csv_parser;
mod debezium;
mod json_parser;
mod maxwell;
mod mysql;
pub mod parquet_parser;
pub mod plain_parser;
mod postgres;
mod protobuf;
pub mod scalar_adapter;
mod sql_server;
mod unified;
mod upsert_parser;
mod utils;

use access_builder::{AccessBuilder, AccessBuilderImpl};
pub use config::*;

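/// Adapts a [`SourceReaderEvent`] stream into a [`SourceChunkStream`] by keeping only the
/// data chunks and dropping split-progress events.
///
/// A minimal usage sketch (`chunk` is an illustrative, pre-built `StreamChunk`):
///
/// ```ignore
/// let events = futures::stream::iter([Ok(SourceReaderEvent::DataChunk(chunk))]);
/// let chunk_stream = into_data_chunk_stream(events); // yields only `chunk`
/// ```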
pub(crate) fn into_data_chunk_stream(
    stream: impl Stream<Item = ConnectorResult<SourceReaderEvent>> + Send + 'static,
) -> impl SourceChunkStream {
    stream
        .try_filter_map(|event| async move {
            Ok(match event {
                SourceReaderEvent::DataChunk(chunk) => Some(chunk),
                SourceReaderEvent::SplitProgress(_) => None,
            })
        })
        .boxed()
}
pub use debezium::DEBEZIUM_IGNORE_KEY;
use debezium::schema_change::SchemaChangeEnvelope;
pub use unified::{AccessError, AccessResult};

/// The metadata of the original message for a row writer.
///
/// Extracted from the `SourceMessage`.
#[derive(Clone, Copy, Debug)]
pub struct MessageMeta<'a> {
    source_meta: &'a SourceMeta,
    split_id: &'a str,
    offset: &'a str,
}

impl<'a> MessageMeta<'a> {
    /// Extract the value for the given column.
    ///
    /// Returns `None` if the column is not a meta column.
    fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
        let datum: DatumRef<'_> = match desc.column_type {
            // Row id columns are filled with `NULL` here and will be filled with the real
            // row id generated by `RowIdGenExecutor` later.
            SourceColumnType::RowId => None,
            // Extract the offset from the metadata.
            SourceColumnType::Offset => Some(self.offset.into()),
            // Extract custom metadata per connector.
            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    KAFKA_TIMESTAMP_COLUMN_NAME,
                    "unexpected kafka meta column name"
                );
                kafka_meta.extract_timestamp()
            }
            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    CDC_TABLE_NAME_COLUMN_NAME,
                    "unexpected cdc meta column name"
                );
                // For CDC sources, `extract_table_name()` strips the database prefix.
                cdc_meta.extract_table_name()
            }

            // For other cases, return `None`.
            SourceColumnType::Meta | SourceColumnType::Normal => return None,
        };

        datum
    }
}

/// Transaction control message. Currently only used by Debezium messages.
#[derive(Debug)]
pub enum TransactionControl {
    Begin { id: Box<str> },
    Commit { id: Box<str> },
}
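
// For reference, a Debezium transaction metadata record looks roughly like
// `{"status": "BEGIN", "id": "571", ...}` or `{"status": "END", "id": "571", ...}`;
// the Debezium parser maps these statuses to `Begin` and `Commit` respectively,
// keyed by the transaction `id`.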

/// The result of parsing a message.
#[derive(Debug)]
pub enum ParseResult {
    /// Some rows are parsed and written to the [`SourceStreamChunkRowWriter`].
    Rows,
    /// A transaction control message is parsed.
    TransactionControl(TransactionControl),

    /// A schema change message is parsed.
    SchemaChange(SchemaChangeEnvelope),
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParserFormat {
    CanalJson,
    Csv,
    Json,
    Maxwell,
    Debezium,
    DebeziumMongo,
    Upsert,
    Plain,
}

/// `ByteStreamSourceParser` is the entrypoint abstraction for parsing messages.
/// It consumes bytes of one individual message and produces parsed records.
///
/// It's used by [`ByteStreamSourceParserImpl::parse_stream_with_events`]. `pub` is for benchmarks only.
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
    /// The column descriptors of the output chunk.
    fn columns(&self) -> &[SourceColumnDesc];

    /// The source context, used to report parsing errors.
    fn source_ctx(&self) -> &SourceContext;

    /// The format handled by this parser.
    fn parser_format(&self) -> ParserFormat;

    /// Parse one record from the given `payload` and write rows to the `writer`.
    ///
    /// Returns an error if **any** of the rows in the message fail to parse.
    fn parse_one<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<()>> + Send + 'a;

    /// Parse one record from the given `payload`, and either write rows to the `writer` or
    /// interpret it as a transaction control message.
    ///
    /// The default implementation forwards to [`ByteStreamSourceParser::parse_one`] for
    /// non-transactional sources.
    ///
    /// Returns an error if **any** of the rows in the message fail to parse.
    fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<ParseResult>> + Send + 'a {
        self.parse_one(key, payload, writer)
            .map_ok(|_| ParseResult::Rows)
    }
}
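
// A minimal sketch of an implementor, assuming a hypothetical `MyParser` whose
// `decode` method produces one datum per column (all error handling elided):
//
//     impl ByteStreamSourceParser for MyParser {
//         fn columns(&self) -> &[SourceColumnDesc] { &self.rw_columns }
//         fn source_ctx(&self) -> &SourceContext { &self.source_ctx }
//         fn parser_format(&self) -> ParserFormat { ParserFormat::Plain }
//
//         async fn parse_one<'a>(
//             &'a mut self,
//             _key: Option<Vec<u8>>,
//             payload: Option<Vec<u8>>,
//             mut writer: SourceStreamChunkRowWriter<'a>,
//         ) -> ConnectorResult<()> {
//             // Write a single row; `do_insert` calls the closure once per column.
//             writer.do_insert(|desc| self.decode(desc, payload.as_deref()))?;
//             Ok(())
//         }
//     }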

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
    pub fn parse_stream_with_events(
        self,
        msg_stream: BoxSourceMessageEventStream,
    ) -> impl Stream<Item = ConnectorResult<SourceReaderEvent>> + Send {
        let actor_id = self.source_ctx().actor_id;
        let source_id = self.source_ctx().source_id.as_raw_id();

        // The stream will be long-lived. We use `instrument_with` here to create
        // a new span for the polling of each chunk.
        let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
        parse_message_stream(self, msg_stream, source_ctrl_opts).instrument_with(
            move || tracing::info_span!("source_parse_chunk", %actor_id, source_id),
        )
    }
}
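
// Pipeline sketch (`msg_stream` is a hypothetical connector reader output): the
// parser turns message events into reader events, and `into_data_chunk_stream`
// keeps only the chunks:
//
//     let events = parser.parse_stream_with_events(msg_stream);
//     let chunks = into_data_chunk_stream(events);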

// TODO: when upsert is disabled, how should we filter out those empty payloads?
// Currently, an error is returned for non-upsert formats with an empty payload.
#[try_stream(ok = SourceReaderEvent, error = crate::error::ConnectorError)]
async fn parse_message_stream<P: ByteStreamSourceParser>(
    mut parser: P,
    msg_stream: BoxSourceMessageEventStream,
    source_ctrl_opts: SourceCtrlOpts,
) {
    let mut chunk_builder =
        SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts);

    let mut direct_cdc_event_lag_latency_metrics = HashMap::new();

    #[for_await]
    for event in msg_stream {
        // It's possible that the split is not active, which means the next batch may arrive
        // very late, so we prefer emitting all records in the current batch before the end
        // of each iteration instead of merging them with the next batch. The exception is an
        // uncommitted transaction, in which case we yield when the transaction is committed.

        let batch = match event? {
            SourceMessageEvent::Data(batch) => batch,
            SourceMessageEvent::SplitProgress(progress) => {
                yield SourceReaderEvent::SplitProgress(progress);
                continue;
            }
        };
        let batch_len = batch.len();
        if batch_len == 0 {
            continue;
        }

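        // Whether the current transaction (if any) was started in a previous batch;
        // if so, the chunk is yielded eagerly once the transaction commits instead of
        // waiting for the current chunk to fill up.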
        let mut txn_started_in_last_batch = chunk_builder.is_in_transaction();
        let process_time_ms = chrono::Utc::now().timestamp_millis();
        let mut is_heartbeat_emitted = false;
        for msg in batch {
            if msg.is_cdc_heartbeat() {
                if !is_heartbeat_emitted {
                    tracing::debug!(offset = msg.offset, "handling a heartbeat message");
                    chunk_builder.heartbeat(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    });
                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield SourceReaderEvent::DataChunk(chunk);
                    }
                    is_heartbeat_emitted = true;
                }
                continue;
            }

            // calculate the process_time - event_time lag
            if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
                let lag_ms = process_time_ms - msg_meta.source_ts_ms;
                // report to Prometheus
                let full_table_name = msg_meta.full_table_name.clone();
                let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
                    .entry(full_table_name)
                    .or_insert_with(|| {
                        GLOBAL_SOURCE_METRICS
                            .direct_cdc_event_lag_latency
                            .with_guarded_label_values(&[&msg_meta.full_table_name])
                    });
                direct_cdc_event_lag_latency.observe(lag_ms as f64);
            }

            // Parse the message and write to the chunk builder. It's possible that the message
            // contains multiple rows. When the chunk size reaches the limit during parsing, the
            // chunk builder may yield the chunk to `ready_chunks` and start a new chunk.
            match parser
                .parse_one_with_txn(
                    msg.key,
                    msg.payload,
                    chunk_builder.row_writer().with_meta(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    }),
                )
                .await
            {
                // It's possible that parsing multiple rows in a single message PARTIALLY failed.
                // We still have to maintain the row number in this case.
                res @ (Ok(ParseResult::Rows) | Err(_)) => {
                    if let Err(error) = res {
                        // TODO: not using a tracing span to provide `split_id` and `offset` due to
                        //       performance concerns, see #13105
                        static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                            LazyLock::new(LogSuppressor::default);
                        if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                            tracing::error!(
                                error = %error.as_report(),
                                split_id = &*msg.split_id,
                                offset = msg.offset,
                                suppressed_count,
                                "failed to parse message, skipping"
                            );
                        }

                        // report to error metrics
                        let context = parser.source_ctx();
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            error.variant_name().to_owned(),
                            context.source_id.to_string(),
                            context.source_name.clone(),
                            context.fragment_id.to_string(),
                        ]);
                    }

                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield SourceReaderEvent::DataChunk(chunk);
                    }
                }

                Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl {
                    TransactionControl::Begin { id } => {
                        chunk_builder.begin_transaction(id);
                    }
                    TransactionControl::Commit { id } => {
                        chunk_builder.commit_transaction(id);
                        assert!(!chunk_builder.is_in_transaction());

                        if txn_started_in_last_batch {
                            // If a transaction spans multiple batches, we yield the chunk
                            // immediately after the transaction is committed.
                            chunk_builder.finish_current_chunk();
                            txn_started_in_last_batch = false;
                        }

                        for chunk in chunk_builder.consume_ready_chunks() {
                            yield SourceReaderEvent::DataChunk(chunk);
                        }
                    }
                },

                Ok(ParseResult::SchemaChange(schema_change)) => {
                    if schema_change.is_empty() {
                        continue;
                    }

                    let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
                    // We bubble up the schema change event to the source executor via a channel,
                    // and wait for the source executor to finish the schema change process before
                    // parsing the following messages.
                    if let Some(ref tx) = parser.source_ctx().schema_change_tx {
                        tx.send((schema_change, oneshot_tx))
                            .await
                            .expect("send schema change to executor");
                        match oneshot_rx.await {
                            Ok(()) => {}
                            Err(e) => {
                                tracing::error!(error = %e.as_report(), "failed to wait for schema change");
                            }
                        }
                    }
                }
            }
        }

        // Finish the remaining records in the batch.
        if !chunk_builder.is_in_transaction() {
            chunk_builder.finish_current_chunk();
        }
        for chunk in chunk_builder.consume_ready_chunks() {
            yield SourceReaderEvent::DataChunk(chunk);
        }
    }
}

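/// Whether an [`AccessBuilder`] is built for the message key or the message value
/// (e.g. for upsert sources, which decode the two separately).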
#[derive(Debug)]
pub enum EncodingType {
    Key,
    Value,
}

/// The entrypoint of parsing. It parses a `SourceMessage` stream (a byte stream) into a [`risingwave_common::array::StreamChunk`] stream.
/// Used by [`crate::source::into_chunk_stream`].
#[derive(Debug)]
pub enum ByteStreamSourceParserImpl {
    Csv(CsvParser),
    Debezium(DebeziumParser),
    Plain(PlainParser),
    Upsert(UpsertParser),
    DebeziumMongoJson(DebeziumMongoJsonParser),
    Maxwell(MaxwellParser),
    CanalJson(CanalJsonParser),
}

impl ByteStreamSourceParserImpl {
    pub fn parse_stream_with_events(
        self,
        msg_stream: BoxSourceMessageEventStream,
    ) -> impl Stream<Item = ConnectorResult<SourceReaderEvent>> + Send {
        #[auto_enum(futures03::Stream)]
        let stream = match self {
            Self::Csv(parser) => parser.parse_stream_with_events(msg_stream),
            Self::Debezium(parser) => parser.parse_stream_with_events(msg_stream),
            Self::DebeziumMongoJson(parser) => parser.parse_stream_with_events(msg_stream),
            Self::Maxwell(parser) => parser.parse_stream_with_events(msg_stream),
            Self::CanalJson(parser) => parser.parse_stream_with_events(msg_stream),
            Self::Plain(parser) => parser.parse_stream_with_events(msg_stream),
            Self::Upsert(parser) => parser.parse_stream_with_events(msg_stream),
        };
        Box::pin(stream)
    }
}

impl ByteStreamSourceParserImpl {
    pub async fn create(
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
    ) -> ConnectorResult<Self> {
        let CommonParserConfig { rw_columns } = parser_config.common;
        let protocol = &parser_config.specific.protocol_config;
        let encode = &parser_config.specific.encoding_config;
        match (protocol, encode) {
            (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => {
                CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv)
            }
            (ProtocolProperties::DebeziumMongo, EncodingProperties::MongoJson(props)) => {
                DebeziumMongoJsonParser::new(rw_columns, source_ctx, props.clone())
                    .map(Self::DebeziumMongoJson)
            }
            (ProtocolProperties::Canal, EncodingProperties::Json(config)) => {
                CanalJsonParser::new(rw_columns, source_ctx, config).map(Self::CanalJson)
            }
            (ProtocolProperties::Native, _) => unreachable!("Native parser should not be created"),
            (ProtocolProperties::Upsert, _) => {
                let parser =
                    UpsertParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Upsert(parser))
            }
            (ProtocolProperties::Plain, _) => {
                let parser =
                    PlainParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Plain(parser))
            }
            (ProtocolProperties::Debezium(_), _) => {
                let parser =
                    DebeziumParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Debezium(parser))
            }
            (ProtocolProperties::Maxwell, _) => {
                let parser =
                    MaxwellParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Maxwell(parser))
            }
            _ => unreachable!(),
        }
    }

    /// Create a parser for testing purposes.
    pub fn create_for_test(parser_config: ParserConfig) -> ConnectorResult<Self> {
        futures::executor::block_on(Self::create(parser_config, SourceContext::dummy().into()))
    }
}
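
// A usage sketch (assuming `parser_config` describes a supported protocol/encoding
// pair, `source_ctx` is a prepared `SourceContextRef`, and `msg_stream` is a
// `BoxSourceMessageEventStream`):
//
//     let parser = ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
//     let events = parser.parse_stream_with_events(msg_stream);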

/// Test utilities for [`ByteStreamSourceParserImpl`].
#[cfg(test)]
pub mod test_utils {
    use futures::StreamExt;
    use itertools::Itertools;
    use risingwave_common::array::StreamChunk;

    use super::*;
    use crate::source::SourceMessage;

    #[easy_ext::ext(ByteStreamSourceParserImplTestExt)]
    pub(crate) impl ByteStreamSourceParserImpl {
        /// Parse the given payloads into a [`StreamChunk`].
        async fn parse(self, payloads: Vec<Vec<u8>>) -> StreamChunk {
            let source_messages = payloads
                .into_iter()
                .map(|p| SourceMessage {
                    payload: (!p.is_empty()).then_some(p),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            into_data_chunk_stream(
                self.parse_stream_with_events(
                    futures::stream::once(async { Ok(SourceMessageEvent::Data(source_messages)) })
                        .boxed(),
                ),
            )
            .next()
            .await
            .unwrap()
            .unwrap()
        }

        /// Parse the given key-value pairs into a [`StreamChunk`].
        async fn parse_upsert(self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> StreamChunk {
            let source_messages = kvs
                .into_iter()
                .map(|(k, v)| SourceMessage {
                    key: (!k.is_empty()).then_some(k),
                    payload: (!v.is_empty()).then_some(v),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            into_data_chunk_stream(
                self.parse_stream_with_events(
                    futures::stream::once(async { Ok(SourceMessageEvent::Data(source_messages)) })
                        .boxed(),
                ),
            )
            .next()
            .await
            .unwrap()
            .unwrap()
        }
    }
}
552}