risingwave_frontend/handler/
alter_sink_props.rs1use pgwire::pg_response::StatementType;
16use risingwave_sqlparser::ast::{ObjectName, SqlOption};
17
18use super::{HandlerArgs, RwPgResponse};
19use crate::catalog::root_catalog::SchemaPath;
20use crate::error::{ErrorCode, Result};
21use crate::handler::alter_source_props::ensure_alter_props_not_set_by_connection;
22use crate::utils::resolve_connection_ref_and_secret_ref;
23use crate::{Binder, WithOptions};
24
25pub async fn handle_alter_sink_props(
26 handler_args: HandlerArgs,
27 sink_name: ObjectName,
28 changed_props: Vec<SqlOption>,
29) -> Result<RwPgResponse> {
30 let session = handler_args.session;
31 let user_name = &session.user_name();
32 let sink_id = {
33 let db_name = &session.database();
34 let search_path = session.config().search_path();
35
36 let reader = session.env().catalog_reader().read_guard();
37 let (schema_name, real_table_name) =
38 Binder::resolve_schema_qualified_name(db_name, &sink_name)?;
39 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
40 let (sink, schema_name) =
41 reader.get_created_sink_by_name(db_name, schema_path, &real_table_name)?;
42
43 if sink.target_table.is_some() {
44 return Err(ErrorCode::InvalidInputSyntax(
45 "ALTER SINK CONNECTOR is not for SINK INTO TABLE".to_owned(),
46 )
47 .into());
48 }
49 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
50
51 ensure_alter_props_not_set_by_connection(
52 &reader,
53 db_name,
54 sink.connection_id.map(|id| id.connection_id()),
55 &changed_props,
56 )?;
57
58 sink.id.sink_id
59 };
60
61 let meta_client = session.env().meta_client();
62 let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
63 WithOptions::try_from(changed_props.as_ref() as &[SqlOption])?,
64 &session,
65 None,
66 )?;
67 let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
68 if connector_conn_ref.is_some() {
69 return Err(ErrorCode::InvalidInputSyntax(
70 "ALTER SINK does not support CONNECTION".to_owned(),
71 )
72 .into());
73 }
74
75 meta_client
76 .alter_sink_props(
77 sink_id,
78 changed_props,
79 changed_secret_refs,
80 connector_conn_ref, )
82 .await?;
83
84 Ok(RwPgResponse::empty_result(StatementType::ALTER_SINK))
85}