risingwave_meta/hummock/compaction/picker/
vnode_watermark_picker.rs1use std::collections::BTreeMap;
16
17use risingwave_common::catalog::TableId;
18use risingwave_hummock_sdk::key::{FullKey, TableKey};
19use risingwave_hummock_sdk::level::{InputLevel, Levels};
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use risingwave_hummock_sdk::table_watermark::ReadTableWatermark;
22
23use crate::hummock::compaction::picker::CompactionInput;
24use crate::hummock::level_handler::LevelHandler;
25
26pub struct VnodeWatermarkCompactionPicker {}
27
28impl VnodeWatermarkCompactionPicker {
29 pub fn new() -> Self {
30 Self {}
31 }
32
33 pub fn pick_compaction(
36 &mut self,
37 levels: &Levels,
38 level_handlers: &[LevelHandler],
39 table_watermarks: &BTreeMap<TableId, ReadTableWatermark>,
40 ) -> Option<CompactionInput> {
41 let level = levels.levels.last()?;
42 let mut select_input_ssts = vec![];
43 for sst_info in &level.table_infos {
44 if !level_handlers[level.level_idx as usize].is_pending_compact(&sst_info.sst_id)
45 && should_delete_sst_by_watermark(sst_info, table_watermarks)
46 {
47 select_input_ssts.push(sst_info.clone());
48 }
49 }
50 if select_input_ssts.is_empty() {
51 return None;
52 }
53 Some(CompactionInput {
54 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
55 total_file_count: select_input_ssts.len() as u64,
56 input_levels: vec![
57 InputLevel {
58 level_idx: level.level_idx,
59 level_type: level.level_type,
60 table_infos: select_input_ssts,
61 },
62 InputLevel {
63 level_idx: level.level_idx,
64 level_type: level.level_type,
65 table_infos: vec![],
66 },
67 ],
68 target_level: level.level_idx as usize,
69 target_sub_level_id: level.sub_level_id,
70 ..Default::default()
71 })
72 }
73}
74
75fn should_delete_sst_by_watermark(
76 sst_info: &SstableInfo,
77 table_watermarks: &BTreeMap<TableId, ReadTableWatermark>,
78) -> bool {
79 let left_key = FullKey::decode(&sst_info.key_range.left);
82 let right_key = FullKey::decode(&sst_info.key_range.right);
83 if left_key.user_key.table_id != right_key.user_key.table_id {
84 return false;
85 }
86 if left_key.user_key.table_key.vnode_part() != right_key.user_key.table_key.vnode_part() {
87 return false;
88 }
89 let Some(watermarks) = table_watermarks.get(&left_key.user_key.table_id) else {
90 return false;
91 };
92 should_delete_key_by_watermark(&left_key.user_key.table_key, watermarks)
93 && should_delete_key_by_watermark(&right_key.user_key.table_key, watermarks)
94}
95
96fn should_delete_key_by_watermark(
97 table_key: &TableKey<&[u8]>,
98 watermark: &ReadTableWatermark,
99) -> bool {
100 let (vnode, key) = table_key.split_vnode();
101 let Some(w) = watermark.vnode_watermarks.get(&vnode) else {
102 return false;
103 };
104 watermark.direction.key_filter_by_watermark(key, w)
105}
106
107#[cfg(test)]
108mod tests {
109 use bytes::{BufMut, Bytes, BytesMut};
110 use risingwave_common::hash::VirtualNode;
111 use risingwave_hummock_sdk::key::{FullKey, TableKey};
112 use risingwave_hummock_sdk::key_range::KeyRange;
113 use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
114 use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
115
116 use crate::hummock::compaction::picker::vnode_watermark_picker::should_delete_sst_by_watermark;
117
118 #[test]
119 fn test_should_delete_sst_by_watermark() {
120 let table_watermarks = maplit::btreemap! {
121 1.into() => ReadTableWatermark {
122 direction: WatermarkDirection::Ascending,
123 vnode_watermarks: maplit::btreemap! {
124 VirtualNode::from_index(16) => "some_watermark_key_8".into(),
125 VirtualNode::from_index(17) => "some_watermark_key_8".into(),
126 },
127 },
128 };
129 let table_key = |vnode_part: usize, key_part: &str| {
130 let mut builder = BytesMut::new();
131 builder.put_slice(&VirtualNode::from_index(vnode_part).to_be_bytes());
132 builder.put_slice(&Bytes::copy_from_slice(key_part.as_bytes()));
133 TableKey(builder.freeze())
134 };
135
136 let sst_info = SstableInfoInner {
137 object_id: 1,
138 sst_id: 1,
139 key_range: KeyRange {
140 left: FullKey::new(2.into(), table_key(16, "some_watermark_key_1"), 0)
141 .encode()
142 .into(),
143 right: FullKey::new(2.into(), table_key(16, "some_watermark_key_2"), 0)
144 .encode()
145 .into(),
146 right_exclusive: true,
147 },
148 table_ids: vec![2],
149 ..Default::default()
150 }
151 .into();
152 assert!(
153 !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
154 "should fail because no matching watermark found"
155 );
156
157 let sst_info = SstableInfoInner {
158 object_id: 1,
159 sst_id: 1,
160 key_range: KeyRange {
161 left: FullKey::new(1.into(), table_key(13, "some_watermark_key_1"), 0)
162 .encode()
163 .into(),
164 right: FullKey::new(1.into(), table_key(14, "some_watermark_key_2"), 0)
165 .encode()
166 .into(),
167 right_exclusive: true,
168 },
169 table_ids: vec![1],
170 ..Default::default()
171 }
172 .into();
173 assert!(
174 !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
175 "should fail because no matching vnode found"
176 );
177
178 let sst_info = SstableInfoInner {
179 object_id: 1,
180 sst_id: 1,
181 key_range: KeyRange {
182 left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0)
183 .encode()
184 .into(),
185 right: FullKey::new(1.into(), table_key(17, "some_watermark_key_2"), 0)
186 .encode()
187 .into(),
188 right_exclusive: true,
189 },
190 table_ids: vec![1],
191 ..Default::default()
192 }
193 .into();
194 assert!(
195 !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
196 "should fail because different vnodes found"
197 );
198
199 let sst_info = SstableInfoInner {
200 object_id: 1,
201 sst_id: 1,
202 key_range: KeyRange {
203 left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0)
204 .encode()
205 .into(),
206 right: FullKey::new(1.into(), table_key(16, "some_watermark_key_9"), 0)
207 .encode()
208 .into(),
209 right_exclusive: true,
210 },
211 table_ids: vec![1],
212 ..Default::default()
213 }
214 .into();
215 assert!(
216 !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
217 "should fail because right key is greater than watermark"
218 );
219
220 let sst_info = SstableInfoInner {
221 object_id: 1,
222 sst_id: 1,
223 key_range: KeyRange {
224 left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0)
225 .encode()
226 .into(),
227 right: FullKey::new(1.into(), table_key(16, "some_watermark_key_2"), 0)
228 .encode()
229 .into(),
230 right_exclusive: true,
231 },
232 table_ids: vec![1],
233 ..Default::default()
234 }
235 .into();
236 assert!(should_delete_sst_by_watermark(&sst_info, &table_watermarks));
237 }
238}