risingwave_frontend/handler/
alter_sink_props.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_sqlparser::ast::{ObjectName, SqlOption};
17
18use super::{HandlerArgs, RwPgResponse};
19use crate::catalog::root_catalog::SchemaPath;
20use crate::error::{ErrorCode, Result};
21use crate::handler::alter_source_props::ensure_alter_props_not_set_by_connection;
22use crate::utils::resolve_connection_ref_and_secret_ref;
23use crate::{Binder, WithOptions};
24
25pub async fn handle_alter_sink_props(
26    handler_args: HandlerArgs,
27    table_name: ObjectName,
28    changed_props: Vec<SqlOption>,
29) -> Result<RwPgResponse> {
30    let session = handler_args.session;
31    let sink_id = {
32        let db_name = &session.database();
33        let (schema_name, real_table_name) =
34            Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
35        let search_path = session.config().search_path();
36        let user_name = &session.user_name();
37
38        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
39
40        let reader = session.env().catalog_reader().read_guard();
41        let (sink, schema_name) =
42            reader.get_sink_by_name(db_name, schema_path, &real_table_name)?;
43
44        if sink.target_table.is_some() {
45            return Err(ErrorCode::InvalidInputSyntax(
46                "ALTER SINK CONNECTOR is not for SINK INTO TABLE".to_owned(),
47            )
48            .into());
49        }
50        session.check_privilege_for_drop_alter(schema_name, &**sink)?;
51
52        ensure_alter_props_not_set_by_connection(
53            &reader,
54            db_name,
55            sink.connection_id.map(|id| id.connection_id()),
56            &changed_props,
57        )?;
58
59        sink.id.sink_id
60    };
61
62    let meta_client = session.env().meta_client();
63    let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
64        WithOptions::try_from(changed_props.as_ref() as &[SqlOption])?,
65        &session,
66        None,
67    )?;
68    let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
69    if connector_conn_ref.is_some() {
70        return Err(ErrorCode::InvalidInputSyntax(
71            "ALTER SINK does not support CONNECTION".to_owned(),
72        )
73        .into());
74    }
75
76    meta_client
77        .alter_sink_props(
78            sink_id,
79            changed_props,
80            changed_secret_refs,
81            connector_conn_ref, // always None, keep the interface for future extension
82        )
83        .await?;
84
85    Ok(RwPgResponse::empty_result(StatementType::ALTER_SINK))
86}