risingwave_hummock_sdk/
compact.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_pb::hummock::LevelType;
18
19use crate::compact_task::CompactTask;
20use crate::sstable_info::SstableInfo;
21
22pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String {
23    use std::fmt::Write;
24
25    let mut s = String::default();
26    writeln!(
27        s,
28        "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}, status: {:?}",
29        compact_task.task_id,
30        compact_task.compaction_group_id,
31        compact_task.task_type,
32        compact_task.target_level,
33        compact_task.target_sub_level_id,
34        compact_task.target_file_size,
35        compact_task.splits.len(),
36        compact_task.task_status
37    )
38    .unwrap();
39    s.push_str("Output: \n");
40    for sst in &compact_task.sorted_output_ssts {
41        append_sstable_info_to_string(&mut s, sst);
42    }
43    s
44}
45
46pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
47    use std::fmt::Write;
48
49    let mut s = String::new();
50    writeln!(
51        s,
52        "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}",
53        compact_task.task_id,
54        compact_task.compaction_group_id,
55        compact_task.task_type,
56        compact_task.target_level,
57        compact_task.target_sub_level_id,
58        compact_task.target_file_size,
59        compact_task.splits.len(),
60    )
61    .unwrap();
62    s.push_str("Input: \n");
63    let existing_table_ids: HashSet<u32> = compact_task
64        .existing_table_ids
65        .clone()
66        .into_iter()
67        .collect();
68    let mut input_sst_table_ids: HashSet<u32> = HashSet::new();
69    let mut dropped_table_ids = HashSet::new();
70    for level_entry in &compact_task.input_ssts {
71        let tables: Vec<String> = level_entry
72            .table_infos
73            .iter()
74            .map(|table| {
75                for tid in &table.table_ids {
76                    if !existing_table_ids.contains(tid) {
77                        dropped_table_ids.insert(tid);
78                    } else {
79                        input_sst_table_ids.insert(*tid);
80                    }
81                }
82                if table.total_key_count != 0 {
83                    format!(
84                        "[id: {}, obj_id: {} object_size {}KB sst_size {}KB stale_ratio {}]",
85                        table.sst_id,
86                        table.object_id,
87                        table.file_size / 1024,
88                        table.sst_size / 1024,
89                        (table.stale_key_count * 100 / table.total_key_count),
90                    )
91                } else {
92                    format!(
93                        "[id: {}, obj_id: {} object_size {}KB sst_size {}KB]",
94                        table.sst_id,
95                        table.object_id,
96                        table.file_size / 1024,
97                        table.sst_size / 1024,
98                    )
99                }
100            })
101            .collect();
102        writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
103    }
104    if !compact_task.table_vnode_partition.is_empty() {
105        writeln!(s, "Table vnode partition info:").unwrap();
106        compact_task
107            .table_vnode_partition
108            .iter()
109            .filter(|t| input_sst_table_ids.contains(t.0))
110            .for_each(|(tid, partition)| {
111                writeln!(s, " [{:?}, {:?}]", tid, partition).unwrap();
112            });
113    }
114
115    if !dropped_table_ids.is_empty() {
116        writeln!(s, "Dropped table_ids: {:?} ", dropped_table_ids).unwrap();
117    }
118    s
119}
120
121pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) {
122    use std::fmt::Write;
123
124    let key_range = &sstable_info.key_range;
125    let left_str = if key_range.left.is_empty() {
126        "-inf".to_owned()
127    } else {
128        hex::encode(&key_range.left)
129    };
130    let right_str = if key_range.right.is_empty() {
131        "+inf".to_owned()
132    } else {
133        hex::encode(&key_range.right)
134    };
135
136    let stale_ratio = (sstable_info.stale_key_count * 100)
137        .checked_div(sstable_info.total_key_count)
138        .unwrap_or(0);
139    writeln!(
140        s,
141        "SstableInfo: object id={}, SST id={}, KeyRange=[{:?},{:?}], table_ids: {:?}, object_size={}KB, sst_size={}KB stale_ratio={}%, bloom_filter_kind {:?}",
142        sstable_info.object_id,
143        sstable_info.sst_id,
144        left_str,
145        right_str,
146        sstable_info.table_ids,
147        sstable_info.file_size / 1024,
148        sstable_info.sst_size / 1024,
149        stale_ratio,
150        sstable_info.bloom_filter_kind,
151    )
152    .unwrap();
153}
154
155pub fn statistics_compact_task(task: &CompactTask) -> CompactTaskStatistics {
156    let mut total_key_count = 0;
157    let mut total_file_count: u64 = 0;
158    let mut total_file_size = 0;
159    let mut total_uncompressed_file_size = 0;
160
161    for level in &task.input_ssts {
162        total_file_count += level.table_infos.len() as u64;
163
164        level.table_infos.iter().for_each(|sst| {
165            total_file_size += sst.file_size;
166            total_uncompressed_file_size += sst.uncompressed_file_size;
167            total_key_count += sst.total_key_count;
168        });
169    }
170
171    CompactTaskStatistics {
172        total_file_count,
173        total_key_count,
174        total_file_size,
175        total_uncompressed_file_size,
176    }
177}
178
/// Aggregate statistics over the input SSTs of a compaction task,
/// as produced by [`statistics_compact_task`].
#[derive(Debug)]
pub struct CompactTaskStatistics {
    /// Number of input SST files across all input levels.
    pub total_file_count: u64,
    /// Sum of `total_key_count` over all input SSTs.
    pub total_key_count: u64,
    /// Sum of `file_size` (on-object size) over all input SSTs.
    pub total_file_size: u64,
    /// Sum of `uncompressed_file_size` over all input SSTs.
    pub total_uncompressed_file_size: u64,
}
186
187pub fn estimate_memory_for_compact_task(
188    task: &CompactTask,
189    block_size: u64,
190    recv_buffer_size: u64,
191    sst_capacity: u64,
192) -> u64 {
193    let mut result = 0;
194    // When building the SstableStreamIterator, sstable_syncable will fetch the SstableMeta and seek
195    // to the specified block and build the iterator. Since this operation is concurrent, the memory
196    // usage will need to take into account the size of the SstableMeta.
197    // The common size of SstableMeta in tests is no more than 1m (mainly from xor filters).
198    let mut task_max_sst_meta_ratio = 0;
199
200    // The memory usage of the SstableStreamIterator comes from SstableInfo with some state
201    // information (use ESTIMATED_META_SIZE to estimate it), the BlockStream being read (one block),
202    // and tcp recv_buffer_size.
203    let max_input_stream_estimated_memory = block_size + recv_buffer_size;
204
205    // input
206    for level in &task.input_ssts {
207        if level.level_type == LevelType::Nonoverlapping {
208            let mut cur_level_max_sst_meta_size = 0;
209            for sst in &level.table_infos {
210                let meta_size = sst.file_size - sst.meta_offset;
211                task_max_sst_meta_ratio =
212                    std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
213                cur_level_max_sst_meta_size = std::cmp::max(meta_size, cur_level_max_sst_meta_size);
214            }
215            result += max_input_stream_estimated_memory + cur_level_max_sst_meta_size;
216        } else {
217            for sst in &level.table_infos {
218                let meta_size = sst.file_size - sst.meta_offset;
219                result += max_input_stream_estimated_memory + meta_size;
220                task_max_sst_meta_ratio =
221                    std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
222            }
223        }
224    }
225
226    // output
227    // builder will maintain SstableInfo + block_builder(block) + writer (block to vec)
228    let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100;
229
230    // FIXME: sst_capacity is the upper bound of the memory usage of the streaming sstable uploader
231    // A more reasonable memory limit method needs to be adopted, this is just a temporary fix.
232    result += estimated_meta_size + sst_capacity;
233
234    result
235}