risingwave_storage/hummock/event_handler/uploader/
spiller.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockEpoch;
20
21use crate::hummock::event_handler::LocalInstanceId;
22use crate::hummock::event_handler::uploader::{
23    LocalInstanceUnsyncData, UnsyncData, UnsyncEpochId, UploadTaskInput,
24};
25
/// Spill bookkeeping for a single unsync epoch: which local instances currently report
/// spillable data for it, and the summed size of that data.
26#[derive(Default)]
27struct EpochSpillableDataInfo {
    /// Instances whose spillable data is accounted for in `payload_size`.
    /// Each instance is expected to be recorded at most once (insertion is asserted).
28    instance_ids: HashSet<LocalInstanceId>,
    /// Sum of the sizes returned by `spillable_data_info` for the instances above.
29    payload_size: usize,
30}
31
/// Picks data to spill out of a mutably borrowed `UnsyncData`, one unsync epoch per call to
/// `next_spilled_payload`.
32pub(super) struct Spiller<'a> {
    /// The unsync data that payloads are spilled from; borrowed for the spiller's lifetime.
33    unsync_data: &'a mut UnsyncData,
    /// Per unsync epoch, the instances that currently have spillable data and its total size.
    /// An entry is removed when its epoch is spilled and entries are added or extended as
    /// instances expose their next spillable epoch.
34    epoch_info: HashMap<UnsyncEpochId, EpochSpillableDataInfo>,
    /// Lookup from `(epoch, table_id)` to the unsync epoch id covering that pair, built once
    /// from `unsync_data.unsync_epochs` in `new`.
