risingwave_frontend/handler/
alter_streaming_rate_limit.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_pb::common::ThrottleType as PbThrottleType;
18use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
19use risingwave_sqlparser::ast::ObjectName;
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::catalog::root_catalog::{Catalog, SchemaPath};
23use crate::catalog::table_catalog::TableType;
24use crate::error::{ErrorCode, Result};
25use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
26use crate::session::SessionImpl;
27use crate::{Binder, TableCatalog};
28
29fn check_table_mismatch<'a>(
30    reader: &'a Catalog,
31    session: &SessionImpl,
32    db_name: &str,
33    schema_path: SchemaPath<'_>,
34    table_name: &str,
35) -> Result<&'a TableCatalog> {
36    let (table, schema_name) =
37        reader.get_created_table_by_name(db_name, schema_path, table_name)?;
38    if table.table_type != TableType::Table {
39        return Err(
40            ErrorCode::InvalidInputSyntax(format!("\"{table_name}\" is not a TABLE",)).into(),
41        );
42    }
43    session.check_privilege_for_drop_alter(schema_name, &**table)?;
44    Ok(table)
45}
46
47pub async fn handle_alter_streaming_rate_limit(
48    handler_args: HandlerArgs,
49    throttle_target: PbThrottleTarget,
50    throttle_type: PbThrottleType,
51    table_name: ObjectName,
52    rate_limit: i32,
53) -> Result<RwPgResponse> {
54    let session = handler_args.clone().session;
55    let db_name = &session.database();
56    let (schema_name, real_table_name) =
57        Binder::resolve_schema_qualified_name(db_name, &table_name)?;
58    let search_path = session.config().search_path();
59    let user_name = &session.user_name();
60
61    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
62
63    let (stmt_type, id) = match (throttle_target, throttle_type) {
64        (PbThrottleTarget::Mv, PbThrottleType::Backfill) => {
65            let reader = session.env().catalog_reader().read_guard();
66            let (table, schema_name) =
67                reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
68            if table.table_type != TableType::MaterializedView {
69                return Err(ErrorCode::InvalidInputSyntax(format!(
70                    "\"{table_name}\" is not a materialized view",
71                ))
72                .into());
73            }
74            session.check_privilege_for_drop_alter(schema_name, &**table)?;
75            (StatementType::ALTER_MATERIALIZED_VIEW, table.id.as_raw_id())
76        }
77        (PbThrottleTarget::Source, PbThrottleType::Source) => {
78            let reader = session.env().catalog_reader().read_guard();
79            let (source, schema_name) =
80                reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
81            session.check_privilege_for_drop_alter(schema_name, &**source)?;
82            (StatementType::ALTER_SOURCE, source.id.as_raw_id())
83        }
84        (PbThrottleTarget::Table, PbThrottleType::Dml) => {
85            let reader = session.env().catalog_reader().read_guard();
86            let table =
87                check_table_mismatch(&reader, &session, db_name, schema_path, &real_table_name)?;
88            (StatementType::ALTER_TABLE, table.id.as_raw_id())
89        }
90        (PbThrottleTarget::Table, PbThrottleType::Source) => {
91            let reader = session.env().catalog_reader().read_guard();
92            let table =
93                check_table_mismatch(&reader, &session, db_name, schema_path, &real_table_name)?;
94            let source_id = if let Some(id) = table.associated_source_id {
95                id.as_raw_id()
96            } else {
97                bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
98            };
99            (StatementType::ALTER_TABLE, source_id)
100        }
101        (PbThrottleTarget::Table, PbThrottleType::Backfill) => {
102            let reader = session.env().catalog_reader().read_guard();
103            let table =
104                check_table_mismatch(&reader, &session, db_name, schema_path, &real_table_name)?;
105            if table.cdc_table_type.is_none() {
106                return Err(ErrorCode::InvalidInputSyntax(format!(
107                    "\"{table_name}\" is not a CDC table",
108                ))
109                .into());
110            }
111            (StatementType::ALTER_TABLE, table.id.as_raw_id())
112        }
113        (PbThrottleTarget::Sink, PbThrottleType::Sink) => {
114            let reader = session.env().catalog_reader().read_guard();
115            let (sink, schema_name) =
116                reader.get_any_sink_by_name(db_name, schema_path, &real_table_name)?;
117            if sink.target_table.is_some() {
118                bail!("ALTER SINK_RATE_LIMIT is not for sink into table")
119            }
120            session.check_privilege_for_drop_alter(schema_name, &**sink)?;
121            (StatementType::ALTER_SINK, sink.id.as_raw_id())
122        }
123        (PbThrottleTarget::Sink, PbThrottleType::Backfill) => {
124            let reader = session.env().catalog_reader().read_guard();
125            let (sink, schema_name) =
126                reader.get_any_sink_by_name(db_name, schema_path, &real_table_name)?;
127            session.check_privilege_for_drop_alter(schema_name, &**sink)?;
128            (StatementType::ALTER_SINK, sink.id.as_raw_id())
129        }
130        _ => bail!(
131            "Unsupported throttle target: {:?} and throttle type: {:?}",
132            throttle_target,
133            throttle_type
134        ),
135    };
136    execute_with_long_running_notification(
137        handle_alter_streaming_rate_limit_by_id(
138            &session,
139            throttle_target,
140            throttle_type,
141            id,
142            rate_limit,
143            stmt_type,
144        ),
145        &session,
146        "ALTER STREAMING RATE LIMIT",
147        LongRunningNotificationAction::SuggestRecover,
148    )
149    .await
150}
151
152pub async fn handle_alter_streaming_rate_limit_by_id(
153    session: &SessionImpl,
154    throttle_target: PbThrottleTarget,
155    throttle_type: PbThrottleType,
156    id: u32,
157    rate_limit: i32,
158    stmt_type: StatementType,
159) -> Result<RwPgResponse> {
160    let meta_client = session.env().meta_client();
161
162    let rate_limit = if rate_limit < 0 {
163        None
164    } else {
165        Some(rate_limit as u32)
166    };
167
168    meta_client
169        .apply_throttle(throttle_target, throttle_type, id, rate_limit)
170        .await?;
171
172    Ok(PgResponse::empty_result(stmt_type))
173}