risingwave_meta/hummock/compaction/picker/
vnode_watermark_picker.rs1use std::collections::BTreeMap;
16
17use risingwave_common::catalog::TableId;
18use risingwave_hummock_sdk::key::{FullKey, TableKey};
19use risingwave_hummock_sdk::level::{InputLevel, Levels};
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use risingwave_hummock_sdk::table_watermark::ReadTableWatermark;
22
23use crate::hummock::compaction::picker::CompactionInput;
24use crate::hummock::level_handler::LevelHandler;
25
26pub struct VnodeWatermarkCompactionPicker {}
27
28impl VnodeWatermarkCompactionPicker {
29 pub fn new() -> Self {
30 Self {}
31 }
32
33 pub fn pick_compaction(
36 &mut self,
37 levels: &Levels,
38 level_handlers: &[LevelHandler],
39 table_watermarks: &BTreeMap<TableId, ReadTableWatermark>,
40 ) -> Option<CompactionInput> {
41 let level = levels.levels.last()?;
42 let mut select_input_ssts = vec![];
43 for sst_info in &level.table_infos {
44 if !level_handlers[level.level_idx as usize].is_pending_compact(&sst_info.sst_id)
45 && should_delete_sst_by_watermark(sst_info, table_watermarks)
46 {
47 select_input_ssts.push(sst_info.clone());
48 }
49 }
50 if select_input_ssts.is_empty() {
51 return None;
52 }
53 Some(CompactionInput {
54 select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
55 total_file_count: select_input_ssts.len() as u64,
56 input_levels: vec![
57 InputLevel {
58 level_idx: level.level_idx,
59 level_type: level.level_type,
60 table_infos: select_input_ssts,
61 },
62 InputLevel {
63 level_idx: level.level_idx,
64 level_type: level.level_type,
65 table_infos: vec![],
66 },
67 ],
68 target_level: level.level_idx as usize,
69 target_sub_level_id: level.sub_level_id,
70 skip_target_range_conflict_check: true,
71 ..Default::default()
72 })
73 }
74}
75
76fn should_delete_sst_by_watermark(
77 sst_info: &SstableInfo,
78 table_watermarks: &BTreeMap<TableId, ReadTableWatermark>,
79) -> bool {
80 let left_key = FullKey::decode(&sst_info.key_range.left);
83 let right_key = FullKey::decode(&sst_info.key_range.right);
84 if left_key.user_key.table_id != right_key.user_key.table_id {
85 return false;
86 }
87 if left_key.user_key.table_key.vnode_part() != right_key.user_key.table_key.vnode_part() {
88 return false;
89 }
90 let Some(watermarks) = table_watermarks.get(&left_key.user_key.table_id) else {
91 return false;
92 };
93 should_delete_key_by_watermark(&left_key.user_key.table_key, watermarks)
94 && should_delete_key_by_watermark(&right_key.user_key.table_key, watermarks)
95}
96
97fn should_delete_key_by_watermark(
98 table_key: &TableKey<&[u8]>,
99 watermark: &ReadTableWatermark,
100) -> bool {
101 let (vnode, key) = table_key.split_vnode();
102 let Some(w) = watermark.vnode_watermarks.get(&vnode) else {
103 return false;
104 };
105 watermark.direction.key_filter_by_watermark(key, w)
106}
107
108#[cfg(test)]
109mod tests {
110 use bytes::{BufMut, Bytes, BytesMut};
111 use risingwave_common::hash::VirtualNode;
112 use risingwave_hummock_sdk::key::{FullKey, TableKey};
113 use risingwave_hummock_sdk::key_range::KeyRange;
114 use risingwave_hummock_sdk::level::{Level, Levels};
115 use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
116 use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
117 use risingwave_pb::hummock::LevelType;
118
119 use crate::hummock::compaction::picker::vnode_watermark_picker::{
120 VnodeWatermarkCompactionPicker, should_delete_sst_by_watermark,
121 };
122 use crate::hummock::level_handler::LevelHandler;
123
124 fn table_key(vnode_part: usize, key_part: &str) -> TableKey<Bytes> {
125 let mut builder = BytesMut::new();
126 builder.put_slice(&VirtualNode::from_index(vnode_part).to_be_bytes());
127 builder.put_slice(&Bytes::copy_from_slice(key_part.as_bytes()));
128 TableKey(builder.freeze())
129 }
130
131 #[test]
132 fn test_should_delete_sst_by_watermark() {
133 let table_watermarks = maplit::btreemap! {
134 1.into() => ReadTableWatermark {
135 direction: WatermarkDirection::Ascending,
136 vnode_watermarks: maplit::btreemap! {
137 VirtualNode::from_index(16) => "some_watermark_key_8".into(),
138 VirtualNode::from_index(17) => "some_watermark_key_8".into(),
139 },
140 },
141 };
142
143 let sst_info = SstableInfoInner {
144 object_id: 1.into(),
145 sst_id: 1.into(),
146 key_range: KeyRange {
147 left: FullKey::new(2.into(), table_key(16, "some_watermark_key_1"), 0)
148 .encode()
149 .into(),
150 right: FullKey::new(2.into(), table_key(16, "some_watermark_key_2"), 0)
151 .encode()
152 .into(),
153 right_exclusive: true,
154 },
155 table_ids: vec![2.into()],
156 ..Default::default()
157 }
158 .into();
159 assert!(
160 !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
161 "should fail because no matching watermark found"
162 );
163
164 let sst_info = SstableInfoInner {
165 object_id: 1.into(),
166 sst_id: 1.into(),
167 key_range: KeyRange {
168 left: FullKey::new(1.into(), table_key(13, "some_watermark_key_1"), 0)
169 .encode()
170 .into(),
171 right: FullKey::new(1.into(), table_key(14, "some_watermark_key_2"), 0)
172 .encode()
173 .into(),
174 right_exclusive: true,
175 },
176 table_ids: vec![1.into()],
177 ..Default::default()
178 }
179 .into();
180 assert!(
181 !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
182 "should fail because no matching vnode found"
183 );
184
185 let sst_info = SstableInfoInner {
186 object_id: 1.into(),
187 sst_id: 1.into(),
188 key_range: KeyRange {
189 left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0)
190 .encode()
191 .into(),
192 right: FullKey::new(1.into(), table_key(17, "some_watermark_key_2"), 0)
193 .encode()
194 .into(),
195 right_exclusive: true,
196 },
197 table_ids: vec![1.into()],
198 ..Default::default()
199 }
200 .into();
201 assert!(
202 !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
203 "should fail because different vnodes found"
204 );
205
206 let sst_info = SstableInfoInner {
207 object_id: 1.into(),
208 sst_id: 1.into(),
209 key_range: KeyRange {
210 left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0)
211 .encode()
212 .into(),
213 right: FullKey::new(1.into(), table_key(16, "some_watermark_key_9"), 0)
214 .encode()
215 .into(),
216 right_exclusive: true,
217 },
218 table_ids: vec![1.into()],
219 ..Default::default()
220 }
221 .into();
222 assert!(
223 !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
224 "should fail because right key is greater than watermark"
225 );
226
227 let sst_info = SstableInfoInner {
228 object_id: 1.into(),
229 sst_id: 1.into(),
230 key_range: KeyRange {
231 left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0)
232 .encode()
233 .into(),
234 right: FullKey::new(1.into(), table_key(16, "some_watermark_key_2"), 0)
235 .encode()
236 .into(),
237 right_exclusive: true,
238 },
239 table_ids: vec![1.into()],
240 ..Default::default()
241 }
242 .into();
243 assert!(should_delete_sst_by_watermark(&sst_info, &table_watermarks));
244
245 let levels = Levels {
246 levels: vec![Level {
247 level_idx: 1,
248 level_type: LevelType::Nonoverlapping,
249 table_infos: vec![sst_info],
250 ..Default::default()
251 }],
252 ..Default::default()
253 };
254 let level_handlers = vec![LevelHandler::new(0), LevelHandler::new(1)];
255
256 let input = VnodeWatermarkCompactionPicker::new()
257 .pick_compaction(&levels, &level_handlers, &table_watermarks)
258 .unwrap();
259
260 assert!(input.skip_target_range_conflict_check);
261 }
262}