Skip to main content

risingwave_hummock_sdk/
compact.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::LevelType;
19
20use crate::compact_task::CompactTask;
21use crate::sstable_info::SstableInfo;
22
23pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String {
24    use std::fmt::Write;
25
26    let mut s = String::default();
27    writeln!(
28        s,
29        "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}, status: {:?}",
30        compact_task.task_id,
31        compact_task.compaction_group_id,
32        compact_task.task_type,
33        compact_task.target_level,
34        compact_task.target_sub_level_id,
35        compact_task.target_file_size,
36        compact_task.splits.len(),
37        compact_task.task_status
38    )
39    .unwrap();
40    s.push_str("Output: \n");
41    for sst in &compact_task.sorted_output_ssts {
42        append_sstable_info_to_string(&mut s, sst);
43    }
44    s
45}
46
47pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
48    use std::fmt::Write;
49
50    let mut s = String::new();
51    writeln!(
52        s,
53        "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}",
54        compact_task.task_id,
55        compact_task.compaction_group_id,
56        compact_task.task_type,
57        compact_task.target_level,
58        compact_task.target_sub_level_id,
59        compact_task.target_file_size,
60        compact_task.splits.len(),
61    )
62    .unwrap();
63    s.push_str("Input: \n");
64    let mut input_sst_table_ids: HashSet<TableId> = HashSet::new();
65    let mut dropped_only_sst_count = 0;
66    for level_entry in &compact_task.input_ssts {
67        let tables: Vec<String> = level_entry
68            .table_infos
69            .iter()
70            .map(|table| {
71                if table.table_ids.is_empty() {
72                    dropped_only_sst_count += 1;
73                } else {
74                    input_sst_table_ids.extend(table.table_ids.iter().copied());
75                }
76                if let Some(stale_ratio) =
77                    (table.stale_key_count * 100).checked_div(table.total_key_count)
78                {
79                    format!(
80                        "[id: {}, obj_id: {} object_size {}KB sst_size {}KB stale_ratio {}]",
81                        table.sst_id,
82                        table.object_id,
83                        table.file_size / 1024,
84                        table.sst_size / 1024,
85                        stale_ratio,
86                    )
87                } else {
88                    format!(
89                        "[id: {}, obj_id: {} object_size {}KB sst_size {}KB]",
90                        table.sst_id,
91                        table.object_id,
92                        table.file_size / 1024,
93                        table.sst_size / 1024,
94                    )
95                }
96            })
97            .collect();
98        writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
99    }
100    if !compact_task.table_vnode_partition.is_empty() {
101        writeln!(s, "Table vnode partition info:").unwrap();
102        compact_task
103            .table_vnode_partition
104            .iter()
105            .filter(|t| input_sst_table_ids.contains(t.0))
106            .for_each(|(tid, partition)| {
107                writeln!(s, " [{:?}, {:?}]", tid, partition).unwrap();
108            });
109    }
110
111    if dropped_only_sst_count > 0 {
112        writeln!(
113            s,
114            "Dropped-only input SST count: {:?} ",
115            dropped_only_sst_count
116        )
117        .unwrap();
118    }
119    s
120}
121
122pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) {
123    use std::fmt::Write;
124
125    let key_range = &sstable_info.key_range;
126    let left_str = if key_range.left.is_empty() {
127        "-inf".to_owned()
128    } else {
129        hex::encode(&key_range.left)
130    };
131    let right_str = if key_range.right.is_empty() {
132        "+inf".to_owned()
133    } else {
134        hex::encode(&key_range.right)
135    };
136
137    let stale_ratio = (sstable_info.stale_key_count * 100)
138        .checked_div(sstable_info.total_key_count)
139        .unwrap_or(0);
140    writeln!(
141        s,
142        "SstableInfo: object id={}, SST id={}, KeyRange=[{:?},{:?}], table_ids: {:?}, object_size={}KB, sst_size={}KB stale_ratio={}%, filter_type {:?}, filter_layout {:?}",
143        sstable_info.object_id,
144        sstable_info.sst_id,
145        left_str,
146        right_str,
147        sstable_info.table_ids,
148        sstable_info.file_size / 1024,
149        sstable_info.sst_size / 1024,
150        stale_ratio,
151        sstable_info.filter_type,
152        sstable_info.filter_layout,
153    )
154    .unwrap();
155}
156
157pub fn statistics_compact_task(task: &CompactTask) -> CompactTaskStatistics {
158    let mut total_key_count = 0;
159    let mut total_file_count: u64 = 0;
160    let mut total_file_size = 0;
161    let mut total_uncompressed_file_size = 0;
162
163    for sst in task.read_input_ssts() {
164        total_file_count += 1;
165        total_file_size += sst.file_size;
166        total_uncompressed_file_size += sst.uncompressed_file_size;
167        total_key_count += sst.total_key_count;
168    }
169
170    CompactTaskStatistics {
171        total_file_count,
172        total_key_count,
173        total_file_size,
174        total_uncompressed_file_size,
175    }
176}
177
/// Aggregate statistics over the input SSTs of a compaction task, as computed by
/// `statistics_compact_task`.
///
/// This is plain data (four counters), so it is cheap to copy, can be compared for equality,
/// and defaults to all zeros.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct CompactTaskStatistics {
    /// Number of input SSTs that were counted.
    pub total_file_count: u64,
    /// Sum of `total_key_count` over the counted SSTs.
    pub total_key_count: u64,
    /// Sum of `file_size` over the counted SSTs.
    pub total_file_size: u64,
    /// Sum of `uncompressed_file_size` over the counted SSTs.
    pub total_uncompressed_file_size: u64,
}
185
186pub fn estimate_memory_for_compact_task(
187    task: &CompactTask,
188    block_size: u64,
189    recv_buffer_size: u64,
190    sst_capacity: u64,
191) -> u64 {
192    let mut result = 0;
193    // When building the SstableStreamIterator, sstable_syncable will fetch the SstableMeta and seek
194    // to the specified block and build the iterator. Since this operation is concurrent, the memory
195    // usage will need to take into account the size of the SstableMeta.
196    // The common size of SstableMeta in tests is no more than 1m (mainly from xor filters).
197    let mut task_max_sst_meta_ratio = 0;
198
199    // The memory usage of the SstableStreamIterator comes from SstableInfo with some state
200    // information (use ESTIMATED_META_SIZE to estimate it), the BlockStream being read (one block),
201    // and tcp recv_buffer_size.
202    let max_input_stream_estimated_memory = block_size + recv_buffer_size;
203
204    // input
205    for level in &task.input_ssts {
206        if level.level_type == LevelType::Nonoverlapping {
207            let mut cur_level_max_sst_meta_size = 0;
208            for sst in level.read_sstable_infos() {
209                let meta_size = sst.file_size - sst.meta_offset;
210                task_max_sst_meta_ratio =
211                    std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
212                cur_level_max_sst_meta_size = std::cmp::max(meta_size, cur_level_max_sst_meta_size);
213            }
214            if cur_level_max_sst_meta_size > 0 {
215                result += max_input_stream_estimated_memory + cur_level_max_sst_meta_size;
216            }
217        } else {
218            for sst in level.read_sstable_infos() {
219                let meta_size = sst.file_size - sst.meta_offset;
220                result += max_input_stream_estimated_memory + meta_size;
221                task_max_sst_meta_ratio =
222                    std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
223            }
224        }
225    }
226
227    // output
228    // builder will maintain SstableInfo + block_builder(block) + writer (block to vec)
229    let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100;
230
231    // FIXME: sst_capacity is the upper bound of the memory usage of the streaming sstable uploader
232    // A more reasonable memory limit method needs to be adopted, this is just a temporary fix.
233    result += estimated_meta_size + sst_capacity;
234
235    result
236}