risingwave_ctl/cmd_impl/hummock/
compaction_group.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use comfy_table::{Row, Table};
18use itertools::Itertools;
19use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
20use risingwave_pb::hummock::compact_task::TaskStatus;
21use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
22use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
23
24use crate::CtlContext;
25
26pub async fn list_compaction_group(context: &CtlContext) -> anyhow::Result<()> {
27    let meta_client = context.meta_client().await?;
28    let result = meta_client.risectl_list_compaction_group().await?;
29    println!("{:#?}", result);
30    Ok(())
31}
32
33pub async fn update_compaction_config(
34    context: &CtlContext,
35    ids: Vec<CompactionGroupId>,
36    configs: Vec<MutableConfig>,
37) -> anyhow::Result<()> {
38    let meta_client = context.meta_client().await?;
39    meta_client
40        .risectl_update_compaction_config(ids.as_slice(), configs.as_slice())
41        .await?;
42    println!(
43        "Succeed: update compaction groups {:#?} with configs {:#?}.",
44        ids, configs
45    );
46    Ok(())
47}
48
49#[allow(clippy::too_many_arguments)]
50pub fn build_compaction_config_vec(
51    max_bytes_for_level_base: Option<u64>,
52    max_bytes_for_level_multiplier: Option<u64>,
53    max_compaction_bytes: Option<u64>,
54    sub_level_max_compaction_bytes: Option<u64>,
55    level0_tier_compact_file_number: Option<u64>,
56    target_file_size_base: Option<u64>,
57    compaction_filter_mask: Option<u32>,
58    max_sub_compaction: Option<u32>,
59    level0_stop_write_threshold_sub_level_number: Option<u64>,
60    level0_sub_level_compact_level_count: Option<u32>,
61    max_space_reclaim_bytes: Option<u64>,
62    level0_max_compact_file_number: Option<u64>,
63    level0_overlapping_sub_level_compact_level_count: Option<u32>,
64    enable_emergency_picker: Option<bool>,
65    tombstone_reclaim_ratio: Option<u32>,
66    compress_algorithm: Option<CompressionAlgorithm>,
67    max_l0_compact_level: Option<u32>,
68    sst_allowed_trivial_move_min_size: Option<u64>,
69    disable_auto_group_scheduling: Option<bool>,
70    max_overlapping_level_size: Option<u64>,
71    sst_allowed_trivial_move_max_count: Option<u32>,
72    emergency_level0_sst_file_count: Option<u32>,
73    emergency_level0_sub_level_partition: Option<u32>,
74    level0_stop_write_threshold_max_sst_count: Option<u32>,
75    level0_stop_write_threshold_max_size: Option<u64>,
76    enable_optimize_l0_interval_selection: Option<bool>,
77) -> Vec<MutableConfig> {
78    let mut configs = vec![];
79    if let Some(c) = max_bytes_for_level_base {
80        configs.push(MutableConfig::MaxBytesForLevelBase(c));
81    }
82    if let Some(c) = max_bytes_for_level_multiplier {
83        configs.push(MutableConfig::MaxBytesForLevelMultiplier(c));
84    }
85    if let Some(c) = max_compaction_bytes {
86        configs.push(MutableConfig::MaxCompactionBytes(c));
87    }
88    if let Some(c) = sub_level_max_compaction_bytes {
89        configs.push(MutableConfig::SubLevelMaxCompactionBytes(c));
90    }
91    if let Some(c) = level0_tier_compact_file_number {
92        configs.push(MutableConfig::Level0TierCompactFileNumber(c));
93    }
94    if let Some(c) = target_file_size_base {
95        configs.push(MutableConfig::TargetFileSizeBase(c));
96    }
97    if let Some(c) = compaction_filter_mask {
98        configs.push(MutableConfig::CompactionFilterMask(c));
99    }
100    if let Some(c) = max_sub_compaction {
101        configs.push(MutableConfig::MaxSubCompaction(c));
102    }
103    if let Some(c) = level0_stop_write_threshold_sub_level_number {
104        configs.push(MutableConfig::Level0StopWriteThresholdSubLevelNumber(c));
105    }
106    if let Some(c) = level0_sub_level_compact_level_count {
107        configs.push(MutableConfig::Level0SubLevelCompactLevelCount(c));
108    }
109    if let Some(c) = max_space_reclaim_bytes {
110        configs.push(MutableConfig::MaxSpaceReclaimBytes(c))
111    }
112    if let Some(c) = level0_max_compact_file_number {
113        configs.push(MutableConfig::Level0MaxCompactFileNumber(c))
114    }
115    if let Some(c) = level0_overlapping_sub_level_compact_level_count {
116        configs.push(MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c))
117    }
118    if let Some(c) = enable_emergency_picker {
119        configs.push(MutableConfig::EnableEmergencyPicker(c))
120    }
121    if let Some(c) = tombstone_reclaim_ratio {
122        configs.push(MutableConfig::TombstoneReclaimRatio(c))
123    }
124    if let Some(c) = compress_algorithm {
125        configs.push(MutableConfig::CompressionAlgorithm(c))
126    }
127    if let Some(c) = max_l0_compact_level {
128        configs.push(MutableConfig::MaxL0CompactLevelCount(c))
129    }
130    if let Some(c) = sst_allowed_trivial_move_min_size {
131        configs.push(MutableConfig::SstAllowedTrivialMoveMinSize(c))
132    }
133    if let Some(c) = disable_auto_group_scheduling {
134        configs.push(MutableConfig::DisableAutoGroupScheduling(c))
135    }
136    if let Some(c) = max_overlapping_level_size {
137        configs.push(MutableConfig::MaxOverlappingLevelSize(c))
138    }
139    if let Some(c) = sst_allowed_trivial_move_max_count {
140        configs.push(MutableConfig::SstAllowedTrivialMoveMaxCount(c))
141    }
142    if let Some(c) = emergency_level0_sst_file_count {
143        configs.push(MutableConfig::EmergencyLevel0SstFileCount(c))
144    }
145    if let Some(c) = emergency_level0_sub_level_partition {
146        configs.push(MutableConfig::EmergencyLevel0SubLevelPartition(c))
147    }
148    if let Some(c) = level0_stop_write_threshold_max_sst_count {
149        configs.push(MutableConfig::Level0StopWriteThresholdMaxSstCount(c))
150    }
151    if let Some(c) = level0_stop_write_threshold_max_size {
152        configs.push(MutableConfig::Level0StopWriteThresholdMaxSize(c))
153    }
154    if let Some(c) = enable_optimize_l0_interval_selection {
155        configs.push(MutableConfig::EnableOptimizeL0IntervalSelection(c))
156    }
157
158    configs
159}
160
161pub async fn split_compaction_group(
162    context: &CtlContext,
163    group_id: CompactionGroupId,
164    table_ids_to_new_group: &[u32],
165    partition_vnode_count: u32,
166) -> anyhow::Result<()> {
167    let meta_client = context.meta_client().await?;
168    let new_group_id = meta_client
169        .split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count)
170        .await?;
171    println!(
172        "Succeed: split compaction group {}. tables {:#?} are moved to new group {}.",
173        group_id, table_ids_to_new_group, new_group_id
174    );
175    Ok(())
176}
177
178pub async fn list_compaction_status(context: &CtlContext, verbose: bool) -> anyhow::Result<()> {
179    let meta_client = context.meta_client().await?;
180    let (status, assignment, progress) = meta_client.risectl_list_compaction_status().await?;
181    if !verbose {
182        let mut table = Table::new();
183        table.set_header({
184            let mut row = Row::new();
185            row.add_cell("Compaction Group".into());
186            row.add_cell("Level".into());
187            row.add_cell("Task Count".into());
188            row.add_cell("Tasks".into());
189            row
190        });
191        for s in status {
192            let cg_id = s.compaction_group_id;
193            for l in s.level_handlers {
194                let level = l.level;
195                let mut task_ids = HashSet::new();
196                for t in l.tasks {
197                    task_ids.insert(t.task_id);
198                }
199                let mut row = Row::new();
200                row.add_cell(cg_id.into());
201                row.add_cell(level.into());
202                row.add_cell(task_ids.len().into());
203                row.add_cell(
204                    task_ids
205                        .into_iter()
206                        .sorted()
207                        .map(|t| t.to_string())
208                        .join(",")
209                        .into(),
210                );
211                table.add_row(row);
212            }
213        }
214        println!("{table}");
215
216        let mut table = Table::new();
217        table.set_header({
218            let mut row = Row::new();
219            row.add_cell("Hummock Context".into());
220            row.add_cell("Task Count".into());
221            row.add_cell("Tasks".into());
222            row
223        });
224        let mut assignment_lite: HashMap<HummockContextId, Vec<u64>> = HashMap::new();
225        for a in assignment {
226            assignment_lite
227                .entry(a.context_id)
228                .or_default()
229                .push(a.compact_task.unwrap().task_id);
230        }
231        for (k, v) in assignment_lite {
232            let mut row = Row::new();
233            row.add_cell(k.into());
234            row.add_cell(v.len().into());
235            row.add_cell(
236                v.into_iter()
237                    .sorted()
238                    .map(|t| t.to_string())
239                    .join(",")
240                    .into(),
241            );
242            table.add_row(row);
243        }
244        println!("{table}");
245
246        let mut table = Table::new();
247        table.set_header({
248            let mut row = Row::new();
249            row.add_cell("Task".into());
250            row.add_cell("Num SSTs Sealed".into());
251            row.add_cell("Num SSTs Uploaded".into());
252            row.add_cell("Num Progress Key".into());
253            row.add_cell("Num Pending Read IO".into());
254            row.add_cell("Num Pending Write IO".into());
255            row
256        });
257        for p in progress {
258            let mut row = Row::new();
259            row.add_cell(p.task_id.into());
260            row.add_cell(p.num_ssts_sealed.into());
261            row.add_cell(p.num_ssts_uploaded.into());
262            row.add_cell(p.num_progress_key.into());
263            row.add_cell(p.num_pending_read_io.into());
264            row.add_cell(p.num_pending_write_io.into());
265            table.add_row(row);
266        }
267        println!("{table}");
268    } else {
269        println!("--- LSMtree Status ---");
270        println!("{:#?}", status);
271        println!("--- Task Assignment ---");
272        println!("{:#?}", assignment);
273        println!("--- Task Progress ---");
274        println!("{:#?}", progress);
275    }
276    Ok(())
277}
278
279pub async fn get_compaction_score(
280    context: &CtlContext,
281    id: CompactionGroupId,
282) -> anyhow::Result<()> {
283    let meta_client = context.meta_client().await?;
284    let scores = meta_client.get_compaction_score(id).await?;
285    let mut table = Table::new();
286    table.set_header({
287        let mut row = Row::new();
288        row.add_cell("Select Level".into());
289        row.add_cell("Target Level".into());
290        row.add_cell("Type".into());
291        row.add_cell("Score".into());
292        row
293    });
294    for s in scores.into_iter().sorted_by(|a, b| {
295        a.select_level
296            .cmp(&b.select_level)
297            .then_with(|| a.target_level.cmp(&b.target_level))
298    }) {
299        let mut row = Row::new();
300        row.add_cell(s.select_level.into());
301        row.add_cell(s.target_level.into());
302        row.add_cell(s.picker_type.into());
303        row.add_cell(s.score.into());
304        table.add_row(row);
305    }
306    println!("{table}");
307    Ok(())
308}
309
310pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow::Result<()> {
311    let meta_client = context.meta_client().await?;
312    let ret = meta_client
313        .cancel_compact_task(task_id, TaskStatus::ManualCanceled)
314        .await?;
315    println!("cancel_compact_task {} ret {:?}", task_id, ret);
316
317    Ok(())
318}
319
320pub async fn merge_compaction_group(
321    context: &CtlContext,
322    left_group_id: CompactionGroupId,
323    right_group_id: CompactionGroupId,
324) -> anyhow::Result<()> {
325    let meta_client = context.meta_client().await?;
326    meta_client
327        .merge_compaction_group(left_group_id, right_group_id)
328        .await?;
329    Ok(())
330}