risingwave_frontend/handler/
drop_mv.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::catalog::StreamJobStatus;
17use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::CatalogError;
23use crate::catalog::root_catalog::SchemaPath;
24use crate::catalog::table_catalog::TableType;
25use crate::error::Result;
26use crate::handler::HandlerArgs;
27
28pub async fn handle_drop_mv(
29 handler_args: HandlerArgs,
30 table_name: ObjectName,
31 if_exists: bool,
32 cascade: bool,
33) -> Result<RwPgResponse> {
34 let session = handler_args.session;
35 let db_name = &session.database();
36 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, table_name)?;
37 let search_path = session.config().search_path();
38 let user_name = &session.user_name();
39
40 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
41
42 let (table_id, status) = {
43 let reader = session.env().catalog_reader().read_guard();
44 let (table, schema_name) =
45 match reader.get_any_table_by_name(&session.database(), schema_path, &table_name) {
46 Ok((t, s)) => (t, s),
47 Err(e) => {
48 return if if_exists {
49 Ok(RwPgResponse::builder(StatementType::DROP_MATERIALIZED_VIEW)
50 .notice(format!(
51 "materialized view \"{}\" does not exist, skipping",
52 table_name
53 ))
54 .into())
55 } else {
56 match e {
57 CatalogError::NotFound("table", name) => {
58 Err(CatalogError::NotFound("materialized view", name).into())
59 }
60 _ => Err(e.into()),
61 }
62 };
63 }
64 };
65
66 session.check_privilege_for_drop_alter(schema_name, &**table)?;
67
68 match table.table_type() {
69 TableType::MaterializedView => {}
70 _ => return Err(table.bad_drop_error()),
71 }
72
73 (table.id(), table.stream_job_status)
74 };
75
76 match status {
77 StreamJobStatus::Created => {
78 let catalog_writer = session.catalog_writer()?;
79 catalog_writer
80 .drop_materialized_view(table_id, cascade)
81 .await?;
82 }
83 StreamJobStatus::Creating => {
84 let canceled_jobs = session
85 .env()
86 .meta_client()
87 .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds {
88 job_ids: vec![table_id.table_id],
89 }))
90 .await?;
91 tracing::info!(?canceled_jobs, "cancelled creating jobs");
92 }
93 }
94
95 Ok(PgResponse::empty_result(
96 StatementType::DROP_MATERIALIZED_VIEW,
97 ))
98}
99
#[cfg(test)]
mod tests {
    use risingwave_common::catalog::{DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Creates a table and a materialized view over it, drops the view, and then checks that
    /// looking the view up by name in the catalog fails.
    #[tokio::test]
    async fn test_drop_mv_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;

        // Statements run strictly in this order; each one must succeed.
        for statement in [
            "create table t (v1 smallint);",
            "create materialized view mv as select v1 from t;",
            "drop materialized view mv;",
        ] {
            frontend.run_sql(statement).await.unwrap();
        }

        let session = frontend.session_ref();
        let reader = session.env().catalog_reader().read_guard();
        let lookup = reader.get_created_table_by_name(
            DEFAULT_DATABASE_NAME,
            SchemaPath::Name(DEFAULT_SCHEMA_NAME),
            "mv",
        );

        assert!(lookup.is_err());
    }
}