risingwave_connector/parser/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::LazyLock;

use auto_enums::auto_enum;
pub use avro::AvroParserConfig;
pub use canal::*;
pub use chunk_builder::{SourceStreamChunkBuilder, SourceStreamChunkRowWriter};
use csv_parser::CsvParser;
pub use debezium::*;
use futures::{Future, TryFutureExt};
use futures_async_stream::try_stream;
pub use json_parser::*;
pub use parquet_parser::ParquetParser;
pub use protobuf::*;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{CDC_TABLE_NAME_COLUMN_NAME, KAFKA_TIMESTAMP_COLUMN_NAME};
use risingwave_common::log::LogSuppresser;
use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::types::{DatumCow, DatumRef};
use risingwave_common::util::tracing::InstrumentStream;
use risingwave_connector_codec::decoder::avro::MapHandling;
use thiserror_ext::AsReport;

pub use self::mysql::{mysql_datum_to_rw_datum, mysql_row_to_owned_row};
use self::plain_parser::PlainParser;
pub use self::postgres::postgres_row_to_owned_row;
pub use self::sql_server::{sql_server_row_to_owned_row, ScalarImplTiberiusWrapper};
pub use self::unified::json::{JsonAccess, TimestamptzHandling};
pub use self::unified::Access;
use self::upsert_parser::UpsertParser;
use crate::error::ConnectorResult;
use crate::parser::maxwell::MaxwellParser;
use crate::schema::schema_registry::SchemaRegistryAuth;
use crate::source::monitor::GLOBAL_SOURCE_METRICS;
use crate::source::{
    BoxSourceMessageStream, SourceChunkStream, SourceColumnDesc, SourceColumnType, SourceContext,
    SourceContextRef, SourceCtrlOpts, SourceMeta,
};

mod access_builder;
pub mod additional_columns;
mod avro;
mod bytes_parser;
mod canal;
mod chunk_builder;
mod config;
mod csv_parser;
mod debezium;
mod json_parser;
mod maxwell;
mod mysql;
pub mod parquet_parser;
pub mod plain_parser;
mod postgres;
mod protobuf;
pub mod scalar_adapter;
mod sql_server;
mod unified;
mod upsert_parser;
mod utils;

use access_builder::{AccessBuilder, AccessBuilderImpl};
pub use config::*;
use debezium::schema_change::SchemaChangeEnvelope;
pub use debezium::DEBEZIUM_IGNORE_KEY;
pub use unified::{AccessError, AccessResult};

/// The metadata of the original message for a row writer.
///
/// Extracted from the `SourceMessage`.
#[derive(Clone, Copy, Debug)]
pub struct MessageMeta<'a> {
    source_meta: &'a SourceMeta,
    split_id: &'a str,
    offset: &'a str,
}

impl<'a> MessageMeta<'a> {
    /// Extract the value for the given column.
    ///
    /// Returns `None` if the column is not a meta column.
    fn value_for_column(self, desc: &SourceColumnDesc) -> DatumRef<'a> {
        let datum: DatumRef<'_> = match desc.column_type {
            // Row id columns are filled with `NULL` here and will be filled with the real
            // row id generated by `RowIdGenExecutor` later.
            SourceColumnType::RowId => None,
            // Extract the offset from the metadata.
            SourceColumnType::Offset => Some(self.offset.into()),
            // Extract connector-specific metadata.
            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    KAFKA_TIMESTAMP_COLUMN_NAME,
                    "unexpected kafka meta column name"
                );
                kafka_meta.extract_timestamp()
            }
            SourceColumnType::Meta if let SourceMeta::DebeziumCdc(cdc_meta) = self.source_meta => {
                assert_eq!(
                    desc.name.as_str(),
                    CDC_TABLE_NAME_COLUMN_NAME,
                    "unexpected cdc meta column name"
                );
                Some(cdc_meta.full_table_name.as_str().into())
            }

            // For other cases, return `None`.
            SourceColumnType::Meta | SourceColumnType::Normal => return None,
        };

        datum
    }
}

/// Transaction control message. Currently only used by Debezium messages.
#[derive(Debug)]
pub enum TransactionControl {
    Begin { id: Box<str> },
    Commit { id: Box<str> },
}

/// The result of parsing a message.
#[derive(Debug)]
pub enum ParseResult {
    /// Some rows are parsed and written to the [`SourceStreamChunkRowWriter`].
    Rows,
    /// A transaction control message is parsed.
    TransactionControl(TransactionControl),

    /// A schema change message is parsed.
    SchemaChange(SchemaChangeEnvelope),
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ParserFormat {
    CanalJson,
    Csv,
    Json,
    Maxwell,
    Debezium,
    DebeziumMongo,
    Upsert,
    Plain,
}

/// `ByteStreamSourceParser` is the entrypoint abstraction for parsing messages.
/// It consumes bytes of one individual message and produces parsed records.
///
/// It's used by [`ByteStreamSourceParserImpl::parse_stream`]. `pub` is for benchmark only.
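///
/// # Example
///
/// A skeleton of an implementor (bodies elided); the real parsers live in the submodules
/// of this crate, and `MyParser` below is purely hypothetical:
///
/// ```ignore
/// #[derive(Debug)]
/// struct MyParser {
///     columns: Vec<SourceColumnDesc>,
///     source_ctx: SourceContextRef,
/// }
///
/// impl ByteStreamSourceParser for MyParser {
///     fn columns(&self) -> &[SourceColumnDesc] {
///         &self.columns
///     }
///
///     fn source_ctx(&self) -> &SourceContext {
///         &self.source_ctx
///     }
///
///     fn parser_format(&self) -> ParserFormat {
///         ParserFormat::Plain
///     }
///
///     async fn parse_one<'a>(
///         &'a mut self,
///         key: Option<Vec<u8>>,
///         payload: Option<Vec<u8>>,
///         writer: SourceStreamChunkRowWriter<'a>,
///     ) -> ConnectorResult<()> {
///         todo!("decode `payload` and write rows through `writer`")
///     }
/// }
/// ```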
pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
    /// The column descriptors of the output chunk.
    fn columns(&self) -> &[SourceColumnDesc];

    /// The source context, used to report parsing error.
    fn source_ctx(&self) -> &SourceContext;

    /// The format of the specific parser.
    fn parser_format(&self) -> ParserFormat;

    /// Parse one record from the given `payload` and write rows to the `writer`.
    ///
    /// Returns an error if **any** of the rows in the message failed to parse.
    fn parse_one<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<()>> + Send + 'a;

    /// Parse one record from the given `payload`, either write rows to the `writer` or interpret it
    /// as a transaction control message.
    ///
    /// The default implementation forwards to [`ByteStreamSourceParser::parse_one`] for
    /// non-transactional sources.
    ///
    /// Returns an error if **any** of the rows in the message failed to parse.
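    ///
    /// # Example
    ///
    /// A sketch of how a transactional parser might override this method; the
    /// `parse_txn_marker` helper below is hypothetical and not part of this crate.
    ///
    /// ```ignore
    /// if let Some(id) = parse_txn_marker(&payload) {
    ///     return Ok(ParseResult::TransactionControl(TransactionControl::Begin { id }));
    /// }
    /// self.parse_one(key, payload, writer).await?;
    /// Ok(ParseResult::Rows)
    /// ```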
    fn parse_one_with_txn<'a>(
        &'a mut self,
        key: Option<Vec<u8>>,
        payload: Option<Vec<u8>>,
        writer: SourceStreamChunkRowWriter<'a>,
    ) -> impl Future<Output = ConnectorResult<ParseResult>> + Send + 'a {
        self.parse_one(key, payload, writer)
            .map_ok(|_| ParseResult::Rows)
    }
}

#[easy_ext::ext(SourceParserIntoStreamExt)]
impl<P: ByteStreamSourceParser> P {
    /// Parse a `SourceMessage` stream into a [`StreamChunk`] stream.
    ///
    /// # Arguments
    ///
    /// - `msg_stream`: A stream of batches of `SourceMessage`.
    ///
    /// # Returns
    ///
    /// A [`SourceChunkStream`] of parsed chunks. Each parsed chunk is guaranteed to have
    /// at most `source_ctrl_opts.chunk_size` rows, unless there's a large transaction and
    /// `source_ctrl_opts.split_txn` is false.
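    ///
    /// # Example
    ///
    /// A minimal sketch of consuming the returned stream, assuming `parser` implements
    /// [`ByteStreamSourceParser`] and `msg_stream` is a `BoxSourceMessageStream` obtained
    /// from a source reader:
    ///
    /// ```ignore
    /// use futures::StreamExt as _;
    ///
    /// let mut chunk_stream = std::pin::pin!(parser.parse_stream(msg_stream));
    /// while let Some(chunk) = chunk_stream.next().await {
    ///     let chunk: StreamChunk = chunk?;
    ///     // handle the parsed chunk, e.g. pass it downstream
    /// }
    /// ```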
    pub fn parse_stream(self, msg_stream: BoxSourceMessageStream) -> impl SourceChunkStream {
        let actor_id = self.source_ctx().actor_id;
        let source_id = self.source_ctx().source_id.table_id();

        // The stream will be long-lived. We use `instrument_with` here to create
        // a new span for the polling of each chunk.
        let source_ctrl_opts = self.source_ctx().source_ctrl_opts;
        parse_message_stream(self, msg_stream, source_ctrl_opts)
            .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id))
    }
}

// TODO: when upsert is disabled, how should we filter out messages with empty payloads?
// Currently, an error is returned for non-upsert formats when the payload is empty.
#[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)]
async fn parse_message_stream<P: ByteStreamSourceParser>(
    mut parser: P,
    msg_stream: BoxSourceMessageStream,
    source_ctrl_opts: SourceCtrlOpts,
) {
    let mut chunk_builder =
        SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts);

    let mut direct_cdc_event_lag_latency_metrics = HashMap::new();

    #[for_await]
    for batch in msg_stream {
        // It's possible that the split is not active, which means the next batch may arrive
        // very late, so we prefer to emit all records in the current batch before the end of
        // each iteration, instead of merging them with the next batch. The exception is an
        // uncommitted transaction, in which case we only yield once the transaction commits.

        let batch = batch?;
        let batch_len = batch.len();

        if batch_len == 0 {
            continue;
        }

        if batch.iter().all(|msg| msg.is_cdc_heartbeat()) {
            // This `.iter().all(...)` will short-circuit after seeing the first `false`, so in
            // normal cases, this should only involve a constant time cost.

            // Now that we know there is no data message in the batch, just emit the latest
            // heartbeat message. Note that all messages in `batch` should belong to the same
            // split, so we don't need a split-to-heartbeat mapping here.

            let heartbeat_msg = batch.last().unwrap();
            tracing::debug!(
                offset = heartbeat_msg.offset,
                "handling a heartbeat message"
            );
            chunk_builder.heartbeat(MessageMeta {
                source_meta: &heartbeat_msg.meta,
                split_id: &heartbeat_msg.split_id,
                offset: &heartbeat_msg.offset,
            });

            for chunk in chunk_builder.consume_ready_chunks() {
                yield chunk;
            }
            continue; // continue to next batch
        }

        // When we reach here, there is at least one data message in the batch. We should ignore all
        // heartbeat messages.

        let mut txn_started_in_last_batch = chunk_builder.is_in_transaction();
        let process_time_ms = chrono::Utc::now().timestamp_millis();

        for msg in batch {
            if msg.is_cdc_heartbeat() {
                // ignore heartbeat messages
                continue;
            }

            // calculate process_time - event_time lag
            if let SourceMeta::DebeziumCdc(msg_meta) = &msg.meta {
                let lag_ms = process_time_ms - msg_meta.source_ts_ms;
                // report to Prometheus
                let full_table_name = msg_meta.full_table_name.clone();
                let direct_cdc_event_lag_latency = direct_cdc_event_lag_latency_metrics
                    .entry(full_table_name)
                    .or_insert_with(|| {
                        GLOBAL_SOURCE_METRICS
                            .direct_cdc_event_lag_latency
                            .with_guarded_label_values(&[&msg_meta.full_table_name])
                    });
                direct_cdc_event_lag_latency.observe(lag_ms as f64);
            }

            // Parse the message and write to the chunk builder; it's possible that the message
            // contains multiple rows. When the chunk size reaches the limit during parsing, the
            // chunk builder may yield the chunk to `ready_chunks` and start a new chunk.
            match parser
                .parse_one_with_txn(
                    msg.key,
                    msg.payload,
                    chunk_builder.row_writer().with_meta(MessageMeta {
                        source_meta: &msg.meta,
                        split_id: &msg.split_id,
                        offset: &msg.offset,
                    }),
                )
                .await
            {
                // It's possible that parsing multiple rows in a single message PARTIALLY failed.
                // We still have to maintain the row number in this case.
                res @ (Ok(ParseResult::Rows) | Err(_)) => {
                    if let Err(error) = res {
                        // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
                        //       see #13105
                        static LOG_SUPPRESSER: LazyLock<LogSuppresser> =
                            LazyLock::new(LogSuppresser::default);
                        if let Ok(suppressed_count) = LOG_SUPPRESSER.check() {
                            tracing::error!(
                                error = %error.as_report(),
                                split_id = &*msg.split_id,
                                offset = msg.offset,
                                suppressed_count,
                                "failed to parse message, skipping"
                            );
                        }

                        // report to error metrics
                        let context = parser.source_ctx();
                        GLOBAL_ERROR_METRICS.user_source_error.report([
                            error.variant_name().to_owned(),
                            context.source_id.to_string(),
                            context.source_name.clone(),
                            context.fragment_id.to_string(),
                        ]);
                    }

                    for chunk in chunk_builder.consume_ready_chunks() {
                        yield chunk;
                    }
                }

                Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl {
                    TransactionControl::Begin { id } => {
                        chunk_builder.begin_transaction(id);
                    }
                    TransactionControl::Commit { id } => {
                        chunk_builder.commit_transaction(id);
                        assert!(!chunk_builder.is_in_transaction());

                        if txn_started_in_last_batch {
                            // If a transaction spans multiple batches, we yield the chunk
                            // immediately after the transaction is committed.
                            chunk_builder.finish_current_chunk();
                            txn_started_in_last_batch = false;
                        }

                        for chunk in chunk_builder.consume_ready_chunks() {
                            yield chunk;
                        }
                    }
                },

                Ok(ParseResult::SchemaChange(schema_change)) => {
                    if schema_change.is_empty() {
                        continue;
                    }

                    let (oneshot_tx, oneshot_rx) = tokio::sync::oneshot::channel();
                    // Bubble up the schema change event to the source executor via the channel,
                    // and wait for the source executor to finish the schema change process
                    // before parsing the following messages.
                    if let Some(ref tx) = parser.source_ctx().schema_change_tx {
                        tx.send((schema_change, oneshot_tx))
                            .await
                            .expect("send schema change to executor");
                        match oneshot_rx.await {
                            Ok(()) => {}
                            Err(e) => {
                                tracing::error!(error = %e.as_report(), "failed to wait for schema change");
                            }
                        }
                    }
                }
            }
        }

        // Finish the remaining records in the batch.
        if !chunk_builder.is_in_transaction() {
            chunk_builder.finish_current_chunk();
        }
        for chunk in chunk_builder.consume_ready_chunks() {
            yield chunk;
        }
    }
}

#[derive(Debug)]
pub enum EncodingType {
    Key,
    Value,
}

/// The entrypoint of parsing. It parses a `SourceMessage` stream (byte stream) into a [`StreamChunk`] stream.
/// Used by [`crate::source::into_chunk_stream`].
#[derive(Debug)]
pub enum ByteStreamSourceParserImpl {
    Csv(CsvParser),
    Debezium(DebeziumParser),
    Plain(PlainParser),
    Upsert(UpsertParser),
    DebeziumMongoJson(DebeziumMongoJsonParser),
    Maxwell(MaxwellParser),
    CanalJson(CanalJsonParser),
}

impl ByteStreamSourceParserImpl {
    /// Converts a stream of `SourceMessage` batches into a [`StreamChunk`] stream.
    pub fn parse_stream(
        self,
        msg_stream: BoxSourceMessageStream,
    ) -> impl SourceChunkStream + Unpin {
        #[auto_enum(futures03::Stream)]
        let stream = match self {
            Self::Csv(parser) => parser.parse_stream(msg_stream),
            Self::Debezium(parser) => parser.parse_stream(msg_stream),
            Self::DebeziumMongoJson(parser) => parser.parse_stream(msg_stream),
            Self::Maxwell(parser) => parser.parse_stream(msg_stream),
            Self::CanalJson(parser) => parser.parse_stream(msg_stream),
            Self::Plain(parser) => parser.parse_stream(msg_stream),
            Self::Upsert(parser) => parser.parse_stream(msg_stream),
        };
        Box::pin(stream)
    }
}

impl ByteStreamSourceParserImpl {
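    /// Creates a parser according to the protocol and encoding properties in `parser_config`.
    ///
    /// # Example
    ///
    /// A minimal sketch; building a real `ParserConfig` from the catalog and obtaining
    /// `source_ctx` / `msg_stream` are elided here:
    ///
    /// ```ignore
    /// let parser = ByteStreamSourceParserImpl::create(parser_config, source_ctx).await?;
    /// let chunk_stream = parser.parse_stream(msg_stream);
    /// ```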
    pub async fn create(
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
    ) -> ConnectorResult<Self> {
        let CommonParserConfig { rw_columns } = parser_config.common;
        let protocol = &parser_config.specific.protocol_config;
        let encode = &parser_config.specific.encoding_config;
        match (protocol, encode) {
            (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => {
                CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv)
            }
            (ProtocolProperties::DebeziumMongo, EncodingProperties::Json(_)) => {
                DebeziumMongoJsonParser::new(rw_columns, source_ctx).map(Self::DebeziumMongoJson)
            }
            (ProtocolProperties::Canal, EncodingProperties::Json(config)) => {
                CanalJsonParser::new(rw_columns, source_ctx, config).map(Self::CanalJson)
            }
            (ProtocolProperties::Native, _) => unreachable!("Native parser should not be created"),
            (ProtocolProperties::Upsert, _) => {
                let parser =
                    UpsertParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Upsert(parser))
            }
            (ProtocolProperties::Plain, _) => {
                let parser =
                    PlainParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Plain(parser))
            }
            (ProtocolProperties::Debezium(_), _) => {
                let parser =
                    DebeziumParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Debezium(parser))
            }
            (ProtocolProperties::Maxwell, _) => {
                let parser =
                    MaxwellParser::new(parser_config.specific, rw_columns, source_ctx).await?;
                Ok(Self::Maxwell(parser))
            }
            _ => unreachable!(),
        }
    }

    /// Create a parser for testing purposes.
    pub fn create_for_test(parser_config: ParserConfig) -> ConnectorResult<Self> {
        futures::executor::block_on(Self::create(parser_config, SourceContext::dummy().into()))
    }
}

/// Test utilities for [`ByteStreamSourceParserImpl`].
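///
/// A minimal usage sketch: the `parser_config` construction is elided, the JSON payload
/// is hypothetical, and `ByteStreamSourceParserImplTestExt` is assumed to be in scope.
///
/// ```ignore
/// let parser = ByteStreamSourceParserImpl::create_for_test(parser_config)?;
/// let chunk = parser.parse(vec![br#"{"v1": 1}"#.to_vec()]).await;
/// ```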
#[cfg(test)]
pub mod test_utils {
    use futures::StreamExt;
    use itertools::Itertools;

    use super::*;
    use crate::source::SourceMessage;

    #[easy_ext::ext(ByteStreamSourceParserImplTestExt)]
    pub(crate) impl ByteStreamSourceParserImpl {
        /// Parse the given payloads into a [`StreamChunk`].
        async fn parse(self, payloads: Vec<Vec<u8>>) -> StreamChunk {
            let source_messages = payloads
                .into_iter()
                .map(|p| SourceMessage {
                    payload: (!p.is_empty()).then_some(p),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }

        /// Parse the given key-value pairs into a [`StreamChunk`].
        async fn parse_upsert(self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> StreamChunk {
            let source_messages = kvs
                .into_iter()
                .map(|(k, v)| SourceMessage {
                    key: (!k.is_empty()).then_some(k),
                    payload: (!v.is_empty()).then_some(v),
                    ..SourceMessage::dummy()
                })
                .collect_vec();

            self.parse_stream(futures::stream::once(async { Ok(source_messages) }).boxed())
                .next()
                .await
                .unwrap()
                .unwrap()
        }
    }
}