risingwave_connector/parser/
chunk_builder.rs1use std::sync::LazyLock;
16
17use risingwave_common::array::stream_record::RecordType;
18use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
19use risingwave_common::bitmap::BitmapBuilder;
20use risingwave_common::log::LogSuppressor;
21use risingwave_common::types::{Datum, DatumCow, ScalarRefImpl};
22use risingwave_common::util::iter_util::ZipEqFast;
23use risingwave_connector_codec::decoder::{AccessError, AccessResult};
24use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
25use smallvec::SmallVec;
26use thiserror_ext::AsReport;
27
28use super::MessageMeta;
29use crate::parser::utils::{
30 extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
31 extract_pulsar_message_id_data_from_meta, extract_subject_from_meta,
32 extract_timestamp_from_meta,
33};
34use crate::source::{SourceColumnDesc, SourceColumnType, SourceCtrlOpts, SourceMeta};
35
/// Upper bound on the number of records buffered for one upstream transaction before the current
/// chunk is force-finished, even though the transaction has not been committed yet.
const MAX_TRANSACTION_SIZE: usize = 4 * 1024;

/// Book-keeping for an upstream transaction that has begun but not yet been committed.
struct Transaction {
    /// Identifier passed to `begin_transaction`; compared against the id given on commit.
    id: Box<str>,
    /// Records committed for this transaction since the last chunk was finished.
    len: usize,
}
45
46pub struct SourceStreamChunkBuilder {
54 column_descs: Vec<SourceColumnDesc>,
55 source_ctrl_opts: SourceCtrlOpts,
56 builders: Vec<ArrayBuilderImpl>,
57 op_builder: Vec<Op>,
58 vis_builder: BitmapBuilder,
59 ongoing_txn: Option<Transaction>,
60 ready_chunks: SmallVec<[StreamChunk; 1]>,
61}
62
63impl SourceStreamChunkBuilder {
64 pub fn new(column_descs: Vec<SourceColumnDesc>, source_ctrl_opts: SourceCtrlOpts) -> Self {
65 let (builders, op_builder, vis_builder) =
66 Self::create_builders(&column_descs, source_ctrl_opts.chunk_size);
67
68 Self {
69 column_descs,
70 source_ctrl_opts,
71 builders,
72 op_builder,
73 vis_builder,
74 ongoing_txn: None,
75 ready_chunks: SmallVec::new(),
76 }
77 }
78
79 fn create_builders(
80 column_descs: &[SourceColumnDesc],
81 chunk_size: usize,
82 ) -> (Vec<ArrayBuilderImpl>, Vec<Op>, BitmapBuilder) {
83 let reserved_capacity = chunk_size + 1; let builders = column_descs
85 .iter()
86 .map(|desc| desc.data_type.create_array_builder(reserved_capacity))
87 .collect();
88 let op_builder = Vec::with_capacity(reserved_capacity);
89 let vis_builder = BitmapBuilder::with_capacity(reserved_capacity);
90 (builders, op_builder, vis_builder)
91 }
92
93 pub fn begin_transaction(&mut self, txn_id: Box<str>) {
95 if let Some(ref txn) = self.ongoing_txn {
96 tracing::warn!(
97 ongoing_txn_id = txn.id,
98 new_txn_id = txn_id,
99 "already in a transaction"
100 );
101 }
102 tracing::debug!(txn_id, "begin upstream transaction");
103 self.ongoing_txn = Some(Transaction { id: txn_id, len: 0 });
104 }
105
106 pub fn commit_transaction(&mut self, txn_id: Box<str>) {
108 if let Some(txn) = self.ongoing_txn.take() {
109 if txn.id != txn_id {
110 tracing::warn!(
111 expected_txn_id = txn.id,
112 actual_txn_id = txn_id,
113 "unexpected transaction id"
114 );
115 }
116 tracing::debug!(txn_id, "commit upstream transaction");
117
118 if self.current_chunk_len() >= self.source_ctrl_opts.chunk_size {
119 assert!(!self.source_ctrl_opts.split_txn);
121 self.finish_current_chunk();
122 }
123 } else {
124 tracing::warn!(txn_id, "no ongoing transaction to commit");
125 }
126 }
127
128 pub fn is_in_transaction(&self) -> bool {
130 self.ongoing_txn.is_some()
131 }
132
133 pub fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> {
135 SourceStreamChunkRowWriter {
136 builder: self,
137 visible: true, row_meta: None,
139 }
140 }
141
142 pub fn heartbeat(&mut self, meta: MessageMeta<'_>) {
145 if self.current_chunk_len() > 0 {
146 self.finish_current_chunk();
150 }
151
152 _ = self
153 .row_writer()
154 .invisible()
155 .with_meta(meta)
156 .do_insert(|_| Ok(Datum::None));
157 self.finish_current_chunk(); }
159
160 pub fn finish_current_chunk(&mut self) {
164 if self.op_builder.is_empty() {
165 return;
166 }
167
168 let (builders, op_builder, vis_builder) =
169 Self::create_builders(&self.column_descs, self.source_ctrl_opts.chunk_size);
170 let chunk = StreamChunk::with_visibility(
171 std::mem::replace(&mut self.op_builder, op_builder),
172 std::mem::replace(&mut self.builders, builders)
173 .into_iter()
174 .map(|builder| builder.finish().into())
175 .collect(),
176 std::mem::replace(&mut self.vis_builder, vis_builder).finish(),
177 );
178 self.ready_chunks.push(chunk);
179
180 if let Some(ref mut txn) = self.ongoing_txn {
181 tracing::warn!(
182 txn_id = txn.id,
183 len = txn.len,
184 "splitting an ongoing transaction"
185 );
186 txn.len = 0;
187 }
188 }
189
190 pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator<Item = StreamChunk> + '_ {
192 self.ready_chunks.drain(..)
193 }
194
195 fn current_chunk_len(&self) -> usize {
196 self.op_builder.len()
197 }
198
199 fn commit_record(&mut self, op: Op, vis: bool) {
202 self.op_builder.push(op);
203 self.vis_builder.append(vis);
204
205 let curr_chunk_size = self.current_chunk_len();
206 let max_chunk_size = self.source_ctrl_opts.chunk_size;
207
208 if let Some(ref mut txn) = self.ongoing_txn {
209 txn.len += 1;
210
211 if txn.len >= MAX_TRANSACTION_SIZE
212 || (self.source_ctrl_opts.split_txn && curr_chunk_size >= max_chunk_size)
213 {
214 self.finish_current_chunk();
215 }
216 } else if curr_chunk_size >= max_chunk_size {
217 self.finish_current_chunk();
218 }
219 }
220}
221
222pub struct SourceStreamChunkRowWriter<'a> {
232 builder: &'a mut SourceStreamChunkBuilder,
233
234 visible: bool,
236
237 row_meta: Option<MessageMeta<'a>>,
241}
242
243impl<'a> SourceStreamChunkRowWriter<'a> {
244 pub fn with_meta(mut self, row_meta: MessageMeta<'a>) -> Self {
248 self.row_meta = Some(row_meta);
249 self
250 }
251
252 pub fn row_meta(&self) -> Option<MessageMeta<'a>> {
253 self.row_meta
254 }
255
256 pub fn source_meta(&self) -> &'a SourceMeta {
257 self.row_meta
258 .map(|m| m.source_meta)
259 .unwrap_or(&SourceMeta::Empty)
260 }
261
262 pub fn invisible(mut self) -> Self {
264 self.visible = false;
265 self
266 }
267}
268
impl SourceStreamChunkRowWriter<'_> {
    /// Writes one record: produces a value for every column and commits the record's ops.
    ///
    /// Column values come either from the attached message metadata or from `f`, depending on
    /// the column's `column_type` and additional column type. If `f` fails for a non-primary-key
    /// column, the error is logged (rate-limited) and the column is padded with `NULL`. Errors
    /// for primary-key columns, and errors raised directly by the dispatch below, abort the
    /// record. In that case every column value already appended is rolled back and nothing is
    /// committed.
    fn do_action<'a, A: RowWriterAction>(
        &'a mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
    ) -> AccessResult<()> {
        // Calls `f`, turning non-pk failures into `NULL` with a rate-limited warning.
        let mut parse_field = |desc: &SourceColumnDesc| {
            match f(desc) {
                Ok(output) => Ok(output),

                // A primary-key column cannot be padded; propagate the error.
                Err(e) if desc.is_pk => Err(e),
                Err(error) => {
                    // Shared across all calls so repeated failures do not flood the log.
                    static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                        LazyLock::new(LogSuppressor::default);
                    if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                        tracing::warn!(
                            error = %error.as_report(),
                            split_id = self.row_meta.as_ref().map(|m| m.split_id),
                            offset = self.row_meta.as_ref().map(|m| m.offset),
                            column = desc.name,
                            suppressed_count,
                            "failed to parse non-pk column, padding with `NULL`"
                        );
                    }
                    Ok(A::output_for(Datum::None))
                }
            }
        };

        // Chooses the source of each column's value.
        let mut wrapped_f = |desc: &SourceColumnDesc| {
            match (&desc.column_type, &desc.additional_column.column_type) {
                (&SourceColumnType::Offset | &SourceColumnType::RowId, _) => {
                    // Taken from the message metadata; `NULL` when no metadata is attached.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }
                (&SourceColumnType::Meta, _)
                    if matches!(
                        &self.row_meta.map(|ele| ele.source_meta),
                        &Some(SourceMeta::Kafka(_) | SourceMeta::DebeziumCdc(_))
                    ) =>
                {
                    // Meta columns are read from the metadata for Kafka and Debezium CDC messages.
                    // For other sources they fall through to the arms below.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }

                (
                    _,
                    &Some(ref col @ AdditionalColumnType::DatabaseName(_))
                    | &Some(ref col @ AdditionalColumnType::TableName(_)),
                ) => {
                    match self.row_meta {
                        Some(row_meta) => {
                            if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.source_meta {
                                Ok(A::output_for(extract_cdc_meta_column(
                                    cdc_meta,
                                    col,
                                    desc.name.as_str(),
                                )?))
                            } else {
                                // Not routed through `parse_field`: this aborts the record
                                // instead of padding with `NULL`.
                                Err(AccessError::Uncategorized {
                                    message: "CDC metadata not found in the message".to_owned(),
                                })
                            }
                        }
                        // No metadata attached: obtain the value from `f`.
                        None => parse_field(desc),
                    }
                }
                (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
                    Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(
                        row_meta.source_meta,
                    ))),
                    // No metadata attached: obtain the value from `f`.
                    None => parse_field(desc),
                },
                (_, &Some(AdditionalColumnType::CollectionName(_))) => {
                    // Always produced by `f`.
                    parse_field(desc)
                }
                (_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_subject_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                (_, &Some(AdditionalColumnType::Partition(_))) => {
                    // The partition is the split id of the message.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                (_, &Some(AdditionalColumnType::Offset(_))) => {
                    // The offset string recorded in the message metadata.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.offset)),
                    ))
                }
                (_, &Some(AdditionalColumnType::HeaderInner(ref header_inner))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|ele| {
                                extract_header_inner_from_meta(
                                    ele.source_meta,
                                    header_inner.inner_field.as_ref(),
                                    header_inner.data_type.as_ref(),
                                )
                            })
                            .unwrap_or(Datum::None.into()),
                    ))
                }
                (_, &Some(AdditionalColumnType::Headers(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_headers_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                (_, &Some(AdditionalColumnType::PulsarMessageIdData(_))) => {
                    // Pulsar message-id bytes from the metadata; `NULL` if unavailable.
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|ele| {
                                extract_pulsar_message_id_data_from_meta(ele.source_meta)
                            })
                            .unwrap_or(None),
                    ))
                }
                (_, &Some(AdditionalColumnType::Filename(_))) => {
                    // Uses the split id; presumably the split id is the file name for file
                    // sources (NOTE(review): confirm).
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                (_, &Some(AdditionalColumnType::Payload(_))) => {
                    // The payload column is produced by `f`.
                    parse_field(desc)
                }
                (_, _) => {
                    // Ordinary columns are produced by `f`.
                    parse_field(desc)
                }
            }
        };

        // Number of leading columns already appended; used to roll back on failure.
        let mut applied_columns = 0;

        let result = (self.builder.column_descs.iter())
            .zip_eq_fast(self.builder.builders.iter_mut())
            .try_for_each(|(desc, builder)| {
                wrapped_f(desc).map(|output| {
                    A::apply(builder, output);
                    applied_columns += 1;
                })
            });

        match result {
            Ok(_) => {
                // Every column is written: commit the record's ops (one per row) with visibility.
                for op in A::RECORD_TYPE.ops() {
                    self.builder.commit_record(*op, self.visible);
                }

                Ok(())
            }
            Err(e) => {
                // Undo the partial write so all column builders stay the same length.
                for i in 0..applied_columns {
                    A::rollback(&mut self.builder.builders[i]);
                }
                Err(e)
            }
        }
    }

    /// Writes an `Insert` record, producing each column's value with `f`.
    ///
    /// # Errors
    /// Returns the error that aborted the record (see `do_action`); nothing is committed then.
    #[inline(always)]
    pub fn do_insert<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<InsertAction>(|desc| f(desc).map(Into::into))
    }

    /// Writes a `Delete` record, producing each column's value with `f`.
    ///
    /// # Errors
    /// Returns the error that aborted the record (see `do_action`); nothing is committed then.
    #[inline(always)]
    pub fn do_delete<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<DeleteAction>(|desc| f(desc).map(Into::into))
    }

    /// Writes an `Update` record. For each column, `f` returns the `(old, new)` values.
    ///
    /// # Errors
    /// Returns the error that aborted the record (see `do_action`); nothing is committed then.
    #[inline(always)]
    pub fn do_update<'a, D1, D2>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<(D1, D2)>,
    ) -> AccessResult<()>
    where
        D1: Into<DatumCow<'a>>,
        D2: Into<DatumCow<'a>>,
    {
        self.do_action::<UpdateAction>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
    }
}
508
509trait RowWriterAction {
510 type Output<'a>;
511 const RECORD_TYPE: RecordType;
512
513 fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;
514
515 fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);
516
517 fn rollback(builder: &mut ArrayBuilderImpl);
518}
519
520struct InsertAction;
521
522impl RowWriterAction for InsertAction {
523 type Output<'a> = DatumCow<'a>;
524
525 const RECORD_TYPE: RecordType = RecordType::Insert;
526
527 #[inline(always)]
528 fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
529 datum.into()
530 }
531
532 #[inline(always)]
533 fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
534 builder.append(output)
535 }
536
537 #[inline(always)]
538 fn rollback(builder: &mut ArrayBuilderImpl) {
539 builder.pop().unwrap()
540 }
541}
542
543struct DeleteAction;
544
545impl RowWriterAction for DeleteAction {
546 type Output<'a> = DatumCow<'a>;
547
548 const RECORD_TYPE: RecordType = RecordType::Delete;
549
550 #[inline(always)]
551 fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
552 datum.into()
553 }
554
555 #[inline(always)]
556 fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
557 builder.append(output)
558 }
559
560 #[inline(always)]
561 fn rollback(builder: &mut ArrayBuilderImpl) {
562 builder.pop().unwrap()
563 }
564}
565
566struct UpdateAction;
567
568impl RowWriterAction for UpdateAction {
569 type Output<'a> = (DatumCow<'a>, DatumCow<'a>);
570
571 const RECORD_TYPE: RecordType = RecordType::Update;
572
573 #[inline(always)]
574 fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
575 let datum = datum.into();
576 (datum.clone(), datum)
577 }
578
579 #[inline(always)]
580 fn apply(builder: &mut ArrayBuilderImpl, output: (DatumCow<'_>, DatumCow<'_>)) {
581 builder.append(output.0);
582 builder.append(output.1);
583 }
584
585 #[inline(always)]
586 fn rollback(builder: &mut ArrayBuilderImpl) {
587 builder.pop().unwrap();
588 builder.pop().unwrap();
589 }
590}