risingwave_frontend/handler/
alter_sink_props.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_sqlparser::ast::{ObjectName, SqlOption};
18
19use super::{HandlerArgs, RwPgResponse};
20use crate::catalog::root_catalog::SchemaPath;
21use crate::error::Result;
22use crate::utils::resolve_connection_ref_and_secret_ref;
23use crate::{Binder, WithOptions};
24
25pub async fn handle_alter_sink_props(
26 handler_args: HandlerArgs,
27 table_name: ObjectName,
28 changed_props: Vec<SqlOption>,
29) -> Result<RwPgResponse> {
30 let session = handler_args.session;
31 let sink_id = {
32 let db_name = &session.database();
33 let (schema_name, real_table_name) =
34 Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
35 let search_path = session.config().search_path();
36 let user_name = &session.user_name();
37
38 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
39
40 let reader = session.env().catalog_reader().read_guard();
41 let (sink, schema_name) =
42 reader.get_sink_by_name(db_name, schema_path, &real_table_name)?;
43
44 if sink.target_table.is_some() {
45 bail!("ALTER sink config is not for sink into table")
46 }
47 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
48 sink.id.sink_id
49 };
50
51 let meta_client = session.env().meta_client();
52 let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
53 WithOptions::try_from(changed_props.as_ref() as &[SqlOption])?,
54 &session,
55 None,
56 )?;
57 let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
58 if !changed_secret_refs.is_empty() || connector_conn_ref.is_some() {
59 bail!("ALTER SINK does not support SECRET or CONNECTION now")
60 }
61 meta_client
62 .alter_sink_props(
63 sink_id,
64 changed_props,
65 changed_secret_refs,
66 connector_conn_ref,
67 )
68 .await?;
69
70 Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
71}