35    unsync_epoch_id_map: HashMap<(HummockEpoch, TableId), UnsyncEpochId>,
36}
37
38impl<'a> Spiller<'a> {
39    pub(super) fn new(unsync_data: &'a mut UnsyncData) -> Self {
40        let unsync_epoch_id_map: HashMap<_, _> = unsync_data
41            .unsync_epochs
42            .iter()
43            .flat_map(|(unsync_epoch_id, table_ids)| {
44                let epoch = unsync_epoch_id.epoch();
45                let unsync_epoch_id = *unsync_epoch_id;
46                table_ids
47                    .iter()
48                    .map(move |table_id| ((epoch, *table_id), unsync_epoch_id))
49            })
50            .collect();
51        let mut epoch_info: HashMap<_, EpochSpillableDataInfo> = HashMap::new();
52        for instance_data in unsync_data
53            .table_data
54            .values()
55            .flat_map(|table_data| table_data.instance_data.values())
56        {
57            if let Some((epoch, spill_size)) = instance_data.spillable_data_info() {
58                let unsync_epoch_id = unsync_epoch_id_map
59                    .get(&(epoch, instance_data.table_id))
60                    .expect("should exist");
61                let epoch_info = epoch_info.entry(*unsync_epoch_id).or_default();
62                assert!(epoch_info.instance_ids.insert(instance_data.instance_id));
63                epoch_info.payload_size += spill_size;
64            }
65        }
66        Self {
67            unsync_data,
68            epoch_info,
69            unsync_epoch_id_map,
70        }
71    }
72
73    pub(super) fn next_spilled_payload(
74        &mut self,
75    ) -> Option<(HummockEpoch, UploadTaskInput, HashSet<TableId>)> {
76        if let Some(unsync_epoch_id) = self
77            .epoch_info
78            .iter()
79            .max_by(
80                |(UnsyncEpochId(_, table1), info1), (UnsyncEpochId(_, table2), info2)| {
81                    info1.payload_size.cmp(&info2.payload_size).then_with(|| {
82                        if !cfg!(test) {
83                            Ordering::Equal
84                        } else {
85                            assert_ne!(table1, table2);
86                            // enforce deterministic spill order in test
87                            // smaller table id will be spilled first.
88                            table2.cmp(table1)
89                        }
90                    })
91                },
92            )
93            .map(|(unsync_epoch_id, _)| *unsync_epoch_id)
94        {
95            let spill_epoch = unsync_epoch_id.epoch();
96            let spill_info = self
97                .epoch_info
98                .remove(&unsync_epoch_id)
99                .expect("should exist");
100            let epoch_info = &mut self.epoch_info;
101            let mut payload = HashMap::new();
102            let mut spilled_table_ids = HashSet::new();
103            for instance_id in spill_info.instance_ids {
104                let table_id = *self
105                    .unsync_data
106                    .instance_table_id
107                    .get(&instance_id)
108                    .expect("should exist");
109                let instance_data = self
110                    .unsync_data
111                    .table_data
112                    .get_mut(&table_id)
113                    .expect("should exist")
114                    .instance_data
115                    .get_mut(&instance_id)
116                    .expect("should exist");
117                let instance_payload = instance_data.spill(spill_epoch);
118                assert!(!instance_payload.is_empty());
119                payload.insert(instance_id, instance_payload);
120                spilled_table_ids.insert(table_id);
121
122                // update the spill info
123                if let Some((new_spill_epoch, size)) = instance_data.spillable_data_info() {
124                    let new_unsync_epoch_id = self
125                        .unsync_epoch_id_map
126                        .get(&(new_spill_epoch, instance_data.table_id))
127                        .expect("should exist");
128                    let info = epoch_info.entry(*new_unsync_epoch_id).or_default();
129                    assert!(info.instance_ids.insert(instance_id));
130                    info.payload_size += size;
131                }
132            }
133            Some((spill_epoch, payload, spilled_table_ids))
134        } else {
135            None
136        }
137    }
138
139    pub(super) fn unsync_data(&mut self) -> &mut UnsyncData {
140        self.unsync_data
141    }
142}
143
144impl LocalInstanceUnsyncData {
145    fn spillable_data_info(&self) -> Option<(HummockEpoch, usize)> {
146        self.sealed_data
147            .back()
148            .or(self.current_epoch_data.as_ref())
149            .and_then(|epoch_data| {
150                if !epoch_data.is_empty() {
151                    Some((
152                        epoch_data.epoch,
153                        epoch_data.imms.iter().map(|imm| imm.size()).sum(),
154                    ))
155                } else {
156                    None
157                }
158            })
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use std::collections::{HashMap, HashSet};
165    use std::ops::Deref;
166
167    use futures::FutureExt;
168    use futures::future::join_all;
169    use itertools::Itertools;
170    use risingwave_common::catalog::TableId;
171    use risingwave_common::util::epoch::EpochExt;
172    use tokio::sync::oneshot;
173
174    use crate::hummock::event_handler::uploader::test_utils::*;
175    use crate::opts::StorageOpts;
176    use crate::store::SealCurrentEpochOptions;
177
    // Checks the order in which spill tasks are started after `may_flush`, and how the SSTs
    // uploaded by those tasks are attributed to the per-epoch syncs started afterwards.
    // The scenario uses two tables (table 1 with two instances, table 2 with one) and
    // four epochs.
178    #[tokio::test]
179    async fn test_spill_in_order() {
        // NOTE(review): a flush ratio of 0.0 presumably makes every buffered imm eligible for
        // spilling when `may_flush` is called — confirm against the buffer tracker.
180        let config = StorageOpts {
181            shared_buffer_capacity_mb: 1024 * 1024,
182            shared_buffer_flush_ratio: 0.0,
183            ..Default::default()
184        };
185        let (buffer_tracker, mut uploader, new_task_notifier) =
186            prepare_uploader_order_test(&config, false);
187
188        let table_id1 = TableId::new(1);
189        let table_id2 = TableId::new(2);
190
191        let instance_id1_1 = 1;
192        let instance_id1_2 = 2;
193        let instance_id2 = 3;
194
195        let epoch1 = INITIAL_EPOCH.next_epoch();
196        let epoch2 = epoch1.next_epoch();
197        let epoch3 = epoch2.next_epoch();
198        let epoch4 = epoch3.next_epoch();
199        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
200        let memory_limiter = Some(memory_limiter.deref());
201
        // Epochs 1 to 3 are started separately for each table, whereas epoch 4 (below) is
        // started once for both tables together.
202        // epoch1
203        uploader.start_epoch(epoch1, HashSet::from_iter([table_id1]));
204        uploader.start_epoch(epoch1, HashSet::from_iter([table_id2]));
205
206        uploader.init_instance(instance_id1_1, table_id1, epoch1);
207        uploader.init_instance(instance_id1_2, table_id1, epoch1);
208        uploader.init_instance(instance_id2, table_id2, epoch1);
209
210        // naming: imm<table>_<instance>_<epoch>
        // (table 2 has a single instance, so its imms are named imm2_<epoch>[_<seq>])
211        let imm1_1_1 = gen_imm_inner(table_id1, epoch1, 0, memory_limiter).await;
212        uploader.add_imm(instance_id1_1, imm1_1_1.clone());
213        let imm1_2_1 = gen_imm_inner(table_id1, epoch1, 0, memory_limiter).await;
214        uploader.add_imm(instance_id1_2, imm1_2_1.clone());
215        let imm2_1 = gen_imm_inner(table_id2, epoch1, 0, memory_limiter).await;
216        uploader.add_imm(instance_id2, imm2_1.clone());
217
218        // epoch2
219        uploader.start_epoch(epoch2, HashSet::from_iter([table_id1]));
220        uploader.start_epoch(epoch2, HashSet::from_iter([table_id2]));
221
222        uploader.local_seal_epoch(instance_id1_1, epoch2, SealCurrentEpochOptions::for_test());
223        uploader.local_seal_epoch(instance_id1_2, epoch2, SealCurrentEpochOptions::for_test());
224        uploader.local_seal_epoch(instance_id2, epoch2, SealCurrentEpochOptions::for_test());
225
226        let imms1_1_2 = join_all(
227            [0, 1, 2].map(|offset| gen_imm_inner(table_id1, epoch2, offset, memory_limiter)),
228        )
229        .await;
230        for imm in imms1_1_2.clone() {
231            uploader.add_imm(instance_id1_1, imm);
232        }
233
234        // epoch3
235        uploader.start_epoch(epoch3, HashSet::from_iter([table_id1]));
236        uploader.start_epoch(epoch3, HashSet::from_iter([table_id2]));
237
238        uploader.local_seal_epoch(instance_id1_1, epoch3, SealCurrentEpochOptions::for_test());
239        uploader.local_seal_epoch(instance_id1_2, epoch3, SealCurrentEpochOptions::for_test());
240        uploader.local_seal_epoch(instance_id2, epoch3, SealCurrentEpochOptions::for_test());
241
242        let imms1_2_3 = join_all(
243            [0, 1, 2, 3].map(|offset| gen_imm_inner(table_id1, epoch3, offset, memory_limiter)),
244        )
245        .await;
246        for imm in imms1_2_3.clone() {
247            uploader.add_imm(instance_id1_2, imm);
248        }
249
250        // epoch4
251        uploader.start_epoch(epoch4, HashSet::from_iter([table_id1, table_id2]));
252
253        uploader.local_seal_epoch(instance_id1_1, epoch4, SealCurrentEpochOptions::for_test());
254        uploader.local_seal_epoch(instance_id1_2, epoch4, SealCurrentEpochOptions::for_test());
255        uploader.local_seal_epoch(instance_id2, epoch4, SealCurrentEpochOptions::for_test());
256
257        let imm1_1_4 = gen_imm_inner(table_id1, epoch4, 0, memory_limiter).await;
258        uploader.add_imm(instance_id1_1, imm1_1_4.clone());
259        let imm1_2_4 = gen_imm_inner(table_id1, epoch4, 0, memory_limiter).await;
260        uploader.add_imm(instance_id1_2, imm1_2_4.clone());
261        let imm2_4_1 = gen_imm_inner(table_id2, epoch4, 0, memory_limiter).await;
262        uploader.add_imm(instance_id2, imm2_4_1.clone());
263
264        // uploader state:
265        //          table_id1:                                      table_id2:
266        //          instance_id1_1:         instance_id1_2:         instance_id2
267        //  epoch1  imm1_1_1                imm1_2_1           |    imm2_1      |
268        //  epoch2  imms1_1_2(size 3)                          |                |
269        //  epoch3                          imms1_2_3(size 4)  |                |
270        //  epoch4  imm1_1_4                imm1_2_4                imm2_4_1    |
271
        // The notifiers below are registered in the same order in which their start signals
        // are awaited after `may_flush`, i.e. the expected spill order:
        //   table1@epoch1 (2 imms) -> epoch3 (4 imms) -> epoch2 (3 imms)
        //   -> table1@epoch4 (2 imms) -> table2@epoch1 (1 imm) -> table2@epoch4 (1 imm).
        // NOTE(review): this matches repeatedly spilling the unsync epoch with the largest
        // payload among each instance's next spillable epoch, assuming equally sized imms
        // and that `may_flush` drives the spiller — confirm.
        // Multi-imm tasks list their batch ids in reverse insertion order (`.rev()`).
272        let (await_start1_1, finish_tx1_1) = new_task_notifier(HashMap::from_iter([
273            (instance_id1_1, vec![imm1_1_1.batch_id()]),
274            (instance_id1_2, vec![imm1_2_1.batch_id()]),
275        ]));
276        let (await_start3, finish_tx3) = new_task_notifier(HashMap::from_iter([(
277            instance_id1_2,
278            imms1_2_3
279                .iter()
280                .rev()
281                .map(|imm| imm.batch_id())
282                .collect_vec(),
283        )]));
284        let (await_start2, finish_tx2) = new_task_notifier(HashMap::from_iter([(
285            instance_id1_1,
286            imms1_1_2
287                .iter()
288                .rev()
289                .map(|imm| imm.batch_id())
290                .collect_vec(),
291        )]));
292        let (await_start1_4, finish_tx1_4) = new_task_notifier(HashMap::from_iter([
293            (instance_id1_1, vec![imm1_1_4.batch_id()]),
294            (instance_id1_2, vec![imm1_2_4.batch_id()]),
295        ]));
296        let (await_start2_1, finish_tx2_1) = new_task_notifier(HashMap::from_iter([(
297            instance_id2,
298            vec![imm2_1.batch_id()],
299        )]));
300        let (await_start2_4_1, finish_tx2_4_1) = new_task_notifier(HashMap::from_iter([(
301            instance_id2,
302            vec![imm2_4_1.batch_id()],
303        )]));
304
305        uploader.may_flush();
306        await_start1_1.await;
307        await_start3.await;
308        await_start2.await;
309        await_start1_4.await;
310        await_start2_1.await;
311        await_start2_4_1.await;
312
        // All six tasks have started but none has been allowed to finish yet.
313        assert_uploader_pending(&mut uploader).await;
314
        // A second epoch4 imm for table 2, added after the spill tasks were started; it is
        // expected in its own task (`await_start2_4_2`) once epoch4 is synced.
315        let imm2_4_2 = gen_imm_inner(table_id2, epoch4, 1, memory_limiter).await;
316        uploader.add_imm(instance_id2, imm2_4_2.clone());
317
318        uploader.local_seal_epoch(
319            instance_id1_1,
320            u64::MAX,
321            SealCurrentEpochOptions::for_test(),
322        );
323        uploader.local_seal_epoch(
324            instance_id1_2,
325            u64::MAX,
326            SealCurrentEpochOptions::for_test(),
327        );
328        uploader.local_seal_epoch(instance_id2, u64::MAX, SealCurrentEpochOptions::for_test());
329
330        // uploader state:
331        //          table_id1:                                      table_id2:
332        //          instance_id1_1:         instance_id1_2:         instance_id2
333        //  epoch1  spill(imm1_1_1, imm1_2_1, size 2)          |    spill(imm2_1, size 1)               |
334        //  epoch2  spill(imms1_1_2, size 3)                   |                                        |
335        //  epoch3                   spill(imms1_2_3, size 4)  |                                        |
336        //  epoch4  spill(imm1_1_4, imm1_2_4, size 2)               spill(imm2_4_1, size 1), imm2_4_2   |
337
        // Start syncs for epochs 1..=3 per table; receiver names are sync_rx<epoch>_<table>.
338        let (sync_tx1_1, sync_rx1_1) = oneshot::channel();
339        uploader.start_single_epoch_sync(epoch1, sync_tx1_1, HashSet::from_iter([table_id1]));
340        let (sync_tx2_1, sync_rx2_1) = oneshot::channel();
341        uploader.start_single_epoch_sync(epoch2, sync_tx2_1, HashSet::from_iter([table_id1]));
342        let (sync_tx3_1, sync_rx3_1) = oneshot::channel();
343        uploader.start_single_epoch_sync(epoch3, sync_tx3_1, HashSet::from_iter([table_id1]));
344        let (sync_tx1_2, sync_rx1_2) = oneshot::channel();
345        uploader.start_single_epoch_sync(epoch1, sync_tx1_2, HashSet::from_iter([table_id2]));
346        let (sync_tx2_2, sync_rx2_2) = oneshot::channel();
347        uploader.start_single_epoch_sync(epoch2, sync_tx2_2, HashSet::from_iter([table_id2]));
348        let (sync_tx3_2, sync_rx3_2) = oneshot::channel();
349        uploader.start_single_epoch_sync(epoch3, sync_tx3_2, HashSet::from_iter([table_id2]));
350
351        let (await_start2_4_2, finish_tx2_4_2) = new_task_notifier(HashMap::from_iter([(
352            instance_id2,
353            vec![imm2_4_2.batch_id()],
354        )]));
355
        // Let later tasks finish before the first one: the uploader is still expected to be
        // pending, i.e. nothing is reported while the earliest task is unfinished.
356        finish_tx2_4_1.send(()).unwrap();
357        finish_tx3.send(()).unwrap();
358        finish_tx2.send(()).unwrap();
359        finish_tx1_4.send(()).unwrap();
360        assert_uploader_pending(&mut uploader).await;
361
        // Finishing the first task releases results in task start order.
362        finish_tx1_1.send(()).unwrap();
363        {
364            let imm_ids = HashMap::from_iter([
365                (instance_id1_1, vec![imm1_1_1.batch_id()]),
366                (instance_id1_2, vec![imm1_2_1.batch_id()]),
367            ]);
368            let sst = uploader.next_uploaded_sst().await;
369            assert_eq!(&imm_ids, sst.imm_ids());
370            let synced_data = sync_rx1_1.await.unwrap().unwrap();
371            assert_eq!(synced_data.uploaded_ssts.len(), 1);
372            assert_eq!(&imm_ids, synced_data.uploaded_ssts[0].imm_ids());
373        }
374        {
375            let imm_ids3 = HashMap::from_iter([(
376                instance_id1_2,
377                imms1_2_3
378                    .iter()
379                    .rev()
380                    .map(|imm| imm.batch_id())
381                    .collect_vec(),
382            )]);
383            let imm_ids2 = HashMap::from_iter([(
384                instance_id1_1,
385                imms1_1_2
386                    .iter()
387                    .rev()
388                    .map(|imm| imm.batch_id())
389                    .collect_vec(),
390            )]);
            // The epoch3 SST is reported before the epoch2 one (task start order), while each
            // sync result only contains the SST of its own epoch.
391            let sst = uploader.next_uploaded_sst().await;
392            assert_eq!(&imm_ids3, sst.imm_ids());
393            let sst = uploader.next_uploaded_sst().await;
394            assert_eq!(&imm_ids2, sst.imm_ids());
395            let synced_data = sync_rx2_1.await.unwrap().unwrap();
396            assert_eq!(synced_data.uploaded_ssts.len(), 1);
397            assert_eq!(&imm_ids2, synced_data.uploaded_ssts[0].imm_ids());
398            let synced_data = sync_rx3_1.await.unwrap().unwrap();
399            assert_eq!(synced_data.uploaded_ssts.len(), 1);
400            assert_eq!(&imm_ids3, synced_data.uploaded_ssts[0].imm_ids());
401        }
402        {
403            let imm_ids1_4 = HashMap::from_iter([
404                (instance_id1_1, vec![imm1_1_4.batch_id()]),
405                (instance_id1_2, vec![imm1_2_4.batch_id()]),
406            ]);
407            let imm_ids2_1 = HashMap::from_iter([(instance_id2, vec![imm2_1.batch_id()])]);
408            let imm_ids2_4_1 = HashMap::from_iter([(instance_id2, vec![imm2_4_1.batch_id()])]);
409            finish_tx2_1.send(()).unwrap();
410            let sst = uploader.next_uploaded_sst().await;
411            assert_eq!(&imm_ids1_4, sst.imm_ids());
412
413            // trigger the sync after the spill task is finished and acked to cover the case
414            let (sync_tx4, mut sync_rx4) = oneshot::channel();
415            uploader.start_single_epoch_sync(
416                epoch4,
417                sync_tx4,
418                HashSet::from_iter([table_id1, table_id2]),
419            );
420            await_start2_4_2.await;
421
422            let sst = uploader.next_uploaded_sst().await;
423            assert_eq!(&imm_ids2_1, sst.imm_ids());
424            let sst = uploader.next_uploaded_sst().await;
425            assert_eq!(&imm_ids2_4_1, sst.imm_ids());
426            let synced_data = sync_rx1_2.await.unwrap().unwrap();
427            assert_eq!(synced_data.uploaded_ssts.len(), 1);
428            assert_eq!(&imm_ids2_1, synced_data.uploaded_ssts[0].imm_ids());
            // Table 2 wrote nothing in epoch2 and epoch3, so those syncs carry no SSTs.
429            let synced_data = sync_rx2_2.await.unwrap().unwrap();
430            assert!(synced_data.uploaded_ssts.is_empty());
431            let synced_data = sync_rx3_2.await.unwrap().unwrap();
432            assert!(synced_data.uploaded_ssts.is_empty());
433
434            let imm_ids2_4_2 = HashMap::from_iter([(instance_id2, vec![imm2_4_2.batch_id()])]);
435
            // The epoch4 sync must not complete before the task for `imm2_4_2` has finished.
436            assert!((&mut sync_rx4).now_or_never().is_none());
437            finish_tx2_4_2.send(()).unwrap();
438            let sst = uploader.next_uploaded_sst().await;
439            assert_eq!(&imm_ids2_4_2, sst.imm_ids());
            // The epoch4 sync result lists its three SSTs starting with the most recently
            // started task.
440            let synced_data = sync_rx4.await.unwrap().unwrap();
441            assert_eq!(synced_data.uploaded_ssts.len(), 3);
442            assert_eq!(&imm_ids2_4_2, synced_data.uploaded_ssts[0].imm_ids());
443            assert_eq!(&imm_ids2_4_1, synced_data.uploaded_ssts[1].imm_ids());
444            assert_eq!(&imm_ids1_4, synced_data.uploaded_ssts[2].imm_ids());
445        }
446    }
447}