Skip to main content

risingwave_meta/hummock/compaction/picker/
vnode_watermark_picker.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_common::catalog::TableId;
18use risingwave_hummock_sdk::key::{FullKey, TableKey};
19use risingwave_hummock_sdk::level::{InputLevel, Levels};
20use risingwave_hummock_sdk::sstable_info::SstableInfo;
21use risingwave_hummock_sdk::table_watermark::ReadTableWatermark;
22
23use crate::hummock::compaction::picker::CompactionInput;
24use crate::hummock::level_handler::LevelHandler;
25
/// Compaction picker that selects SSTs from the last level which can be reclaimed because
/// both ends of their key range are filtered out by a table watermark.
///
/// The picker holds no state. `Default` is derived so that the type can be built through
/// `Default::default()` as well as through `new()`.
#[derive(Debug, Default)]
pub struct VnodeWatermarkCompactionPicker {}
27
28impl VnodeWatermarkCompactionPicker {
29    pub fn new() -> Self {
30        Self {}
31    }
32
33    /// The current implementation only picks trivial reclaim task for the bottommost level.
34    /// Must modify `is_trivial_reclaim`, if non-trivial reclaim is supported in the future.
35    pub fn pick_compaction(
36        &mut self,
37        levels: &Levels,
38        level_handlers: &[LevelHandler],
39        table_watermarks: &BTreeMap<TableId, ReadTableWatermark>,
40    ) -> Option<CompactionInput> {
41        let level = levels.levels.last()?;
42        let mut select_input_ssts = vec![];
43        for sst_info in &level.table_infos {
44            if !level_handlers[level.level_idx as usize].is_pending_compact(&sst_info.sst_id)
45                && should_delete_sst_by_watermark(sst_info, table_watermarks)
46            {
47                select_input_ssts.push(sst_info.clone());
48            }
49        }
50        if select_input_ssts.is_empty() {
51            return None;
52        }
53        Some(CompactionInput {
54            select_input_size: select_input_ssts.iter().map(|sst| sst.sst_size).sum(),
55            total_file_count: select_input_ssts.len() as u64,
56            input_levels: vec![
57                InputLevel {
58                    level_idx: level.level_idx,
59                    level_type: level.level_type,
60                    table_infos: select_input_ssts,
61                },
62                InputLevel {
63                    level_idx: level.level_idx,
64                    level_type: level.level_type,
65                    table_infos: vec![],
66                },
67            ],
68            target_level: level.level_idx as usize,
69            target_sub_level_id: level.sub_level_id,
70            skip_target_range_conflict_check: true,
71            ..Default::default()
72        })
73    }
74}
75
76fn should_delete_sst_by_watermark(
77    sst_info: &SstableInfo,
78    table_watermarks: &BTreeMap<TableId, ReadTableWatermark>,
79) -> bool {
80    // Both table id and vnode must be identical for both the left and right keys in a SST.
81    // As more data is written to the bottommost level, they will eventually become identical.
82    let left_key = FullKey::decode(&sst_info.key_range.left);
83    let right_key = FullKey::decode(&sst_info.key_range.right);
84    if left_key.user_key.table_id != right_key.user_key.table_id {
85        return false;
86    }
87    if left_key.user_key.table_key.vnode_part() != right_key.user_key.table_key.vnode_part() {
88        return false;
89    }
90    let Some(watermarks) = table_watermarks.get(&left_key.user_key.table_id) else {
91        return false;
92    };
93    should_delete_key_by_watermark(&left_key.user_key.table_key, watermarks)
94        && should_delete_key_by_watermark(&right_key.user_key.table_key, watermarks)
95}
96
97fn should_delete_key_by_watermark(
98    table_key: &TableKey<&[u8]>,
99    watermark: &ReadTableWatermark,
100) -> bool {
101    let (vnode, key) = table_key.split_vnode();
102    let Some(w) = watermark.vnode_watermarks.get(&vnode) else {
103        return false;
104    };
105    watermark.direction.key_filter_by_watermark(key, w)
106}
107
#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};
    use risingwave_common::catalog::TableId;
    use risingwave_common::hash::VirtualNode;
    use risingwave_hummock_sdk::key::{FullKey, TableKey};
    use risingwave_hummock_sdk::key_range::KeyRange;
    use risingwave_hummock_sdk::level::{Level, Levels};
    use risingwave_hummock_sdk::sstable_info::{SstableInfo, SstableInfoInner};
    use risingwave_hummock_sdk::table_watermark::{ReadTableWatermark, WatermarkDirection};
    use risingwave_pb::hummock::LevelType;

    use crate::hummock::compaction::picker::vnode_watermark_picker::{
        VnodeWatermarkCompactionPicker, should_delete_sst_by_watermark,
    };
    use crate::hummock::level_handler::LevelHandler;

    /// Builds a table key made of the big-endian vnode bytes followed by `key_part`.
    fn table_key(vnode_part: usize, key_part: &str) -> TableKey<Bytes> {
        let mut buf = BytesMut::new();
        buf.put_slice(&VirtualNode::from_index(vnode_part).to_be_bytes());
        buf.put_slice(key_part.as_bytes());
        TableKey(buf.freeze())
    }

    /// Builds an SST (object id 1, sst id 1) owned by `table_id` whose right-exclusive key
    /// range runs from `left` to `right`, each given as `(vnode, key)`.
    fn sst(table_id: TableId, left: (usize, &str), right: (usize, &str)) -> SstableInfo {
        SstableInfoInner {
            object_id: 1.into(),
            sst_id: 1.into(),
            key_range: KeyRange {
                left: FullKey::new(table_id, table_key(left.0, left.1), 0)
                    .encode()
                    .into(),
                right: FullKey::new(table_id, table_key(right.0, right.1), 0)
                    .encode()
                    .into(),
                right_exclusive: true,
            },
            table_ids: vec![table_id],
            ..Default::default()
        }
        .into()
    }

    #[test]
    fn test_should_delete_sst_by_watermark() {
        // Table 1 has an ascending watermark on vnodes 16 and 17.
        let table_watermarks = maplit::btreemap! {
            1.into() => ReadTableWatermark {
                direction: WatermarkDirection::Ascending,
                vnode_watermarks: maplit::btreemap! {
                    VirtualNode::from_index(16) => "some_watermark_key_8".into(),
                    VirtualNode::from_index(17) => "some_watermark_key_8".into(),
                },
            },
        };

        // SSTs that must be kept, paired with the reason they are kept.
        let rejected = [
            (
                sst(
                    2.into(),
                    (16, "some_watermark_key_1"),
                    (16, "some_watermark_key_2"),
                ),
                "should fail because no matching watermark found",
            ),
            (
                sst(
                    1.into(),
                    (13, "some_watermark_key_1"),
                    (14, "some_watermark_key_2"),
                ),
                "should fail because no matching vnode found",
            ),
            (
                sst(
                    1.into(),
                    (16, "some_watermark_key_1"),
                    (17, "some_watermark_key_2"),
                ),
                "should fail because different vnodes found",
            ),
            (
                sst(
                    1.into(),
                    (16, "some_watermark_key_1"),
                    (16, "some_watermark_key_9"),
                ),
                "should fail because right key is greater than watermark",
            ),
        ];
        for (sst_info, reason) in &rejected {
            assert!(
                !should_delete_sst_by_watermark(sst_info, &table_watermarks),
                "{reason}"
            );
        }

        // Same table, same vnode, both boundary keys below the watermark.
        let sst_info = sst(
            1.into(),
            (16, "some_watermark_key_1"),
            (16, "some_watermark_key_2"),
        );
        assert!(should_delete_sst_by_watermark(&sst_info, &table_watermarks));

        let levels = Levels {
            levels: vec![Level {
                level_idx: 1,
                level_type: LevelType::Nonoverlapping,
                table_infos: vec![sst_info],
                ..Default::default()
            }],
            ..Default::default()
        };
        let level_handlers = vec![LevelHandler::new(0), LevelHandler::new(1)];

        let input = VnodeWatermarkCompactionPicker::new()
            .pick_compaction(&levels, &level_handlers, &table_watermarks)
            .unwrap();

        assert!(input.skip_target_range_conflict_check);
    }
}