Skip to main content

risingwave_ctl/cmd_impl/hummock/
trigger_manual_compaction.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_hummock_sdk::HummockSstableId;
16use risingwave_pb::id::{CompactionGroupId, JobId};
17use risingwave_rpc_client::HummockMetaClient;
18use tokio::time::{Duration, sleep};
19
20use crate::CtlContext;
21
22pub async fn trigger_manual_compaction(
23    context: &CtlContext,
24    compaction_group_id: CompactionGroupId,
25    table_id: JobId,
26    levels: Vec<u32>,
27    target_level: Option<u32>,
28    sst_ids: Vec<HummockSstableId>,
29    exclusive: bool,
30    retry_interval_ms: u64,
31) -> anyhow::Result<()> {
32    let meta_client = context.meta_client().await?;
33    for level in levels {
34        tracing::info!("Triggering manual compaction for level {level}...");
35        loop {
36            let result = meta_client
37                .trigger_manual_compaction(
38                    compaction_group_id,
39                    table_id,
40                    level,
41                    target_level,
42                    sst_ids.clone(),
43                    exclusive,
44                )
45                .await;
46            match &result {
47                Ok(should_retry) if *should_retry => {
48                    if exclusive {
49                        tracing::info!(
50                            "Manual compaction is blocked by ongoing tasks. Sleeping {} ms before retrying.",
51                            retry_interval_ms
52                        );
53                        sleep(Duration::from_millis(retry_interval_ms)).await;
54                        continue;
55                    }
56                    tracing::info!("Level {level}: {:#?}", result);
57                    break;
58                }
59                Ok(_) => {
60                    tracing::info!("Level {level}: {:#?}", result);
61                    break;
62                }
63                Err(_) => {
64                    tracing::info!("Level {level}: {:#?}", result);
65                    break;
66                }
67            }
68        }
69    }
70    Ok(())
71}