risingwave_ctl/cmd_impl/hummock/
pause_resume.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_hummock_sdk::HummockEpoch;
16use risingwave_hummock_sdk::version::HummockVersion;
17
18use crate::CtlContext;
19
20pub async fn disable_commit_epoch(context: &CtlContext) -> anyhow::Result<()> {
21    let meta_client = context.meta_client().await?;
22    let version = meta_client.disable_commit_epoch().await?;
23    println!(
24        "Disabled.\
25        Current version: id {}",
26        version.id,
27    );
28    Ok(())
29}
30
31pub async fn pause_version_checkpoint(context: &CtlContext) -> anyhow::Result<()> {
32    let meta_client = context.meta_client().await?;
33    meta_client
34        .risectl_pause_hummock_version_checkpoint()
35        .await?;
36    println!("Hummock version checkpoint is paused");
37    Ok(())
38}
39
40pub async fn resume_version_checkpoint(context: &CtlContext) -> anyhow::Result<()> {
41    let meta_client = context.meta_client().await?;
42    meta_client
43        .risectl_resume_hummock_version_checkpoint()
44        .await?;
45    println!("Hummock version checkpoint is resumed");
46    Ok(())
47}
48
49/// For now this function itself doesn't provide useful info.
50/// We can extend it to reveal interested info, e.g. at which hummock version is a user key
51/// added/removed for what reason (row deletion/compaction/etc.).
52pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> {
53    let meta_client = context.meta_client().await?;
54    let mut base_version = HummockVersion::from_rpc_protobuf(
55        &meta_client
56            .risectl_get_checkpoint_hummock_version()
57            .await?
58            .checkpoint_version
59            .unwrap(),
60    );
61    println!("replay starts");
62    println!("base version {}", base_version.id);
63    let delta_fetch_size = 100;
64    let mut current_delta_id = base_version.next_version_id();
65    loop {
66        let deltas = meta_client
67            .list_version_deltas(current_delta_id, delta_fetch_size, HummockEpoch::MAX)
68            .await
69            .unwrap();
70        if deltas.is_empty() {
71            break;
72        }
73        for delta in deltas {
74            if delta.prev_id != base_version.id {
75                eprintln!("missing delta log for version {}", base_version.id);
76                break;
77            }
78            base_version.apply_version_delta(&delta);
79            println!("replayed version {}", base_version.id);
80        }
81        current_delta_id = base_version.next_version_id();
82    }
83    println!("replay ends");
84    Ok(())
85}