risingwave_storage/hummock/event_handler/uploader/spiller.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockEpoch;
20
21use crate::hummock::event_handler::LocalInstanceId;
22use crate::hummock::event_handler::uploader::{
23    LocalInstanceUnsyncData, UnsyncData, UnsyncEpochId, UploadTaskInput,
24};
25
/// Aggregated spill candidate for a single unsync epoch: which local instances
/// currently have spillable data for that epoch, and the summed payload size.
#[derive(Default)]
struct EpochSpillableDataInfo {
    /// Instances whose next spillable epoch is this epoch. An instance appears
    /// in at most one `EpochSpillableDataInfo` at a time (enforced by asserts
    /// in `Spiller`).
    instance_ids: HashSet<LocalInstanceId>,
    /// Total spillable payload for this epoch, as reported by
    /// `LocalInstanceUnsyncData::spillable_data_info` (sum of `imm.size()`).
    payload_size: usize,
}
31
/// Drives spilling of [`UnsyncData`]: repeatedly yields the unsync epoch with
/// the largest spillable payload via [`Spiller::next_spilled_payload`] until
/// nothing spillable remains.
pub(super) struct Spiller<'a> {
    /// The uploader's unsync data that payloads are spilled out of.
    unsync_data: &'a mut UnsyncData,
    /// Per-epoch spill candidates, keyed by the owning [`UnsyncEpochId`].
    /// Entries are removed as they are spilled and re-created as instances
    /// report their next spillable epoch.
    epoch_info: HashMap<UnsyncEpochId, EpochSpillableDataInfo>,
    /// Reverse lookup from `(epoch, table_id)` to its [`UnsyncEpochId`],
    /// built once in [`Spiller::new`] from `unsync_data.unsync_epochs`.
    unsync_epoch_id_map: HashMap<(HummockEpoch, TableId), UnsyncEpochId>,
}
37
impl<'a> Spiller<'a> {
    /// Builds the spill index over `unsync_data`.
    ///
    /// Prepares two maps:
    /// - `unsync_epoch_id_map`: reverse lookup from `(epoch, table_id)` to the
    ///   `UnsyncEpochId` that owns it, derived from `unsync_data.unsync_epochs`;
    /// - `epoch_info`: for each unsync epoch, the set of instances that can
    ///   currently spill data for it and the summed payload size.
    ///
    /// Each instance contributes to at most one `epoch_info` entry, because
    /// `spillable_data_info` reports only that instance's single next
    /// spillable epoch.
    pub(super) fn new(unsync_data: &'a mut UnsyncData) -> Self {
        let unsync_epoch_id_map: HashMap<_, _> = unsync_data
            .unsync_epochs
            .iter()
            .flat_map(|(unsync_epoch_id, table_ids)| {
                let epoch = unsync_epoch_id.epoch();
                // Copy the id out so the inner closure can be `move` without
                // holding a borrow of the map entry.
                let unsync_epoch_id = *unsync_epoch_id;
                table_ids
                    .iter()
                    .map(move |table_id| ((epoch, *table_id), unsync_epoch_id))
            })
            .collect();
        let mut epoch_info: HashMap<_, EpochSpillableDataInfo> = HashMap::new();
        // Visit every instance of every table and register its next spillable
        // epoch (if any) under the owning UnsyncEpochId.
        for instance_data in unsync_data
            .table_data
            .values()
            .flat_map(|table_data| table_data.instance_data.values())
        {
            if let Some((epoch, spill_size)) = instance_data.spillable_data_info() {
                let unsync_epoch_id = unsync_epoch_id_map
                    .get(&(epoch, instance_data.table_id))
                    .expect("should exist");
                let epoch_info = epoch_info.entry(*unsync_epoch_id).or_default();
                // An instance must not be registered twice for the same epoch.
                assert!(epoch_info.instance_ids.insert(instance_data.instance_id));
                epoch_info.payload_size += spill_size;
            }
        }
        Self {
            unsync_data,
            epoch_info,
            unsync_epoch_id_map,
        }
    }

