1use std::collections::{HashMap, HashSet};
16
17use bytes::Bytes;
18use risingwave_common::catalog::{TableId, TableOption};
19use risingwave_common::util::epoch::Epoch;
20use risingwave_hummock_sdk::compaction_group::StateTableId;
21use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
22use risingwave_hummock_sdk::level::{InputLevel, Levels};
23use risingwave_hummock_sdk::sstable_info::SstableInfo;
24
25use super::CompactionInput;
26use crate::hummock::level_handler::LevelHandler;
27
28const MIN_TTL_EXPIRE_INTERVAL_MS: u64 = 60 * 60 * 1000; #[derive(Default)]
31pub struct TtlPickerState {
32 pub last_select_end_bound: KeyRange,
38
39 pub end_bound_in_round: KeyRange,
41}
42
43impl TtlPickerState {
44 pub fn valid(&self) -> bool {
45 !self.end_bound_in_round.right.is_empty()
46 }
47
48 pub fn init(&mut self, key_range: KeyRange) {
49 self.last_select_end_bound = KeyRange {
50 left: Bytes::default(),
51 right: key_range.left.clone(),
52 right_exclusive: true,
53 };
54 self.end_bound_in_round = key_range;
55 }
56
57 pub fn clear(&mut self) {
58 self.end_bound_in_round = KeyRange::default();
59 self.last_select_end_bound = KeyRange::default();
60 }
61}
62
63pub struct TtlReclaimCompactionPicker {
64 table_id_to_ttl: HashMap<TableId, u32>,
65}
66
67impl TtlReclaimCompactionPicker {
68 pub fn new(table_id_to_options: &HashMap<StateTableId, TableOption>) -> Self {
69 let table_id_to_ttl: HashMap<TableId, u32> = table_id_to_options
70 .iter()
71 .filter(|id_to_option| {
72 let table_option = id_to_option.1;
73 table_option.retention_seconds.is_some()
74 })
75 .map(|id_to_option| (*id_to_option.0, id_to_option.1.retention_seconds.unwrap()))
76 .collect();
77
78 Self { table_id_to_ttl }
79 }
80
81 fn filter(&self, sst: &SstableInfo, current_epoch_physical_time: u64) -> bool {
82 let table_id_in_sst = sst.table_ids.iter().cloned().collect::<HashSet<_>>();
83 let expire_epoch =
84 Epoch::from_physical_time(current_epoch_physical_time - MIN_TTL_EXPIRE_INTERVAL_MS);
85
86 for table_id in table_id_in_sst {
87 match self.table_id_to_ttl.get(&table_id) {
88 Some(ttl_second_u32) => {
89 assert!(*ttl_second_u32 > 0);
90 let ttl_mill = *ttl_second_u32 as u64 * 1000;
92 let min_epoch = expire_epoch.subtract_ms(ttl_mill);
93 if Epoch(sst.min_epoch) < min_epoch {
94 return false;
95 }
96 }
97 None => continue,
98 }
99 }
100
101 true
102 }
103}
104
105impl TtlReclaimCompactionPicker {
106 pub fn pick_compaction(
107 &self,
108 levels: &Levels,
109 level_handlers: &[LevelHandler],
110 state: &mut TtlPickerState,
111 ) -> Option<CompactionInput> {
112 assert!(!levels.levels.is_empty());
113 let reclaimed_level = levels.levels.last().unwrap();
114 let mut select_input_ssts = vec![];
115 let level_handler = &level_handlers[reclaimed_level.level_idx as usize];
116
117 if reclaimed_level.table_infos.is_empty() {
118 state.clear();
120 return None;
121 }
122
123 if state.valid()
124 && state
125 .last_select_end_bound
126 .compare_right_with(&state.end_bound_in_round.right)
127 == std::cmp::Ordering::Greater
128 {
129 state.clear();
132 return None;
133 }
134
135 if !state.valid() {
136 let first_sst = reclaimed_level.table_infos.first().unwrap();
138 let last_sst = reclaimed_level.table_infos.last().unwrap();
139
140 let key_range_this_round = KeyRange {
141 left: first_sst.key_range.left.clone(),
142 right: last_sst.key_range.right.clone(),
143 right_exclusive: last_sst.key_range.right_exclusive,
144 };
145
146 state.init(key_range_this_round);
147 }
148
149 let current_epoch_physical_time = Epoch::now().physical_time();
150
151 for sst in &reclaimed_level.table_infos {
152 let unmatched_sst = sst.key_range.sstable_overlap(&state.last_select_end_bound);
153
154 if unmatched_sst
155 || level_handler.is_pending_compact(&sst.sst_id)
156 || self.filter(sst, current_epoch_physical_time)
157 {
158 continue;
159 }
160
161 select_input_ssts.push(sst.clone());
162 break;
163 }
164
165 if select_input_ssts.is_empty() {
167 state.clear();
168 return None;
169 }
170
171 let select_last_sst = select_input_ssts.last().unwrap();
172 state.last_select_end_bound.full_key_extend(&KeyRange {
173 left: Bytes::default(),
174 right: select_last_sst.key_range.right.clone(),
175 right_exclusive: select_last_sst.key_range.right_exclusive,
176 });
177
178 Some(CompactionInput {
179 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
180 total_file_count: select_input_ssts.len() as _,
181 input_levels: vec![
182 InputLevel {
183 level_idx: reclaimed_level.level_idx,
184 level_type: reclaimed_level.level_type,
185 table_infos: select_input_ssts,
186 },
187 InputLevel {
188 level_idx: reclaimed_level.level_idx,
189 level_type: reclaimed_level.level_type,
190 table_infos: vec![],
191 },
192 ],
193 target_level: reclaimed_level.level_idx as usize,
194 ..Default::default()
195 })
196 }
197}
198
#[cfg(test)]
mod test {
    use std::collections::BTreeSet;
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::level::Level;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
    pub use risingwave_pb::hummock::LevelType;
    use risingwave_pb::hummock::compact_task;

