1use std::collections::{HashMap, HashSet};
16
17use bytes::Bytes;
18use risingwave_common::catalog::{TableId, TableOption};
19use risingwave_common::util::epoch::Epoch;
20use risingwave_hummock_sdk::compaction_group::StateTableId;
21use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
22use risingwave_hummock_sdk::level::{InputLevel, Levels};
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24
25use super::CompactionInput;
26use crate::hummock::level_handler::LevelHandler;
27
28const MIN_TTL_EXPIRE_INTERVAL_MS: u64 = 60 * 60 * 1000; #[derive(Default)]
31pub struct TtlPickerState {
32 pub last_select_end_bound: KeyRange,
38
39 pub end_bound_in_round: KeyRange,
41}
42
43impl TtlPickerState {
44 pub fn valid(&self) -> bool {
45 !self.end_bound_in_round.right.is_empty()
46 }
47
48 pub fn init(&mut self, key_range: KeyRange) {
49 self.last_select_end_bound = KeyRange {
50 left: Bytes::default(),
51 right: key_range.left.clone(),
52 right_exclusive: true,
53 };
54 self.end_bound_in_round = key_range;
55 }
56
57 pub fn clear(&mut self) {
58 self.end_bound_in_round = KeyRange::default();
59 self.last_select_end_bound = KeyRange::default();
60 }
61}
62
63pub struct TtlReclaimCompactionPicker {
64 table_id_to_ttl: HashMap<TableId, u32>,
65}
66
67impl TtlReclaimCompactionPicker {
68 pub fn new(table_id_to_options: &HashMap<StateTableId, TableOption>) -> Self {
69 let table_id_to_ttl: HashMap<TableId, u32> = table_id_to_options
70 .iter()
71 .filter(|id_to_option| {
72 let table_option = id_to_option.1;
73 table_option.retention_seconds.is_some()
74 })
75 .map(|id_to_option| (*id_to_option.0, id_to_option.1.retention_seconds.unwrap()))
76 .collect();
77
78 Self { table_id_to_ttl }
79 }
80
81 fn filter(&self, sst: &SstableInfo, current_epoch_physical_time: u64) -> bool {
82 let table_id_in_sst = sst.table_ids.iter().cloned().collect::<HashSet<_>>();
83 let expire_epoch =
84 Epoch::from_physical_time(current_epoch_physical_time - MIN_TTL_EXPIRE_INTERVAL_MS);
85
86 for table_id in table_id_in_sst {
87 match self.table_id_to_ttl.get(&table_id) {
88 Some(ttl_second_u32) => {
89 assert!(*ttl_second_u32 > 0);
90 let ttl_mill = *ttl_second_u32 as u64 * 1000;
92 let min_epoch = expire_epoch.subtract_ms(ttl_mill);
93 if Epoch(sst.min_epoch) < min_epoch {
94 return false;
95 }
96 }
97 None => continue,
98 }
99 }
100
101 true
102 }
103}
104
105impl TtlReclaimCompactionPicker {
106 pub fn pick_compaction(
107 &self,
108 levels: &Levels,
109 level_handlers: &[LevelHandler],
110 state: &mut TtlPickerState,
111 ) -> Option<CompactionInput> {
112 assert!(!levels.levels.is_empty());
113 let reclaimed_level = levels.levels.last().unwrap();
114 let mut select_input_ssts = vec![];
115 let level_handler = &level_handlers[reclaimed_level.level_idx as usize];
116
117 if reclaimed_level.table_infos.is_empty() {
118 state.clear();
120 return None;
121 }
122
123 if state.valid()
124 && state
125 .last_select_end_bound
126 .compare_right_with(&state.end_bound_in_round.right)
127 == std::cmp::Ordering::Greater
128 {
129 state.clear();
132 return None;
133 }
134
135 if !state.valid() {
136 let first_sst = reclaimed_level.table_infos.first().unwrap();
138 let last_sst = reclaimed_level.table_infos.last().unwrap();
139
140 let key_range_this_round = KeyRange {
141 left: first_sst.key_range.left.clone(),
142 right: last_sst.key_range.right.clone(),
143 right_exclusive: last_sst.key_range.right_exclusive,
144 };
145
146 state.init(key_range_this_round);
147 }
148
149 let current_epoch_physical_time = Epoch::now().physical_time();
150
151 for sst in &reclaimed_level.table_infos {
152 let unmatched_sst = sst.key_range.sstable_overlap(&state.last_select_end_bound);
153
154 if unmatched_sst
155 || level_handler.is_pending_compact(&sst.sst_id)
156 || self.filter(sst, current_epoch_physical_time)
157 {
158 continue;
159 }
160
161 select_input_ssts.push(sst.clone());
162 break;
163 }
164
165 if select_input_ssts.is_empty() {
167 state.clear();
168 return None;
169 }
170
171 let select_last_sst = select_input_ssts.last().unwrap();
172 state.last_select_end_bound.full_key_extend(&KeyRange {
173 left: Bytes::default(),
174 right: select_last_sst.key_range.right.clone(),
175 right_exclusive: select_last_sst.key_range.right_exclusive,
176 });
177
178 Some(CompactionInput {
179 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
180 total_file_count: select_input_ssts.len() as _,
181 input_levels: vec![
182 InputLevel {
183 level_idx: reclaimed_level.level_idx,
184 level_type: reclaimed_level.level_type,
185 table_infos: select_input_ssts,
186 },
187 InputLevel {
188 level_idx: reclaimed_level.level_idx,
189 level_type: reclaimed_level.level_type,
190 table_infos: vec![],
191 },
192 ],
193 target_level: reclaimed_level.level_idx as usize,
194 ..Default::default()
195 })
196 }
197}
198
#[cfg(test)]
mod test {
    use std::collections::BTreeSet;
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::level::Level;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    pub use risingwave_pb::hummock::LevelType;
    use risingwave_pb::hummock::compact_task;

