1use std::collections::{HashMap, HashSet};
16
17use bytes::Bytes;
18use risingwave_common::catalog::TableOption;
19use risingwave_common::util::epoch::Epoch;
20use risingwave_hummock_sdk::compaction_group::StateTableId;
21use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
22use risingwave_hummock_sdk::level::{InputLevel, Levels};
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24
25use super::CompactionInput;
26use crate::hummock::level_handler::LevelHandler;
27
28const MIN_TTL_EXPIRE_INTERVAL_MS: u64 = 60 * 60 * 1000; #[derive(Default)]
31pub struct TtlPickerState {
32 pub last_select_end_bound: KeyRange,
38
39 pub end_bound_in_round: KeyRange,
41}
42
43impl TtlPickerState {
44 pub fn valid(&self) -> bool {
45 !self.end_bound_in_round.right.is_empty()
46 }
47
48 pub fn init(&mut self, key_range: KeyRange) {
49 self.last_select_end_bound = KeyRange {
50 left: Bytes::default(),
51 right: key_range.left.clone(),
52 right_exclusive: true,
53 };
54 self.end_bound_in_round = key_range;
55 }
56
57 pub fn clear(&mut self) {
58 self.end_bound_in_round = KeyRange::default();
59 self.last_select_end_bound = KeyRange::default();
60 }
61}
62
/// Picker that selects SSTs from the last level whose data may have outlived the TTL of one of
/// their tables.
pub struct TtlReclaimCompactionPicker {
    /// Retention in seconds keyed by state table id. Only tables that configure
    /// `retention_seconds` have an entry.
    table_id_to_ttl: HashMap<u32, u32>,
}
66
67impl TtlReclaimCompactionPicker {
68 pub fn new(table_id_to_options: &HashMap<StateTableId, TableOption>) -> Self {
69 let table_id_to_ttl: HashMap<u32, u32> = table_id_to_options
70 .iter()
71 .filter(|id_to_option| {
72 let table_option = id_to_option.1;
73 table_option.retention_seconds.is_some()
74 })
75 .map(|id_to_option| (*id_to_option.0, id_to_option.1.retention_seconds.unwrap()))
76 .collect();
77
78 Self { table_id_to_ttl }
79 }
80
81 fn filter(&self, sst: &SstableInfo, current_epoch_physical_time: u64) -> bool {
82 let table_id_in_sst = sst.table_ids.iter().cloned().collect::<HashSet<u32>>();
83 let expire_epoch =
84 Epoch::from_physical_time(current_epoch_physical_time - MIN_TTL_EXPIRE_INTERVAL_MS);
85
86 for table_id in table_id_in_sst {
87 match self.table_id_to_ttl.get(&table_id) {
88 Some(ttl_second_u32) => {
89 assert!(*ttl_second_u32 > 0);
90 let ttl_mill = *ttl_second_u32 as u64 * 1000;
92 let min_epoch = expire_epoch.subtract_ms(ttl_mill);
93 if Epoch(sst.min_epoch) < min_epoch {
94 return false;
95 }
96 }
97 None => continue,
98 }
99 }
100
101 true
102 }
103}
104
105impl TtlReclaimCompactionPicker {
106 pub fn pick_compaction(
107 &self,
108 levels: &Levels,
109 level_handlers: &[LevelHandler],
110 state: &mut TtlPickerState,
111 ) -> Option<CompactionInput> {
112 assert!(!levels.levels.is_empty());
113 let reclaimed_level = levels.levels.last().unwrap();
114 let mut select_input_ssts = vec![];
115 let level_handler = &level_handlers[reclaimed_level.level_idx as usize];
116
117 if reclaimed_level.table_infos.is_empty() {
118 state.clear();
120 return None;
121 }
122
123 if state.valid()
124 && state
125 .last_select_end_bound
126 .compare_right_with(&state.end_bound_in_round.right)
127 == std::cmp::Ordering::Greater
128 {
129 state.clear();
132 return None;
133 }
134
135 if !state.valid() {
136 let first_sst = reclaimed_level.table_infos.first().unwrap();
138 let last_sst = reclaimed_level.table_infos.last().unwrap();
139
140 let key_range_this_round = KeyRange {
141 left: first_sst.key_range.left.clone(),
142 right: last_sst.key_range.right.clone(),
143 right_exclusive: last_sst.key_range.right_exclusive,
144 };
145
146 state.init(key_range_this_round);
147 }
148
149 let current_epoch_physical_time = Epoch::now().physical_time();
150
151 for sst in &reclaimed_level.table_infos {
152 let unmatched_sst = sst.key_range.sstable_overlap(&state.last_select_end_bound);
153
154 if unmatched_sst
155 || level_handler.is_pending_compact(&sst.sst_id)
156 || self.filter(sst, current_epoch_physical_time)
157 {
158 continue;
159 }
160
161 select_input_ssts.push(sst.clone());
162 break;
163 }
164
165 if select_input_ssts.is_empty() {
167 state.clear();
168 return None;
169 }
170
171 let select_last_sst = select_input_ssts.last().unwrap();
172 state.last_select_end_bound.full_key_extend(&KeyRange {
173 left: Bytes::default(),
174 right: select_last_sst.key_range.right.clone(),
175 right_exclusive: select_last_sst.key_range.right_exclusive,
176 });
177
178 Some(CompactionInput {
179 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
180 total_file_count: select_input_ssts.len() as _,
181 input_levels: vec![
182 InputLevel {
183 level_idx: reclaimed_level.level_idx,
184 level_type: reclaimed_level.level_type,
185 table_infos: select_input_ssts,
186 },
187 InputLevel {
188 level_idx: reclaimed_level.level_idx,
189 level_type: reclaimed_level.level_type,
190 table_infos: vec![],
191 },
192 ],
193 target_level: reclaimed_level.level_idx as usize,
194 ..Default::default()
195 })
196 }
197}
198
#[cfg(test)]
mod test {
    use std::collections::BTreeSet;
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_hummock_sdk::level::Level;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    pub use risingwave_pb::hummock::LevelType;
    use risingwave_pb::hummock::compact_task;

