risingwave_meta/hummock/compaction/selector/
vnode_watermark_selector.rs1use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::sync::Arc;
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockCompactionTaskId;
20use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
21 safe_epoch_read_table_watermarks_impl, safe_epoch_table_watermarks_impl,
22};
23use risingwave_hummock_sdk::table_watermark::{
24 ReadTableWatermark, TableWatermarks, WatermarkSerdeType,
25};
26use risingwave_pb::hummock::compact_task::TaskType;
27
28use crate::hummock::compaction::picker::VnodeWatermarkCompactionPicker;
29use crate::hummock::compaction::selector::{CompactionSelectorContext, DynamicLevelSelectorCore};
30use crate::hummock::compaction::{CompactionSelector, CompactionTask, create_compaction_task};
/// Stateless compaction selector whose [`CompactionSelector`] implementation
/// emits tasks of type `TaskType::VnodeWatermark`.
///
/// The struct carries no fields; construct it with `Default::default()`.
#[derive(Default)]
pub struct VnodeWatermarkCompactionSelector {}
33
34impl CompactionSelector for VnodeWatermarkCompactionSelector {
35 fn pick_compaction(
36 &mut self,
37 task_id: HummockCompactionTaskId,
38 context: CompactionSelectorContext<'_>,
39 ) -> Option<CompactionTask> {
40 let CompactionSelectorContext {
41 group,
42 levels,
43 level_handlers,
44 developer_config,
45 table_watermarks,
46 state_table_info: _,
47 member_table_ids,
48 in_progress_compactions,
49 ..
50 } = context;
51 let dynamic_level_core =
52 DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
53 let ctx = dynamic_level_core.calculate_level_base_size(levels);
54 let mut picker = VnodeWatermarkCompactionPicker::new();
55 let pk_table_watermarks =
56 safe_epoch_read_table_watermarks(table_watermarks, member_table_ids);
57 let compaction_input =
58 picker.pick_compaction(levels, level_handlers, &pk_table_watermarks)?;
59 if !compaction_input.skip_target_range_conflict_check
60 && in_progress_compactions.has_conflict_with_input(&compaction_input)
61 {
62 return None;
63 }
64 compaction_input.add_pending_task(task_id, level_handlers);
65 Some(create_compaction_task(
66 dynamic_level_core.get_config(),
67 compaction_input,
68 ctx.base_level,
69 self.task_type(),
70 ))
71 }
72
73 fn name(&self) -> &'static str {
74 "VnodeWatermarkCompaction"
75 }
76
77 fn task_type(&self) -> TaskType {
78 TaskType::VnodeWatermark
79 }
80}
81
82fn safe_epoch_read_table_watermarks(
83 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
84 member_table_ids: &BTreeSet<TableId>,
85) -> BTreeMap<TableId, ReadTableWatermark> {
86 safe_epoch_read_table_watermarks_impl(
87 safe_epoch_table_watermarks_impl(
88 table_watermarks,
89 &member_table_ids.iter().copied().collect::<Vec<_>>(),
90 )
91 .into_iter()
92 .filter(|(_table_id, table_watermarks)| {
93 {
94 matches!(
95 table_watermarks.watermark_type,
96 WatermarkSerdeType::PkPrefix
97 )
98 }
99 })
100 .collect(),
101 )
102}