risingwave_connector/parser/
chunk_builder.rs1use std::sync::LazyLock;
16
17use risingwave_common::array::stream_record::RecordType;
18use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk};
19use risingwave_common::bitmap::BitmapBuilder;
20use risingwave_common::log::LogSuppresser;
21use risingwave_common::types::{Datum, DatumCow, ScalarRefImpl};
22use risingwave_common::util::iter_util::ZipEqFast;
23use risingwave_connector_codec::decoder::{AccessError, AccessResult};
24use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
25use smallvec::SmallVec;
26use thiserror_ext::AsReport;
27
28use super::MessageMeta;
29use crate::parser::utils::{
30 extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta,
31 extract_subject_from_meta, extract_timestamp_from_meta,
32};
33use crate::source::{SourceColumnDesc, SourceColumnType, SourceCtrlOpts, SourceMeta};
34
/// Upper bound on the number of ops an open upstream transaction may accumulate in the
/// current chunk. When a transaction reaches it, the builder cuts the chunk regardless of
/// `split_txn`.
const MAX_TRANSACTION_SIZE: usize = 4 * 1024;
38
/// An upstream transaction that is currently open in the builder.
struct Transaction {
    /// Id passed to `begin_transaction`; compared (warning only) against the id passed to
    /// `commit_transaction`.
    id: Box<str>,
    /// Number of ops committed for this transaction since it began, or since the chunk
    /// holding it was last cut by `finish_current_chunk` (which resets it to 0).
    len: usize,
}
44
/// Builds [`StreamChunk`]s record by record from parsed source messages.
///
/// Records are written through [`SourceStreamChunkBuilder::row_writer`]. A chunk is cut and
/// queued in `ready_chunks` when it reaches the size configured in `source_ctrl_opts`
/// (subject to upstream-transaction handling), when a heartbeat is written, or when
/// `finish_current_chunk` is called explicitly. One write may therefore produce zero, one
/// or several ready chunks. Only `consume_ready_chunks` drains them, so callers should call
/// it regularly to keep the queue from growing.
pub struct SourceStreamChunkBuilder {
    // Schema of the records being built; `builders` holds one array builder per entry, in order.
    column_descs: Vec<SourceColumnDesc>,
    // Chunk-size (`chunk_size`) and transaction-splitting (`split_txn`) options.
    source_ctrl_opts: SourceCtrlOpts,
    // Per-column array builders for the chunk currently being built.
    builders: Vec<ArrayBuilderImpl>,
    // Ops of the current chunk; its length is the current chunk length.
    op_builder: Vec<Op>,
    // Visibility bit for each op in `op_builder`.
    vis_builder: BitmapBuilder,
    // Upstream transaction currently open, if any.
    ongoing_txn: Option<Transaction>,
    // Finished chunks waiting to be drained by `consume_ready_chunks`.
    ready_chunks: SmallVec<[StreamChunk; 1]>,
}
61
62impl SourceStreamChunkBuilder {
63 pub fn new(column_descs: Vec<SourceColumnDesc>, source_ctrl_opts: SourceCtrlOpts) -> Self {
64 let (builders, op_builder, vis_builder) =
65 Self::create_builders(&column_descs, source_ctrl_opts.chunk_size);
66
67 Self {
68 column_descs,
69 source_ctrl_opts,
70 builders,
71 op_builder,
72 vis_builder,
73 ongoing_txn: None,
74 ready_chunks: SmallVec::new(),
75 }
76 }
77
78 fn create_builders(
79 column_descs: &[SourceColumnDesc],
80 chunk_size: usize,
81 ) -> (Vec<ArrayBuilderImpl>, Vec<Op>, BitmapBuilder) {
82 let reserved_capacity = chunk_size + 1; let builders = column_descs
84 .iter()
85 .map(|desc| desc.data_type.create_array_builder(reserved_capacity))
86 .collect();
87 let op_builder = Vec::with_capacity(reserved_capacity);
88 let vis_builder = BitmapBuilder::with_capacity(reserved_capacity);
89 (builders, op_builder, vis_builder)
90 }
91
92 pub fn begin_transaction(&mut self, txn_id: Box<str>) {
94 if let Some(ref txn) = self.ongoing_txn {
95 tracing::warn!(
96 ongoing_txn_id = txn.id,
97 new_txn_id = txn_id,
98 "already in a transaction"
99 );
100 }
101 tracing::debug!(txn_id, "begin upstream transaction");
102 self.ongoing_txn = Some(Transaction { id: txn_id, len: 0 });
103 }
104
105 pub fn commit_transaction(&mut self, txn_id: Box<str>) {
107 if let Some(txn) = self.ongoing_txn.take() {
108 if txn.id != txn_id {
109 tracing::warn!(
110 expected_txn_id = txn.id,
111 actual_txn_id = txn_id,
112 "unexpected transaction id"
113 );
114 }
115 tracing::debug!(txn_id, "commit upstream transaction");
116
117 if self.current_chunk_len() >= self.source_ctrl_opts.chunk_size {
118 assert!(!self.source_ctrl_opts.split_txn);
120 self.finish_current_chunk();
121 }
122 } else {
123 tracing::warn!(txn_id, "no ongoing transaction to commit");
124 }
125 }
126
127 pub fn is_in_transaction(&self) -> bool {
129 self.ongoing_txn.is_some()
130 }
131
132 pub fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> {
134 SourceStreamChunkRowWriter {
135 builder: self,
136 visible: true, row_meta: None,
138 }
139 }
140
141 pub fn heartbeat(&mut self, meta: MessageMeta<'_>) {
144 if self.current_chunk_len() > 0 {
145 self.finish_current_chunk();
149 }
150
151 _ = self
152 .row_writer()
153 .invisible()
154 .with_meta(meta)
155 .do_insert(|_| Ok(Datum::None));
156 self.finish_current_chunk(); }
158
159 pub fn finish_current_chunk(&mut self) {
163 if self.op_builder.is_empty() {
164 return;
165 }
166
167 let (builders, op_builder, vis_builder) =
168 Self::create_builders(&self.column_descs, self.source_ctrl_opts.chunk_size);
169 let chunk = StreamChunk::with_visibility(
170 std::mem::replace(&mut self.op_builder, op_builder),
171 std::mem::replace(&mut self.builders, builders)
172 .into_iter()
173 .map(|builder| builder.finish().into())
174 .collect(),
175 std::mem::replace(&mut self.vis_builder, vis_builder).finish(),
176 );
177 self.ready_chunks.push(chunk);
178
179 if let Some(ref mut txn) = self.ongoing_txn {
180 tracing::warn!(
181 txn_id = txn.id,
182 len = txn.len,
183 "splitting an ongoing transaction"
184 );
185 txn.len = 0;
186 }
187 }
188
189 pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator<Item = StreamChunk> + '_ {
191 self.ready_chunks.drain(..)
192 }
193
194 fn current_chunk_len(&self) -> usize {
195 self.op_builder.len()
196 }
197
198 fn commit_record(&mut self, op: Op, vis: bool) {
201 self.op_builder.push(op);
202 self.vis_builder.append(vis);
203
204 let curr_chunk_size = self.current_chunk_len();
205 let max_chunk_size = self.source_ctrl_opts.chunk_size;
206
207 if let Some(ref mut txn) = self.ongoing_txn {
208 txn.len += 1;
209
210 if txn.len >= MAX_TRANSACTION_SIZE
211 || (self.source_ctrl_opts.split_txn && curr_chunk_size >= max_chunk_size)
212 {
213 self.finish_current_chunk();
214 }
215 } else if curr_chunk_size >= max_chunk_size {
216 self.finish_current_chunk();
217 }
218 }
219}
220
/// Handle for writing records into a [`SourceStreamChunkBuilder`].
///
/// Obtain it from [`SourceStreamChunkBuilder::row_writer`]. Optionally configure it with
/// `with_meta` / `invisible`, then write with `do_insert`, `do_delete` or `do_update`.
pub struct SourceStreamChunkRowWriter<'a> {
    // Builder that receives the column values and ops written through this handle.
    builder: &'a mut SourceStreamChunkBuilder,

    // Visibility of written records; `true` unless `invisible` was called.
    visible: bool,

    // Metadata of the message being written. It is used to fill offset/row-id, meta and
    // additional columns. `None` unless `with_meta` was called.
    row_meta: Option<MessageMeta<'a>>,
}
241
242impl<'a> SourceStreamChunkRowWriter<'a> {
243 pub fn with_meta(mut self, row_meta: MessageMeta<'a>) -> Self {
247 self.row_meta = Some(row_meta);
248 self
249 }
250
251 pub fn row_meta(&self) -> Option<MessageMeta<'a>> {
252 self.row_meta
253 }
254
255 pub fn source_meta(&self) -> &'a SourceMeta {
256 self.row_meta
257 .map(|m| m.source_meta)
258 .unwrap_or(&SourceMeta::Empty)
259 }
260
261 pub fn invisible(mut self) -> Self {
263 self.visible = false;
264 self
265 }
266}
267
impl SourceStreamChunkRowWriter<'_> {
    /// Shared write path behind `do_insert`, `do_delete` and `do_update`.
    ///
    /// For each column in `column_descs` (in order):
    /// - computes its output, either from the message metadata for special columns or by
    ///   calling `f`;
    /// - appends the output to the matching array builder with `A::apply`.
    ///
    /// If any column fails, the values already appended for this record are undone with
    /// `A::rollback` and the error is returned. Otherwise the ops of `A::RECORD_TYPE` are
    /// committed to the builder with this writer's visibility.
    fn do_action<'a, A: RowWriterAction>(
        &'a mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output<'a>>,
    ) -> AccessResult<()> {
        // Calls the caller-provided `f` for a column. Errors on primary-key columns are
        // returned. Errors on other columns are logged (throttled through `LogSuppresser`,
        // presumably rate-limiting — confirm) and replaced with a `NULL` output.
        let mut parse_field = |desc: &SourceColumnDesc| {
            match f(desc) {
                Ok(output) => Ok(output),

                Err(e) if desc.is_pk => Err(e),
                Err(error) => {
                    static LOG_SUPPERSSER: LazyLock<LogSuppresser> =
                        LazyLock::new(LogSuppresser::default);
                    if let Ok(suppressed_count) = LOG_SUPPERSSER.check() {
                        tracing::warn!(
                            error = %error.as_report(),
                            split_id = self.row_meta.as_ref().map(|m| m.split_id),
                            offset = self.row_meta.as_ref().map(|m| m.offset),
                            column = desc.name,
                            suppressed_count,
                            "failed to parse non-pk column, padding with `NULL`"
                        );
                    }
                    Ok(A::output_for(Datum::None))
                }
            }
        };

        // Dispatches on the column kind. Columns derivable from the message metadata are
        // filled from `row_meta`; everything else goes through `parse_field`.
        let mut wrapped_f = |desc: &SourceColumnDesc| {
            match (&desc.column_type, &desc.additional_column.column_type) {
                // Offset and row-id columns: value taken from the row metadata (NULL without it).
                (&SourceColumnType::Offset | &SourceColumnType::RowId, _) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }
                // Meta columns are filled from the row metadata only for Kafka and Debezium
                // CDC messages. Otherwise matching falls through to the arms below.
                (&SourceColumnType::Meta, _)
                    if matches!(
                        &self.row_meta.map(|ele| ele.source_meta),
                        &Some(SourceMeta::Kafka(_) | SourceMeta::DebeziumCdc(_))
                    ) =>
                {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|row_meta| row_meta.value_for_column(desc)),
                    ))
                }

                // Database/table name columns come from Debezium CDC metadata. Without
                // metadata they are parsed. With non-CDC metadata this is an error; note that
                // this error, and any error from `extract_cdc_meta_column`, bypasses the
                // NULL-padding in `parse_field`.
                (
                    _,
                    &Some(ref col @ AdditionalColumnType::DatabaseName(_))
                    | &Some(ref col @ AdditionalColumnType::TableName(_)),
                ) => {
                    match self.row_meta {
                        Some(row_meta) => {
                            if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.source_meta {
                                Ok(A::output_for(extract_cdc_meta_column(
                                    cdc_meta,
                                    col,
                                    desc.name.as_str(),
                                )?))
                            } else {
                                Err(AccessError::Uncategorized {
                                    message: "CDC metadata not found in the message".to_owned(),
                                })
                            }
                        }
                        None => parse_field(desc),
                    }
                }
                // Timestamp: extracted from the source metadata; parsed when there is none.
                (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta {
                    Some(row_meta) => Ok(A::output_for(extract_timestamp_from_meta(
                        row_meta.source_meta,
                    ))),
                    None => parse_field(desc),
                },
                (_, &Some(AdditionalColumnType::CollectionName(_))) => {
                    parse_field(desc)
                }
                // Subject: from the source metadata, NULL when unavailable.
                (_, &Some(AdditionalColumnType::Subject(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_subject_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                // Partition: the message's split id as text.
                (_, &Some(AdditionalColumnType::Partition(_))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                // Offset: the message's offset as text.
                (_, &Some(AdditionalColumnType::Offset(_))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.offset)),
                    ))
                }
                // A single named header, NULL when missing.
                (_, &Some(AdditionalColumnType::HeaderInner(ref header_inner))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .and_then(|ele| {
                                extract_header_inner_from_meta(
                                    ele.source_meta,
                                    header_inner.inner_field.as_ref(),
                                    header_inner.data_type.as_ref(),
                                )
                            })
                            .unwrap_or(Datum::None.into()),
                    ))
                }
                // All headers, NULL when unavailable.
                (_, &Some(AdditionalColumnType::Headers(_))) => Ok(A::output_for(
                    self.row_meta
                        .as_ref()
                        .and_then(|ele| extract_headers_from_meta(ele.source_meta))
                        .unwrap_or(None),
                )),
                // Filename: reuses the split id (as text), same as the partition column.
                (_, &Some(AdditionalColumnType::Filename(_))) => {
                    Ok(A::output_for(
                        self.row_meta
                            .as_ref()
                            .map(|ele| ScalarRefImpl::Utf8(ele.split_id)),
                    ))
                }
                (_, &Some(AdditionalColumnType::Payload(_))) => {
                    parse_field(desc)
                }
                // Ordinary columns.
                (_, _) => {
                    parse_field(desc)
                }
            }
        };

        // Number of columns whose output has been appended; used to roll back on failure.
        let mut applied_columns = 0;

        let result = (self.builder.column_descs.iter())
            .zip_eq_fast(self.builder.builders.iter_mut())
            .try_for_each(|(desc, builder)| {
                wrapped_f(desc).map(|output| {
                    A::apply(builder, output);
                    applied_columns += 1;
                })
            });

        match result {
            Ok(_) => {
                // All columns are in place; now commit the record's op(s) and visibility.
                for op in A::RECORD_TYPE.ops() {
                    self.builder.commit_record(*op, self.visible);
                }

                Ok(())
            }
            Err(e) => {
                // Undo the partial record so every column builder stays aligned with the ops.
                for i in 0..applied_columns {
                    A::rollback(&mut self.builder.builders[i]);
                }
                Err(e)
            }
        }
    }

    /// Writes one `Insert` record; `f` produces the value of each ordinary column.
    ///
    /// # Errors
    /// Returns the access error of the first failing column, which can be a primary-key
    /// column or a metadata-derived column. Failures on other columns are padded with `NULL`.
    #[inline(always)]
    pub fn do_insert<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<InsertAction>(|desc| f(desc).map(Into::into))
    }

    /// Writes one `Delete` record; `f` produces the value of each ordinary column.
    ///
    /// # Errors
    /// Same as `do_insert`.
    #[inline(always)]
    pub fn do_delete<'a, D>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<D>,
    ) -> AccessResult<()>
    where
        D: Into<DatumCow<'a>>,
    {
        self.do_action::<DeleteAction>(|desc| f(desc).map(Into::into))
    }

    /// Writes one update (a `U-`/`U+` pair); `f` produces the `(old, new)` values of each
    /// ordinary column.
    ///
    /// # Errors
    /// Same as `do_insert`.
    #[inline(always)]
    pub fn do_update<'a, D1, D2>(
        &mut self,
        mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<(D1, D2)>,
    ) -> AccessResult<()>
    where
        D1: Into<DatumCow<'a>>,
        D2: Into<DatumCow<'a>>,
    {
        self.do_action::<UpdateAction>(|desc| f(desc).map(|(old, new)| (old.into(), new.into())))
    }
}
496
/// Abstracts over the insert, delete and update write paths used by `do_action`.
trait RowWriterAction {
    /// Value appended to one column for a record: a single datum, or an `(old, new)` pair
    /// for updates.
    type Output<'a>;
    /// Record type whose ops are committed once every column has been applied.
    const RECORD_TYPE: RecordType;

