risingwave_meta/hummock/compaction/selector/
vnode_watermark_selector.rs1use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::sync::Arc;
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockCompactionTaskId;
20use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
21 safe_epoch_read_table_watermarks_impl, safe_epoch_table_watermarks_impl,
22};
23use risingwave_hummock_sdk::table_watermark::{
24 ReadTableWatermark, TableWatermarks, WatermarkSerdeType,
25};
26use risingwave_pb::hummock::compact_task::TaskType;
27
28use crate::hummock::compaction::picker::VnodeWatermarkCompactionPicker;
29use crate::hummock::compaction::selector::{CompactionSelectorContext, DynamicLevelSelectorCore};
30use crate::hummock::compaction::{CompactionSelector, CompactionTask, create_compaction_task};
/// Compaction selector that produces [`TaskType::VnodeWatermark`] tasks.
///
/// The selector keeps no state of its own; everything it needs is passed in through the
/// `CompactionSelectorContext` handed to `pick_compaction`.
#[derive(Default)]
pub struct VnodeWatermarkCompactionSelector {}
33
34impl CompactionSelector for VnodeWatermarkCompactionSelector {
35 fn pick_compaction(
36 &mut self,
37 task_id: HummockCompactionTaskId,
38 context: CompactionSelectorContext<'_>,
39 ) -> Option<CompactionTask> {
40 let CompactionSelectorContext {
41 group,
42 levels,
43 level_handlers,
44 developer_config,
45 table_watermarks,
46 state_table_info: _,
47 member_table_ids,
48 ..
49 } = context;
50 let dynamic_level_core =
51 DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
52 let ctx = dynamic_level_core.calculate_level_base_size(levels);
53 let mut picker = VnodeWatermarkCompactionPicker::new();
54 let pk_table_watermarks =
55 safe_epoch_read_table_watermarks(table_watermarks, member_table_ids);
56 let compaction_input =
57 picker.pick_compaction(levels, level_handlers, &pk_table_watermarks)?;
58 compaction_input.add_pending_task(task_id, level_handlers);
59 Some(create_compaction_task(
60 dynamic_level_core.get_config(),
61 compaction_input,
62 ctx.base_level,
63 self.task_type(),
64 ))
65 }
66
67 fn name(&self) -> &'static str {
68 "VnodeWatermarkCompaction"
69 }
70
71 fn task_type(&self) -> TaskType {
72 TaskType::VnodeWatermark
73 }
74}
75
76fn safe_epoch_read_table_watermarks(
77 table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
78 member_table_ids: &BTreeSet<TableId>,
79) -> BTreeMap<TableId, ReadTableWatermark> {
80 safe_epoch_read_table_watermarks_impl(
81 safe_epoch_table_watermarks_impl(
82 table_watermarks,
83 &member_table_ids
84 .iter()
85 .map(TableId::table_id)
86 .collect::<Vec<_>>(),
87 )
88 .into_iter()
89 .filter(|(_table_id, table_watermarks)| {
90 {
91 matches!(
92 table_watermarks.watermark_type,
93 WatermarkSerdeType::PkPrefix
94 )
95 }
96 })
97 .collect(),
98 )
99}