risingwave_frontend/handler/
alter_parallelism.rs1use pgwire::pg_response::StatementType;
16use risingwave_pb::meta::table_parallelism::{
17 AdaptiveParallelism, FixedParallelism, PbParallelism,
18};
19use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
20use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
21use risingwave_sqlparser::keywords::Keyword;
22use thiserror_ext::AsReport;
23
24use super::alter_utils::resolve_streaming_job_id_for_alter;
25use super::{HandlerArgs, RwPgResponse};
26use crate::catalog::FragmentId;
27use crate::error::{ErrorCode, Result};
28use crate::handler::util::{LongRunningNotificationAction, execute_with_long_running_notification};
29
30pub async fn handle_alter_parallelism(
31 handler_args: HandlerArgs,
32 obj_name: ObjectName,
33 parallelism: SetVariableValue,
34 stmt_type: StatementType,
35 deferred: bool,
36) -> Result<RwPgResponse> {
37 let session = handler_args.session;
38
39 let job_id = resolve_streaming_job_id_for_alter(&session, obj_name, stmt_type, "parallelism")?;
40
41 let target_parallelism = extract_table_parallelism(parallelism)?;
42
43 let mut builder = RwPgResponse::builder(stmt_type);
44
45 let catalog_writer = session.catalog_writer()?;
46 execute_with_long_running_notification(
47 catalog_writer.alter_parallelism(job_id, target_parallelism, deferred),
48 &session,
49 "ALTER PARALLELISM",
50 LongRunningNotificationAction::SuggestRecover,
51 )
52 .await?;
53
54 if deferred {
55 builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
56 }
57
58 Ok(builder.into())
59}
60
61pub async fn handle_alter_fragment_parallelism(
62 handler_args: HandlerArgs,
63 fragment_ids: Vec<FragmentId>,
64 parallelism: SetVariableValue,
65) -> Result<RwPgResponse> {
66 let session = handler_args.session;
67 let target_parallelism = extract_fragment_parallelism(parallelism)?;
68
69 session
70 .env()
71 .meta_client()
72 .alter_fragment_parallelism(fragment_ids, target_parallelism)
73 .await?;
74
75 Ok(RwPgResponse::builder(StatementType::ALTER_FRAGMENT).into())
76}
77
78fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
79 let adaptive_parallelism = PbTableParallelism {
80 parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
81 };
82
83 let target_parallelism = match parallelism {
85 SetVariableValue::Single(SetVariableValueSingle::Ident(ident))
86 if ident
87 .real_value()
88 .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) =>
89 {
90 adaptive_parallelism
91 }
92
93 SetVariableValue::Default => adaptive_parallelism,
94 SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
95 let fixed_parallelism = v.parse::<u32>().map_err(|e| {
96 ErrorCode::InvalidInputSyntax(format!(
97 "target parallelism must be a valid number or adaptive: {}",
98 e.as_report()
99 ))
100 })?;
101
102 if fixed_parallelism == 0 {
103 adaptive_parallelism
104 } else {
105 PbTableParallelism {
106 parallelism: Some(PbParallelism::Fixed(FixedParallelism {
107 parallelism: fixed_parallelism,
108 })),
109 }
110 }
111 }
112
113 _ => {
114 return Err(ErrorCode::InvalidInputSyntax(
115 "target parallelism must be a valid number or adaptive".to_owned(),
116 )
117 .into());
118 }
119 };
120
121 Ok(target_parallelism)
122}
123
124fn extract_fragment_parallelism(parallelism: SetVariableValue) -> Result<Option<TableParallelism>> {
125 match parallelism {
126 SetVariableValue::Default => Ok(None),
127 other => extract_table_parallelism(other).map(Some),
128 }
129}