risingwave_frontend/handler/
alter_parallelism.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_pb::meta::table_parallelism::{
17    AdaptiveParallelism, FixedParallelism, PbParallelism,
18};
19use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
20use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
21use risingwave_sqlparser::keywords::Keyword;
22use thiserror_ext::AsReport;
23
24use super::alter_utils::resolve_streaming_job_id_for_alter_parallelism;
25use super::{HandlerArgs, RwPgResponse};
26use crate::catalog::FragmentId;
27use crate::error::{ErrorCode, Result};
28use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
29
30pub async fn handle_alter_parallelism(
31    handler_args: HandlerArgs,
32    obj_name: ObjectName,
33    parallelism: SetVariableValue,
34    stmt_type: StatementType,
35    deferred: bool,
36) -> Result<RwPgResponse> {
37    let session = handler_args.session;
38
39    let job_id = resolve_streaming_job_id_for_alter_parallelism(
40        &session,
41        obj_name,
42        stmt_type,
43        "parallelism",
44    )?;
45
46    let target_parallelism = extract_table_parallelism(parallelism)?;
47
48    let mut builder = RwPgResponse::builder(stmt_type);
49
50    let catalog_writer = session.catalog_writer()?;
51    execute_with_long_running_notification(
52        catalog_writer.alter_parallelism(job_id, target_parallelism, deferred),
53        &session,
54        "ALTER PARALLELISM",
55        LongRunningNotificationAction::SuggestRecover,
56    )
57    .await?;
58
59    if deferred {
60        builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
61    }
62
63    Ok(builder.into())
64}
65
66pub async fn handle_alter_backfill_parallelism(
67    handler_args: HandlerArgs,
68    obj_name: ObjectName,
69    parallelism: SetVariableValue,
70    stmt_type: StatementType,
71    deferred: bool,
72) -> Result<RwPgResponse> {
73    let session = handler_args.session;
74
75    let job_id = resolve_streaming_job_id_for_alter_parallelism(
76        &session,
77        obj_name,
78        stmt_type,
79        "backfill_parallelism",
80    )?;
81
82    let target_parallelism = extract_backfill_parallelism(parallelism)?;
83
84    let mut builder = RwPgResponse::builder(stmt_type);
85
86    let catalog_writer = session.catalog_writer()?;
87    execute_with_long_running_notification(
88        catalog_writer.alter_backfill_parallelism(job_id, target_parallelism, deferred),
89        &session,
90        "ALTER BACKFILL PARALLELISM",
91        LongRunningNotificationAction::SuggestRecover,
92    )
93    .await?;
94
95    if deferred {
96        builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
97    }
98
99    Ok(builder.into())
100}
101
102pub async fn handle_alter_fragment_parallelism(
103    handler_args: HandlerArgs,
104    fragment_ids: Vec<FragmentId>,
105    parallelism: SetVariableValue,
106) -> Result<RwPgResponse> {
107    let session = handler_args.session;
108    let target_parallelism = extract_fragment_parallelism(parallelism)?;
109
110    session
111        .env()
112        .meta_client()
113        .alter_fragment_parallelism(fragment_ids, target_parallelism)
114        .await?;
115
116    Ok(RwPgResponse::builder(StatementType::ALTER_FRAGMENT).into())
117}
118
119fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
120    let adaptive_parallelism = PbTableParallelism {
121        parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
122    };
123
124    // If the target parallelism is set to 0/auto/default, we would consider it as auto parallelism.
125    let target_parallelism = match parallelism {
126        SetVariableValue::Single(SetVariableValueSingle::Ident(ident))
127            if ident
128                .real_value()
129                .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) =>
130        {
131            adaptive_parallelism
132        }
133
134        SetVariableValue::Default => adaptive_parallelism,
135        SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
136            let fixed_parallelism = v.parse::<u32>().map_err(|e| {
137                ErrorCode::InvalidInputSyntax(format!(
138                    "target parallelism must be a valid number or adaptive: {}",
139                    e.as_report()
140                ))
141            })?;
142
143            if fixed_parallelism == 0 {
144                adaptive_parallelism
145            } else {
146                PbTableParallelism {
147                    parallelism: Some(PbParallelism::Fixed(FixedParallelism {
148                        parallelism: fixed_parallelism,
149                    })),
150                }
151            }
152        }
153
154        _ => {
155            return Err(ErrorCode::InvalidInputSyntax(
156                "target parallelism must be a valid number or adaptive".to_owned(),
157            )
158            .into());
159        }
160    };
161
162    Ok(target_parallelism)
163}
164
165fn extract_backfill_parallelism(parallelism: SetVariableValue) -> Result<Option<TableParallelism>> {
166    match parallelism {
167        SetVariableValue::Default => Ok(None),
168        other => extract_table_parallelism(other).map(Some),
169    }
170}
171
172fn extract_fragment_parallelism(parallelism: SetVariableValue) -> Result<Option<TableParallelism>> {
173    match parallelism {
174        SetVariableValue::Default => Ok(None),
175        other => extract_table_parallelism(other).map(Some),
176    }
177}