risingwave_ctl/cmd_impl/hummock/
trigger_manual_compaction.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_hummock_sdk::HummockSstableId;
16use risingwave_pb::id::{CompactionGroupId, JobId};
17use risingwave_rpc_client::HummockMetaClient;
18use tokio::time::{Duration, sleep};
19
20use crate::CtlContext;
21
22pub async fn trigger_manual_compaction(
23    context: &CtlContext,
24    compaction_group_id: CompactionGroupId,
25    table_id: JobId,
26    levels: Vec<u32>,
27    sst_ids: Vec<HummockSstableId>,
28    exclusive: bool,
29    retry_interval_ms: u64,
30) -> anyhow::Result<()> {
31    let meta_client = context.meta_client().await?;
32    for level in levels {
33        tracing::info!("Triggering manual compaction for level {level}...");
34        loop {
35            let result = meta_client
36                .trigger_manual_compaction(
37                    compaction_group_id,
38                    table_id,
39                    level,
40                    sst_ids.clone(),
41                    exclusive,
42                )
43                .await;
44            match &result {
45                Ok(should_retry) if *should_retry => {
46                    if exclusive {
47                        tracing::info!(
48                            "Manual compaction is blocked by ongoing tasks. Sleeping {} ms before retrying.",
49                            retry_interval_ms
50                        );
51                        sleep(Duration::from_millis(retry_interval_ms)).await;
52                        continue;
53                    }
54                    tracing::info!("Level {level}: {:#?}", result);
55                    break;
56                }
57                Ok(_) => {
58                    tracing::info!("Level {level}: {:#?}", result);
59                    break;
60                }
61                Err(_) => {
62                    tracing::info!("Level {level}: {:#?}", result);
63                    break;
64                }
65            }
66        }
67    }
68    Ok(())
69}