1use std::sync::LazyLock;
16
17use risingwave_common::array::stream_record::RecordType;
18use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
19use risingwave_common::bitmap::BitmapBuilder;
20use risingwave_common::log::LogSuppressor;
21use risingwave_common::types::{
22 Datum, DatumCow, ScalarRefImpl, ToDatumRef, ToText, datum_ref_type_match,
23};
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_connector_codec::decoder::{AccessError, AccessResult};
26use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
27use smallvec::SmallVec;
28use thiserror_ext::AsReport;
29
30use super::MessageMeta;
31use crate::parser::utils::{
32 extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
33 extract_pulsar_message_id_data_from_meta, extract_subject_from_meta,
34 extract_timestamp_from_meta,
35};
36use crate::source::{SourceColumnDesc, SourceColumnType, SourceCtrlOpts, SourceMeta};
37
/// Maximum number of records an ongoing upstream transaction may accumulate in the chunk being
/// built. Once the counter of the ongoing transaction reaches this value, the current chunk is
/// finished even though the transaction has not been committed yet.
const MAX_TRANSACTION_SIZE: usize = 4096;
41
/// Bookkeeping for the upstream transaction that is currently open, if any.
struct Transaction {
    /// Identifier given to `begin_transaction`; compared against the id passed on commit.
    id: Box<str>,
    /// Number of records committed while this transaction is open, counted since the
    /// transaction began or since the last chunk was finished (whichever is later).
    len: usize,
}
47
/// Incrementally builds [`StreamChunk`]s from rows written through
/// [`SourceStreamChunkRowWriter`], cutting a new chunk according to the configured chunk size
/// and the state of the ongoing upstream transaction.
pub struct SourceStreamChunkBuilder {
    /// Descriptions of the columns of every row; one array builder exists per entry.
    column_descs: Vec<SourceColumnDesc>,
    /// Control options; this type reads `chunk_size` and `split_txn` from it.
    source_ctrl_opts: SourceCtrlOpts,
    /// Array builders of the chunk under construction, parallel to `column_descs`.
    builders: Vec<ArrayBuilderImpl>,
    /// Ops of the records committed to the chunk under construction.
    op_builder: Vec<Op>,
    /// Visibility bits of the records committed to the chunk under construction.
    vis_builder: BitmapBuilder,
    /// The upstream transaction currently open, if any.
    ongoing_txn: Option<Transaction>,
    /// Finished chunks waiting to be taken via `consume_ready_chunks`.
    ready_chunks: SmallVec<[StreamChunk; 1]>,
}
64
65impl SourceStreamChunkBuilder {
66 pub fn new(column_descs: Vec<SourceColumnDesc>, source_ctrl_opts: SourceCtrlOpts) -> Self {
67 let (builders, op_builder, vis_builder) =
68 Self::create_builders(&column_descs, source_ctrl_opts.chunk_size);
69
70 Self {
71 column_descs,
72 source_ctrl_opts,
73 builders,
74 op_builder,
75 vis_builder,
76 ongoing_txn: None,
77 ready_chunks: SmallVec::new(),
78 }
79 }
80
81 fn create_builders(
82 column_descs: &[SourceColumnDesc],
83 chunk_size: usize,
84 ) -> (Vec<ArrayBuilderImpl>, Vec<Op>, BitmapBuilder) {
85 let reserved_capacity = chunk_size + 1; let builders = column_descs
87 .iter()
88 .map(|desc| desc.data_type.create_array_builder(reserved_capacity))
89 .collect();
90 let op_builder = Vec::with_capacity(reserved_capacity);
91 let vis_builder = BitmapBuilder::with_capacity(reserved_capacity);
92 (builders, op_builder, vis_builder)
93 }
94
95 pub fn begin_transaction(&mut self, txn_id: Box<str>) {
97 if let Some(ref txn) = self.ongoing_txn {
98 tracing::warn!(
99 ongoing_txn_id = txn.id,
100 new_txn_id = txn_id,
101 "already in a transaction"
102 );
103 }
104 tracing::debug!(txn_id, "begin upstream transaction");
105 self.ongoing_txn = Some(Transaction { id: txn_id, len: 0 });
106 }
107
108 pub fn commit_transaction(&mut self, txn_id: Box<str>) {
110 if let Some(txn) = self.ongoing_txn.take() {
111 if txn.id != txn_id {
112 tracing::warn!(
113 expected_txn_id = txn.id,
114 actual_txn_id = txn_id,
115 "unexpected transaction id"
116 );
117 }
118 tracing::debug!(txn_id, "commit upstream transaction");
119
120 if self.current_chunk_len() >= self.source_ctrl_opts.chunk_size {
121 assert!(!self.source_ctrl_opts.split_txn);
123 self.finish_current_chunk();
124 }
125 } else {
126 tracing::warn!(txn_id, "no ongoing transaction to commit");
127 }
128 }
129
130 pub fn is_in_transaction(&self) -> bool {
132 self.ongoing_txn.is_some()
133 }
134
135 pub fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> {
137 SourceStreamChunkRowWriter {
138 builder: self,
139 visible: true, row_meta: None,
141 }
142 }
143
144 pub fn heartbeat(&mut self, meta: MessageMeta<'_>) {
147 if self.current_chunk_len() > 0 {
148 self.finish_current_chunk();
152 }
153
154 _ = self
155 .row_writer()
156 .invisible()
157 .with_meta(meta)
158 .do_insert(|_| Ok(Datum::None));
159 self.finish_current_chunk(); }
161
162 pub fn finish_current_chunk(&mut self) {
166 if self.op_builder.is_empty() {
167 return;
168 }
169
170 let (builders, op_builder, vis_builder) =
171 Self::create_builders(&self.column_descs, self.source_ctrl_opts.chunk_size);
172 let chunk = StreamChunk::with_visibility(
173 std::mem::replace(&mut self.op_builder, op_builder),
174 std::mem::replace(&mut self.builders, builders)
175 .into_iter()
176 .map(|builder| builder.finish().into())
177 .collect(),
178 std::mem::replace(&mut self.vis_builder, vis_builder).finish(),
179 );
180 self.ready_chunks.push(chunk);
181
182 if let Some(ref mut txn) = self.ongoing_txn {
183 tracing::warn!(
184 txn_id = txn.id,
185 len = txn.len,
186 "splitting an ongoing transaction"
187 );
188 txn.len = 0;
189 }
190 }
191
192 pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator<Item = StreamChunk> + '_ {
194 self.ready_chunks.drain(..)
195 }
196
197 fn current_chunk_len(&self) -> usize {
198 self.op_builder.len()
199 }
200
201 fn commit_record(&mut self, op: Op, vis: bool) {
204 self.op_builder.push(op);
205 self.vis_builder.append(vis);
206
207 let curr_chunk_size = self.current_chunk_len();
208 let max_chunk_size = self.source_ctrl_opts.chunk_size;
209
210 if let Some(ref mut txn) = self.ongoing_txn {
211 txn.len += 1;
212
213 if txn.len >= MAX_TRANSACTION_SIZE
214 || (self.source_ctrl_opts.split_txn && curr_chunk_size >= max_chunk_size)
215 {
216 self.finish_current_chunk();
217 }
218 } else if curr_chunk_size >= max_chunk_size {
219 self.finish_current_chunk();
220 }
221 }
222}
223
/// Writes a single record (insert, delete or update) into the chunk under construction of a
/// [`SourceStreamChunkBuilder`].
pub struct SourceStreamChunkRowWriter<'a> {
    /// The builder whose column, op and visibility builders receive the record.
    builder: &'a mut SourceStreamChunkBuilder,

    /// Visibility recorded for the written record; `true` unless `invisible` was called.
    visible: bool,

    /// Metadata of the message the record comes from, used to fill metadata-backed columns.
    /// `None` until `with_meta` is called.
    row_meta: Option<MessageMeta<'a>>,
}
244
245impl<'a> SourceStreamChunkRowWriter<'a> {
246 pub fn with_meta(mut self, row_meta: MessageMeta<'a>) -> Self {
250 self.row_meta = Some(row_meta);
251 self
252 }
253
254 pub fn row_meta(&self) -> Option<MessageMeta<'a>> {
255 self.row_meta
256 }
257
258 pub fn source_meta(&self) -> &'a SourceMeta {
259 self.row_meta
260 .map(|m| m.source_meta)
261 .unwrap_or(&SourceMeta::Empty)
262 }
263
264 pub fn invisible(mut self) -> Self {
266 self.visible = false;
267 self
268 }
269}
270
impl SourceStreamChunkRowWriter<'_> {
    /// Writes one record of kind `A` into the builder.
    ///
    /// For every column, the value is taken either from the attached message metadata or from
    /// `f` (the payload accessor), depending on the column's type and additional-column type.
    /// If every column succeeds, the ops of `A::RECORD_TYPE` are committed with this writer's
    /// visibility. If any column fails, the columns already written are rolled back and the
    /// error is returned, leaving no trace of the record in the builder.
    fn do_action<'a, A: RowWriterAction>(
        &'a mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
    ) -> AccessResult<()> {
        // Obtains the value of a column from the payload via `f`.
        let mut parse_field = |desc: &SourceColumnDesc| {
            match f(desc) {
                Ok(output) => {
                    // A type mismatch is propagated as an error for any column, pk or not.
                    A::type_check(desc, &output)?;
                    Ok(output)
                }

                // A primary-key column that cannot be parsed fails the whole record.
                Err(e) if desc.is_pk => Err(e),
                // Any other column that cannot be parsed is padded with `NULL`.
                Err(error) => {
                    // NOTE(review): `LogSuppressor::check` presumably rate-limits this warning
                    // and reports how many were dropped in between — confirm.
                    static LOG_SUPPRESSOR: LazyLock<LogSuppressor> =
                        LazyLock::new(LogSuppressor::default);
                    if let Ok(suppressed_count) = LOG_SUPPRESSOR.check() {
                        tracing::warn!(
                            error = %error.as_report(),
                            split_id = self.row_meta.as_ref().map(|m| m.split_id),
                            offset = self.row_meta.as_ref().map(|m| m.offset),
                            column = desc.name,
                            suppressed_count,
                            "failed to parse non-pk column, padding with `NULL`"
                        );
                    }
                    Ok(A::output_for(Datum::None))
                }
            }
        };

        // Decides, per column, whether the value comes from the message metadata or from the
        // payload. Arms are tried in order, so the column-type arms take precedence over the
        // additional-column arms.
        let mut wrapped_f = |desc: &SourceColumnDesc| {
            match (&desc.column_type, &desc.additional_column.column_type) {
                // Offset and row-id columns: taken from the metadata, `NULL` without metadata.
                (&SourceColumnType::Offset | &SourceColumnType::RowId, _) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }
                // Meta columns of Kafka / Debezium CDC messages: taken from the metadata.
                (&SourceColumnType::Meta, _)
                    if matches!(
                        &self.row_meta.map(|ele| ele.source_meta),
                        &Some(SourceMeta::Kafka(_) | SourceMeta::DebeziumCdc(_))
                    ) =>
                {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }

                (
                    _,
                    &Some(ref col @ AdditionalColumnType::DatabaseName(_))
                    | &Some(ref col @ AdditionalColumnType::TableName(_)),
                ) => {
                    // The presence of a collection-name column among the columns is used as
                    // the marker for a MongoDB CDC source.
                    let is_mongodb_cdc = self.builder.column_descs.iter().any(|d| {
                        matches!(
                            d.additional_column.column_type,
                            Some(AdditionalColumnType::CollectionName(_))
                        )
                    });

                    if matches!(col, AdditionalColumnType::DatabaseName(_)) && is_mongodb_cdc {
                        // For MongoDB CDC the database name is read from the payload.
                        parse_field(desc)
                    } else {
                        match self.row_meta {
                            Some(row_meta) => {
                                // With metadata attached, the value must come from Debezium
                                // CDC metadata; any other kind of metadata is an error.
                                if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.source_meta {
                                    Ok(A::output_for(extract_cdc_meta_column(
                                        cdc_meta,
                                        col,
                                        desc.name.as_str(),
                                    )?))
                                } else {
                                    Err(AccessError::Uncategorized {
                                        message: "CDC metadata not found in the message".to_owned(),
                                    })
                                }
                            }
                            // Without metadata, fall back to the payload.
                            None => parse_field(desc),
                        }
                    }
                }
                // Timestamp: from the metadata when attached, otherwise from the payload.
                (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
                    Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(
                        row_meta.source_meta,
                    ))),
                    None => parse_field(desc),
                },
                // Collection name: always read from the payload.
                (_, &Some(AdditionalColumnType::CollectionName(_))) => {
                    parse_field(desc)
                }
                // Subject: from the metadata, `NULL` when unavailable.
                (_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_subject_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                // Partition: the split id of the message, `NULL` without metadata.
                (_, &Some(AdditionalColumnType::Partition(_))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                // Offset: the offset of the message as text, `NULL` without metadata.
                (_, &Some(AdditionalColumnType::Offset(_))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.offset)),
                    ))
                }
                // A single header field: from the metadata, `NULL` when unavailable.
                (_, &Some(AdditionalColumnType::HeaderInner(ref header_inner))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|ele| {
                                extract_header_inner_from_meta(
                                    ele.source_meta,
                                    header_inner.inner_field.as_ref(),
                                    header_inner.data_type.as_ref(),
                                )
                            })
                            .unwrap_or(Datum::None.into()),
                    ))
                }
                // All headers: from the metadata, `NULL` when unavailable.
                (_, &Some(AdditionalColumnType::Headers(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_headers_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                // Pulsar message id data: from the metadata, `NULL` when unavailable.
                (_, &Some(AdditionalColumnType::PulsarMessageIdData(_))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|ele| {
                                extract_pulsar_message_id_data_from_meta(ele.source_meta)
                            })
                            .unwrap_or(None),
                    ))
                }
                // File name: the split id of the message, `NULL` without metadata.
                (_, &Some(AdditionalColumnType::Filename(_))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                // Payload: read through the payload accessor.
                (_, &Some(AdditionalColumnType::Payload(_))) => {
                    parse_field(desc)
                }
                // Every other column is a regular payload column.
                (_, _) => {
                    parse_field(desc)
                }
            }
        };

        // Number of columns written so far, needed to undo a partially written record.
        let mut applied_columns = 0;

        let result = (self.builder.column_descs.iter())
            .zip_eq_fast(self.builder.builders.iter_mut())
            .try_for_each(|(desc, builder)| {
                wrapped_f(desc).map(|output| {
                    A::apply(builder, output);
                    applied_columns += 1;
                })
            });

        match result {
            Ok(_) => {
                // All columns are written; commit the record by appending its op(s) and
                // visibility.
                for op in A::RECORD_TYPE.ops() {
                    self.builder.commit_record(*op, self.visible);
                }

                Ok(())
            }
            Err(e) => {
                // Undo the columns that were written before the failing one.
                for i in 0..applied_columns {
                    A::rollback(&mut self.builder.builders[i]);
                }
                Err(e)
            }
        }
    }

    /// Writes an insert record. `f` supplies the datum of each column that is read from the
    /// payload; columns backed by message metadata are filled without calling `f`.
    ///
    /// Returns an error, without writing anything, if a column fails (see `do_action`).
    #[inline(always)]
    pub fn do_insert<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<InsertAction>(|desc| f(desc).map(Into::into))
    }

    /// Writes a delete record. `f` supplies the datum of each column that is read from the
    /// payload; columns backed by message metadata are filled without calling `f`.
    ///
    /// Returns an error, without writing anything, if a column fails (see `do_action`).
    #[inline(always)]
    pub fn do_delete<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<DeleteAction>(|desc| f(desc).map(Into::into))
    }

    /// Writes an update record. `f` supplies the `(old, new)` pair of datums of each column
    /// that is read from the payload; columns backed by message metadata are filled without
    /// calling `f`, using the same value for both rows.
    ///
    /// Returns an error, without writing anything, if a column fails (see `do_action`).
    #[inline(always)]
    pub fn do_update<'a, D1, D2>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<(D1, D2)>,
    ) -> AccessResult<()>
    where
        D1: Into<DatumCow<'a>>,
        D2: Into<DatumCow<'a>>,
    {
        self.do_action::<UpdateAction>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
    }
}
528
/// Describes how one kind of record (insert, delete or update) is written column by column.
trait RowWriterAction {
    /// The per-column value produced for this kind of record.
    type Output<'a>;
    /// The record type whose ops are committed once all columns have been written.
    const RECORD_TYPE: RecordType;

