risingwave_frontend/handler/
drop_table.rs1use anyhow::Context;
16use pgwire::pg_response::{PgResponse, StatementType};
17use risingwave_common::catalog::Engine;
18use risingwave_common::util::tokio_util::either::Either;
19use risingwave_connector::sink::iceberg::IcebergConfig;
20use risingwave_connector::source::ConnectorProperties;
21use risingwave_sqlparser::ast::{Ident, ObjectName};
22use tracing::warn;
23
24use super::RwPgResponse;
25use crate::binder::Binder;
26use crate::catalog::root_catalog::SchemaPath;
27use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableType};
28use crate::error::Result;
29use crate::handler::HandlerArgs;
30
31pub async fn handle_drop_table(
32 handler_args: HandlerArgs,
33 table_name: ObjectName,
34 if_exists: bool,
35 cascade: bool,
36) -> Result<RwPgResponse> {
37 let session = handler_args.session.clone();
38 let db_name = &session.database();
39 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
40 let search_path = session.config().search_path();
41 let user_name = &session.user_name();
42
43 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
44
45 let (source_id, table_id, engine) = {
46 let reader = session.env().catalog_reader().read_guard();
47 let (table, schema_name) =
48 match reader.get_created_table_by_name(db_name, schema_path, &table_name) {
49 Ok((t, s)) => (t, s),
50 Err(e) => {
51 return if if_exists {
52 Ok(RwPgResponse::builder(StatementType::DROP_TABLE)
53 .notice(format!("table \"{}\" does not exist, skipping", table_name))
54 .into())
55 } else {
56 Err(e.into())
57 };
58 }
59 };
60
61 session.check_privilege_for_drop_alter(schema_name, &**table)?;
62
63 if table.table_type() != TableType::Table {
64 return Err(table.bad_drop_error());
65 }
66 (table.associated_source_id(), table.id(), table.engine)
67 };
68
69 match engine {
70 Engine::Iceberg => {
71 let either = if let Ok(source) = session
72 .env()
73 .catalog_reader()
74 .read_guard()
75 .get_source_by_name(
76 db_name,
77 schema_path,
78 &(ICEBERG_SOURCE_PREFIX.to_owned() + &table_name),
79 )
80 .map(|(source, _)| source.clone())
81 {
82 let config = ConnectorProperties::extract(source.with_properties.clone(), false)?;
83 if let ConnectorProperties::Iceberg(iceberg_properties) = config {
84 Some(Either::Left(iceberg_properties))
85 } else {
86 unreachable!("must be iceberg source");
87 }
88 } else if let Ok(sink) = session
89 .env()
90 .catalog_reader()
91 .read_guard()
92 .get_sink_by_name(
93 db_name,
94 schema_path,
95 &(ICEBERG_SINK_PREFIX.to_owned() + &table_name),
96 )
97 .map(|(sink, _)| sink.clone())
98 {
99 let iceberg_config = IcebergConfig::from_btreemap(sink.properties.clone())?;
101 Some(Either::Right(iceberg_config))
102 } else {
103 None
104 };
105
106 crate::handler::drop_sink::handle_drop_sink(
113 handler_args.clone(),
114 ObjectName::from(match schema_name {
115 Some(ref schema) => vec![
116 Ident::from(schema.as_str()),
117 Ident::from((ICEBERG_SINK_PREFIX.to_owned() + &table_name).as_str()),
118 ],
119 None => vec![Ident::from(
120 (ICEBERG_SINK_PREFIX.to_owned() + &table_name).as_str(),
121 )],
122 }),
123 true,
124 false,
125 )
126 .await?;
127
128 if let Some(either) = either {
129 let (iceberg_catalog, table_id) = match either {
130 Either::Left(iceberg_properties) => {
131 let catalog = iceberg_properties.create_catalog().await?;
132 let table_id = iceberg_properties
133 .common
134 .full_table_name()
135 .context("Unable to parse table name")?;
136 (catalog, table_id)
137 }
138 Either::Right(iceberg_config) => {
139 let catalog = iceberg_config.create_catalog().await?;
140 let table_id = iceberg_config
141 .full_table_name()
142 .context("Unable to parse table name")?;
143 (catalog, table_id)
144 }
145 };
146
147 iceberg_catalog
149 .drop_table(&table_id)
150 .await
151 .context("failed to drop iceberg table")?;
152
153 crate::handler::drop_source::handle_drop_source(
154 handler_args.clone(),
155 ObjectName::from(match schema_name {
156 Some(ref schema) => vec![
157 Ident::from(schema.as_str()),
158 Ident::from((ICEBERG_SOURCE_PREFIX.to_owned() + &table_name).as_str()),
159 ],
160 None => vec![Ident::from(
161 (ICEBERG_SOURCE_PREFIX.to_owned() + &table_name).as_str(),
162 )],
163 }),
164 true,
165 false,
166 )
167 .await?;
168 } else {
169 warn!(
170 "Table {} with iceberg engine but with no source and sink. It might be created partially. Please check it with iceberg catalog",
171 table_name
172 );
173 }
174 }
175 Engine::Hummock => {}
176 }
177
178 let catalog_writer = session.catalog_writer()?;
179 catalog_writer
180 .drop_table(source_id.map(|id| id.table_id), table_id, cascade)
181 .await?;
182
183 Ok(PgResponse::empty_result(StatementType::DROP_TABLE))
184}
185
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Creates a table, drops it again, and checks that neither a source nor a
    /// created table named `t` can be looked up in the default schema afterwards.
    #[tokio::test]
    async fn test_drop_table_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        for sql in ["create table t (v1 smallint);", "drop table t;"] {
            frontend.run_sql(sql).await.unwrap();
        }

        let session = frontend.session_ref();
        let reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        assert!(
            reader
                .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .is_err()
        );
        assert!(
            reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .is_err()
        );
    }
}