risingwave_frontend/handler/
alter_parallelism.rs1use pgwire::pg_response::StatementType;
16use risingwave_pb::meta::table_parallelism::{
17 AdaptiveParallelism, FixedParallelism, PbParallelism,
18};
19use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
20use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
21use risingwave_sqlparser::keywords::Keyword;
22use thiserror_ext::AsReport;
23
24use super::alter_utils::resolve_streaming_job_id_for_alter_parallelism;
25use super::{HandlerArgs, RwPgResponse};
26use crate::catalog::FragmentId;
27use crate::error::{ErrorCode, Result};
28use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
29
30pub async fn handle_alter_parallelism(
31 handler_args: HandlerArgs,
32 obj_name: ObjectName,
33 parallelism: SetVariableValue,
34 stmt_type: StatementType,
35 deferred: bool,
36) -> Result<RwPgResponse> {
37 let session = handler_args.session;
38
39 let job_id = resolve_streaming_job_id_for_alter_parallelism(
40 &session,
41 obj_name,
42 stmt_type,
43 "parallelism",
44 )?;
45
46 let target_parallelism = extract_table_parallelism(parallelism)?;
47
48 let mut builder = RwPgResponse::builder(stmt_type);
49
50 let catalog_writer = session.catalog_writer()?;
51 execute_with_long_running_notification(
52 catalog_writer.alter_parallelism(job_id, target_parallelism, deferred),
53 &session,
54 "ALTER PARALLELISM",
55 LongRunningNotificationAction::SuggestRecover,
56 )
57 .await?;
58
59 if deferred {
60 builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
61 }
62
63 Ok(builder.into())
64}
65
66pub async fn handle_alter_backfill_parallelism(
67 handler_args: HandlerArgs,
68 obj_name: ObjectName,
69 parallelism: SetVariableValue,
70 stmt_type: StatementType,
71 deferred: bool,
72) -> Result<RwPgResponse> {
73 let session = handler_args.session;
74
75 let job_id = resolve_streaming_job_id_for_alter_parallelism(
76 &session,
77 obj_name,
78 stmt_type,
79 "backfill_parallelism",
80 )?;
81
82 let target_parallelism = extract_backfill_parallelism(parallelism)?;
83
84 let mut builder = RwPgResponse::builder(stmt_type);
85
86 let catalog_writer = session.catalog_writer()?;
87 execute_with_long_running_notification(
88 catalog_writer.alter_backfill_parallelism(job_id, target_parallelism, deferred),
89 &session,
90 "ALTER BACKFILL PARALLELISM",
91 LongRunningNotificationAction::SuggestRecover,
92 )
93 .await?;
94
95 if deferred {
96 builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
97 }
98
99 Ok(builder.into())
100}
101
102pub async fn handle_alter_fragment_parallelism(
103 handler_args: HandlerArgs,
104 fragment_ids: Vec<FragmentId>,
105 parallelism: SetVariableValue,
106) -> Result<RwPgResponse> {
107 let session = handler_args.session;
108 let target_parallelism = extract_fragment_parallelism(parallelism)?;
109
110 session
111 .env()
112 .meta_client()
113 .alter_fragment_parallelism(fragment_ids, target_parallelism)
114 .await?;
115
116 Ok(RwPgResponse::builder(StatementType::ALTER_FRAGMENT).into())
117}
118
119fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
120 let adaptive_parallelism = PbTableParallelism {
121 parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
122 };
123
124 let target_parallelism = match parallelism {
126 SetVariableValue::Single(SetVariableValueSingle::Ident(ident))
127 if ident
128 .real_value()
129 .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) =>
130 {
131 adaptive_parallelism
132 }
133
134 SetVariableValue::Default => adaptive_parallelism,
135 SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
136 let fixed_parallelism = v.parse::<u32>().map_err(|e| {
137 ErrorCode::InvalidInputSyntax(format!(
138 "target parallelism must be a valid number or adaptive: {}",
139 e.as_report()
140 ))
141 })?;
142
143 if fixed_parallelism == 0 {
144 adaptive_parallelism
145 } else {
146 PbTableParallelism {
147 parallelism: Some(PbParallelism::Fixed(FixedParallelism {
148 parallelism: fixed_parallelism,
149 })),
150 }
151 }
152 }
153
154 _ => {
155 return Err(ErrorCode::InvalidInputSyntax(
156 "target parallelism must be a valid number or adaptive".to_owned(),
157 )
158 .into());
159 }
160 };
161
162 Ok(target_parallelism)
163}
164
165fn extract_backfill_parallelism(parallelism: SetVariableValue) -> Result<Option<TableParallelism>> {
166 match parallelism {
167 SetVariableValue::Default => Ok(None),
168 other => extract_table_parallelism(other).map(Some),
169 }
170}
171
172fn extract_fragment_parallelism(parallelism: SetVariableValue) -> Result<Option<TableParallelism>> {
173 match parallelism {
174 SetVariableValue::Default => Ok(None),
175 other => extract_table_parallelism(other).map(Some),
176 }
177}