risingwave_frontend/handler/
backup.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::types::Fields;
17use risingwave_pb::backup_service::BackupJobStatus;
18use tokio::time::Duration;
19
20use super::{RwPgResponse, RwPgResponseBuilderExt};
21use crate::error::{ErrorCode, Result};
22use crate::handler::HandlerArgs;
23use crate::session::SessionImpl;
24
25pub(super) async fn handle_backup(handler_args: HandlerArgs) -> Result<RwPgResponse> {
26 if !handler_args.session.is_super_user() {
28 return Err(ErrorCode::PermissionDenied(
29 "only superusers can trigger adhoc backup".to_owned(),
30 )
31 .into());
32 }
33 let snapshot_id = do_backup(&handler_args.session).await?;
34 Ok(PgResponse::builder(StatementType::BACKUP)
35 .rows([BackupRow { snapshot_id }])
36 .into())
37}
38
39pub(crate) async fn do_backup(session: &SessionImpl) -> Result<i64> {
40 let client = session.env().meta_client();
41 let job_id = client.backup_meta(None).await?;
42 loop {
43 let (job_status, message) = client.get_backup_job_status(job_id).await?;
44 match job_status {
45 BackupJobStatus::Running => tokio::time::sleep(Duration::from_millis(100)).await,
46 BackupJobStatus::Succeeded => {
47 return i64::try_from(job_id).map_err(|_| {
48 ErrorCode::InternalError(format!("snapshot id {} exceeds i64 range", job_id))
49 .into()
50 });
51 }
52 BackupJobStatus::NotFound => {
53 return Err(ErrorCode::InternalError(format!(
54 "backup job status not found: job {}, {}",
55 job_id, message
56 ))
57 .into());
58 }
59 BackupJobStatus::Failed => {
60 return Err(ErrorCode::InternalError(format!(
61 "backup job failed: job {}, {}",
62 job_id, message
63 ))
64 .into());
65 }
66 BackupJobStatus::Unspecified => {
67 return Err(ErrorCode::InternalError(format!(
68 "backup job status unspecified: job {}, {}",
69 job_id, message
70 ))
71 .into());
72 }
73 }
74 }
75}
76
77#[derive(Fields)]
78struct BackupRow {
79 snapshot_id: i64,
80}