risingwave_ctl/cmd_impl/hummock/compaction_group.rs

use std::collections::{HashMap, HashSet};

use comfy_table::{Row, Table};
use itertools::Itertools;
use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::id::TableId;

use crate::CtlContext;

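/// Fetches all compaction groups from the meta node and pretty-prints them.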
pub async fn list_compaction_group(context: &CtlContext) -> anyhow::Result<()> {
    let meta_client = context.meta_client().await?;
    let result = meta_client.risectl_list_compaction_group().await?;
    println!("{:#?}", result);
    Ok(())
}

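/// Applies the given mutable config entries to the specified compaction groups
/// through the meta client.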
pub async fn update_compaction_config(
    context: &CtlContext,
    ids: Vec<CompactionGroupId>,
    configs: Vec<MutableConfig>,
) -> anyhow::Result<()> {
    let meta_client = context.meta_client().await?;
    meta_client
        .risectl_update_compaction_config(ids.as_slice(), configs.as_slice())
        .await?;
    println!(
        "Succeeded: updated compaction groups {:#?} with configs {:#?}.",
        ids, configs
    );
    Ok(())
}

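/// Collects the compaction config options that were explicitly provided on the
/// command line into a list of `MutableConfig` entries; unset options are skipped,
/// so only the given fields are updated.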
#[allow(clippy::too_many_arguments)]
pub fn build_compaction_config_vec(
    max_bytes_for_level_base: Option<u64>,
    max_bytes_for_level_multiplier: Option<u64>,
    max_compaction_bytes: Option<u64>,
    sub_level_max_compaction_bytes: Option<u64>,
    level0_tier_compact_file_number: Option<u64>,
    target_file_size_base: Option<u64>,
    compaction_filter_mask: Option<u32>,
    max_sub_compaction: Option<u32>,
    level0_stop_write_threshold_sub_level_number: Option<u64>,
    level0_sub_level_compact_level_count: Option<u32>,
    max_space_reclaim_bytes: Option<u64>,
    level0_max_compact_file_number: Option<u64>,
    level0_overlapping_sub_level_compact_level_count: Option<u32>,
    enable_emergency_picker: Option<bool>,
    tombstone_reclaim_ratio: Option<u32>,
    compress_algorithm: Option<CompressionAlgorithm>,
    max_l0_compact_level: Option<u32>,
    sst_allowed_trivial_move_min_size: Option<u64>,
    disable_auto_group_scheduling: Option<bool>,
    max_overlapping_level_size: Option<u64>,
    sst_allowed_trivial_move_max_count: Option<u32>,
    emergency_level0_sst_file_count: Option<u32>,
    emergency_level0_sub_level_partition: Option<u32>,
    level0_stop_write_threshold_max_sst_count: Option<u32>,
    level0_stop_write_threshold_max_size: Option<u64>,
    enable_optimize_l0_interval_selection: Option<bool>,
    vnode_aligned_level_size_threshold: Option<u64>,
    max_kv_count_for_xor16: Option<u64>,
) -> Vec<MutableConfig> {
    let mut configs = vec![];
    if let Some(c) = max_bytes_for_level_base {
        configs.push(MutableConfig::MaxBytesForLevelBase(c));
    }
    if let Some(c) = max_bytes_for_level_multiplier {
        configs.push(MutableConfig::MaxBytesForLevelMultiplier(c));
    }
    if let Some(c) = max_compaction_bytes {
        configs.push(MutableConfig::MaxCompactionBytes(c));
    }
    if let Some(c) = sub_level_max_compaction_bytes {
        configs.push(MutableConfig::SubLevelMaxCompactionBytes(c));
    }
    if let Some(c) = level0_tier_compact_file_number {
        configs.push(MutableConfig::Level0TierCompactFileNumber(c));
    }
    if let Some(c) = target_file_size_base {
        configs.push(MutableConfig::TargetFileSizeBase(c));
    }
    if let Some(c) = compaction_filter_mask {
        configs.push(MutableConfig::CompactionFilterMask(c));
    }
    if let Some(c) = max_sub_compaction {
        configs.push(MutableConfig::MaxSubCompaction(c));
    }
    if let Some(c) = level0_stop_write_threshold_sub_level_number {
        configs.push(MutableConfig::Level0StopWriteThresholdSubLevelNumber(c));
    }
    if let Some(c) = level0_sub_level_compact_level_count {
        configs.push(MutableConfig::Level0SubLevelCompactLevelCount(c));
    }
    if let Some(c) = max_space_reclaim_bytes {
        configs.push(MutableConfig::MaxSpaceReclaimBytes(c));
    }
    if let Some(c) = level0_max_compact_file_number {
        configs.push(MutableConfig::Level0MaxCompactFileNumber(c));
    }
    if let Some(c) = level0_overlapping_sub_level_compact_level_count {
        configs.push(MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c));
    }
    if let Some(c) = enable_emergency_picker {
        configs.push(MutableConfig::EnableEmergencyPicker(c));
    }
    if let Some(c) = tombstone_reclaim_ratio {
        configs.push(MutableConfig::TombstoneReclaimRatio(c));
    }
    if let Some(c) = compress_algorithm {
        configs.push(MutableConfig::CompressionAlgorithm(c));
    }
    if let Some(c) = max_l0_compact_level {
        configs.push(MutableConfig::MaxL0CompactLevelCount(c));
    }
    if let Some(c) = sst_allowed_trivial_move_min_size {
        configs.push(MutableConfig::SstAllowedTrivialMoveMinSize(c));
    }
    if let Some(c) = disable_auto_group_scheduling {
        configs.push(MutableConfig::DisableAutoGroupScheduling(c));
    }
    if let Some(c) = max_overlapping_level_size {
        configs.push(MutableConfig::MaxOverlappingLevelSize(c));
    }
    if let Some(c) = sst_allowed_trivial_move_max_count {
        configs.push(MutableConfig::SstAllowedTrivialMoveMaxCount(c));
    }
    if let Some(c) = emergency_level0_sst_file_count {
        configs.push(MutableConfig::EmergencyLevel0SstFileCount(c));
    }
    if let Some(c) = emergency_level0_sub_level_partition {
        configs.push(MutableConfig::EmergencyLevel0SubLevelPartition(c));
    }
    if let Some(c) = level0_stop_write_threshold_max_sst_count {
        configs.push(MutableConfig::Level0StopWriteThresholdMaxSstCount(c));
    }
    if let Some(c) = level0_stop_write_threshold_max_size {
        configs.push(MutableConfig::Level0StopWriteThresholdMaxSize(c));
    }
    if let Some(c) = enable_optimize_l0_interval_selection {
        configs.push(MutableConfig::EnableOptimizeL0IntervalSelection(c));
    }
    if let Some(c) = vnode_aligned_level_size_threshold {
        configs.push(MutableConfig::VnodeAlignedLevelSizeThreshold(c));
    }
    if let Some(c) = max_kv_count_for_xor16 {
        configs.push(MutableConfig::MaxKvCountForXor16(c));
    }

    configs
}

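/// Splits the given tables out of compaction group `group_id` into a newly created
/// group and prints the id of the new group.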
pub async fn split_compaction_group(
    context: &CtlContext,
    group_id: CompactionGroupId,
    table_ids_to_new_group: &[TableId],
    partition_vnode_count: u32,
) -> anyhow::Result<()> {
    let meta_client = context.meta_client().await?;
    let new_group_id = meta_client
        .split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count)
        .await?;
    println!(
        "Succeeded: split compaction group {}; tables {:#?} were moved to new group {}.",
        group_id, table_ids_to_new_group, new_group_id
    );
    Ok(())
}

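/// Prints the compaction status reported by the meta node: pending tasks per
/// compaction group and level, task assignments per hummock context, and
/// per-task progress. With `verbose`, dumps the raw responses instead of tables.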
pub async fn list_compaction_status(context: &CtlContext, verbose: bool) -> anyhow::Result<()> {
    let meta_client = context.meta_client().await?;
    let (status, assignment, progress) = meta_client.risectl_list_compaction_status().await?;
    if !verbose {
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("Compaction Group".into());
            row.add_cell("Level".into());
            row.add_cell("Task Count".into());
            row.add_cell("Tasks".into());
            row
        });
        for s in status {
            let cg_id = s.compaction_group_id;
            for l in s.level_handlers {
                let level = l.level;
                let mut task_ids = HashSet::new();
                for t in l.tasks {
                    task_ids.insert(t.task_id);
                }
                let mut row = Row::new();
                row.add_cell(cg_id.into());
                row.add_cell(level.into());
                row.add_cell(task_ids.len().into());
                row.add_cell(
                    task_ids
                        .into_iter()
                        .sorted()
                        .map(|t| t.to_string())
                        .join(",")
                        .into(),
                );
                table.add_row(row);
            }
        }
        println!("{table}");

        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("Hummock Context".into());
            row.add_cell("Task Count".into());
            row.add_cell("Tasks".into());
            row
        });
        let mut assignment_lite: HashMap<HummockContextId, Vec<u64>> = HashMap::new();
        for a in assignment {
            assignment_lite
                .entry(a.context_id)
                .or_default()
                .push(a.compact_task.unwrap().task_id);
        }
        for (k, v) in assignment_lite {
            let mut row = Row::new();
            row.add_cell(k.into());
            row.add_cell(v.len().into());
            row.add_cell(
                v.into_iter()
                    .sorted()
                    .map(|t| t.to_string())
                    .join(",")
                    .into(),
            );
            table.add_row(row);
        }
        println!("{table}");

        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("Task".into());
            row.add_cell("Num SSTs Sealed".into());
            row.add_cell("Num SSTs Uploaded".into());
            row.add_cell("Num Progress Key".into());
            row.add_cell("Num Pending Read IO".into());
            row.add_cell("Num Pending Write IO".into());
            row
        });
        for p in progress {
            let mut row = Row::new();
            row.add_cell(p.task_id.into());
            row.add_cell(p.num_ssts_sealed.into());
            row.add_cell(p.num_ssts_uploaded.into());
            row.add_cell(p.num_progress_key.into());
            row.add_cell(p.num_pending_read_io.into());
            row.add_cell(p.num_pending_write_io.into());
            table.add_row(row);
        }
        println!("{table}");
    } else {
        println!("--- LSMtree Status ---");
        println!("{:#?}", status);
        println!("--- Task Assignment ---");
        println!("{:#?}", assignment);
        println!("--- Task Progress ---");
        println!("{:#?}", progress);
    }
    Ok(())
}

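/// Prints the compaction scores of compaction group `id`, ordered by select level
/// and then target level.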
pub async fn get_compaction_score(
    context: &CtlContext,
    id: CompactionGroupId,
) -> anyhow::Result<()> {
    let meta_client = context.meta_client().await?;
    let scores = meta_client.get_compaction_score(id).await?;
    let mut table = Table::new();
    table.set_header({
        let mut row = Row::new();
        row.add_cell("Select Level".into());
        row.add_cell("Target Level".into());
        row.add_cell("Type".into());
        row.add_cell("Score".into());
        row
    });
    for s in scores.into_iter().sorted_by(|a, b| {
        a.select_level
            .cmp(&b.select_level)
            .then_with(|| a.target_level.cmp(&b.target_level))
    }) {
        let mut row = Row::new();
        row.add_cell(s.select_level.into());
        row.add_cell(s.target_level.into());
        row.add_cell(s.picker_type.into());
        row.add_cell(s.score.into());
        table.add_row(row);
    }
    println!("{table}");
    Ok(())
}

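/// Cancels compact task `task_id` as manually canceled and prints the result
/// returned by the meta node.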
pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow::Result<()> {
    let meta_client = context.meta_client().await?;
    let ret = meta_client
        .cancel_compact_task(task_id, TaskStatus::ManualCanceled)
        .await?;
    println!("cancel_compact_task {} ret {:?}", task_id, ret);

    Ok(())
}

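/// Merges the two given compaction groups into one via the meta client.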
pub async fn merge_compaction_group(
    context: &CtlContext,
    left_group_id: CompactionGroupId,
    right_group_id: CompactionGroupId,
) -> anyhow::Result<()> {
    let meta_client = context.meta_client().await?;
    meta_client
        .merge_compaction_group(left_group_id, right_group_id)
        .await?;
    Ok(())
}