// risingwave_frontend/handler/drop_table.rs

use anyhow::Context;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_common::catalog::Engine;
18use risingwave_common::util::tokio_util::either::Either;
19use risingwave_connector::sink::iceberg::IcebergConfig;
20use risingwave_connector::source::ConnectorProperties;
21use risingwave_sqlparser::ast::{Ident, ObjectName};
22use tracing::warn;
23
24use super::RwPgResponse;
25use super::util::execute_with_long_running_notification;
26use crate::binder::Binder;
27use crate::catalog::root_catalog::SchemaPath;
28use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableType};
29use crate::error::Result;
30use crate::handler::HandlerArgs;
31
/// Handles the `DROP TABLE` statement.
///
/// # Arguments
/// * `handler_args` - session and statement context for this handler.
/// * `table_name`   - possibly schema-qualified name of the table to drop.
/// * `if_exists`    - if `true`, a missing table yields a notice instead of an error.
/// * `cascade`      - forwarded to the catalog writer to drop dependents as well.
///
/// For tables created with the Iceberg engine, this additionally tears down the
/// hidden iceberg sink/source pair and drops the physical table from the
/// external Iceberg catalog before removing the table from the RisingWave
/// catalog.
pub async fn handle_drop_table(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    if_exists: bool,
    cascade: bool,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, &table_name)?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();

    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

    // Resolve the table under a short-lived catalog read guard (dropped at the
    // end of this block) and capture just the ids/engine needed afterwards.
    let (source_id, table_id, engine) = {
        let reader = session.env().catalog_reader().read_guard();
        let (table, schema_name) =
            match reader.get_created_table_by_name(db_name, schema_path, &table_name) {
                Ok((t, s)) => (t, s),
                Err(e) => {
                    // With IF EXISTS a missing table is reported as a notice,
                    // not an error.
                    return if if_exists {
                        Ok(RwPgResponse::builder(StatementType::DROP_TABLE)
                            .notice(format!("table \"{}\" does not exist, skipping", table_name))
                            .into())
                    } else {
                        Err(e.into())
                    };
                }
            };

        session.check_privilege_for_drop_alter(schema_name, &**table)?;

        // Only plain tables may be dropped here; other table types (e.g.
        // indexes, materialized views) get a dedicated error.
        if table.table_type() != TableType::Table {
            return Err(table.bad_drop_error());
        }
        (table.associated_source_id(), table.id(), table.engine)
    };

    match engine {
        Engine::Iceberg => {
            // Obtain the Iceberg connection info from either the hidden
            // iceberg source (Left) or the hidden iceberg sink (Right)
            // associated with this table; `None` if neither exists.
            let either = if let Ok(source) = session
                .env()
                .catalog_reader()
                .read_guard()
                .get_source_by_name(
                    db_name,
                    schema_path,
                    &(ICEBERG_SOURCE_PREFIX.to_owned() + &table_name),
                )
                .map(|(source, _)| source.clone())
            {
                let config = ConnectorProperties::extract(source.with_properties.clone(), false)?;
                if let ConnectorProperties::Iceberg(iceberg_properties) = config {
                    Some(Either::Left(iceberg_properties))
                } else {
                    unreachable!("must be iceberg source");
                }
            } else if let Ok(sink) = session
                .env()
                .catalog_reader()
                .read_guard()
                .get_created_sink_by_name(
                    db_name,
                    schema_path,
                    &(ICEBERG_SINK_PREFIX.to_owned() + &table_name),
                )
                .map(|(sink, _)| sink.clone())
            {
                let iceberg_config = IcebergConfig::from_btreemap(sink.properties.clone())?;
                Some(Either::Right(iceberg_config))
            } else {
                None
            };

            // Drop the hidden iceberg sink first (IF EXISTS semantics, no
            // cascade) so nothing keeps writing while the external table is
            // being removed below.
            crate::handler::drop_sink::handle_drop_sink(
                handler_args.clone(),
                ObjectName::from(match schema_name {
                    Some(ref schema) => vec![
                        Ident::from(schema.as_str()),
                        Ident::from((ICEBERG_SINK_PREFIX.to_owned() + &table_name).as_str()),
                    ],
                    None => vec![Ident::from(
                        (ICEBERG_SINK_PREFIX.to_owned() + &table_name).as_str(),
                    )],
                }),
                true,
                false,
            )
            .await?;

            if let Some(either) = either {
                // Build the external catalog handle and the external table
                // identifier from whichever config was found above.
                let (iceberg_catalog, table_id) = match either {
                    Either::Left(iceberg_properties) => {
                        let catalog = iceberg_properties.create_catalog().await?;
                        let table_id = iceberg_properties
                            .table
                            .to_table_ident()
                            .context("Unable to parse table name")?;
                        (catalog, table_id)
                    }
                    Either::Right(iceberg_config) => {
                        let catalog = iceberg_config.create_catalog().await?;
                        let table_id = iceberg_config
                            .full_table_name()
                            .context("Unable to parse table name")?;
                        (catalog, table_id)
                    }
                };

                // Drop the physical table in the external Iceberg catalog.
                iceberg_catalog
                    .drop_table(&table_id)
                    .await
                    .context("failed to drop iceberg table")?;

                // Then drop the hidden iceberg source (IF EXISTS, no cascade).
                crate::handler::drop_source::handle_drop_source(
                    handler_args.clone(),
                    ObjectName::from(match schema_name {
                        Some(ref schema) => vec![
                            Ident::from(schema.as_str()),
                            Ident::from((ICEBERG_SOURCE_PREFIX.to_owned() + &table_name).as_str()),
                        ],
                        None => vec![Ident::from(
                            (ICEBERG_SOURCE_PREFIX.to_owned() + &table_name).as_str(),
                        )],
                    }),
                    true,
                    false,
                )
                .await?;
            } else {
                // Neither hidden object exists: best-effort continue, but warn
                // the user to check the external catalog for leftovers.
                warn!(
                    "Table {} with iceberg engine but with no source and sink. It might be created partially. Please check it with iceberg catalog",
                    table_name
                );
            }
        }
        Engine::Hummock => {}
    }

    // Finally remove the table (and its associated source, if any) from the
    // RisingWave catalog, notifying the client if the DDL runs long.
    let catalog_writer = session.catalog_writer()?;
    execute_with_long_running_notification(
        catalog_writer.drop_table(source_id.map(|id| id.table_id), table_id, cascade),
        &session,
        "DROP TABLE",
    )
    .await?;

    Ok(PgResponse::empty_result(StatementType::DROP_TABLE))
}
189
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// After `CREATE TABLE` followed by `DROP TABLE`, neither the table nor
    /// any associated source should remain resolvable in the catalog.
    #[tokio::test]
    async fn test_drop_table_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend
            .run_sql("create table t (v1 smallint);")
            .await
            .unwrap();
        frontend.run_sql("drop table t;").await.unwrap();

        let session = frontend.session_ref();
        let reader = session.env().catalog_reader().read_guard();
        let path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // Lookups for the dropped table and its source must both fail.
        assert!(
            reader
                .get_source_by_name(DEFAULT_DATABASE_NAME, path, "t")
                .is_err()
        );
        assert!(
            reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, path, "t")
                .is_err()
        );
    }
}
216}