    /// Builds an output that uses `datum` for every value of the record.
    fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a>;

    /// Appends `output` to the column `builder`.
    fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>);

    /// Undoes one previous `apply` on `builder`.
    fn rollback(builder: &mut ArrayBuilderImpl);
}
507
508struct InsertAction;
509
510impl RowWriterAction for InsertAction {
511 type Output<'a> = DatumCow<'a>;
512
513 const RECORD_TYPE: RecordType = RecordType::Insert;
514
515 #[inline(always)]
516 fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
517 datum.into()
518 }
519
520 #[inline(always)]
521 fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
522 builder.append(output)
523 }
524
525 #[inline(always)]
526 fn rollback(builder: &mut ArrayBuilderImpl) {
527 builder.pop().unwrap()
528 }
529}
530
531struct DeleteAction;
532
533impl RowWriterAction for DeleteAction {
534 type Output<'a> = DatumCow<'a>;
535
536 const RECORD_TYPE: RecordType = RecordType::Delete;
537
538 #[inline(always)]
539 fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
540 datum.into()
541 }
542
543 #[inline(always)]
544 fn apply(builder: &mut ArrayBuilderImpl, output: DatumCow<'_>) {
545 builder.append(output)
546 }
547
548 #[inline(always)]
549 fn rollback(builder: &mut ArrayBuilderImpl) {
550 builder.pop().unwrap()
551 }
552}
553
554struct UpdateAction;
555
556impl RowWriterAction for UpdateAction {
557 type Output<'a> = (DatumCow<'a>, DatumCow<'a>);
558
559 const RECORD_TYPE: RecordType = RecordType::Update;
560
561 #[inline(always)]
562 fn output_for<'a>(datum: impl Into<DatumCow<'a>>) -> Self::Output<'a> {
563 let datum = datum.into();
564 (datum.clone(), datum)
565 }
566
567 #[inline(always)]
568 fn apply(builder: &mut ArrayBuilderImpl, output: (DatumCow<'_>, DatumCow<'_>)) {
569 builder.append(output.0);
570 builder.append(output.1);
571 }
572
573 #[inline(always)]
574 fn rollback(builder: &mut ArrayBuilderImpl) {
575 builder.pop().unwrap();
576 builder.pop().unwrap();
577 }
578}