risingwave_connector/parser/chunk_builder.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;

use risingwave_common::array::stream_record::RecordType;
use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
use risingwave_common::bitmap::BitmapBuilder;
use risingwave_common::log::LogSuppresser;
use risingwave_common::types::{Datum, DatumCow, ScalarRefImpl};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector_codec::decoder::{AccessError, AccessResult};
use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
use smallvec::SmallVec;
use thiserror_ext::AsReport;

use super::MessageMeta;
use crate::parser::utils::{
    extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
    extract_subject_from_meta, extract_timestamp_from_meta,
};
use crate::source::{SourceColumnDesc, SourceColumnType, SourceCtrlOpts, SourceMeta};

/// Maximum number of rows in a transaction. If a transaction is larger than this, it will be
/// forcibly committed to avoid potential OOM.
const MAX_TRANSACTION_SIZE: usize = 4096;

/// Represents an ongoing transaction.
struct Transaction {
    id: Box<str>,
    len: usize,
}

/// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`].
///
/// The output chunk size is controlled by `source_ctrl_opts.chunk_size` and `source_ctrl_opts.split_txn`.
/// During the building process, multiple chunks may be built even without any explicit call to
/// `finish_current_chunk`. This mainly happens when more than one record is found in a single
/// `SourceMessage` during parsing. Users of this builder should call `consume_ready_chunks` from
/// time to time to drain the built chunks and prevent the buffer from growing too large.
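///
/// # Example
///
/// A minimal usage sketch. `column_descs` and `source_ctrl_opts` are assumed to be prepared by
/// the caller, and `parse_column` / `handle_chunk` are hypothetical helpers:
///
/// ```rust,ignore
/// let mut builder = SourceStreamChunkBuilder::new(column_descs, source_ctrl_opts);
///
/// // Write one `Insert` record per parsed message.
/// builder
///     .row_writer()
///     .do_insert(|desc| parse_column(desc))?; // `parse_column` is hypothetical
///
/// // Flush pending rows, then drain whatever chunks are ready.
/// builder.finish_current_chunk();
/// for chunk in builder.consume_ready_chunks() {
///     handle_chunk(chunk); // `handle_chunk` is hypothetical
/// }
/// ```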
pub struct SourceStreamChunkBuilder {
    column_descs: Vec<SourceColumnDesc>,
    source_ctrl_opts: SourceCtrlOpts,
    builders: Vec<ArrayBuilderImpl>,
    op_builder: Vec<Op>,
    vis_builder: BitmapBuilder,
    ongoing_txn: Option<Transaction>,
    ready_chunks: SmallVec<[StreamChunk; 1]>,
}

impl SourceStreamChunkBuilder {
    pub fn new(column_descs: Vec<SourceColumnDesc>, source_ctrl_opts: SourceCtrlOpts) -> Self {
        let (builders, op_builder, vis_builder) =
            Self::create_builders(&column_descs, source_ctrl_opts.chunk_size);

        Self {
            column_descs,
            source_ctrl_opts,
            builders,
            op_builder,
            vis_builder,
            ongoing_txn: None,
            ready_chunks: SmallVec::new(),
        }
    }

    fn create_builders(
        column_descs: &[SourceColumnDesc],
        chunk_size: usize,
    ) -> (Vec<ArrayBuilderImpl>, Vec<Op>, BitmapBuilder) {
        let reserved_capacity = chunk_size + 1; // it's possible to have an additional `U-` at the end
        let builders = column_descs
            .iter()
            .map(|desc| desc.data_type.create_array_builder(reserved_capacity))
            .collect();
        let op_builder = Vec::with_capacity(reserved_capacity);
        let vis_builder = BitmapBuilder::with_capacity(reserved_capacity);
        (builders, op_builder, vis_builder)
    }

    /// Begin a (CDC) transaction with the given `txn_id`.
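    ///
    /// A sketch of the expected call sequence; in practice, `txn_id` comes from the upstream CDC
    /// events:
    ///
    /// ```rust,ignore
    /// builder.begin_transaction("txn-1".into());
    /// assert!(builder.is_in_transaction());
    /// // ... write records via `builder.row_writer()` ...
    /// builder.commit_transaction("txn-1".into());
    /// ```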
    pub fn begin_transaction(&mut self, txn_id: Box<str>) {
        if let Some(ref txn) = self.ongoing_txn {
            tracing::warn!(
                ongoing_txn_id = txn.id,
                new_txn_id = txn_id,
                "already in a transaction"
            );
        }
        tracing::debug!(txn_id, "begin upstream transaction");
        self.ongoing_txn = Some(Transaction { id: txn_id, len: 0 });
    }

    /// Commit the ongoing transaction with the given `txn_id`.
    pub fn commit_transaction(&mut self, txn_id: Box<str>) {
        if let Some(txn) = self.ongoing_txn.take() {
            if txn.id != txn_id {
                tracing::warn!(
                    expected_txn_id = txn.id,
                    actual_txn_id = txn_id,
                    "unexpected transaction id"
                );
            }
            tracing::debug!(txn_id, "commit upstream transaction");

            if self.current_chunk_len() >= self.source_ctrl_opts.chunk_size {
                // if `split_txn` is on, we should've finished the chunk already
                assert!(!self.source_ctrl_opts.split_txn);
                self.finish_current_chunk();
            }
        } else {
            tracing::warn!(txn_id, "no ongoing transaction to commit");
        }
    }

    /// Check if the builder is in an ongoing transaction.
    pub fn is_in_transaction(&self) -> bool {
        self.ongoing_txn.is_some()
    }

    /// Get a row writer for the parser to write records to the builder.
    pub fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> {
        SourceStreamChunkRowWriter {
            builder: self,
            visible: true, // write visible rows by default
            row_meta: None,
        }
    }

    /// Write a heartbeat record to the builder. The builder will decide whether to finish the
    /// current chunk or not. Currently, it ensures that heartbeats are always in separate chunks.
    pub fn heartbeat(&mut self, meta: MessageMeta<'_>) {
        if self.current_chunk_len() > 0 {
            // If there are records in the chunk, finish it first.
            // If there's an ongoing transaction, `finish_current_chunk` will handle it properly.
            // Note that this may split an ongoing transaction.
            self.finish_current_chunk();
        }

        _ = self
            .row_writer()
            .invisible()
            .with_meta(meta)
            .do_insert(|_| Ok(Datum::None));
        self.finish_current_chunk(); // each heartbeat should be a separate chunk
    }

    /// Finish and build a [`StreamChunk`] from the current pending records in the builder,
    /// regardless of whether the builder is in a transaction and whether `split_txn` is set.
    /// The built chunk will be appended to `ready_chunks` and the builder will be reset.
    pub fn finish_current_chunk(&mut self) {
        if self.op_builder.is_empty() {
            return;
        }

        let (builders, op_builder, vis_builder) =
            Self::create_builders(&self.column_descs, self.source_ctrl_opts.chunk_size);
        let chunk = StreamChunk::with_visibility(
            std::mem::replace(&mut self.op_builder, op_builder),
            std::mem::replace(&mut self.builders, builders)
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect(),
            std::mem::replace(&mut self.vis_builder, vis_builder).finish(),
        );
        self.ready_chunks.push(chunk);

        if let Some(ref mut txn) = self.ongoing_txn {
            tracing::warn!(
                txn_id = txn.id,
                len = txn.len,
                "splitting an ongoing transaction"
            );
            txn.len = 0;
        }
    }

    /// Consumes and returns the ready [`StreamChunk`]s.
    pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator<Item = StreamChunk> + '_ {
        self.ready_chunks.drain(..)
    }

    fn current_chunk_len(&self) -> usize {
        self.op_builder.len()
    }

    /// Commit a newly-written record by appending `op` and `vis` to the corresponding builders.
    /// This is supposed to be called via the `row_writer` only.
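    ///
    /// In short, the current chunk is finished when:
    /// - inside a transaction: `txn.len` reaches `MAX_TRANSACTION_SIZE`, or `split_txn` is set
    ///   and the chunk reaches `chunk_size`;
    /// - outside a transaction: the chunk reaches `chunk_size`.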
    fn commit_record(&mut self, op: Op, vis: bool) {
        self.op_builder.push(op);
        self.vis_builder.append(vis);

        let curr_chunk_size = self.current_chunk_len();
        let max_chunk_size = self.source_ctrl_opts.chunk_size;

        if let Some(ref mut txn) = self.ongoing_txn {
            txn.len += 1;

            if txn.len >= MAX_TRANSACTION_SIZE
                || (self.source_ctrl_opts.split_txn && curr_chunk_size >= max_chunk_size)
            {
                self.finish_current_chunk();
            }
        } else if curr_chunk_size >= max_chunk_size {
            self.finish_current_chunk();
        }
    }
}

/// `SourceStreamChunkRowWriter` is responsible for writing one or more records to the [`StreamChunk`],
/// where each record contains either one row (Insert/Delete) or two rows (Update) that are written
/// atomically.
///
/// Callers are supposed to call one of the `do_insert`, `do_delete` or `do_update` methods to write
/// a record, providing a closure that produces one or two [`Datum`]s for the corresponding
/// [`SourceColumnDesc`]. Specifically,
/// - only columns with [`SourceColumnType::Normal`] need to be handled;
/// - errors for non-primary-key columns will be ignored, with `NULL` filled in instead;
/// - other errors will be propagated.
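///
/// # Example
///
/// A hedged sketch of writing a single `Insert` record. `msg_meta` is a [`MessageMeta`] taken
/// from the source message, and `access_field` is a hypothetical accessor into the decoded payload:
///
/// ```rust,ignore
/// builder
///     .row_writer()
///     .with_meta(msg_meta)
///     .do_insert(|desc| access_field(desc))?;
/// ```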
pub struct SourceStreamChunkRowWriter<'a> {
    builder: &'a mut SourceStreamChunkBuilder,

    /// Whether the rows written by this writer should be visible in the output `StreamChunk`.
    visible: bool,

    /// Optional metadata of the original message.
    ///
    /// When set by `with_meta`, it'll be used to fill the columns of types other than [`SourceColumnType::Normal`].
    row_meta: Option<MessageMeta<'a>>,
}

impl<'a> SourceStreamChunkRowWriter<'a> {
    /// Set the metadata of the original message for this row writer.
    ///
    /// This should always be called, except in tests.
    pub fn with_meta(mut self, row_meta: MessageMeta<'a>) -> Self {
        self.row_meta = Some(row_meta);
        self
    }

    pub fn row_meta(&self) -> Option<MessageMeta<'a>> {
        self.row_meta
    }

    pub fn source_meta(&self) -> &'a SourceMeta {
        self.row_meta
            .map(|m| m.source_meta)
            .unwrap_or(&SourceMeta::Empty)
    }

    /// Convert the row writer into an invisible row writer.
    pub fn invisible(mut self) -> Self {
        self.visible = false;
        self
    }
}

impl SourceStreamChunkRowWriter<'_> {
    fn do_action<'a, A: RowWriterAction>(
        &'a mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
    ) -> AccessResult<()> {
        let mut parse_field = |desc: &SourceColumnDesc| {
            match f(desc) {
                Ok(output) => Ok(output),

                // Throw an error for failed access to primary key columns.
                Err(e) if desc.is_pk => Err(e),
                // Ignore errors for other columns and fill in `NULL` instead.
                Err(error) => {
                    // TODO: figure out a way to fill in not-null default value if user specifies one
                    // TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message)
                    // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
                    //       see #13105
                    static LOG_SUPPRESSER: LazyLock<LogSuppresser> =
                        LazyLock::new(LogSuppresser::default);
                    if let Ok(suppressed_count) = LOG_SUPPRESSER.check() {
                        tracing::warn!(
                            error = %error.as_report(),
                            split_id = self.row_meta.as_ref().map(|m| m.split_id),
                            offset = self.row_meta.as_ref().map(|m| m.offset),
                            column = desc.name,
                            suppressed_count,
                            "failed to parse non-pk column, padding with `NULL`"
                        );
                    }
                    Ok(A::output_for(Datum::None))
                }
            }
        };

        let mut wrapped_f = |desc: &SourceColumnDesc| {
            match (&desc.column_type, &desc.additional_column.column_type) {
                (&SourceColumnType::Offset | &SourceColumnType::RowId, _) => {
                    // SourceColumnType is for CDC source only.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }
                (&SourceColumnType::Meta, _)
                    if matches!(
                        &self.row_meta.map(|ele| ele.source_meta),
                        &Some(SourceMeta::Kafka(_) | SourceMeta::DebeziumCdc(_))
                    ) =>
                {
                    // SourceColumnType is for CDC source only.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }

                (
                    _, // for cdc tables
                    &Some(ref col @ AdditionalColumnType::DatabaseName(_))
                    | &Some(ref col @ AdditionalColumnType::TableName(_)),
                ) => {
                    match self.row_meta {
                        Some(row_meta) => {
                            if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.source_meta {
                                Ok(A::output_for(extract_cdc_meta_column(
                                    cdc_meta,
                                    col,
                                    desc.name.as_str(),
                                )?))
                            } else {
                                Err(AccessError::Uncategorized {
                                    message: "CDC metadata not found in the message".to_owned(),
                                })
                            }
                        }
                        None => parse_field(desc), // parse from payload
                    }
                }
                (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
                    Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(
                        row_meta.source_meta,
                    ))),
                    None => parse_field(desc), // parse from payload
                },
                (_, &Some(AdditionalColumnType::CollectionName(_))) => {
                    // collection name for `mongodb-cdc` should be parsed from the message payload
                    parse_field(desc)
                }
                (_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_subject_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                (_, &Some(AdditionalColumnType::Partition(_))) => {
                    // this meta info is not specific to any connector
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                (_, &Some(AdditionalColumnType::Offset(_))) => {
                    // this meta info is not specific to any connector
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.offset)),
                    ))
                }
                (_, &Some(AdditionalColumnType::HeaderInner(ref header_inner))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|ele| {
                                extract_header_inner_from_meta(
                                    ele.source_meta,
                                    header_inner.inner_field.as_ref(),
                                    header_inner.data_type.as_ref(),
                                )
                            })
                            .unwrap_or(Datum::None.into()),
                    ))
                }
                (_, &Some(AdditionalColumnType::Headers(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_headers_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                (_, &Some(AdditionalColumnType::Filename(_))) => {
                    // the filename is used as the partition in FS connectors
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                (_, &Some(AdditionalColumnType::Payload(_))) => {
                    // ingest the whole payload as a single column;
                    // special handling is done in `KvEvent::access_field`
                    parse_field(desc)
                }
                (_, _) => {
                    // For normal columns, call the user-provided closure.
                    parse_field(desc)
                }
            }
        };

        // Number of columns that changes have been applied to. Used to roll back when an error occurs.
        let mut applied_columns = 0;

        let result = (self.builder.column_descs.iter())
            .zip_eq_fast(self.builder.builders.iter_mut())
            .try_for_each(|(desc, builder)| {
                wrapped_f(desc).map(|output| {
                    A::apply(builder, output);
                    applied_columns += 1;
                })
            });

        match result {
            Ok(_) => {
                // commit the action by appending `Op`s and visibility
                for op in A::RECORD_TYPE.ops() {
                    self.builder.commit_record(*op, self.visible);
                }

                Ok(())
            }
            Err(e) => {
                for i in 0..applied_columns {
                    A::rollback(&mut self.builder.builders[i]);
                }
                Err(e)
            }
        }
    }

    /// Write an `Insert` record to the [`StreamChunk`], with the given fallible closure that
    /// produces one [`Datum`] for the corresponding [`SourceColumnDesc`].
    ///
    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
    #[inline(always)]
    pub fn do_insert<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<InsertAction>(|desc| f(desc).map(Into::into))
    }

    /// Write a `Delete` record to the [`StreamChunk`], with the given fallible closure that
    /// produces one [`Datum`] for the corresponding [`SourceColumnDesc`].
    ///
    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
    #[inline(always)]
    pub fn do_delete<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<DeleteAction>(|desc| f(desc).map(Into::into))
    }

    /// Write an `Update` record to the [`StreamChunk`], with the given fallible closure that
    /// produces two [`Datum`]s as the old and new values for the corresponding [`SourceColumnDesc`].
    ///
    /// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
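    ///
    /// A sketch, assuming `access_old` and `access_new` are hypothetical accessors for the
    /// before/after images of the row:
    ///
    /// ```rust,ignore
    /// writer.do_update(|desc| Ok((access_old(desc)?, access_new(desc)?)))?;
    /// ```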
    #[inline(always)]
    pub fn do_update<'a, D1, D2>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<(D1, D2)>,
    ) -> AccessResult<()>
    where
        D1: Into<DatumCow<'a>>,
        D2: Into<DatumCow<'a>>,
    {
        self.do_action::<UpdateAction>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
    }
}

trait RowWriterAction {
    type Output<'a>;
    const RECORD_TYPE: RecordType;

    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;

    fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);

    fn rollback(builder: &mut ArrayBuilderImpl);
}

struct InsertAction;

impl RowWriterAction for InsertAction {
    type Output<'a> = DatumCow<'a>;

    const RECORD_TYPE: RecordType = RecordType::Insert;

    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        datum.into()
    }

    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
        builder.append(output)
    }

    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap()
    }
}

struct DeleteAction;

impl RowWriterAction for DeleteAction {
    type Output<'a> = DatumCow<'a>;

    const RECORD_TYPE: RecordType = RecordType::Delete;

    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        datum.into()
    }

    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
        builder.append(output)
    }

    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap()
    }
}

struct UpdateAction;

impl RowWriterAction for UpdateAction {
    type Output<'a> = (DatumCow<'a>, DatumCow<'a>);

    const RECORD_TYPE: RecordType = RecordType::Update;

    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        let datum = datum.into();
        (datum.clone(), datum)
    }

    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: (DatumCow<'_>, DatumCow<'_>)) {
        builder.append(output.0);
        builder.append(output.1);
    }

    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap();
        builder.pop().unwrap();
    }
}