risingwave_frontend/handler/
drop_subscription.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_sqlparser::ast::ObjectName;
17
18use super::util::execute_with_long_running_notification;
19use super::{HandlerArgs, RwPgResponse};
20use crate::Binder;
21use crate::catalog::root_catalog::SchemaPath;
22use crate::error::Result;
23
24pub async fn handle_drop_subscription(
25 handler_args: HandlerArgs,
26 subscription_name: ObjectName,
27 if_exists: bool,
28 cascade: bool,
29) -> Result<RwPgResponse> {
30 let session = handler_args.session;
31 let db_name = &session.database();
32 let (schema_name, subscription_name) =
33 Binder::resolve_schema_qualified_name(db_name, &subscription_name)?;
34 let search_path = session.config().search_path();
35 let user_name = &session.user_name();
36 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
37
38 let subscription = {
39 let catalog_reader = session.env().catalog_reader().read_guard();
40 let (subscription, schema_name) =
41 match catalog_reader.get_subscription_by_name(db_name, schema_path, &subscription_name)
42 {
43 Ok((subscription, schema)) => (subscription.clone(), schema),
44 Err(e) => {
45 return if if_exists {
46 Ok(RwPgResponse::builder(StatementType::DROP_SUBSCRIPTION)
47 .notice(format!(
48 "subscription \"{}\" does not exist, skipping",
49 subscription_name
50 ))
51 .into())
52 } else {
53 Err(e.into())
54 };
55 }
56 };
57
58 session.check_privilege_for_drop_alter(schema_name, &*subscription)?;
59
60 subscription
61 };
62
63 let subscription_id = subscription.id;
64
65 let catalog_writer = session.catalog_writer()?;
66 execute_with_long_running_notification(
67 catalog_writer.drop_subscription(subscription_id, cascade),
68 &session,
69 "DROP SUBSCRIPTION",
70 )
71 .await?;
72
73 Ok(PgResponse::empty_result(StatementType::DROP_SUBSCRIPTION))
74}