    /// Builds the per-column output from a single datum, e.g. one taken from message metadata.
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;

    /// Checks that `output` matches the data type of the column described by `desc`.
    fn type_check(desc: &SourceColumnDesc, output: &Self::Output<'_>) -> AccessResult<()>;

    /// Appends `output` to the column's array builder.
    fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);

    /// Undoes one preceding `apply` on the column's array builder.
    fn rollback(builder: &mut ArrayBuilderImpl);
}
541
542#[inline(always)]
543fn check_datum_type(
544 desc: &SourceColumnDesc,
545 datum: risingwave_common::types::DatumRef<'_>,
546) -> AccessResult<()> {
547 if datum_ref_type_match(&desc.data_type, datum) {
548 Ok(())
549 } else {
550 let (got, value) = match datum {
551 None => unreachable!("datum_ref_type_match returned false for NULL"),
552 Some(s) => (s.get_ident().to_owned(), s.to_text()),
554 };
555 Err(AccessError::TypeError {
556 expected: desc.data_type.to_string(),
557 got,
558 value,
559 })
560 }
561}
562
/// Writes a record as a single inserted row.
struct InsertAction;

impl RowWriterAction for InsertAction {
    /// One datum per column.
    type Output<'a> = DatumCow<'a>;

    const RECORD_TYPE: RecordType = RecordType::Insert;

