risingwave_frontend/handler/
alter_parallelism.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::bail;
17use risingwave_pb::meta::table_parallelism::{
18    AdaptiveParallelism, FixedParallelism, PbParallelism,
19};
20use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
21use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
22use risingwave_sqlparser::keywords::Keyword;
23use thiserror_ext::AsReport;
24
25use super::{HandlerArgs, RwPgResponse};
26use crate::Binder;
27use crate::catalog::CatalogError;
28use crate::catalog::root_catalog::SchemaPath;
29use crate::catalog::table_catalog::TableType;
30use crate::error::{ErrorCode, Result};
31
32pub async fn handle_alter_parallelism(
33    handler_args: HandlerArgs,
34    obj_name: ObjectName,
35    parallelism: SetVariableValue,
36    stmt_type: StatementType,
37    deferred: bool,
38) -> Result<RwPgResponse> {
39    let session = handler_args.session;
40    let db_name = &session.database();
41    let (schema_name, real_table_name) =
42        Binder::resolve_schema_qualified_name(db_name, obj_name.clone())?;
43    let search_path = session.config().search_path();
44    let user_name = &session.user_name();
45    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
46
47    let job_id = {
48        let reader = session.env().catalog_reader().read_guard();
49
50        match stmt_type {
51            StatementType::ALTER_TABLE
52            | StatementType::ALTER_MATERIALIZED_VIEW
53            | StatementType::ALTER_INDEX => {
54                let (table, schema_name) =
55                    reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
56
57                match (table.table_type(), stmt_type) {
58                    (TableType::Internal, _) => {
59                        // we treat internal table as NOT FOUND
60                        return Err(CatalogError::NotFound("table", table.name().to_owned()).into());
61                    }
62                    (TableType::Table, StatementType::ALTER_TABLE)
63                    | (TableType::MaterializedView, StatementType::ALTER_MATERIALIZED_VIEW)
64                    | (TableType::Index, StatementType::ALTER_INDEX) => {}
65                    _ => {
66                        return Err(ErrorCode::InvalidInputSyntax(format!(
67                            "cannot alter parallelism of {} {} by {}",
68                            table.table_type().to_prost().as_str_name(),
69                            table.name(),
70                            stmt_type,
71                        ))
72                        .into());
73                    }
74                }
75
76                session.check_privilege_for_drop_alter(schema_name, &**table)?;
77                table.id.table_id()
78            }
79            StatementType::ALTER_SOURCE => {
80                let (source, schema_name) =
81                    reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
82
83                if !source.info.is_shared() {
84                    return Err(ErrorCode::InvalidInputSyntax(
85                        "cannot alter parallelism of non-shared source.\nUse `ALTER MATERIALIZED VIEW SET PARALLELISM` to alter the materialized view using the source instead."
86                        .to_owned()
87                    )
88                    .into());
89                }
90
91                session.check_privilege_for_drop_alter(schema_name, &**source)?;
92                source.id
93            }
94            StatementType::ALTER_SINK => {
95                let (sink, schema_name) =
96                    reader.get_sink_by_name(db_name, schema_path, &real_table_name)?;
97
98                session.check_privilege_for_drop_alter(schema_name, &**sink)?;
99                sink.id.sink_id()
100            }
101            // TODO: support alter parallelism for shared source
102            _ => bail!(
103                "invalid statement type for alter parallelism: {:?}",
104                stmt_type
105            ),
106        }
107    };
108
109    let target_parallelism = extract_table_parallelism(parallelism)?;
110
111    let mut builder = RwPgResponse::builder(stmt_type);
112
113    let catalog_writer = session.catalog_writer()?;
114    catalog_writer
115        .alter_parallelism(job_id, target_parallelism, deferred)
116        .await?;
117
118    if deferred {
119        builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
120    }
121
122    Ok(builder.into())
123}
124
125fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
126    let adaptive_parallelism = PbTableParallelism {
127        parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
128    };
129
130    // If the target parallelism is set to 0/auto/default, we would consider it as auto parallelism.
131    let target_parallelism = match parallelism {
132        SetVariableValue::Single(SetVariableValueSingle::Ident(ident))
133            if ident
134                .real_value()
135                .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) =>
136        {
137            adaptive_parallelism
138        }
139
140        SetVariableValue::Default => adaptive_parallelism,
141        SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
142            let fixed_parallelism = v.parse::<u32>().map_err(|e| {
143                ErrorCode::InvalidInputSyntax(format!(
144                    "target parallelism must be a valid number or adaptive: {}",
145                    e.as_report()
146                ))
147            })?;
148
149            if fixed_parallelism == 0 {
150                adaptive_parallelism
151            } else {
152                PbTableParallelism {
153                    parallelism: Some(PbParallelism::Fixed(FixedParallelism {
154                        parallelism: fixed_parallelism,
155                    })),
156                }
157            }
158        }
159
160        _ => {
161            return Err(ErrorCode::InvalidInputSyntax(
162                "target parallelism must be a valid number or adaptive".to_owned(),
163            )
164            .into());
165        }
166    };
167
168    Ok(target_parallelism)
169}