risingwave_frontend/handler/
drop_connection.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_pb::catalog::connection::Info as ConnectionInfo;
17use risingwave_pb::catalog::connection_params::ConnectionType;
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::RwPgResponse;
21use crate::binder::Binder;
22use crate::catalog::root_catalog::SchemaPath;
23use crate::error::{ErrorCode, Result};
24use crate::handler::HandlerArgs;
25
26pub async fn handle_drop_connection(
27 handler_args: HandlerArgs,
28 connection_name: ObjectName,
29 if_exists: bool,
30 cascade: bool,
31) -> Result<RwPgResponse> {
32 let session = handler_args.session;
33 let db_name = &session.database();
34 let (schema_name, connection_name) =
35 Binder::resolve_schema_qualified_name(db_name, &connection_name)?;
36 let search_path = session.config().search_path();
37 let user_name = &session.user_name();
38
39 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
40
41 let (connection_id, is_iceberg_connection) = {
42 let reader = session.env().catalog_reader().read_guard();
43 let (connection, schema_name) =
44 match reader.get_connection_by_name(db_name, schema_path, connection_name.as_str()) {
45 Ok((c, s)) => (c, s),
46 Err(e) => {
47 return if if_exists {
48 Ok(RwPgResponse::builder(StatementType::DROP_CONNECTION)
49 .notice(format!(
50 "connection \"{}\" does not exist, skipping",
51 connection_name
52 ))
53 .into())
54 } else {
55 Err(e.into())
56 };
57 }
58 };
59 session.check_privilege_for_drop_alter(schema_name, &**connection)?;
60
61 let is_iceberg = match &connection.info {
63 ConnectionInfo::ConnectionParams(params) => {
64 params.connection_type == ConnectionType::Iceberg as i32
65 }
66 _ => false,
67 };
68
69 (connection.id, is_iceberg)
70 };
71
72 if is_iceberg_connection && cascade {
74 return Err(ErrorCode::NotSupported(
75 "DROP CONNECTION CASCADE".to_owned(),
76 "Please drop dependent objects manually before dropping the Iceberg connection, or use DROP CONNECTION without CASCADE".to_owned(),
77 )
78 .into());
79 }
80
81 let catalog_writer = session.catalog_writer()?;
82 catalog_writer
83 .drop_connection(connection_id, cascade)
84 .await?;
85
86 Ok(PgResponse::empty_result(StatementType::DROP_CONNECTION))
87}