risingwave_frontend/handler/
drop_index.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::catalog::StreamJobStatus;
17use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::RwPgResponse;
21use super::util::execute_with_long_running_notification;
22use crate::binder::Binder;
23use crate::catalog::CatalogError;
24use crate::catalog::root_catalog::SchemaPath;
25use crate::catalog::table_catalog::TableType;
26use crate::error::ErrorCode::PermissionDenied;
27use crate::error::Result;
28use crate::handler::HandlerArgs;
29
30pub async fn handle_drop_index(
31 handler_args: HandlerArgs,
32 index_name: ObjectName,
33 if_exists: bool,
34 cascade: bool,
35) -> Result<RwPgResponse> {
36 let session = handler_args.session;
37 let db_name = &session.database();
38 let (schema_name, index_name) = Binder::resolve_schema_qualified_name(db_name, &index_name)?;
39 let search_path = session.config().search_path();
40 let user_name = &session.user_name();
41 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
42
43 let index = {
44 let reader = session.env().catalog_reader().read_guard();
45 match reader.get_any_index_by_name(db_name, schema_path, &index_name) {
46 Ok((index, _)) => {
47 if !session.is_super_user() && session.user_id() != index.index_table().owner {
48 return Err(PermissionDenied(format!(
49 "must be owner of index \"{}\"",
50 index.name
51 ))
52 .into());
53 }
54
55 index.clone()
56 }
57 Err(err) => {
58 match err {
59 CatalogError::NotFound("index", _) => {
60 }
62 _ => return Err(err.into()),
63 };
64 return match reader.get_created_table_by_name(db_name, schema_path, &index_name) {
65 Ok((table, _)) => match table.table_type() {
66 TableType::Index => unreachable!(),
67 _ => Err(table.bad_drop_error()),
68 },
69 Err(e) => {
70 if if_exists {
71 Ok(RwPgResponse::builder(StatementType::DROP_INDEX)
72 .notice(format!(
73 "index \"{}\" does not exist, skipping",
74 index_name
75 ))
76 .into())
77 } else {
78 match e {
79 CatalogError::NotFound("table", name) => {
80 Err(CatalogError::NotFound("index", name).into())
81 }
82 _ => Err(e.into()),
83 }
84 }
85 }
86 };
87 }
88 }
89 };
90
91 let index_id = index.id;
92
93 if index.index_table().stream_job_status == StreamJobStatus::Creating {
95 let canceled_jobs = session
96 .env()
97 .meta_client()
98 .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds {
99 job_ids: vec![index_id.index_id.into()],
100 }))
101 .await?;
102 tracing::info!(?canceled_jobs, "cancelled creating index job");
103 } else {
104 let catalog_writer = session.catalog_writer()?;
105 execute_with_long_running_notification(
106 catalog_writer.drop_index(index_id, cascade),
107 &session,
108 "DROP INDEX",
109 )
110 .await?;
111 }
112
113 Ok(PgResponse::empty_result(StatementType::DROP_INDEX))
114}
115
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Creates a table with an index, drops the index, and checks that no created table named
    /// `idx` can be looked up in the default schema afterwards.
    #[tokio::test]
    async fn test_drop_index_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;

        // Statements are executed strictly in this order; each must succeed.
        let statements = [
            "create table t (v1 smallint);",
            "create index idx on t(v1);",
            "drop index idx;",
        ];
        for sql in statements {
            frontend.run_sql(sql).await.unwrap();
        }

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let lookup = catalog_reader.get_created_table_by_name(
            DEFAULT_DATABASE_NAME,
            SchemaPath::Name(DEFAULT_SCHEMA_NAME),
            "idx",
        );
        assert!(lookup.is_err());
    }
}