risingwave_frontend/handler/
vacuum.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_common::catalog::Engine;
18use risingwave_connector::sink::CONNECTOR_TYPE_KEY;
19use risingwave_sqlparser::ast::ObjectName;
20
21use crate::binder::Binder;
22use crate::error::{ErrorCode, Result, RwError};
23use crate::handler::{HandlerArgs, RwPgResponse};
24
25pub async fn handle_vacuum(
26 handler_args: HandlerArgs,
27 object_name: ObjectName,
28 full: bool,
29) -> Result<RwPgResponse> {
30 let session = &handler_args.session;
31 let db_name = &session.database();
32
33 let sink_id = {
34 let (schema_name, real_object_name) =
35 Binder::resolve_schema_qualified_name(db_name, &object_name)?;
36 let catalog_reader = session.env().catalog_reader().read_guard();
37 let search_path = session.config().search_path();
38 let user_name = session.user_name();
39 let schema_path = crate::catalog::root_catalog::SchemaPath::new(
40 schema_name.as_deref(),
41 &search_path,
42 &user_name,
43 );
44
45 if let Ok((table, _)) =
46 catalog_reader.get_created_table_by_name(db_name, schema_path, &real_object_name)
47 {
48 if table.engine() == Engine::Iceberg {
49 let sink_name = table.iceberg_sink_name().ok_or_else(|| {
51 RwError::from(ErrorCode::CatalogError(
52 format!("No iceberg sink name found for table {}", real_object_name).into(),
53 ))
54 })?;
55
56 let (sink, _) = catalog_reader
58 .get_created_sink_by_name(db_name, schema_path, &sink_name)
59 .map_err(|_| {
60 RwError::from(ErrorCode::CatalogError(
61 format!(
62 "Iceberg sink {} not found for table {}",
63 sink_name, real_object_name
64 )
65 .into(),
66 ))
67 })?;
68
69 sink.id.sink_id()
70 } else {
71 return Err(ErrorCode::InvalidInputSyntax(format!(
72 "VACUUM can only be used on Iceberg engine tables or Iceberg sinks, but table '{}' uses {:?} engine",
73 real_object_name,
74 table.engine()
75 ))
76 .into());
77 }
78 } else if let Ok((sink, _)) =
79 catalog_reader.get_created_sink_by_name(db_name, schema_path, &real_object_name)
80 {
81 if let Some(connector_type) = sink.properties.get(CONNECTOR_TYPE_KEY) {
82 if connector_type == "iceberg" {
83 sink.id.sink_id()
84 } else {
85 return Err(ErrorCode::InvalidInputSyntax(format!(
86 "VACUUM can only be used on Iceberg sinks, but sink '{}' is of type '{}'",
87 real_object_name, connector_type
88 ))
89 .into());
90 }
91 } else {
92 return Err(ErrorCode::InvalidInputSyntax(format!(
93 "VACUUM can only be used on Iceberg sinks, but sink '{}' has no connector type specified",
94 real_object_name
95 ))
96 .into());
97 }
98 } else {
99 bail!("object {} not found", real_object_name);
100 }
101 };
102
103 if full {
104 session
106 .env()
107 .meta_client()
108 .compact_iceberg_table(sink_id)
109 .await?;
110 session
111 .env()
112 .meta_client()
113 .expire_iceberg_table_snapshots(sink_id)
114 .await?;
115 } else {
116 session
118 .env()
119 .meta_client()
120 .expire_iceberg_table_snapshots(sink_id)
121 .await?;
122 }
123 Ok(PgResponse::builder(StatementType::VACUUM).into())
124}