1use std::collections::HashSet;
16
17use risingwave_common::catalog::TableId;
18use risingwave_pb::hummock::LevelType;
19
20use crate::compact_task::CompactTask;
21use crate::sstable_info::SstableInfo;
22
23pub fn compact_task_output_to_string(compact_task: &CompactTask) -> String {
24 use std::fmt::Write;
25
26 let mut s = String::default();
27 writeln!(
28 s,
29 "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}, status: {:?}",
30 compact_task.task_id,
31 compact_task.compaction_group_id,
32 compact_task.task_type,
33 compact_task.target_level,
34 compact_task.target_sub_level_id,
35 compact_task.target_file_size,
36 compact_task.splits.len(),
37 compact_task.task_status
38 )
39 .unwrap();
40 s.push_str("Output: \n");
41 for sst in &compact_task.sorted_output_ssts {
42 append_sstable_info_to_string(&mut s, sst);
43 }
44 s
45}
46
47pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
48 use std::fmt::Write;
49
50 let mut s = String::new();
51 writeln!(
52 s,
53 "Compaction task id: {:?}, group-id: {:?}, type: {:?}, target level: {:?}, target sub level: {:?} target_file_size: {:?}, splits: {:?}",
54 compact_task.task_id,
55 compact_task.compaction_group_id,
56 compact_task.task_type,
57 compact_task.target_level,
58 compact_task.target_sub_level_id,
59 compact_task.target_file_size,
60 compact_task.splits.len(),
61 )
62 .unwrap();
63 s.push_str("Input: \n");
64 let mut input_sst_table_ids: HashSet<TableId> = HashSet::new();
65 let mut dropped_only_sst_count = 0;
66 for level_entry in &compact_task.input_ssts {
67 let tables: Vec<String> = level_entry
68 .table_infos
69 .iter()
70 .map(|table| {
71 if table.table_ids.is_empty() {
72 dropped_only_sst_count += 1;
73 } else {
74 input_sst_table_ids.extend(table.table_ids.iter().copied());
75 }
76 if table.total_key_count != 0 {
77 format!(
78 "[id: {}, obj_id: {} object_size {}KB sst_size {}KB stale_ratio {}]",
79 table.sst_id,
80 table.object_id,
81 table.file_size / 1024,
82 table.sst_size / 1024,
83 (table.stale_key_count * 100 / table.total_key_count),
84 )
85 } else {
86 format!(
87 "[id: {}, obj_id: {} object_size {}KB sst_size {}KB]",
88 table.sst_id,
89 table.object_id,
90 table.file_size / 1024,
91 table.sst_size / 1024,
92 )
93 }
94 })
95 .collect();
96 writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
97 }
98 if !compact_task.table_vnode_partition.is_empty() {
99 writeln!(s, "Table vnode partition info:").unwrap();
100 compact_task
101 .table_vnode_partition
102 .iter()
103 .filter(|t| input_sst_table_ids.contains(t.0))
104 .for_each(|(tid, partition)| {
105 writeln!(s, " [{:?}, {:?}]", tid, partition).unwrap();
106 });
107 }
108
109 if dropped_only_sst_count > 0 {
110 writeln!(
111 s,
112 "Dropped-only input SST count: {:?} ",
113 dropped_only_sst_count
114 )
115 .unwrap();
116 }
117 s
118}
119
120pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo) {
121 use std::fmt::Write;
122
123 let key_range = &sstable_info.key_range;
124 let left_str = if key_range.left.is_empty() {
125 "-inf".to_owned()
126 } else {
127 hex::encode(&key_range.left)
128 };
129 let right_str = if key_range.right.is_empty() {
130 "+inf".to_owned()
131 } else {
132 hex::encode(&key_range.right)
133 };
134
135 let stale_ratio = (sstable_info.stale_key_count * 100)
136 .checked_div(sstable_info.total_key_count)
137 .unwrap_or(0);
138 writeln!(
139 s,
140 "SstableInfo: object id={}, SST id={}, KeyRange=[{:?},{:?}], table_ids: {:?}, object_size={}KB, sst_size={}KB stale_ratio={}%, bloom_filter_kind {:?}",
141 sstable_info.object_id,
142 sstable_info.sst_id,
143 left_str,
144 right_str,
145 sstable_info.table_ids,
146 sstable_info.file_size / 1024,
147 sstable_info.sst_size / 1024,
148 stale_ratio,
149 sstable_info.bloom_filter_kind,
150 )
151 .unwrap();
152}
153
154pub fn statistics_compact_task(task: &CompactTask) -> CompactTaskStatistics {
155 let mut total_key_count = 0;
156 let mut total_file_count: u64 = 0;
157 let mut total_file_size = 0;
158 let mut total_uncompressed_file_size = 0;
159
160 for sst in task.read_input_ssts() {
161 total_file_count += 1;
162 total_file_size += sst.file_size;
163 total_uncompressed_file_size += sst.uncompressed_file_size;
164 total_key_count += sst.total_key_count;
165 }
166
167 CompactTaskStatistics {
168 total_file_count,
169 total_key_count,
170 total_file_size,
171 total_uncompressed_file_size,
172 }
173}
174
/// Aggregate totals over the input SSTs of a compaction task.
///
/// This is plain `u64` data, so it also derives `Clone`, `Copy`, `Default`, `PartialEq` and `Eq`.
/// That lets callers copy it, start from a zeroed value and compare results. Field order is kept
/// unchanged so the `Debug` output stays the same.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CompactTaskStatistics {
    /// Number of input SSTs.
    pub total_file_count: u64,
    /// Sum of `total_key_count` over the input SSTs.
    pub total_key_count: u64,
    /// Sum of `file_size` over the input SSTs.
    pub total_file_size: u64,
    /// Sum of `uncompressed_file_size` over the input SSTs.
    pub total_uncompressed_file_size: u64,
}
182
183pub fn estimate_memory_for_compact_task(
184 task: &CompactTask,
185 block_size: u64,
186 recv_buffer_size: u64,
187 sst_capacity: u64,
188) -> u64 {
189 let mut result = 0;
190 let mut task_max_sst_meta_ratio = 0;
195
196 let max_input_stream_estimated_memory = block_size + recv_buffer_size;
200
201 for level in &task.input_ssts {
203 if level.level_type == LevelType::Nonoverlapping {
204 let mut cur_level_max_sst_meta_size = 0;
205 for sst in level.read_sstable_infos() {
206 let meta_size = sst.file_size - sst.meta_offset;
207 task_max_sst_meta_ratio =
208 std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
209 cur_level_max_sst_meta_size = std::cmp::max(meta_size, cur_level_max_sst_meta_size);
210 }
211 if cur_level_max_sst_meta_size > 0 {
212 result += max_input_stream_estimated_memory + cur_level_max_sst_meta_size;
213 }
214 } else {
215 for sst in level.read_sstable_infos() {
216 let meta_size = sst.file_size - sst.meta_offset;
217 result += max_input_stream_estimated_memory + meta_size;
218 task_max_sst_meta_ratio =
219 std::cmp::max(task_max_sst_meta_ratio, meta_size * 100 / sst.file_size);
220 }
221 }
222 }
223
224 let estimated_meta_size = sst_capacity * task_max_sst_meta_ratio / 100;
227
228 result += estimated_meta_size + sst_capacity;
231
232 result
233}