risingwave_frontend/handler/
alter_streaming_rate_limit.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_pb::common::ThrottleType as PbThrottleType;
18use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
19use risingwave_sqlparser::ast::ObjectName;
20
21use super::{HandlerArgs, RwPgResponse};
22use crate::catalog::root_catalog::{Catalog, SchemaPath};
23use crate::catalog::table_catalog::TableType;
24use crate::error::{ErrorCode, Result};
25use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
26use crate::session::SessionImpl;
27use crate::{Binder, TableCatalog};
28
29fn check_table_mismatch<'a>(
30 reader: &'a Catalog,
31 session: &SessionImpl,
32 db_name: &str,
33 schema_path: SchemaPath<'_>,
34 table_name: &str,
35) -> Result<&'a TableCatalog> {
36 let (table, schema_name) =
37 reader.get_created_table_by_name(db_name, schema_path, table_name)?;
38 if table.table_type != TableType::Table {
39 return Err(
40 ErrorCode::InvalidInputSyntax(format!("\"{table_name}\" is not a TABLE",)).into(),
41 );
42 }
43 session.check_privilege_for_drop_alter(schema_name, &**table)?;
44 Ok(table)
45}
46
47pub async fn handle_alter_streaming_rate_limit(
48 handler_args: HandlerArgs,
49 throttle_target: PbThrottleTarget,
50 throttle_type: PbThrottleType,
51 table_name: ObjectName,
52 rate_limit: i32,
53) -> Result<RwPgResponse> {
54 let session = handler_args.clone().session;
55 let db_name = &session.database();
56 let (schema_name, real_table_name) =
57 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
58 let search_path = session.config().search_path();
59 let user_name = &session.user_name();
60
61 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
62
63 let (stmt_type, id) = match (throttle_target, throttle_type) {
64 (PbThrottleTarget::Mv, PbThrottleType::Backfill) => {
65 let reader = session.env().catalog_reader().read_guard();
66 let (table, schema_name) =
67 reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
68 if table.table_type != TableType::MaterializedView {
69 return Err(ErrorCode::InvalidInputSyntax(format!(
70 "\"{table_name}\" is not a materialized view",
71 ))
72 .into());
73 }
74 session.check_privilege_for_drop_alter(schema_name, &**table)?;
75 (StatementType::ALTER_MATERIALIZED_VIEW, table.id.as_raw_id())
76 }
77 (PbThrottleTarget::Source, PbThrottleType::Source) => {
78 let reader = session.env().catalog_reader().read_guard();
79 let (source, schema_name) =
80 reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
81 session.check_privilege_for_drop_alter(schema_name, &**source)?;
82 (StatementType::ALTER_SOURCE, source.id.as_raw_id())
83 }
84 (PbThrottleTarget::Table, PbThrottleType::Dml) => {
85 let reader = session.env().catalog_reader().read_guard();
86 let table =
87 check_table_mismatch(&reader, &session, db_name, schema_path, &real_table_name)?;
88 (StatementType::ALTER_TABLE, table.id.as_raw_id())
89 }
90 (PbThrottleTarget::Table, PbThrottleType::Source) => {
91 let reader = session.env().catalog_reader().read_guard();
92 let table =
93 check_table_mismatch(&reader, &session, db_name, schema_path, &real_table_name)?;
94 let source_id = if let Some(id) = table.associated_source_id {
95 id.as_raw_id()
96 } else {
97 bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
98 };
99 (StatementType::ALTER_TABLE, source_id)
100 }
101 (PbThrottleTarget::Table, PbThrottleType::Backfill) => {
102 let reader = session.env().catalog_reader().read_guard();
103 let table =
104 check_table_mismatch(&reader, &session, db_name, schema_path, &real_table_name)?;
105 if table.cdc_table_type.is_none() {
106 return Err(ErrorCode::InvalidInputSyntax(format!(
107 "\"{table_name}\" is not a CDC table",
108 ))
109 .into());
110 }
111 (StatementType::ALTER_TABLE, table.id.as_raw_id())
112 }
113 (PbThrottleTarget::Sink, PbThrottleType::Sink) => {
114 let reader = session.env().catalog_reader().read_guard();
115 let (sink, schema_name) =
116 reader.get_any_sink_by_name(db_name, schema_path, &real_table_name)?;
117 if sink.target_table.is_some() {
118 bail!("ALTER SINK_RATE_LIMIT is not for sink into table")
119 }
120 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
121 (StatementType::ALTER_SINK, sink.id.as_raw_id())
122 }
123 (PbThrottleTarget::Sink, PbThrottleType::Backfill) => {
124 let reader = session.env().catalog_reader().read_guard();
125 let (sink, schema_name) =
126 reader.get_any_sink_by_name(db_name, schema_path, &real_table_name)?;
127 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
128 (StatementType::ALTER_SINK, sink.id.as_raw_id())
129 }
130 _ => bail!(
131 "Unsupported throttle target: {:?} and throttle type: {:?}",
132 throttle_target,
133 throttle_type
134 ),
135 };
136 execute_with_long_running_notification(
137 handle_alter_streaming_rate_limit_by_id(
138 &session,
139 throttle_target,
140 throttle_type,
141 id,
142 rate_limit,
143 stmt_type,
144 ),
145 &session,
146 "ALTER STREAMING RATE LIMIT",
147 LongRunningNotificationAction::SuggestRecover,
148 )
149 .await
150}
151
152pub async fn handle_alter_streaming_rate_limit_by_id(
153 session: &SessionImpl,
154 throttle_target: PbThrottleTarget,
155 throttle_type: PbThrottleType,
156 id: u32,
157 rate_limit: i32,
158 stmt_type: StatementType,
159) -> Result<RwPgResponse> {
160 let meta_client = session.env().meta_client();
161
162 let rate_limit = if rate_limit < 0 {
163 None
164 } else {
165 Some(rate_limit as u32)
166 };
167
168 meta_client
169 .apply_throttle(throttle_target, throttle_type, id, rate_limit)
170 .await?;
171
172 Ok(PgResponse::empty_result(stmt_type))
173}