1use std::collections::HashSet;
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::LevelType;
19
20use crate::compact_task::CompactTask;
21use crate::sstable_info::SstableInfo;
22
23pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String {
24 use std::fmt::Write;
25
26 let mut s = String::default();
27 writeln!(
28 s,
29 "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}, status: {:?}",
30 compact_task.task_id,
31 compact_task.compaction_group_id,
32 compact_task.task_type,
33 compact_task.target_level,
34 compact_task.target_sub_level_id,
35 compact_task.target_file_size,
36 compact_task.splits.len(),
37 compact_task.task_status
38 )
39 .unwrap();
40 s.push_str("Output: \n");
41 for sst in &compact_task.sorted_output_ssts {
42 append_sstable_info_to_string(&mut s, sst);
43 }
44 s
45}
46
47pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
48 use std::fmt::Write;
49
50 let mut s = String::new();
51 writeln!(
52 s,
53 "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}",
54 compact_task.task_id,
55 compact_task.compaction_group_id,
56 compact_task.task_type,
57 compact_task.target_level,
58 compact_task.target_sub_level_id,
59 compact_task.target_file_size,
60 compact_task.splits.len(),
61 )
62 .unwrap();
63 s.push_str("Input: \n");
64 let mut input_sst_table_ids: HashSet<TableId> = HashSet::new();
65 let mut dropped_only_sst_count = 0;
66 for level_entry in &compact_task.input_ssts {
67 let tables: Vec<String> = level_entry
68 .table_infos
69 .iter()
70 .map(|table| {
71 if table.table_ids.is_empty() {
72 dropped_only_sst_count += 1;
73 } else {
74 input_sst_table_ids.extend(table.table_ids.iter().copied());
75 }
76 if let Some(stale_ratio) =
77 (table.stale_key_count * 100).checked_div(table.total_key_count)
78 {
79 format!(
80 "[id: {}, obj_id: {} object_size {}KB sst_size {}KB stale_ratio {}]",
81 table.sst_id,
82 table.object_id,
83 table.file_size / 1024,
84 table.sst_size / 1024,
85 stale_ratio,
86 )
87 } else {
88 format!(
89 "[id: {}, obj_id: {} object_size {}KB sst_size {}KB]",
90 table.sst_id,
91 table.object_id,
92 table.file_size / 1024,
93 table.sst_size / 1024,
94 )
95 }
96 })
97 .collect();
98 writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
99 }
100 if !compact_task.table_vnode_partition.is_empty() {
101 writeln!(s, "Table vnode partition info:").unwrap();
102 compact_task
103 .table_vnode_partition
104 .iter()
105 .filter(|t| input_sst_table_ids.contains(t.0))
106 .for_each(|(tid, partition)| {
107 writeln!(s, " [{:?}, {:?}]", tid, partition).unwrap();
108 });
109 }
110
111 if dropped_only_sst_count > 0 {
112 writeln!(
113 s,
114 "Dropped-only input SST count: {:?} ",
115 dropped_only_sst_count
116 )
117 .unwrap();
118 }
119 s
120}
121
122pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) {
123 use std::fmt::Write;
124
125 let key_range = &sstable_info.key_range;
126 let left_str = if key_range.left.is_empty() {
127 "-inf".to_owned()
128 } else {
129 hex::encode(&key_range.left)
130 };
131 let right_str = if key_range.right.is_empty() {
132 "+inf".to_owned()
133 } else {
134 hex::encode(&key_range.right)
135 };
136
137 let stale_ratio = (sstable_info.stale_key_count * 100)
138 .checked_div(sstable_info.total_key_count)
139 .unwrap_or(0);
140 writeln!(
141 s,
142 "SstableInfo: object id={}, SST id={}, KeyRange=[{:?},{:?}], table_ids: {:?}, object_size={}KB, sst_size={}KB stale_ratio={}%, filter_type {:?}, filter_layout {:?}",
143 sstable_info.object_id,
144 sstable_info.sst_id,
145 left_str,
146 right_str,
147 sstable_info.table_ids,
148 sstable_info.file_size / 1024,
149 sstable_info.sst_size / 1024,
150 stale_ratio,
151 sstable_info.filter_type,
152 sstable_info.filter_layout,
153 )
154 .unwrap();
155}
156
157pub fn statistics_compact_task(task: &CompactTask) -> CompactTaskStatistics {
158 let mut total_key_count = 0;
159 let mut total_file_count: u64 = 0;
160 let mut total_file_size = 0;
161 let mut total_uncompressed_file_size = 0;
162
163 for sst in task.read_input_ssts() {
164 total_file_count += 1;
165 total_file_size += sst.file_size;
166 total_uncompressed_file_size += sst.uncompressed_file_size;
167 total_key_count += sst.total_key_count;
168 }
169
170 CompactTaskStatistics {
171 total_file_count,
172 total_key_count,
173 total_file_size,
174 total_uncompressed_file_size,
175 }
176}
177
/// Aggregated statistics over the input SSTs of a compaction task, as computed by
/// `statistics_compact_task`.
#[derive(Debug)]
pub struct CompactTaskStatistics {
    /// Number of input SSTs that were visited.
    pub total_file_count: u64,
    /// Sum of `total_key_count` over the visited SSTs.
    pub total_key_count: u64,
    /// Sum of `file_size` over the visited SSTs.
    pub total_file_size: u64,
    /// Sum of `uncompressed_file_size` over the visited SSTs.
    pub total_uncompressed_file_size: u64,
}
185
186pub fn estimate_memory_for_compact_task(
187 task: &CompactTask,
188 block_size: u64,
189 recv_buffer_size: u64,
190 sst_capacity: u64,
191) -> u64 {
192 let mut result = 0;
193 let mut task_max_sst_meta_ratio = 0;
198
199 let max_input_stream_estimated_memory = block_size + recv_buffer_size;
203
204 for level in &task.input_ssts {
206 if level.level_type == LevelType::Nonoverlapping {
207 let mut cur_level_max_sst_meta_size = 0;
208 for sst in level.read_sstable_infos() {
209 let meta_size = sst.file_size - sst.meta_offset;
210 task_max_sst_meta_ratio =
211 std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
212 cur_level_max_sst_meta_size = std::cmp::max(meta_size, cur_level_max_sst_meta_size);
213 }
214 if cur_level_max_sst_meta_size > 0 {
215 result += max_input_stream_estimated_memory + cur_level_max_sst_meta_size;
216 }
217 } else {
218 for sst in level.read_sstable_infos() {
219 let meta_size = sst.file_size - sst.meta_offset;
220 result += max_input_stream_estimated_memory + meta_size;
221 task_max_sst_meta_ratio =
222 std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
223 }
224 }
225 }
226
227 let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100;
230
231 result += estimated_meta_size + sst_capacity;
234
235 result
236}