risingwave_frontend/handler/
alter_parallelism.rs1use pgwire::pg_response::StatementType;
16use risingwave_pb::meta::table_parallelism::{
17 AdaptiveParallelism, FixedParallelism, PbParallelism,
18};
19use risingwave_pb::meta::{PbTableParallelism, TableParallelism};
20use risingwave_sqlparser::ast::{ObjectName, SetVariableValue, SetVariableValueSingle, Value};
21use risingwave_sqlparser::keywords::Keyword;
22use thiserror_ext::AsReport;
23
24use super::alter_utils::resolve_streaming_job_id_for_alter;
25use super::{HandlerArgs, RwPgResponse};
26use crate::catalog::FragmentId;
27use crate::error::{ErrorCode, Result};
28
29pub async fn handle_alter_parallelism(
30 handler_args: HandlerArgs,
31 obj_name: ObjectName,
32 parallelism: SetVariableValue,
33 stmt_type: StatementType,
34 deferred: bool,
35) -> Result<RwPgResponse> {
36 let session = handler_args.session;
37
38 let job_id = resolve_streaming_job_id_for_alter(&session, obj_name, stmt_type, "parallelism")?;
39
40 let target_parallelism = extract_table_parallelism(parallelism)?;
41
42 let mut builder = RwPgResponse::builder(stmt_type);
43
44 let catalog_writer = session.catalog_writer()?;
45 catalog_writer
46 .alter_parallelism(job_id, target_parallelism, deferred)
47 .await?;
48
49 if deferred {
50 builder = builder.notice("DEFERRED is used, please ensure that automatic parallelism control is enabled on the meta, otherwise, the alter will not take effect.".to_owned());
51 }
52
53 Ok(builder.into())
54}
55
56pub async fn handle_alter_fragment_parallelism(
57 handler_args: HandlerArgs,
58 fragment_ids: Vec<FragmentId>,
59 parallelism: SetVariableValue,
60) -> Result<RwPgResponse> {
61 let session = handler_args.session;
62 let target_parallelism = extract_fragment_parallelism(parallelism)?;
63
64 session
65 .env()
66 .meta_client()
67 .alter_fragment_parallelism(fragment_ids, target_parallelism)
68 .await?;
69
70 Ok(RwPgResponse::builder(StatementType::ALTER_FRAGMENT).into())
71}
72
73fn extract_table_parallelism(parallelism: SetVariableValue) -> Result<TableParallelism> {
74 let adaptive_parallelism = PbTableParallelism {
75 parallelism: Some(PbParallelism::Adaptive(AdaptiveParallelism {})),
76 };
77
78 let target_parallelism = match parallelism {
80 SetVariableValue::Single(SetVariableValueSingle::Ident(ident))
81 if ident
82 .real_value()
83 .eq_ignore_ascii_case(&Keyword::ADAPTIVE.to_string()) =>
84 {
85 adaptive_parallelism
86 }
87
88 SetVariableValue::Default => adaptive_parallelism,
89 SetVariableValue::Single(SetVariableValueSingle::Literal(Value::Number(v))) => {
90 let fixed_parallelism = v.parse::<u32>().map_err(|e| {
91 ErrorCode::InvalidInputSyntax(format!(
92 "target parallelism must be a valid number or adaptive: {}",
93 e.as_report()
94 ))
95 })?;
96
97 if fixed_parallelism == 0 {
98 adaptive_parallelism
99 } else {
100 PbTableParallelism {
101 parallelism: Some(PbParallelism::Fixed(FixedParallelism {
102 parallelism: fixed_parallelism,
103 })),
104 }
105 }
106 }
107
108 _ => {
109 return Err(ErrorCode::InvalidInputSyntax(
110 "target parallelism must be a valid number or adaptive".to_owned(),
111 )
112 .into());
113 }
114 };
115
116 Ok(target_parallelism)
117}
118
119fn extract_fragment_parallelism(parallelism: SetVariableValue) -> Result<Option<TableParallelism>> {
120 match parallelism {
121 SetVariableValue::Default => Ok(None),
122 other => extract_table_parallelism(other).map(Some),
123 }
124}