risingwave_frontend/handler/
alter_parallelism.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::session_config::parallelism::ConfigParallelism;
17use risingwave_common::system_param::AdaptiveParallelismStrategy;
18use risingwave_pb::meta::table_parallelism::{
19 AdaptiveParallelism, FixedParallelism, PbParallelism,
20};
21use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
22use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
23use risingwave_sqlparser::keywords::Keyword;
24use thiserror_ext::AsReport;
25
26use super::alter_utils::resolve_streaming_job_id_for_alter_parallelism;
27use super::{HandlerArgs, RwPgResponse};
28use crate::catalog::FragmentId;
29use crate::error::{ErrorCode, Result};
30use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
31
32pub async fn handle_alter_parallelism(
33 handler_args: HandlerArgs,
34 obj_name: ObjectName,
35 parallelism: SetVariableValue,
36 stmt_type: StatementType,
37 deferred: bool,
38) -> Result<RwPgResponse> {
39 let session = handler_args.session;
40
41 let job_id = resolve_streaming_job_id_for_alter_parallelism(
42 &session,
43 obj_name,
44 stmt_type,
45 "parallelism",
46 )?;
47
48 let (target_parallelism, adaptive_parallelism_strategy) =
49 extract_table_parallelism(parallelism)?;
50
51 let mut builder = RwPgResponse::builder(stmt_type);
52
53 let catalog_writer = session.catalog_writer()?;
54 execute_with_long_running_notification(
55 catalog_writer.alter_parallelism(
56 job_id,
57 target_parallelism,
58 adaptive_parallelism_strategy,
59 deferred,
60 ),
61 &session,
62 "ALTER PARALLELISM",
63 LongRunningNotificationAction::SuggestRecover,
64 )
65 .await?;
66
67 if deferred {
68 builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
69 }
70
71 Ok(builder.into())
72}
73
74pub async fn handle_alter_backfill_parallelism(
75 handler_args: HandlerArgs,
76 obj_name: ObjectName,
77 parallelism: SetVariableValue,
78 stmt_type: StatementType,
79 deferred: bool,
80) -> Result<RwPgResponse> {
81 let session = handler_args.session;
82
83 let job_id = resolve_streaming_job_id_for_alter_parallelism(
84 &session,
85 obj_name,
86 stmt_type,
87 "backfill_parallelism",
88 )?;
89
90 let (target_parallelism, adaptive_parallelism_strategy) =
91 extract_backfill_parallelism(parallelism)?;
92
93 let mut builder = RwPgResponse::builder(stmt_type);
94
95 let catalog_writer = session.catalog_writer()?;
96 execute_with_long_running_notification(
97 catalog_writer.alter_backfill_parallelism(
98 job_id,
99 target_parallelism,
100 adaptive_parallelism_strategy,
101 deferred,
102 ),
103 &session,
104 "ALTER BACKFILL PARALLELISM",
105 LongRunningNotificationAction::SuggestRecover,
106 )
107 .await?;
108
109 if deferred {
110 builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
111 }
112
113 Ok(builder.into())
114}
115
116pub async fn handle_alter_fragment_parallelism(
117 handler_args: HandlerArgs,
118 fragment_ids: Vec<FragmentId>,
119 parallelism: SetVariableValue,
120) -> Result<RwPgResponse> {
121 let session = handler_args.session;
122 let target_parallelism = extract_fragment_parallelism(parallelism)?;
123
124 session
125 .env()
126 .meta_client()
127 .alter_fragment_parallelism(fragment_ids, target_parallelism)
128 .await?;
129
130 Ok(RwPgResponse::builder(StatementType::ALTER_FRAGMENT).into())
131}
132
133fn extract_table_parallelism(
134 parallelism: SetVariableValue,
135) -> Result<(TableParallelism, Option<AdaptiveParallelismStrategy>)> {
136 extract_job_parallelism(parallelism)
137}
138
139fn extract_backfill_parallelism(
140 parallelism: SetVariableValue,
141) -> Result<(
142 Option<TableParallelism>,
143 Option<AdaptiveParallelismStrategy>,
144)> {
145 match parallelism {
146 SetVariableValue::Default => Ok((None, None)),
147 other => {
148 let (parallelism, strategy) = extract_job_parallelism(other)?;
149 Ok((Some(parallelism), strategy))
150 }
151 }
152}
153
154fn extract_job_parallelism(
155 parallelism: SetVariableValue,
156) -> Result<(TableParallelism, Option<AdaptiveParallelismStrategy>)> {
157 let adaptive_parallelism = PbTableParallelism {
158 parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
159 };
160
161 let value = parse_single_parallelism_value(parallelism)?;
162 let config_parallelism = value.parse::<ConfigParallelism>().map_err(|e| {
163 ErrorCode::InvalidInputSyntax(format!(
164 "target parallelism must be a valid number, adaptive, bounded(n), or ratio(r): {}",
165 e.as_report()
166 ))
167 })?;
168
169 let result = match config_parallelism {
170 ConfigParallelism::Default | ConfigParallelism::Adaptive => (
171 adaptive_parallelism,
172 Some(AdaptiveParallelismStrategy::Auto),
173 ),
174 ConfigParallelism::Fixed(fixed_parallelism) => (
175 PbTableParallelism {
176 parallelism: Some(PbParallelism::Fixed(FixedParallelism {
177 parallelism: fixed_parallelism.get() as _,
178 })),
179 },
180 None,
181 ),
182 ConfigParallelism::Bounded(_) | ConfigParallelism::Ratio(_) => {
183 (adaptive_parallelism, config_parallelism.adaptive_strategy())
184 }
185 };
186
187 Ok(result)
188}
189
190fn extract_fragment_parallelism(parallelism: SetVariableValue) -> Result<Option<TableParallelism>> {
191 match parallelism {
192 SetVariableValue::Default => Ok(None),
193 other => extract_simple_adaptive_or_fixed_parallelism(other).map(Some),
194 }
195}
196
197fn parse_single_parallelism_value(parallelism: SetVariableValue) -> Result<String> {
198 match parallelism {
199 SetVariableValue::Default => Ok("default".to_owned()),
200 SetVariableValue::Single(value) => Ok(value.to_string_unquoted()),
201 SetVariableValue::List(_) => Err(ErrorCode::InvalidInputSyntax(
202 "target parallelism must be a single value".to_owned(),
203 )
204 .into()),
205 }
206}
207
208fn extract_simple_adaptive_or_fixed_parallelism(
209 parallelism: SetVariableValue,
210) -> Result<TableParallelism> {
211 let adaptive_parallelism = PbTableParallelism {
212 parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
213 };
214
215 let target_parallelism = match parallelism {
216 SetVariableValue::Single(SetVariableValueSingle::Ident(ident))
217 if ident
218 .real_value()
219 .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) =>
220 {
221 adaptive_parallelism
222 }
223
224 SetVariableValue::Default => adaptive_parallelism,
225 SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
226 let fixed_parallelism = v.parse::<u32>().map_err(|e| {
227 ErrorCode::InvalidInputSyntax(format!(
228 "target parallelism must be a valid number or adaptive: {}",
229 e.as_report()
230 ))
231 })?;
232
233 if fixed_parallelism == 0 {
234 adaptive_parallelism
235 } else {
236 PbTableParallelism {
237 parallelism: Some(PbParallelism::Fixed(FixedParallelism {
238 parallelism: fixed_parallelism,
239 })),
240 }
241 }
242 }
243
244 _ => {
245 return Err(ErrorCode::InvalidInputSyntax(
246 "target parallelism must be a valid number or adaptive".to_owned(),
247 )
248 .into());
249 }
250 };
251
252 Ok(target_parallelism)
253}
254
#[cfg(test)]
mod tests {
    use risingwave_common::system_param::adaptive_parallelism_strategy::AdaptiveParallelismStrategy;
    use risingwave_pb::meta::table_parallelism::{FixedParallelism, PbParallelism};
    use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
    use risingwave_sqlparser::ast::{Ident, SetVariableValueSingle};

