Skip to main content

risingwave_meta/hummock/compaction/selector/
vnode_watermark_selector.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16use std::sync::Arc;
17
18use risingwave_common::catalog::TableId;
19use risingwave_hummock_sdk::HummockCompactionTaskId;
20use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
21    safe_epoch_read_table_watermarks_impl, safe_epoch_table_watermarks_impl,
22};
23use risingwave_hummock_sdk::table_watermark::{
24    ReadTableWatermark, TableWatermarks, WatermarkSerdeType,
25};
26use risingwave_pb::hummock::compact_task::TaskType;
27
28use crate::hummock::compaction::picker::VnodeWatermarkCompactionPicker;
29use crate::hummock::compaction::selector::{CompactionSelectorContext, DynamicLevelSelectorCore};
30use crate::hummock::compaction::{CompactionSelector, CompactionTask, create_compaction_task};
31#[derive(Default)]
32pub struct VnodeWatermarkCompactionSelector {}
33
34impl CompactionSelector for VnodeWatermarkCompactionSelector {
35    fn pick_compaction(
36        &mut self,
37        task_id: HummockCompactionTaskId,
38        context: CompactionSelectorContext<'_>,
39    ) -> Option<CompactionTask> {
40        let CompactionSelectorContext {
41            group,
42            levels,
43            level_handlers,
44            developer_config,
45            table_watermarks,
46            state_table_info: _,
47            member_table_ids,
48            in_progress_compactions,
49            ..
50        } = context;
51        let dynamic_level_core =
52            DynamicLevelSelectorCore::new(group.compaction_config.clone(), developer_config);
53        let ctx = dynamic_level_core.calculate_level_base_size(levels);
54        let mut picker = VnodeWatermarkCompactionPicker::new();
55        let pk_table_watermarks =
56            safe_epoch_read_table_watermarks(table_watermarks, member_table_ids);
57        let compaction_input =
58            picker.pick_compaction(levels, level_handlers, &pk_table_watermarks)?;
59        if !compaction_input.skip_target_range_conflict_check
60            && in_progress_compactions.has_conflict_with_input(&compaction_input)
61        {
62            return None;
63        }
64        compaction_input.add_pending_task(task_id, level_handlers);
65        Some(create_compaction_task(
66            dynamic_level_core.get_config(),
67            compaction_input,
68            ctx.base_level,
69            self.task_type(),
70        ))
71    }
72
73    fn name(&self) -> &'static str {
74        "VnodeWatermarkCompaction"
75    }
76
77    fn task_type(&self) -> TaskType {
78        TaskType::VnodeWatermark
79    }
80}
81
82fn safe_epoch_read_table_watermarks(
83    table_watermarks: &HashMap<TableId, Arc<TableWatermarks>>,
84    member_table_ids: &BTreeSet<TableId>,
85) -> BTreeMap<TableId, ReadTableWatermark> {
86    safe_epoch_read_table_watermarks_impl(
87        safe_epoch_table_watermarks_impl(
88            table_watermarks,
89            &member_table_ids.iter().copied().collect::<Vec<_>>(),
90        )
91        .into_iter()
92        .filter(|(_table_id, table_watermarks)| {
93            {
94                matches!(
95                    table_watermarks.watermark_type,
96                    WatermarkSerdeType::PkPrefix
97                )
98            }
99        })
100        .collect(),
101    )
102}