risingwave_hummock_sdk/compact.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::LevelType;
19
20use crate::compact_task::CompactTask;
21use crate::sstable_info::SstableInfo;
22
23pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String {
24    use std::fmt::Write;
25
26    let mut s = String::default();
27    writeln!(
28        s,
29        "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}, status: {:?}",
30        compact_task.task_id,
31        compact_task.compaction_group_id,
32        compact_task.task_type,
33        compact_task.target_level,
34        compact_task.target_sub_level_id,
35        compact_task.target_file_size,
36        compact_task.splits.len(),
37        compact_task.task_status
38    )
39    .unwrap();
40    s.push_str("Output: \n");
41    for sst in &compact_task.sorted_output_ssts {
42        append_sstable_info_to_string(&mut s, sst);
43    }
44    s
45}
46
47pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
48    use std::fmt::Write;
49
50    let mut s = String::new();
51    writeln!(
52        s,
53        "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}",
54        compact_task.task_id,
55        compact_task.compaction_group_id,
56        compact_task.task_type,
57        compact_task.target_level,
58        compact_task.target_sub_level_id,
59        compact_task.target_file_size,
60        compact_task.splits.len(),
61    )
62    .unwrap();
63    s.push_str("Input: \n");
64    let mut input_sst_table_ids: HashSet<TableId> = HashSet::new();
65    let mut dropped_only_sst_count = 0;
66    for level_entry in &compact_task.input_ssts {
67        let tables: Vec<String> = level_entry
68            .table_infos
69            .iter()
70            .map(|table| {
71                if table.table_ids.is_empty() {
72                    dropped_only_sst_count += 1;
73                } else {
74                    input_sst_table_ids.extend(table.table_ids.iter().copied());
75                }
76                if table.total_key_count != 0 {
77                    format!(
78                        "[id: {}, obj_id: {} object_size {}KB sst_size {}KB stale_ratio {}]",
79                        table.sst_id,
80                        table.object_id,
81                        table.file_size / 1024,
82                        table.sst_size / 1024,
83                        (table.stale_key_count * 100 / table.total_key_count),
84                    )
85                } else {
86                    format!(
87                        "[id: {}, obj_id: {} object_size {}KB sst_size {}KB]",
88                        table.sst_id,
89                        table.object_id,
90                        table.file_size / 1024,
91                        table.sst_size / 1024,
92                    )
93                }
94            })
95            .collect();
96        writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
97    }
98    if !compact_task.table_vnode_partition.is_empty() {
99        writeln!(s, "Table vnode partition info:").unwrap();
100        compact_task
101            .table_vnode_partition
102            .iter()
103            .filter(|t| input_sst_table_ids.contains(t.0))
104            .for_each(|(tid, partition)| {
105                writeln!(s, " [{:?}, {:?}]", tid, partition).unwrap();
106            });
107    }
108
109    if dropped_only_sst_count > 0 {
110        writeln!(
111            s,
112            "Dropped-only input SST count: {:?} ",
113            dropped_only_sst_count
114        )
115        .unwrap();
116    }
117    s
118}
119
120pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) {
121    use std::fmt::Write;
122
123    let key_range = &sstable_info.key_range;
124    let left_str = if key_range.left.is_empty() {
125        "-inf".to_owned()
126    } else {
127        hex::encode(&key_range.left)
128    };
129    let right_str = if key_range.right.is_empty() {
130        "+inf".to_owned()
131    } else {
132        hex::encode(&key_range.right)
133    };
134
135    let stale_ratio = (sstable_info.stale_key_count * 100)
136        .checked_div(sstable_info.total_key_count)
137        .unwrap_or(0);
138    writeln!(
139        s,
140        "SstableInfo: object id={}, SST id={}, KeyRange=[{:?},{:?}], table_ids: {:?}, object_size={}KB, sst_size={}KB stale_ratio={}%, bloom_filter_kind {:?}",
141        sstable_info.object_id,
142        sstable_info.sst_id,
143        left_str,
144        right_str,
145        sstable_info.table_ids,
146        sstable_info.file_size / 1024,
147        sstable_info.sst_size / 1024,
148        stale_ratio,
149        sstable_info.bloom_filter_kind,
150    )
151    .unwrap();
152}
153
154pub fn statistics_compact_task(task: &CompactTask) -> CompactTaskStatistics {
155    let mut total_key_count = 0;
156    let mut total_file_count: u64 = 0;
157    let mut total_file_size = 0;
158    let mut total_uncompressed_file_size = 0;
159
160    for sst in task.read_input_ssts() {
161        total_file_count += 1;
162        total_file_size += sst.file_size;
163        total_uncompressed_file_size += sst.uncompressed_file_size;
164        total_key_count += sst.total_key_count;
165    }
166
167    CompactTaskStatistics {
168        total_file_count,
169        total_key_count,
170        total_file_size,
171        total_uncompressed_file_size,
172    }
173}
174
/// Aggregate count/size statistics over the input SSTs of a compaction task, as computed by
/// `statistics_compact_task`.
///
/// `Default` yields all-zero statistics; the type is plain data, so it is `Copy` and comparable.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CompactTaskStatistics {
    /// Number of input SSTs.
    pub total_file_count: u64,
    /// Sum of `total_key_count` over the input SSTs.
    pub total_key_count: u64,
    /// Sum of `file_size` over the input SSTs.
    pub total_file_size: u64,
    /// Sum of `uncompressed_file_size` over the input SSTs.
    pub total_uncompressed_file_size: u64,
}
182
183pub fn estimate_memory_for_compact_task(
184    task: &CompactTask,
185    block_size: u64,
186    recv_buffer_size: u64,
187    sst_capacity: u64,
188) -> u64 {
189    let mut result = 0;
190    // When building the SstableStreamIterator, sstable_syncable will fetch the SstableMeta and seek
191    // to the specified block and build the iterator. Since this operation is concurrent, the memory
192    // usage will need to take into account the size of the SstableMeta.
193    // The common size of SstableMeta in tests is no more than 1m (mainly from xor filters).
194    let mut task_max_sst_meta_ratio = 0;
195
196    // The memory usage of the SstableStreamIterator comes from SstableInfo with some state
197    // information (use ESTIMATED_META_SIZE to estimate it), the BlockStream being read (one block),
198    // and tcp recv_buffer_size.
199    let max_input_stream_estimated_memory = block_size + recv_buffer_size;
200
201    // input
202    for level in &task.input_ssts {
203        if level.level_type == LevelType::Nonoverlapping {
204            let mut cur_level_max_sst_meta_size = 0;
205            for sst in level.read_sstable_infos() {
206                let meta_size = sst.file_size - sst.meta_offset;
207                task_max_sst_meta_ratio =
208                    std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
209                cur_level_max_sst_meta_size = std::cmp::max(meta_size, cur_level_max_sst_meta_size);
210            }
211            if cur_level_max_sst_meta_size > 0 {
212                result += max_input_stream_estimated_memory + cur_level_max_sst_meta_size;
213            }
214        } else {
215            for sst in level.read_sstable_infos() {
216                let meta_size = sst.file_size - sst.meta_offset;
217                result += max_input_stream_estimated_memory + meta_size;
218                task_max_sst_meta_ratio =
219                    std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
220            }
221        }
222    }
223
224    // output
225    // builder will maintain SstableInfo + block_builder(block) + writer (block to vec)
226    let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100;
227
228    // FIXME: sst_capacity is the upper bound of the memory usage of the streaming sstable uploader
229    // A more reasonable memory limit method needs to be adopted, this is just a temporary fix.
230    result += estimated_meta_size + sst_capacity;
231
232    result
233}