risingwave_ctl/cmd_impl/meta/
backup_meta.rs
1use std::time::Duration;
16
17use risingwave_common::RW_VERSION;
18use risingwave_pb::backup_service::BackupJobStatus;
19
20use crate::CtlContext;
21
22pub async fn backup_meta(context: &CtlContext, remarks: Option<String>) -> anyhow::Result<()> {
23 let meta_client = context.meta_client().await?;
24 let job_id = meta_client.backup_meta(remarks).await?;
25 loop {
26 let (job_status, message) = meta_client.get_backup_job_status(job_id).await?;
27 match job_status {
28 BackupJobStatus::Running => {
29 tracing::info!("backup job is still running: job {}, {}", job_id, message);
30 tokio::time::sleep(Duration::from_secs(1)).await;
31 }
32 BackupJobStatus::Succeeded => {
33 tracing::info!("backup job succeeded: job {}, {}", job_id, message);
34 tracing::info!("rw version: {}", RW_VERSION);
35 break;
36 }
37 BackupJobStatus::NotFound => {
38 return Err(anyhow::anyhow!(
39 "backup job status not found: job {}, {}",
40 job_id,
41 message
42 ));
43 }
44 BackupJobStatus::Failed => {
45 return Err(anyhow::anyhow!(
46 "backup job failed: job {}, {}",
47 job_id,
48 message
49 ));
50 }
51 _ => unreachable!("unknown backup job status"),
52 }
53 }
54 Ok(())
55}
56
57pub async fn delete_meta_snapshots(
58 context: &CtlContext,
59 snapshot_ids: &[u64],
60) -> anyhow::Result<()> {
61 let meta_client = context.meta_client().await?;
62 meta_client.delete_meta_snapshot(snapshot_ids).await?;
63 tracing::info!("delete meta snapshots succeeded: {:?}", snapshot_ids);
64 Ok(())
65}