    /// Picks the unsync epoch with the largest spillable payload, spills it
    /// from every instance registered for it, and returns
    /// `(spill_epoch, payload per instance, table ids that were spilled)`.
    /// Returns `None` when no spillable data remains.
    ///
    /// After an instance is spilled, its next spillable epoch (if any) is
    /// re-registered into `epoch_info`, so repeated calls eventually drain
    /// all spillable data.
    pub(super) fn next_spilled_payload(
        &mut self,
    ) -> Option<(HummockEpoch, UploadTaskInput, HashSet<TableId>)> {
        if let Some(unsync_epoch_id) = self
            .epoch_info
            .iter()
            .max_by(
                |(UnsyncEpochId(_, table1), info1), (UnsyncEpochId(_, table2), info2)| {
                    info1.payload_size.cmp(&info2.payload_size).then_with(|| {
                        if !cfg!(test) {
                            // In release builds a size tie is left to the
                            // (unspecified) HashMap iteration order.
                            Ordering::Equal
                        } else {
                            assert_ne!(table1, table2);
                            // enforce deterministic spill order in test
                            // smaller table id will be spilled first.
                            table2.cmp(table1)
                        }
                    })
                },
            )
            .map(|(unsync_epoch_id, _)| *unsync_epoch_id)
        {
            let spill_epoch = unsync_epoch_id.epoch();
            // Take the chosen candidate out of the index; its instances may
            // re-register their next spillable epoch in the loop below.
            let spill_info = self
                .epoch_info
                .remove(&unsync_epoch_id)
                .expect("should exist");
            // Field-local alias: allows mutating `epoch_info` inside the loop
            // while `self.unsync_data` is also borrowed mutably (disjoint
            // field borrows through `self`).
            let epoch_info = &mut self.epoch_info;
            let mut payload = HashMap::new();
            let mut spilled_table_ids = HashSet::new();
            for instance_id in spill_info.instance_ids {
                let table_id = *self
                    .unsync_data
                    .instance_table_id
                    .get(&instance_id)
                    .expect("should exist");
                let instance_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&table_id)
                    .expect("should exist")
                    .instance_data
                    .get_mut(&instance_id)
                    .expect("should exist");
                let instance_payload = instance_data.spill(spill_epoch);
                // The instance was registered as spillable for this epoch, so
                // the spill must yield data.
                assert!(!instance_payload.is_empty());
                payload.insert(instance_id, instance_payload);
                spilled_table_ids.insert(table_id);

                // update the spill info
                if let Some((new_spill_epoch, size)) = instance_data.spillable_data_info() {
                    let new_unsync_epoch_id = self
                        .unsync_epoch_id_map
                        .get(&(new_spill_epoch, instance_data.table_id))
                        .expect("should exist");
                    let info = epoch_info.entry(*new_unsync_epoch_id).or_default();
                    assert!(info.instance_ids.insert(instance_id));
                    info.payload_size += size;
                }
            }
            Some((spill_epoch, payload, spilled_table_ids))
        } else {
            None
        }
    }

    /// Mutable access to the wrapped [`UnsyncData`].
    pub(super) fn unsync_data(&mut self) -> &mut UnsyncData {
        self.unsync_data
    }
}
143
144impl LocalInstanceUnsyncData {
145    fn spillable_data_info(&self) -> Option<(HummockEpoch, usize)> {
146        self.sealed_data
147            .back()
148            .or(self.current_epoch_data.as_ref())
149            .and_then(|epoch_data| {
150                if !epoch_data.is_empty() {
151                    Some((
152                        epoch_data.epoch,
153                        epoch_data.imms.iter().map(|imm| imm.size()).sum(),
154                    ))
155                } else {
156                    None
157                }
158            })
159    }
160}
161
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use futures::FutureExt;
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common::util::epoch::EpochExt;
    use tokio::sync::oneshot;

    use crate::hummock::event_handler::uploader::test_utils::*;
    use crate::opts::StorageOpts;
    use crate::store::SealCurrentEpochOptions;

    /// End-to-end check of the spill order. With `shared_buffer_flush_ratio`
    /// set to 0 everything becomes flushable, and the test pins the exact
    /// order in which spill tasks start (via `new_task_notifier`) and in
    /// which uploaded SSTs come back (via `sst_collector`), including the
    /// test-only tie-breaking rule in `Spiller::next_spilled_payload`
    /// (smaller table id spilled first on a payload-size tie).
    #[tokio::test]
    async fn test_spill_in_order() {
        let config = StorageOpts {
            shared_buffer_capacity_mb: 1024 * 1024,
            shared_buffer_flush_ratio: 0.0,
            ..Default::default()
        };
        let (buffer_tracker, mut uploader, new_task_notifier) =
            prepare_uploader_order_test(&config, false);
        let mut sst_collector = UploadedSstCollector::default();

        let table_id1 = TableId::new(1);
        let table_id2 = TableId::new(2);

        // Two instances on table 1, one on table 2.
        let instance_id1_1 = 1;
        let instance_id1_2 = 2;
        let instance_id2 = 3;

        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
        let memory_limiter = Some(memory_limiter.as_ref());

        // epoch1
        uploader.start_epoch(epoch1, HashSet::from_iter([table_id1]));
        uploader.start_epoch(epoch1, HashSet::from_iter([table_id2]));

        uploader.init_instance(instance_id1_1, table_id1, epoch1);
        uploader.init_instance(instance_id1_2, table_id1, epoch1);
        uploader.init_instance(instance_id2, table_id2, epoch1);

        // naming: imm<table>_<instance>_<epoch>
        let imm1_1_1 = gen_imm_inner(table_id1, epoch1, 0);
        let tracker = tracker_for_test(&imm1_1_1, memory_limiter).await;
        uploader.add_imm(instance_id1_1, (imm1_1_1.clone(), tracker));
        let imm1_2_1 = gen_imm_inner(table_id1, epoch1, 0);
        let tracker = tracker_for_test(&imm1_2_1, memory_limiter).await;
        uploader.add_imm(instance_id1_2, (imm1_2_1.clone(), tracker));
        let imm2_1 = gen_imm_inner(table_id2, epoch1, 0);
        let tracker = tracker_for_test(&imm2_1, memory_limiter).await;
        uploader.add_imm(instance_id2, (imm2_1.clone(), tracker));

        // epoch2
        uploader.start_epoch(epoch2, HashSet::from_iter([table_id1]));
        uploader.start_epoch(epoch2, HashSet::from_iter([table_id2]));

        uploader.local_seal_epoch(instance_id1_1, epoch2, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id1_2, epoch2, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id2, epoch2, SealCurrentEpochOptions::for_test());

        // 3 imms on instance 1_1 for epoch2 (payload size 3 in the diagram).
        let imms1_1_2 = [0, 1, 2]
            .into_iter()
            .map(|offset| gen_imm_inner(table_id1, epoch2, offset))
            .collect_vec();
        for imm in imms1_1_2.clone() {
            let tracker = tracker_for_test(&imm, memory_limiter).await;
            uploader.add_imm(instance_id1_1, (imm, tracker));
        }

        // epoch3
        uploader.start_epoch(epoch3, HashSet::from_iter([table_id1]));
        uploader.start_epoch(epoch3, HashSet::from_iter([table_id2]));

        uploader.local_seal_epoch(instance_id1_1, epoch3, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id1_2, epoch3, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id2, epoch3, SealCurrentEpochOptions::for_test());

        // 4 imms on instance 1_2 for epoch3 (payload size 4 in the diagram).
        let imms1_2_3 = [0, 1, 2, 3]
            .into_iter()
            .map(|offset| gen_imm_inner(table_id1, epoch3, offset))
            .collect_vec();
        for imm in imms1_2_3.clone() {
            let tracker = tracker_for_test(&imm, memory_limiter).await;
            uploader.add_imm(instance_id1_2, (imm, tracker));
        }

        // epoch4
        uploader.start_epoch(epoch4, HashSet::from_iter([table_id1, table_id2]));

        uploader.local_seal_epoch(instance_id1_1, epoch4, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id1_2, epoch4, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id2, epoch4, SealCurrentEpochOptions::for_test());

        let imm1_1_4 = gen_imm_inner(table_id1, epoch4, 0);
        let tracker = tracker_for_test(&imm1_1_4, memory_limiter).await;
        uploader.add_imm(instance_id1_1, (imm1_1_4.clone(), tracker));
        let imm1_2_4 = gen_imm_inner(table_id1, epoch4, 0);
        let tracker = tracker_for_test(&imm1_2_4, memory_limiter).await;
        uploader.add_imm(instance_id1_2, (imm1_2_4.clone(), tracker));
        let imm2_4_1 = gen_imm_inner(table_id2, epoch4, 0);
        let tracker = tracker_for_test(&imm2_4_1, memory_limiter).await;
        uploader.add_imm(instance_id2, (imm2_4_1.clone(), tracker));

        // uploader state:
        //          table_id1:                                      table_id2:
        //          instance_id1_1:         instance_id1_2:         instance_id2
        //  epoch1  imm1_1_1                imm1_2_1           |    imm2_1      |
        //  epoch2  imms1_1_2(size 3)                          |                |
        //  epoch3                          imms_1_2_3(size 4) |                |
        //  epoch4  imm1_1_4                imm1_2_4                imm2_4_1    |

        // The notifiers below are registered in the spill order the test
        // expects; the matching `await_start*` calls after
        // `may_flush_for_test` check the tasks actually start in this order.
        let (await_start1_1, finish_tx1_1) = new_task_notifier(HashMap::from_iter([
            (instance_id1_1, vec![imm1_1_1.batch_id()]),
            (instance_id1_2, vec![imm1_2_1.batch_id()]),
        ]));
        let (await_start3, finish_tx3) = new_task_notifier(HashMap::from_iter([(
            instance_id1_2,
            imms1_2_3
                .iter()
                .rev()
                .map(|imm| imm.batch_id())
                .collect_vec(),
        )]));
        let (await_start2, finish_tx2) = new_task_notifier(HashMap::from_iter([(
            instance_id1_1,
            imms1_1_2
                .iter()
                .rev()
                .map(|imm| imm.batch_id())
                .collect_vec(),
        )]));
        let (await_start1_4, finish_tx1_4) = new_task_notifier(HashMap::from_iter([
            (instance_id1_1, vec![imm1_1_4.batch_id()]),
            (instance_id1_2, vec![imm1_2_4.batch_id()]),
        ]));
        let (await_start2_1, finish_tx2_1) = new_task_notifier(HashMap::from_iter([(
            instance_id2,
            vec![imm2_1.batch_id()],
        )]));
        let (await_start2_4_1, finish_tx2_4_1) = new_task_notifier(HashMap::from_iter([(
            instance_id2,
            vec![imm2_4_1.batch_id()],
        )]));

        uploader.may_flush_for_test();
        await_start1_1.await;
        await_start3.await;
        await_start2.await;
        await_start1_4.await;
        await_start2_1.await;
        await_start2_4_1.await;

        // No spill task has been finished yet, so nothing should be uploaded.
        assert_uploader_pending(&mut uploader).await;

        // One more imm on table 2 epoch4 that arrives after the flush above.
        let imm2_4_2 = gen_imm_inner(table_id2, epoch4, 1);
        let tracker = tracker_for_test(&imm2_4_2, memory_limiter).await;
        uploader.add_imm(instance_id2, (imm2_4_2.clone(), tracker));

        // Advance all instances past epoch4 so epoch4 data is sealed as well.
        uploader.local_seal_epoch(
            instance_id1_1,
            u64::MAX,
            SealCurrentEpochOptions::for_test(),
        );
        uploader.local_seal_epoch(
            instance_id1_2,
            u64::MAX,
            SealCurrentEpochOptions::for_test(),
        );
        uploader.local_seal_epoch(instance_id2, u64::MAX, SealCurrentEpochOptions::for_test());

        // uploader state:
        //          table_id1:                                      table_id2:
        //          instance_id1_1:         instance_id1_2:         instance_id2
        //  epoch1  spill(imm1_1_1, imm1_2_1, size 2)          |    spill(imm2_1, size 1)               |
        //  epoch2  spill(imms1_1_2, size 3)                   |                                        |
        //  epoch3                   spill(imms_1_2_3, size 4) |                                        |
        //  epoch4  spill(imm1_1_4, imm1_2_4, size 2)               spill(imm2_4_1, size 1), imm2_4_2   |

        // Sync epoch1..=epoch3 per table, each via its own oneshot channel.
        let (sync_tx1_1, sync_rx1_1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx1_1, HashSet::from_iter([table_id1]));
        let (sync_tx2_1, sync_rx2_1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch2, sync_tx2_1, HashSet::from_iter([table_id1]));
        let (sync_tx3_1, sync_rx3_1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch3, sync_tx3_1, HashSet::from_iter([table_id1]));
        let (sync_tx1_2, sync_rx1_2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx1_2, HashSet::from_iter([table_id2]));
        let (sync_tx2_2, sync_rx2_2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch2, sync_tx2_2, HashSet::from_iter([table_id2]));
        let (sync_tx3_2, sync_rx3_2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch3, sync_tx3_2, HashSet::from_iter([table_id2]));

        let (await_start2_4_2, finish_tx2_4_2) = new_task_notifier(HashMap::from_iter([(
            instance_id2,
            vec![imm2_4_2.batch_id()],
        )]));

        // Finish later tasks first: nothing can be collected until the
        // earliest task (epoch1 of table 1) finishes.
        finish_tx2_4_1.send(()).unwrap();
        finish_tx3.send(()).unwrap();
        finish_tx2.send(()).unwrap();
        finish_tx1_4.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;

        finish_tx1_1.send(()).unwrap();
        {
            let imm_ids = HashMap::from_iter([
                (instance_id1_1, vec![imm1_1_1.batch_id()]),
                (instance_id1_2, vec![imm1_2_1.batch_id()]),
            ]);
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids, sst.imm_ids());
            let synced_data = sync_rx1_1.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids, synced_data.uploaded_ssts[0].imm_ids());
        }
        {
            let imm_ids3 = HashMap::from_iter([(
                instance_id1_2,
                imms1_2_3
                    .iter()
                    .rev()
                    .map(|imm| imm.batch_id())
                    .collect_vec(),
            )]);
            let imm_ids2 = HashMap::from_iter([(
                instance_id1_1,
                imms1_1_2
                    .iter()
                    .rev()
                    .map(|imm| imm.batch_id())
                    .collect_vec(),
            )]);
            // epoch3's spill (size 4) was started before epoch2's (size 3),
            // so its sst is collected first.
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids3, sst.imm_ids());
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids2, sst.imm_ids());
            let synced_data = sync_rx2_1.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids2, synced_data.uploaded_ssts[0].imm_ids());
            let synced_data = sync_rx3_1.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids3, synced_data.uploaded_ssts[0].imm_ids());
        }
        {
            let imm_ids1_4 = HashMap::from_iter([
                (instance_id1_1, vec![imm1_1_4.batch_id()]),
                (instance_id1_2, vec![imm1_2_4.batch_id()]),
            ]);
            let imm_ids2_1 = HashMap::from_iter([(instance_id2, vec![imm2_1.batch_id()])]);
            let imm_ids2_4_1 = HashMap::from_iter([(instance_id2, vec![imm2_4_1.batch_id()])]);
            finish_tx2_1.send(()).unwrap();
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids1_4, sst.imm_ids());

            // trigger the sync after the spill task is finished and acked to cover the case
            let (sync_tx4, mut sync_rx4) = oneshot::channel();
            uploader.start_single_epoch_sync(
                epoch4,
                sync_tx4,
                HashSet::from_iter([table_id1, table_id2]),
            );
            await_start2_4_2.await;

            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids2_1, sst.imm_ids());
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids2_4_1, sst.imm_ids());
            let synced_data = sync_rx1_2.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids2_1, synced_data.uploaded_ssts[0].imm_ids());
            // Table 2 has no data in epoch2/epoch3, so those syncs are empty.
            let synced_data = sync_rx2_2.await.unwrap().unwrap();
            assert!(synced_data.uploaded_ssts.is_empty());
            let synced_data = sync_rx3_2.await.unwrap().unwrap();
            assert!(synced_data.uploaded_ssts.is_empty());

            let imm_ids2_4_2 = HashMap::from_iter([(instance_id2, vec![imm2_4_2.batch_id()])]);

            // epoch4's sync cannot complete before the last spill task finishes.
            assert!((&mut sync_rx4).now_or_never().is_none());
            finish_tx2_4_2.send(()).unwrap();
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids2_4_2, sst.imm_ids());
            let synced_data = sync_rx4.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 3);
            assert_eq!(&imm_ids2_4_2, synced_data.uploaded_ssts[0].imm_ids());
            assert_eq!(&imm_ids2_4_1, synced_data.uploaded_ssts[1].imm_ids());
            assert_eq!(&imm_ids1_4, synced_data.uploaded_ssts[2].imm_ids());
        }
    }
}