1use std::collections::HashSet;
16
17use risingwave_pb::hummock::LevelType;
18
19use crate::compact_task::CompactTask;
20use crate::sstable_info::SstableInfo;
21
22pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String {
23 use std::fmt::Write;
24
25 let mut s = String::default();
26 writeln!(
27 s,
28 "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}, status: {:?}",
29 compact_task.task_id,
30 compact_task.compaction_group_id,
31 compact_task.task_type,
32 compact_task.target_level,
33 compact_task.target_sub_level_id,
34 compact_task.target_file_size,
35 compact_task.splits.len(),
36 compact_task.task_status
37 )
38 .unwrap();
39 s.push_str("Output: \n");
40 for sst in &compact_task.sorted_output_ssts {
41 append_sstable_info_to_string(&mut s, sst);
42 }
43 s
44}
45
46pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
47 use std::fmt::Write;
48
49 let mut s = String::new();
50 writeln!(
51 s,
52 "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}",
53 compact_task.task_id,
54 compact_task.compaction_group_id,
55 compact_task.task_type,
56 compact_task.target_level,
57 compact_task.target_sub_level_id,
58 compact_task.target_file_size,
59 compact_task.splits.len(),
60 )
61 .unwrap();
62 s.push_str("Input: \n");
63 let existing_table_ids: HashSet<u32> = compact_task
64 .existing_table_ids
65 .clone()
66 .into_iter()
67 .collect();
68 let mut input_sst_table_ids: HashSet<u32> = HashSet::new();
69 let mut dropped_table_ids = HashSet::new();
70 for level_entry in &compact_task.input_ssts {
71 let tables: Vec<String> = level_entry
72 .table_infos
73 .iter()
74 .map(|table| {
75 for tid in &table.table_ids {
76 if !existing_table_ids.contains(tid) {
77 dropped_table_ids.insert(tid);
78 } else {
79 input_sst_table_ids.insert(*tid);
80 }
81 }
82 if table.total_key_count != 0 {
83 format!(
84 "[id: {}, obj_id: {} object_size {}KB sst_size {}KB stale_ratio {}]",
85 table.sst_id,
86 table.object_id,
87 table.file_size / 1024,
88 table.sst_size / 1024,
89 (table.stale_key_count * 100 / table.total_key_count),
90 )
91 } else {
92 format!(
93 "[id: {}, obj_id: {} object_size {}KB sst_size {}KB]",
94 table.sst_id,
95 table.object_id,
96 table.file_size / 1024,
97 table.sst_size / 1024,
98 )
99 }
100 })
101 .collect();
102 writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
103 }
104 if !compact_task.table_vnode_partition.is_empty() {
105 writeln!(s, "Table vnode partition info:").unwrap();
106 compact_task
107 .table_vnode_partition
108 .iter()
109 .filter(|t| input_sst_table_ids.contains(t.0))
110 .for_each(|(tid, partition)| {
111 writeln!(s, " [{:?}, {:?}]", tid, partition).unwrap();
112 });
113 }
114
115 if !dropped_table_ids.is_empty() {
116 writeln!(s, "Dropped table_ids: {:?} ", dropped_table_ids).unwrap();
117 }
118 s
119}
120
121pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) {
122 use std::fmt::Write;
123
124 let key_range = &sstable_info.key_range;
125 let left_str = if key_range.left.is_empty() {
126 "-inf".to_owned()
127 } else {
128 hex::encode(&key_range.left)
129 };
130 let right_str = if key_range.right.is_empty() {
131 "+inf".to_owned()
132 } else {
133 hex::encode(&key_range.right)
134 };
135
136 let stale_ratio = (sstable_info.stale_key_count * 100)
137 .checked_div(sstable_info.total_key_count)
138 .unwrap_or(0);
139 writeln!(
140 s,
141 "SstableInfo: object id={}, SST id={}, KeyRange=[{:?},{:?}], table_ids: {:?}, object_size={}KB, sst_size={}KB stale_ratio={}%, bloom_filter_kind {:?}",
142 sstable_info.object_id,
143 sstable_info.sst_id,
144 left_str,
145 right_str,
146 sstable_info.table_ids,
147 sstable_info.file_size / 1024,
148 sstable_info.sst_size / 1024,
149 stale_ratio,
150 sstable_info.bloom_filter_kind,
151 )
152 .unwrap();
153}
154
155pub fn statistics_compact_task(task: &CompactTask) -> CompactTaskStatistics {
156 let mut total_key_count = 0;
157 let mut total_file_count: u64 = 0;
158 let mut total_file_size = 0;
159 let mut total_uncompressed_file_size = 0;
160
161 for level in &task.input_ssts {
162 total_file_count += level.table_infos.len() as u64;
163
164 level.table_infos.iter().for_each(|sst| {
165 total_file_size += sst.file_size;
166 total_uncompressed_file_size += sst.uncompressed_file_size;
167 total_key_count += sst.total_key_count;
168 });
169 }
170
171 CompactTaskStatistics {
172 total_file_count,
173 total_key_count,
174 total_file_size,
175 total_uncompressed_file_size,
176 }
177}
178
/// Aggregated statistics over a compaction task's input SSTs, produced by
/// [`statistics_compact_task`].
#[derive(Debug)]
pub struct CompactTaskStatistics {
    /// Number of input SST files across all input levels.
    pub total_file_count: u64,
    /// Sum of `total_key_count` over all input SSTs.
    pub total_key_count: u64,
    /// Sum of (on-disk) `file_size` over all input SSTs, in bytes.
    pub total_file_size: u64,
    /// Sum of `uncompressed_file_size` over all input SSTs, in bytes.
    pub total_uncompressed_file_size: u64,
}
186
187pub fn estimate_memory_for_compact_task(
188 task: &CompactTask,
189 block_size: u64,
190 recv_buffer_size: u64,
191 sst_capacity: u64,
192) -> u64 {
193 let mut result = 0;
194 let mut task_max_sst_meta_ratio = 0;
199
200 let max_input_stream_estimated_memory = block_size + recv_buffer_size;
204
205 for level in &task.input_ssts {
207 if level.level_type == LevelType::Nonoverlapping {
208 let mut cur_level_max_sst_meta_size = 0;
209 for sst in &level.table_infos {
210 let meta_size = sst.file_size - sst.meta_offset;
211 task_max_sst_meta_ratio =
212 std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
213 cur_level_max_sst_meta_size = std::cmp::max(meta_size, cur_level_max_sst_meta_size);
214 }
215 result += max_input_stream_estimated_memory + cur_level_max_sst_meta_size;
216 } else {
217 for sst in &level.table_infos {
218 let meta_size = sst.file_size - sst.meta_offset;
219 result += max_input_stream_estimated_memory + meta_size;
220 task_max_sst_meta_ratio =
221 std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
222 }
223 }
224 }
225
226 let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100;
229
230 result += estimated_meta_size + sst_capacity;
233
234 result
235}