risingwave_frontend/handler/
alter_streaming_rate_limit.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::catalog::root_catalog::SchemaPath;
23use crate::catalog::table_catalog::TableType;
24use crate::error::{ErrorCode, Result};
25use crate::session::SessionImpl;
26
27pub async fn handle_alter_streaming_rate_limit(
28    handler_args: HandlerArgs,
29    kind: PbThrottleTarget,
30    table_name: ObjectName,
31    rate_limit: i32,
32) -> Result<RwPgResponse> {
33    let session = handler_args.session;
34    let db_name = &session.database();
35    let (schema_name, real_table_name) =
36        Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
37    let search_path = session.config().search_path();
38    let user_name = &session.user_name();
39
40    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
41
42    let (stmt_type, id) = match kind {
43        PbThrottleTarget::Mv => {
44            let reader = session.env().catalog_reader().read_guard();
45            let (table, schema_name) =
46                reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
47            if table.table_type != TableType::MaterializedView {
48                return Err(ErrorCode::InvalidInputSyntax(format!(
49                    "\"{table_name}\" is not a materialized view",
50                ))
51                .into());
52            }
53            session.check_privilege_for_drop_alter(schema_name, &**table)?;
54            (StatementType::ALTER_MATERIALIZED_VIEW, table.id.table_id)
55        }
56        PbThrottleTarget::Source => {
57            let reader = session.env().catalog_reader().read_guard();
58            let (source, schema_name) =
59                reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
60            session.check_privilege_for_drop_alter(schema_name, &**source)?;
61            (StatementType::ALTER_SOURCE, source.id)
62        }
63        PbThrottleTarget::TableWithSource => {
64            let reader = session.env().catalog_reader().read_guard();
65            let (table, schema_name) =
66                reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
67            session.check_privilege_for_drop_alter(schema_name, &**table)?;
68            // Get the corresponding source catalog.
69            let source_id = if let Some(id) = table.associated_source_id {
70                id.table_id()
71            } else {
72                bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
73            };
74            (StatementType::ALTER_SOURCE, source_id)
75        }
76        PbThrottleTarget::CdcTable => {
77            let reader = session.env().catalog_reader().read_guard();
78            let (table, schema_name) =
79                reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
80            if table.table_type != TableType::Table {
81                return Err(ErrorCode::InvalidInputSyntax(format!("\"{table_name}\" ",)).into());
82            }
83            session.check_privilege_for_drop_alter(schema_name, &**table)?;
84            (StatementType::ALTER_TABLE, table.id.table_id)
85        }
86        PbThrottleTarget::TableDml => {
87            let reader = session.env().catalog_reader().read_guard();
88            let (table, schema_name) =
89                reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
90            if table.table_type != TableType::Table {
91                return Err(ErrorCode::InvalidInputSyntax(format!(
92                    "\"{table_name}\" is not a table",
93                ))
94                .into());
95            }
96            session.check_privilege_for_drop_alter(schema_name, &**table)?;
97            (StatementType::ALTER_TABLE, table.id.table_id)
98        }
99        PbThrottleTarget::Sink => {
100            let reader = session.env().catalog_reader().read_guard();
101            let (table, schema_name) =
102                reader.get_sink_by_name(db_name, schema_path, &real_table_name)?;
103            if table.target_table.is_some() {
104                bail!("ALTER SINK_RATE_LIMIT is not for sink into table")
105            }
106            session.check_privilege_for_drop_alter(schema_name, &**table)?;
107            (StatementType::ALTER_SINK, table.id.sink_id)
108        }
109        _ => bail!("Unsupported throttle target: {:?}", kind),
110    };
111    handle_alter_streaming_rate_limit_by_id(&session, kind, id, rate_limit, stmt_type).await
112}
113
114pub async fn handle_alter_streaming_rate_limit_by_id(
115    session: &SessionImpl,
116    kind: PbThrottleTarget,
117    id: u32,
118    rate_limit: i32,
119    stmt_type: StatementType,
120) -> Result<RwPgResponse> {
121    let meta_client = session.env().meta_client();
122
123    let rate_limit = if rate_limit < 0 {
124        None
125    } else {
126        Some(rate_limit as u32)
127    };
128
129    meta_client.apply_throttle(kind, id, rate_limit).await?;
130
131    Ok(PgResponse::empty_result(stmt_type))
132}