risingwave_frontend/handler/
alter_streaming_rate_limit.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::catalog::root_catalog::SchemaPath;
23use crate::catalog::table_catalog::TableType;
24use crate::error::{ErrorCode, Result};
25use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
26use crate::session::SessionImpl;
27
28pub async fn handle_alter_streaming_rate_limit(
29 handler_args: HandlerArgs,
30 kind: PbThrottleTarget,
31 table_name: ObjectName,
32 rate_limit: i32,
33) -> Result<RwPgResponse> {
34 let session = handler_args.clone().session;
35 let db_name = &session.database();
36 let (schema_name, real_table_name) =
37 Binder::resolve_schema_qualified_name(db_name, &table_name)?;
38 let search_path = session.config().search_path();
39 let user_name = &session.user_name();
40
41 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
42
43 let (stmt_type, id) = match kind {
44 PbThrottleTarget::Mv => {
45 let reader = session.env().catalog_reader().read_guard();
46 let (table, schema_name) =
47 reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
48 if table.table_type != TableType::MaterializedView {
49 return Err(ErrorCode::InvalidInputSyntax(format!(
50 "\"{table_name}\" is not a materialized view",
51 ))
52 .into());
53 }
54 session.check_privilege_for_drop_alter(schema_name, &**table)?;
55 (StatementType::ALTER_MATERIALIZED_VIEW, table.id.as_raw_id())
56 }
57 PbThrottleTarget::Source => {
58 let reader = session.env().catalog_reader().read_guard();
59 let (source, schema_name) =
60 reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
61 session.check_privilege_for_drop_alter(schema_name, &**source)?;
62 (StatementType::ALTER_SOURCE, source.id.as_raw_id())
63 }
64 PbThrottleTarget::TableWithSource => {
65 let reader = session.env().catalog_reader().read_guard();
66 let (table, schema_name) =
67 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
68 session.check_privilege_for_drop_alter(schema_name, &**table)?;
69 let source_id = if let Some(id) = table.associated_source_id {
71 id.as_raw_id()
72 } else {
73 bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
74 };
75 (StatementType::ALTER_SOURCE, source_id)
76 }
77 PbThrottleTarget::CdcTable => {
78 let reader = session.env().catalog_reader().read_guard();
79 let (table, schema_name) =
80 reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
81 if table.table_type != TableType::Table {
82 return Err(ErrorCode::InvalidInputSyntax(format!("\"{table_name}\" ",)).into());
83 }
84 session.check_privilege_for_drop_alter(schema_name, &**table)?;
85 (StatementType::ALTER_TABLE, table.id.as_raw_id())
86 }
87 PbThrottleTarget::TableDml => {
88 let reader = session.env().catalog_reader().read_guard();
89 let (table, schema_name) =
90 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
91 if table.table_type != TableType::Table {
92 return Err(ErrorCode::InvalidInputSyntax(format!(
93 "\"{table_name}\" is not a table",
94 ))
95 .into());
96 }
97 session.check_privilege_for_drop_alter(schema_name, &**table)?;
98 (StatementType::ALTER_TABLE, table.id.as_raw_id())
99 }
100 PbThrottleTarget::Sink => {
101 let reader = session.env().catalog_reader().read_guard();
102 let (sink, schema_name) =
103 reader.get_any_sink_by_name(db_name, schema_path, &real_table_name)?;
104 if sink.target_table.is_some() {
105 bail!("ALTER SINK_RATE_LIMIT is not for sink into table")
106 }
107 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
108 (StatementType::ALTER_SINK, sink.id.as_raw_id())
109 }
110 _ => bail!("Unsupported throttle target: {:?}", kind),
111 };
112 execute_with_long_running_notification(
113 handle_alter_streaming_rate_limit_by_id(&session, kind, id, rate_limit, stmt_type),
114 &session,
115 "ALTER STREAMING RATE LIMIT",
116 LongRunningNotificationAction::SuggestRecover,
117 )
118 .await
119}
120
121pub async fn handle_alter_streaming_rate_limit_by_id(
122 session: &SessionImpl,
123 kind: PbThrottleTarget,
124 id: u32,
125 rate_limit: i32,
126 stmt_type: StatementType,
127) -> Result<RwPgResponse> {
128 let meta_client = session.env().meta_client();
129
130 let rate_limit = if rate_limit < 0 {
131 None
132 } else {
133 Some(rate_limit as u32)
134 };
135
136 meta_client.apply_throttle(kind, id, rate_limit).await?;
137
138 Ok(PgResponse::empty_result(stmt_type))
139}