risingwave_frontend/handler/
alter_parallelism.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_pb::meta::table_parallelism::{
17    AdaptiveParallelism, FixedParallelism, PbParallelism,
18};
19use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
20use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
21use risingwave_sqlparser::keywords::Keyword;
22use thiserror_ext::AsReport;
23
24use super::alter_utils::resolve_streaming_job_id_for_alter;
25use super::{HandlerArgs, RwPgResponse};
26use crate::catalog::FragmentId;
27use crate::error::{ErrorCode, Result};
28
29pub async fn handle_alter_parallelism(
30    handler_args: HandlerArgs,
31    obj_name: ObjectName,
32    parallelism: SetVariableValue,
33    stmt_type: StatementType,
34    deferred: bool,
35) -> Result<RwPgResponse> {
36    let session = handler_args.session;
37
38    let job_id = resolve_streaming_job_id_for_alter(&session, obj_name, stmt_type, "parallelism")?;
39
40    let target_parallelism = extract_table_parallelism(parallelism)?;
41
42    let mut builder = RwPgResponse::builder(stmt_type);
43
44    let catalog_writer = session.catalog_writer()?;
45    catalog_writer
46        .alter_parallelism(job_id, target_parallelism, deferred)
47        .await?;
48
49    if deferred {
50        builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
51    }
52
53    Ok(builder.into())
54}
55
56pub async fn handle_alter_fragment_parallelism(
57    handler_args: HandlerArgs,
58    fragment_ids: Vec<FragmentId>,
59    parallelism: SetVariableValue,
60) -> Result<RwPgResponse> {
61    let session = handler_args.session;
62    let target_parallelism = extract_fragment_parallelism(parallelism)?;
63
64    session
65        .env()
66        .meta_client()
67        .alter_fragment_parallelism(fragment_ids, target_parallelism)
68        .await?;
69
70    Ok(RwPgResponse::builder(StatementType::ALTER_FRAGMENT).into())
71}
72
73fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
74    let adaptive_parallelism = PbTableParallelism {
75        parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
76    };
77
78    // If the target parallelism is set to 0/auto/default, we would consider it as auto parallelism.
79    let target_parallelism = match parallelism {
80        SetVariableValue::Single(SetVariableValueSingle::Ident(ident))
81            if ident
82                .real_value()
83                .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) =>
84        {
85            adaptive_parallelism
86        }
87
88        SetVariableValue::Default => adaptive_parallelism,
89        SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
90            let fixed_parallelism = v.parse::<u32>().map_err(|e| {
91                ErrorCode::InvalidInputSyntax(format!(
92                    "target parallelism must be a valid number or adaptive: {}",
93                    e.as_report()
94                ))
95            })?;
96
97            if fixed_parallelism == 0 {
98                adaptive_parallelism
99            } else {
100                PbTableParallelism {
101                    parallelism: Some(PbParallelism::Fixed(FixedParallelism {
102                        parallelism: fixed_parallelism,
103                    })),
104                }
105            }
106        }
107
108        _ => {
109            return Err(ErrorCode::InvalidInputSyntax(
110                "target parallelism must be a valid number or adaptive".to_owned(),
111            )
112            .into());
113        }
114    };
115
116    Ok(target_parallelism)
117}
118
119fn extract_fragment_parallelism(parallelism: SetVariableValue) -> Result<Option<TableParallelism>> {
120    match parallelism {
121        SetVariableValue::Default => Ok(None),
122        other => extract_table_parallelism(other).map(Some),
123    }
124}