risingwave_meta/hummock/compaction/picker/
vnode_watermark_picker.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_common::catalog::TableId;
18use risingwave_hummock_sdk::key::{FullKey, TableKey};
19use risingwave_hummock_sdk::level::{InputLevel, Levels};
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use risingwave_hummock_sdk::table_watermark::ReadTableWatermark;
22
23use crate::hummock::compaction::picker::CompactionInput;
24use crate::hummock::level_handler::LevelHandler;
25
/// Compaction picker that selects SSTs from the last (bottommost) level whose
/// whole key range can be dropped according to the per-vnode table watermarks.
///
/// The picker carries no state; `Default` is derived so that it can be built
/// either through `new()` or through `Default::default()`.
#[derive(Default)]
pub struct VnodeWatermarkCompactionPicker {}
27
28impl VnodeWatermarkCompactionPicker {
29    pub fn new() -> Self {
30        Self {}
31    }
32
33    /// The current implementation only picks trivial reclaim task for the bottommost level.
34    /// Must modify `is_trivial_reclaim`, if non-trivial reclaim is supported in the future.
35    pub fn pick_compaction(
36        &mut self,
37        levels: &Levels,
38        level_handlers: &[LevelHandler],
39        table_watermarks: &BTreeMap<TableId, ReadTableWatermark>,
40    ) -> Option<CompactionInput> {
41        let level = levels.levels.last()?;
42        let mut select_input_ssts = vec![];
43        for sst_info in &level.table_infos {
44            if !level_handlers[level.level_idx as usize].is_pending_compact(&sst_info.sst_id)
45                && should_delete_sst_by_watermark(sst_info, table_watermarks)
46            {
47                select_input_ssts.push(sst_info.clone());
48            }
49        }
50        if select_input_ssts.is_empty() {
51            return None;
52        }
53        Some(CompactionInput {
54            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
55            total_file_count: select_input_ssts.len() as u64,
56            input_levels: vec![
57                InputLevel {
58                    level_idx: level.level_idx,
59                    level_type: level.level_type,
60                    table_infos: select_input_ssts,
61                },
62                InputLevel {
63                    level_idx: level.level_idx,
64                    level_type: level.level_type,
65                    table_infos: vec![],
66                },
67            ],
68            target_level: level.level_idx as usize,
69            target_sub_level_id: level.sub_level_id,
70            ..Default::default()
71        })
72    }
73}
74
75fn should_delete_sst_by_watermark(
76    sst_info: &SstableInfo,
77    table_watermarks: &BTreeMap<TableId, ReadTableWatermark>,
78) -> bool {
79    // Both table id and vnode must be identical for both the left and right keys in a SST.
80    // As more data is written to the bottommost level, they will eventually become identical.
81    let left_key = FullKey::decode(&sst_info.key_range.left);
82    let right_key = FullKey::decode(&sst_info.key_range.right);
83    if left_key.user_key.table_id != right_key.user_key.table_id {
84        return false;
85    }
86    if left_key.user_key.table_key.vnode_part() != right_key.user_key.table_key.vnode_part() {
87        return false;
88    }
89    let Some(watermarks) = table_watermarks.get(&left_key.user_key.table_id) else {
90        return false;
91    };
92    should_delete_key_by_watermark(&left_key.user_key.table_key, watermarks)
93        && should_delete_key_by_watermark(&right_key.user_key.table_key, watermarks)
94}
95
96fn should_delete_key_by_watermark(
97    table_key: &TableKey<&[u8]>,
98    watermark: &ReadTableWatermark,
99) -> bool {
100    let (vnode, key) = table_key.split_vnode();
101    let Some(w) = watermark.vnode_watermarks.get(&vnode) else {
102        return false;
103    };
104    watermark.direction.key_filter_by_watermark(key, w)
105}
106
#[cfg(test)]
mod tests {
    use bytes::{BufMut, BytesMut};
    use risingwave_common::hash::VirtualNode;
    use risingwave_hummock_sdk::key::{FullKey, TableKey};
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};

    use crate::hummock::compaction::picker::vnode_watermark_picker::should_delete_sst_by_watermark;

    /// Walks through every rejection path of `should_delete_sst_by_watermark`
    /// and finally the accepting path.
    ///
    /// Fixture: table 1 has an ascending watermark `some_watermark_key_8` on
    /// vnodes 16 and 17; no other table or vnode has a watermark.
    #[test]
    fn test_should_delete_sst_by_watermark() {
        let table_watermarks = maplit::btreemap! {
            1.into() => ReadTableWatermark {
                direction: WatermarkDirection::Ascending,
                vnode_watermarks: maplit::btreemap! {
                    VirtualNode::from_index(16) => "some_watermark_key_8".into(),
                    VirtualNode::from_index(17) => "some_watermark_key_8".into(),
                },
            },
        };
        // Builds a table key as `<vnode bytes><key bytes>`.
        let table_key = |vnode_part: usize, key_part: &str| {
            let mut builder = BytesMut::new();
            builder.put_slice(&VirtualNode::from_index(vnode_part).to_be_bytes());
            builder.put_slice(key_part.as_bytes());
            TableKey(builder.freeze())
        };

        // Table 2 has no watermark at all.
        let sst_info = SstableInfoInner {
            object_id: 1,
            sst_id: 1,
            key_range: KeyRange {
                left: FullKey::new(2.into(), table_key(16, "some_watermark_key_1"), 0)
                    .encode()
                    .into(),
                right: FullKey::new(2.into(), table_key(16, "some_watermark_key_2"), 0)
                    .encode()
                    .into(),
                right_exclusive: true,
            },
            table_ids: vec![2],
            ..Default::default()
        }
        .into();
        assert!(
            !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
            "should fail because no matching watermark found"
        );

        // Both boundary keys share vnode 13, which has no watermark entry. Using the
        // same vnode on both ends makes sure the per-vnode lookup is what rejects the
        // SST, rather than the "different vnodes" guard.
        let sst_info = SstableInfoInner {
            object_id: 1,
            sst_id: 1,
            key_range: KeyRange {
                left: FullKey::new(1.into(), table_key(13, "some_watermark_key_1"), 0)
                    .encode()
                    .into(),
                right: FullKey::new(1.into(), table_key(13, "some_watermark_key_2"), 0)
                    .encode()
                    .into(),
                right_exclusive: true,
            },
            table_ids: vec![1],
            ..Default::default()
        }
        .into();
        assert!(
            !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
            "should fail because no matching vnode found"
        );

        // Boundary keys sit on different vnodes (16 vs 17), even though both have watermarks.
        let sst_info = SstableInfoInner {
            object_id: 1,
            sst_id: 1,
            key_range: KeyRange {
                left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0)
                    .encode()
                    .into(),
                right: FullKey::new(1.into(), table_key(17, "some_watermark_key_2"), 0)
                    .encode()
                    .into(),
                right_exclusive: true,
            },
            table_ids: vec![1],
            ..Default::default()
        }
        .into();
        assert!(
            !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
            "should fail because different vnodes found"
        );

        // Left key is below the watermark, but the right key (`..._9`) is above it.
        let sst_info = SstableInfoInner {
            object_id: 1,
            sst_id: 1,
            key_range: KeyRange {
                left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0)
                    .encode()
                    .into(),
                right: FullKey::new(1.into(), table_key(16, "some_watermark_key_9"), 0)
                    .encode()
                    .into(),
                right_exclusive: true,
            },
            table_ids: vec![1],
            ..Default::default()
        }
        .into();
        assert!(
            !should_delete_sst_by_watermark(&sst_info, &table_watermarks),
            "should fail because right key is greater than watermark"
        );

        // Same table, same vnode, and both boundary keys are below the watermark.
        let sst_info = SstableInfoInner {
            object_id: 1,
            sst_id: 1,
            key_range: KeyRange {
                left: FullKey::new(1.into(), table_key(16, "some_watermark_key_1"), 0)
                    .encode()
                    .into(),
                right: FullKey::new(1.into(), table_key(16, "some_watermark_key_2"), 0)
                    .encode()
                    .into(),
                right_exclusive: true,
            },
            table_ids: vec![1],
            ..Default::default()
        }
        .into();
        assert!(
            should_delete_sst_by_watermark(&sst_info, &table_watermarks),
            "should succeed because the whole key range is below the watermark"
        );
    }
}