risingwave_meta/hummock/manager/compaction/compact_task_builder.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::sync::Arc;
17
18use itertools::Itertools;
19use risingwave_common::catalog::TableId;
20use risingwave_hummock_sdk::compact_task::CompactTask;
21use risingwave_hummock_sdk::key_range::KeyRange;
22use risingwave_hummock_sdk::table_watermark::{TableWatermarks, WatermarkSerdeType};
23use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
24use risingwave_pb::hummock::compact_task::TaskStatus;
25use risingwave_pb::hummock::{CompactionConfig, TableOption, TableSchema};
26
27use crate::hummock::compaction::CompactionTask as PickedCompactionTask;
28
29pub(super) struct CompactTaskBuildContext {
30    pub(super) task_id: HummockCompactionTaskId,
31    pub(super) compaction_group_id: CompactionGroupId,
32    pub(super) compaction_group_version_id: u64,
33    pub(super) existing_table_ids: Vec<TableId>,
34    pub(super) table_options: BTreeMap<TableId, TableOption>,
35    pub(super) is_target_level_last: bool,
36    pub(super) compaction_config: Arc<CompactionConfig>,
37    pub(super) current_epoch_time: u64,
38}
39
40pub(super) fn build_base_compact_task(
41    picked_task: PickedCompactionTask,
42    context: CompactTaskBuildContext,
43) -> (CompactTask, Vec<TableId>) {
44    let target_level_id = picked_task.input.target_level as u32;
45    let input_ssts = picked_task.input.input_levels;
46    let compact_table_ids = input_ssts
47        .iter()
48        .flat_map(|level| level.table_infos.iter())
49        .flat_map(|sst| sst.table_ids.iter().copied())
50        .sorted()
51        .dedup()
52        .collect_vec();
53    let compact_table_id_set: HashSet<_> = compact_table_ids.iter().copied().collect();
54    let mut table_options = context.table_options;
55    retain_table_options(&mut table_options, &compact_table_id_set);
56    let compression_algorithm = match picked_task.compression_algorithm.as_str() {
57        "Lz4" => 1,
58        "Zstd" => 2,
59        _ => 0,
60    };
61    let compaction_config = context.compaction_config.as_ref();
62
63    (
64        CompactTask {
65            input_ssts,
66            splits: vec![KeyRange::inf()],
67            sorted_output_ssts: vec![],
68            task_id: context.task_id,
69            target_level: target_level_id,
70            // Only gc delete keys in last level because there may be older versions in lower levels.
71            gc_delete_keys: context.is_target_level_last,
72            base_level: picked_task.base_level as u32,
73            task_status: TaskStatus::Pending,
74            compaction_group_id: context.compaction_group_id,
75            compaction_group_version_id: context.compaction_group_version_id,
76            existing_table_ids: context.existing_table_ids,
77            compression_algorithm,
78            target_file_size: picked_task.target_file_size,
79            table_options,
80            current_epoch_time: context.current_epoch_time,
81            compaction_filter_mask: compaction_config.compaction_filter_mask,
82            target_sub_level_id: picked_task.input.target_sub_level_id,
83            task_type: picked_task.compaction_task_type,
84            split_weight_by_vnode: picked_task.input.vnode_partition_count,
85            max_sub_compaction: compaction_config.max_sub_compaction,
86            blocked_xor_filter_kv_count_threshold: compaction_config.max_kv_count_for_xor16,
87            max_vnode_key_range_bytes: compaction_config.max_vnode_key_range_bytes,
88            sstable_filter_kind: picked_task.sstable_filter_kind,
89            sstable_filter_layout: picked_task.sstable_filter_layout,
90            ..Default::default()
91        },
92        compact_table_ids,
93    )
94}
95
96pub(super) fn attach_compact_task_table_metadata(
97    compact_task: &mut CompactTask,
98    compact_table_ids: &[TableId],
99    table_watermarks: BTreeMap<TableId, TableWatermarks>,
100    all_versioned_table_schemas: &HashMap<TableId, Vec<i32>>,
101) {
102    let mut pk_prefix_table_watermarks = BTreeMap::default();
103    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
104    let mut value_table_watermarks = BTreeMap::default();
105    for (table_id, watermark) in table_watermarks {
106        match watermark.watermark_type {
107            WatermarkSerdeType::PkPrefix => {
108                pk_prefix_table_watermarks.insert(table_id, watermark);
109            }
110            WatermarkSerdeType::NonPkPrefix => {
111                non_pk_prefix_table_watermarks.insert(table_id, watermark);
112            }
113            WatermarkSerdeType::Value => {
114                value_table_watermarks.insert(table_id, watermark);
115            }
116        }
117    }
118
119    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
120    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
121    compact_task.value_table_watermarks = value_table_watermarks;
122    compact_task.table_schemas = compact_table_ids
123        .iter()
124        .filter_map(|table_id| {
125            all_versioned_table_schemas.get(table_id).map(|column_ids| {
126                (
127                    *table_id,
128                    TableSchema {
129                        column_ids: column_ids.clone(),
130                    },
131                )
132            })
133        })
134        .collect();
135}
136
137fn retain_table_options(
138    table_options: &mut BTreeMap<TableId, TableOption>,
139    table_id_set: &HashSet<TableId>,
140) {
141    table_options.retain(|table_id, _| table_id_set.contains(table_id));
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// Mixed case: ids in the set are kept, others are dropped.
    #[test]
    fn test_retain_table_options() {
        let mut table_options = BTreeMap::from([
            (TableId::new(1), TableOption::default()),
            (TableId::new(2), TableOption::default()),
            (TableId::new(3), TableOption::default()),
        ]);
        let compact_table_id_set = HashSet::from_iter([TableId::new(1), TableId::new(3)]);

        retain_table_options(&mut table_options, &compact_table_id_set);

        assert_eq!(
            table_options.keys().copied().collect::<Vec<_>>(),
            vec![TableId::new(1), TableId::new(3)]
        );
    }

    /// Boundary case: an empty id set removes every option.
    #[test]
    fn test_retain_table_options_empty_set_removes_all() {
        let mut table_options = BTreeMap::from([
            (TableId::new(1), TableOption::default()),
            (TableId::new(2), TableOption::default()),
        ]);

        retain_table_options(&mut table_options, &HashSet::new());

        assert!(table_options.is_empty());
    }

    /// Ids present in the set but missing from the options map must not be invented.
    #[test]
    fn test_retain_table_options_ignores_ids_without_options() {
        let mut table_options = BTreeMap::from([(TableId::new(2), TableOption::default())]);
        let compact_table_id_set =
            HashSet::from_iter([TableId::new(1), TableId::new(2), TableId::new(3)]);

        retain_table_options(&mut table_options, &compact_table_id_set);

        assert_eq!(
            table_options.keys().copied().collect::<Vec<_>>(),
            vec![TableId::new(2)]
        );
    }
}