risingwave_frontend/handler/
vacuum.rs1use std::future::Future;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_batch::task::ShutdownToken;
19use risingwave_common::bail;
20use risingwave_common::catalog::Engine;
21use risingwave_connector::sink::CONNECTOR_TYPE_KEY;
22use risingwave_sqlparser::ast::ObjectName;
23
24use crate::binder::Binder;
25use crate::error::{ErrorCode, Result, RwError};
26use crate::handler::{HandlerArgs, RwPgResponse};
27use crate::scheduler::SchedulerError;
28
29async fn await_cancelable<T, E, F>(shutdown_rx: &mut ShutdownToken, future: F) -> Result<T>
30where
31 E: Into<RwError>,
32 F: Future<Output = std::result::Result<T, E>>,
33{
34 tokio::pin!(future);
35
36 tokio::select! {
37 result = &mut future => result.map_err(Into::into),
38 _ = shutdown_rx.cancelled() => {
39 Err(SchedulerError::QueryCancelled("Cancelled by user".to_owned()).into())
40 }
41 }
42}
43
44pub async fn handle_vacuum(
45 handler_args: HandlerArgs,
46 object_name: ObjectName,
47 full: bool,
48) -> Result<RwPgResponse> {
49 let session = &handler_args.session;
50 let db_name = &session.database();
51
52 let sink_id = {
53 let (schema_name, real_object_name) =
54 Binder::resolve_schema_qualified_name(db_name, &object_name)?;
55 let catalog_reader = session.env().catalog_reader().read_guard();
56 let search_path = session.config().search_path();
57 let user_name = session.user_name();
58 let schema_path = crate::catalog::root_catalog::SchemaPath::new(
59 schema_name.as_deref(),
60 &search_path,
61 &user_name,
62 );
63
64 if let Ok((table, _)) =
65 catalog_reader.get_created_table_by_name(db_name, schema_path, &real_object_name)
66 {
67 if table.engine() == Engine::Iceberg {
68 let sink_name = table.iceberg_sink_name().ok_or_else(|| {
70 RwError::from(ErrorCode::CatalogError(
71 format!("No iceberg sink name found for table {}", real_object_name).into(),
72 ))
73 })?;
74
75 let (sink, _) = catalog_reader
77 .get_created_sink_by_name(db_name, schema_path, &sink_name)
78 .map_err(|_| {
79 RwError::from(ErrorCode::CatalogError(
80 format!(
81 "Iceberg sink {} not found for table {}",
82 sink_name, real_object_name
83 )
84 .into(),
85 ))
86 })?;
87
88 sink.id
89 } else {
90 return Err(ErrorCode::InvalidInputSyntax(format!(
91 "VACUUM can only be used on Iceberg engine tables or Iceberg sinks, but table '{}' uses {:?} engine",
92 real_object_name,
93 table.engine()
94 ))
95 .into());
96 }
97 } else if let Ok((sink, _)) =
98 catalog_reader.get_created_sink_by_name(db_name, schema_path, &real_object_name)
99 {
100 if let Some(connector_type) = sink.properties.get(CONNECTOR_TYPE_KEY) {
101 if connector_type == "iceberg" {
102 sink.id
103 } else {
104 return Err(ErrorCode::InvalidInputSyntax(format!(
105 "VACUUM can only be used on Iceberg sinks, but sink '{}' is of type '{}'",
106 real_object_name, connector_type
107 ))
108 .into());
109 }
110 } else {
111 return Err(ErrorCode::InvalidInputSyntax(format!(
112 "VACUUM can only be used on Iceberg sinks, but sink '{}' has no connector type specified",
113 real_object_name
114 ))
115 .into());
116 }
117 } else {
118 bail!("object {} not found", real_object_name);
119 }
120 };
121
122 let mut shutdown_rx = session.reset_cancel_query_flag();
123
124 if full {
125 await_cancelable(
127 &mut shutdown_rx,
128 session.env().meta_client().compact_iceberg_table(sink_id),
129 )
130 .await?;
131 await_cancelable(
132 &mut shutdown_rx,
133 session
134 .env()
135 .meta_client()
136 .expire_iceberg_table_snapshots(sink_id),
137 )
138 .await?;
139 } else {
140 await_cancelable(
142 &mut shutdown_rx,
143 session
144 .env()
145 .meta_client()
146 .expire_iceberg_table_snapshots(sink_id),
147 )
148 .await?;
149 }
150 Ok(PgResponse::builder(StatementType::VACUUM).into())
151}