risingwave_frontend/handler/
alter_parallelism.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::session_config::parallelism::ConfigParallelism;
17use risingwave_common::system_param::AdaptiveParallelismStrategy;
18use risingwave_pb::meta::table_parallelism::{
19    AdaptiveParallelism, FixedParallelism, PbParallelism,
20};
21use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
22use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
23use risingwave_sqlparser::keywords::Keyword;
24use thiserror_ext::AsReport;
25
26use super::alter_utils::resolve_streaming_job_id_for_alter_parallelism;
27use super::{HandlerArgs, RwPgResponse};
28use crate::catalog::FragmentId;
29use crate::error::{ErrorCode, Result};
30use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
31
32pub async fn handle_alter_parallelism(
33    handler_args: HandlerArgs,
34    obj_name: ObjectName,
35    parallelism: SetVariableValue,
36    stmt_type: StatementType,
37    deferred: bool,
38) -> Result<RwPgResponse> {
39    let session = handler_args.session;
40
41    let job_id = resolve_streaming_job_id_for_alter_parallelism(
42        &session,
43        obj_name,
44        stmt_type,
45        "parallelism",
46    )?;
47
48    let (target_parallelism, adaptive_parallelism_strategy) =
49        extract_table_parallelism(parallelism)?;
50
51    let mut builder = RwPgResponse::builder(stmt_type);
52
53    let catalog_writer = session.catalog_writer()?;
54    execute_with_long_running_notification(
55        catalog_writer.alter_parallelism(
56            job_id,
57            target_parallelism,
58            adaptive_parallelism_strategy,
59            deferred,
60        ),
61        &session,
62        "ALTER PARALLELISM",
63        LongRunningNotificationAction::SuggestRecover,
64    )
65    .await?;
66
67    if deferred {
68        builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
69    }
70
71    Ok(builder.into())
72}
73
74pub async fn handle_alter_backfill_parallelism(
75    handler_args: HandlerArgs,
76    obj_name: ObjectName,
77    parallelism: SetVariableValue,
78    stmt_type: StatementType,
79    deferred: bool,
80) -> Result<RwPgResponse> {
81    let session = handler_args.session;
82
83    let job_id = resolve_streaming_job_id_for_alter_parallelism(
84        &session,
85        obj_name,
86        stmt_type,
87        "backfill_parallelism",
88    )?;
89
90    let (target_parallelism, adaptive_parallelism_strategy) =
91        extract_backfill_parallelism(parallelism)?;
92
93    let mut builder = RwPgResponse::builder(stmt_type);
94
95    let catalog_writer = session.catalog_writer()?;
96    execute_with_long_running_notification(
97        catalog_writer.alter_backfill_parallelism(
98            job_id,
99            target_parallelism,
100            adaptive_parallelism_strategy,
101            deferred,
102        ),
103        &session,
104        "ALTER BACKFILL PARALLELISM",
105        LongRunningNotificationAction::SuggestRecover,
106    )
107    .await?;
108
109    if deferred {
110        builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
111    }
112
113    Ok(builder.into())
114}
115
116pub async fn handle_alter_fragment_parallelism(
117    handler_args: HandlerArgs,
118    fragment_ids: Vec<FragmentId>,
119    parallelism: SetVariableValue,
120) -> Result<RwPgResponse> {
121    let session = handler_args.session;
122    let target_parallelism = extract_fragment_parallelism(parallelism)?;
123
124    session
125        .env()
126        .meta_client()
127        .alter_fragment_parallelism(fragment_ids, target_parallelism)
128        .await?;
129
130    Ok(RwPgResponse::builder(StatementType::ALTER_FRAGMENT).into())
131}
132
133fn extract_table_parallelism(
134    parallelism: SetVariableValue,
135) -> Result<(TableParallelism, Option<AdaptiveParallelismStrategy>)> {
136    extract_job_parallelism(parallelism)
137}
138
139fn extract_backfill_parallelism(
140    parallelism: SetVariableValue,
141) -> Result<(
142    Option<TableParallelism>,
143    Option<AdaptiveParallelismStrategy>,
144)> {
145    match parallelism {
146        SetVariableValue::Default => Ok((None, None)),
147        other => {
148            let (parallelism, strategy) = extract_job_parallelism(other)?;
149            Ok((Some(parallelism), strategy))
150        }
151    }
152}
153
154fn extract_job_parallelism(
155    parallelism: SetVariableValue,
156) -> Result<(TableParallelism, Option<AdaptiveParallelismStrategy>)> {
157    let adaptive_parallelism = PbTableParallelism {
158        parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
159    };
160
161    let value = parse_single_parallelism_value(parallelism)?;
162    let config_parallelism = value.parse::<ConfigParallelism>().map_err(|e| {
163        ErrorCode::InvalidInputSyntax(format!(
164            "target parallelism must be a valid number, adaptive, bounded(n), or ratio(r): {}",
165            e.as_report()
166        ))
167    })?;
168
169    let result = match config_parallelism {
170        ConfigParallelism::Default | ConfigParallelism::Adaptive => (
171            adaptive_parallelism,
172            Some(AdaptiveParallelismStrategy::Auto),
173        ),
174        ConfigParallelism::Fixed(fixed_parallelism) => (
175            PbTableParallelism {
176                parallelism: Some(PbParallelism::Fixed(FixedParallelism {
177                    parallelism: fixed_parallelism.get() as _,
178                })),
179            },
180            None,
181        ),
182        ConfigParallelism::Bounded(_) | ConfigParallelism::Ratio(_) => {
183            (adaptive_parallelism, config_parallelism.adaptive_strategy())
184        }
185    };
186
187    Ok(result)
188}
189
190fn extract_fragment_parallelism(parallelism: SetVariableValue) -> Result<Option<TableParallelism>> {
191    match parallelism {
192        SetVariableValue::Default => Ok(None),
193        other => extract_simple_adaptive_or_fixed_parallelism(other).map(Some),
194    }
195}
196
197fn parse_single_parallelism_value(parallelism: SetVariableValue) -> Result<String> {
198    match parallelism {
199        SetVariableValue::Default => Ok("default".to_owned()),
200        SetVariableValue::Single(value) => Ok(value.to_string_unquoted()),
201        SetVariableValue::List(_) => Err(ErrorCode::InvalidInputSyntax(
202            "target parallelism must be a single value".to_owned(),
203        )
204        .into()),
205    }
206}
207
208fn extract_simple_adaptive_or_fixed_parallelism(
209    parallelism: SetVariableValue,
210) -> Result<TableParallelism> {
211    let adaptive_parallelism = PbTableParallelism {
212        parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
213    };
214
215    let target_parallelism = match parallelism {
216        SetVariableValue::Single(SetVariableValueSingle::Ident(ident))
217            if ident
218                .real_value()
219                .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) =>
220        {
221            adaptive_parallelism
222        }
223
224        SetVariableValue::Default => adaptive_parallelism,
225        SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
226            let fixed_parallelism = v.parse::<u32>().map_err(|e| {
227                ErrorCode::InvalidInputSyntax(format!(
228                    "target parallelism must be a valid number or adaptive: {}",
229                    e.as_report()
230                ))
231            })?;
232
233            if fixed_parallelism == 0 {
234                adaptive_parallelism
235            } else {
236                PbTableParallelism {
237                    parallelism: Some(PbParallelism::Fixed(FixedParallelism {
238                        parallelism: fixed_parallelism,
239                    })),
240                }
241            }
242        }
243
244        _ => {
245            return Err(ErrorCode::InvalidInputSyntax(
246                "target parallelism must be a valid number or adaptive".to_owned(),
247            )
248            .into());
249        }
250    };
251
252    Ok(target_parallelism)
253}
254
#[cfg(test)]
mod tests {
    use risingwave_common::system_param::adaptive_parallelism_strategy::AdaptiveParallelismStrategy;
    use risingwave_pb::meta::table_parallelism::{FixedParallelism, PbParallelism};
    use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
    use risingwave_sqlparser::ast::{Ident, SetVariableValueSingle};

