risingwave_frontend/handler/
alter_streaming_rate_limit.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::catalog::root_catalog::SchemaPath;
23use crate::catalog::table_catalog::TableType;
24use crate::error::{ErrorCode, Result};
25use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
26use crate::session::SessionImpl;
27
28pub async fn handle_alter_streaming_rate_limit(
29    handler_args: HandlerArgs,
30    kind: PbThrottleTarget,
31    table_name: ObjectName,
32    rate_limit: i32,
33) -> Result<RwPgResponse> {
34    let session = handler_args.clone().session;
35    let db_name = &session.database();
36    let (schema_name, real_table_name) =
37        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
38    let search_path = session.config().search_path();
39    let user_name = &session.user_name();
40
41    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
42
43    let (stmt_type, id) = match kind {
44        PbThrottleTarget::Mv => {
45            let reader = session.env().catalog_reader().read_guard();
46            let (table, schema_name) =
47                reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
48            if table.table_type != TableType::MaterializedView {
49                return Err(ErrorCode::InvalidInputSyntax(format!(
50                    "\"{table_name}\" is not a materialized view",
51                ))
52                .into());
53            }
54            session.check_privilege_for_drop_alter(schema_name, &**table)?;
55            (StatementType::ALTER_MATERIALIZED_VIEW, table.id.as_raw_id())
56        }
57        PbThrottleTarget::Source => {
58            let reader = session.env().catalog_reader().read_guard();
59            let (source, schema_name) =
60                reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
61            session.check_privilege_for_drop_alter(schema_name, &**source)?;
62            (StatementType::ALTER_SOURCE, source.id.as_raw_id())
63        }
64        PbThrottleTarget::TableWithSource => {
65            let reader = session.env().catalog_reader().read_guard();
66            let (table, schema_name) =
67                reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
68            session.check_privilege_for_drop_alter(schema_name, &**table)?;
69            // Get the corresponding source catalog.
70            let source_id = if let Some(id) = table.associated_source_id {
71                id.as_raw_id()
72            } else {
73                bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
74            };
75            (StatementType::ALTER_SOURCE, source_id)
76        }
77        PbThrottleTarget::CdcTable => {
78            let reader = session.env().catalog_reader().read_guard();
79            let (table, schema_name) =
80                reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
81            if table.table_type != TableType::Table {
82                return Err(ErrorCode::InvalidInputSyntax(format!("\"{table_name}\" ",)).into());
83            }
84            session.check_privilege_for_drop_alter(schema_name, &**table)?;
85            (StatementType::ALTER_TABLE, table.id.as_raw_id())
86        }
87        PbThrottleTarget::TableDml => {
88            let reader = session.env().catalog_reader().read_guard();
89            let (table, schema_name) =
90                reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
91            if table.table_type != TableType::Table {
92                return Err(ErrorCode::InvalidInputSyntax(format!(
93                    "\"{table_name}\" is not a table",
94                ))
95                .into());
96            }
97            session.check_privilege_for_drop_alter(schema_name, &**table)?;
98            (StatementType::ALTER_TABLE, table.id.as_raw_id())
99        }
100        PbThrottleTarget::Sink => {
101            let reader = session.env().catalog_reader().read_guard();
102            let (sink, schema_name) =
103                reader.get_any_sink_by_name(db_name, schema_path, &real_table_name)?;
104            if sink.target_table.is_some() {
105                bail!("ALTER SINK_RATE_LIMIT is not for sink into table")
106            }
107            session.check_privilege_for_drop_alter(schema_name, &**sink)?;
108            (StatementType::ALTER_SINK, sink.id.as_raw_id())
109        }
110        _ => bail!("Unsupported throttle target: {:?}", kind),
111    };
112    execute_with_long_running_notification(
113        handle_alter_streaming_rate_limit_by_id(&session, kind, id, rate_limit, stmt_type),
114        &session,
115        "ALTER STREAMING RATE LIMIT",
116        LongRunningNotificationAction::SuggestRecover,
117    )
118    .await
119}
120
121pub async fn handle_alter_streaming_rate_limit_by_id(
122    session: &SessionImpl,
123    kind: PbThrottleTarget,
124    id: u32,
125    rate_limit: i32,
126    stmt_type: StatementType,
127) -> Result<RwPgResponse> {
128    let meta_client = session.env().meta_client();
129
130    let rate_limit = if rate_limit < 0 {
131        None
132    } else {
133        Some(rate_limit as u32)
134    };
135
136    meta_client.apply_throttle(kind, id, rate_limit).await?;
137
138    Ok(PgResponse::empty_result(stmt_type))
139}