risingwave_frontend/handler/
drop_index.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::catalog::StreamJobStatus;
17use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::RwPgResponse;
21use super::util::execute_with_long_running_notification;
22use crate::binder::Binder;
23use crate::catalog::CatalogError;
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::table_catalog::TableType;
26use crate::error::ErrorCode::PermissionDenied;
27use crate::error::Result;
28use crate::handler::HandlerArgs;
29
30pub async fn handle_drop_index(
31    handler_args: HandlerArgs,
32    index_name: ObjectName,
33    if_exists: bool,
34    cascade: bool,
35) -> Result<RwPgResponse> {
36    let session = handler_args.session;
37    let db_name = &session.database();
38    let (schema_name, index_name) = Binder::resolve_schema_qualified_name(db_name, &index_name)?;
39    let search_path = session.config().search_path();
40    let user_name = &session.user_name();
41    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
42
43    let index = {
44        let reader = session.env().catalog_reader().read_guard();
45        match reader.get_any_index_by_name(db_name, schema_path, &index_name) {
46            Ok((index, _)) => {
47                if !session.is_super_user() && session.user_id() != index.index_table().owner {
48                    return Err(PermissionDenied(format!(
49                        "must be owner of index \"{}\"",
50                        index.name
51                    ))
52                    .into());
53                }
54
55                index.clone()
56            }
57            Err(err) => {
58                match err {
59                    CatalogError::NotFound("index", _) => {
60                        // index not found, try to find table below to give a better error message
61                    }
62                    _ => return Err(err.into()),
63                };
64                return match reader.get_created_table_by_name(db_name, schema_path, &index_name) {
65                    Ok((table, _)) => match table.table_type() {
66                        TableType::Index => unreachable!(),
67                        _ => Err(table.bad_drop_error()),
68                    },
69                    Err(e) => {
70                        if if_exists {
71                            Ok(RwPgResponse::builder(StatementType::DROP_INDEX)
72                                .notice(format!(
73                                    "index \"{}\" does not exist, skipping",
74                                    index_name
75                                ))
76                                .into())
77                        } else {
78                            match e {
79                                CatalogError::NotFound("table", name) => {
80                                    Err(CatalogError::NotFound("index", name).into())
81                                }
82                                _ => Err(e.into()),
83                            }
84                        }
85                    }
86                };
87            }
88        }
89    };
90
91    let index_id = index.id;
92
93    // If the index is being created, use cancel RPC instead of normal drop
94    if index.index_table().stream_job_status == StreamJobStatus::Creating {
95        let canceled_jobs = session
96            .env()
97            .meta_client()
98            .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds {
99                job_ids: vec![index_id.index_id.into()],
100            }))
101            .await?;
102        tracing::info!(?canceled_jobs, "cancelled creating index job");
103    } else {
104        let catalog_writer = session.catalog_writer()?;
105        execute_with_long_running_notification(
106            catalog_writer.drop_index(index_id, cascade),
107            &session,
108            "DROP INDEX",
109        )
110        .await?;
111    }
112
113    Ok(PgResponse::empty_result(StatementType::DROP_INDEX))
114}
115
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Creates a table and an index on it, drops the index, and then checks that the name `idx`
    /// can no longer be resolved as a created table in the default schema.
    #[tokio::test]
    async fn test_drop_index_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;

        // Run in order: create the table, create the index, then drop the index.
        for sql in [
            "create table t (v1 smallint);",
            "create index idx on t(v1);",
            "drop index idx;",
        ] {
            frontend.run_sql(sql).await.unwrap();
        }

        let session = frontend.session_ref();
        let catalog = session.env().catalog_reader().read_guard();
        let default_schema = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let lookup = catalog.get_created_table_by_name(DEFAULT_DATABASE_NAME, default_schema, "idx");
        assert!(lookup.is_err());
    }
}
141}