risingwave_ctl/cmd_impl/hummock/
trigger_manual_compaction.rs1use risingwave_hummock_sdk::HummockSstableId;
16use risingwave_pb::id::{CompactionGroupId, JobId};
17use risingwave_rpc_client::HummockMetaClient;
18use tokio::time::{Duration, sleep};
19
20use crate::CtlContext;
21
22pub async fn trigger_manual_compaction(
23 context: &CtlContext,
24 compaction_group_id: CompactionGroupId,
25 table_id: JobId,
26 levels: Vec<u32>,
27 target_level: Option<u32>,
28 sst_ids: Vec<HummockSstableId>,
29 exclusive: bool,
30 retry_interval_ms: u64,
31) -> anyhow::Result<()> {
32 let meta_client = context.meta_client().await?;
33 for level in levels {
34 tracing::info!("Triggering manual compaction for level {level}...");
35 loop {
36 let result = meta_client
37 .trigger_manual_compaction(
38 compaction_group_id,
39 table_id,
40 level,
41 target_level,
42 sst_ids.clone(),
43 exclusive,
44 )
45 .await;
46 match &result {
47 Ok(should_retry) if *should_retry => {
48 if exclusive {
49 tracing::info!(
50 "Manual compaction is blocked by ongoing tasks. Sleeping {} ms before retrying.",
51 retry_interval_ms
52 );
53 sleep(Duration::from_millis(retry_interval_ms)).await;
54 continue;
55 }
56 tracing::info!("Level {level}: {:#?}", result);
57 break;
58 }
59 Ok(_) => {
60 tracing::info!("Level {level}: {:#?}", result);
61 break;
62 }
63 Err(_) => {
64 tracing::info!("Level {level}: {:#?}", result);
65 break;
66 }
67 }
68 }
69 }
70 Ok(())
71}