risingwave_meta/hummock/compaction/selector/
vnode_watermark_selector.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::sync::Arc;
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockCompactionTaskId;
20use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
21    safe_epoch_read_table_watermarks_impl, safe_epoch_table_watermarks_impl,
22};
23use risingwave_hummock_sdk::table_watermark::{
24    ReadTableWatermark, TableWatermarks, WatermarkSerdeType,
25};
26use risingwave_pb::hummock::compact_task::TaskType;
27
28use crate::hummock::compaction::picker::VnodeWatermarkCompactionPicker;
29use crate::hummock::compaction::selector::{CompactionSelectorContext, DynamicLevelSelectorCore};
30use crate::hummock::compaction::{CompactionSelector, CompactionTask, create_compaction_task};
/// Compaction selector that produces [`TaskType::VnodeWatermark`] tasks.
///
/// The selector carries no state of its own (it has no fields); each call to
/// `pick_compaction` builds a fresh [`VnodeWatermarkCompactionPicker`] and
/// works only from the supplied [`CompactionSelectorContext`].
#[derive(Default)]
pub struct VnodeWatermarkCompactionSelector {}
33
34impl CompactionSelector for VnodeWatermarkCompactionSelector {
35    fn pick_compaction(
36        &mut self,
37        task_id: HummockCompactionTaskId,
38        context: CompactionSelectorContext<'_>,
39    ) -> Option<CompactionTask> {
40        let CompactionSelectorContext {
41            group,
42            levels,
43            level_handlers,
44            developer_config,
45            table_watermarks,
46            state_table_info: _,
47            member_table_ids,
48            ..
49        } = context;
50        let dynamic_level_core =
51            DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
52        let ctx = dynamic_level_core.calculate_level_base_size(levels);
53        let mut picker = VnodeWatermarkCompactionPicker::new();
54        let pk_table_watermarks =
55            safe_epoch_read_table_watermarks(table_watermarks, member_table_ids);
56        let compaction_input =
57            picker.pick_compaction(levels, level_handlers, &pk_table_watermarks)?;
58        compaction_input.add_pending_task(task_id, level_handlers);
59        Some(create_compaction_task(
60            dynamic_level_core.get_config(),
61            compaction_input,
62            ctx.base_level,
63            self.task_type(),
64        ))
65    }
66
67    fn name(&self) -> &'static str {
68        "VnodeWatermarkCompaction"
69    }
70
71    fn task_type(&self) -> TaskType {
72        TaskType::VnodeWatermark
73    }
74}
75
76fn safe_epoch_read_table_watermarks(
77    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
78    member_table_ids: &BTreeSet<TableId>,
79) -> BTreeMap<TableId, ReadTableWatermark> {
80    safe_epoch_read_table_watermarks_impl(
81        safe_epoch_table_watermarks_impl(
82            table_watermarks,
83            &member_table_ids
84                .iter()
85                .map(TableId::table_id)
86                .collect::<Vec<_>>(),
87        )
88        .into_iter()
89        .filter(|(_table_id, table_watermarks)| {
90            {
91                matches!(
92                    table_watermarks.watermark_type,
93                    WatermarkSerdeType::PkPrefix
94                )
95            }
96        })
97        .collect(),
98    )
99}