risingwave_meta/hummock/manager/compaction/
compact_task_builder.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::compact_task::CompactTask;
21use risingwave_hummock_sdk::key_range::KeyRange;
22use risingwave_hummock_sdk::table_watermark::{TableWatermarks, WatermarkSerdeType};
23use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
24use risingwave_pb::hummock::compact_task::TaskStatus;
25use risingwave_pb::hummock::{CompactionConfig, TableOption, TableSchema};
26
27use crate::hummock::compaction::CompactionTask as PickedCompactionTask;
28
29pub(super) struct CompactTaskBuildContext {
30    pub(super) task_id: HummockCompactionTaskId,
31    pub(super) compaction_group_id: CompactionGroupId,
32    pub(super) compaction_group_version_id: u64,
33    pub(super) existing_table_ids: Vec<TableId>,
34    pub(super) table_options: BTreeMap<TableId, TableOption>,
35    pub(super) is_target_level_last: bool,
36    pub(super) compaction_config: Arc<CompactionConfig>,
37    pub(super) current_epoch_time: u64,
38}
39
40pub(super) fn build_base_compact_task(
41    picked_task: PickedCompactionTask,
42    context: CompactTaskBuildContext,
43) -> (CompactTask, Vec<TableId>) {
44    let target_level_id = picked_task.input.target_level as u32;
45    let input_ssts = picked_task.input.input_levels;
46    let compact_table_ids = input_ssts
47        .iter()
48        .flat_map(|level| level.table_infos.iter())
49        .flat_map(|sst| sst.table_ids.iter().copied())
50        .sorted()
51        .dedup()
52        .collect_vec();
53    let compact_table_id_set: HashSet<_> = compact_table_ids.iter().copied().collect();
54    let mut table_options = context.table_options;
55    retain_table_options(&mut table_options, &compact_table_id_set);
56    let compression_algorithm = match picked_task.compression_algorithm.as_str() {
57        "Lz4" => 1,
58        "Zstd" => 2,
59        _ => 0,
60    };
61    let compaction_config = context.compaction_config.as_ref();
62
63    (
64        CompactTask {
65            input_ssts,
66            splits: vec![KeyRange::inf()],
67            sorted_output_ssts: vec![],
68            task_id: context.task_id,
69            target_level: target_level_id,
70            // Only gc delete keys in last level because there may be older versions in lower levels.
71            gc_delete_keys: context.is_target_level_last,
72            base_level: picked_task.base_level as u32,
73            task_status: TaskStatus::Pending,
74            compaction_group_id: context.compaction_group_id,
75            compaction_group_version_id: context.compaction_group_version_id,
76            existing_table_ids: context.existing_table_ids,
77            compression_algorithm,
78            target_file_size: picked_task.target_file_size,
79            table_options,
80            current_epoch_time: context.current_epoch_time,
81            compaction_filter_mask: compaction_config.compaction_filter_mask,
82            target_sub_level_id: picked_task.input.target_sub_level_id,
83            task_type: picked_task.compaction_task_type,
84            split_weight_by_vnode: picked_task.input.vnode_partition_count,
85            max_sub_compaction: compaction_config.max_sub_compaction,
86            max_kv_count_for_xor16: compaction_config.max_kv_count_for_xor16,
87            max_vnode_key_range_bytes: compaction_config.max_vnode_key_range_bytes,
88            ..Default::default()
89        },
90        compact_table_ids,
91    )
92}
93
94pub(super) fn attach_compact_task_table_metadata(
95    compact_task: &mut CompactTask,
96    compact_table_ids: &[TableId],
97    table_watermarks: BTreeMap<TableId, TableWatermarks>,
98    all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
99) {
100    let mut pk_prefix_table_watermarks = BTreeMap::default();
101    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
102    let mut value_table_watermarks = BTreeMap::default();
103    for (table_id, watermark) in table_watermarks {
104        match watermark.watermark_type {
105            WatermarkSerdeType::PkPrefix => {
106                pk_prefix_table_watermarks.insert(table_id, watermark);
107            }
108            WatermarkSerdeType::NonPkPrefix => {
109                non_pk_prefix_table_watermarks.insert(table_id, watermark);
110            }
111            WatermarkSerdeType::Value => {
112                value_table_watermarks.insert(table_id, watermark);
113            }
114        }
115    }
116
117    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
118    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
119    compact_task.value_table_watermarks = value_table_watermarks;
120    compact_task.table_schemas = compact_table_ids
121        .iter()
122        .filter_map(|table_id| {
123            all_versioned_table_schemas.get(table_id).map(|column_ids| {
124                (
125                    *table_id,
126                    TableSchema {
127                        column_ids: column_ids.clone(),
128                    },
129                )
130            })
131        })
132        .collect();
133}
134
135fn retain_table_options(
136    table_options: &mut BTreeMap<TableId, TableOption>,
137    table_id_set: &HashSet<TableId>,
138) {
139    table_options.retain(|table_id, _| table_id_set.contains(table_id));
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Options for tables outside the given id set are dropped; the rest are kept.
    #[test]
    fn test_retain_table_options() {
        let mut table_options: BTreeMap<TableId, TableOption> = [1, 2, 3]
            .into_iter()
            .map(|id| (TableId::new(id), TableOption::default()))
            .collect();
        let compact_table_id_set: HashSet<TableId> =
            [1, 3].into_iter().map(|id| TableId::new(id)).collect();

        retain_table_options(&mut table_options, &compact_table_id_set);

        let remaining_ids: Vec<_> = table_options.keys().copied().collect();
        assert_eq!(remaining_ids, vec![TableId::new(1), TableId::new(3)]);
    }
}