risingwave_frontend/handler/
vacuum.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_common::catalog::Engine;
18use risingwave_connector::sink::CONNECTOR_TYPE_KEY;
19use risingwave_sqlparser::ast::ObjectName;
20
21use crate::binder::Binder;
22use crate::error::{ErrorCode, Result, RwError};
23use crate::handler::{HandlerArgs, RwPgResponse};
24
25pub async fn handle_vacuum(
26    handler_args: HandlerArgs,
27    object_name: ObjectName,
28    full: bool,
29) -> Result<RwPgResponse> {
30    let session = &handler_args.session;
31    let db_name = &session.database();
32
33    let sink_id = {
34        let (schema_name, real_object_name) =
35            Binder::resolve_schema_qualified_name(db_name, &object_name)?;
36        let catalog_reader = session.env().catalog_reader().read_guard();
37        let search_path = session.config().search_path();
38        let user_name = session.user_name();
39        let schema_path = crate::catalog::root_catalog::SchemaPath::new(
40            schema_name.as_deref(),
41            &search_path,
42            &user_name,
43        );
44
45        if let Ok((table, _)) =
46            catalog_reader.get_created_table_by_name(db_name, schema_path, &real_object_name)
47        {
48            if table.engine() == Engine::Iceberg {
49                // For iceberg engine table, get the associated iceberg sink name
50                let sink_name = table.iceberg_sink_name().ok_or_else(|| {
51                    RwError::from(ErrorCode::CatalogError(
52                        format!("No iceberg sink name found for table {}", real_object_name).into(),
53                    ))
54                })?;
55
56                // Find the iceberg sink
57                let (sink, _) = catalog_reader
58                    .get_created_sink_by_name(db_name, schema_path, &sink_name)
59                    .map_err(|_| {
60                        RwError::from(ErrorCode::CatalogError(
61                            format!(
62                                "Iceberg sink {} not found for table {}",
63                                sink_name, real_object_name
64                            )
65                            .into(),
66                        ))
67                    })?;
68
69                sink.id.sink_id()
70            } else {
71                return Err(ErrorCode::InvalidInputSyntax(format!(
72                    "VACUUM can only be used on Iceberg engine tables or Iceberg sinks, but table '{}' uses {:?} engine",
73                    real_object_name,
74                    table.engine()
75                ))
76                    .into());
77            }
78        } else if let Ok((sink, _)) =
79            catalog_reader.get_created_sink_by_name(db_name, schema_path, &real_object_name)
80        {
81            if let Some(connector_type) = sink.properties.get(CONNECTOR_TYPE_KEY) {
82                if connector_type == "iceberg" {
83                    sink.id.sink_id()
84                } else {
85                    return Err(ErrorCode::InvalidInputSyntax(format!(
86                        "VACUUM can only be used on Iceberg sinks, but sink '{}' is of type '{}'",
87                        real_object_name, connector_type
88                    ))
89                    .into());
90                }
91            } else {
92                return Err(ErrorCode::InvalidInputSyntax(format!(
93                    "VACUUM can only be used on Iceberg sinks, but sink '{}' has no connector type specified",
94                    real_object_name
95                ))
96                    .into());
97            }
98        } else {
99            bail!("object {} not found", real_object_name);
100        }
101    };
102
103    if full {
104        // VACUUM FULL: perform compaction followed by snapshot expiration
105        session
106            .env()
107            .meta_client()
108            .compact_iceberg_table(sink_id)
109            .await?;
110        session
111            .env()
112            .meta_client()
113            .expire_iceberg_table_snapshots(sink_id)
114            .await?;
115    } else {
116        // Regular VACUUM: only expire snapshots
117        session
118            .env()
119            .meta_client()
120            .expire_iceberg_table_snapshots(sink_id)
121            .await?;
122    }
123    Ok(PgResponse::builder(StatementType::VACUUM).into())
124}