    use super::*;

    /// Expected protobuf value for a fixed parallelism of the given size.
    fn fixed_parallelism(parallelism: u32) -> TableParallelism {
        PbTableParallelism {
            parallelism: Some(PbParallelism::Fixed(FixedParallelism { parallelism })),
        }
    }

    /// Expected protobuf value for adaptive parallelism.
    fn adaptive_parallelism() -> TableParallelism {
        PbTableParallelism {
            parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
        }
    }

    /// A single numeric-literal `SET` value.
    fn number_value(text: &str) -> SetVariableValue {
        SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(text.into())))
    }

    /// A single raw-text `SET` value such as `bounded(4)`.
    fn raw_value(text: &str) -> SetVariableValue {
        SetVariableValue::Single(SetVariableValueSingle::Raw(text.to_owned()))
    }

    #[test]
    fn test_extract_table_parallelism_fixed() {
        let (parallelism, strategy) = extract_table_parallelism(number_value("4")).unwrap();

        assert_eq!(parallelism, fixed_parallelism(4));
        assert_eq!(strategy, None);
    }

    #[test]
    fn test_extract_table_parallelism_adaptive_variants() {
        // Every input below resolves to adaptive parallelism; only the
        // accompanying strategy differs.
        let cases = [
            (SetVariableValue::Default, AdaptiveParallelismStrategy::Auto),
            (
                SetVariableValue::Single(SetVariableValueSingle::Ident(Ident::new_unchecked(
                    "adaptive",
                ))),
                AdaptiveParallelismStrategy::Auto,
            ),
            (
                raw_value("bounded(4)"),
                AdaptiveParallelismStrategy::Bounded(4.try_into().unwrap()),
            ),
            (
                raw_value("ratio(0.5)"),
                AdaptiveParallelismStrategy::Ratio(0.5),
            ),
        ];

        for (input, expected_strategy) in cases {
            let (parallelism, strategy) = extract_table_parallelism(input).unwrap();
            assert_eq!(parallelism, adaptive_parallelism());
            assert_eq!(strategy, Some(expected_strategy));
        }
    }

    #[test]
    fn test_extract_fragment_parallelism_does_not_support_bounded_ratio() {
        let from_default = extract_fragment_parallelism(SetVariableValue::Default).unwrap();
        assert_eq!(from_default, None);

        let from_zero = extract_fragment_parallelism(number_value("0")).unwrap();
        assert_eq!(from_zero, Some(adaptive_parallelism()));

        assert!(extract_fragment_parallelism(raw_value("bounded(4)")).is_err());
        assert!(extract_fragment_parallelism(raw_value("ratio(0.5)")).is_err());
    }

    #[test]
    fn test_extract_backfill_parallelism_adaptive_variants() {
        let (parallelism, strategy) =
            extract_backfill_parallelism(raw_value("bounded(4)")).unwrap();
        assert_eq!(parallelism, Some(adaptive_parallelism()));
        assert_eq!(
            strategy,
            Some(AdaptiveParallelismStrategy::Bounded(4.try_into().unwrap()))
        );

        let (parallelism, strategy) =
            extract_backfill_parallelism(SetVariableValue::Default).unwrap();
        assert_eq!(parallelism, None);
        assert_eq!(strategy, None);
    }

    #[test]
    fn test_extract_table_parallelism_rejects_list_values() {
        let list = SetVariableValue::List(vec![
            SetVariableValueSingle::Literal(Value::Number("1".into())),
            SetVariableValueSingle::Literal(Value::Number("2".into())),
        ]);

        assert!(extract_table_parallelism(list).is_err());
    }
}