risingwave_stream/executor/iceberg_with_pk_index/
writer.rs1use anyhow::Context;
16use iceberg::writer::PositionDeleteInput;
17use risingwave_common::array::DataChunk;
18use risingwave_common::array::stream_record::Record;
19use risingwave_common::row::{Project, RowExt};
20use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_pb::connector_service::SinkMetadata;
23use risingwave_storage::StateStore;
24
25use crate::common::change_buffer::output_kind;
26use crate::common::compact_chunk::{InconsistencyBehavior, compact_chunk_inline};
27use crate::executor::prelude::*;
28
29pub type IcebergWriterFlushOutput = SinkMetadata;
30type PkRow<'a> = Project<'a, RowRef<'a>>;
31
32fn new_chunk_builder(chunk_size: usize) -> DataChunkBuilder {
33 DataChunkBuilder::new(vec![DataType::Varchar, DataType::Int64], chunk_size)
34}
35
36fn append_row(builder: &mut DataChunkBuilder, file_path: &str, position: i64) -> Option<DataChunk> {
37 builder.append_one_row([
38 Some(ScalarRefImpl::Utf8(file_path)),
39 Some(ScalarRefImpl::Int64(position)),
40 ])
41}
42
43#[async_trait::async_trait]
48pub trait IcebergWriter: Send + 'static {
49 async fn write_chunk(
51 &mut self,
52 chunk: DataChunk,
53 ) -> StreamExecutorResult<Vec<PositionDeleteInput>>;
54
55 async fn flush(&mut self) -> StreamExecutorResult<Option<IcebergWriterFlushOutput>>;
58}
59
60pub struct WriterExecutor<S, W>
74where
75 S: StateStore,
76 W: IcebergWriter,
77{
78 ctx: ActorContextRef,
79 input: Option<Executor>,
80 pk_indices: Vec<usize>,
82 pk_index_state_table: StateTable<S>,
85 writer: W,
87 delete_position_buffer: Option<DataChunkBuilder>,
89 chunk_size: usize,
90 pk_matched: bool,
91}
92
93impl<S, W> WriterExecutor<S, W>
94where
95 S: StateStore,
96 W: IcebergWriter,
97{
98 #[allow(clippy::too_many_arguments)]
99 pub fn new(
100 ctx: ActorContextRef,
101 input: Executor,
102 pk_indices: Vec<usize>,
103 pk_index_state_table: StateTable<S>,
104 writer: W,
105 chunk_size: usize,
106 pk_matched: bool,
107 ) -> Self {
108 Self {
109 ctx,
110 input: Some(input),
111 pk_indices,
112 pk_index_state_table,
113 writer,
114 delete_position_buffer: None,
115 chunk_size,
116 pk_matched,
117 }
118 }
119
120 async fn delete_existing_row(
121 &mut self,
122 pk_row: PkRow<'_>,
123 delete_position_buffer: &mut DataChunkBuilder,
124 ) -> StreamExecutorResult<Option<DataChunk>> {
125 let Some(index_row) = self.pk_index_state_table.get_row(pk_row).await? else {
126 return Ok(None);
127 };
128
129 let num_cols = index_row.len();
130 let file_path = index_row
131 .datum_at(num_cols - 2)
132 .context("file_path should not be null")?
133 .into_utf8();
134 let position = index_row
135 .datum_at(num_cols - 1)
136 .context("position should not be null")?
137 .into_int64();
138 let chunk = append_row(delete_position_buffer, file_path, position);
139 self.pk_index_state_table.delete(index_row);
140 Ok(chunk)
141 }
142
143 #[try_stream(ok = DataChunk, error = StreamExecutorError)]
158 async fn process_chunk(&mut self, chunk: StreamChunk) {
159 let chunk = compact_chunk_inline::<{ output_kind::RETRACT }>(
164 chunk,
165 &self.pk_indices,
166 InconsistencyBehavior::Warn,
167 );
168
169 let mut delete_position_buffer = self
170 .delete_position_buffer
171 .take()
172 .unwrap_or_else(|| new_chunk_builder(self.chunk_size));
173 let pk_indices = self.pk_indices.clone();
174
175 let mut insert_chunk =
179 DataChunkBuilder::new(chunk.data_chunk().data_types(), chunk.capacity() + 1);
180 let mut insert_pks: Vec<PkRow<'_>> = Vec::new();
181
182 for record in chunk.records() {
183 match record {
184 Record::Insert { new_row } => {
185 if !self.pk_matched {
186 let pk_row = new_row.project(&pk_indices);
187 if let Some(chunk) = self
188 .delete_existing_row(pk_row, &mut delete_position_buffer)
189 .await?
190 {
191 yield chunk;
192 }
193 }
194 let overflow = insert_chunk.append_one_row(new_row);
195 debug_assert!(overflow.is_none(), "insert chunk exceeds capacity");
196 insert_pks.push(new_row.project(&pk_indices));
197 }
198 Record::Delete { old_row } => {
199 let pk_row = old_row.project(&pk_indices);
200 if let Some(chunk) = self
201 .delete_existing_row(pk_row, &mut delete_position_buffer)
202 .await?
203 {
204 yield chunk;
205 }
206 }
207 Record::Update { new_row, .. } => {
208 let pk_row = new_row.project(&pk_indices);
210 if let Some(chunk) = self
211 .delete_existing_row(pk_row, &mut delete_position_buffer)
212 .await?
213 {
214 yield chunk;
215 }
216 let overflow = insert_chunk.append_one_row(new_row);
217 debug_assert!(overflow.is_none(), "insert chunk exceeds capacity");
218 insert_pks.push(pk_row);
219 }
220 }
221 }
222
223 if !insert_pks.is_empty() {
224 let insert_chunk = insert_chunk.finish();
225 let positions = self.writer.write_chunk(insert_chunk).await?;
226
227 for (pk, pos) in insert_pks.into_iter().zip_eq_fast(positions) {
228 let mut index_row_data = Vec::with_capacity(pk_indices.len() + 2);
229 for datum in pk.iter() {
230 index_row_data.push(datum);
231 }
232 index_row_data.push(Some(ScalarRefImpl::Utf8(&pos.path)));
233 index_row_data.push(Some(ScalarRefImpl::Int64(pos.pos)));
234 self.pk_index_state_table.insert(index_row_data.as_slice());
235 }
236 }
237
238 self.delete_position_buffer = Some(delete_position_buffer);
239 self.pk_index_state_table.try_flush().await?;
240 }
241
242 #[try_stream(ok = Message, error = StreamExecutorError)]
243 async fn execute_inner(mut self) {
244 let mut input = self.input.take().unwrap().execute();
245
246 let barrier = expect_first_barrier(&mut input).await?;
248 let first_epoch = barrier.epoch;
249 yield Message::Barrier(barrier);
250 self.pk_index_state_table.init_epoch(first_epoch).await?;
251
252 #[for_await]
253 for msg in input {
254 match msg? {
255 Message::Chunk(chunk) =>
256 {
257 #[for_await]
258 for data_chunk in self.process_chunk(chunk) {
259 yield Message::Chunk(data_chunk?.into());
260 }
261 }
262 Message::Barrier(barrier) => {
263 if let Some(chunk) = self
264 .delete_position_buffer
265 .take()
266 .and_then(|mut b| b.consume_all())
267 {
268 yield Message::Chunk(chunk.into());
269 }
270
271 let _metadata = self.writer.flush().await?.unwrap_or_default();
272 let epoch = barrier.epoch;
273 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
274 let post_commit = self.pk_index_state_table.commit(epoch).await?;
275
276 yield Message::Barrier(barrier);
279
280 post_commit.post_yield_barrier(update_vnode_bitmap).await?;
281 }
282 Message::Watermark(w) => {
283 yield Message::Watermark(w);
284 }
285 }
286 }
287 }
288}
289
290impl<S, W> Execute for WriterExecutor<S, W>
291where
292 S: StateStore,
293 W: IcebergWriter,
294{
295 fn execute(self: Box<Self>) -> BoxedMessageStream {
296 self.execute_inner().boxed()
297 }
298}
299
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use iceberg::writer::PositionDeleteInput;
    use risingwave_common::array::Op;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    const CHUNK_SIZE: usize = 1024;
    const TEST_FILE_PATH: &str = "file1.parquet";

    /// In-memory writer: records every chunk it receives and assigns consecutive positions
    /// within a single file path.
    struct IcebergWriterMock {
        file_path: String,
        next_offset: i64,
        written_chunks: Arc<Mutex<Vec<StreamChunk>>>,
    }

    impl IcebergWriterMock {
        fn new(file_path: &str) -> Self {
            Self {
                file_path: file_path.to_owned(),
                next_offset: 0,
                written_chunks: Arc::new(Mutex::new(Vec::new())),
            }
        }

        /// Shared handle to the recorded chunks, usable after the mock is moved away.
        fn written_chunks(&self) -> Arc<Mutex<Vec<StreamChunk>>> {
            Arc::clone(&self.written_chunks)
        }
    }

    #[async_trait::async_trait]
    impl IcebergWriter for IcebergWriterMock {
        async fn write_chunk(
            &mut self,
            chunk: DataChunk,
        ) -> StreamExecutorResult<Vec<PositionDeleteInput>> {
            let file_path = self.file_path.as_str();
            let next_offset = &mut self.next_offset;
            // One position per visible row, continuing from the previous call.
            let positions: Vec<PositionDeleteInput> = (0..chunk.cardinality())
                .map(|_| {
                    let position =
                        PositionDeleteInput::new(Arc::<str>::from(file_path), *next_offset);
                    *next_offset += 1;
                    position
                })
                .collect();
            self.written_chunks.lock().unwrap().push(chunk.into());
            Ok(positions)
        }

        async fn flush(&mut self) -> StreamExecutorResult<Option<IcebergWriterFlushOutput>> {
            Ok(None)
        }
    }

    /// Builds the index table: an `Int64` key column followed by the `Varchar` file path and
    /// the `Int64` position.
    async fn create_pk_index_state_table(
        store: MemoryStateStore,
        table_id: TableId,
    ) -> StateTable<MemoryStateStore> {
        let column_descs = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Varchar),
            ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
        ];
        let order_types = vec![OrderType::ascending()];
        let pk_indices = vec![0];

        let catalog = gen_pbtable(table_id, column_descs, order_types, pk_indices, 0);
        StateTable::from_table_catalog(&catalog, store, None).await
    }

    /// Schema of the test input: two `Int64` columns.
    fn input_schema() -> Schema {
        let fields = [DataType::Int64, DataType::Int64]
            .into_iter()
            .map(Field::unnamed)
            .collect();
        Schema::new(fields)
    }

    /// Decodes an emitted position-delete chunk into `(file_path, position)` pairs, asserting
    /// that every row is an insert.
    fn decode_chunk(chunk: StreamChunk) -> Vec<(String, i64)> {
        let mut decoded = Vec::new();
        for (op, row) in chunk.rows() {
            assert_eq!(op, Op::Insert);
            let file_path = row.datum_at(0).unwrap().into_utf8().to_owned();
            let position = row.datum_at(1).unwrap().into_int64();
            decoded.push((file_path, position));
        }
        decoded
    }

    fn test_file_position(position: i64) -> (String, i64) {
        (TEST_FILE_PATH.to_owned(), position)
    }

    /// Parses each pretty-printed chunk description into a `StreamChunk`.
    fn pretty_chunks(descriptions: &[&str]) -> Vec<StreamChunk> {
        descriptions
            .iter()
            .map(|description| StreamChunk::from_pretty(description))
            .collect()
    }

    /// Wires a mock source and mock writer to a `WriterExecutor` under test.
    struct WriterTestHarness {
        tx: MessageSender,
        executor: BoxedMessageStream,
        written_chunks: Arc<Mutex<Vec<StreamChunk>>>,
    }

    impl WriterTestHarness {
        async fn new() -> Self {
            let state_table =
                create_pk_index_state_table(MemoryStateStore::new(), TableId::new(1)).await;
            let writer = IcebergWriterMock::new(TEST_FILE_PATH);
            let written_chunks = writer.written_chunks();

            let (tx, source) = MockSource::channel();
            let upstream = source.into_executor(input_schema(), vec![0]);
            let executor = WriterExecutor::new(
                ActorContext::for_test(123),
                upstream,
                vec![0],
                state_table,
                writer,
                CHUNK_SIZE,
                true,
            )
            .boxed()
            .execute();

            Self {
                tx,
                executor,
                written_chunks,
            }
        }

        /// Sends the first barrier (epoch 1) and waits for it to come out.
        async fn init(&mut self) {
            self.push_barrier(1);
            self.expect_barrier().await;
        }

        fn push_chunk(&mut self, chunk: StreamChunk) {
            self.tx.push_chunk(chunk);
        }

        fn push_pretty_chunk(&mut self, pretty: &str) {
            let chunk = StreamChunk::from_pretty(pretty);
            self.push_chunk(chunk);
        }

        fn push_barrier(&mut self, epoch: u64) {
            self.tx.push_barrier(test_epoch(epoch), false);
        }

        async fn expect_barrier(&mut self) {
            self.executor.expect_barrier().await;
        }

        /// Expects the next output to be a position-delete chunk with exactly `expected`.
        async fn expect_position_chunk(&mut self, expected: Vec<(String, i64)>) {
            let emitted = self.executor.expect_chunk().await;
            assert_eq!(decode_chunk(emitted), expected);
        }

        /// Snapshot of every chunk handed to the mock writer so far.
        fn written_chunks(&self) -> Vec<StreamChunk> {
            let recorded = self.written_chunks.lock().unwrap();
            recorded.to_vec()
        }

        /// Like `written_chunks`, with each chunk's visibility compacted.
        fn compacted_written_chunks(&self) -> Vec<StreamChunk> {
            self.written_chunks()
                .into_iter()
                .map(|chunk| chunk.compact_vis())
                .collect()
        }
    }

    /// Plain inserts are written as-is and emit no position deletes.
    #[tokio::test]
    async fn test_writer_executor_insert_only() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10
            + 2 20
            + 3 30",
        );
        harness.push_barrier(2);

        harness.expect_barrier().await;
        assert_eq!(
            harness.written_chunks(),
            pretty_chunks(&[" I I
                + 1 10
                + 2 20
                + 3 30"])
        );
    }

    /// Deleting a row written in an earlier checkpoint emits its recorded position.
    #[tokio::test]
    async fn test_writer_executor_insert_then_delete() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10
            + 2 20
            + 3 30",
        );
        harness.push_barrier(2);
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            - 2 20",
        );
        harness.push_barrier(3);

        // Key 2 was the second row written, i.e. position 1.
        harness
            .expect_position_chunk(vec![test_file_position(1)])
            .await;
        harness.expect_barrier().await;
    }

    /// An update deletes the old position and indexes the newly written one.
    #[tokio::test]
    async fn test_writer_executor_update_rewrites_position() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10",
        );
        harness.push_barrier(2);
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            U- 1 10
            U+ 1 99",
        );
        harness.push_barrier(3);

        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            - 1 99",
        );
        harness.push_barrier(4);

        // The rewritten row lives at the second written position.
        harness
            .expect_position_chunk(vec![test_file_position(1)])
            .await;
        harness.expect_barrier().await;

        assert_eq!(
            harness.written_chunks(),
            pretty_chunks(&[
                " I I
                + 1 10",
                " I I
                + 1 99",
            ])
        );
    }

    /// Delete + insert of a key that was never indexed writes only the new row.
    #[tokio::test]
    async fn test_writer_executor_delete_then_insert_without_existing_row_is_fresh_insert() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            - 1 10
            + 1 99",
        );
        harness.push_barrier(2);

        harness.expect_barrier().await;
        assert_eq!(
            harness.written_chunks(),
            pretty_chunks(&[" I I
                + 1 99"])
        );
    }

    /// Delete + insert of an indexed key emits the old position and writes the new row.
    #[tokio::test]
    async fn test_writer_executor_delete_then_insert_rewrites_existing_row() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10",
        );
        harness.push_barrier(2);
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            - 1 10
            + 1 99",
        );
        harness.push_barrier(3);

        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;

        assert_eq!(
            harness.written_chunks(),
            pretty_chunks(&[
                " I I
                + 1 10",
                " I I
                + 1 99",
            ])
        );
    }

    /// Two deletes of the same key in one chunk emit a single position delete.
    #[tokio::test]
    async fn test_writer_executor_delete_then_delete_emits_one_position_delete() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10",
        );
        harness.push_barrier(2);
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            - 1 10
            - 1 10",
        );
        harness.push_barrier(3);

        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;
        assert_eq!(
            harness.written_chunks(),
            pretty_chunks(&[" I I
                + 1 10"])
        );
    }

    /// Insert and delete in separate chunks of the same checkpoint still emit the delete.
    #[tokio::test]
    async fn test_writer_executor_insert_then_delete_in_different_chunks_same_checkpoint() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10",
        );
        harness.push_pretty_chunk(
            " I I
            - 1 10",
        );
        harness.push_barrier(2);

        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;
        assert_eq!(
            harness.written_chunks(),
            pretty_chunks(&[" I I
                + 1 10"])
        );
    }

    /// Two inserts of the same key in one chunk keep only the latest row.
    #[tokio::test]
    async fn test_writer_executor_insert_then_insert_in_same_chunk_keeps_latest_row() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10
            + 1 99",
        );
        harness.push_barrier(2);

        harness.expect_barrier().await;
        assert_eq!(
            harness.compacted_written_chunks(),
            pretty_chunks(&[" I I
                + 1 99"])
        );

        harness.push_pretty_chunk(
            " I I
            - 1 99",
        );
        harness.push_barrier(3);

        // Only one row was written, so the surviving row sits at position 0.
        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;
    }

    /// Insert followed by delete of the same key in one chunk cancels out entirely.
    #[tokio::test]
    async fn test_writer_executor_insert_then_delete_in_same_chunk_is_cancelled() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10
            - 1 10",
        );
        harness.push_barrier(2);

        harness.expect_barrier().await;
        assert!(harness.written_chunks().is_empty());
    }
}