risingwave_stream/executor/iceberg_with_pk_index/
writer.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use iceberg::writer::PositionDeleteInput;
17use risingwave_common::array::DataChunk;
18use risingwave_common::array::stream_record::Record;
19use risingwave_common::row::{Project, RowExt};
20use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_pb::connector_service::SinkMetadata;
23use risingwave_storage::StateStore;
24
25use crate::common::change_buffer::output_kind;
26use crate::common::compact_chunk::{InconsistencyBehavior, compact_chunk_inline};
27use crate::executor::prelude::*;
28
/// Metadata returned by [`IcebergWriter::flush`] describing the data files written so far.
pub type IcebergWriterFlushOutput = SinkMetadata;
/// Primary-key columns of an input row, projected (borrowed) via the executor's `pk_indices`.
type PkRow<'a> = Project<'a, RowRef<'a>>;
31
32fn new_chunk_builder(chunk_size: usize) -> DataChunkBuilder {
33    DataChunkBuilder::new(vec![DataType::Varchar, DataType::Int64], chunk_size)
34}
35
36fn append_row(builder: &mut DataChunkBuilder, file_path: &str, position: i64) -> Option<DataChunk> {
37    builder.append_one_row([
38        Some(ScalarRefImpl::Utf8(file_path)),
39        Some(ScalarRefImpl::Int64(position)),
40    ])
41}
42
/// Trait abstracting the Iceberg data file writing for testability.
///
/// Implementations are responsible for writing rows to Iceberg data files
/// and tracking row positions. Commit is handled by the executor, not the writer.
#[async_trait::async_trait]
pub trait IcebergWriter: Send + 'static {
    /// Write a batch of insert rows. Returns the position of each row in the chunk (in order).
    ///
    /// The executor pairs the returned positions one-to-one with the rows of `chunk` and
    /// records them in its PK index, so the result must contain exactly one entry per row.
    async fn write_chunk(
        &mut self,
        chunk: DataChunk,
    ) -> StreamExecutorResult<Vec<PositionDeleteInput>>;

    /// Flush current data files on barrier. Returns the written data files
    /// and each file's partition information (if partitioned).
    ///
    /// Called by the executor on every barrier; a `None` result is replaced with the
    /// default metadata. NOTE(review): the executor currently discards the returned
    /// metadata (committing it is still a TODO there).
    async fn flush(&mut self) -> StreamExecutorResult<Option<IcebergWriterFlushOutput>>;
}
59
/// Writer Executor for Iceberg V3 Sink with PK index
///
/// This stateful executor maintains a PK index that maps primary key values to
/// their position in data files (`file_path`, `position`). It processes change logs
/// from upstream:
///
/// - **Insert**: Writes the row to a data file via [`IcebergWriter`], records the
///   position in the PK index state table. Unless `pk_matched` is set, any existing
///   index entry for the same PK is deleted first (emitting a position delete for it).
/// - **Delete**: Looks up the PK index to find the data file position, emits a
///   delete position message downstream to the DV Merger, removes from index.
/// - **Update**: Treated as Delete + Insert. The planner guarantees the old and
///   new rows share the same PK, so the executor projects the PK once (from the new
///   row) and uses it both to delete the old index entry and to record the new one.
///
/// Output chunks carry position deletes with the schema (`file_path`: Varchar,
/// `position`: Int64). They are emitted whenever the delete buffer fills up and at
/// every barrier.
pub struct WriterExecutor<S, W>
where
    S: StateStore,
    W: IcebergWriter,
{
    /// Actor context; its id is used to extract vnode bitmap updates from barriers.
    ctx: ActorContextRef,
    /// Upstream executor. `Some` until `execute_inner` takes it.
    input: Option<Executor>,
    /// Column indices of the primary key in the input schema.
    pk_indices: Vec<usize>,
    /// State table storing the PK index: `pk_columns` -> (`file_path`, `position`).
    /// Schema: [`pk_col_0`, ..., `pk_col_n`, `file_path`: Varchar, `position`: Int64]
    pk_index_state_table: StateTable<S>,
    /// The Iceberg data file writer.
    writer: W,
    /// Buffer for accumulating delete position messages before the next barrier flush.
    /// Created lazily by the first chunk of an epoch, carried across chunks of the same
    /// epoch, and drained (then dropped) at each barrier.
    delete_position_buffer: Option<DataChunkBuilder>,
    /// Row capacity of `delete_position_buffer`.
    chunk_size: usize,
    /// When `false`, every plain `Insert` first looks up the PK index and deletes any
    /// existing entry for its PK. When `true` that lookup is skipped — presumably because
    /// the planner guarantees upstream inserts never target an indexed PK (TODO confirm).
    pk_matched: bool,
}
92
93impl<S, W> WriterExecutor<S, W>
94where
95    S: StateStore,
96    W: IcebergWriter,
97{
98    #[allow(clippy::too_many_arguments)]
99    pub fn new(
100        ctx: ActorContextRef,
101        input: Executor,
102        pk_indices: Vec<usize>,
103        pk_index_state_table: StateTable<S>,
104        writer: W,
105        chunk_size: usize,
106        pk_matched: bool,
107    ) -> Self {
108        Self {
109            ctx,
110            input: Some(input),
111            pk_indices,
112            pk_index_state_table,
113            writer,
114            delete_position_buffer: None,
115            chunk_size,
116            pk_matched,
117        }
118    }
119
120    async fn delete_existing_row(
121        &mut self,
122        pk_row: PkRow<'_>,
123        delete_position_buffer: &mut DataChunkBuilder,
124    ) -> StreamExecutorResult<Option<DataChunk>> {
125        let Some(index_row) = self.pk_index_state_table.get_row(pk_row).await? else {
126            return Ok(None);
127        };
128
129        let num_cols = index_row.len();
130        let file_path = index_row
131            .datum_at(num_cols - 2)
132            .context("file_path should not be null")?
133            .into_utf8();
134        let position = index_row
135            .datum_at(num_cols - 1)
136            .context("position should not be null")?
137            .into_int64();
138        let chunk = append_row(delete_position_buffer, file_path, position);
139        self.pk_index_state_table.delete(index_row);
140        Ok(chunk)
141    }
142
143    // Process one stream chunk:
144    //
145    // 1. Compact the chunk by `pk_indices` so each PK appears at most once and any intra-chunk
146    //    `+/-` cancellations are absorbed up front. After this step every record is either a
147    //    standalone `Insert`, `Delete`, or `Update {old, new}` whose old and new rows share the
148    //    same PK.
149    // 2. For each record: `Insert` is buffered into a single batched write; `Delete` looks up
150    //    `pk_index_state_table` to emit a position delete and clears the entry; `Update` is
151    //    handled as a position delete for the old row plus a buffered insert for the new row.
152    // 3. After the scan, write all buffered inserts in one `write_chunk` call and persist the
153    //    returned Iceberg positions back to `pk_index_state_table`.
154    //
155    // `pk_index_state_table` and `delete_position_buffer` live until the next barrier, so a later
156    // chunk in the same checkpoint observes earlier writes/deletes via the state table.
157    #[try_stream(ok = DataChunk, error = StreamExecutorError)]
158    async fn process_chunk(&mut self, chunk: StreamChunk) {
159        // PK uniqueness within a chunk is not guaranteed by upstream (this executor sits in front
160        // of `SinkExecutor` and does not inherit its compaction). Run it ourselves so the loop
161        // below can stay linear. `Warn` matches `SinkExecutor`'s policy: tolerate inconsistent
162        // upstream PK rather than panic.
163        let chunk = compact_chunk_inline::<{ output_kind::RETRACT }>(
164            chunk,
165            &self.pk_indices,
166            InconsistencyBehavior::Warn,
167        );
168
169        let mut delete_position_buffer = self
170            .delete_position_buffer
171            .take()
172            .unwrap_or_else(|| new_chunk_builder(self.chunk_size));
173        let pk_indices = self.pk_indices.clone();
174
175        // `chunk.capacity() + 1` is an upper bound on appended rows: each surviving record
176        // contributes at most one row (Insert / Update::new), and `records()` yields at most
177        // `capacity` records.
178        let mut insert_chunk =
179            DataChunkBuilder::new(chunk.data_chunk().data_types(), chunk.capacity() + 1);
180        let mut insert_pks: Vec<PkRow<'_>> = Vec::new();
181
182        for record in chunk.records() {
183            match record {
184                Record::Insert { new_row } => {
185                    if !self.pk_matched {
186                        let pk_row = new_row.project(&pk_indices);
187                        if let Some(chunk) = self
188                            .delete_existing_row(pk_row, &mut delete_position_buffer)
189                            .await?
190                        {
191                            yield chunk;
192                        }
193                    }
194                    let overflow = insert_chunk.append_one_row(new_row);
195                    debug_assert!(overflow.is_none(), "insert chunk exceeds capacity");
196                    insert_pks.push(new_row.project(&pk_indices));
197                }
198                Record::Delete { old_row } => {
199                    let pk_row = old_row.project(&pk_indices);
200                    if let Some(chunk) = self
201                        .delete_existing_row(pk_row, &mut delete_position_buffer)
202                        .await?
203                    {
204                        yield chunk;
205                    }
206                }
207                Record::Update { new_row, .. } => {
208                    // The compactor groups by `pk_indices`, so old and new share the same PK.
209                    let pk_row = new_row.project(&pk_indices);
210                    if let Some(chunk) = self
211                        .delete_existing_row(pk_row, &mut delete_position_buffer)
212                        .await?
213                    {
214                        yield chunk;
215                    }
216                    let overflow = insert_chunk.append_one_row(new_row);
217                    debug_assert!(overflow.is_none(), "insert chunk exceeds capacity");
218                    insert_pks.push(pk_row);
219                }
220            }
221        }
222
223        if !insert_pks.is_empty() {
224            let insert_chunk = insert_chunk.finish();
225            let positions = self.writer.write_chunk(insert_chunk).await?;
226
227            for (pk, pos) in insert_pks.into_iter().zip_eq_fast(positions) {
228                let mut index_row_data = Vec::with_capacity(pk_indices.len() + 2);
229                for datum in pk.iter() {
230                    index_row_data.push(datum);
231                }
232                index_row_data.push(Some(ScalarRefImpl::Utf8(&pos.path)));
233                index_row_data.push(Some(ScalarRefImpl::Int64(pos.pos)));
234                self.pk_index_state_table.insert(index_row_data.as_slice());
235            }
236        }
237
238        self.delete_position_buffer = Some(delete_position_buffer);
239        self.pk_index_state_table.try_flush().await?;
240    }
241
242    #[try_stream(ok = Message, error = StreamExecutorError)]
243    async fn execute_inner(mut self) {
244        let mut input = self.input.take().unwrap().execute();
245
246        // Consume the first barrier.
247        let barrier = expect_first_barrier(&mut input).await?;
248        let first_epoch = barrier.epoch;
249        yield Message::Barrier(barrier);
250        self.pk_index_state_table.init_epoch(first_epoch).await?;
251
252        #[for_await]
253        for msg in input {
254            match msg? {
255                Message::Chunk(chunk) =>
256                {
257                    #[for_await]
258                    for data_chunk in self.process_chunk(chunk) {
259                        yield Message::Chunk(data_chunk?.into());
260                    }
261                }
262                Message::Barrier(barrier) => {
263                    if let Some(chunk) = self
264                        .delete_position_buffer
265                        .take()
266                        .and_then(|mut b| b.consume_all())
267                    {
268                        yield Message::Chunk(chunk.into());
269                    }
270
271                    let _metadata = self.writer.flush().await?.unwrap_or_default();
272                    let epoch = barrier.epoch;
273                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.ctx.id);
274                    let post_commit = self.pk_index_state_table.commit(epoch).await?;
275
276                    // TODO: commit the writer metadata
277
278                    yield Message::Barrier(barrier);
279
280                    post_commit.post_yield_barrier(update_vnode_bitmap).await?;
281                }
282                Message::Watermark(w) => {
283                    yield Message::Watermark(w);
284                }
285            }
286        }
287    }
288}
289
290impl<S, W> Execute for WriterExecutor<S, W>
291where
292    S: StateStore,
293    W: IcebergWriter,
294{
295    fn execute(self: Box<Self>) -> BoxedMessageStream {
296        self.execute_inner().boxed()
297    }
298}
299
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use iceberg::writer::PositionDeleteInput;
    use risingwave_common::array::Op;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    const CHUNK_SIZE: usize = 1024;
    const TEST_FILE_PATH: &str = "file1.parquet";

    /// In-memory [`IcebergWriter`] that assigns consecutive positions (starting at 0) in a
    /// single file and records every written chunk for later inspection.
    struct IcebergWriterMock {
        file_path: String,
        next_offset: i64,
        written_chunks: Arc<Mutex<Vec<StreamChunk>>>,
    }

    impl IcebergWriterMock {
        fn new(file_path: &str) -> Self {
            Self {
                file_path: file_path.to_owned(),
                next_offset: 0,
                written_chunks: Arc::new(Mutex::new(Vec::new())),
            }
        }

        /// Shared handle to the chunks written so far; stays valid after the mock is moved
        /// into the executor.
        fn written_chunks(&self) -> Arc<Mutex<Vec<StreamChunk>>> {
            self.written_chunks.clone()
        }
    }

    #[async_trait::async_trait]
    impl IcebergWriter for IcebergWriterMock {
        async fn write_chunk(
            &mut self,
            chunk: DataChunk,
        ) -> StreamExecutorResult<Vec<PositionDeleteInput>> {
            let row_count = chunk.cardinality();
            let mut positions = Vec::with_capacity(row_count);
            for _ in 0..row_count {
                positions.push(PositionDeleteInput::new(
                    Arc::<str>::from(self.file_path.as_str()),
                    self.next_offset,
                ));
                self.next_offset += 1;
            }
            self.written_chunks.lock().unwrap().push(chunk.into());
            Ok(positions)
        }

        async fn flush(&mut self) -> StreamExecutorResult<Option<IcebergWriterFlushOutput>> {
            Ok(None)
        }
    }

    /// PK index table for a single Int64 PK column: `[pk, file_path, position]`.
    async fn create_pk_index_state_table(
        store: MemoryStateStore,
        table_id: TableId,
    ) -> StateTable<MemoryStateStore> {
        let column_descs = vec![
            ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64),
            ColumnDesc::unnamed(ColumnId::new(1), DataType::Varchar),
            ColumnDesc::unnamed(ColumnId::new(2), DataType::Int64),
        ];
        let order_types = vec![OrderType::ascending()];
        let pk_indices = vec![0];

        StateTable::from_table_catalog(
            &gen_pbtable(table_id, column_descs, order_types, pk_indices, 0),
            store,
            None,
        )
        .await
    }

    fn input_schema() -> Schema {
        Schema::new(vec![
            Field::unnamed(DataType::Int64),
            Field::unnamed(DataType::Int64),
        ])
    }

    /// Decodes an output chunk of position deletes into `(file_path, position)` pairs.
    fn decode_chunk(chunk: StreamChunk) -> Vec<(String, i64)> {
        chunk
            .rows()
            .map(|(op, row)| {
                assert_eq!(op, Op::Insert);
                let file_path = row.datum_at(0).unwrap().into_utf8().to_owned();
                let position = row.datum_at(1).unwrap().into_int64();
                (file_path, position)
            })
            .collect()
    }

    fn test_file_position(position: i64) -> (String, i64) {
        (TEST_FILE_PATH.to_owned(), position)
    }

    struct WriterTestHarness {
        tx: MessageSender,
        executor: BoxedMessageStream,
        written_chunks: Arc<Mutex<Vec<StreamChunk>>>,
    }

    impl WriterTestHarness {
        /// Harness whose executor runs with `pk_matched = true`.
        async fn new() -> Self {
            Self::with_pk_matched(true).await
        }

        /// Harness whose executor runs with the given `pk_matched` flag.
        async fn with_pk_matched(pk_matched: bool) -> Self {
            let store = MemoryStateStore::new();
            let state_table = create_pk_index_state_table(store, TableId::new(1)).await;
            let writer = IcebergWriterMock::new(TEST_FILE_PATH);
            let written_chunks = writer.written_chunks();

            let (tx, source) = MockSource::channel();
            let source = source.into_executor(input_schema(), vec![0]);
            let executor = WriterExecutor::new(
                ActorContext::for_test(123),
                source,
                vec![0],
                state_table,
                writer,
                CHUNK_SIZE,
                pk_matched,
            )
            .boxed()
            .execute();

            Self {
                tx,
                executor,
                written_chunks,
            }
        }

        async fn init(&mut self) {
            self.tx.push_barrier(test_epoch(1), false);
            self.executor.expect_barrier().await;
        }

        fn push_chunk(&mut self, chunk: StreamChunk) {
            self.tx.push_chunk(chunk);
        }

        fn push_pretty_chunk(&mut self, pretty: &str) {
            self.push_chunk(StreamChunk::from_pretty(pretty));
        }

        fn push_barrier(&mut self, epoch: u64) {
            self.tx.push_barrier(test_epoch(epoch), false);
        }

        async fn expect_barrier(&mut self) {
            self.executor.expect_barrier().await;
        }

        async fn expect_position_chunk(&mut self, expected: Vec<(String, i64)>) {
            assert_eq!(decode_chunk(self.executor.expect_chunk().await), expected);
        }

        fn written_chunks(&self) -> Vec<StreamChunk> {
            self.written_chunks.lock().unwrap().clone()
        }

        fn compacted_written_chunks(&self) -> Vec<StreamChunk> {
            self.written_chunks()
                .into_iter()
                .map(StreamChunk::compact_vis)
                .collect()
        }
    }

    #[tokio::test]
    async fn test_writer_executor_insert_only() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10
            + 2 20
            + 3 30",
        );
        harness.push_barrier(2);

        harness.expect_barrier().await;
        assert_eq!(
            harness.written_chunks(),
            vec![StreamChunk::from_pretty(
                " I I
                + 1 10
                + 2 20
                + 3 30",
            )]
        );
    }

    #[tokio::test]
    async fn test_writer_executor_insert_then_delete() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10
            + 2 20
            + 3 30",
        );
        harness.push_barrier(2);
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            - 2 20",
        );
        harness.push_barrier(3);

        harness
            .expect_position_chunk(vec![test_file_position(1)])
            .await;
        harness.expect_barrier().await;
    }

    #[tokio::test]
    async fn test_writer_executor_update_rewrites_position() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10",
        );
        harness.push_barrier(2);
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            U- 1 10
            U+ 1 99",
        );
        harness.push_barrier(3);

        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            - 1 99",
        );
        harness.push_barrier(4);

        harness
            .expect_position_chunk(vec![test_file_position(1)])
            .await;
        harness.expect_barrier().await;

        assert_eq!(
            harness.written_chunks(),
            vec![
                StreamChunk::from_pretty(
                    " I I
                    + 1 10",
                ),
                StreamChunk::from_pretty(
                    " I I
                    + 1 99",
                ),
            ]
        );
    }

    #[tokio::test]
    async fn test_writer_executor_delete_then_insert_without_existing_row_is_fresh_insert() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            - 1 10
            + 1 99",
        );
        harness.push_barrier(2);

        harness.expect_barrier().await;
        assert_eq!(
            harness.written_chunks(),
            vec![StreamChunk::from_pretty(
                " I I
                + 1 99",
            )]
        );
    }

    #[tokio::test]
    async fn test_writer_executor_delete_then_insert_rewrites_existing_row() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10",
        );
        harness.push_barrier(2);
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            - 1 10
            + 1 99",
        );
        harness.push_barrier(3);

        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;

        assert_eq!(
            harness.written_chunks(),
            vec![
                StreamChunk::from_pretty(
                    " I I
                    + 1 10",
                ),
                StreamChunk::from_pretty(
                    " I I
                    + 1 99",
                ),
            ]
        );
    }

    #[tokio::test]
    async fn test_writer_executor_delete_then_delete_emits_one_position_delete() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10",
        );
        harness.push_barrier(2);
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            - 1 10
            - 1 10",
        );
        harness.push_barrier(3);

        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;
        assert_eq!(
            harness.written_chunks(),
            vec![StreamChunk::from_pretty(
                " I I
                + 1 10",
            )]
        );
    }

    #[tokio::test]
    async fn test_writer_executor_insert_then_delete_in_different_chunks_same_checkpoint() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10",
        );
        harness.push_pretty_chunk(
            " I I
            - 1 10",
        );
        harness.push_barrier(2);

        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;
        assert_eq!(
            harness.written_chunks(),
            vec![StreamChunk::from_pretty(
                " I I
                + 1 10",
            )]
        );
    }

    #[tokio::test]
    async fn test_writer_executor_insert_then_insert_in_same_chunk_keeps_latest_row() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10
            + 1 99",
        );
        harness.push_barrier(2);

        harness.expect_barrier().await;
        assert_eq!(
            harness.compacted_written_chunks(),
            vec![StreamChunk::from_pretty(
                " I I
                + 1 99",
            )]
        );

        harness.push_pretty_chunk(
            " I I
            - 1 99",
        );
        harness.push_barrier(3);

        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;
    }

    #[tokio::test]
    async fn test_writer_executor_insert_then_delete_in_same_chunk_is_cancelled() {
        let mut harness = WriterTestHarness::new().await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10
            - 1 10",
        );
        harness.push_barrier(2);

        harness.expect_barrier().await;
        assert!(harness.written_chunks().is_empty());
    }

    /// With `pk_matched = false`, a plain insert for an already-indexed PK must emit a
    /// position delete for the old row and re-index the PK at the new position.
    #[tokio::test]
    async fn test_writer_executor_unmatched_pk_insert_replaces_existing_row() {
        let mut harness = WriterTestHarness::with_pk_matched(false).await;
        harness.init().await;

        harness.push_pretty_chunk(
            " I I
            + 1 10",
        );
        harness.push_barrier(2);
        // No existing entry for PK 1, so no position delete precedes the barrier.
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            + 1 99",
        );
        harness.push_barrier(3);

        harness
            .expect_position_chunk(vec![test_file_position(0)])
            .await;
        harness.expect_barrier().await;

        harness.push_pretty_chunk(
            " I I
            - 1 99",
        );
        harness.push_barrier(4);

        // The index must now point at the re-inserted row.
        harness
            .expect_position_chunk(vec![test_file_position(1)])
            .await;
        harness.expect_barrier().await;

        assert_eq!(
            harness.written_chunks(),
            vec![
                StreamChunk::from_pretty(
                    " I I
                    + 1 10",
                ),
                StreamChunk::from_pretty(
                    " I I
                    + 1 99",
                ),
            ]
        );
    }
}