use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::HummockEpoch;

use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::event_handler::uploader::{
    LocalInstanceUnsyncData, UnsyncData, UnsyncEpochId, UploadTaskInput,
};

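/// Spill candidates for one unsync epoch: the local instances that currently
/// hold spillable data for it, and the total payload size in bytes.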
#[derive(Default)]
struct EpochSpillableDataInfo {
    instance_ids: HashSet<LocalInstanceId>,
    payload_size: usize,
}

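/// Drains spillable data out of [`UnsyncData`], always taking the unsync epoch
/// with the largest pending payload first. A `Spiller` is built on demand when
/// the uploader decides to spill and is consumed via `next_spilled_payload`.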
pub(super) struct Spiller<'a> {
    unsync_data: &'a mut UnsyncData,
    epoch_info: HashMap<UnsyncEpochId, EpochSpillableDataInfo>,
    unsync_epoch_id_map: HashMap<(HummockEpoch, TableId), UnsyncEpochId>,
}

impl<'a> Spiller<'a> {
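    /// Builds two indexes over the current [`UnsyncData`]: a reverse map from
    /// `(epoch, table_id)` to its [`UnsyncEpochId`], and the per-epoch spill
    /// candidates with their accumulated payload sizes.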
    pub(super) fn new(unsync_data: &'a mut UnsyncData) -> Self {
        let unsync_epoch_id_map: HashMap<_, _> = unsync_data
            .unsync_epochs
            .iter()
            .flat_map(|(unsync_epoch_id, table_ids)| {
                let epoch = unsync_epoch_id.epoch();
                let unsync_epoch_id = *unsync_epoch_id;
                table_ids
                    .iter()
                    .map(move |table_id| ((epoch, *table_id), unsync_epoch_id))
            })
            .collect();
        let mut epoch_info: HashMap<_, EpochSpillableDataInfo> = HashMap::new();
        for instance_data in unsync_data
            .table_data
            .values()
            .flat_map(|table_data| table_data.instance_data.values())
        {
            if let Some((epoch, spill_size)) = instance_data.spillable_data_info() {
                let unsync_epoch_id = unsync_epoch_id_map
                    .get(&(epoch, instance_data.table_id))
                    .expect("should exist");
                let epoch_info = epoch_info.entry(*unsync_epoch_id).or_default();
                assert!(epoch_info.instance_ids.insert(instance_data.instance_id));
                epoch_info.payload_size += spill_size;
            }
        }
        Self {
            unsync_data,
            epoch_info,
            unsync_epoch_id_map,
        }
    }

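    /// Removes the unsync epoch with the largest spillable payload and drains
    /// that payload from every instance registered under it. Returns the
    /// spilled epoch, the per-instance payload, and the set of affected table
    /// ids, or `None` when there is nothing left to spill.
    ///
    /// A minimal usage sketch; the real call site lives in the uploader's
    /// spill path, and `spawn_upload_task` is a hypothetical stand-in:
    ///
    /// ```ignore
    /// let mut spiller = Spiller::new(&mut unsync_data);
    /// while let Some((epoch, payload, table_ids)) = spiller.next_spilled_payload() {
    ///     // Hand the drained imms off as an upload task for this epoch.
    ///     spawn_upload_task(epoch, payload, table_ids);
    /// }
    /// ```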
    pub(super) fn next_spilled_payload(
        &mut self,
    ) -> Option<(HummockEpoch, UploadTaskInput, HashSet<TableId>)> {
        if let Some(unsync_epoch_id) = self
            .epoch_info
            .iter()
            .max_by(
                |(UnsyncEpochId(_, table1), info1), (UnsyncEpochId(_, table2), info2)| {
                    info1.payload_size.cmp(&info2.payload_size).then_with(|| {
                        if !cfg!(test) {
                            Ordering::Equal
                        } else {
                            assert_ne!(table1, table2);
                            table2.cmp(table1)
                        }
                    })
                },
            )
            .map(|(unsync_epoch_id, _)| *unsync_epoch_id)
        {
            let spill_epoch = unsync_epoch_id.epoch();
            let spill_info = self
                .epoch_info
                .remove(&unsync_epoch_id)
                .expect("should exist");
            let epoch_info = &mut self.epoch_info;
            let mut payload = HashMap::new();
            let mut spilled_table_ids = HashSet::new();
            for instance_id in spill_info.instance_ids {
                let table_id = *self
                    .unsync_data
                    .instance_table_id
                    .get(&instance_id)
                    .expect("should exist");
                let instance_data = self
                    .unsync_data
                    .table_data
                    .get_mut(&table_id)
                    .expect("should exist")
                    .instance_data
                    .get_mut(&instance_id)
                    .expect("should exist");
                let instance_payload = instance_data.spill(spill_epoch);
                assert!(!instance_payload.is_empty());
                payload.insert(instance_id, instance_payload);
                spilled_table_ids.insert(table_id);

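                // The instance may still hold spillable data in a later epoch;
                // re-register it so a subsequent call can spill that epoch too.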
                if let Some((new_spill_epoch, size)) = instance_data.spillable_data_info() {
                    let new_unsync_epoch_id = self
                        .unsync_epoch_id_map
                        .get(&(new_spill_epoch, instance_data.table_id))
                        .expect("should exist");
                    let info = epoch_info.entry(*new_unsync_epoch_id).or_default();
                    assert!(info.instance_ids.insert(instance_id));
                    info.payload_size += size;
                }
            }
            Some((spill_epoch, payload, spilled_table_ids))
        } else {
            None
        }
    }

    pub(super) fn unsync_data(&mut self) -> &mut UnsyncData {
        self.unsync_data
    }
}

impl LocalInstanceUnsyncData {
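    /// Returns the oldest epoch of this instance that still holds unspilled
    /// imms, together with the total size of those imms. Only the oldest
    /// pending epoch is reported, so each instance is spilled in epoch order.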
    fn spillable_data_info(&self) -> Option<(HummockEpoch, usize)> {
        self.sealed_data
            .back()
            .or(self.current_epoch_data.as_ref())
            .and_then(|epoch_data| {
                if !epoch_data.is_empty() {
                    Some((
                        epoch_data.epoch,
                        epoch_data.imms.iter().map(|imm| imm.size()).sum(),
                    ))
                } else {
                    None
                }
            })
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
    use std::ops::Deref;

    use futures::FutureExt;
    use futures::future::join_all;
    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_common::util::epoch::EpochExt;
    use tokio::sync::oneshot;

    use crate::hummock::event_handler::uploader::test_utils::*;
    use crate::opts::StorageOpts;
    use crate::store::SealCurrentEpochOptions;

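    // A self-contained sketch (not exercising the uploader) of the selection
    // rule in `Spiller::next_spilled_payload`: the largest payload wins, and
    // under `cfg(test)` ties are broken by `table2.cmp(table1)`, which favors
    // the smaller table id. Plain `(table_id, payload_size)` tuples stand in
    // for `UnsyncEpochId` and `EpochSpillableDataInfo` here.
    #[test]
    fn spill_selection_order_sketch() {
        let candidates = [(1u32, 100usize), (2, 300), (3, 300)];
        let winner = candidates
            .iter()
            .max_by(|(table1, size1), (table2, size2)| {
                size1.cmp(size2).then_with(|| table2.cmp(table1))
            })
            .copied();
        // Tables 2 and 3 tie on payload size; the tie-break picks table 2.
        assert_eq!(winner, Some((2, 300)));
    }
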
    #[tokio::test]
    async fn test_spill_in_order() {
        let config = StorageOpts {
            shared_buffer_capacity_mb: 1024 * 1024,
            shared_buffer_flush_ratio: 0.0,
            ..Default::default()
        };
        let (buffer_tracker, mut uploader, new_task_notifier) =
            prepare_uploader_order_test(&config, false);

        let table_id1 = TableId::new(1);
        let table_id2 = TableId::new(2);

        let instance_id1_1 = 1;
        let instance_id1_2 = 2;
        let instance_id2 = 3;

        let epoch1 = INITIAL_EPOCH.next_epoch();
        let epoch2 = epoch1.next_epoch();
        let epoch3 = epoch2.next_epoch();
        let epoch4 = epoch3.next_epoch();
        let memory_limiter = buffer_tracker.get_memory_limiter().clone();
        let memory_limiter = Some(memory_limiter.deref());

        uploader.start_epoch(epoch1, HashSet::from_iter([table_id1]));
        uploader.start_epoch(epoch1, HashSet::from_iter([table_id2]));

        uploader.init_instance(instance_id1_1, table_id1, epoch1);
        uploader.init_instance(instance_id1_2, table_id1, epoch1);
        uploader.init_instance(instance_id2, table_id2, epoch1);

        let imm1_1_1 = gen_imm_inner(table_id1, epoch1, 0, memory_limiter).await;
        uploader.add_imm(instance_id1_1, imm1_1_1.clone());
        let imm1_2_1 = gen_imm_inner(table_id1, epoch1, 0, memory_limiter).await;
        uploader.add_imm(instance_id1_2, imm1_2_1.clone());
        let imm2_1 = gen_imm_inner(table_id2, epoch1, 0, memory_limiter).await;
        uploader.add_imm(instance_id2, imm2_1.clone());

        uploader.start_epoch(epoch2, HashSet::from_iter([table_id1]));
        uploader.start_epoch(epoch2, HashSet::from_iter([table_id2]));

        uploader.local_seal_epoch(instance_id1_1, epoch2, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id1_2, epoch2, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id2, epoch2, SealCurrentEpochOptions::for_test());

        let imms1_1_2 = join_all(
            [0, 1, 2].map(|offset| gen_imm_inner(table_id1, epoch2, offset, memory_limiter)),
        )
        .await;
        for imm in imms1_1_2.clone() {
            uploader.add_imm(instance_id1_1, imm);
        }

        uploader.start_epoch(epoch3, HashSet::from_iter([table_id1]));
        uploader.start_epoch(epoch3, HashSet::from_iter([table_id2]));

        uploader.local_seal_epoch(instance_id1_1, epoch3, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id1_2, epoch3, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id2, epoch3, SealCurrentEpochOptions::for_test());

        let imms1_2_3 = join_all(
            [0, 1, 2, 3].map(|offset| gen_imm_inner(table_id1, epoch3, offset, memory_limiter)),
        )
        .await;
        for imm in imms1_2_3.clone() {
            uploader.add_imm(instance_id1_2, imm);
        }

        uploader.start_epoch(epoch4, HashSet::from_iter([table_id1, table_id2]));

        uploader.local_seal_epoch(instance_id1_1, epoch4, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id1_2, epoch4, SealCurrentEpochOptions::for_test());
        uploader.local_seal_epoch(instance_id2, epoch4, SealCurrentEpochOptions::for_test());

        let imm1_1_4 = gen_imm_inner(table_id1, epoch4, 0, memory_limiter).await;
        uploader.add_imm(instance_id1_1, imm1_1_4.clone());
        let imm1_2_4 = gen_imm_inner(table_id1, epoch4, 0, memory_limiter).await;
        uploader.add_imm(instance_id1_2, imm1_2_4.clone());
        let imm2_4_1 = gen_imm_inner(table_id2, epoch4, 0, memory_limiter).await;
        uploader.add_imm(instance_id2, imm2_4_1.clone());

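        // Register the expected spill tasks in order. Only the oldest pending
        // epoch of each instance is spillable at a time, so the first spill
        // takes epoch1/table1 (2 imms, beating epoch1/table2's single imm);
        // each spill then exposes the instances' next epochs, and the largest
        // pending payload keeps winning: epoch3/table1 (4 imms), epoch2/table1
        // (3 imms), epoch4/table1 (2 imms), epoch1/table2, epoch4/table2.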
        let (await_start1_1, finish_tx1_1) = new_task_notifier(HashMap::from_iter([
            (instance_id1_1, vec![imm1_1_1.batch_id()]),
            (instance_id1_2, vec![imm1_2_1.batch_id()]),
        ]));
        let (await_start3, finish_tx3) = new_task_notifier(HashMap::from_iter([(
            instance_id1_2,
            imms1_2_3
                .iter()
                .rev()
                .map(|imm| imm.batch_id())
                .collect_vec(),
        )]));
        let (await_start2, finish_tx2) = new_task_notifier(HashMap::from_iter([(
            instance_id1_1,
            imms1_1_2
                .iter()
                .rev()
                .map(|imm| imm.batch_id())
                .collect_vec(),
        )]));
        let (await_start1_4, finish_tx1_4) = new_task_notifier(HashMap::from_iter([
            (instance_id1_1, vec![imm1_1_4.batch_id()]),
            (instance_id1_2, vec![imm1_2_4.batch_id()]),
        ]));
        let (await_start2_1, finish_tx2_1) = new_task_notifier(HashMap::from_iter([(
            instance_id2,
            vec![imm2_1.batch_id()],
        )]));
        let (await_start2_4_1, finish_tx2_4_1) = new_task_notifier(HashMap::from_iter([(
            instance_id2,
            vec![imm2_4_1.batch_id()],
        )]));

        uploader.may_flush();
        await_start1_1.await;
        await_start3.await;
        await_start2.await;
        await_start1_4.await;
        await_start2_1.await;
        await_start2_4_1.await;

        assert_uploader_pending(&mut uploader).await;

        let imm2_4_2 = gen_imm_inner(table_id2, epoch4, 1, memory_limiter).await;
        uploader.add_imm(instance_id2, imm2_4_2.clone());

        uploader.local_seal_epoch(
            instance_id1_1,
            u64::MAX,
            SealCurrentEpochOptions::for_test(),
        );
        uploader.local_seal_epoch(
            instance_id1_2,
            u64::MAX,
            SealCurrentEpochOptions::for_test(),
        );
        uploader.local_seal_epoch(instance_id2, u64::MAX, SealCurrentEpochOptions::for_test());

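        // Sync epoch1..=epoch3 separately per table; the spilled SSTs above
        // should be attributed to the matching epoch/table syncs asserted below.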
        let (sync_tx1_1, sync_rx1_1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx1_1, HashSet::from_iter([table_id1]));
        let (sync_tx2_1, sync_rx2_1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch2, sync_tx2_1, HashSet::from_iter([table_id1]));
        let (sync_tx3_1, sync_rx3_1) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch3, sync_tx3_1, HashSet::from_iter([table_id1]));
        let (sync_tx1_2, sync_rx1_2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch1, sync_tx1_2, HashSet::from_iter([table_id2]));
        let (sync_tx2_2, sync_rx2_2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch2, sync_tx2_2, HashSet::from_iter([table_id2]));
        let (sync_tx3_2, sync_rx3_2) = oneshot::channel();
        uploader.start_single_epoch_sync(epoch3, sync_tx3_2, HashSet::from_iter([table_id2]));

        let (await_start2_4_2, finish_tx2_4_2) = new_task_notifier(HashMap::from_iter([(
            instance_id2,
            vec![imm2_4_2.batch_id()],
        )]));

        finish_tx2_4_1.send(()).unwrap();
        finish_tx3.send(()).unwrap();
        finish_tx2.send(()).unwrap();
        finish_tx1_4.send(()).unwrap();
        assert_uploader_pending(&mut uploader).await;

        finish_tx1_1.send(()).unwrap();
        {
            let imm_ids = HashMap::from_iter([
                (instance_id1_1, vec![imm1_1_1.batch_id()]),
                (instance_id1_2, vec![imm1_2_1.batch_id()]),
            ]);
            let sst = uploader.next_uploaded_sst().await;
            assert_eq!(&imm_ids, sst.imm_ids());
            let synced_data = sync_rx1_1.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids, synced_data.uploaded_ssts[0].imm_ids());
        }
        {
            let imm_ids3 = HashMap::from_iter([(
                instance_id1_2,
                imms1_2_3
                    .iter()
                    .rev()
                    .map(|imm| imm.batch_id())
                    .collect_vec(),
            )]);
            let imm_ids2 = HashMap::from_iter([(
                instance_id1_1,
                imms1_1_2
                    .iter()
                    .rev()
                    .map(|imm| imm.batch_id())
                    .collect_vec(),
            )]);
            let sst = uploader.next_uploaded_sst().await;
            assert_eq!(&imm_ids3, sst.imm_ids());
            let sst = uploader.next_uploaded_sst().await;
            assert_eq!(&imm_ids2, sst.imm_ids());
            let synced_data = sync_rx2_1.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids2, synced_data.uploaded_ssts[0].imm_ids());
            let synced_data = sync_rx3_1.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids3, synced_data.uploaded_ssts[0].imm_ids());
        }
        {
            let imm_ids1_4 = HashMap::from_iter([
                (instance_id1_1, vec![imm1_1_4.batch_id()]),
                (instance_id1_2, vec![imm1_2_4.batch_id()]),
            ]);
            let imm_ids2_1 = HashMap::from_iter([(instance_id2, vec![imm2_1.batch_id()])]);
            let imm_ids2_4_1 = HashMap::from_iter([(instance_id2, vec![imm2_4_1.batch_id()])]);
            finish_tx2_1.send(()).unwrap();
            let sst = uploader.next_uploaded_sst().await;
            assert_eq!(&imm_ids1_4, sst.imm_ids());

            let (sync_tx4, mut sync_rx4) = oneshot::channel();
            uploader.start_single_epoch_sync(
                epoch4,
                sync_tx4,
                HashSet::from_iter([table_id1, table_id2]),
            );
            await_start2_4_2.await;

            let sst = uploader.next_uploaded_sst().await;
            assert_eq!(&imm_ids2_1, sst.imm_ids());
            let sst = uploader.next_uploaded_sst().await;
            assert_eq!(&imm_ids2_4_1, sst.imm_ids());
            let synced_data = sync_rx1_2.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 1);
            assert_eq!(&imm_ids2_1, synced_data.uploaded_ssts[0].imm_ids());
            let synced_data = sync_rx2_2.await.unwrap().unwrap();
            assert!(synced_data.uploaded_ssts.is_empty());
            let synced_data = sync_rx3_2.await.unwrap().unwrap();
            assert!(synced_data.uploaded_ssts.is_empty());

            let imm_ids2_4_2 = HashMap::from_iter([(instance_id2, vec![imm2_4_2.batch_id()])]);

            assert!((&mut sync_rx4).now_or_never().is_none());
            finish_tx2_4_2.send(()).unwrap();
            let sst = uploader.next_uploaded_sst().await;
            assert_eq!(&imm_ids2_4_2, sst.imm_ids());
            let synced_data = sync_rx4.await.unwrap().unwrap();
            assert_eq!(synced_data.uploaded_ssts.len(), 3);
            assert_eq!(&imm_ids2_4_2, synced_data.uploaded_ssts[0].imm_ids());
            assert_eq!(&imm_ids2_4_1, synced_data.uploaded_ssts[1].imm_ids());
            assert_eq!(&imm_ids1_4, synced_data.uploaded_ssts[2].imm_ids());
        }
    }
}