risingwave_frontend/handler/
vacuum.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_common::catalog::Engine;
18use risingwave_connector::sink::CONNECTOR_TYPE_KEY;
19use risingwave_sqlparser::ast::ObjectName;
20
21use crate::binder::Binder;
22use crate::error::{ErrorCode, Result, RwError};
23use crate::handler::{HandlerArgs, RwPgResponse};
24
25pub async fn handle_vacuum(
26 handler_args: HandlerArgs,
27 object_name: ObjectName,
28) -> Result<RwPgResponse> {
29 let session = &handler_args.session;
30 let db_name = &session.database();
31
32 let sink_id = {
33 let (schema_name, real_object_name) =
34 Binder::resolve_schema_qualified_name(db_name, &object_name)?;
35 let catalog_reader = session.env().catalog_reader().read_guard();
36 let search_path = session.config().search_path();
37 let user_name = session.user_name();
38 let schema_path = crate::catalog::root_catalog::SchemaPath::new(
39 schema_name.as_deref(),
40 &search_path,
41 &user_name,
42 );
43
44 if let Ok((table, _)) =
45 catalog_reader.get_created_table_by_name(db_name, schema_path, &real_object_name)
46 {
47 if table.engine() == Engine::Iceberg {
48 let sink_name = table.iceberg_sink_name().ok_or_else(|| {
50 RwError::from(ErrorCode::CatalogError(
51 format!("No iceberg sink name found for table {}", real_object_name).into(),
52 ))
53 })?;
54
55 let (sink, _) = catalog_reader
57 .get_created_sink_by_name(db_name, schema_path, &sink_name)
58 .map_err(|_| {
59 RwError::from(ErrorCode::CatalogError(
60 format!(
61 "Iceberg sink {} not found for table {}",
62 sink_name, real_object_name
63 )
64 .into(),
65 ))
66 })?;
67
68 sink.id.sink_id()
69 } else {
70 return Err(ErrorCode::InvalidInputSyntax(format!(
71 "VACUUM can only be used on Iceberg engine tables or Iceberg sinks, but table '{}' uses {:?} engine",
72 real_object_name,
73 table.engine()
74 ))
75 .into());
76 }
77 } else if let Ok((sink, _)) =
78 catalog_reader.get_created_sink_by_name(db_name, schema_path, &real_object_name)
79 {
80 if let Some(connector_type) = sink.properties.get(CONNECTOR_TYPE_KEY) {
81 if connector_type == "iceberg" {
82 sink.id.sink_id()
83 } else {
84 return Err(ErrorCode::InvalidInputSyntax(format!(
85 "VACUUM can only be used on Iceberg sinks, but sink '{}' is of type '{}'",
86 real_object_name, connector_type
87 ))
88 .into());
89 }
90 } else {
91 return Err(ErrorCode::InvalidInputSyntax(format!(
92 "VACUUM can only be used on Iceberg sinks, but sink '{}' has no connector type specified",
93 real_object_name
94 ))
95 .into());
96 }
97 } else {
98 bail!("object {} not found", real_object_name);
99 }
100 };
101
102 session
103 .env()
104 .meta_client()
105 .compact_iceberg_table(sink_id)
106 .await?;
107 Ok(PgResponse::builder(StatementType::VACUUM).into())
108}