1use pgwire::pg_response::StatementType;
16use risingwave_common::config::meta::default::compaction_config;
17use risingwave_hummock_sdk::CompactionGroupId;
18use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
19use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
20use risingwave_sqlparser::ast::{
21 AlterCompactionGroupOperation, ConfigParam, SetVariableValue, SetVariableValueSingle, Value,
22};
23
24use super::{HandlerArgs, RwPgResponse};
25use crate::error::{ErrorCode, Result};
26
27const OPTIONAL_U64_UNSET_WIRE: u64 = u64::MAX;
32
33pub async fn handle_alter_compaction_group(
34 handler_args: HandlerArgs,
35 group_ids: Vec<u64>,
36 operation: AlterCompactionGroupOperation,
37) -> Result<RwPgResponse> {
38 if !handler_args.session.is_super_user() {
40 return Err(ErrorCode::PermissionDenied(
41 "must be superuser to execute ALTER COMPACTION GROUP command".to_owned(),
42 )
43 .into());
44 }
45
46 let configs = match operation {
47 AlterCompactionGroupOperation::Set { configs } => {
48 build_compaction_config_from_params(&configs)?
49 }
50 };
51
52 if configs.is_empty() {
53 return Err(ErrorCode::InvalidInputSyntax(
54 "no valid compaction config specified".to_owned(),
55 )
56 .into());
57 }
58
59 let meta_client = handler_args.session.env().meta_client();
60 meta_client
61 .update_compaction_config(
62 group_ids
63 .iter()
64 .copied()
65 .map(CompactionGroupId::from)
66 .collect(),
67 configs,
68 )
69 .await?;
70
71 Ok(RwPgResponse::builder(StatementType::ALTER_COMPACTION_GROUP)
72 .notice(format!(
73 "Compaction group(s) {:?} config updated successfully",
74 group_ids
75 ))
76 .into())
77}
78
79fn build_compaction_config_from_params(params: &[ConfigParam]) -> Result<Vec<MutableConfig>> {
80 let mut configs = vec![];
81
82 for param in params {
83 let name = param.param.real_value().to_ascii_lowercase();
85 let value = ¶m.value;
86
87 let config = match name.as_str() {
88 "max_bytes_for_level_base" => MutableConfig::MaxBytesForLevelBase(parse_u64_value(
89 value,
90 &name,
91 compaction_config::max_bytes_for_level_base(),
92 )?),
93 "max_bytes_for_level_multiplier" => {
94 MutableConfig::MaxBytesForLevelMultiplier(parse_u64_value(
95 value,
96 &name,
97 compaction_config::max_bytes_for_level_multiplier(),
98 )?)
99 }
100 "max_compaction_bytes" => MutableConfig::MaxCompactionBytes(parse_u64_value(
101 value,
102 &name,
103 compaction_config::max_compaction_bytes(),
104 )?),
105 "sub_level_max_compaction_bytes" => {
106 MutableConfig::SubLevelMaxCompactionBytes(parse_u64_value(
107 value,
108 &name,
109 compaction_config::sub_level_max_compaction_bytes(),
110 )?)
111 }
112 "level0_tier_compact_file_number" => {
113 MutableConfig::Level0TierCompactFileNumber(parse_u64_value(
114 value,
115 &name,
116 compaction_config::level0_tier_compact_file_number(),
117 )?)
118 }
119 "target_file_size_base" => MutableConfig::TargetFileSizeBase(parse_u64_value(
120 value,
121 &name,
122 compaction_config::target_file_size_base(),
123 )?),
124 "compaction_filter_mask" => MutableConfig::CompactionFilterMask(parse_u32_value(
125 value,
126 &name,
127 compaction_config::compaction_filter_mask(),
128 )?),
129 "max_sub_compaction" => MutableConfig::MaxSubCompaction(parse_u32_value(
130 value,
131 &name,
132 compaction_config::max_sub_compaction(),
133 )?),
134 "level0_stop_write_threshold_sub_level_number" => {
135 MutableConfig::Level0StopWriteThresholdSubLevelNumber(parse_u64_value(
136 value,
137 &name,
138 compaction_config::level0_stop_write_threshold_sub_level_number(),
139 )?)
140 }
141 "level0_sub_level_compact_level_count" => {
142 MutableConfig::Level0SubLevelCompactLevelCount(parse_u32_value(
143 value,
144 &name,
145 compaction_config::level0_sub_level_compact_level_count(),
146 )?)
147 }
148 "max_space_reclaim_bytes" => MutableConfig::MaxSpaceReclaimBytes(parse_u64_value(
149 value,
150 &name,
151 compaction_config::max_space_reclaim_bytes(),
152 )?),
153 "level0_max_compact_file_number" => {
154 MutableConfig::Level0MaxCompactFileNumber(parse_u64_value(
155 value,
156 &name,
157 compaction_config::level0_max_compact_file_number(),
158 )?)
159 }
160 "level0_overlapping_sub_level_compact_level_count" => {
161 MutableConfig::Level0OverlappingSubLevelCompactLevelCount(parse_u32_value(
162 value,
163 &name,
164 compaction_config::level0_overlapping_sub_level_compact_level_count(),
165 )?)
166 }
167 "enable_emergency_picker" => MutableConfig::EnableEmergencyPicker(parse_bool_value(
168 value,
169 &name,
170 compaction_config::enable_emergency_picker(),
171 )?),
172 "tombstone_reclaim_ratio" => MutableConfig::TombstoneReclaimRatio(parse_u32_value(
173 value,
174 &name,
175 compaction_config::tombstone_reclaim_ratio(),
176 )?),
177 "compression_algorithm" => {
178 match value {
181 SetVariableValue::Default => MutableConfig::ResetCompressionAlgorithm(true),
183 _ => {
184 let algorithm = parse_compression_algorithm_with_level(value, &name)?;
185 MutableConfig::CompressionAlgorithm(algorithm)
186 }
187 }
188 }
189 "max_l0_compact_level_count" => MutableConfig::MaxL0CompactLevelCount(parse_u32_value(
190 value,
191 &name,
192 compaction_config::max_l0_compact_level_count(),
193 )?),
194 "sst_allowed_trivial_move_min_size" => {
195 MutableConfig::SstAllowedTrivialMoveMinSize(parse_u64_value(
196 value,
197 &name,
198 compaction_config::sst_allowed_trivial_move_min_size(),
199 )?)
200 }
201 "disable_auto_group_scheduling" => {
202 MutableConfig::DisableAutoGroupScheduling(parse_bool_value(
203 value,
204 &name,
205 compaction_config::disable_auto_group_scheduling(),
206 )?)
207 }
208 "max_overlapping_level_size" => {
209 MutableConfig::MaxOverlappingLevelSize(parse_u64_value(
210 value,
211 &name,
212 compaction_config::max_overlapping_level_size(),
213 )?)
214 }
215 "sst_allowed_trivial_move_max_count" => {
216 MutableConfig::SstAllowedTrivialMoveMaxCount(parse_u32_value(
217 value,
218 &name,
219 compaction_config::sst_allowed_trivial_move_max_count(),
220 )?)
221 }
222 "emergency_level0_sst_file_count" => {
223 MutableConfig::EmergencyLevel0SstFileCount(parse_u32_value(
224 value,
225 &name,
226 compaction_config::emergency_level0_sst_file_count(),
227 )?)
228 }
229 "emergency_level0_sub_level_partition" => {
230 MutableConfig::EmergencyLevel0SubLevelPartition(parse_u32_value(
231 value,
232 &name,
233 compaction_config::emergency_level0_sub_level_partition(),
234 )?)
235 }
236 "level0_stop_write_threshold_max_sst_count" => {
237 MutableConfig::Level0StopWriteThresholdMaxSstCount(parse_u32_value(
238 value,
239 &name,
240 compaction_config::level0_stop_write_threshold_max_sst_count(),
241 )?)
242 }
243 "level0_stop_write_threshold_max_size" => {
244 MutableConfig::Level0StopWriteThresholdMaxSize(parse_u64_value(
245 value,
246 &name,
247 compaction_config::level0_stop_write_threshold_max_size(),
248 )?)
249 }
250 "enable_optimize_l0_interval_selection" => {
251 MutableConfig::EnableOptimizeL0IntervalSelection(parse_bool_value(
252 value,
253 &name,
254 compaction_config::enable_optimize_l0_interval_selection(),
255 )?)
256 }
257 "vnode_aligned_level_size_threshold" => {
258 return Err(ErrorCode::InvalidInputSyntax(
259 "vnode_aligned_level_size_threshold is deprecated and cannot be altered via SQL"
260 .to_owned(),
261 )
262 .into());
263 }
264 "max_kv_count_for_xor16" => {
265 MutableConfig::MaxKvCountForXor16(parse_optional_positive_u64_value(
266 value,
267 &name,
268 compaction_config::max_kv_count_for_xor16(),
269 )?)
270 }
271 "max_vnode_key_range_bytes" => {
272 MutableConfig::MaxVnodeKeyRangeBytes(parse_optional_positive_u64_value(
273 value,
274 &name,
275 compaction_config::max_vnode_key_range_bytes(),
276 )?)
277 }
278 "split_weight_by_vnode" => {
279 return Err(ErrorCode::InvalidInputSyntax(
280 "split_weight_by_vnode is managed by the system and cannot be altered via SQL"
281 .to_owned(),
282 )
283 .into());
284 }
285 _ => {
286 return Err(ErrorCode::InvalidInputSyntax(format!(
287 "unknown compaction config parameter: {}",
288 name
289 ))
290 .into());
291 }
292 };
293 configs.push(config);
294 }
295
296 Ok(configs)
297}
298
299fn expect_single_literal<'a>(value: &'a SetVariableValue, name: &str) -> Result<&'a Value> {
300 match value {
301 SetVariableValue::Single(SetVariableValueSingle::Literal(v)) => Ok(v),
302 _ => Err(ErrorCode::InvalidInputSyntax(format!(
303 "expected a single literal value for {}, got {}",
304 name, value
305 ))
306 .into()),
307 }
308}
309
310fn parse_u64_value(value: &SetVariableValue, name: &str, default_value: u64) -> Result<u64> {
311 if matches!(value, SetVariableValue::Default) {
312 return Ok(default_value);
313 }
314
315 let v = expect_single_literal(value, name)?;
316 match v {
317 Value::Number(n) => n.parse::<u64>().map_err(|_| {
318 ErrorCode::InvalidInputSyntax(format!("invalid u64 value for {}: {}", name, n)).into()
319 }),
320 _ => Err(ErrorCode::InvalidInputSyntax(format!(
321 "expected numeric value for {}, got {}",
322 name, v
323 ))
324 .into()),
325 }
326}
327
328fn parse_u32_value(value: &SetVariableValue, name: &str, default_value: u32) -> Result<u32> {
329 if matches!(value, SetVariableValue::Default) {
330 return Ok(default_value);
331 }
332
333 let v = expect_single_literal(value, name)?;
334 match v {
335 Value::Number(n) => n.parse::<u32>().map_err(|_| {
336 ErrorCode::InvalidInputSyntax(format!("invalid u32 value for {}: {}", name, n)).into()
337 }),
338 _ => Err(ErrorCode::InvalidInputSyntax(format!(
339 "expected numeric value for {}, got {}",
340 name, v
341 ))
342 .into()),
343 }
344}
345
346fn parse_bool_value(value: &SetVariableValue, name: &str, default_value: bool) -> Result<bool> {
347 if matches!(value, SetVariableValue::Default) {
348 return Ok(default_value);
349 }
350
351 if let SetVariableValue::Single(SetVariableValueSingle::Ident(ident)) = value {
352 return parse_bool_str(name, &ident.real_value());
353 }
354
355 let v = expect_single_literal(value, name)?;
356 match v {
357 Value::Boolean(b) => Ok(*b),
358 Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) | Value::Number(s) => {
359 parse_bool_str(name, s)
360 }
361 _ => Err(ErrorCode::InvalidInputSyntax(format!(
362 "expected boolean value for {}, got {}",
363 name, v
364 ))
365 .into()),
366 }
367}
368
369fn parse_bool_str(name: &str, raw: &str) -> Result<bool> {
370 match raw.to_ascii_lowercase().as_str() {
371 "true" | "on" | "1" => Ok(true),
372 "false" | "off" | "0" => Ok(false),
373 _ => Err(ErrorCode::InvalidInputSyntax(format!(
374 "invalid boolean value for {}: {}",
375 name, raw
376 ))
377 .into()),
378 }
379}
380
381fn parse_optional_u64_value(
382 value: &SetVariableValue,
383 name: &str,
384 default_value: Option<u64>,
385) -> Result<u64> {
386 if matches!(value, SetVariableValue::Default) {
387 return Ok(default_value.unwrap_or(OPTIONAL_U64_UNSET_WIRE));
388 }
389
390 parse_u64_value(
391 value,
392 name,
393 default_value.unwrap_or(OPTIONAL_U64_UNSET_WIRE),
394 )
395}
396
397fn parse_optional_positive_u64_value(
398 value: &SetVariableValue,
399 name: &str,
400 default_value: Option<u64>,
401) -> Result<u64> {
402 if matches!(value, SetVariableValue::Default) {
403 return Ok(default_value.unwrap_or(OPTIONAL_U64_UNSET_WIRE));
404 }
405
406 let value = parse_optional_u64_value(value, name, default_value)?;
407 if value == 0 || value == OPTIONAL_U64_UNSET_WIRE {
408 return Err(ErrorCode::InvalidInputSyntax(format!(
409 "{} must be between 1 and {}, or DEFAULT",
410 name,
411 OPTIONAL_U64_UNSET_WIRE - 1
412 ))
413 .into());
414 }
415 Ok(value)
416}
417
418fn parse_compression_algorithm_with_level(
419 value: &SetVariableValue,
420 name: &str,
421) -> Result<CompressionAlgorithm> {
422 let v = expect_single_literal(value, name)?;
426 let input_str = match v {
427 Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) => s.to_ascii_lowercase(),
428 _ => {
429 return Err(ErrorCode::InvalidInputSyntax(format!(
430 "expected string value for {} in format 'level:algorithm' (e.g., '3:lz4'), got {}",
431 name, v
432 ))
433 .into());
434 }
435 };
436
437 let (level_str, algo_str) = input_str.split_once(':').ok_or_else(|| {
438 ErrorCode::InvalidInputSyntax(format!(
439 "invalid format for {}: '{}', expected 'level:algorithm' (e.g., '3:lz4', '6:zstd')",
440 name, input_str
441 ))
442 })?;
443
444 let level: u32 = level_str.parse().map_err(|_| {
445 ErrorCode::InvalidInputSyntax(format!(
446 "invalid level for {}: '{}', expected a non-negative integer",
447 name, level_str
448 ))
449 })?;
450
451 let algo_name = match algo_str {
455 "none" => "None",
456 "lz4" => "Lz4",
457 "zstd" => "Zstd",
458 _ => {
459 return Err(ErrorCode::InvalidInputSyntax(format!(
460 "invalid compression algorithm for {}: '{}', expected one of: none, lz4, zstd",
461 name, algo_str
462 ))
463 .into());
464 }
465 };
466
467 Ok(CompressionAlgorithm {
468 level,
469 compression_algorithm: algo_name.to_owned(),
470 })
471}