1use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockEpoch;
20
21use crate::hummock::event_handler::LocalInstanceId;
22use crate::hummock::event_handler::uploader::{
23 LocalInstanceUnsyncData, UnsyncData, UnsyncEpochId, UploadTaskInput,
24};
25
/// Spill candidates gathered for a single unsync epoch.
#[derive(Default)]
struct EpochSpillableDataInfo {
    /// Instances whose next spillable data was attributed to this unsync epoch.
    instance_ids: HashSet<LocalInstanceId>,
    /// Sum of the spillable sizes reported by those instances.
    payload_size: usize,
}
31
/// Chooses which unsync data to spill next, one unsync epoch at a time.
pub(super) struct Spiller<'a> {
    /// The unsync data that payloads are spilled from.
    unsync_data: &'a mut UnsyncData,
    /// Remaining spill candidates, keyed by unsync epoch id.
    epoch_info: HashMap<UnsyncEpochId, EpochSpillableDataInfo>,
    /// Lookup from `(epoch, table id)` to the unsync epoch id covering that pair.
    unsync_epoch_id_map: HashMap<(HummockEpoch, TableId), UnsyncEpochId>,
}
37
38impl<'a> Spiller<'a> {
39 pub(super) fn new(unsync_data: &'a mut UnsyncData) -> Self {
40 let unsync_epoch_id_map: HashMap<_, _> = unsync_data
41 .unsync_epochs
42 .iter()
43 .flat_map(|(unsync_epoch_id, table_ids)| {
44 let epoch = unsync_epoch_id.epoch();
45 let unsync_epoch_id = *unsync_epoch_id;
46 table_ids
47 .iter()
48 .map(move |table_id| ((epoch, *table_id), unsync_epoch_id))
49 })
50 .collect();
51 let mut epoch_info: HashMap<_, EpochSpillableDataInfo> = HashMap::new();
52 for instance_data in unsync_data
53 .table_data
54 .values()
55 .flat_map(|table_data| table_data.instance_data.values())
56 {
57 if let Some((epoch, spill_size)) = instance_data.spillable_data_info() {
58 let unsync_epoch_id = unsync_epoch_id_map
59 .get(&(epoch, instance_data.table_id))
60 .expect("should exist");
61 let epoch_info = epoch_info.entry(*unsync_epoch_id).or_default();
62 assert!(epoch_info.instance_ids.insert(instance_data.instance_id));
63 epoch_info.payload_size += spill_size;
64 }
65 }
66 Self {
67 unsync_data,
68 epoch_info,
69 unsync_epoch_id_map,
70 }
71 }
72
73 pub(super) fn next_spilled_payload(
74 &mut self,
75 ) -> Option<(HummockEpoch, UploadTaskInput, HashSet<TableId>)> {
76 if let Some(unsync_epoch_id) = self
77 .epoch_info
78 .iter()
79 .max_by(
80 |(UnsyncEpochId(_, table1), info1), (UnsyncEpochId(_, table2), info2)| {
81 info1.payload_size.cmp(&info2.payload_size).then_with(|| {
82 if !cfg!(test) {
83 Ordering::Equal
84 } else {
85 assert_ne!(table1, table2);
86 table2.cmp(table1)
89 }
90 })
91 },
92 )
93 .map(|(unsync_epoch_id, _)| *unsync_epoch_id)
94 {
95 let spill_epoch = unsync_epoch_id.epoch();
96 let spill_info = self
97 .epoch_info
98 .remove(&unsync_epoch_id)
99 .expect("should exist");
100 let epoch_info = &mut self.epoch_info;
101 let mut payload = HashMap::new();
102 let mut spilled_table_ids = HashSet::new();
103 for instance_id in spill_info.instance_ids {
104 let table_id = *self
105 .unsync_data
106 .instance_table_id
107 .get(&instance_id)
108 .expect("should exist");
109 let instance_data = self
110 .unsync_data
111 .table_data
112 .get_mut(&table_id)
113 .expect("should exist")
114 .instance_data
115 .get_mut(&instance_id)
116 .expect("should exist");
117 let instance_payload = instance_data.spill(spill_epoch);
118 assert!(!instance_payload.is_empty());
119 payload.insert(instance_id, instance_payload);
120 spilled_table_ids.insert(table_id);
121
122 if let Some((new_spill_epoch, size)) = instance_data.spillable_data_info() {
124 let new_unsync_epoch_id = self
125 .unsync_epoch_id_map
126 .get(&(new_spill_epoch, instance_data.table_id))
127 .expect("should exist");
128 let info = epoch_info.entry(*new_unsync_epoch_id).or_default();
129 assert!(info.instance_ids.insert(instance_id));
130 info.payload_size += size;
131 }
132 }
133 Some((spill_epoch, payload, spilled_table_ids))
134 } else {
135 None
136 }
137 }
138
139 pub(super) fn unsync_data(&mut self) -> &mut UnsyncData {
140 self.unsync_data
141 }
142}
143
144impl LocalInstanceUnsyncData {
145 fn spillable_data_info(&self) -> Option<(HummockEpoch, usize)> {
146 self.sealed_data
147 .back()
148 .or(self.current_epoch_data.as_ref())
149 .and_then(|epoch_data| {
150 if !epoch_data.is_empty() {
151 Some((
152 epoch_data.epoch,
153 epoch_data.imms.iter().map(|imm| imm.size()).sum(),
154 ))
155 } else {
156 None
157 }
158 })
159 }
160}
161
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use futures::FutureExt;
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common::util::epoch::EpochExt;
    use tokio::sync::oneshot;

    use crate::hummock::event_handler::uploader::test_utils::*;
    use crate::opts::StorageOpts;
    use crate::store::SealCurrentEpochOptions;

    /// Fills two tables (three instances) with imms across four epochs, triggers
    /// a flush, and checks the order in which upload tasks start, the order in
    /// which uploaded SSTs are reported, and the content of each sync result.
    #[tokio::test]
    async fn test_spill_in_order() {
        // NOTE(review): a flush ratio of 0.0 presumably makes any buffered data
        // eligible for flushing -- confirm against `StorageOpts`.
        let config = StorageOpts {
            shared_buffer_capacity_mb: 1024 * 1024,
            shared_buffer_flush_ratio: 0.0,
            ..Default::default()
        };
        let (buffer_tracker, mut uploader, new_task_notifier) =
            prepare_uploader_order_test(&config, false);
        let mut sst_collector = UploadedSstCollector::default();

        let table_id1 = TableId::new(1);
        let table_id2 = TableId::new(2);

        // Two instances on table 1, one instance on table 2.
        let instance_id1_1 = 1;
        let instance_id1_2 = 2;
        let instance_id2 = 3;

        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
        let memory_limiter = Some(memory_limiter.as_ref());

        // epoch1 is started with a separate call for each table.
        uploader.start_epoch(epoch1, HashSet::from_iter([table_id1]));
        uploader.start_epoch(epoch1, HashSet::from_iter([table_id2]));

        uploader.init_instance(instance_id1_1, table_id1, epoch1);
        uploader.init_instance(instance_id1_2, table_id1, epoch1);
        uploader.init_instance(instance_id2, table_id2, epoch1);

        // epoch1: one imm for every instance.
        let imm1_1_1 = gen_imm_inner(table_id1, epoch1, 0);
        let tracker = tracker_for_test(&imm1_1_1, memory_limiter).await;
        uploader.add_imm(instance_id1_1, (imm1_1_1.clone(), tracker));
        let imm1_2_1 = gen_imm_inner(table_id1, epoch1, 0);
        let tracker = tracker_for_test(&imm1_2_1, memory_limiter).await;
        uploader.add_imm(instance_id1_2, (imm1_2_1.clone(), tracker));
        let imm2_1 = gen_imm_inner(table_id2, epoch1, 0);
        let tracker = tracker_for_test(&imm2_1, memory_limiter).await;
        uploader.add_imm(instance_id2, (imm2_1.clone(), tracker));

        // epoch2 is also started with a separate call for each table.
        uploader.start_epoch(epoch2, HashSet::from_iter([table_id1]));
        uploader.start_epoch(epoch2, HashSet::from_iter([table_id2]));

        uploader.local_seal_epoch(instance_id1_1, epoch2, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id1_2, epoch2, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id2, epoch2, SealCurrentEpochOptions::for_test());

        // epoch2: three imms, all on instance_id1_1.
        let imms1_1_2 = [0, 1, 2]
            .into_iter()
            .map(|offset| gen_imm_inner(table_id1, epoch2, offset))
            .collect_vec();
        for imm in imms1_1_2.clone() {
            let tracker = tracker_for_test(&imm, memory_limiter).await;
            uploader.add_imm(instance_id1_1, (imm, tracker));
        }

        // epoch3 is also started with a separate call for each table.
        uploader.start_epoch(epoch3, HashSet::from_iter([table_id1]));
        uploader.start_epoch(epoch3, HashSet::from_iter([table_id2]));

        uploader.local_seal_epoch(instance_id1_1, epoch3, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id1_2, epoch3, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id2, epoch3, SealCurrentEpochOptions::for_test());

        // epoch3: four imms, all on instance_id1_2.
        let imms1_2_3 = [0, 1, 2, 3]
            .into_iter()
            .map(|offset| gen_imm_inner(table_id1, epoch3, offset))
            .collect_vec();
        for imm in imms1_2_3.clone() {
            let tracker = tracker_for_test(&imm, memory_limiter).await;
            uploader.add_imm(instance_id1_2, (imm, tracker));
        }

        // Unlike the earlier epochs, epoch4 is started for both tables in one call.
        uploader.start_epoch(epoch4, HashSet::from_iter([table_id1, table_id2]));

        uploader.local_seal_epoch(instance_id1_1, epoch4, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id1_2, epoch4, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id2, epoch4, SealCurrentEpochOptions::for_test());

        // epoch4: one imm for every instance.
        let imm1_1_4 = gen_imm_inner(table_id1, epoch4, 0);
        let tracker = tracker_for_test(&imm1_1_4, memory_limiter).await;
        uploader.add_imm(instance_id1_1, (imm1_1_4.clone(), tracker));
        let imm1_2_4 = gen_imm_inner(table_id1, epoch4, 0);
        let tracker = tracker_for_test(&imm1_2_4, memory_limiter).await;
        uploader.add_imm(instance_id1_2, (imm1_2_4.clone(), tracker));
        let imm2_4_1 = gen_imm_inner(table_id2, epoch4, 0);
        let tracker = tracker_for_test(&imm2_4_1, memory_limiter).await;
        uploader.add_imm(instance_id2, (imm2_4_1.clone(), tracker));

        // Data added so far:
        //           instance_id1_1    instance_id1_2    instance_id2
        //   epoch1  imm1_1_1          imm1_2_1          imm2_1
        //   epoch2  imms1_1_2 (3)
        //   epoch3                    imms1_2_3 (4)
        //   epoch4  imm1_1_4          imm1_2_4          imm2_4_1
        //
        // NOTE(review): the notifiers below are presumably matched against upload
        // tasks in registration order, i.e. they spell out the expected spill
        // order -- confirm against `prepare_uploader_order_test`.
        let (await_start1_1, finish_tx1_1) = new_task_notifier(HashMap::from_iter([
            (instance_id1_1, vec![imm1_1_1.batch_id()]),
            (instance_id1_2, vec![imm1_2_1.batch_id()]),
        ]));
        let (await_start3, finish_tx3) = new_task_notifier(HashMap::from_iter([(
            instance_id1_2,
            imms1_2_3
                .iter()
                .rev()
                .map(|imm| imm.batch_id())
                .collect_vec(),
        )]));
        let (await_start2, finish_tx2) = new_task_notifier(HashMap::from_iter([(
            instance_id1_1,
            imms1_1_2
                .iter()
                .rev()
                .map(|imm| imm.batch_id())
                .collect_vec(),
        )]));
        let (await_start1_4, finish_tx1_4) = new_task_notifier(HashMap::from_iter([
            (instance_id1_1, vec![imm1_1_4.batch_id()]),
            (instance_id1_2, vec![imm1_2_4.batch_id()]),
        ]));
        let (await_start2_1, finish_tx2_1) = new_task_notifier(HashMap::from_iter([(
            instance_id2,
            vec![imm2_1.batch_id()],
        )]));
        let (await_start2_4_1, finish_tx2_4_1) = new_task_notifier(HashMap::from_iter([(
            instance_id2,
            vec![imm2_4_1.batch_id()],
        )]));

        // Trigger the flush and wait until each of the six tasks has started.
        uploader.may_flush_for_test();
        await_start1_1.await;
        await_start3.await;
        await_start2.await;
        await_start1_4.await;
        await_start2_1.await;
        await_start2_4_1.await;

        // No task has been told to finish yet.
        assert_uploader_pending(&mut uploader).await;

        // A second epoch4 imm for instance_id2, added after the flush above.
        let imm2_4_2 = gen_imm_inner(table_id2, epoch4, 1);
        let tracker = tracker_for_test(&imm2_4_2, memory_limiter).await;
        uploader.add_imm(instance_id2, (imm2_4_2.clone(), tracker));

        // Seal epoch4 on every instance, passing `u64::MAX` as the next epoch.
        uploader.local_seal_epoch(
            instance_id1_1,
            u64::MAX,
            SealCurrentEpochOptions::for_test(),
        );
        uploader.local_seal_epoch(
            instance_id1_2,
            u64::MAX,
            SealCurrentEpochOptions::for_test(),
        );
        uploader.local_seal_epoch(instance_id2, u64::MAX, SealCurrentEpochOptions::for_test());

        // Start syncing epoch1..=epoch3, first for table 1 and then for table 2.
        let (sync_tx1_1, sync_rx1_1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx1_1, HashSet::from_iter([table_id1]));
        let (sync_tx2_1, sync_rx2_1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch2, sync_tx2_1, HashSet::from_iter([table_id1]));
        let (sync_tx3_1, sync_rx3_1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch3, sync_tx3_1, HashSet::from_iter([table_id1]));
        let (sync_tx1_2, sync_rx1_2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx1_2, HashSet::from_iter([table_id2]));
        let (sync_tx2_2, sync_rx2_2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch2, sync_tx2_2, HashSet::from_iter([table_id2]));
        let (sync_tx3_2, sync_rx3_2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch3, sync_tx3_2, HashSet::from_iter([table_id2]));

        // Notifier for the task expected to carry the late imm2_4_2.
        let (await_start2_4_2, finish_tx2_4_2) = new_task_notifier(HashMap::from_iter([(
            instance_id2,
            vec![imm2_4_2.batch_id()],
        )]));

        // Finish four tasks, but not the first one (1_1): the uploader must still
        // report pending.
        finish_tx2_4_1.send(()).unwrap();
        finish_tx3.send(()).unwrap();
        finish_tx2.send(()).unwrap();
        finish_tx1_4.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;

        // Finishing the first task yields the epoch1 SST of table 1, and the
        // epoch1 sync of table 1 resolves with exactly that SST.
        finish_tx1_1.send(()).unwrap();
        {
            let imm_ids = HashMap::from_iter([
                (instance_id1_1, vec![imm1_1_1.batch_id()]),
                (instance_id1_2, vec![imm1_2_1.batch_id()]),
            ]);
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids, sst.imm_ids());
            let synced_data = sync_rx1_1.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids, synced_data.uploaded_ssts[0].imm_ids());
        }
        // Next come the epoch3 SST and then the epoch2 SST; the epoch2 and epoch3
        // syncs of table 1 each resolve with their own single SST.
        {
            let imm_ids3 = HashMap::from_iter([(
                instance_id1_2,
                imms1_2_3
                    .iter()
                    .rev()
                    .map(|imm| imm.batch_id())
                    .collect_vec(),
            )]);
            let imm_ids2 = HashMap::from_iter([(
                instance_id1_1,
                imms1_1_2
                    .iter()
                    .rev()
                    .map(|imm| imm.batch_id())
                    .collect_vec(),
            )]);
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids3, sst.imm_ids());
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids2, sst.imm_ids());
            let synced_data = sync_rx2_1.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids2, synced_data.uploaded_ssts[0].imm_ids());
            let synced_data = sync_rx3_1.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids3, synced_data.uploaded_ssts[0].imm_ids());
        }
        {
            let imm_ids1_4 = HashMap::from_iter([
                (instance_id1_1, vec![imm1_1_4.batch_id()]),
                (instance_id1_2, vec![imm1_2_4.batch_id()]),
            ]);
            let imm_ids2_1 = HashMap::from_iter([(instance_id2, vec![imm2_1.batch_id()])]);
            let imm_ids2_4_1 = HashMap::from_iter([(instance_id2, vec![imm2_4_1.batch_id()])]);
            // Finish the epoch1 task of table 2; the next reported SST is the
            // epoch4 SST of table 1.
            finish_tx2_1.send(()).unwrap();
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids1_4, sst.imm_ids());

            // Sync epoch4 for both tables; this is where the task for imm2_4_2 is
            // awaited.
            let (sync_tx4, mut sync_rx4) = oneshot::channel();
            uploader.start_single_epoch_sync(
                epoch4,
                sync_tx4,
                HashSet::from_iter([table_id1, table_id2]),
            );
            await_start2_4_2.await;

            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids2_1, sst.imm_ids());
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids2_4_1, sst.imm_ids());
            let synced_data = sync_rx1_2.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids2_1, synced_data.uploaded_ssts[0].imm_ids());
            // Table 2 received no imm in epoch2 or epoch3, so those syncs carry
            // no SST.
            let synced_data = sync_rx2_2.await.unwrap().unwrap();
            assert!(synced_data.uploaded_ssts.is_empty());
            let synced_data = sync_rx3_2.await.unwrap().unwrap();
            assert!(synced_data.uploaded_ssts.is_empty());

            let imm_ids2_4_2 = HashMap::from_iter([(instance_id2, vec![imm2_4_2.batch_id()])]);

            // The epoch4 sync must not resolve before the imm2_4_2 task finishes.
            assert!((&mut sync_rx4).now_or_never().is_none());
            finish_tx2_4_2.send(()).unwrap();
            let sst = sst_collector.next(&mut uploader).await;
            assert_eq!(&imm_ids2_4_2, sst.imm_ids());
            // The epoch4 sync reports three SSTs, with the most recently uploaded
            // one at index 0.
            let synced_data = sync_rx4.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 3);
            assert_eq!(&imm_ids2_4_2, synced_data.uploaded_ssts[0].imm_ids());
            assert_eq!(&imm_ids2_4_1, synced_data.uploaded_ssts[1].imm_ids());
            assert_eq!(&imm_ids1_4, synced_data.uploaded_ssts[2].imm_ids());
        }
    }
}