// risingwave_ctl/cmd_impl/hummock/compaction_group.rs

use std::collections::{HashMap, HashSet};

use comfy_table::{Row, Table};
use itertools::Itertools;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::{CompactionGroupId, HummockContextId};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;

use crate::CtlContext;
27pub async fn list_compaction_group(context: &CtlContext) -> anyhow::Result<()> {
28 let meta_client = context.meta_client().await?;
29 let result = meta_client.risectl_list_compaction_group().await?;
30 println!("{:#?}", result);
31 Ok(())
32}
33
34pub async fn update_compaction_config(
35 context: &CtlContext,
36 ids: Vec<CompactionGroupId>,
37 configs: Vec<MutableConfig>,
38) -> anyhow::Result<()> {
39 let meta_client = context.meta_client().await?;
40 meta_client
41 .risectl_update_compaction_config(ids.as_slice(), configs.as_slice())
42 .await?;
43 println!(
44 "Succeed: update compaction groups {:#?} with configs {:#?}.",
45 ids, configs
46 );
47 Ok(())
48}
49
50#[allow(clippy::too_many_arguments)]
51pub fn build_compaction_config_vec(
52 max_bytes_for_level_base: Option<u64>,
53 max_bytes_for_level_multiplier: Option<u64>,
54 max_compaction_bytes: Option<u64>,
55 sub_level_max_compaction_bytes: Option<u64>,
56 level0_tier_compact_file_number: Option<u64>,
57 target_file_size_base: Option<u64>,
58 compaction_filter_mask: Option<u32>,
59 max_sub_compaction: Option<u32>,
60 level0_stop_write_threshold_sub_level_number: Option<u64>,
61 level0_sub_level_compact_level_count: Option<u32>,
62 max_space_reclaim_bytes: Option<u64>,
63 level0_max_compact_file_number: Option<u64>,
64 level0_overlapping_sub_level_compact_level_count: Option<u32>,
65 enable_emergency_picker: Option<bool>,
66 tombstone_reclaim_ratio: Option<u32>,
67 compress_algorithm: Option<CompressionAlgorithm>,
68 max_l0_compact_level: Option<u32>,
69 sst_allowed_trivial_move_min_size: Option<u64>,
70 disable_auto_group_scheduling: Option<bool>,
71 max_overlapping_level_size: Option<u64>,
72 sst_allowed_trivial_move_max_count: Option<u32>,
73 emergency_level0_sst_file_count: Option<u32>,
74 emergency_level0_sub_level_partition: Option<u32>,
75 level0_stop_write_threshold_max_sst_count: Option<u32>,
76 level0_stop_write_threshold_max_size: Option<u64>,
77 enable_optimize_l0_interval_selection: Option<bool>,
78) -> Vec<MutableConfig> {
79 let mut configs = vec![];
80 if let Some(c) = max_bytes_for_level_base {
81 configs.push(MutableConfig::MaxBytesForLevelBase(c));
82 }
83 if let Some(c) = max_bytes_for_level_multiplier {
84 configs.push(MutableConfig::MaxBytesForLevelMultiplier(c));
85 }
86 if let Some(c) = max_compaction_bytes {
87 configs.push(MutableConfig::MaxCompactionBytes(c));
88 }
89 if let Some(c) = sub_level_max_compaction_bytes {
90 configs.push(MutableConfig::SubLevelMaxCompactionBytes(c));
91 }
92 if let Some(c) = level0_tier_compact_file_number {
93 configs.push(MutableConfig::Level0TierCompactFileNumber(c));
94 }
95 if let Some(c) = target_file_size_base {
96 configs.push(MutableConfig::TargetFileSizeBase(c));
97 }
98 if let Some(c) = compaction_filter_mask {
99 configs.push(MutableConfig::CompactionFilterMask(c));
100 }
101 if let Some(c) = max_sub_compaction {
102 configs.push(MutableConfig::MaxSubCompaction(c));
103 }
104 if let Some(c) = level0_stop_write_threshold_sub_level_number {
105 configs.push(MutableConfig::Level0StopWriteThresholdSubLevelNumber(c));
106 }
107 if let Some(c) = level0_sub_level_compact_level_count {
108 configs.push(MutableConfig::Level0SubLevelCompactLevelCount(c));
109 }
110 if let Some(c) = max_space_reclaim_bytes {
111 configs.push(MutableConfig::MaxSpaceReclaimBytes(c))
112 }
113 if let Some(c) = level0_max_compact_file_number {
114 configs.push(MutableConfig::Level0MaxCompactFileNumber(c))
115 }
116 if let Some(c) = level0_overlapping_sub_level_compact_level_count {
117 configs.push(MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c))
118 }
119 if let Some(c) = enable_emergency_picker {
120 configs.push(MutableConfig::EnableEmergencyPicker(c))
121 }
122 if let Some(c) = tombstone_reclaim_ratio {
123 configs.push(MutableConfig::TombstoneReclaimRatio(c))
124 }
125 if let Some(c) = compress_algorithm {
126 configs.push(MutableConfig::CompressionAlgorithm(c))
127 }
128 if let Some(c) = max_l0_compact_level {
129 configs.push(MutableConfig::MaxL0CompactLevelCount(c))
130 }
131 if let Some(c) = sst_allowed_trivial_move_min_size {
132 configs.push(MutableConfig::SstAllowedTrivialMoveMinSize(c))
133 }
134 if let Some(c) = disable_auto_group_scheduling {
135 configs.push(MutableConfig::DisableAutoGroupScheduling(c))
136 }
137 if let Some(c) = max_overlapping_level_size {
138 configs.push(MutableConfig::MaxOverlappingLevelSize(c))
139 }
140 if let Some(c) = sst_allowed_trivial_move_max_count {
141 configs.push(MutableConfig::SstAllowedTrivialMoveMaxCount(c))
142 }
143 if let Some(c) = emergency_level0_sst_file_count {
144 configs.push(MutableConfig::EmergencyLevel0SstFileCount(c))
145 }
146 if let Some(c) = emergency_level0_sub_level_partition {
147 configs.push(MutableConfig::EmergencyLevel0SubLevelPartition(c))
148 }
149 if let Some(c) = level0_stop_write_threshold_max_sst_count {
150 configs.push(MutableConfig::Level0StopWriteThresholdMaxSstCount(c))
151 }
152 if let Some(c) = level0_stop_write_threshold_max_size {
153 configs.push(MutableConfig::Level0StopWriteThresholdMaxSize(c))
154 }
155 if let Some(c) = enable_optimize_l0_interval_selection {
156 configs.push(MutableConfig::EnableOptimizeL0IntervalSelection(c))
157 }
158
159 configs
160}
161
162pub async fn split_compaction_group(
163 context: &CtlContext,
164 group_id: CompactionGroupId,
165 table_ids_to_new_group: &[StateTableId],
166 partition_vnode_count: u32,
167) -> anyhow::Result<()> {
168 let meta_client = context.meta_client().await?;
169 let new_group_id = meta_client
170 .split_compaction_group(group_id, table_ids_to_new_group, partition_vnode_count)
171 .await?;
172 println!(
173 "Succeed: split compaction group {}. tables {:#?} are moved to new group {}.",
174 group_id, table_ids_to_new_group, new_group_id
175 );
176 Ok(())
177}
178
179pub async fn list_compaction_status(context: &CtlContext, verbose: bool) -> anyhow::Result<()> {
180 let meta_client = context.meta_client().await?;
181 let (status, assignment, progress) = meta_client.risectl_list_compaction_status().await?;
182 if !verbose {
183 let mut table = Table::new();
184 table.set_header({
185 let mut row = Row::new();
186 row.add_cell("Compaction Group".into());
187 row.add_cell("Level".into());
188 row.add_cell("Task Count".into());
189 row.add_cell("Tasks".into());
190 row
191 });
192 for s in status {
193 let cg_id = s.compaction_group_id;
194 for l in s.level_handlers {
195 let level = l.level;
196 let mut task_ids = HashSet::new();
197 for t in l.tasks {
198 task_ids.insert(t.task_id);
199 }
200 let mut row = Row::new();
201 row.add_cell(cg_id.into());
202 row.add_cell(level.into());
203 row.add_cell(task_ids.len().into());
204 row.add_cell(
205 task_ids
206 .into_iter()
207 .sorted()
208 .map(|t| t.to_string())
209 .join(",")
210 .into(),
211 );
212 table.add_row(row);
213 }
214 }
215 println!("{table}");
216
217 let mut table = Table::new();
218 table.set_header({
219 let mut row = Row::new();
220 row.add_cell("Hummock Context".into());
221 row.add_cell("Task Count".into());
222 row.add_cell("Tasks".into());
223 row
224 });
225 let mut assignment_lite: HashMap<HummockContextId, Vec<u64>> = HashMap::new();
226 for a in assignment {
227 assignment_lite
228 .entry(a.context_id)
229 .or_default()
230 .push(a.compact_task.unwrap().task_id);
231 }
232 for (k, v) in assignment_lite {
233 let mut row = Row::new();
234 row.add_cell(k.into());
235 row.add_cell(v.len().into());
236 row.add_cell(
237 v.into_iter()
238 .sorted()
239 .map(|t| t.to_string())
240 .join(",")
241 .into(),
242 );
243 table.add_row(row);
244 }
245 println!("{table}");
246
247 let mut table = Table::new();
248 table.set_header({
249 let mut row = Row::new();
250 row.add_cell("Task".into());
251 row.add_cell("Num SSTs Sealed".into());
252 row.add_cell("Num SSTs Uploaded".into());
253 row.add_cell("Num Progress Key".into());
254 row.add_cell("Num Pending Read IO".into());
255 row.add_cell("Num Pending Write IO".into());
256 row
257 });
258 for p in progress {
259 let mut row = Row::new();
260 row.add_cell(p.task_id.into());
261 row.add_cell(p.num_ssts_sealed.into());
262 row.add_cell(p.num_ssts_uploaded.into());
263 row.add_cell(p.num_progress_key.into());
264 row.add_cell(p.num_pending_read_io.into());
265 row.add_cell(p.num_pending_write_io.into());
266 table.add_row(row);
267 }
268 println!("{table}");
269 } else {
270 println!("--- LSMtree Status ---");
271 println!("{:#?}", status);
272 println!("--- Task Assignment ---");
273 println!("{:#?}", assignment);
274 println!("--- Task Progress ---");
275 println!("{:#?}", progress);
276 }
277 Ok(())
278}
279
280pub async fn get_compaction_score(
281 context: &CtlContext,
282 id: CompactionGroupId,
283) -> anyhow::Result<()> {
284 let meta_client = context.meta_client().await?;
285 let scores = meta_client.get_compaction_score(id).await?;
286 let mut table = Table::new();
287 table.set_header({
288 let mut row = Row::new();
289 row.add_cell("Select Level".into());
290 row.add_cell("Target Level".into());
291 row.add_cell("Type".into());
292 row.add_cell("Score".into());
293 row
294 });
295 for s in scores.into_iter().sorted_by(|a, b| {
296 a.select_level
297 .cmp(&b.select_level)
298 .then_with(|| a.target_level.cmp(&b.target_level))
299 }) {
300 let mut row = Row::new();
301 row.add_cell(s.select_level.into());
302 row.add_cell(s.target_level.into());
303 row.add_cell(s.picker_type.into());
304 row.add_cell(s.score.into());
305 table.add_row(row);
306 }
307 println!("{table}");
308 Ok(())
309}
310
311pub async fn cancel_compact_task(context: &CtlContext, task_id: u64) -> anyhow::Result<()> {
312 let meta_client = context.meta_client().await?;
313 let ret = meta_client
314 .cancel_compact_task(task_id, TaskStatus::ManualCanceled)
315 .await?;
316 println!("cancel_compact_task {} ret {:?}", task_id, ret);
317
318 Ok(())
319}
320
321pub async fn merge_compaction_group(
322 context: &CtlContext,
323 left_group_id: CompactionGroupId,
324 right_group_id: CompactionGroupId,
325) -> anyhow::Result<()> {
326 let meta_client = context.meta_client().await?;
327 meta_client
328 .merge_compaction_group(left_group_id, right_group_id)
329 .await?;
330 Ok(())
331}