risingwave_frontend/handler/drop_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::Engine;
use risingwave_common::util::tokio_util::either::Either;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::ConnectorProperties;
use risingwave_sqlparser::ast::{Ident, ObjectName};
use tracing::warn;

use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX, TableType};
use crate::error::Result;
use crate::handler::HandlerArgs;

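/// Handles `DROP TABLE [IF EXISTS] <name> [CASCADE]` (e.g. `DROP TABLE t;`): resolves the
/// table, checks drop privileges, cleans up the auxiliary objects of an iceberg-engine table
/// when needed, and finally drops the table through the catalog writer.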
pub async fn handle_drop_table(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    if_exists: bool,
    cascade: bool,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let db_name = &session.database();
    let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();

    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

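    // Look up the table in the catalog (honoring `IF EXISTS`), verify that the current user
    // is allowed to drop it, and reject catalog entries that are not plain user tables.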
    let (source_id, table_id, engine) = {
        let reader = session.env().catalog_reader().read_guard();
        let (table, schema_name) =
            match reader.get_created_table_by_name(db_name, schema_path, &table_name) {
                Ok((t, s)) => (t, s),
                Err(e) => {
                    return if if_exists {
                        Ok(RwPgResponse::builder(StatementType::DROP_TABLE)
                            .notice(format!("table \"{}\" does not exist, skipping", table_name))
                            .into())
                    } else {
                        Err(e.into())
                    };
                }
            };

        session.check_privilege_for_drop_alter(schema_name, &**table)?;

        if table.table_type() != TableType::Table {
            return Err(table.bad_drop_error());
        }
        (table.associated_source_id(), table.id(), table.engine)
    };

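    // Tables created with the iceberg engine are backed by a hidden iceberg sink/source pair
    // and an external iceberg table; those auxiliary objects are cleaned up before the table
    // itself is dropped. Hummock tables need no extra work here.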
    match engine {
        Engine::Iceberg => {
            let either = if let Ok(source) = session
                .env()
                .catalog_reader()
                .read_guard()
                .get_source_by_name(
                    db_name,
                    schema_path,
                    &(ICEBERG_SOURCE_PREFIX.to_owned() + &table_name),
                )
                .map(|(source, _)| source.clone())
            {
                let config = ConnectorProperties::extract(source.with_properties.clone(), false)?;
                if let ConnectorProperties::Iceberg(iceberg_properties) = config {
                    Some(Either::Left(iceberg_properties))
                } else {
                    unreachable!("must be iceberg source");
                }
            } else if let Ok(sink) = session
                .env()
                .catalog_reader()
                .read_guard()
                .get_sink_by_name(
                    db_name,
                    schema_path,
                    &(ICEBERG_SINK_PREFIX.to_owned() + &table_name),
                )
                .map(|(sink, _)| sink.clone())
            {
                // If the iceberg source does not exist, fall back to the iceberg sink to load
                // the iceberg table.
                let iceberg_config = IcebergConfig::from_btreemap(sink.properties.clone())?;
                Some(Either::Right(iceberg_config))
            } else {
                None
            };

            // TODO(iceberg): make iceberg engine table drop ddl atomic
            // Drop sink
            // Drop iceberg table
            //   - Purge table from warehouse
            //   - Drop table from catalog
            // Drop source
            crate::handler::drop_sink::handle_drop_sink(
                handler_args.clone(),
                ObjectName::from(match schema_name {
                    Some(ref schema) => vec![
                        Ident::from(schema.as_str()),
                        Ident::from((ICEBERG_SINK_PREFIX.to_owned() + &table_name).as_str()),
                    ],
                    None => vec![Ident::from(
                        (ICEBERG_SINK_PREFIX.to_owned() + &table_name).as_str(),
                    )],
                }),
                true,
                false,
            )
            .await?;

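            // If connector properties were found above, open the iceberg catalog, drop the
            // external iceberg table, and then drop the hidden iceberg source.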
            if let Some(either) = either {
                let (iceberg_catalog, table_id) = match either {
                    Either::Left(iceberg_properties) => {
                        let catalog = iceberg_properties.create_catalog().await?;
                        let table_id = iceberg_properties
                            .common
                            .full_table_name()
                            .context("Unable to parse table name")?;
                        (catalog, table_id)
                    }
                    Either::Right(iceberg_config) => {
                        let catalog = iceberg_config.create_catalog().await?;
                        let table_id = iceberg_config
                            .full_table_name()
                            .context("Unable to parse table name")?;
                        (catalog, table_id)
                    }
                };

                // For JNI catalog and storage catalog, drop table will purge the table as well.
                iceberg_catalog
                    .drop_table(&table_id)
                    .await
                    .context("failed to drop iceberg table")?;

                crate::handler::drop_source::handle_drop_source(
                    handler_args.clone(),
                    ObjectName::from(match schema_name {
                        Some(ref schema) => vec![
                            Ident::from(schema.as_str()),
                            Ident::from((ICEBERG_SOURCE_PREFIX.to_owned() + &table_name).as_str()),
                        ],
                        None => vec![Ident::from(
                            (ICEBERG_SOURCE_PREFIX.to_owned() + &table_name).as_str(),
                        )],
                    }),
                    true,
                    false,
                )
                .await?;
            } else {
                warn!(
                    "Table {} uses the iceberg engine but has neither an iceberg source nor an iceberg sink. It might have been created only partially. Please check it in the iceberg catalog",
                    table_name
                );
            }
        }
        Engine::Hummock => {}
    }

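    // Finally, drop the table (together with its associated source, if it has one) from the
    // RisingWave catalog.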
    let catalog_writer = session.catalog_writer()?;
    catalog_writer
        .drop_table(source_id.map(|id| id.table_id), table_id, cascade)
        .await?;

    Ok(PgResponse::empty_result(StatementType::DROP_TABLE))
}

#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

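    // After `DROP TABLE`, neither a source nor a table named `t` should remain in the catalog.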
    #[tokio::test]
    async fn test_drop_table_handler() {
        let sql_create_table = "create table t (v1 smallint);";
        let sql_drop_table = "drop table t;";
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql_create_table).await.unwrap();
        frontend.run_sql(sql_drop_table).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let source = catalog_reader.get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t");
        assert!(source.is_err());

        let table =
            catalog_reader.get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t");
        assert!(table.is_err());
    }
}