risingwave_frontend/handler/
alter_parallelism.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::bail;
17use risingwave_pb::meta::table_parallelism::{
18 AdaptiveParallelism, FixedParallelism, PbParallelism,
19};
20use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
21use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
22use risingwave_sqlparser::keywords::Keyword;
23use thiserror_ext::AsReport;
24
25use super::{HandlerArgs, RwPgResponse};
26use crate::Binder;
27use crate::catalog::CatalogError;
28use crate::catalog::root_catalog::SchemaPath;
29use crate::catalog::table_catalog::TableType;
30use crate::error::{ErrorCode, Result};
31
32pub async fn handle_alter_parallelism(
33 handler_args: HandlerArgs,
34 obj_name: ObjectName,
35 parallelism: SetVariableValue,
36 stmt_type: StatementType,
37 deferred: bool,
38) -> Result<RwPgResponse> {
39 let session = handler_args.session;
40 let db_name = &session.database();
41 let (schema_name, real_table_name) =
42 Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
43 let search_path = session.config().search_path();
44 let user_name = &session.user_name();
45 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
46
47 let job_id = {
48 let reader = session.env().catalog_reader().read_guard();
49
50 match stmt_type {
51 StatementType::ALTER_TABLE
52 | StatementType::ALTER_MATERIALIZED_VIEW
53 | StatementType::ALTER_INDEX => {
54 let (table, schema_name) =
55 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
56
57 match (table.table_type(), stmt_type) {
58 (TableType::Internal, _) => {
59 return Err(CatalogError::NotFound("table", table.name().to_owned()).into());
61 }
62 (TableType::Table, StatementType::ALTER_TABLE)
63 | (TableType::MaterializedView, StatementType::ALTER_MATERIALIZED_VIEW)
64 | (TableType::Index, StatementType::ALTER_INDEX) => {}
65 _ => {
66 return Err(ErrorCode::InvalidInputSyntax(format!(
67 "cannot alter parallelism of {} {} by {}",
68 table.table_type().to_prost().as_str_name(),
69 table.name(),
70 stmt_type,
71 ))
72 .into());
73 }
74 }
75
76 session.check_privilege_for_drop_alter(schema_name, &**table)?;
77 table.id.table_id()
78 }
79 StatementType::ALTER_SOURCE => {
80 let (source, schema_name) =
81 reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
82
83 if !source.info.is_shared() {
84 return Err(ErrorCode::InvalidInputSyntax(
85 "cannot alter parallelism of non-shared source.\nUse `ALTER MATERIALIZED VIEW SET PARALLELISM` to alter the materialized view using the source instead."
86 .to_owned()
87 )
88 .into());
89 }
90
91 session.check_privilege_for_drop_alter(schema_name, &**source)?;
92 source.id
93 }
94 StatementType::ALTER_SINK => {
95 let (sink, schema_name) =
96 reader.get_sink_by_name(db_name, schema_path, &real_table_name)?;
97
98 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
99 sink.id.sink_id()
100 }
101 _ => bail!(
103 "invalid statement type for alter parallelism: {:?}",
104 stmt_type
105 ),
106 }
107 };
108
109 let target_parallelism = extract_table_parallelism(parallelism)?;
110
111 let mut builder = RwPgResponse::builder(stmt_type);
112
113 let catalog_writer = session.catalog_writer()?;
114 catalog_writer
115 .alter_parallelism(job_id, target_parallelism, deferred)
116 .await?;
117
118 if deferred {
119 builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
120 }
121
122 Ok(builder.into())
123}
124
125fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
126 let adaptive_parallelism = PbTableParallelism {
127 parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
128 };
129
130 let target_parallelism = match parallelism {
132 SetVariableValue::Single(SetVariableValueSingle::Ident(ident))
133 if ident
134 .real_value()
135 .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) =>
136 {
137 adaptive_parallelism
138 }
139
140 SetVariableValue::Default => adaptive_parallelism,
141 SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
142 let fixed_parallelism = v.parse::<u32>().map_err(|e| {
143 ErrorCode::InvalidInputSyntax(format!(
144 "target parallelism must be a valid number or adaptive: {}",
145 e.as_report()
146 ))
147 })?;
148
149 if fixed_parallelism == 0 {
150 adaptive_parallelism
151 } else {
152 PbTableParallelism {
153 parallelism: Some(PbParallelism::Fixed(FixedParallelism {
154 parallelism: fixed_parallelism,
155 })),
156 }
157 }
158 }
159
160 _ => {
161 return Err(ErrorCode::InvalidInputSyntax(
162 "target parallelism must be a valid number or adaptive".to_owned(),
163 )
164 .into());
165 }
166 };
167
168 Ok(target_parallelism)
169}