    use super::*;

    /// Expected message for a fixed parallelism of `parallelism`.
    fn fixed_parallelism(parallelism: u32) -> TableParallelism {
        PbTableParallelism {
            parallelism: Some(PbParallelism::Fixed(FixedParallelism { parallelism })),
        }
    }

    /// Expected message for adaptive parallelism.
    fn adaptive_parallelism() -> TableParallelism {
        PbTableParallelism {
            parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
        }
    }

    /// Builds a single numeric-literal element from its textual form.
    fn number_literal(text: &str) -> SetVariableValueSingle {
        SetVariableValueSingle::Literal(Value::Number(text.into()))
    }

    /// Builds a single-value input holding a numeric literal.
    fn single_number(text: &str) -> SetVariableValue {
        SetVariableValue::Single(number_literal(text))
    }

    /// Builds a single-value input holding raw text such as `bounded(4)`.
    fn single_raw(text: &str) -> SetVariableValue {
        SetVariableValue::Single(SetVariableValueSingle::Raw(text.to_owned()))
    }

    #[test]
    fn test_extract_table_parallelism_fixed() {
        let (parallelism, strategy) = extract_table_parallelism(single_number("4")).unwrap();

        assert_eq!(parallelism, fixed_parallelism(4));
        assert_eq!(strategy, None);
    }

