risingwave_frontend/handler/
alter_streaming_rate_limit.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::bail;
17use risingwave_pb::meta::ThrottleTarget as PbThrottleTarget;
18use risingwave_sqlparser::ast::ObjectName;
19
20use super::{HandlerArgs, RwPgResponse};
21use crate::Binder;
22use crate::catalog::root_catalog::SchemaPath;
23use crate::catalog::table_catalog::TableType;
24use crate::error::{ErrorCode, Result};
25use crate::session::SessionImpl;
26
27pub async fn handle_alter_streaming_rate_limit(
28 handler_args: HandlerArgs,
29 kind: PbThrottleTarget,
30 table_name: ObjectName,
31 rate_limit: i32,
32) -> Result<RwPgResponse> {
33 let session = handler_args.session;
34 let db_name = &session.database();
35 let (schema_name, real_table_name) =
36 Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
37 let search_path = session.config().search_path();
38 let user_name = &session.user_name();
39
40 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
41
42 let (stmt_type, id) = match kind {
43 PbThrottleTarget::Mv => {
44 let reader = session.env().catalog_reader().read_guard();
45 let (table, schema_name) =
46 reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
47 if table.table_type != TableType::MaterializedView {
48 return Err(ErrorCode::InvalidInputSyntax(format!(
49 "\"{table_name}\" is not a materialized view",
50 ))
51 .into());
52 }
53 session.check_privilege_for_drop_alter(schema_name, &**table)?;
54 (StatementType::ALTER_MATERIALIZED_VIEW, table.id.table_id)
55 }
56 PbThrottleTarget::Source => {
57 let reader = session.env().catalog_reader().read_guard();
58 let (source, schema_name) =
59 reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
60 session.check_privilege_for_drop_alter(schema_name, &**source)?;
61 (StatementType::ALTER_SOURCE, source.id)
62 }
63 PbThrottleTarget::TableWithSource => {
64 let reader = session.env().catalog_reader().read_guard();
65 let (table, schema_name) =
66 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
67 session.check_privilege_for_drop_alter(schema_name, &**table)?;
68 let source_id = if let Some(id) = table.associated_source_id {
70 id.table_id()
71 } else {
72 bail!("ALTER SOURCE_RATE_LIMIT is not for table without source")
73 };
74 (StatementType::ALTER_SOURCE, source_id)
75 }
76 PbThrottleTarget::CdcTable => {
77 let reader = session.env().catalog_reader().read_guard();
78 let (table, schema_name) =
79 reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
80 if table.table_type != TableType::Table {
81 return Err(ErrorCode::InvalidInputSyntax(format!("\"{table_name}\" ",)).into());
82 }
83 session.check_privilege_for_drop_alter(schema_name, &**table)?;
84 (StatementType::ALTER_TABLE, table.id.table_id)
85 }
86 PbThrottleTarget::TableDml => {
87 let reader = session.env().catalog_reader().read_guard();
88 let (table, schema_name) =
89 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
90 if table.table_type != TableType::Table {
91 return Err(ErrorCode::InvalidInputSyntax(format!(
92 "\"{table_name}\" is not a table",
93 ))
94 .into());
95 }
96 session.check_privilege_for_drop_alter(schema_name, &**table)?;
97 (StatementType::ALTER_TABLE, table.id.table_id)
98 }
99 PbThrottleTarget::Sink => {
100 let reader = session.env().catalog_reader().read_guard();
101 let (table, schema_name) =
102 reader.get_sink_by_name(db_name, schema_path, &real_table_name)?;
103 if table.target_table.is_some() {
104 bail!("ALTER SINK_RATE_LIMIT is not for sink into table")
105 }
106 session.check_privilege_for_drop_alter(schema_name, &**table)?;
107 (StatementType::ALTER_SINK, table.id.sink_id)
108 }
109 _ => bail!("Unsupported throttle target: {:?}", kind),
110 };
111 handle_alter_streaming_rate_limit_by_id(&session, kind, id, rate_limit, stmt_type).await
112}
113
114pub async fn handle_alter_streaming_rate_limit_by_id(
115 session: &SessionImpl,
116 kind: PbThrottleTarget,
117 id: u32,
118 rate_limit: i32,
119 stmt_type: StatementType,
120) -> Result<RwPgResponse> {
121 let meta_client = session.env().meta_client();
122
123 let rate_limit = if rate_limit < 0 {
124 None
125 } else {
126 Some(rate_limit as u32)
127 };
128
129 meta_client.apply_throttle(kind, id, rate_limit).await?;
130
131 Ok(PgResponse::empty_result(stmt_type))
132}