    use super::*;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_table_with_ids_and_epochs,
    };
    use crate::hummock::compaction::selector::{CompactionSelector, TtlCompactionSelector};
    use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    /// Builds TTL options for tables `2..=10`, all with `default_retention_seconds`, then
    /// applies `overrides` as `(table_id, retention_seconds)` pairs.
    fn ttl_options(
        default_retention_seconds: u32,
        overrides: &[(u32, u32)],
    ) -> HashMap<TableId, TableOption> {
        let mut options: HashMap<TableId, TableOption> = (2..=10_u32)
            .map(|table_id| {
                (
                    table_id.into(),
                    TableOption {
                        retention_seconds: Some(default_retention_seconds),
                    },
                )
            })
            .collect();
        for &(table_id, retention_seconds) in overrides {
            options.insert(
                table_id.into(),
                TableOption {
                    retention_seconds: Some(retention_seconds),
                },
            );
        }
        options
    }

    /// Removes every pending task from every level handler so their SSTs can be picked again.
    fn clear_pending_tasks(level_handlers: &mut [LevelHandler]) {
        for level_handler in level_handlers {
            for pending_task_id in &level_handler.pending_tasks_ids() {
                level_handler.remove_task(*pending_task_id);
            }
        }
    }

    #[test]
    fn test_ttl_reclaim_compaction_selector() {
        let config = CompactionConfigBuilder::new()
            .max_level(4)
            .max_space_reclaim_bytes(400)
            .build();
        let group_config = CompactionGroup::new(1, config);
        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
        assert_eq!(l0.sub_levels.len(), 0);

        // Older than "now - grace interval - 1000 s": expired for a 5 s TTL, but not for a
        // 7200 s TTL.
        let now_physical_time = Epoch::now().physical_time();
        let expired_epoch = Epoch::from_physical_time(
            now_physical_time - MIN_TTL_EXPIRE_INTERVAL_MS - (1000 * 1000),
        )
        .0;

        // Arguments that vary between the bottom-level SSTs:
        // (sst id, table id, key range start, key range end, last epoch argument).
        // SST 11 deliberately reuses table 10.
        let bottom_level_ssts = [
            (2, 2, 0, 100, 0),
            (3, 3, 101, 200, 0),
            (4, 4, 222, 300, u64::MAX),
            (5, 5, 333, 400, u64::MAX),
            (6, 6, 444, 500, u64::MAX),
            (7, 7, 555, 600, u64::MAX),
            (8, 8, 666, 700, u64::MAX),
            (9, 9, 777, 800, u64::MAX),
            (10, 10, 888, 1600, u64::MAX),
            (11, 10, 1600, 1800, u64::MAX),
        ];

        let mut levels = vec![
            generate_level(1, vec![]),
            generate_level(2, vec![]),
            generate_level(
                3,
                vec![
                    generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0),
                    generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0),
                ],
            ),
            Level {
                level_idx: 4,
                level_type: LevelType::Nonoverlapping,
                table_infos: bottom_level_ssts
                    .into_iter()
                    .map(|(sst_id, table_id, start, end, last_epoch)| {
                        generate_table_with_ids_and_epochs(
                            sst_id,
                            1,
                            start,
                            end,
                            1,
                            vec![table_id],
                            expired_epoch,
                            last_epoch,
                        )
                    })
                    .collect_vec(),
                total_file_size: 0,
                sub_level_id: 0,
                uncompressed_file_size: 0,
                ..Default::default()
            },
        ];

        // Make SST 10's right bound exclusive, presumably so that SST 11 (which starts at the
        // same key) does not overlap it.
        {
            let tenth = levels[3].table_infos.get_mut(8).unwrap();
            assert_eq!(10, tenth.sst_id);
            *tenth = SstableInfoInner {
                key_range: KeyRange {
                    right_exclusive: true,
                    ..tenth.key_range.clone()
                },
                ..tenth.get_inner()
            }
            .into();
        }

        assert_eq!(levels.len(), 4);
        let levels = Levels {
            levels,
            l0,
            ..Default::default()
        };
        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
        let mut local_stats = LocalSelectorStatistic::default();
        let mut selector = TtlCompactionSelector::default();

        // One selector pick against the shared fixtures with the given table options.
        macro_rules! pick {
            ($table_id_to_options:expr) => {
                selector.pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &BTreeSet::new(),
                        &mut levels_handler,
                        &mut local_stats,
                        $table_id_to_options,
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
            };
        }

        // Checks the shape every TTL task in this test shares (two L4 input levels with the
        // second one empty, target level 4, task type `Ttl`) and yields the picked SST ids.
        macro_rules! picked_sst_ids {
            ($task:expr) => {{
                let checked = &$task;
                assert_compaction_task(checked, &levels_handler);
                assert_eq!(checked.input.input_levels.len(), 2);
                assert_eq!(checked.input.input_levels[0].level_idx, 4);
                assert_eq!(checked.input.input_levels[1].level_idx, 4);
                assert_eq!(checked.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(checked.input.target_level, 4);
                assert!(matches!(
                    checked.compaction_task_type,
                    compact_task::TaskType::Ttl
                ));
                checked.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec()
            }};
        }

        // Every table has a 5 s TTL: the scan starts with SST 2.
        {
            let table_id_to_options = ttl_options(5, &[]);
            let task = pick!(&table_id_to_options).unwrap();
            assert_eq!(vec![2], picked_sst_ids!(task));
        }

        // Same selector after releasing the pending task: the scan resumes after SST 2 and
        // advances one SST per pick.
        {
            clear_pending_tasks(&mut levels_handler);
            let table_id_to_options = ttl_options(5, &[]);
            for expected_sst_id in [3, 4] {
                let task = pick!(&table_id_to_options).unwrap();
                assert_eq!(vec![expected_sst_id], picked_sst_ids!(task));
            }
        }

        // Fresh selector where only table 5 has a short TTL: only SST 5 qualifies.
        {
            clear_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();
            let table_id_to_options = ttl_options(7200, &[(5, 5)]);
            let task = pick!(&table_id_to_options).unwrap();
            assert_eq!(vec![5], picked_sst_ids!(task));
        }

        // No TTL options at all: nothing to pick.
        {
            clear_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();
            let task = pick!(&HashMap::default());
            assert!(task.is_none());
        }

        // Tables 5, 8 and 9 have a long TTL; the first three picks are SSTs 2, 3 and 4.
        {
            clear_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();
            let table_id_to_options = ttl_options(5, &[(5, 7200), (8, 7200), (9, 7200)]);
            for expected in [vec![2], vec![3], vec![4]] {
                let task = pick!(&table_id_to_options).unwrap();
                let picked = picked_sst_ids!(task);
                assert!(picked.is_sorted());
                assert_eq!(expected, picked);
            }
        }

        // Same options, but table 5's TTL is shortened before the last pick; the scan still
        // resumes at SST 3.
        {
            clear_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();
            let mut table_id_to_options = ttl_options(5, &[(5, 7200), (8, 7200), (9, 7200)]);
            let expected_picks = [vec![2], vec![3]];
            for (index, expected) in expected_picks.iter().enumerate() {
                if index == expected_picks.len() - 1 {
                    table_id_to_options.insert(
                        5.into(),
                        TableOption {
                            retention_seconds: Some(5_u32),
                        },
                    );
                }

                let task = pick!(&table_id_to_options).unwrap();
                let picked = picked_sst_ids!(task);
                assert!(picked.is_sorted());
                assert_eq!(*expected, picked);
            }
        }
    }
}