risingwave_connector/parser/chunk_builder.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use risingwave_common::array::stream_record::RecordType;
use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::log::LogSuppressor;
use risingwave_common::types::{Datum, DatumCow, ScalarRefImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector_codec::decoder::{AccessError, AccessResult};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use smallvec::SmallVec;
use thiserror_ext::AsReport;

use super::MessageMeta;
use crate::parser::utils::{
    extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
    extract_pulsar_message_id_data_from_meta, extract_subject_from_meta,
    extract_timestamp_from_meta,
};
use crate::source::{SourceColumnDesc, SourceColumnType, SourceCtrlOpts, SourceMeta};

/// Maximum number of rows in a transaction. If a transaction is larger than this, it will be force
/// committed to avoid potential OOM.
const MAX_TRANSACTION_SIZE: usize = 4096;

/// Represents an ongoing transaction.
struct Transaction {
    id: Box<str>,
    len: usize,
}

/// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
///
/// The output chunk size is controlled by `source_ctrl_opts.chunk_size` and `source_ctrl_opts.split_txn`.
/// During the building process, multiple chunks may be built even without any explicit call to
/// `finish_current_chunk`. This mainly happens when a single `SourceMessage` turns out to contain
/// more than one record during parsing. Users of this builder should call `consume_ready_chunks`
/// from time to time to consume the built chunks, to keep the buffer from growing too large.
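///
/// # Example
///
/// A minimal usage sketch (not compiled; `column_descs`, `ctrl_opts`, `access_field` and
/// `yield_chunk` are hypothetical stand-ins, not part of this module):
///
/// ```rust,ignore
/// let mut builder = SourceStreamChunkBuilder::new(column_descs, ctrl_opts);
///
/// // A parser writes each record through a fresh row writer.
/// builder
///     .row_writer()
///     .do_insert(|desc| access_field(desc))?;
///
/// // Drain ready chunks from time to time to keep the buffer small.
/// for chunk in builder.consume_ready_chunks() {
///     yield_chunk(chunk);
/// }
///
/// // At the end of the input, flush whatever is still pending.
/// builder.finish_current_chunk();
/// for chunk in builder.consume_ready_chunks() {
///     yield_chunk(chunk);
/// }
/// ```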
pub struct SourceStreamChunkBuilder {
    column_descs: Vec<SourceColumnDesc>,
    source_ctrl_opts: SourceCtrlOpts,
    builders: Vec<ArrayBuilderImpl>,
    op_builder: Vec<Op>,
    vis_builder: BitmapBuilder,
    ongoing_txn: Option<Transaction>,
    ready_chunks: SmallVec<[StreamChunk; 1]>,
}

impl SourceStreamChunkBuilder {
    pub fn new(column_descs: Vec<SourceColumnDesc>, source_ctrl_opts: SourceCtrlOpts) -> Self {
        let (builders, op_builder, vis_builder) =
            Self::create_builders(&column_descs, source_ctrl_opts.chunk_size);

        Self {
            column_descs,
            source_ctrl_opts,
            builders,
            op_builder,
            vis_builder,
            ongoing_txn: None,
            ready_chunks: SmallVec::new(),
        }
    }

    fn create_builders(
        column_descs: &[SourceColumnDesc],
        chunk_size: usize,
    ) -> (Vec<ArrayBuilderImpl>, Vec<Op>, BitmapBuilder) {
        let reserved_capacity = chunk_size + 1; // it's possible to have an additional `U-` at the end
        let builders = column_descs
            .iter()
            .map(|desc| desc.data_type.create_array_builder(reserved_capacity))
            .collect();
        let op_builder = Vec::with_capacity(reserved_capacity);
        let vis_builder = BitmapBuilder::with_capacity(reserved_capacity);
        (builders, op_builder, vis_builder)
    }

    /// Begin a (CDC) transaction with the given `txn_id`.
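    ///
    /// # Example
    ///
    /// A sketch of the CDC transaction flow (writer closures elided):
    ///
    /// ```rust,ignore
    /// builder.begin_transaction("txn-1".into());
    /// builder.row_writer().do_insert(/* ... */)?;
    /// builder.row_writer().do_delete(/* ... */)?;
    /// builder.commit_transaction("txn-1".into());
    /// // Unless `split_txn` is set or the transaction exceeds
    /// // `MAX_TRANSACTION_SIZE`, the rows above stay in one chunk.
    /// ```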
    pub fn begin_transaction(&mut self, txn_id: Box<str>) {
        if let Some(ref txn) = self.ongoing_txn {
            tracing::warn!(
                ongoing_txn_id = txn.id,
                new_txn_id = txn_id,
                "already in a transaction"
            );
        }
        tracing::debug!(txn_id, "begin upstream transaction");
        self.ongoing_txn = Some(Transaction { id: txn_id, len: 0 });
    }

    /// Commit the ongoing transaction with the given `txn_id`.
    pub fn commit_transaction(&mut self, txn_id: Box<str>) {
        if let Some(txn) = self.ongoing_txn.take() {
            if txn.id != txn_id {
                tracing::warn!(
                    expected_txn_id = txn.id,
                    actual_txn_id = txn_id,
                    "unexpected transaction id"
                );
            }
            tracing::debug!(txn_id, "commit upstream transaction");

            if self.current_chunk_len() >= self.source_ctrl_opts.chunk_size {
                // if `split_txn` is on, we should've finished the chunk already
                assert!(!self.source_ctrl_opts.split_txn);
                self.finish_current_chunk();
            }
        } else {
            tracing::warn!(txn_id, "no ongoing transaction to commit");
        }
    }

    /// Check whether the builder is in an ongoing transaction.
    pub fn is_in_transaction(&self) -> bool {
        self.ongoing_txn.is_some()
    }

    /// Get a row writer for the parser to write records to the builder.
    pub fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> {
        SourceStreamChunkRowWriter {
            builder: self,
            visible: true, // write visible rows by default
            row_meta: None,
        }
    }

    /// Write a heartbeat record to the builder. The builder will decide whether to finish the
    /// current chunk or not. Currently it ensures that heartbeats are always in separate chunks.
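    ///
    /// ```rust,ignore
    /// // Sketch: `msg_meta` is a hypothetical `MessageMeta` carrying the latest offset.
    /// builder.heartbeat(msg_meta);
    /// // The heartbeat row is flushed immediately as its own (invisible) chunk,
    /// // after any previously pending records are finished into a chunk first.
    /// ```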
    pub fn heartbeat(&mut self, meta: MessageMeta<'_>) {
        if self.current_chunk_len() > 0 {
            // If there are records in the chunk, finish it first.
            // If there's an ongoing transaction, `finish_current_chunk` will handle it properly.
            // Note that this may split the transaction's rows into multiple chunks.
            self.finish_current_chunk();
        }

        _ = self
            .row_writer()
            .invisible()
            .with_meta(meta)
            .do_insert(|_| Ok(Datum::None));
        self.finish_current_chunk(); // each heartbeat should be a separate chunk
    }

    /// Finish and build a [`StreamChunk`] from the current pending records in the builder,
    /// no matter whether the builder is in a transaction or not, `split_txn` or not. The
    /// built chunk will be appended to `ready_chunks` and the builder will be reset.
    pub fn finish_current_chunk(&mut self) {
        if self.op_builder.is_empty() {
            return;
        }

        let (builders, op_builder, vis_builder) =
            Self::create_builders(&self.column_descs, self.source_ctrl_opts.chunk_size);
        let chunk = StreamChunk::with_visibility(
            std::mem::replace(&mut self.op_builder, op_builder),
            std::mem::replace(&mut self.builders, builders)
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect(),
            std::mem::replace(&mut self.vis_builder, vis_builder).finish(),
        );
        self.ready_chunks.push(chunk);

        if let Some(ref mut txn) = self.ongoing_txn {
            tracing::warn!(
                txn_id = txn.id,
                len = txn.len,
                "splitting an ongoing transaction"
            );
            txn.len = 0;
        }
    }

    /// Consumes and returns the ready [`StreamChunk`]s.
    pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator<Item = StreamChunk> + '_ {
        self.ready_chunks.drain(..)
    }

    fn current_chunk_len(&self) -> usize {
        self.op_builder.len()
    }

    /// Commit a newly-written record by appending `op` and `vis` to the corresponding builders.
    /// This is supposed to be called via the `row_writer` only.
    fn commit_record(&mut self, op: Op, vis: bool) {
        self.op_builder.push(op);
        self.vis_builder.append(vis);

        let curr_chunk_size = self.current_chunk_len();
        let max_chunk_size = self.source_ctrl_opts.chunk_size;

        if let Some(ref mut txn) = self.ongoing_txn {
            txn.len += 1;

            if txn.len >= MAX_TRANSACTION_SIZE
                || (self.source_ctrl_opts.split_txn && curr_chunk_size >= max_chunk_size)
            {
                self.finish_current_chunk();
            }
        } else if curr_chunk_size >= max_chunk_size {
            self.finish_current_chunk();
        }
    }
}

/// `SourceStreamChunkRowWriter` is responsible for writing one or more records to the [`StreamChunk`],
/// where each record contains either one row (Insert/Delete) or two rows (Update) that are written atomically.
///
/// Callers are supposed to call one of the `do_insert`, `do_delete` or `do_update` methods to write a record,
/// providing a closure that produces one or two [`Datum`]s from the corresponding [`SourceColumnDesc`].
/// Specifically,
/// - only columns with [`SourceColumnType::Normal`] need to be handled;
/// - errors for non-primary-key columns are ignored and the field is filled with `NULL` instead;
/// - other errors are propagated.
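///
/// # Example
///
/// A sketch of the closure pattern; `record`, its accessor, and `msg_meta` are hypothetical:
///
/// ```rust,ignore
/// builder
///     .row_writer()
///     .with_meta(msg_meta) // metadata of the source message
///     .do_insert(|desc| record.access_by_name(&desc.name))?;
/// ```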
pub struct SourceStreamChunkRowWriter<'a> {
    builder: &'a mut SourceStreamChunkBuilder,

    /// Whether the rows written by this writer should be visible in the output `StreamChunk`.
    visible: bool,

    /// Optional metadata of the original message.
    ///
    /// When this is set by `with_meta`, it'll be used to fill the columns of types other than [`SourceColumnType::Normal`].
    row_meta: Option<MessageMeta<'a>>,
}

impl<'a> SourceStreamChunkRowWriter<'a> {
    /// Set the metadata of the original message for this row writer.
    ///
    /// This should always be called, except in tests.
    pub fn with_meta(mut self, row_meta: MessageMeta<'a>) -> Self {
        self.row_meta = Some(row_meta);
        self
    }

    pub fn row_meta(&self) -> Option<MessageMeta<'a>> {
        self.row_meta
    }

    pub fn source_meta(&self) -> &'a SourceMeta {
        self.row_meta
            .map(|m| m.source_meta)
            .unwrap_or(&SourceMeta::Empty)
    }

    /// Convert this row writer into an invisible row writer.
    pub fn invisible(mut self) -> Self {
        self.visible = false;
        self
    }
}

impl SourceStreamChunkRowWriter<'_> {
    fn do_action<'a, A: RowWriterAction>(
        &'a mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
    ) -> AccessResult<()> {
        let mut parse_field = |desc: &SourceColumnDesc| {
            match f(desc) {
                Ok(output) => Ok(output),

                // Throw error for failed access to primary key columns.
                Err(e) if desc.is_pk => Err(e),
                // Ignore error for other columns and fill in `NULL` instead.
                Err(error) => {
                    // TODO: figure out a way to fill in not-null default value if user specifies one
                    // TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message)
                    // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
                    //       see #13105
                    static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                        LazyLock::new(LogSuppressor::default);
                    if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                        tracing::warn!(
                            error = %error.as_report(),
                            split_id = self.row_meta.as_ref().map(|m| m.split_id),
                            offset = self.row_meta.as_ref().map(|m| m.offset),
                            column = desc.name,
                            suppressed_count,
                            "failed to parse non-pk column, padding with `NULL`"
                        );
                    }
                    Ok(A::output_for(Datum::None))
                }
            }
        };

        let mut wrapped_f = |desc: &SourceColumnDesc| {
            match (&desc.column_type, &desc.additional_column.column_type) {
                (&SourceColumnType::Offset | &SourceColumnType::RowId, _) => {
                    // SourceColumnType is for CDC source only.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }
                (&SourceColumnType::Meta, _)
                    if matches!(
                        &self.row_meta.map(|ele| ele.source_meta),
                        &Some(SourceMeta::Kafka(_) | SourceMeta::DebeziumCdc(_))
                    ) =>
                {
                    // SourceColumnType is for CDC source only.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }

                (
                    _, // for cdc tables
                    &Some(ref col @ AdditionalColumnType::DatabaseName(_))
                    | &Some(ref col @ AdditionalColumnType::TableName(_)),
                ) => {
                    match self.row_meta {
                        Some(row_meta) => {
                            if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.source_meta {
                                Ok(A::output_for(extract_cdc_meta_column(
                                    cdc_meta,
                                    col,
                                    desc.name.as_str(),
                                )?))
                            } else {
                                Err(AccessError::Uncategorized {
                                    message: "CDC metadata not found in the message".to_owned(),
                                })
                            }
                        }
                        None => parse_field(desc), // parse from payload
                    }
                }
                (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
                    Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(
                        row_meta.source_meta,
                    ))),
                    None => parse_field(desc), // parse from payload
                },
                (_, &Some(AdditionalColumnType::CollectionName(_))) => {
                    // collection name for `mongodb-cdc` should be parsed from the message payload
                    parse_field(desc)
                }
                (_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_subject_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                (_, &Some(AdditionalColumnType::Partition(_))) => {
                    // this meta info does not depend on a specific connector
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                (_, &Some(AdditionalColumnType::Offset(_))) => {
                    // this meta info does not depend on a specific connector
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.offset)),
                    ))
                }
                (_, &Some(AdditionalColumnType::HeaderInner(ref header_inner))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|ele| {
                                extract_header_inner_from_meta(
                                    ele.source_meta,
                                    header_inner.inner_field.as_ref(),
                                    header_inner.data_type.as_ref(),
                                )
                            })
                            .unwrap_or(Datum::None.into()),
                    ))
                }
                (_, &Some(AdditionalColumnType::Headers(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_headers_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                (_, &Some(AdditionalColumnType::PulsarMessageIdData(_))) => {
                    // message_id_data is derived internally, so it's not included here
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|ele| {
                                extract_pulsar_message_id_data_from_meta(ele.source_meta)
                            })
                            .unwrap_or(None),
                    ))
                }
                (_, &Some(AdditionalColumnType::Filename(_))) => {
                    // Filename is used as partition in FS connectors
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                (_, &Some(AdditionalColumnType::Payload(_))) => {
                    // ingest the whole payload as a single column
                    // do special logic in `KvEvent::access_field`
                    parse_field(desc)
                }
                (_, _) => {
                    // For normal columns, call the user-provided closure.
                    parse_field(desc)
                }
            }
        };

        // Number of columns that changes have been applied to. Used to roll back when an error occurs.
        let mut applied_columns = 0;

        let result = (self.builder.column_descs.iter())
            .zip_eq_fast(self.builder.builders.iter_mut())
            .try_for_each(|(desc, builder)| {
                wrapped_f(desc).map(|output| {
                    A::apply(builder, output);
                    applied_columns += 1;
                })
            });

        match result {
            Ok(_) => {
                // commit the action by appending `Op`s and visibility
                for op in A::RECORD_TYPE.ops() {
                    self.builder.commit_record(*op, self.visible);
                }

                Ok(())
            }
            Err(e) => {
                for i in 0..applied_columns {
                    A::rollback(&mut self.builder.builders[i]);
                }
                Err(e)
            }
        }
    }

    /// Write an `Insert` record to the [`StreamChunk`], with the given fallible closure that
    /// produces one [`Datum`] from the corresponding [`SourceColumnDesc`].
    ///
    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
    #[inline(always)]
    pub fn do_insert<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<InsertAction>(|desc| f(desc).map(Into::into))
    }

    /// Write a `Delete` record to the [`StreamChunk`], with the given fallible closure that
    /// produces one [`Datum`] from the corresponding [`SourceColumnDesc`].
    ///
    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
    #[inline(always)]
    pub fn do_delete<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<DeleteAction>(|desc| f(desc).map(Into::into))
    }

    /// Write an `Update` record to the [`StreamChunk`], with the given fallible closure that
    /// produces two [`Datum`]s as the old and new values from the corresponding [`SourceColumnDesc`].
    ///
    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
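    ///
    /// A sketch, assuming a hypothetical `record` that exposes old/new row images:
    ///
    /// ```rust,ignore
    /// writer.do_update(|desc| {
    ///     let old = record.before(&desc.name)?; // hypothetical accessors
    ///     let new = record.after(&desc.name)?;
    ///     Ok((old, new))
    /// })?;
    /// ```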
    #[inline(always)]
    pub fn do_update<'a, D1, D2>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<(D1, D2)>,
    ) -> AccessResult<()>
    where
        D1: Into<DatumCow<'a>>,
        D2: Into<DatumCow<'a>>,
    {
        self.do_action::<UpdateAction>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
    }
}

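/// Abstraction over the three record types written by [`SourceStreamChunkRowWriter`].
/// `Output` is a single datum for insert/delete and an `(old, new)` pair for update;
/// `apply` appends the output to a column's array builder, and `rollback` pops the
/// same number of entries back off when a later column fails to parse.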
trait RowWriterAction {
    type Output<'a>;
    const RECORD_TYPE: RecordType;

    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;

    fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);

    fn rollback(builder: &mut ArrayBuilderImpl);
}

struct InsertAction;

impl RowWriterAction for InsertAction {
    type Output<'a> = DatumCow<'a>;

    const RECORD_TYPE: RecordType = RecordType::Insert;

    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        datum.into()
    }

    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
        builder.append(output)
    }

    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap()
    }
}

struct DeleteAction;

impl RowWriterAction for DeleteAction {
    type Output<'a> = DatumCow<'a>;

    const RECORD_TYPE: RecordType = RecordType::Delete;

    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        datum.into()
    }

    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
        builder.append(output)
    }

    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap()
    }
}

struct UpdateAction;

impl RowWriterAction for UpdateAction {
    type Output<'a> = (DatumCow<'a>, DatumCow<'a>);

    const RECORD_TYPE: RecordType = RecordType::Update;

    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        let datum = datum.into();
        (datum.clone(), datum)
    }

    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: (DatumCow<'_>, DatumCow<'_>)) {
        builder.append(output.0);
        builder.append(output.1);
    }

    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap();
        builder.pop().unwrap();
    }
}