risingwave_frontend/handler/
alter_parallelism.rs1use pgwire::pg_response::StatementType;
16use risingwave_common::bail;
17use risingwave_pb::meta::table_parallelism::{
18 AdaptiveParallelism, FixedParallelism, PbParallelism,
19};
20use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
21use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
22use risingwave_sqlparser::keywords::Keyword;
23use thiserror_ext::AsReport;
24
25use super::{HandlerArgs, RwPgResponse};
26use crate::Binder;
27use crate::catalog::CatalogError;
28use crate::catalog::root_catalog::SchemaPath;
29use crate::catalog::table_catalog::TableType;
30use crate::error::{ErrorCode, Result};
31
32pub async fn handle_alter_parallelism(
33 handler_args: HandlerArgs,
34 obj_name: ObjectName,
35 parallelism: SetVariableValue,
36 stmt_type: StatementType,
37 deferred: bool,
38) -> Result<RwPgResponse> {
39 let session = handler_args.session;
40 let db_name = &session.database();
41 let (schema_name, real_table_name) = Binder::resolve_schema_qualified_name(db_name, &obj_name)?;
42 let search_path = session.config().search_path();
43 let user_name = &session.user_name();
44 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
45
46 let job_id = {
47 let reader = session.env().catalog_reader().read_guard();
48
49 match stmt_type {
50 StatementType::ALTER_TABLE
51 | StatementType::ALTER_MATERIALIZED_VIEW
52 | StatementType::ALTER_INDEX => {
53 let (table, schema_name) =
54 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
55
56 match (table.table_type(), stmt_type) {
57 (TableType::Internal, _) => {
58 return Err(CatalogError::NotFound("table", table.name().to_owned()).into());
60 }
61 (TableType::Table, StatementType::ALTER_TABLE)
62 | (TableType::MaterializedView, StatementType::ALTER_MATERIALIZED_VIEW)
63 | (TableType::Index, StatementType::ALTER_INDEX) => {}
64 _ => {
65 return Err(ErrorCode::InvalidInputSyntax(format!(
66 "cannot alter parallelism of {} {} by {}",
67 table.table_type().to_prost().as_str_name(),
68 table.name(),
69 stmt_type,
70 ))
71 .into());
72 }
73 }
74
75 session.check_privilege_for_drop_alter(schema_name, &**table)?;
76 table.id.table_id()
77 }
78 StatementType::ALTER_SOURCE => {
79 let (source, schema_name) =
80 reader.get_source_by_name(db_name, schema_path, &real_table_name)?;
81
82 if !source.info.is_shared() {
83 return Err(ErrorCode::InvalidInputSyntax(
84 "cannot alter parallelism of non-shared source.\nUse `ALTER MATERIALIZED VIEW SET PARALLELISM` to alter the materialized view using the source instead."
85 .to_owned()
86 )
87 .into());
88 }
89
90 session.check_privilege_for_drop_alter(schema_name, &**source)?;
91 source.id
92 }
93 StatementType::ALTER_SINK => {
94 let (sink, schema_name) =
95 reader.get_created_sink_by_name(db_name, schema_path, &real_table_name)?;
96
97 session.check_privilege_for_drop_alter(schema_name, &**sink)?;
98 sink.id.sink_id()
99 }
100 _ => bail!(
102 "invalid statement type for alter parallelism: {:?}",
103 stmt_type
104 ),
105 }
106 };
107
108 let target_parallelism = extract_table_parallelism(parallelism)?;
109
110 let mut builder = RwPgResponse::builder(stmt_type);
111
112 let catalog_writer = session.catalog_writer()?;
113 catalog_writer
114 .alter_parallelism(job_id, target_parallelism, deferred)
115 .await?;
116
117 if deferred {
118 builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
119 }
120
121 Ok(builder.into())
122}
123
124fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
125 let adaptive_parallelism = PbTableParallelism {
126 parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
127 };
128
129 let target_parallelism = match parallelism {
131 SetVariableValue::Single(SetVariableValueSingle::Ident(ident))
132 if ident
133 .real_value()
134 .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) =>
135 {
136 adaptive_parallelism
137 }
138
139 SetVariableValue::Default => adaptive_parallelism,
140 SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
141 let fixed_parallelism = v.parse::<u32>().map_err(|e| {
142 ErrorCode::InvalidInputSyntax(format!(
143 "target parallelism must be a valid number or adaptive: {}",
144 e.as_report()
145 ))
146 })?;
147
148 if fixed_parallelism == 0 {
149 adaptive_parallelism
150 } else {
151 PbTableParallelism {
152 parallelism: Some(PbParallelism::Fixed(FixedParallelism {
153 parallelism: fixed_parallelism,
154 })),
155 }
156 }
157 }
158
159 _ => {
160 return Err(ErrorCode::InvalidInputSyntax(
161 "target parallelism must be a valid number or adaptive".to_owned(),
162 )
163 .into());
164 }
165 };
166
167 Ok(target_parallelism)
168}