risingwave_frontend/handler/
alter_compaction_group.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::StatementType;
16use risingwave_common::config::meta::default::compaction_config;
17use risingwave_hummock_sdk::CompactionGroupId;
18use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
19use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
20use risingwave_sqlparser::ast::{
21    AlterCompactionGroupOperation, ConfigParam, SetVariableValue, SetVariableValueSingle, Value,
22};
23
24use super::{HandlerArgs, RwPgResponse};
25use crate::error::{ErrorCode, Result};
26
/// Wire encoding for clearing optional `u64` compaction configs.
///
/// Note: meta currently treats both `u64::MIN` and `u64::MAX` as "unset" for historical reasons.
/// SQL `DEFAULT` always sends `u64::MAX` to avoid overloading `0` at the SQL layer.
///
/// Within this file the constant is the value produced for `DEFAULT` when the configured
/// default is `None`, and it is rejected (together with `0`) as an explicit user-supplied value
/// for the positive-only optional configs.
const OPTIONAL_U64_UNSET_WIRE: u64 = u64::MAX;
32
33pub async fn handle_alter_compaction_group(
34    handler_args: HandlerArgs,
35    group_ids: Vec<u64>,
36    operation: AlterCompactionGroupOperation,
37) -> Result<RwPgResponse> {
38    // Only superuser can alter compaction group config
39    if !handler_args.session.is_super_user() {
40        return Err(ErrorCode::PermissionDenied(
41            "must be superuser to execute ALTER COMPACTION GROUP command".to_owned(),
42        )
43        .into());
44    }
45
46    let configs = match operation {
47        AlterCompactionGroupOperation::Set { configs } => {
48            build_compaction_config_from_params(&configs)?
49        }
50    };
51
52    if configs.is_empty() {
53        return Err(ErrorCode::InvalidInputSyntax(
54            "no valid compaction config specified".to_owned(),
55        )
56        .into());
57    }
58
59    let meta_client = handler_args.session.env().meta_client();
60    meta_client
61        .update_compaction_config(
62            group_ids
63                .iter()
64                .copied()
65                .map(CompactionGroupId::from)
66                .collect(),
67            configs,
68        )
69        .await?;
70
71    Ok(RwPgResponse::builder(StatementType::ALTER_COMPACTION_GROUP)
72        .notice(format!(
73            "Compaction group(s) {:?} config updated successfully",
74            group_ids
75        ))
76        .into())
77}
78
79fn build_compaction_config_from_params(params: &[ConfigParam]) -> Result<Vec<MutableConfig>> {
80    let mut configs = vec![];
81
82    for param in params {
83        // Config parameter names are ASCII identifiers. Prefer ASCII-only case folding.
84        let name = param.param.real_value().to_ascii_lowercase();
85        let value = &param.value;
86
87        let config = match name.as_str() {
88            "max_bytes_for_level_base" => MutableConfig::MaxBytesForLevelBase(parse_u64_value(
89                value,
90                &name,
91                compaction_config::max_bytes_for_level_base(),
92            )?),
93            "max_bytes_for_level_multiplier" => {
94                MutableConfig::MaxBytesForLevelMultiplier(parse_u64_value(
95                    value,
96                    &name,
97                    compaction_config::max_bytes_for_level_multiplier(),
98                )?)
99            }
100            "max_compaction_bytes" => MutableConfig::MaxCompactionBytes(parse_u64_value(
101                value,
102                &name,
103                compaction_config::max_compaction_bytes(),
104            )?),
105            "sub_level_max_compaction_bytes" => {
106                MutableConfig::SubLevelMaxCompactionBytes(parse_u64_value(
107                    value,
108                    &name,
109                    compaction_config::sub_level_max_compaction_bytes(),
110                )?)
111            }
112            "level0_tier_compact_file_number" => {
113                MutableConfig::Level0TierCompactFileNumber(parse_u64_value(
114                    value,
115                    &name,
116                    compaction_config::level0_tier_compact_file_number(),
117                )?)
118            }
119            "target_file_size_base" => MutableConfig::TargetFileSizeBase(parse_u64_value(
120                value,
121                &name,
122                compaction_config::target_file_size_base(),
123            )?),
124            "compaction_filter_mask" => MutableConfig::CompactionFilterMask(parse_u32_value(
125                value,
126                &name,
127                compaction_config::compaction_filter_mask(),
128            )?),
129            "max_sub_compaction" => MutableConfig::MaxSubCompaction(parse_u32_value(
130                value,
131                &name,
132                compaction_config::max_sub_compaction(),
133            )?),
134            "level0_stop_write_threshold_sub_level_number" => {
135                MutableConfig::Level0StopWriteThresholdSubLevelNumber(parse_u64_value(
136                    value,
137                    &name,
138                    compaction_config::level0_stop_write_threshold_sub_level_number(),
139                )?)
140            }
141            "level0_sub_level_compact_level_count" => {
142                MutableConfig::Level0SubLevelCompactLevelCount(parse_u32_value(
143                    value,
144                    &name,
145                    compaction_config::level0_sub_level_compact_level_count(),
146                )?)
147            }
148            "max_space_reclaim_bytes" => MutableConfig::MaxSpaceReclaimBytes(parse_u64_value(
149                value,
150                &name,
151                compaction_config::max_space_reclaim_bytes(),
152            )?),
153            "level0_max_compact_file_number" => {
154                MutableConfig::Level0MaxCompactFileNumber(parse_u64_value(
155                    value,
156                    &name,
157                    compaction_config::level0_max_compact_file_number(),
158                )?)
159            }
160            "level0_overlapping_sub_level_compact_level_count" => {
161                MutableConfig::Level0OverlappingSubLevelCompactLevelCount(parse_u32_value(
162                    value,
163                    &name,
164                    compaction_config::level0_overlapping_sub_level_compact_level_count(),
165                )?)
166            }
167            "enable_emergency_picker" => MutableConfig::EnableEmergencyPicker(parse_bool_value(
168                value,
169                &name,
170                compaction_config::enable_emergency_picker(),
171            )?),
172            "tombstone_reclaim_ratio" => MutableConfig::TombstoneReclaimRatio(parse_u32_value(
173                value,
174                &name,
175                compaction_config::tombstone_reclaim_ratio(),
176            )?),
177            "compression_algorithm" => {
178                // Format: 'level:algorithm' e.g., '3:lz4' or '6:zstd'
179                // This sets the compression algorithm for a specific LSM-tree level
180                match value {
181                    // Let meta expand defaults by each group's max_level.
182                    SetVariableValue::Default => MutableConfig::ResetCompressionAlgorithm(true),
183                    _ => {
184                        let algorithm = parse_compression_algorithm_with_level(value, &name)?;
185                        MutableConfig::CompressionAlgorithm(algorithm)
186                    }
187                }
188            }
189            "max_l0_compact_level_count" => MutableConfig::MaxL0CompactLevelCount(parse_u32_value(
190                value,
191                &name,
192                compaction_config::max_l0_compact_level_count(),
193            )?),
194            "sst_allowed_trivial_move_min_size" => {
195                MutableConfig::SstAllowedTrivialMoveMinSize(parse_u64_value(
196                    value,
197                    &name,
198                    compaction_config::sst_allowed_trivial_move_min_size(),
199                )?)
200            }
201            "disable_auto_group_scheduling" => {
202                MutableConfig::DisableAutoGroupScheduling(parse_bool_value(
203                    value,
204                    &name,
205                    compaction_config::disable_auto_group_scheduling(),
206                )?)
207            }
208            "max_overlapping_level_size" => {
209                MutableConfig::MaxOverlappingLevelSize(parse_u64_value(
210                    value,
211                    &name,
212                    compaction_config::max_overlapping_level_size(),
213                )?)
214            }
215            "sst_allowed_trivial_move_max_count" => {
216                MutableConfig::SstAllowedTrivialMoveMaxCount(parse_u32_value(
217                    value,
218                    &name,
219                    compaction_config::sst_allowed_trivial_move_max_count(),
220                )?)
221            }
222            "emergency_level0_sst_file_count" => {
223                MutableConfig::EmergencyLevel0SstFileCount(parse_u32_value(
224                    value,
225                    &name,
226                    compaction_config::emergency_level0_sst_file_count(),
227                )?)
228            }
229            "emergency_level0_sub_level_partition" => {
230                MutableConfig::EmergencyLevel0SubLevelPartition(parse_u32_value(
231                    value,
232                    &name,
233                    compaction_config::emergency_level0_sub_level_partition(),
234                )?)
235            }
236            "level0_stop_write_threshold_max_sst_count" => {
237                MutableConfig::Level0StopWriteThresholdMaxSstCount(parse_u32_value(
238                    value,
239                    &name,
240                    compaction_config::level0_stop_write_threshold_max_sst_count(),
241                )?)
242            }
243            "level0_stop_write_threshold_max_size" => {
244                MutableConfig::Level0StopWriteThresholdMaxSize(parse_u64_value(
245                    value,
246                    &name,
247                    compaction_config::level0_stop_write_threshold_max_size(),
248                )?)
249            }
250            "enable_optimize_l0_interval_selection" => {
251                MutableConfig::EnableOptimizeL0IntervalSelection(parse_bool_value(
252                    value,
253                    &name,
254                    compaction_config::enable_optimize_l0_interval_selection(),
255                )?)
256            }
257            "vnode_aligned_level_size_threshold" => {
258                return Err(ErrorCode::InvalidInputSyntax(
259                    "vnode_aligned_level_size_threshold is deprecated and cannot be altered via SQL"
260                        .to_owned(),
261                )
262                .into());
263            }
264            "max_kv_count_for_xor16" => {
265                MutableConfig::MaxKvCountForXor16(parse_optional_positive_u64_value(
266                    value,
267                    &name,
268                    compaction_config::max_kv_count_for_xor16(),
269                )?)
270            }
271            "max_vnode_key_range_bytes" => {
272                MutableConfig::MaxVnodeKeyRangeBytes(parse_optional_positive_u64_value(
273                    value,
274                    &name,
275                    compaction_config::max_vnode_key_range_bytes(),
276                )?)
277            }
278            "split_weight_by_vnode" => {
279                return Err(ErrorCode::InvalidInputSyntax(
280                    "split_weight_by_vnode is managed by the system and cannot be altered via SQL"
281                        .to_owned(),
282                )
283                .into());
284            }
285            _ => {
286                return Err(ErrorCode::InvalidInputSyntax(format!(
287                    "unknown compaction config parameter: {}",
288                    name
289                ))
290                .into());
291            }
292        };
293        configs.push(config);
294    }
295
296    Ok(configs)
297}
298
299fn expect_single_literal<'a>(value: &'a SetVariableValue, name: &str) -> Result<&'a Value> {
300    match value {
301        SetVariableValue::Single(SetVariableValueSingle::Literal(v)) => Ok(v),
302        _ => Err(ErrorCode::InvalidInputSyntax(format!(
303            "expected a single literal value for {}, got {}",
304            name, value
305        ))
306        .into()),
307    }
308}
309
310fn parse_u64_value(value: &SetVariableValue, name: &str, default_value: u64) -> Result<u64> {
311    if matches!(value, SetVariableValue::Default) {
312        return Ok(default_value);
313    }
314
315    let v = expect_single_literal(value, name)?;
316    match v {
317        Value::Number(n) => n.parse::<u64>().map_err(|_| {
318            ErrorCode::InvalidInputSyntax(format!("invalid u64 value for {}: {}", name, n)).into()
319        }),
320        _ => Err(ErrorCode::InvalidInputSyntax(format!(
321            "expected numeric value for {}, got {}",
322            name, v
323        ))
324        .into()),
325    }
326}
327
328fn parse_u32_value(value: &SetVariableValue, name: &str, default_value: u32) -> Result<u32> {
329    if matches!(value, SetVariableValue::Default) {
330        return Ok(default_value);
331    }
332
333    let v = expect_single_literal(value, name)?;
334    match v {
335        Value::Number(n) => n.parse::<u32>().map_err(|_| {
336            ErrorCode::InvalidInputSyntax(format!("invalid u32 value for {}: {}", name, n)).into()
337        }),
338        _ => Err(ErrorCode::InvalidInputSyntax(format!(
339            "expected numeric value for {}, got {}",
340            name, v
341        ))
342        .into()),
343    }
344}
345
346fn parse_bool_value(value: &SetVariableValue, name: &str, default_value: bool) -> Result<bool> {
347    if matches!(value, SetVariableValue::Default) {
348        return Ok(default_value);
349    }
350
351    if let SetVariableValue::Single(SetVariableValueSingle::Ident(ident)) = value {
352        return parse_bool_str(name, &ident.real_value());
353    }
354
355    let v = expect_single_literal(value, name)?;
356    match v {
357        Value::Boolean(b) => Ok(*b),
358        Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) | Value::Number(s) => {
359            parse_bool_str(name, s)
360        }
361        _ => Err(ErrorCode::InvalidInputSyntax(format!(
362            "expected boolean value for {}, got {}",
363            name, v
364        ))
365        .into()),
366    }
367}
368
369fn parse_bool_str(name: &str, raw: &str) -> Result<bool> {
370    match raw.to_ascii_lowercase().as_str() {
371        "true" | "on" | "1" => Ok(true),
372        "false" | "off" | "0" => Ok(false),
373        _ => Err(ErrorCode::InvalidInputSyntax(format!(
374            "invalid boolean value for {}: {}",
375            name, raw
376        ))
377        .into()),
378    }
379}
380
381fn parse_optional_u64_value(
382    value: &SetVariableValue,
383    name: &str,
384    default_value: Option<u64>,
385) -> Result<u64> {
386    if matches!(value, SetVariableValue::Default) {
387        return Ok(default_value.unwrap_or(OPTIONAL_U64_UNSET_WIRE));
388    }
389
390    parse_u64_value(
391        value,
392        name,
393        default_value.unwrap_or(OPTIONAL_U64_UNSET_WIRE),
394    )
395}
396
397fn parse_optional_positive_u64_value(
398    value: &SetVariableValue,
399    name: &str,
400    default_value: Option<u64>,
401) -> Result<u64> {
402    if matches!(value, SetVariableValue::Default) {
403        return Ok(default_value.unwrap_or(OPTIONAL_U64_UNSET_WIRE));
404    }
405
406    let value = parse_optional_u64_value(value, name, default_value)?;
407    if value == 0 || value == OPTIONAL_U64_UNSET_WIRE {
408        return Err(ErrorCode::InvalidInputSyntax(format!(
409            "{} must be between 1 and {}, or DEFAULT",
410            name,
411            OPTIONAL_U64_UNSET_WIRE - 1
412        ))
413        .into());
414    }
415    Ok(value)
416}
417
418fn parse_compression_algorithm_with_level(
419    value: &SetVariableValue,
420    name: &str,
421) -> Result<CompressionAlgorithm> {
422    // Format: 'level:algorithm' e.g., '3:lz4', '6:zstd', '0:none'
423    // level: non-negative integer (validation is done by meta service)
424    // algorithm: none, lz4, zstd
425    let v = expect_single_literal(value, name)?;
426    let input_str = match v {
427        Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) => s.to_ascii_lowercase(),
428        _ => {
429            return Err(ErrorCode::InvalidInputSyntax(format!(
430                "expected string value for {} in format 'level:algorithm' (e.g., '3:lz4'), got {}",
431                name, v
432            ))
433            .into());
434        }
435    };
436
437    let (level_str, algo_str) = input_str.split_once(':').ok_or_else(|| {
438        ErrorCode::InvalidInputSyntax(format!(
439            "invalid format for {}: '{}', expected 'level:algorithm' (e.g., '3:lz4', '6:zstd')",
440            name, input_str
441        ))
442    })?;
443
444    let level: u32 = level_str.parse().map_err(|_| {
445        ErrorCode::InvalidInputSyntax(format!(
446            "invalid level for {}: '{}', expected a non-negative integer",
447            name, level_str
448        ))
449    })?;
450
451    // Note: level validation (whether it exceeds max_level) is done by meta service
452    // since max_level is per-compaction-group configuration
453
454    let algo_name = match algo_str {
455        "none" => "None",
456        "lz4" => "Lz4",
457        "zstd" => "Zstd",
458        _ => {
459            return Err(ErrorCode::InvalidInputSyntax(format!(
460                "invalid compression algorithm for {}: '{}', expected one of: none, lz4, zstd",
461                name, algo_str
462            ))
463            .into());
464        }
465    };
466
467    Ok(CompressionAlgorithm {
468        level,
469        compression_algorithm: algo_name.to_owned(),
470    })
471}