use std::collections::HashSet;

use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::LevelType;

use crate::compact_task::CompactTask;
use crate::sstable_info::SstableInfo;

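/// Renders a human-readable summary of a compaction task's output: a header line with the
/// task metadata followed by one line per SST in `sorted_output_ssts`.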
pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String {
    use std::fmt::Write;

    let mut s = String::new();
    writeln!(
        s,
        "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?}, target_file_size: {:?}, splits: {:?}, status: {:?}",
        compact_task.task_id,
        compact_task.compaction_group_id,
        compact_task.task_type,
        compact_task.target_level,
        compact_task.target_sub_level_id,
        compact_task.target_file_size,
        compact_task.splits.len(),
        compact_task.task_status
    )
    .unwrap();
    s.push_str("Output: \n");
    for sst in &compact_task.sorted_output_ssts {
        append_sstable_info_to_string(&mut s, sst);
    }
    s
}

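/// Renders a human-readable summary of a compaction task's input: a header line with the
/// task metadata, the input SSTs grouped by level, the vnode partition info of the tables
/// that actually appear in the input, and any referenced table ids that no longer exist.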
pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
    use std::fmt::Write;

    let mut s = String::new();
    writeln!(
        s,
        "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?}, target_file_size: {:?}, splits: {:?}",
        compact_task.task_id,
        compact_task.compaction_group_id,
        compact_task.task_type,
        compact_task.target_level,
        compact_task.target_sub_level_id,
        compact_task.target_file_size,
        compact_task.splits.len(),
    )
    .unwrap();
    s.push_str("Input: \n");
    let existing_table_ids: HashSet<TableId> =
        compact_task.existing_table_ids.iter().copied().collect();
    let mut input_sst_table_ids: HashSet<TableId> = HashSet::new();
    let mut dropped_table_ids = HashSet::new();
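    // Walk the input levels, classifying each table id referenced by an SST as live or
    // dropped, and render one entry per input SST.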
    for level_entry in &compact_task.input_ssts {
        let tables: Vec<String> = level_entry
            .table_infos
            .iter()
            .map(|table| {
                for tid in &table.table_ids {
                    if !existing_table_ids.contains(tid) {
                        dropped_table_ids.insert(tid);
                    } else {
                        input_sst_table_ids.insert(*tid);
                    }
                }
                if table.total_key_count != 0 {
                    format!(
                        "[id: {}, obj_id: {} object_size {}KB sst_size {}KB stale_ratio {}]",
                        table.sst_id,
                        table.object_id,
                        table.file_size / 1024,
                        table.sst_size / 1024,
                        (table.stale_key_count * 100 / table.total_key_count),
                    )
                } else {
                    format!(
                        "[id: {}, obj_id: {} object_size {}KB sst_size {}KB]",
                        table.sst_id,
                        table.object_id,
                        table.file_size / 1024,
                        table.sst_size / 1024,
                    )
                }
            })
            .collect();
        writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
    }
    if !compact_task.table_vnode_partition.is_empty() {
        writeln!(s, "Table vnode partition info:").unwrap();
        compact_task
            .table_vnode_partition
            .iter()
            .filter(|t| input_sst_table_ids.contains(t.0))
            .for_each(|(tid, partition)| {
                writeln!(s, " [{:?}, {:?}]", tid, partition).unwrap();
            });
    }

    if !dropped_table_ids.is_empty() {
        writeln!(s, "Dropped table_ids: {:?} ", dropped_table_ids).unwrap();
    }
    s
}

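/// Appends a one-line description of `sstable_info` to `s`: object/SST ids, the hex-encoded
/// key range (with `-inf`/`+inf` for unbounded ends), referenced table ids, sizes, the
/// stale-key ratio, and the bloom filter kind.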
pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) {
    use std::fmt::Write;

    let key_range = &sstable_info.key_range;
    let left_str = if key_range.left.is_empty() {
        "-inf".to_owned()
    } else {
        hex::encode(&key_range.left)
    };
    let right_str = if key_range.right.is_empty() {
        "+inf".to_owned()
    } else {
        hex::encode(&key_range.right)
    };

    let stale_ratio = (sstable_info.stale_key_count * 100)
        .checked_div(sstable_info.total_key_count)
        .unwrap_or(0);
    writeln!(
        s,
        "SstableInfo: object id={}, SST id={}, KeyRange=[{:?},{:?}], table_ids: {:?}, object_size={}KB, sst_size={}KB stale_ratio={}%, bloom_filter_kind {:?}",
        sstable_info.object_id,
        sstable_info.sst_id,
        left_str,
        right_str,
        sstable_info.table_ids,
        sstable_info.file_size / 1024,
        sstable_info.sst_size / 1024,
        stale_ratio,
        sstable_info.bloom_filter_kind,
    )
    .unwrap();
}

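/// Aggregates basic statistics (file count, key count, file size, and uncompressed file size)
/// over all input SSTs of `task`.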
pub fn statistics_compact_task(task: &CompactTask) -> CompactTaskStatistics {
    let mut total_key_count = 0;
    let mut total_file_count: u64 = 0;
    let mut total_file_size = 0;
    let mut total_uncompressed_file_size = 0;

    for level in &task.input_ssts {
        total_file_count += level.table_infos.len() as u64;

        level.table_infos.iter().for_each(|sst| {
            total_file_size += sst.file_size;
            total_uncompressed_file_size += sst.uncompressed_file_size;
            total_key_count += sst.total_key_count;
        });
    }

    CompactTaskStatistics {
        total_file_count,
        total_key_count,
        total_file_size,
        total_uncompressed_file_size,
    }
}

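/// Aggregated statistics over the input SSTs of a compaction task, as produced by
/// [`statistics_compact_task`].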
#[derive(Debug)]
pub struct CompactTaskStatistics {
    pub total_file_count: u64,
    pub total_key_count: u64,
    pub total_file_size: u64,
    pub total_uncompressed_file_size: u64,
}

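/// Estimates the memory needed to run `task` on a compactor, derived from the per-stream
/// input buffers, the SST metadata that has to be held in memory, and the capacity of the
/// output SST being built.
///
/// # Example (illustrative only; the sizes below are made-up values, not defaults of this crate)
///
/// ```ignore
/// // `task` is assumed to be a CompactTask already in scope.
/// let memory = estimate_memory_for_compact_task(
///     &task,
///     64 * 1024,           // block_size: 64 KiB
///     2 * 1024 * 1024,     // recv_buffer_size: 2 MiB
///     256 * 1024 * 1024,   // sst_capacity: 256 MiB
/// );
/// ```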
pub fn estimate_memory_for_compact_task(
    task: &CompactTask,
    block_size: u64,
    recv_buffer_size: u64,
    sst_capacity: u64,
) -> u64 {
    let mut result = 0;
    let mut task_max_sst_meta_ratio = 0;

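    // Each concurrently open input stream needs roughly one block in flight plus the TCP
    // receive buffer.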
    let max_input_stream_estimated_memory = block_size + recv_buffer_size;

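    // Input side: a non-overlapping level is budgeted one input stream plus the largest SST
    // meta in that level (its SSTs can be read one after another), while every SST of an
    // overlapping level is budgeted its own stream plus its own meta. Along the way, track
    // the largest meta-to-file-size ratio seen across the inputs for the output estimate below.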
    for level in &task.input_ssts {
        if level.level_type == LevelType::Nonoverlapping {
            let mut cur_level_max_sst_meta_size = 0;
            for sst in &level.table_infos {
                let meta_size = sst.file_size - sst.meta_offset;
                task_max_sst_meta_ratio =
                    std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
                cur_level_max_sst_meta_size = std::cmp::max(meta_size, cur_level_max_sst_meta_size);
            }
            result += max_input_stream_estimated_memory + cur_level_max_sst_meta_size;
        } else {
            for sst in &level.table_infos {
                let meta_size = sst.file_size - sst.meta_offset;
                result += max_input_stream_estimated_memory + meta_size;
                task_max_sst_meta_ratio =
                    std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
            }
        }
    }

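    // Output side: estimate the meta size of the SST being written from the largest meta
    // ratio observed among the inputs.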
    let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100;

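    // Budget one full output SST (data plus its estimated meta) for the writer.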
    result += estimated_meta_size + sst_capacity;

    result
}