Skip to main content

risingwave_frontend/handler/
vacuum.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_batch::task::ShutdownToken;
19use risingwave_common::bail;
20use risingwave_common::catalog::Engine;
21use risingwave_connector::sink::CONNECTOR_TYPE_KEY;
22use risingwave_sqlparser::ast::ObjectName;
23
24use crate::binder::Binder;
25use crate::error::{ErrorCode, Result, RwError};
26use crate::handler::{HandlerArgs, RwPgResponse};
27use crate::scheduler::SchedulerError;
28
29async fn await_cancelable<T, E, F>(shutdown_rx: &mut ShutdownToken, future: F) -> Result<T>
30where
31    E: Into<RwError>,
32    F: Future<Output = std::result::Result<T, E>>,
33{
34    tokio::pin!(future);
35
36    tokio::select! {
37        result = &mut future => result.map_err(Into::into),
38        _ = shutdown_rx.cancelled() => {
39            Err(SchedulerError::QueryCancelled("Cancelled by user".to_owned()).into())
40        }
41    }
42}
43
44pub async fn handle_vacuum(
45    handler_args: HandlerArgs,
46    object_name: ObjectName,
47    full: bool,
48) -> Result<RwPgResponse> {
49    let session = &handler_args.session;
50    let db_name = &session.database();
51
52    let sink_id = {
53        let (schema_name, real_object_name) =
54            Binder::resolve_schema_qualified_name(db_name, &object_name)?;
55        let catalog_reader = session.env().catalog_reader().read_guard();
56        let search_path = session.config().search_path();
57        let user_name = session.user_name();
58        let schema_path = crate::catalog::root_catalog::SchemaPath::new(
59            schema_name.as_deref(),
60            &search_path,
61            &user_name,
62        );
63
64        if let Ok((table, _)) =
65            catalog_reader.get_created_table_by_name(db_name, schema_path, &real_object_name)
66        {
67            if table.engine() == Engine::Iceberg {
68                // For iceberg engine table, get the associated iceberg sink name
69                let sink_name = table.iceberg_sink_name().ok_or_else(|| {
70                    RwError::from(ErrorCode::CatalogError(
71                        format!("No iceberg sink name found for table {}", real_object_name).into(),
72                    ))
73                })?;
74
75                // Find the iceberg sink
76                let (sink, _) = catalog_reader
77                    .get_created_sink_by_name(db_name, schema_path, &sink_name)
78                    .map_err(|_| {
79                        RwError::from(ErrorCode::CatalogError(
80                            format!(
81                                "Iceberg sink {} not found for table {}",
82                                sink_name, real_object_name
83                            )
84                            .into(),
85                        ))
86                    })?;
87
88                sink.id
89            } else {
90                return Err(ErrorCode::InvalidInputSyntax(format!(
91                    "VACUUM can only be used on Iceberg engine tables or Iceberg sinks, but table '{}' uses {:?} engine",
92                    real_object_name,
93                    table.engine()
94                ))
95                    .into());
96            }
97        } else if let Ok((sink, _)) =
98            catalog_reader.get_created_sink_by_name(db_name, schema_path, &real_object_name)
99        {
100            if let Some(connector_type) = sink.properties.get(CONNECTOR_TYPE_KEY) {
101                if connector_type == "iceberg" {
102                    sink.id
103                } else {
104                    return Err(ErrorCode::InvalidInputSyntax(format!(
105                        "VACUUM can only be used on Iceberg sinks, but sink '{}' is of type '{}'",
106                        real_object_name, connector_type
107                    ))
108                    .into());
109                }
110            } else {
111                return Err(ErrorCode::InvalidInputSyntax(format!(
112                    "VACUUM can only be used on Iceberg sinks, but sink '{}' has no connector type specified",
113                    real_object_name
114                ))
115                    .into());
116            }
117        } else {
118            bail!("object {} not found", real_object_name);
119        }
120    };
121
122    let mut shutdown_rx = session.reset_cancel_query_flag();
123
124    if full {
125        // VACUUM FULL: perform compaction followed by snapshot expiration
126        await_cancelable(
127            &mut shutdown_rx,
128            session.env().meta_client().compact_iceberg_table(sink_id),
129        )
130        .await?;
131        await_cancelable(
132            &mut shutdown_rx,
133            session
134                .env()
135                .meta_client()
136                .expire_iceberg_table_snapshots(sink_id),
137        )
138        .await?;
139    } else {
140        // Regular VACUUM: only expire snapshots
141        await_cancelable(
142            &mut shutdown_rx,
143            session
144                .env()
145                .meta_client()
146                .expire_iceberg_table_snapshots(sink_id),
147        )
148        .await?;
149    }
150    Ok(PgResponse::builder(StatementType::VACUUM).into())
151}