risingwave_ctl/cmd_impl/hummock/
pause_resume.rs1use risingwave_hummock_sdk::HummockEpoch;
16use risingwave_hummock_sdk::version::HummockVersion;
17
18use crate::CtlContext;
19
20pub async fn disable_commit_epoch(context: &CtlContext) -> anyhow::Result<()> {
21 let meta_client = context.meta_client().await?;
22 let version = meta_client.disable_commit_epoch().await?;
23 println!(
24 "Disabled.\
25 Current version: id {}",
26 version.id,
27 );
28 Ok(())
29}
30
31pub async fn pause_version_checkpoint(context: &CtlContext) -> anyhow::Result<()> {
32 let meta_client = context.meta_client().await?;
33 meta_client
34 .risectl_pause_hummock_version_checkpoint()
35 .await?;
36 println!("Hummock version checkpoint is paused");
37 Ok(())
38}
39
40pub async fn resume_version_checkpoint(context: &CtlContext) -> anyhow::Result<()> {
41 let meta_client = context.meta_client().await?;
42 meta_client
43 .risectl_resume_hummock_version_checkpoint()
44 .await?;
45 println!("Hummock version checkpoint is resumed");
46 Ok(())
47}
48
49pub async fn replay_version(context: &CtlContext) -> anyhow::Result<()> {
53 let meta_client = context.meta_client().await?;
54 let mut base_version = HummockVersion::from_rpc_protobuf(
55 &meta_client
56 .risectl_get_checkpoint_hummock_version()
57 .await?
58 .checkpoint_version
59 .unwrap(),
60 );
61 println!("replay starts");
62 println!("base version {}", base_version.id);
63 let delta_fetch_size = 100;
64 let mut current_delta_id = base_version.next_version_id();
65 loop {
66 let deltas = meta_client
67 .list_version_deltas(current_delta_id, delta_fetch_size, HummockEpoch::MAX)
68 .await
69 .unwrap();
70 if deltas.is_empty() {
71 break;
72 }
73 for delta in deltas {
74 if delta.prev_id != base_version.id {
75 eprintln!("missing delta log for version {}", base_version.id);
76 break;
77 }
78 base_version.apply_version_delta(&delta);
79 println!("replayed version {}", base_version.id);
80 }
81 current_delta_id = base_version.next_version_id();
82 }
83 println!("replay ends");
84 Ok(())
85}