Skip to main content

risingwave_ctl/cmd_impl/hummock/
compaction_group.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use comfy_table::{Row, Table};
18use itertools::Itertools;
19use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
20use risingwave_pb::hummock::compact_task::TaskStatus;
21use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
22use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::{
23    CompressionAlgorithm, SstableFilterLayout, SstableFilterType,
24};
25use risingwave_pb::id::TableId;
26
27use crate::CtlContext;
28
29pub async fn list_compaction_group(context: &CtlContext) -> anyhow::Result<()> {
30    let meta_client = context.meta_client().await?;
31    let result = meta_client.risectl_list_compaction_group().await?;
32    println!("{:#?}", result);
33    Ok(())
34}
35
36pub async fn update_compaction_config(
37    context: &CtlContext,
38    ids: Vec<CompactionGroupId>,
39    configs: Vec<MutableConfig>,
40) -> anyhow::Result<()> {
41    let meta_client = context.meta_client().await?;
42    meta_client
43        .risectl_update_compaction_config(ids.as_slice(), configs.as_slice())
44        .await?;
45    println!(
46        "Succeed: update compaction groups {:#?} with configs {:#?}.",
47        ids, configs
48    );
49    Ok(())
50}
51
52#[expect(clippy::too_many_arguments)]
53pub fn build_compaction_config_vec(
54    max_bytes_for_level_base: Option<u64>,
55    max_bytes_for_level_multiplier: Option<u64>,
56    max_compaction_bytes: Option<u64>,
57    sub_level_max_compaction_bytes: Option<u64>,
58    level0_tier_compact_file_number: Option<u64>,
59    target_file_size_base: Option<u64>,
60    compaction_filter_mask: Option<u32>,
61    max_sub_compaction: Option<u32>,
62    level0_stop_write_threshold_sub_level_number: Option<u64>,
63    level0_sub_level_compact_level_count: Option<u32>,
64    max_space_reclaim_bytes: Option<u64>,
65    level0_max_compact_file_number: Option<u64>,
66    level0_overlapping_sub_level_compact_level_count: Option<u32>,
67    enable_emergency_picker: Option<bool>,
68    tombstone_reclaim_ratio: Option<u32>,
69    compress_algorithm: Option<CompressionAlgorithm>,
70    sstable_filter_type: Option<SstableFilterType>,
71    sstable_filter_layout: Option<SstableFilterLayout>,
72    max_l0_compact_level: Option<u32>,
73    sst_allowed_trivial_move_min_size: Option<u64>,
74    disable_auto_group_scheduling: Option<bool>,
75    max_overlapping_level_size: Option<u64>,
76    sst_allowed_trivial_move_max_count: Option<u32>,
77    emergency_level0_sst_file_count: Option<u32>,
78    emergency_level0_sub_level_partition: Option<u32>,
79    level0_stop_write_threshold_max_sst_count: Option<u32>,
80    level0_stop_write_threshold_max_size: Option<u64>,
81    enable_optimize_l0_interval_selection: Option<bool>,
82    blocked_xor_filter_kv_count_threshold: Option<u64>,
83    max_vnode_key_range_bytes: Option<u64>,
84) -> Vec<MutableConfig> {
85    let mut configs = vec![];
86    if let Some(c) = max_bytes_for_level_base {
87        configs.push(MutableConfig::MaxBytesForLevelBase(c));
88    }
89    if let Some(c) = max_bytes_for_level_multiplier {
90        configs.push(MutableConfig::MaxBytesForLevelMultiplier(c));
91    }
92    if let Some(c) = max_compaction_bytes {
93        configs.push(MutableConfig::MaxCompactionBytes(c));
94    }
95    if let Some(c) = sub_level_max_compaction_bytes {
96        configs.push(MutableConfig::SubLevelMaxCompactionBytes(c));
97    }
98    if let Some(c) = level0_tier_compact_file_number {
99        configs.push(MutableConfig::Level0TierCompactFileNumber(c));
100    }
101    if let Some(c) = target_file_size_base {
102        configs.push(MutableConfig::TargetFileSizeBase(c));
103    }
104    if let Some(c) = compaction_filter_mask {
105        configs.push(MutableConfig::CompactionFilterMask(c));
106    }
107    if let Some(c) = max_sub_compaction {
108        configs.push(MutableConfig::MaxSubCompaction(c));
109    }
110    if let Some(c) = level0_stop_write_threshold_sub_level_number {
111        configs.push(MutableConfig::Level0StopWriteThresholdSubLevelNumber(c));
112    }
113    if let Some(c) = level0_sub_level_compact_level_count {
114        configs.push(MutableConfig::Level0SubLevelCompactLevelCount(c));
115    }
116    if let Some(c) = max_space_reclaim_bytes {
117        configs.push(MutableConfig::MaxSpaceReclaimBytes(c))
118    }
119    if let Some(c) = level0_max_compact_file_number {
120        configs.push(MutableConfig::Level0MaxCompactFileNumber(c))
121    }
122    if let Some(c) = level0_overlapping_sub_level_compact_level_count {
123        configs.push(MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c))
124    }
125    if let Some(c) = enable_emergency_picker {
126        configs.push(MutableConfig::EnableEmergencyPicker(c))
127    }
128    if let Some(c) = tombstone_reclaim_ratio {
129        configs.push(MutableConfig::TombstoneReclaimRatio(c))
130    }
131    if let Some(c) = compress_algorithm {
132        configs.push(MutableConfig::CompressionAlgorithm(c))
133    }
134    if let Some(c) = sstable_filter_type {
135        configs.push(MutableConfig::SstableFilterType(c))
136    }
137    if let Some(c) = sstable_filter_layout {
138        configs.push(MutableConfig::SstableFilterLayout(c))
139    }
140    if let Some(c) = max_l0_compact_level {
141        configs.push(MutableConfig::MaxL0CompactLevelCount(c))
142    }
143    if let Some(c) = sst_allowed_trivial_move_min_size {
144        configs.push(MutableConfig::SstAllowedTrivialMoveMinSize(c))
145    }
146    if let Some(c) = disable_auto_group_scheduling {
147        configs.push(MutableConfig::DisableAutoGroupScheduling(c))
148    }
149    if let Some(c) = max_overlapping_level_size {
150        configs.push(MutableConfig::MaxOverlappingLevelSize(c))
151    }
152    if let Some(c) = sst_allowed_trivial_move_max_count {
153        configs.push(MutableConfig::SstAllowedTrivialMoveMaxCount(c))
154    }
155    if let Some(c) = emergency_level0_sst_file_count {
156        configs.push(MutableConfig::EmergencyLevel0SstFileCount(c))
157    }
158    if let Some(c) = emergency_level0_sub_level_partition {
159        configs.push(MutableConfig::EmergencyLevel0SubLevelPartition(c))
160    }
161    if let Some(c) = level0_stop_write_threshold_max_sst_count {
162        configs.push(MutableConfig::Level0StopWriteThresholdMaxSstCount(c))
163    }
164    if let Some(c) = level0_stop_write_threshold_max_size {
165        configs.push(MutableConfig::Level0StopWriteThresholdMaxSize(c))
166    }
167    if let Some(c) = enable_optimize_l0_interval_selection {
168        configs.push(MutableConfig::EnableOptimizeL0IntervalSelection(c))
169    }
170    if let Some(c) = blocked_xor_filter_kv_count_threshold {
171        configs.push(MutableConfig::MaxKvCountForXor16(c))
172    }
173    if let Some(c) = max_vnode_key_range_bytes {
174        configs.push(MutableConfig::MaxVnodeKeyRangeBytes(c))
175    }
176
177    configs
178}
179
180pub async fn split_compaction_group(
181    context: &CtlContext,
182    group_id: CompactionGroupId,
183    table_ids_to_new_group: &[TableId],
184    partition_vnode_count: u32,
185) -> anyhow::Result<()> {
186    let meta_client = context.meta_client().await?;
187    let new_group_id = meta_client
188        .split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count)
189        .await?;
190    println!(
191        "Succeed: split compaction group {}. tables {:#?} are moved to new group {}.",
192        group_id, table_ids_to_new_group, new_group_id
193    );
194    Ok(())
195}
196
197pub async fn list_compaction_status(context: &CtlContext, verbose: bool) -> anyhow::Result<()> {
198    let meta_client = context.meta_client().await?;
199    let (status, assignment, progress) = meta_client.risectl_list_compaction_status().await?;
200    if !verbose {
201        let mut table = Table::new();
202        table.set_header({
203            let mut row = Row::new();
204            row.add_cell("Compaction Group".into());
205            row.add_cell("Level".into());
206            row.add_cell("Task Count".into());
207            row.add_cell("Tasks".into());
208            row
209        });
210        for s in status {
211            let cg_id = s.compaction_group_id;
212            for l in s.level_handlers {
213                let level = l.level;
214                let mut task_ids = HashSet::new();
215                for t in l.tasks {
216                    task_ids.insert(t.task_id);
217                }
218                let mut row = Row::new();
219                row.add_cell(cg_id.into());
220                row.add_cell(level.into());
221                row.add_cell(task_ids.len().into());
222                row.add_cell(
223                    task_ids
224                        .into_iter()
225                        .sorted()
226                        .map(|t| t.to_string())
227                        .join(",")
228                        .into(),
229                );
230                table.add_row(row);
231            }
232        }
233        println!("{table}");
234
235        let mut table = Table::new();
236        table.set_header({
237            let mut row = Row::new();
238            row.add_cell("Hummock Context".into());
239            row.add_cell("Task Count".into());
240            row.add_cell("Tasks".into());
241            row
242        });
243        let mut assignment_lite: HashMap<HummockContextId, Vec<u64>> = HashMap::new();
244        for a in assignment {
245            assignment_lite
246                .entry(a.context_id)
247                .or_default()
248                .push(a.compact_task.unwrap().task_id);
249        }
250        for (k, v) in assignment_lite {
251            let mut row = Row::new();
252            row.add_cell(k.into());
253            row.add_cell(v.len().into());
254            row.add_cell(
255                v.into_iter()
256                    .sorted()
257                    .map(|t| t.to_string())
258                    .join(",")
259                    .into(),
260            );
261            table.add_row(row);
262        }
263        println!("{table}");
264
265        let mut table = Table::new();
266        table.set_header({
267            let mut row = Row::new();
268            row.add_cell("Task".into());
269            row.add_cell("Num SSTs Sealed".into());
270            row.add_cell("Num SSTs Uploaded".into());
271            row.add_cell("Num Progress Key".into());
272            row.add_cell("Num Pending Read IO".into());
273            row.add_cell("Num Pending Write IO".into());
274            row
275        });
276        for p in progress {
277            let mut row = Row::new();
278            row.add_cell(p.task_id.into());
279            row.add_cell(p.num_ssts_sealed.into());
280            row.add_cell(p.num_ssts_uploaded.into());
281            row.add_cell(p.num_progress_key.into());
282            row.add_cell(p.num_pending_read_io.into());
283            row.add_cell(p.num_pending_write_io.into());
284            table.add_row(row);
285        }
286        println!("{table}");
287    } else {
288        println!("--- LSMtree Status ---");
289        println!("{:#?}", status);
290        println!("--- Task Assignment ---");
291        println!("{:#?}", assignment);
292        println!("--- Task Progress ---");
293        println!("{:#?}", progress);
294    }
295    Ok(())
296}
297
298pub async fn get_compaction_score(
299    context: &CtlContext,
300    id: CompactionGroupId,
301) -> anyhow::Result<()> {
302    let meta_client = context.meta_client().await?;
303    let scores = meta_client.get_compaction_score(id).await?;
304    let mut table = Table::new();
305    table.set_header({
306        let mut row = Row::new();
307        row.add_cell("Select Level".into());
308        row.add_cell("Target Level".into());
309        row.add_cell("Type".into());
310        row.add_cell("Score".into());
311        row
312    });
313    for s in scores.into_iter().sorted_by(|a, b| {
314        a.select_level
315            .cmp(&b.select_level)
316            .then_with(|| a.target_level.cmp(&b.target_level))
317    }) {
318        let mut row = Row::new();
319        row.add_cell(s.select_level.into());
320        row.add_cell(s.target_level.into());
321        row.add_cell(s.picker_type.into());
322        row.add_cell(s.score.into());
323        table.add_row(row);
324    }
325    println!("{table}");
326    Ok(())
327}
328
329pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow::Result<()> {
330    let meta_client = context.meta_client().await?;
331    let ret = meta_client
332        .cancel_compact_task(task_id, TaskStatus::ManualCanceled)
333        .await?;
334    println!("cancel_compact_task {} ret {:?}", task_id, ret);
335
336    Ok(())
337}
338
339pub async fn merge_compaction_group(
340    context: &CtlContext,
341    left_group_id: CompactionGroupId,
342    right_group_id: CompactionGroupId,
343) -> anyhow::Result<()> {
344    let meta_client = context.meta_client().await?;
345    meta_client
346        .merge_compaction_group(left_group_id, right_group_id)
347        .await?;
348    Ok(())
349}