    use super::*;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::compaction::selector::tests::{
        assert_compaction_task, generate_l0_nonoverlapping_sublevels, generate_level,
        generate_table_with_ids_and_epochs,
    };
    use crate::hummock::compaction::selector::{CompactionSelector, TtlCompactionSelector};
    use crate::hummock::compaction::{CompactionDeveloperConfig, LocalSelectorStatistic};
    use crate::hummock::model::CompactionGroup;
    use crate::hummock::test_utils::compaction_selector_context;

    /// Table options giving every table id in `2..=10` the same `retention_seconds`.
    fn ttl_options_for_tables(retention_seconds: u32) -> HashMap<TableId, TableOption> {
        (2..=10)
            .map(|table_id| {
                (
                    table_id.into(),
                    TableOption {
                        retention_seconds: Some(retention_seconds),
                    },
                )
            })
            .collect()
    }

    /// Removes every pending task from every handler so previously picked SSTs become
    /// selectable again.
    fn release_pending_tasks(level_handlers: &mut [LevelHandler]) {
        for handler in level_handlers {
            for task_id in &handler.pending_tasks_ids() {
                handler.remove_task(*task_id);
            }
        }
    }

    #[test]
    fn test_ttl_reclaim_compaction_selector() {
        let config = CompactionConfigBuilder::new()
            .max_level(4)
            .max_space_reclaim_bytes(400)
            .build();
        let group_config = CompactionGroup::new(1, config);
        let l0 = generate_l0_nonoverlapping_sublevels(vec![]);
        assert_eq!(l0.sub_levels.len(), 0);

        // 1000 seconds older than "now minus the grace interval". A 5 second TTL therefore sees
        // it as expired, while a 7200 second TTL does not.
        let current_epoch_time = Epoch::now().physical_time();
        let expired_epoch = Epoch::from_physical_time(
            current_epoch_time - MIN_TTL_EXPIRE_INTERVAL_MS - (1000 * 1000),
        )
        .0;

        // Bottom-level SSTs as (sst id, left key, right key, table id, last generator argument).
        // Every one of them is generated with `expired_epoch`; SST 11 shares table 10 with SST 10.
        let bottom_level_specs = [
            (2, 0, 100, 2, 0),
            (3, 101, 200, 3, 0),
            (4, 222, 300, 4, u64::MAX),
            (5, 333, 400, 5, u64::MAX),
            (6, 444, 500, 6, u64::MAX),
            (7, 555, 600, 7, u64::MAX),
            (8, 666, 700, 8, u64::MAX),
            (9, 777, 800, 9, u64::MAX),
            (10, 888, 1600, 10, u64::MAX),
            (11, 1600, 1800, 10, u64::MAX),
        ];
        let mut bottom_level_ssts = bottom_level_specs
            .iter()
            .map(|&(sst_id, left, right, table_id, last_epoch)| {
                generate_table_with_ids_and_epochs(
                    sst_id,
                    1,
                    left,
                    right,
                    1,
                    vec![table_id],
                    expired_epoch,
                    last_epoch,
                )
            })
            .collect_vec();

        {
            // SST 10 ends at key 1600, exactly where SST 11 begins. Its right bound is made
            // exclusive, presumably so the level stays non-overlapping.
            let sst_10 = &mut bottom_level_ssts[8];
            assert_eq!(10, sst_10.sst_id);
            *sst_10 = SstableInfoInner {
                key_range: KeyRange {
                    right_exclusive: true,
                    ..sst_10.key_range.clone()
                },
                ..sst_10.get_inner()
            }
            .into();
        }

        let levels = vec![
            generate_level(1, vec![]),
            generate_level(2, vec![]),
            generate_level(
                3,
                vec![
                    generate_table_with_ids_and_epochs(0, 1, 150, 151, 1, vec![0], 0, 0),
                    generate_table_with_ids_and_epochs(1, 1, 250, 251, 1, vec![1], 0, 0),
                ],
            ),
            Level {
                level_idx: 4,
                level_type: LevelType::Nonoverlapping,
                table_infos: bottom_level_ssts,
                total_file_size: 0,
                sub_level_id: 0,
                uncompressed_file_size: 0,
                ..Default::default()
            },
        ];
        assert_eq!(levels.len(), 4);
        let levels = Levels {
            levels,
            l0,
            ..Default::default()
        };

        let mut levels_handler = (0..5).map(LevelHandler::new).collect_vec();
        let mut local_stats = LocalSelectorStatistic::default();
        let mut selector = TtlCompactionSelector::default();

        // Runs one selection with `selector` against the shared fixture and the given options.
        macro_rules! pick_with {
            ($table_id_to_options:expr) => {
                selector.pick_compaction(
                    1,
                    compaction_selector_context(
                        &group_config,
                        &levels,
                        &BTreeSet::new(),
                        &mut levels_handler,
                        &mut local_stats,
                        $table_id_to_options,
                        Arc::new(CompactionDeveloperConfig::default()),
                        &Default::default(),
                        &HummockVersionStateTableInfo::empty(),
                    ),
                )
            };
        }

        // Checks the shape shared by every TTL task in this test: two inputs on level 4 with an
        // empty second input, target level 4, and the TTL task type. The macro then evaluates to
        // the ids of the SSTs picked from level 4.
        macro_rules! picked_sst_ids {
            ($task:expr) => {{
                let task = &$task;
                assert_compaction_task(task, &levels_handler);
                assert_eq!(task.input.input_levels.len(), 2);
                assert_eq!(task.input.input_levels[0].level_idx, 4);
                assert_eq!(task.input.input_levels[1].level_idx, 4);
                assert_eq!(task.input.input_levels[1].table_infos.len(), 0);
                assert_eq!(task.input.target_level, 4);
                assert!(matches!(
                    task.compaction_task_type,
                    compact_task::TaskType::Ttl
                ));
                task.input.input_levels[0]
                    .table_infos
                    .iter()
                    .map(|sst| sst.sst_id)
                    .collect_vec()
            }};
        }

        // Every table has a 5s TTL: the first SST of the level is picked.
        {
            let options = ttl_options_for_tables(5);
            let task = pick_with!(&options).unwrap();
            let ids = picked_sst_ids!(task);
            assert_eq!(vec![2], ids);
        }

        // After releasing pending tasks, the same selector resumes after its previous pick.
        {
            release_pending_tasks(&mut levels_handler);
            let options = ttl_options_for_tables(5);

            let task = pick_with!(&options).unwrap();
            let ids = picked_sst_ids!(task);
            assert_eq!(vec![3], ids);

            // SST 3 is still pending and already behind the cursor, so SST 4 comes next.
            let task = pick_with!(&options).unwrap();
            let ids = picked_sst_ids!(task);
            assert_eq!(vec![4], ids);
        }

        // With a fresh selector, only table 5 has a TTL short enough to expire.
        {
            release_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();
            let mut options = ttl_options_for_tables(7200);
            options.insert(
                5.into(),
                TableOption {
                    retention_seconds: Some(5),
                },
            );

            let task = pick_with!(&options).unwrap();
            let ids = picked_sst_ids!(task);
            assert_eq!(vec![5], ids);
        }

        // Without any table options nothing can expire.
        {
            release_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();

            let task = pick_with!(&HashMap::default());
            assert!(task.is_none());
        }

        // Tables 5, 8 and 9 get a long TTL. Successive picks without releasing tasks walk the
        // level in key order.
        {
            release_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();
            let mut options = ttl_options_for_tables(5);
            for table_id in [5, 8, 9] {
                options.insert(
                    table_id.into(),
                    TableOption {
                        retention_seconds: Some(7200_u32),
                    },
                );
            }

            for expected in [vec![2], vec![3], vec![4]] {
                let task = pick_with!(&options).unwrap();
                let ids = picked_sst_ids!(task);
                assert!(ids.is_sorted());
                assert_eq!(expected, ids);
            }
        }

        // Same options, but table 5's TTL is shortened right before the final pick.
        {
            release_pending_tasks(&mut levels_handler);
            selector = TtlCompactionSelector::default();
            let mut options = ttl_options_for_tables(5);
            for table_id in [5, 8, 9] {
                options.insert(
                    table_id.into(),
                    TableOption {
                        retention_seconds: Some(7200_u32),
                    },
                );
            }

            let expected_picks = [vec![2], vec![3]];
            let final_pick = expected_picks.len() - 1;
            for (index, expected) in expected_picks.iter().enumerate() {
                if index == final_pick {
                    options.insert(
                        5.into(),
                        TableOption {
                            retention_seconds: Some(5_u32),
                        },
                    );
                }

                let task = pick_with!(&options).unwrap();
                let ids = picked_sst_ids!(task);
                assert!(ids.is_sorted());
                assert_eq!(*expected, ids);
            }
        }
    }
}