risingwave_frontend/handler/
vacuum.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_common::catalog::Engine;
18use risingwave_connector::sink::CONNECTOR_TYPE_KEY;
19use risingwave_sqlparser::ast::ObjectName;
20
21use crate::binder::Binder;
22use crate::error::{ErrorCode, Result, RwError};
23use crate::handler::{HandlerArgs, RwPgResponse};
24
25pub async fn handle_vacuum(
26    handler_args: HandlerArgs,
27    object_name: ObjectName,
28) -> Result<RwPgResponse> {
29    let session = &handler_args.session;
30    let db_name = &session.database();
31
32    let sink_id = {
33        let (schema_name, real_object_name) =
34            Binder::resolve_schema_qualified_name(db_name, &object_name)?;
35        let catalog_reader = session.env().catalog_reader().read_guard();
36        let search_path = session.config().search_path();
37        let user_name = session.user_name();
38        let schema_path = crate::catalog::root_catalog::SchemaPath::new(
39            schema_name.as_deref(),
40            &search_path,
41            &user_name,
42        );
43
44        if let Ok((table, _)) =
45            catalog_reader.get_created_table_by_name(db_name, schema_path, &real_object_name)
46        {
47            if table.engine() == Engine::Iceberg {
48                // For iceberg engine table, get the associated iceberg sink name
49                let sink_name = table.iceberg_sink_name().ok_or_else(|| {
50                    RwError::from(ErrorCode::CatalogError(
51                        format!("No iceberg sink name found for table {}", real_object_name).into(),
52                    ))
53                })?;
54
55                // Find the iceberg sink
56                let (sink, _) = catalog_reader
57                    .get_created_sink_by_name(db_name, schema_path, &sink_name)
58                    .map_err(|_| {
59                        RwError::from(ErrorCode::CatalogError(
60                            format!(
61                                "Iceberg sink {} not found for table {}",
62                                sink_name, real_object_name
63                            )
64                            .into(),
65                        ))
66                    })?;
67
68                sink.id.sink_id()
69            } else {
70                return Err(ErrorCode::InvalidInputSyntax(format!(
71                    "VACUUM can only be used on Iceberg engine tables or Iceberg sinks, but table '{}' uses {:?} engine",
72                    real_object_name,
73                    table.engine()
74                ))
75                    .into());
76            }
77        } else if let Ok((sink, _)) =
78            catalog_reader.get_created_sink_by_name(db_name, schema_path, &real_object_name)
79        {
80            if let Some(connector_type) = sink.properties.get(CONNECTOR_TYPE_KEY) {
81                if connector_type == "iceberg" {
82                    sink.id.sink_id()
83                } else {
84                    return Err(ErrorCode::InvalidInputSyntax(format!(
85                        "VACUUM can only be used on Iceberg sinks, but sink '{}' is of type '{}'",
86                        real_object_name, connector_type
87                    ))
88                    .into());
89                }
90            } else {
91                return Err(ErrorCode::InvalidInputSyntax(format!(
92                    "VACUUM can only be used on Iceberg sinks, but sink '{}' has no connector type specified",
93                    real_object_name
94                ))
95                    .into());
96            }
97        } else {
98            bail!("object {} not found", real_object_name);
99        }
100    };
101
102    session
103        .env()
104        .meta_client()
105        .compact_iceberg_table(sink_id)
106        .await?;
107    Ok(PgResponse::builder(StatementType::VACUUM).into())
108}