    #[test]
    fn test_extract_table_parallelism_adaptive_variants() {
        // `DEFAULT` resolves to adaptive with the automatic strategy.
        let (parallelism, strategy) = extract_table_parallelism(SetVariableValue::Default).unwrap();
        assert_eq!(parallelism, adaptive_parallelism());
        assert_eq!(strategy, Some(AdaptiveParallelismStrategy::Auto));

        // So does the `adaptive` identifier.
        let adaptive_ident =
            SetVariableValue::Single(SetVariableValueSingle::Ident(Ident::new_unchecked("adaptive")));
        let (parallelism, strategy) = extract_table_parallelism(adaptive_ident).unwrap();
        assert_eq!(parallelism, adaptive_parallelism());
        assert_eq!(strategy, Some(AdaptiveParallelismStrategy::Auto));

        // `bounded(n)` stays adaptive but carries a bounded strategy.
        let (parallelism, strategy) = extract_table_parallelism(single_raw("bounded(4)")).unwrap();
        assert_eq!(parallelism, adaptive_parallelism());
        assert_eq!(
            strategy,
            Some(AdaptiveParallelismStrategy::Bounded(4.try_into().unwrap()))
        );

        // `ratio(r)` stays adaptive but carries a ratio strategy.
        let (parallelism, strategy) = extract_table_parallelism(single_raw("ratio(0.5)")).unwrap();
        assert_eq!(parallelism, adaptive_parallelism());
        assert_eq!(strategy, Some(AdaptiveParallelismStrategy::Ratio(0.5)));
    }

    #[test]
    fn test_extract_fragment_parallelism_does_not_support_bounded_ratio() {
        assert_eq!(
            extract_fragment_parallelism(SetVariableValue::Default).unwrap(),
            None
        );
        assert_eq!(
            extract_fragment_parallelism(single_number("0")).unwrap(),
            Some(adaptive_parallelism())
        );

        // Strategy-style values are only valid at the job level.
        for unsupported in ["bounded(4)", "ratio(0.5)"] {
            assert!(extract_fragment_parallelism(single_raw(unsupported)).is_err());
        }
    }

    #[test]
    fn test_extract_backfill_parallelism_adaptive_variants() {
        let (parallelism, strategy) =
            extract_backfill_parallelism(single_raw("bounded(4)")).unwrap();
        assert_eq!(parallelism, Some(adaptive_parallelism()));
        assert_eq!(
            strategy,
            Some(AdaptiveParallelismStrategy::Bounded(4.try_into().unwrap()))
        );

        // `DEFAULT` clears both the target and the strategy.
        let (parallelism, strategy) =
            extract_backfill_parallelism(SetVariableValue::Default).unwrap();
        assert_eq!(parallelism, None);
        assert_eq!(strategy, None);
    }

    #[test]
    fn test_extract_table_parallelism_rejects_list_values() {
        let list = SetVariableValue::List(vec![number_literal("1"), number_literal("2")]);

        assert!(extract_table_parallelism(list).is_err());
    }
}
373}