risingwave_stream/executor/iceberg_with_pk_index/
dv_merger.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use risingwave_pb::connector_service::SinkMetadata;
17
18use crate::executor::prelude::*;
19
/// Trait abstracting DV (Deletion Vector) file operations for testability.
///
/// Implementations are responsible for reading existing DVs, merging new
/// delete positions, writing DV files, and returning the metadata that should
/// be committed on the current barrier.
///
/// [`DvMergerExecutor`] drives a handler by calling [`DvHandler::write`] once
/// for every row of every incoming chunk and [`DvHandler::flush`] once for
/// every barrier after the first one.
#[async_trait::async_trait]
pub trait DvHandler: Send + 'static {
    /// Records one delete position `pos` for the data file identified by `path`.
    ///
    /// This is a synchronous call made per input row. An `Err` returned here
    /// is propagated by the executor and ends its message stream.
    fn write(&mut self, path: &str, pos: i64) -> StreamExecutorResult<()>;

    /// Flushes the positions recorded since the previous flush.
    ///
    /// The executor awaits this on each barrier before forwarding the barrier
    /// downstream. Returns the metadata to report for that barrier; when
    /// `Ok(None)` is returned the executor falls back to a default
    /// [`SinkMetadata`]. An `Err` is propagated by the executor.
    async fn flush(&mut self) -> StreamExecutorResult<Option<SinkMetadata>>;
}
31
32/// DV Merger Executor for Iceberg V3 Sink without Equality Delete.
33///
34/// This stateless executor receives [`file_path`, `position`] messages from the Writer Executor,
35/// merges them with historical DVs, and reports the merged DV metadata to meta on each barrier.
36///
37/// The upstream plan shards messages by `file_path`, so each actor only merges delete
38/// positions for the files assigned to its shard.
39///
40/// Input schema: [`file_path`: Varchar, `position`: int64]
41/// Output: Only barriers (terminal executor in the stream graph).
42pub struct DvMergerExecutor<H>
43where
44    H: DvHandler,
45{
46    _ctx: ActorContextRef,
47    input: Option<Executor>,
48    handler: H,
49}
50
51impl<H> DvMergerExecutor<H>
52where
53    H: DvHandler,
54{
55    pub fn new(ctx: ActorContextRef, input: Executor, handler: H) -> Self {
56        Self {
57            _ctx: ctx,
58            input: Some(input),
59            handler,
60        }
61    }
62
63    #[try_stream(ok = Message, error = StreamExecutorError)]
64    async fn execute_inner(mut self) {
65        let mut input = self.input.take().unwrap().execute();
66
67        // Consume the first barrier.
68        let barrier = expect_first_barrier(&mut input).await?;
69        yield Message::Barrier(barrier);
70
71        #[for_await]
72        for msg in input {
73            match msg? {
74                Message::Chunk(chunk) => {
75                    for (op, row) in chunk.rows() {
76                        debug_assert_eq!(op, risingwave_common::array::Op::Insert);
77                        let file_path = row
78                            .datum_at(0)
79                            .map(|d| d.into_utf8())
80                            .context("file_path should not be null")?;
81                        let position = row
82                            .datum_at(1)
83                            .context("position should not be null")?
84                            .into_int64();
85                        self.handler.write(file_path, position)?;
86                    }
87                }
88                Message::Barrier(barrier) => {
89                    let _metadata = self.handler.flush().await?.unwrap_or_default();
90                    // TODO: commit the DV metadata
91
92                    yield Message::Barrier(barrier);
93                }
94                Message::Watermark(w) => {
95                    yield Message::Watermark(w);
96                }
97            }
98        }
99    }
100}
101
102impl<H> Execute for DvMergerExecutor<H>
103where
104    H: DvHandler,
105{
106    fn execute(self: Box<Self>) -> BoxedMessageStream {
107        self.execute_inner().boxed()
108    }
109}
110
111#[cfg(test)]
112mod tests {
113    use std::collections::BTreeSet;
114    use std::sync::{Arc, Mutex};
115
116    use hashbrown::HashMap;
117    use risingwave_common::array::{Array, ArrayBuilder, I64ArrayBuilder, Op, Utf8ArrayBuilder};
118    use risingwave_common::catalog::{Field, Schema};
119    use risingwave_common::types::DataType;
120    use risingwave_common::util::epoch::test_epoch;
121
122    use super::*;
123    use crate::executor::test_utils::MockSource;
124
125    fn build_delete_position_chunk(positions: &[(&str, i64)]) -> StreamChunk {
126        let len = positions.len();
127        let mut file_path_builder = Utf8ArrayBuilder::new(len);
128        let mut position_builder = I64ArrayBuilder::new(len);
129
130        for (path, offset) in positions {
131            file_path_builder.append(Some(*path));
132            position_builder.append(Some(*offset));
133        }
134
135        StreamChunk::from_parts(
136            vec![Op::Insert; len],
137            risingwave_common::array::DataChunk::new(
138                vec![
139                    file_path_builder.finish().into_ref(),
140                    position_builder.finish().into_ref(),
141                ],
142                len,
143            ),
144        )
145    }
146
147    #[derive(Clone)]
148    struct DvHandlerMock {
149        existing_dvs: Arc<Mutex<HashMap<String, BTreeSet<i64>>>>,
150        pending_dvs: Arc<Mutex<HashMap<String, BTreeSet<i64>>>>,
151        written_dvs: Arc<Mutex<HashMap<String, BTreeSet<i64>>>>,
152    }
153
154    impl DvHandlerMock {
155        fn new() -> Self {
156            Self {
157                existing_dvs: Arc::new(Mutex::new(HashMap::new())),
158                pending_dvs: Arc::new(Mutex::new(HashMap::new())),
159                written_dvs: Arc::new(Mutex::new(HashMap::new())),
160            }
161        }
162
163        fn with_existing_dv(self, file_path: &str, positions: BTreeSet<i64>) -> Self {
164            self.existing_dvs
165                .lock()
166                .unwrap()
167                .insert(file_path.to_owned(), positions);
168            self
169        }
170    }
171
172    #[async_trait::async_trait]
173    impl DvHandler for DvHandlerMock {
174        fn write(&mut self, path: &str, pos: i64) -> StreamExecutorResult<()> {
175            self.pending_dvs
176                .lock()
177                .unwrap()
178                .entry_ref(path)
179                .or_default()
180                .insert(pos);
181            Ok(())
182        }
183
184        async fn flush(&mut self) -> StreamExecutorResult<Option<SinkMetadata>> {
185            let pending = {
186                let mut pending_dvs = self.pending_dvs.lock().unwrap();
187                std::mem::take(&mut *pending_dvs)
188            };
189            if pending.is_empty() {
190                return Ok(None);
191            }
192
193            let mut existing_dvs = self.existing_dvs.lock().unwrap();
194            let mut written_dvs = self.written_dvs.lock().unwrap();
195            for (file_path, positions) in pending {
196                let mut merged = existing_dvs.get(&file_path).cloned().unwrap_or_default();
197                merged.extend(positions);
198                existing_dvs.insert(file_path.clone(), merged.clone());
199                written_dvs.insert(file_path, merged);
200            }
201            Ok(None)
202        }
203    }
204
205    fn input_schema() -> Schema {
206        Schema::new(vec![
207            Field::unnamed(DataType::Varchar),
208            Field::unnamed(DataType::Int64),
209        ])
210    }
211
212    #[tokio::test]
213    async fn test_dv_merger_basic() {
214        let handler = DvHandlerMock::new();
215        let written_dvs = handler.written_dvs.clone();
216
217        let (mut tx, source) = MockSource::channel();
218        let source = source.into_executor(input_schema(), vec![]);
219
220        let mut executor = DvMergerExecutor::new(ActorContext::for_test(123), source, handler)
221            .boxed()
222            .execute();
223
224        tx.push_barrier(test_epoch(1), false);
225        assert!(executor.next().await.unwrap().unwrap().is_barrier());
226
227        tx.push_chunk(build_delete_position_chunk(&[
228            ("file1.parquet", 0),
229            ("file1.parquet", 3),
230            ("file2.parquet", 1),
231        ]));
232        tx.push_barrier(test_epoch(2), false);
233
234        assert!(executor.next().await.unwrap().unwrap().is_barrier());
235
236        let dvs = written_dvs.lock().unwrap();
237        assert_eq!(dvs.get("file1.parquet").unwrap(), &BTreeSet::from([0, 3]));
238        assert_eq!(dvs.get("file2.parquet").unwrap(), &BTreeSet::from([1]));
239    }
240
241    #[tokio::test]
242    async fn test_dv_merger_merge_with_existing() {
243        let handler =
244            DvHandlerMock::new().with_existing_dv("file1.parquet", BTreeSet::from([0, 5, 10]));
245        let written_dvs = handler.written_dvs.clone();
246
247        let (mut tx, source) = MockSource::channel();
248        let source = source.into_executor(input_schema(), vec![]);
249
250        let mut executor = DvMergerExecutor::new(ActorContext::for_test(123), source, handler)
251            .boxed()
252            .execute();
253
254        tx.push_barrier(test_epoch(1), false);
255        assert!(executor.next().await.unwrap().unwrap().is_barrier());
256
257        tx.push_chunk(build_delete_position_chunk(&[
258            ("file1.parquet", 3),
259            ("file1.parquet", 5),
260            ("file1.parquet", 7),
261        ]));
262        tx.push_barrier(test_epoch(2), false);
263
264        assert!(executor.next().await.unwrap().unwrap().is_barrier());
265
266        let dvs = written_dvs.lock().unwrap();
267        assert_eq!(
268            dvs.get("file1.parquet").unwrap(),
269            &BTreeSet::from([0, 3, 5, 7, 10])
270        );
271    }
272
273    #[tokio::test]
274    async fn test_dv_merger_no_deletes() {
275        let handler = DvHandlerMock::new();
276        let written_dvs = handler.written_dvs.clone();
277
278        let (mut tx, source) = MockSource::channel();
279        let source = source.into_executor(input_schema(), vec![]);
280
281        let mut executor = DvMergerExecutor::new(ActorContext::for_test(123), source, handler)
282            .boxed()
283            .execute();
284
285        tx.push_barrier(test_epoch(1), false);
286        assert!(executor.next().await.unwrap().unwrap().is_barrier());
287
288        tx.push_barrier(test_epoch(2), false);
289        assert!(executor.next().await.unwrap().unwrap().is_barrier());
290
291        assert!(written_dvs.lock().unwrap().is_empty());
292    }
293
294    #[tokio::test]
295    async fn test_dv_merger_multiple_epochs() {
296        let handler = DvHandlerMock::new();
297        let written_dvs = handler.written_dvs.clone();
298
299        let (mut tx, source) = MockSource::channel();
300        let source = source.into_executor(input_schema(), vec![]);
301
302        let mut executor = DvMergerExecutor::new(ActorContext::for_test(123), source, handler)
303            .boxed()
304            .execute();
305
306        tx.push_barrier(test_epoch(1), false);
307        assert!(executor.next().await.unwrap().unwrap().is_barrier());
308
309        tx.push_chunk(build_delete_position_chunk(&[("file1.parquet", 0)]));
310        tx.push_barrier(test_epoch(2), false);
311        assert!(executor.next().await.unwrap().unwrap().is_barrier());
312        assert_eq!(
313            written_dvs.lock().unwrap().get("file1.parquet").unwrap(),
314            &BTreeSet::from([0])
315        );
316
317        tx.push_chunk(build_delete_position_chunk(&[("file1.parquet", 2)]));
318        tx.push_barrier(test_epoch(3), false);
319        assert!(executor.next().await.unwrap().unwrap().is_barrier());
320
321        assert_eq!(
322            written_dvs.lock().unwrap().get("file1.parquet").unwrap(),
323            &BTreeSet::from([0, 2])
324        );
325    }
326}