risingwave_ctl/cmd_impl/hummock/
trigger_manual_compaction.rs1use risingwave_hummock_sdk::HummockSstableId;
16use risingwave_pb::id::{CompactionGroupId, JobId};
17use risingwave_rpc_client::HummockMetaClient;
18use tokio::time::{Duration, sleep};
19
20use crate::CtlContext;
21
22pub async fn trigger_manual_compaction(
23 context: &CtlContext,
24 compaction_group_id: CompactionGroupId,
25 table_id: JobId,
26 levels: Vec<u32>,
27 sst_ids: Vec<HummockSstableId>,
28 exclusive: bool,
29 retry_interval_ms: u64,
30) -> anyhow::Result<()> {
31 let meta_client = context.meta_client().await?;
32 for level in levels {
33 tracing::info!("Triggering manual compaction for level {level}...");
34 loop {
35 let result = meta_client
36 .trigger_manual_compaction(
37 compaction_group_id,
38 table_id,
39 level,
40 sst_ids.clone(),
41 exclusive,
42 )
43 .await;
44 match &result {
45 Ok(should_retry) if *should_retry => {
46 if exclusive {
47 tracing::info!(
48 "Manual compaction is blocked by ongoing tasks. Sleeping {} ms before retrying.",
49 retry_interval_ms
50 );
51 sleep(Duration::from_millis(retry_interval_ms)).await;
52 continue;
53 }
54 tracing::info!("Level {level}: {:#?}", result);
55 break;
56 }
57 Ok(_) => {
58 tracing::info!("Level {level}: {:#?}", result);
59 break;
60 }
61 Err(_) => {
62 tracing::info!("Level {level}: {:#?}", result);
63 break;
64 }
65 }
66 }
67 }
68 Ok(())
69}