    use super::*;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_table_with_ids_and_epochs,
    };
    use crate::hummock::compaction::selector::{CompactionSelector, TtlCompactionSelector};
    use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    /// Builds the options for tables `2..=10`: every table gets `default_ttl` seconds of
    /// retention, then each `(table_id, ttl_seconds)` pair in `overrides` replaces the entry of
    /// that table.
    fn ttl_options(default_ttl: u32, overrides: &[(u32, u32)]) -> HashMap<u32, TableOption> {
        let mut options: HashMap<u32, TableOption> = (2_u32..=10)
            .map(|table_id| {
                (
                    table_id,
                    TableOption {
                        retention_seconds: Some(default_ttl),
                    },
                )
            })
            .collect();

        for &(table_id, ttl_seconds) in overrides {
            options.insert(
                table_id,
                TableOption {
                    retention_seconds: Some(ttl_seconds),
                },
            );
        }

        options
    }

    /// Removes every pending task from every level handler so that all SSTs can be picked again.
    fn clear_pending_tasks(level_handlers: &mut [LevelHandler]) {
        for level_handler in level_handlers {
            for pending_task_id in &level_handler.pending_tasks_ids() {
                level_handler.remove_task(*pending_task_id);
            }
        }
    }

    #[test]
    fn test_ttl_reclaim_compaction_selector() {
        let config = CompactionConfigBuilder::new()
            .max_level(4)
            .max_space_reclaim_bytes(400)
            .build();
        let group_config = CompactionGroup::new(1, config);
        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
        assert_eq!(l0.sub_levels.len(), 0);

        // `min_epoch` shared by all SSTs in the last level: 1000 seconds older than the picker's
        // safety margin. It is therefore expired for a TTL of 5 seconds, but not for a TTL of
        // 7200 seconds.
        let current_epoch_time = Epoch::now().physical_time();
        let expired_epoch = Epoch::from_physical_time(
            current_epoch_time - MIN_TTL_EXPIRE_INTERVAL_MS - (1000 * 1000),
        )
        .0;

        // SSTs of the last level as `(sst_id, left, right, table_id, max_epoch)`.
        let mut last_level_ssts = [
            (2, 0, 100, 2, 0),
            (3, 101, 200, 3, 0),
            (4, 222, 300, 4, u64::MAX),
            (5, 333, 400, 5, u64::MAX),
            (6, 444, 500, 6, u64::MAX),
            (7, 555, 600, 7, u64::MAX),
            (8, 666, 700, 8, u64::MAX),
            (9, 777, 800, 9, u64::MAX),
            (10, 888, 1600, 10, u64::MAX),
            (11, 1600, 1800, 10, u64::MAX),
        ]
        .into_iter()
        .map(|(sst_id, left, right, table_id, max_epoch)| {
            generate_table_with_ids_and_epochs(
                sst_id,
                1,
                left,
                right,
                1,
                vec![table_id],
                expired_epoch,
                max_epoch,
            )
        })
        .collect_vec();

        {
            // SST 10 ends exactly where SST 11 starts, so its right bound is made exclusive.
            let sst_10 = last_level_ssts.get_mut(8).unwrap();
            assert_eq!(10, sst_10.sst_id);
            *sst_10 = SstableInfoInner {
                key_range: KeyRange {
                    right_exclusive: true,
                    ..sst_10.key_range.clone()
                },
                ..sst_10.get_inner()
            }
            .into();
        }

        let levels = vec![
            generate_level(1, vec![]),
            generate_level(2, vec![]),
            generate_level(
                3,
                vec![
                    generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0),
                    generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0),
                ],
            ),
            Level {
                level_idx: 4,
                level_type: LevelType::Nonoverlapping,
                table_infos: last_level_ssts,
                total_file_size: 0,
                sub_level_id: 0,
                uncompressed_file_size: 0,
                ..Default::default()
            },
        ];

        assert_eq!(levels.len(), 4);
        let levels = Levels {
            levels,
            l0,
            ..Default::default()
        };
        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
        let mut local_stats = LocalSelectorStatistic::default();
        let mut selector = TtlCompactionSelector::default();

        // The helpers below are function-local macros rather than functions so that they can use
        // the fixtures above directly instead of threading nine arguments through every call.

        // Runs the selector once with the given table options and yields its raw result.
        macro_rules! pick_task {
            ($table_id_to_options:expr) => {
                selector.pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &BTreeSet::new(),
                        &mut levels_handler,
                        &mut local_stats,
                        $table_id_to_options,
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
            };
        }

        // Runs the selector once and asserts that it yields a TTL task that rewrites exactly
        // the expected SSTs of level 4 in place.
        macro_rules! assert_next_pick {
            ($table_id_to_options:expr, $expected_sst_ids:expr) => {{
                let task = pick_task!($table_id_to_options).unwrap();
                assert_compaction_task(&task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);
                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::Ttl
                ));

                let picked_sst_ids = task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec();
                assert_eq!($expected_sst_ids, picked_sst_ids);
            }};
        }

        // Every table is expired: the first pick takes the first SST of the level.
        {
            let table_id_to_options = ttl_options(5, &[]);
            assert_next_pick!(&table_id_to_options, vec![2]);
        }

        // The selector keeps its state, so the following picks continue behind the previously
        // selected SST even though the pending tasks have been removed.
        {
            clear_pending_tasks(&mut levels_handler);

            let table_id_to_options = ttl_options(5, &[]);
            assert_next_pick!(&table_id_to_options, vec![3]);
            assert_next_pick!(&table_id_to_options, vec![4]);
        }

        // Only table 5 is expired: the SSTs in front of it are skipped.
        {
            clear_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();

            let table_id_to_options = ttl_options(7200, &[(5, 5)]);
            assert_next_pick!(&table_id_to_options, vec![5]);
        }

        // No table configures a TTL: nothing is picked.
        {
            clear_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();

            let task = pick_task!(&HashMap::default());
            assert!(task.is_none());
        }

        // Tables 5, 8 and 9 are not expired; the first three picks are unaffected by them.
        {
            clear_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();

            let table_id_to_options = ttl_options(5, &[(5, 7200), (8, 7200), (9, 7200)]);
            assert_next_pick!(&table_id_to_options, vec![2]);
            assert_next_pick!(&table_id_to_options, vec![3]);
            assert_next_pick!(&table_id_to_options, vec![4]);
        }

        // Shortening the TTL of table 5 in the middle of a round does not disturb the scan
        // order: the next pick is still the SST right behind the previous one.
        {
            clear_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();

            let mut table_id_to_options = ttl_options(5, &[(5, 7200), (8, 7200), (9, 7200)]);
            assert_next_pick!(&table_id_to_options, vec![2]);

            table_id_to_options.insert(
                5,
                TableOption {
                    retention_seconds: Some(5_u32),
                },
            );
            assert_next_pick!(&table_id_to_options, vec![3]);
        }
    }
}