risingwave_frontend/handler/
drop_source.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::catalog::ICEBERG_SOURCE_PREFIX;
17use risingwave_sqlparser::ast::ObjectName;
18
19use super::RwPgResponse;
20use super::util::execute_with_long_running_notification;
21use crate::binder::Binder;
22use crate::catalog::root_catalog::SchemaPath;
23use crate::error::Result;
24use crate::handler::HandlerArgs;
25
26pub async fn handle_drop_source(
27 handler_args: HandlerArgs,
28 name: ObjectName,
29 if_exists: bool,
30 cascade: bool,
31) -> Result<RwPgResponse> {
32 let session = handler_args.session;
33 let db_name = &session.database();
34 let (schema_name, source_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
35 let search_path = session.config().search_path();
36 let user_name = &session.user_name();
37
38 if let Some(_source) = session.get_temporary_source(&source_name) {
40 session.drop_temporary_source(&source_name);
41 return Ok(PgResponse::empty_result(StatementType::DROP_SOURCE));
42 }
43
44 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
45
46 let (source, schema_name) = {
47 let catalog_reader = session.env().catalog_reader().read_guard();
48
49 if let Ok((table, _)) =
50 catalog_reader.get_created_table_by_name(db_name, schema_path, &source_name)
51 {
52 return Err(table.bad_drop_error());
53 }
54
55 match catalog_reader.get_source_by_name(db_name, schema_path, &source_name) {
56 Ok((s, schema)) => (s.clone(), schema),
57 Err(e) => {
58 return if if_exists {
59 Ok(RwPgResponse::builder(StatementType::DROP_SOURCE)
60 .notice(format!(
61 "source \"{}\" does not exist, skipping",
62 source_name
63 ))
64 .into())
65 } else {
66 Err(e.into())
67 };
68 }
69 }
70 };
71
72 if source_name.starts_with(ICEBERG_SOURCE_PREFIX) {
73 return Err(crate::error::ErrorCode::NotSupported(
74 "Dropping Iceberg sources is not supported".to_owned(),
75 "Please use DROP TABLE command.".to_owned(),
76 )
77 .into());
78 }
79
80 session.check_privilege_for_drop_alter(schema_name, &*source)?;
81
82 let catalog_writer = session.catalog_writer()?;
83 execute_with_long_running_notification(
84 catalog_writer.drop_source(source.id, cascade),
85 &session,
86 "DROP SOURCE",
87 )
88 .await?;
89
90 Ok(PgResponse::empty_result(StatementType::DROP_SOURCE))
91}