risingwave_ctl/cmd_impl/hummock/
compaction_group.rs1use std::collections::{HashMap, HashSet};
16
17use comfy_table::{Row, Table};
18use itertools::Itertools;
19use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
20use risingwave_pb::hummock::compact_task::TaskStatus;
21use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
22use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::{
23 CompressionAlgorithm, SstableFilterLayout, SstableFilterType,
24};
25use risingwave_pb::id::TableId;
26
27use crate::CtlContext;
28
29pub async fn list_compaction_group(context: &CtlContext) -> anyhow::Result<()> {
30 let meta_client = context.meta_client().await?;
31 let result = meta_client.risectl_list_compaction_group().await?;
32 println!("{:#?}", result);
33 Ok(())
34}
35
36pub async fn update_compaction_config(
37 context: &CtlContext,
38 ids: Vec<CompactionGroupId>,
39 configs: Vec<MutableConfig>,
40) -> anyhow::Result<()> {
41 let meta_client = context.meta_client().await?;
42 meta_client
43 .risectl_update_compaction_config(ids.as_slice(), configs.as_slice())
44 .await?;
45 println!(
46 "Succeed: update compaction groups {:#?} with configs {:#?}.",
47 ids, configs
48 );
49 Ok(())
50}
51
52#[expect(clippy::too_many_arguments)]
53pub fn build_compaction_config_vec(
54 max_bytes_for_level_base: Option<u64>,
55 max_bytes_for_level_multiplier: Option<u64>,
56 max_compaction_bytes: Option<u64>,
57 sub_level_max_compaction_bytes: Option<u64>,
58 level0_tier_compact_file_number: Option<u64>,
59 target_file_size_base: Option<u64>,
60 compaction_filter_mask: Option<u32>,
61 max_sub_compaction: Option<u32>,
62 level0_stop_write_threshold_sub_level_number: Option<u64>,
63 level0_sub_level_compact_level_count: Option<u32>,
64 max_space_reclaim_bytes: Option<u64>,
65 level0_max_compact_file_number: Option<u64>,
66 level0_overlapping_sub_level_compact_level_count: Option<u32>,
67 enable_emergency_picker: Option<bool>,
68 tombstone_reclaim_ratio: Option<u32>,
69 compress_algorithm: Option<CompressionAlgorithm>,
70 sstable_filter_type: Option<SstableFilterType>,
71 sstable_filter_layout: Option<SstableFilterLayout>,
72 max_l0_compact_level: Option<u32>,
73 sst_allowed_trivial_move_min_size: Option<u64>,
74 disable_auto_group_scheduling: Option<bool>,
75 max_overlapping_level_size: Option<u64>,
76 sst_allowed_trivial_move_max_count: Option<u32>,
77 emergency_level0_sst_file_count: Option<u32>,
78 emergency_level0_sub_level_partition: Option<u32>,
79 level0_stop_write_threshold_max_sst_count: Option<u32>,
80 level0_stop_write_threshold_max_size: Option<u64>,
81 enable_optimize_l0_interval_selection: Option<bool>,
82 blocked_xor_filter_kv_count_threshold: Option<u64>,
83 max_vnode_key_range_bytes: Option<u64>,
84) -> Vec<MutableConfig> {
85 let mut configs = vec![];
86 if let Some(c) = max_bytes_for_level_base {
87 configs.push(MutableConfig::MaxBytesForLevelBase(c));
88 }
89 if let Some(c) = max_bytes_for_level_multiplier {
90 configs.push(MutableConfig::MaxBytesForLevelMultiplier(c));
91 }
92 if let Some(c) = max_compaction_bytes {
93 configs.push(MutableConfig::MaxCompactionBytes(c));
94 }
95 if let Some(c) = sub_level_max_compaction_bytes {
96 configs.push(MutableConfig::SubLevelMaxCompactionBytes(c));
97 }
98 if let Some(c) = level0_tier_compact_file_number {
99 configs.push(MutableConfig::Level0TierCompactFileNumber(c));
100 }
101 if let Some(c) = target_file_size_base {
102 configs.push(MutableConfig::TargetFileSizeBase(c));
103 }
104 if let Some(c) = compaction_filter_mask {
105 configs.push(MutableConfig::CompactionFilterMask(c));
106 }
107 if let Some(c) = max_sub_compaction {
108 configs.push(MutableConfig::MaxSubCompaction(c));
109 }
110 if let Some(c) = level0_stop_write_threshold_sub_level_number {
111 configs.push(MutableConfig::Level0StopWriteThresholdSubLevelNumber(c));
112 }
113 if let Some(c) = level0_sub_level_compact_level_count {
114 configs.push(MutableConfig::Level0SubLevelCompactLevelCount(c));
115 }
116 if let Some(c) = max_space_reclaim_bytes {
117 configs.push(MutableConfig::MaxSpaceReclaimBytes(c))
118 }
119 if let Some(c) = level0_max_compact_file_number {
120 configs.push(MutableConfig::Level0MaxCompactFileNumber(c))
121 }
122 if let Some(c) = level0_overlapping_sub_level_compact_level_count {
123 configs.push(MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c))
124 }
125 if let Some(c) = enable_emergency_picker {
126 configs.push(MutableConfig::EnableEmergencyPicker(c))
127 }
128 if let Some(c) = tombstone_reclaim_ratio {
129 configs.push(MutableConfig::TombstoneReclaimRatio(c))
130 }
131 if let Some(c) = compress_algorithm {
132 configs.push(MutableConfig::CompressionAlgorithm(c))
133 }
134 if let Some(c) = sstable_filter_type {
135 configs.push(MutableConfig::SstableFilterType(c))
136 }
137 if let Some(c) = sstable_filter_layout {
138 configs.push(MutableConfig::SstableFilterLayout(c))
139 }
140 if let Some(c) = max_l0_compact_level {
141 configs.push(MutableConfig::MaxL0CompactLevelCount(c))
142 }
143 if let Some(c) = sst_allowed_trivial_move_min_size {
144 configs.push(MutableConfig::SstAllowedTrivialMoveMinSize(c))
145 }
146 if let Some(c) = disable_auto_group_scheduling {
147 configs.push(MutableConfig::DisableAutoGroupScheduling(c))
148 }
149 if let Some(c) = max_overlapping_level_size {
150 configs.push(MutableConfig::MaxOverlappingLevelSize(c))
151 }
152 if let Some(c) = sst_allowed_trivial_move_max_count {
153 configs.push(MutableConfig::SstAllowedTrivialMoveMaxCount(c))
154 }
155 if let Some(c) = emergency_level0_sst_file_count {
156 configs.push(MutableConfig::EmergencyLevel0SstFileCount(c))
157 }
158 if let Some(c) = emergency_level0_sub_level_partition {
159 configs.push(MutableConfig::EmergencyLevel0SubLevelPartition(c))
160 }
161 if let Some(c) = level0_stop_write_threshold_max_sst_count {
162 configs.push(MutableConfig::Level0StopWriteThresholdMaxSstCount(c))
163 }
164 if let Some(c) = level0_stop_write_threshold_max_size {
165 configs.push(MutableConfig::Level0StopWriteThresholdMaxSize(c))
166 }
167 if let Some(c) = enable_optimize_l0_interval_selection {
168 configs.push(MutableConfig::EnableOptimizeL0IntervalSelection(c))
169 }
170 if let Some(c) = blocked_xor_filter_kv_count_threshold {
171 configs.push(MutableConfig::MaxKvCountForXor16(c))
172 }
173 if let Some(c) = max_vnode_key_range_bytes {
174 configs.push(MutableConfig::MaxVnodeKeyRangeBytes(c))
175 }
176
177 configs
178}
179
180pub async fn split_compaction_group(
181 context: &CtlContext,
182 group_id: CompactionGroupId,
183 table_ids_to_new_group: &[TableId],
184 partition_vnode_count: u32,
185) -> anyhow::Result<()> {
186 let meta_client = context.meta_client().await?;
187 let new_group_id = meta_client
188 .split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count)
189 .await?;
190 println!(
191 "Succeed: split compaction group {}. tables {:#?} are moved to new group {}.",
192 group_id, table_ids_to_new_group, new_group_id
193 );
194 Ok(())
195}
196
197pub async fn list_compaction_status(context: &CtlContext, verbose: bool) -> anyhow::Result<()> {
198 let meta_client = context.meta_client().await?;
199 let (status, assignment, progress) = meta_client.risectl_list_compaction_status().await?;
200 if !verbose {
201 let mut table = Table::new();
202 table.set_header({
203 let mut row = Row::new();
204 row.add_cell("Compaction Group".into());
205 row.add_cell("Level".into());
206 row.add_cell("Task Count".into());
207 row.add_cell("Tasks".into());
208 row
209 });
210 for s in status {
211 let cg_id = s.compaction_group_id;
212 for l in s.level_handlers {
213 let level = l.level;
214 let mut task_ids = HashSet::new();
215 for t in l.tasks {
216 task_ids.insert(t.task_id);
217 }
218 let mut row = Row::new();
219 row.add_cell(cg_id.into());
220 row.add_cell(level.into());
221 row.add_cell(task_ids.len().into());
222 row.add_cell(
223 task_ids
224 .into_iter()
225 .sorted()
226 .map(|t| t.to_string())
227 .join(",")
228 .into(),
229 );
230 table.add_row(row);
231 }
232 }
233 println!("{table}");
234
235 let mut table = Table::new();
236 table.set_header({
237 let mut row = Row::new();
238 row.add_cell("Hummock Context".into());
239 row.add_cell("Task Count".into());
240 row.add_cell("Tasks".into());
241 row
242 });
243 let mut assignment_lite: HashMap<HummockContextId, Vec<u64>> = HashMap::new();
244 for a in assignment {
245 assignment_lite
246 .entry(a.context_id)
247 .or_default()
248 .push(a.compact_task.unwrap().task_id);
249 }
250 for (k, v) in assignment_lite {
251 let mut row = Row::new();
252 row.add_cell(k.into());
253 row.add_cell(v.len().into());
254 row.add_cell(
255 v.into_iter()
256 .sorted()
257 .map(|t| t.to_string())
258 .join(",")
259 .into(),
260 );
261 table.add_row(row);
262 }
263 println!("{table}");
264
265 let mut table = Table::new();
266 table.set_header({
267 let mut row = Row::new();
268 row.add_cell("Task".into());
269 row.add_cell("Num SSTs Sealed".into());
270 row.add_cell("Num SSTs Uploaded".into());
271 row.add_cell("Num Progress Key".into());
272 row.add_cell("Num Pending Read IO".into());
273 row.add_cell("Num Pending Write IO".into());
274 row
275 });
276 for p in progress {
277 let mut row = Row::new();
278 row.add_cell(p.task_id.into());
279 row.add_cell(p.num_ssts_sealed.into());
280 row.add_cell(p.num_ssts_uploaded.into());
281 row.add_cell(p.num_progress_key.into());
282 row.add_cell(p.num_pending_read_io.into());
283 row.add_cell(p.num_pending_write_io.into());
284 table.add_row(row);
285 }
286 println!("{table}");
287 } else {
288 println!("--- LSMtree Status ---");
289 println!("{:#?}", status);
290 println!("--- Task Assignment ---");
291 println!("{:#?}", assignment);
292 println!("--- Task Progress ---");
293 println!("{:#?}", progress);
294 }
295 Ok(())
296}
297
298pub async fn get_compaction_score(
299 context: &CtlContext,
300 id: CompactionGroupId,
301) -> anyhow::Result<()> {
302 let meta_client = context.meta_client().await?;
303 let scores = meta_client.get_compaction_score(id).await?;
304 let mut table = Table::new();
305 table.set_header({
306 let mut row = Row::new();
307 row.add_cell("Select Level".into());
308 row.add_cell("Target Level".into());
309 row.add_cell("Type".into());
310 row.add_cell("Score".into());
311 row
312 });
313 for s in scores.into_iter().sorted_by(|a, b| {
314 a.select_level
315 .cmp(&b.select_level)
316 .then_with(|| a.target_level.cmp(&b.target_level))
317 }) {
318 let mut row = Row::new();
319 row.add_cell(s.select_level.into());
320 row.add_cell(s.target_level.into());
321 row.add_cell(s.picker_type.into());
322 row.add_cell(s.score.into());
323 table.add_row(row);
324 }
325 println!("{table}");
326 Ok(())
327}
328
329pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow::Result<()> {
330 let meta_client = context.meta_client().await?;
331 let ret = meta_client
332 .cancel_compact_task(task_id, TaskStatus::ManualCanceled)
333 .await?;
334 println!("cancel_compact_task {} ret {:?}", task_id, ret);
335
336 Ok(())
337}
338
339pub async fn merge_compaction_group(
340 context: &CtlContext,
341 left_group_id: CompactionGroupId,
342 right_group_id: CompactionGroupId,
343) -> anyhow::Result<()> {
344 let meta_client = context.meta_client().await?;
345 meta_client
346 .merge_compaction_group(left_group_id, right_group_id)
347 .await?;
348 Ok(())
349}