risingwave_frontend/handler/
backup.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::types::Fields;
17use risingwave_pb::backup_service::BackupJobStatus;
18use tokio::time::Duration;
19
20use super::{RwPgResponse, RwPgResponseBuilderExt};
21use crate::error::{ErrorCode, Result};
22use crate::handler::HandlerArgs;
23use crate::session::SessionImpl;
24
25pub(super) async fn handle_backup(handler_args: HandlerArgs) -> Result<RwPgResponse> {
26    // Only permit backup for super users.
27    if !handler_args.session.is_super_user() {
28        return Err(ErrorCode::PermissionDenied(
29            "only superusers can trigger adhoc backup".to_owned(),
30        )
31        .into());
32    }
33    let snapshot_id = do_backup(&handler_args.session).await?;
34    Ok(PgResponse::builder(StatementType::BACKUP)
35        .rows([BackupRow { snapshot_id }])
36        .into())
37}
38
39pub(crate) async fn do_backup(session: &SessionImpl) -> Result<i64> {
40    let client = session.env().meta_client();
41    let job_id = client.backup_meta(None).await?;
42    loop {
43        let (job_status, message) = client.get_backup_job_status(job_id).await?;
44        match job_status {
45            BackupJobStatus::Running => tokio::time::sleep(Duration::from_millis(100)).await,
46            BackupJobStatus::Succeeded => {
47                return i64::try_from(job_id).map_err(|_| {
48                    ErrorCode::InternalError(format!("snapshot id {} exceeds i64 range", job_id))
49                        .into()
50                });
51            }
52            BackupJobStatus::NotFound => {
53                return Err(ErrorCode::InternalError(format!(
54                    "backup job status not found: job {}, {}",
55                    job_id, message
56                ))
57                .into());
58            }
59            BackupJobStatus::Failed => {
60                return Err(ErrorCode::InternalError(format!(
61                    "backup job failed: job {}, {}",
62                    job_id, message
63                ))
64                .into());
65            }
66            BackupJobStatus::Unspecified => {
67                return Err(ErrorCode::InternalError(format!(
68                    "backup job status unspecified: job {}, {}",
69                    job_id, message
70                ))
71                .into());
72            }
73        }
74    }
75}
76
77#[derive(Fields)]
78struct BackupRow {
79    snapshot_id: i64,
80}