risingwave_frontend/handler/
drop_index.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::catalog::StreamJobStatus;
17use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::CatalogError;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::catalog::table_catalog::TableType;
25use crate::error::ErrorCode::PermissionDenied;
26use crate::error::Result;
27use crate::handler::HandlerArgs;
28
29pub async fn handle_drop_index(
30 handler_args: HandlerArgs,
31 index_name: ObjectName,
32 if_exists: bool,
33 cascade: bool,
34) -> Result<RwPgResponse> {
35 let session = handler_args.session;
36 let db_name = &session.database();
37 let (schema_name, index_name) = Binder::resolve_schema_qualified_name(db_name, &index_name)?;
38 let search_path = session.config().search_path();
39 let user_name = &session.user_name();
40 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
41
42 let index = {
43 let reader = session.env().catalog_reader().read_guard();
44 match reader.get_any_index_by_name(db_name, schema_path, &index_name) {
45 Ok((index, _)) => {
46 if !session.is_super_user() && session.user_id() != index.index_table().owner {
47 return Err(PermissionDenied(format!(
48 "must be owner of index \"{}\"",
49 index.name
50 ))
51 .into());
52 }
53
54 index.clone()
55 }
56 Err(err) => {
57 match err {
58 CatalogError::NotFound("index", _) => {
59 }
61 _ => return Err(err.into()),
62 };
63 return match reader.get_created_table_by_name(db_name, schema_path, &index_name) {
64 Ok((table, _)) => match table.table_type() {
65 TableType::Index => unreachable!(),
66 _ => Err(table.bad_drop_error()),
67 },
68 Err(e) => {
69 if if_exists {
70 Ok(RwPgResponse::builder(StatementType::DROP_INDEX)
71 .notice(format!(
72 "index \"{}\" does not exist, skipping",
73 index_name
74 ))
75 .into())
76 } else {
77 match e {
78 CatalogError::NotFound("table", name) => {
79 Err(CatalogError::NotFound("index", name).into())
80 }
81 _ => Err(e.into()),
82 }
83 }
84 }
85 };
86 }
87 }
88 };
89
90 let index_id = index.id;
91
92 if index.index_table().stream_job_status == StreamJobStatus::Creating {
94 let canceled_jobs = session
95 .env()
96 .meta_client()
97 .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds {
98 job_ids: vec![index_id.index_id],
99 }))
100 .await?;
101 tracing::info!(?canceled_jobs, "cancelled creating index job");
102 } else {
103 let catalog_writer = session.catalog_writer()?;
104 catalog_writer.drop_index(index_id, cascade).await?;
105 }
106
107 Ok(PgResponse::empty_result(StatementType::DROP_INDEX))
108}
109
110#[cfg(test)]
111mod tests {
112 use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};
113
114 use crate::catalog::root_catalog::SchemaPath;
115 use crate::test_utils::LocalFrontend;
116
117 #[tokio::test]
118 async fn test_drop_index_handler() {
119 let sql_create_table = "create table t (v1 smallint);";
120 let sql_create_index = "create index idx on t(v1);";
121 let sql_drop_index = "drop index idx;";
122 let frontend = LocalFrontend::new(Default::default()).await;
123 frontend.run_sql(sql_create_table).await.unwrap();
124 frontend.run_sql(sql_create_index).await.unwrap();
125 frontend.run_sql(sql_drop_index).await.unwrap();
126
127 let session = frontend.session_ref();
128 let catalog_reader = session.env().catalog_reader().read_guard();
129 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
130
131 let table =
132 catalog_reader.get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "idx");
133 assert!(table.is_err());
134 }
135}