risingwave_stream/executor/iceberg_with_pk_index/
dv_merger.rs1use anyhow::Context;
16use risingwave_pb::connector_service::SinkMetadata;
17
18use crate::executor::prelude::*;
19
20#[async_trait::async_trait]
26pub trait DvHandler: Send + 'static {
27 fn write(&mut self, path: &str, pos: i64) -> StreamExecutorResult<()>;
28
29 async fn flush(&mut self) -> StreamExecutorResult<Option<SinkMetadata>>;
30}
31
32pub struct DvMergerExecutor<H>
43where
44 H: DvHandler,
45{
46 _ctx: ActorContextRef,
47 input: Option<Executor>,
48 handler: H,
49}
50
51impl<H> DvMergerExecutor<H>
52where
53 H: DvHandler,
54{
55 pub fn new(ctx: ActorContextRef, input: Executor, handler: H) -> Self {
56 Self {
57 _ctx: ctx,
58 input: Some(input),
59 handler,
60 }
61 }
62
63 #[try_stream(ok = Message, error = StreamExecutorError)]
64 async fn execute_inner(mut self) {
65 let mut input = self.input.take().unwrap().execute();
66
67 let barrier = expect_first_barrier(&mut input).await?;
69 yield Message::Barrier(barrier);
70
71 #[for_await]
72 for msg in input {
73 match msg? {
74 Message::Chunk(chunk) => {
75 for (op, row) in chunk.rows() {
76 debug_assert_eq!(op, risingwave_common::array::Op::Insert);
77 let file_path = row
78 .datum_at(0)
79 .map(|d| d.into_utf8())
80 .context("file_path should not be null")?;
81 let position = row
82 .datum_at(1)
83 .context("position should not be null")?
84 .into_int64();
85 self.handler.write(file_path, position)?;
86 }
87 }
88 Message::Barrier(barrier) => {
89 let _metadata = self.handler.flush().await?.unwrap_or_default();
90 yield Message::Barrier(barrier);
93 }
94 Message::Watermark(w) => {
95 yield Message::Watermark(w);
96 }
97 }
98 }
99 }
100}
101
102impl<H> Execute for DvMergerExecutor<H>
103where
104 H: DvHandler,
105{
106 fn execute(self: Box<Self>) -> BoxedMessageStream {
107 self.execute_inner().boxed()
108 }
109}
110
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::{Arc, Mutex};

    use hashbrown::HashMap;
    use risingwave_common::array::{Array, ArrayBuilder, I64ArrayBuilder, Op, Utf8ArrayBuilder};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use risingwave_common::util::epoch::test_epoch;

    use super::*;
    use crate::executor::test_utils::MockSource;

    /// Shared, lock-protected map from file path to the set of positions.
    type DvMap = Arc<Mutex<HashMap<String, BTreeSet<i64>>>>;

    /// Creates an empty shared map.
    fn empty_dv_map() -> DvMap {
        Arc::new(Mutex::new(HashMap::new()))
    }

    /// Builds an all-`Insert` chunk with a varchar file-path column followed by
    /// an int64 position column, one row per entry of `positions`.
    fn build_delete_position_chunk(positions: &[(&str, i64)]) -> StreamChunk {
        let row_count = positions.len();
        let mut paths = Utf8ArrayBuilder::new(row_count);
        let mut offsets = I64ArrayBuilder::new(row_count);

        for &(path, offset) in positions {
            paths.append(Some(path));
            offsets.append(Some(offset));
        }

        let columns = vec![paths.finish().into_ref(), offsets.finish().into_ref()];
        let data = risingwave_common::array::DataChunk::new(columns, row_count);
        StreamChunk::from_parts(vec![Op::Insert; row_count], data)
    }

    /// In-memory [`DvHandler`] used by the tests.
    ///
    /// Clones share the same underlying maps, so a test can keep a handle to
    /// `written_dvs` after moving the handler into the executor.
    #[derive(Clone)]
    struct DvHandlerMock {
        /// Positions considered already present before/after each flush.
        existing_dvs: DvMap,
        /// Positions written since the last flush.
        pending_dvs: DvMap,
        /// Merged result recorded by flushes, keyed by file path.
        written_dvs: DvMap,
    }

    impl DvHandlerMock {
        /// Creates a mock with all three maps empty.
        fn new() -> Self {
            Self {
                existing_dvs: empty_dv_map(),
                pending_dvs: empty_dv_map(),
                written_dvs: empty_dv_map(),
            }
        }

        /// Seeds the mock with pre-existing positions for `file_path`.
        fn with_existing_dv(self, file_path: &str, positions: BTreeSet<i64>) -> Self {
            {
                // Scope the guard so it is released before `self` is moved out.
                let mut existing = self.existing_dvs.lock().unwrap();
                existing.insert(file_path.to_owned(), positions);
            }
            self
        }
    }

    #[async_trait::async_trait]
    impl DvHandler for DvHandlerMock {
        /// Buffers the position under its file path until the next flush.
        fn write(&mut self, path: &str, pos: i64) -> StreamExecutorResult<()> {
            let mut pending = self.pending_dvs.lock().unwrap();
            pending.entry_ref(path).or_default().insert(pos);
            Ok(())
        }

        /// Merges all buffered positions into the existing ones and records the
        /// merged set per file in `written_dvs`. Always returns `Ok(None)`.
        async fn flush(&mut self) -> StreamExecutorResult<Option<SinkMetadata>> {
            // The temporary guard is dropped at the end of this statement.
            let drained = std::mem::take(&mut *self.pending_dvs.lock().unwrap());
            if drained.is_empty() {
                return Ok(None);
            }

            let mut existing = self.existing_dvs.lock().unwrap();
            let mut written = self.written_dvs.lock().unwrap();
            for (file_path, new_positions) in drained {
                let merged = existing.entry(file_path.clone()).or_default();
                merged.extend(new_positions);
                written.insert(file_path, merged.clone());
            }
            Ok(None)
        }
    }

    /// Schema of the executor input: `(varchar, int64)`.
    fn input_schema() -> Schema {
        let fields = vec![
            Field::unnamed(DataType::Varchar),
            Field::unnamed(DataType::Int64),
        ];
        Schema::new(fields)
    }

    /// Wraps `source` as the upstream of a `DvMergerExecutor` backed by `handler`
    /// and starts it.
    fn spawn_merger(source: MockSource, handler: DvHandlerMock) -> BoxedMessageStream {
        let upstream = source.into_executor(input_schema(), vec![]);
        DvMergerExecutor::new(ActorContext::for_test(123), upstream, handler)
            .boxed()
            .execute()
    }

    /// Asserts that the next message produced by `stream` is a barrier.
    async fn expect_barrier(stream: &mut BoxedMessageStream) {
        assert!(stream.next().await.unwrap().unwrap().is_barrier());
    }

    /// Returns a copy of the positions stored for `file_path`; panics if absent.
    fn positions_for(dvs: &DvMap, file_path: &str) -> BTreeSet<i64> {
        let guard = dvs.lock().unwrap();
        guard.get(file_path).unwrap().clone()
    }

    #[tokio::test]
    async fn test_dv_merger_basic() {
        let handler = DvHandlerMock::new();
        let written_dvs = Arc::clone(&handler.written_dvs);

        let (mut tx, source) = MockSource::channel();
        let mut executor = spawn_merger(source, handler);

        tx.push_barrier(test_epoch(1), false);
        expect_barrier(&mut executor).await;

        tx.push_chunk(build_delete_position_chunk(&[
            ("file1.parquet", 0),
            ("file1.parquet", 3),
            ("file2.parquet", 1),
        ]));
        tx.push_barrier(test_epoch(2), false);
        expect_barrier(&mut executor).await;

        assert_eq!(
            positions_for(&written_dvs, "file1.parquet"),
            BTreeSet::from([0, 3])
        );
        assert_eq!(
            positions_for(&written_dvs, "file2.parquet"),
            BTreeSet::from([1])
        );
    }

    #[tokio::test]
    async fn test_dv_merger_merge_with_existing() {
        let handler =
            DvHandlerMock::new().with_existing_dv("file1.parquet", BTreeSet::from([0, 5, 10]));
        let written_dvs = Arc::clone(&handler.written_dvs);

        let (mut tx, source) = MockSource::channel();
        let mut executor = spawn_merger(source, handler);

        tx.push_barrier(test_epoch(1), false);
        expect_barrier(&mut executor).await;

        tx.push_chunk(build_delete_position_chunk(&[
            ("file1.parquet", 3),
            ("file1.parquet", 5),
            ("file1.parquet", 7),
        ]));
        tx.push_barrier(test_epoch(2), false);
        expect_barrier(&mut executor).await;

        assert_eq!(
            positions_for(&written_dvs, "file1.parquet"),
            BTreeSet::from([0, 3, 5, 7, 10])
        );
    }

    #[tokio::test]
    async fn test_dv_merger_no_deletes() {
        let handler = DvHandlerMock::new();
        let written_dvs = Arc::clone(&handler.written_dvs);

        let (mut tx, source) = MockSource::channel();
        let mut executor = spawn_merger(source, handler);

        tx.push_barrier(test_epoch(1), false);
        expect_barrier(&mut executor).await;

        tx.push_barrier(test_epoch(2), false);
        expect_barrier(&mut executor).await;

        assert!(written_dvs.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_dv_merger_multiple_epochs() {
        let handler = DvHandlerMock::new();
        let written_dvs = Arc::clone(&handler.written_dvs);

        let (mut tx, source) = MockSource::channel();
        let mut executor = spawn_merger(source, handler);

        tx.push_barrier(test_epoch(1), false);
        expect_barrier(&mut executor).await;

        tx.push_chunk(build_delete_position_chunk(&[("file1.parquet", 0)]));
        tx.push_barrier(test_epoch(2), false);
        expect_barrier(&mut executor).await;
        assert_eq!(
            positions_for(&written_dvs, "file1.parquet"),
            BTreeSet::from([0])
        );

        tx.push_chunk(build_delete_position_chunk(&[("file1.parquet", 2)]));
        tx.push_barrier(test_epoch(3), false);
        expect_barrier(&mut executor).await;

        assert_eq!(
            positions_for(&written_dvs, "file1.parquet"),
            BTreeSet::from([0, 2])
        );
    }
}