risingwave_hummock_sdk/
compact.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::LevelType;
19
20use crate::compact_task::CompactTask;
21use crate::sstable_info::SstableInfo;
22
23pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String {
24    use std::fmt::Write;
25
26    let mut s = String::default();
27    writeln!(
28        s,
29        "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}, status: {:?}",
30        compact_task.task_id,
31        compact_task.compaction_group_id,
32        compact_task.task_type,
33        compact_task.target_level,
34        compact_task.target_sub_level_id,
35        compact_task.target_file_size,
36        compact_task.splits.len(),
37        compact_task.task_status
38    )
39    .unwrap();
40    s.push_str("Output: \n");
41    for sst in &compact_task.sorted_output_ssts {
42        append_sstable_info_to_string(&mut s, sst);
43    }
44    s
45}
46
47pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
48    use std::fmt::Write;
49
50    let mut s = String::new();
51    writeln!(
52        s,
53        "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}",
54        compact_task.task_id,
55        compact_task.compaction_group_id,
56        compact_task.task_type,
57        compact_task.target_level,
58        compact_task.target_sub_level_id,
59        compact_task.target_file_size,
60        compact_task.splits.len(),
61    )
62    .unwrap();
63    s.push_str("Input: \n");
64    let existing_table_ids: HashSet<TableId> =
65        compact_task.existing_table_ids.iter().copied().collect();
66    let mut input_sst_table_ids: HashSet<TableId> = HashSet::new();
67    let mut dropped_table_ids = HashSet::new();
68    for level_entry in &compact_task.input_ssts {
69        let tables: Vec<String> = level_entry
70            .table_infos
71            .iter()
72            .map(|table| {
73                for tid in &table.table_ids {
74                    if !existing_table_ids.contains(tid) {
75                        dropped_table_ids.insert(tid);
76                    } else {
77                        input_sst_table_ids.insert(*tid);
78                    }
79                }
80                if table.total_key_count != 0 {
81                    format!(
82                        "[id: {}, obj_id: {} object_size {}KB sst_size {}KB stale_ratio {}]",
83                        table.sst_id,
84                        table.object_id,
85                        table.file_size / 1024,
86                        table.sst_size / 1024,
87                        (table.stale_key_count * 100 / table.total_key_count),
88                    )
89                } else {
90                    format!(
91                        "[id: {}, obj_id: {} object_size {}KB sst_size {}KB]",
92                        table.sst_id,
93                        table.object_id,
94                        table.file_size / 1024,
95                        table.sst_size / 1024,
96                    )
97                }
98            })
99            .collect();
100        writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
101    }
102    if !compact_task.table_vnode_partition.is_empty() {
103        writeln!(s, "Table vnode partition info:").unwrap();
104        compact_task
105            .table_vnode_partition
106            .iter()
107            .filter(|t| input_sst_table_ids.contains(t.0))
108            .for_each(|(tid, partition)| {
109                writeln!(s, " [{:?}, {:?}]", tid, partition).unwrap();
110            });
111    }
112
113    if !dropped_table_ids.is_empty() {
114        writeln!(s, "Dropped table_ids: {:?} ", dropped_table_ids).unwrap();
115    }
116    s
117}
118
119pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) {
120    use std::fmt::Write;
121
122    let key_range = &sstable_info.key_range;
123    let left_str = if key_range.left.is_empty() {
124        "-inf".to_owned()
125    } else {
126        hex::encode(&key_range.left)
127    };
128    let right_str = if key_range.right.is_empty() {
129        "+inf".to_owned()
130    } else {
131        hex::encode(&key_range.right)
132    };
133
134    let stale_ratio = (sstable_info.stale_key_count * 100)
135        .checked_div(sstable_info.total_key_count)
136        .unwrap_or(0);
137    writeln!(
138        s,
139        "SstableInfo: object id={}, SST id={}, KeyRange=[{:?},{:?}], table_ids: {:?}, object_size={}KB, sst_size={}KB stale_ratio={}%, bloom_filter_kind {:?}",
140        sstable_info.object_id,
141        sstable_info.sst_id,
142        left_str,
143        right_str,
144        sstable_info.table_ids,
145        sstable_info.file_size / 1024,
146        sstable_info.sst_size / 1024,
147        stale_ratio,
148        sstable_info.bloom_filter_kind,
149    )
150    .unwrap();
151}
152
153pub fn statistics_compact_task(task: &CompactTask) -> CompactTaskStatistics {
154    let mut total_key_count = 0;
155    let mut total_file_count: u64 = 0;
156    let mut total_file_size = 0;
157    let mut total_uncompressed_file_size = 0;
158
159    for level in &task.input_ssts {
160        total_file_count += level.table_infos.len() as u64;
161
162        level.table_infos.iter().for_each(|sst| {
163            total_file_size += sst.file_size;
164            total_uncompressed_file_size += sst.uncompressed_file_size;
165            total_key_count += sst.total_key_count;
166        });
167    }
168
169    CompactTaskStatistics {
170        total_file_count,
171        total_key_count,
172        total_file_size,
173        total_uncompressed_file_size,
174    }
175}
176
/// Aggregate statistics over all input SSTs of a compaction task,
/// as computed by [`statistics_compact_task`].
#[derive(Debug)]
pub struct CompactTaskStatistics {
    // Number of input SST files across all levels.
    pub total_file_count: u64,
    // Sum of `total_key_count` over all input SSTs.
    pub total_key_count: u64,
    // Sum of (compressed) `file_size` over all input SSTs, in bytes.
    pub total_file_size: u64,
    // Sum of `uncompressed_file_size` over all input SSTs, in bytes.
    pub total_uncompressed_file_size: u64,
}
184
185pub fn estimate_memory_for_compact_task(
186    task: &CompactTask,
187    block_size: u64,
188    recv_buffer_size: u64,
189    sst_capacity: u64,
190) -> u64 {
191    let mut result = 0;
192    // When building the SstableStreamIterator, sstable_syncable will fetch the SstableMeta and seek
193    // to the specified block and build the iterator. Since this operation is concurrent, the memory
194    // usage will need to take into account the size of the SstableMeta.
195    // The common size of SstableMeta in tests is no more than 1m (mainly from xor filters).
196    let mut task_max_sst_meta_ratio = 0;
197
198    // The memory usage of the SstableStreamIterator comes from SstableInfo with some state
199    // information (use ESTIMATED_META_SIZE to estimate it), the BlockStream being read (one block),
200    // and tcp recv_buffer_size.
201    let max_input_stream_estimated_memory = block_size + recv_buffer_size;
202
203    // input
204    for level in &task.input_ssts {
205        if level.level_type == LevelType::Nonoverlapping {
206            let mut cur_level_max_sst_meta_size = 0;
207            for sst in &level.table_infos {
208                let meta_size = sst.file_size - sst.meta_offset;
209                task_max_sst_meta_ratio =
210                    std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
211                cur_level_max_sst_meta_size = std::cmp::max(meta_size, cur_level_max_sst_meta_size);
212            }
213            result += max_input_stream_estimated_memory + cur_level_max_sst_meta_size;
214        } else {
215            for sst in &level.table_infos {
216                let meta_size = sst.file_size - sst.meta_offset;
217                result += max_input_stream_estimated_memory + meta_size;
218                task_max_sst_meta_ratio =
219                    std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
220            }
221        }
222    }
223
224    // output
225    // builder will maintain SstableInfo + block_builder(block) + writer (block to vec)
226    let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100;
227
228    // FIXME: sst_capacity is the upper bound of the memory usage of the streaming sstable uploader
229    // A more reasonable memory limit method needs to be adopted, this is just a temporary fix.
230    result += estimated_meta_size + sst_capacity;
231
232    result
233}