    /// The datum is used as is.
    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        datum.into()
    }

    /// Checks the datum against the column's data type.
    #[inline(always)]
    fn type_check(desc: &SourceColumnDesc, output: &DatumCow<'_>) -> AccessResult<()> {
        check_datum_type(desc, output.to_datum_ref())
    }

    /// Appends the datum to the column.
    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
        builder.append(output)
    }

    /// Removes the datum appended last; panics if `pop` yields `None`.
    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap()
    }
}
590
/// Writes a record as a single deleted row.
struct DeleteAction;

impl RowWriterAction for DeleteAction {
    /// One datum per column.
    type Output<'a> = DatumCow<'a>;

    const RECORD_TYPE: RecordType = RecordType::Delete;

    /// The datum is used as is.
    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        datum.into()
    }

    /// Checks the datum against the column's data type.
    #[inline(always)]
    fn type_check(desc: &SourceColumnDesc, output: &DatumCow<'_>) -> AccessResult<()> {
        check_datum_type(desc, output.to_datum_ref())
    }

    /// Appends the datum to the column.
    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
        builder.append(output)
    }

    /// Removes the datum appended last; panics if `pop` yields `None`.
    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap()
    }
}
618
/// Writes a record as an update, i.e. two rows per column: the old value followed by the new
/// value.
struct UpdateAction;

impl RowWriterAction for UpdateAction {
    /// The `(old, new)` pair of datums of a column.
    type Output<'a> = (DatumCow<'a>, DatumCow<'a>);

    const RECORD_TYPE: RecordType = RecordType::Update;

    /// A single datum is used for both the old and the new row.
    #[inline(always)]
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
        let datum = datum.into();
        (datum.clone(), datum)
    }

    /// Checks both the old and the new datum against the column's data type.
    #[inline(always)]
    fn type_check(
        desc: &SourceColumnDesc,
        output: &(DatumCow<'_>, DatumCow<'_>),
    ) -> AccessResult<()> {
        check_datum_type(desc, output.0.to_datum_ref())?;
        check_datum_type(desc, output.1.to_datum_ref())
    }

    /// Appends the old datum, then the new datum, to the column.
    #[inline(always)]
    fn apply(builder: &mut ArrayBuilderImpl, output: (DatumCow<'_>, DatumCow<'_>)) {
        builder.append(output.0);
        builder.append(output.1);
    }

    /// Removes the two datums appended last; panics if either `pop` yields `None`.
    #[inline(always)]
    fn rollback(builder: &mut ArrayBuilderImpl) {
        builder.pop().unwrap();
        builder.pop().unwrap();
    }
}