risingwave_meta/hummock/manager/compaction/
compaction_group_manager.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::meta::default::compaction_config as default_compaction_config;
22use risingwave_common::util::epoch::INVALID_EPOCH;
23use risingwave_hummock_sdk::CompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
25use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
26use risingwave_hummock_sdk::version::GroupDelta;
27use risingwave_meta_model::compaction_config;
28use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
29use risingwave_pb::hummock::write_limits::WriteLimit;
30use risingwave_pb::hummock::{
31    CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
32};
33use sea_orm::EntityTrait;
34use tokio::sync::OnceCell;
35
36use super::CompactionGroupStatistic;
37use crate::hummock::compaction::compaction_config::{
38    CompactionConfigBuilder, validate_compaction_config,
39};
40use crate::hummock::error::{Error, Result};
41use crate::hummock::manager::transaction::HummockVersionTransaction;
42use crate::hummock::manager::versioning::Versioning;
43use crate::hummock::manager::{HummockManager, commit_multi_var};
44use crate::hummock::metrics_utils::remove_compaction_group_metrics;
45use crate::hummock::model::CompactionGroup;
46use crate::hummock::sequence::next_compaction_group_id;
47use crate::manager::MetaSrvEnv;
48use crate::model::{
49    BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
50};
51
52type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
53
54impl CompactionGroupManager {
55    pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
56        let default_config = match env.opts.compaction_config.as_ref() {
57            None => CompactionConfigBuilder::new().build(),
58            Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
59        };
60        Self::new_with_config(env, default_config).await
61    }
62
63    pub(crate) async fn new_with_config(
64        env: &MetaSrvEnv,
65        default_config: CompactionConfig,
66    ) -> Result<CompactionGroupManager> {
67        let mut compaction_group_manager = CompactionGroupManager {
68            compaction_groups: BTreeMap::new(),
69            default_config: Arc::new(default_config),
70            write_limit: Default::default(),
71        };
72
73        let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
74            compaction_config::Entity::find()
75                .all(&env.meta_store_ref().conn)
76                .await
77                .map_err(MetadataModelError::from)?
78                .into_iter()
79                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
80                .collect();
81
82        compaction_group_manager.init(loaded_compaction_groups);
83        Ok(compaction_group_manager)
84    }
85
86    fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
87        if !loaded_compaction_groups.is_empty() {
88            self.compaction_groups = loaded_compaction_groups;
89        }
90    }
91}
92
93impl HummockManager {
94    /// Should not be called inside [`HummockManager`], because it requests locks internally.
95    /// The implementation acquires `versioning` lock.
96    pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
97        get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
98    }
99
100    /// The implementation acquires `compaction_group_manager` lock.
101    pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
102        self.compaction_group_manager
103            .read()
104            .await
105            .compaction_groups
106            .clone()
107    }
108
109    #[cfg(test)]
110    /// Registers `table_fragments` to compaction groups.
111    pub async fn register_table_fragments(
112        &self,
113        mv_table: Option<TableId>,
114        mut internal_tables: Vec<TableId>,
115    ) -> Result<()> {
116        let mut pairs = vec![];
117        if let Some(mv_table) = mv_table {
118            if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
119                tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
120            }
121            // materialized_view
122            pairs.push((mv_table, StaticCompactionGroupId::MaterializedView));
123        }
124        // internal states
125        for table_id in internal_tables {
126            pairs.push((table_id, StaticCompactionGroupId::StateDefault));
127        }
128        self.register_table_ids_for_test(&pairs).await?;
129        Ok(())
130    }
131
132    #[cfg(test)]
133    /// Unregisters `table_fragments` from compaction groups
134    pub async fn unregister_table_fragments_vec(
135        &self,
136        table_fragments: &[crate::model::StreamJobFragments],
137    ) {
138        self.unregister_table_ids(table_fragments.iter().flat_map(|t| t.all_table_ids()))
139            .await
140            .unwrap();
141    }
142
143    /// Unregisters stale members and groups
144    /// The caller should ensure `table_fragments_list` remain unchanged during `purge`.
145    /// Currently `purge` is only called during meta service start ups.
146    pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
147        let to_unregister = self
148            .versioning
149            .read()
150            .await
151            .current_version
152            .state_table_info
153            .info()
154            .keys()
155            .cloned()
156            .filter(|table_id| !valid_ids.contains(table_id))
157            .collect_vec();
158
159        // As we have released versioning lock, the version that `to_unregister` is calculated from
160        // may not be the same as the one used in unregister_table_ids. It is OK.
161        self.unregister_table_ids(to_unregister).await
162    }
163
164    /// The implementation acquires `versioning` lock.
165    ///
166    /// The method name is temporarily added with a `_for_test` prefix to mark
167    /// that it's currently only used in test.
168    pub async fn register_table_ids_for_test(
169        &self,
170        pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
171    ) -> Result<()> {
172        if pairs.is_empty() {
173            return Ok(());
174        }
175        let mut versioning_guard = self.versioning.write().await;
176        let versioning = versioning_guard.deref_mut();
177        let mut compaction_group_manager = self.compaction_group_manager.write().await;
178        let current_version = &versioning.current_version;
179        let default_config = compaction_group_manager.default_compaction_config();
180        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
181
182        for (table_id, _) in pairs {
183            let table_id = (*table_id).into();
184            if let Some(info) = current_version.state_table_info.info().get(&table_id) {
185                return Err(Error::CompactionGroup(format!(
186                    "table {} already {:?}",
187                    table_id, info
188                )));
189            }
190        }
191        // All NewCompactionGroup pairs are mapped to one new compaction group.
192        let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
193        let mut version = HummockVersionTransaction::new(
194            &mut versioning.current_version,
195            &mut versioning.hummock_version_deltas,
196            &mut versioning.table_change_log,
197            self.env.notification_manager(),
198            None,
199            &self.metrics,
200            &self.env.opts,
201        );
202        let mut new_version_delta = version.new_delta();
203
204        let committed_epoch = new_version_delta
205            .latest_version()
206            .state_table_info
207            .info()
208            .values()
209            .map(|info| info.committed_epoch)
210            .max()
211            .unwrap_or(INVALID_EPOCH);
212
213        for (table_id, raw_group_id) in pairs {
214            let table_id = (*table_id).into();
215            let mut group_id = *raw_group_id;
216            if group_id == StaticCompactionGroupId::NewCompactionGroup {
217                let mut is_group_init = false;
218                group_id = *new_compaction_group_id
219                    .get_or_try_init(|| async {
220                        next_compaction_group_id(&self.env).await.inspect(|_| {
221                            is_group_init = true;
222                        })
223                    })
224                    .await?;
225                if is_group_init {
226                    let group_deltas = &mut new_version_delta
227                        .group_deltas
228                        .entry(group_id)
229                        .or_default()
230                        .group_deltas;
231
232                    let config =
233                        match compaction_groups_txn.try_get_compaction_group_config(group_id) {
234                            Some(config) => config.compaction_config.as_ref().clone(),
235                            None => {
236                                compaction_groups_txn
237                                    .create_compaction_groups(group_id, default_config.clone());
238                                default_config.as_ref().clone()
239                            }
240                        };
241
242                    let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
243                        group_config: Some(config),
244                        group_id,
245                        ..Default::default()
246                    }));
247
248                    group_deltas.push(group_delta);
249                }
250            }
251            assert!(
252                new_version_delta
253                    .state_table_info_delta
254                    .insert(
255                        table_id,
256                        PbStateTableInfoDelta {
257                            committed_epoch,
258                            compaction_group_id: *raw_group_id,
259                        }
260                    )
261                    .is_none()
262            );
263        }
264        new_version_delta.pre_apply();
265        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
266
267        Ok(())
268    }
269
270    pub async fn unregister_table_ids(
271        &self,
272        table_ids: impl IntoIterator<Item = TableId>,
273    ) -> Result<()> {
274        let table_ids = table_ids.into_iter().collect_vec();
275        if table_ids.is_empty() {
276            return Ok(());
277        }
278
279        {
280            // Remove table write throughput statistics
281            // The Caller acquires `Send`, so we should safely use `write` lock before the await point.
282            // The table write throughput statistic accepts data inconsistencies (unregister table ids fail), so we can clean it up in advance.
283            let mut table_write_throughput_statistic_manager =
284                self.table_write_throughput_statistic_manager.write();
285            for &table_id in table_ids.iter().unique() {
286                table_write_throughput_statistic_manager.remove_table(table_id);
287            }
288        }
289
290        let mut versioning_guard = self.versioning.write().await;
291        let versioning = versioning_guard.deref_mut();
292        let mut version = HummockVersionTransaction::new(
293            &mut versioning.current_version,
294            &mut versioning.hummock_version_deltas,
295            &mut versioning.table_change_log,
296            self.env.notification_manager(),
297            None,
298            &self.metrics,
299            &self.env.opts,
300        );
301        let mut new_version_delta = version.new_delta();
302        struct UnregisterGroupChange {
303            remaining_member_count: usize,
304            removed_table_ids: HashSet<TableId>,
305        }
306        let mut group_changes: HashMap<CompactionGroupId, UnregisterGroupChange> = HashMap::new();
307        // Remove member tables
308        for table_id in table_ids.into_iter().unique() {
309            let version = new_version_delta.latest_version();
310            let Some(info) = version.state_table_info.info().get(&table_id) else {
311                continue;
312            };
313            let compaction_group_id = info.compaction_group_id;
314
315            let group_change =
316                group_changes
317                    .entry(compaction_group_id)
318                    .or_insert_with(|| UnregisterGroupChange {
319                        remaining_member_count: version
320                            .state_table_info
321                            .compaction_group_member_tables()
322                            .get(&compaction_group_id)
323                            .expect("should exist")
324                            .len(),
325                        removed_table_ids: HashSet::new(),
326                    });
327            group_change.remaining_member_count = group_change
328                .remaining_member_count
329                .checked_sub(1)
330                .expect("member table count should be positive");
331            assert!(group_change.removed_table_ids.insert(table_id));
332            new_version_delta.removed_table_ids.insert(table_id);
333        }
334
335        for (group_id, change) in group_changes {
336            if change.remaining_member_count == 0 && group_id > StaticCompactionGroupId::End {
337                let max_level = new_version_delta
338                    .latest_version()
339                    .get_compaction_group_levels(group_id)
340                    .levels
341                    .len();
342                new_version_delta
343                    .group_deltas
344                    .entry(group_id)
345                    .or_default()
346                    .group_deltas
347                    .push(GroupDelta::GroupDestroy(PbGroupDestroy {}));
348                remove_compaction_group_metrics(&self.metrics, group_id, max_level);
349                // clean up compaction schedule state for the removed group
350                self.compaction_state.remove_compaction_group(group_id);
351            } else {
352                new_version_delta
353                    .group_deltas
354                    .entry(group_id)
355                    .or_default()
356                    .group_deltas
357                    .push(GroupDelta::PruneTableIdsFromSsts(change.removed_table_ids));
358            }
359        }
360
361        new_version_delta.pre_apply();
362
363        // Purge may cause write to meta store. If it hurts performance while holding versioning
364        // lock, consider to make it in batch.
365        let mut compaction_group_manager = self.compaction_group_manager.write().await;
366        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
367
368        compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
369            version.latest_version(),
370        )));
371        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
372
373        // No need to handle DeltaType::GroupDestroy during time travel.
374        Ok(())
375    }
376
377    pub async fn update_compaction_config(
378        &self,
379        compaction_group_ids: &[CompactionGroupId],
380        config_to_update: &[MutableConfig],
381    ) -> Result<()> {
382        {
383            // Avoid lock conflicts with `try_update_write_limits``
384            let mut compaction_group_manager = self.compaction_group_manager.write().await;
385            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
386            compaction_groups_txn
387                .update_compaction_config(compaction_group_ids, config_to_update)?;
388            commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
389        }
390
391        if config_to_update
392            .iter()
393            .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
394        {
395            // Update write limits with lock
396            self.try_update_write_limits(compaction_group_ids).await;
397        }
398
399        Ok(())
400    }
401
402    /// Gets complete compaction group info.
403    /// It is the aggregate of `HummockVersion` and `CompactionGroupConfig`
404    pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
405        let mut versioning_guard = self.versioning.write().await;
406        let versioning = versioning_guard.deref_mut();
407        let current_version = &versioning.current_version;
408        let mut results = vec![];
409        let compaction_group_manager = self.compaction_group_manager.read().await;
410
411        for levels in current_version.levels.values() {
412            let compaction_config = compaction_group_manager
413                .try_get_compaction_group_config(levels.group_id)
414                .unwrap()
415                .compaction_config
416                .as_ref()
417                .clone();
418            let group = CompactionGroupInfo {
419                id: levels.group_id,
420                parent_id: levels.parent_group_id,
421                member_table_ids: current_version
422                    .state_table_info
423                    .compaction_group_member_table_ids(levels.group_id)
424                    .iter()
425                    .copied()
426                    .collect_vec(),
427                compaction_config: Some(compaction_config),
428            };
429            results.push(group);
430        }
431        results
432    }
433
434    pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
435        let mut infos = vec![];
436        {
437            let versioning_guard = self.versioning.read().await;
438            let manager = self.compaction_group_manager.read().await;
439            let version = &versioning_guard.current_version;
440            for group_id in version.levels.keys() {
441                let mut group_info = CompactionGroupStatistic {
442                    group_id: *group_id,
443                    ..Default::default()
444                };
445                for table_id in version
446                    .state_table_info
447                    .compaction_group_member_table_ids(*group_id)
448                {
449                    let stats_size = versioning_guard
450                        .version_stats
451                        .table_stats
452                        .get(table_id)
453                        .map(|stats| stats.total_key_size + stats.total_value_size)
454                        .unwrap_or(0);
455                    let table_size = std::cmp::max(stats_size, 0) as u64;
456                    group_info.group_size += table_size;
457                    group_info.table_statistic.insert(*table_id, table_size);
458                    group_info.compaction_group_config =
459                        manager.try_get_compaction_group_config(*group_id).unwrap();
460                }
461                infos.push(group_info);
462            }
463        };
464        infos
465    }
466
467    pub(crate) async fn initial_compaction_group_config_after_load(
468        &self,
469        versioning_guard: &Versioning,
470        compaction_group_manager: &mut CompactionGroupManager,
471    ) -> Result<()> {
472        // 1. Due to version compatibility, we fix some of the configuration of older versions after hummock starts.
473        let current_version = &versioning_guard.current_version;
474        let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
475        let default_config = compaction_group_manager.default_compaction_config();
476        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
477        compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
478        commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
479
480        Ok(())
481    }
482}
483
484/// We muse ensure there is an entry exists in [`CompactionGroupManager`] for any
485/// compaction group found in current hummock version. That's done by invoking
486/// `get_or_insert_compaction_group_config` or `get_or_insert_compaction_group_configs` before
487/// adding any group in current hummock version:
488/// 1. initialize default static compaction group.
489/// 2. register new table to new compaction group.
490/// 3. move existent table to new compaction group.
491pub(crate) struct CompactionGroupManager {
492    compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
493    default_config: Arc<CompactionConfig>,
494    /// Tables that write limit is trigger for.
495    pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
496}
497
498impl CompactionGroupManager {
499    /// Starts a transaction to update compaction group configs.
500    pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
501        CompactionGroupTransaction::new(&mut self.compaction_groups)
502    }
503
504    #[expect(clippy::type_complexity)]
505    pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
506        inner: P,
507    ) -> BTreeMapTransactionInner<
508        CompactionGroupId,
509        CompactionGroup,
510        DerefMutForward<
511            Self,
512            BTreeMap<CompactionGroupId, CompactionGroup>,
513            P,
514            impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
515            impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
516        >,
517    > {
518        BTreeMapTransactionInner::new(DerefMutForward::new(
519            inner,
520            |mgr| &mgr.compaction_groups,
521            |mgr| &mut mgr.compaction_groups,
522        ))
523    }
524
525    /// Tries to get compaction group config for `compaction_group_id`.
526    pub(crate) fn try_get_compaction_group_config(
527        &self,
528        compaction_group_id: impl Into<CompactionGroupId>,
529    ) -> Option<CompactionGroup> {
530        self.compaction_groups
531            .get(&compaction_group_id.into())
532            .cloned()
533    }
534
535    /// Tries to get compaction group config for `compaction_group_id`.
536    pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
537        self.default_config.clone()
538    }
539}
540
541fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) -> Result<()> {
542    for item in items {
543        match item {
544            MutableConfig::MaxBytesForLevelBase(c) => {
545                target.max_bytes_for_level_base = *c;
546            }
547            MutableConfig::MaxBytesForLevelMultiplier(c) => {
548                target.max_bytes_for_level_multiplier = *c;
549            }
550            MutableConfig::MaxCompactionBytes(c) => {
551                target.max_compaction_bytes = *c;
552            }
553            MutableConfig::SubLevelMaxCompactionBytes(c) => {
554                target.sub_level_max_compaction_bytes = *c;
555            }
556            MutableConfig::Level0TierCompactFileNumber(c) => {
557                target.level0_tier_compact_file_number = *c;
558            }
559            MutableConfig::TargetFileSizeBase(c) => {
560                target.target_file_size_base = *c;
561            }
562            MutableConfig::CompactionFilterMask(c) => {
563                target.compaction_filter_mask = *c;
564            }
565            MutableConfig::MaxSubCompaction(c) => {
566                target.max_sub_compaction = *c;
567            }
568            MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
569                target.level0_stop_write_threshold_sub_level_number = *c;
570            }
571            MutableConfig::Level0SubLevelCompactLevelCount(c) => {
572                target.level0_sub_level_compact_level_count = *c;
573            }
574            MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
575                target.level0_overlapping_sub_level_compact_level_count = *c;
576            }
577            MutableConfig::MaxSpaceReclaimBytes(c) => {
578                target.max_space_reclaim_bytes = *c;
579            }
580            MutableConfig::Level0MaxCompactFileNumber(c) => {
581                target.level0_max_compact_file_number = *c;
582            }
583            MutableConfig::EnableEmergencyPicker(c) => {
584                target.enable_emergency_picker = *c;
585            }
586            MutableConfig::TombstoneReclaimRatio(c) => {
587                target.tombstone_reclaim_ratio = *c;
588            }
589            MutableConfig::CompressionAlgorithm(c) => {
590                let level = c.get_level();
591                let max_level = try_u32_max_level(target.max_level)?;
592                if level > max_level {
593                    return Err(Error::CompactionGroup(format!(
594                        "invalid compression_algorithm level {}, max_level is {}",
595                        level, target.max_level
596                    )));
597                }
598
599                let Some(algorithm) = target.compression_algorithm.get_mut(level as usize) else {
600                    return Err(Error::CompactionGroup(format!(
601                        "invalid compression_algorithm level {}, compression_algorithm len is {}",
602                        level,
603                        target.compression_algorithm.len()
604                    )));
605                };
606                algorithm.clone_from(&c.compression_algorithm);
607            }
608            MutableConfig::ResetCompressionAlgorithm(reset) => {
609                if *reset {
610                    target.compression_algorithm =
611                        default_compaction_config::compression_algorithm_vec(try_u32_max_level(
612                            target.max_level,
613                        )?);
614                }
615            }
616            MutableConfig::MaxL0CompactLevelCount(c) => {
617                target.max_l0_compact_level_count = Some(*c);
618            }
619            MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
620                target.sst_allowed_trivial_move_min_size = Some(*c);
621            }
622            MutableConfig::SplitWeightByVnode(c) => {
623                target.split_weight_by_vnode = *c;
624            }
625            MutableConfig::DisableAutoGroupScheduling(c) => {
626                target.disable_auto_group_scheduling = Some(*c);
627            }
628            MutableConfig::MaxOverlappingLevelSize(c) => {
629                target.max_overlapping_level_size = Some(*c);
630            }
631            MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
632                target.sst_allowed_trivial_move_max_count = Some(*c);
633            }
634            MutableConfig::EmergencyLevel0SstFileCount(c) => {
635                target.emergency_level0_sst_file_count = Some(*c);
636            }
637            MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
638                target.emergency_level0_sub_level_partition = Some(*c);
639            }
640            MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
641                target.level0_stop_write_threshold_max_sst_count = Some(*c);
642            }
643            MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
644                target.level0_stop_write_threshold_max_size = Some(*c);
645            }
646            MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
647                target.enable_optimize_l0_interval_selection = Some(*c);
648            }
649            #[expect(deprecated)]
650            MutableConfig::VnodeAlignedLevelSizeThreshold(_) => {
651                // Deprecated. Keep accepting the field for old clients but do not apply it.
652            }
653            MutableConfig::MaxKvCountForXor16(c) => {
654                target.max_kv_count_for_xor16 = optional_non_sentinel_u64_config(*c);
655            }
656            MutableConfig::MaxVnodeKeyRangeBytes(c) => {
657                target.max_vnode_key_range_bytes = optional_positive_u64_config(*c);
658            }
659            MutableConfig::SstableFilterKind(c) => {
660                if target.sstable_filter_kind.is_empty() {
661                    target.sstable_filter_kind =
662                        vec!["xor16".to_owned(); target.max_level as usize + 1];
663                }
664                let idx = c.get_level() as usize;
665                let level_entry = target.sstable_filter_kind.get_mut(idx).ok_or_else(|| {
666                    Error::CompactionGroup(format!(
667                        "sstable_filter_kind level {} is out of range",
668                        idx
669                    ))
670                })?;
671                level_entry.clone_from(&c.filter_kind);
672            }
673            MutableConfig::SstableFilterLayout(c) => {
674                if target.sstable_filter_layout.is_empty() {
675                    target.sstable_filter_layout =
676                        vec!["auto".to_owned(); target.max_level as usize + 1];
677                }
678                let idx = c.get_level() as usize;
679                let level_entry = target.sstable_filter_layout.get_mut(idx).ok_or_else(|| {
680                    Error::CompactionGroup(format!(
681                        "sstable_filter_layout level {} is out of range",
682                        idx
683                    ))
684                })?;
685                level_entry.clone_from(&c.layout);
686            }
687        }
688    }
689    Ok(())
690}
691
/// Treats both `0` and `u64::MAX` as "unset"; any other value is kept.
fn optional_u64_config(value: u64) -> Option<u64> {
    match value {
        u64::MIN | u64::MAX => None,
        other => Some(other),
    }
}

/// Treats only `u64::MAX` as "unset"; `0` is a valid configured value.
fn optional_non_sentinel_u64_config(value: u64) -> Option<u64> {
    if value == u64::MAX {
        None
    } else {
        Some(value)
    }
}

/// Like `optional_u64_config`, additionally requiring the value to be strictly positive.
fn optional_positive_u64_config(value: u64) -> Option<u64> {
    optional_u64_config(value).and_then(|v| (v > 0).then_some(v))
}
703
704fn try_u32_max_level(max_level: u64) -> Result<u32> {
705    u32::try_from(max_level).map_err(|_| {
706        Error::CompactionGroup(format!(
707            "invalid max_level {}, expect <= {}",
708            max_level,
709            u32::MAX
710        ))
711    })
712}
713
714impl CompactionGroupTransaction<'_> {
715    /// Inserts compaction group configs if they do not exist.
716    pub fn try_create_compaction_groups(
717        &mut self,
718        compaction_group_ids: &[CompactionGroupId],
719        config: Arc<CompactionConfig>,
720    ) -> bool {
721        let mut trivial = true;
722        for id in compaction_group_ids {
723            if self.contains_key(id) {
724                continue;
725            }
726            let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
727            self.insert(*id, new_entry);
728
729            trivial = false;
730        }
731
732        !trivial
733    }
734
735    pub fn create_compaction_groups(
736        &mut self,
737        compaction_group_id: CompactionGroupId,
738        config: Arc<CompactionConfig>,
739    ) {
740        self.try_create_compaction_groups(&[compaction_group_id], config);
741    }
742
743    /// Tries to get compaction group config for `compaction_group_id`.
744    pub(crate) fn try_get_compaction_group_config(
745        &self,
746        compaction_group_id: CompactionGroupId,
747    ) -> Option<&CompactionGroup> {
748        self.get(&compaction_group_id)
749    }
750
751    /// Removes stale group configs.
752    pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
753        let stale_group = self
754            .tree_ref()
755            .keys()
756            .cloned()
757            .filter(|k| !existing_groups.contains(k))
758            .collect_vec();
759        if stale_group.is_empty() {
760            return;
761        }
762        for group in stale_group {
763            self.remove(group);
764        }
765    }
766
767    pub(crate) fn update_compaction_config(
768        &mut self,
769        compaction_group_ids: &[CompactionGroupId],
770        config_to_update: &[MutableConfig],
771    ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
772        let mut results = HashMap::default();
773        for compaction_group_id in compaction_group_ids.iter().unique() {
774            let group = self.get(compaction_group_id).ok_or_else(|| {
775                Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
776            })?;
777            let mut config = group.compaction_config.as_ref().clone();
778            update_compaction_config(&mut config, config_to_update)?;
779            if let Err(reason) = validate_compaction_config(&config) {
780                return Err(Error::CompactionGroup(reason));
781            }
782            let mut new_group = group.clone();
783            new_group.compaction_config = Arc::new(config);
784            self.insert(*compaction_group_id, new_group.clone());
785            results.insert(new_group.group_id(), new_group);
786        }
787
788        Ok(results)
789    }
790}
791
// Tests for:
// - the free `update_compaction_config` helper;
// - config updates made through `CompactionGroupManager` transactions, including whether they
//   are still visible after the manager is re-created;
// - table registration bookkeeping in the compaction group manager.
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashSet};
    use std::sync::Arc;

    use itertools::Itertools;
    use risingwave_common::id::JobId;
    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::{
        CompressionAlgorithm, SstableFilterKind, SstableFilterLayout,
    };

    use crate::controller::SqlMetaStore;
    use crate::hummock::commit_multi_var;
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::error::Result;
    use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
    use crate::hummock::test_utils::setup_compute_env;
    use crate::model::{Fragment, StreamJobFragments};

    /// A config whose per-level `sstable_filter_kind` / `sstable_filter_layout` vectors are
    /// empty must still accept a per-level update. Such a config was presumably stored before
    /// these fields existed — confirm.
    ///
    /// After the update, the vector has `max_level + 1` entries and the requested level holds
    /// the new value.
    #[test]
    fn test_update_compaction_config_filter_kind_layout_backward_compat() {
        let mut config = CompactionConfigBuilder::new().build();
        // Simulate the legacy state: no per-level filter settings at all.
        config.sstable_filter_kind.clear();
        config.sstable_filter_layout.clear();

        super::update_compaction_config(
            &mut config,
            &[MutableConfig::SstableFilterKind(SstableFilterKind {
                level: 0,
                filter_kind: "xor8".to_owned(),
            })],
        )
        .unwrap();
        assert_eq!(
            config.sstable_filter_kind.len(),
            config.max_level as usize + 1
        );
        assert_eq!(config.sstable_filter_kind[0], "xor8");

        super::update_compaction_config(
            &mut config,
            &[MutableConfig::SstableFilterLayout(SstableFilterLayout {
                level: 1,
                layout: "plain".to_owned(),
            })],
        )
        .unwrap();
        assert_eq!(
            config.sstable_filter_layout.len(),
            config.max_level as usize + 1
        );
        assert_eq!(config.sstable_filter_layout[1], "plain");
    }

    /// Per-level updates that target level `max_level + 1` (one past the last valid level)
    /// must return an error rather than panic or silently grow the vector.
    ///
    /// This is checked for filter kind, filter layout and compression algorithm.
    #[test]
    fn test_update_compaction_config_rejects_out_of_range_level() {
        let mut config = CompactionConfigBuilder::new().build();
        // First level index past the end of a `max_level + 1`-entry vector.
        let oob = config.max_level as u32 + 1;

        assert!(
            super::update_compaction_config(
                &mut config,
                &[MutableConfig::SstableFilterKind(SstableFilterKind {
                    level: oob,
                    filter_kind: "xor8".to_owned(),
                })],
            )
            .is_err()
        );

        assert!(
            super::update_compaction_config(
                &mut config,
                &[MutableConfig::SstableFilterLayout(SstableFilterLayout {
                    level: oob,
                    layout: "plain".to_owned(),
                })],
            )
            .is_err()
        );

        assert!(
            super::update_compaction_config(
                &mut config,
                &[MutableConfig::CompressionAlgorithm(CompressionAlgorithm {
                    level: oob,
                    compression_algorithm: "Zstd".to_owned(),
                })],
            )
            .is_err()
        );
    }

    /// `ResetCompressionAlgorithm(false)` must leave an existing per-level compression
    /// override (level 3 set to "Zstd") untouched.
    #[test]
    fn test_reset_compression_algorithm_false_is_noop() {
        let mut config = CompactionConfigBuilder::new().build();
        config.compression_algorithm[3] = "Zstd".to_owned();

        super::update_compaction_config(
            &mut config,
            &[MutableConfig::ResetCompressionAlgorithm(false)],
        )
        .unwrap();

        assert_eq!(config.compression_algorithm[3], "Zstd");
    }

    /// End-to-end checks of config updates through `CompactionGroupManager` transactions.
    ///
    /// Covers:
    /// - updating unknown groups fails;
    /// - inserted groups are still visible after re-creating the manager;
    /// - scalar, compression and optional-u64 mutations behave as expected.
    #[tokio::test]
    async fn test_inner() {
        let (env, ..) = setup_compute_env(8080).await;
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        // A freshly created manager already holds two groups.
        assert_eq!(inner.compaction_groups.len(), 2);

        // Applies `config_to_update` to `cg_ids` in one transaction and commits it.
        // If the update itself errors, `?` returns before the commit.
        async fn update_compaction_config(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[impl Into<CompactionGroupId> + Copy],
            config_to_update: &[MutableConfig],
        ) -> Result<()> {
            let cg_ids = cg_ids.iter().copied().map_into().collect_vec();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            compaction_groups_txn.update_compaction_config(&cg_ids, config_to_update)?;
            commit_multi_var!(meta, compaction_groups_txn)
        }

        // Creates groups with the manager's default config.
        // Commits only if at least one group was actually new.
        async fn insert_compaction_group_configs(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[u64],
        ) {
            let default_config = inner.default_compaction_config();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            if compaction_groups_txn.try_create_compaction_groups(
                &cg_ids.iter().copied().map_into().collect_vec(),
                default_config,
            ) {
                commit_multi_var!(meta, compaction_groups_txn).unwrap();
            }
        }

        // Creates one group from the default config with `max_level` overridden.
        // `compression_algorithm` is rebuilt for that `max_level`.
        async fn insert_compaction_group_config_with_max_level(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_id: u64,
            max_level: u64,
        ) {
            let mut config = inner.default_compaction_config().as_ref().clone();
            config.max_level = max_level;
            config.compression_algorithm =
                super::default_compaction_config::compression_algorithm_vec(
                    super::try_u32_max_level(max_level).expect("max_level should fit u32 in test"),
                );
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            compaction_groups_txn.create_compaction_groups(cg_id.into(), Arc::new(config));
            commit_multi_var!(meta, compaction_groups_txn).unwrap();
        }

        // Groups 100 and 200 do not exist yet, so updating them must fail.
        update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
            .await
            .unwrap_err();
        insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
        assert_eq!(inner.compaction_groups.len(), 4);
        // Re-create the manager from `env`; the group count must survive.
        // NOTE(review): this assumes `new` reloads committed state from the meta store — confirm.
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);

        // A scalar mutation is applied to every listed group.
        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100, 200],
            &[MutableConfig::MaxSubCompaction(123)],
        )
        .await
        .unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
        assert_eq!(
            inner
                .try_get_compaction_group_config(200)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );

        // With max_level = 4, resetting compression yields the default vector for 4 levels.
        insert_compaction_group_config_with_max_level(env.meta_store_ref(), &mut inner, 300, 4)
            .await;
        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[300],
            &[MutableConfig::ResetCompressionAlgorithm(true)],
        )
        .await
        .unwrap();
        assert_eq!(
            inner
                .try_get_compaction_group_config(300)
                .unwrap()
                .compaction_config
                .compression_algorithm,
            super::default_compaction_config::compression_algorithm_vec(4)
        );
        // Level 6 is beyond this group's levels, so the update is rejected with a descriptive error.
        let err = update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[300],
            &[MutableConfig::CompressionAlgorithm(CompressionAlgorithm {
                level: 6,
                compression_algorithm: "Zstd".to_owned(),
            })],
        )
        .await
        .unwrap_err();
        assert!(
            err.to_string()
                .contains("invalid compression_algorithm level 6")
        );

        // MaxKvCountForXor16: 0 is stored as Some(0), and u64::MAX clears the setting (None).
        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100],
            &[MutableConfig::MaxKvCountForXor16(0)],
        )
        .await
        .unwrap();
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_kv_count_for_xor16,
            Some(0)
        );
        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100],
            &[MutableConfig::MaxKvCountForXor16(1024)],
        )
        .await
        .unwrap();
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_kv_count_for_xor16,
            Some(1024)
        );
        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100],
            &[MutableConfig::MaxKvCountForXor16(u64::MAX)],
        )
        .await
        .unwrap();
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_kv_count_for_xor16,
            None
        );

        // MaxVnodeKeyRangeBytes: 0 clears the setting (None), and a positive value is stored.
        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100],
            &[MutableConfig::MaxVnodeKeyRangeBytes(0)],
        )
        .await
        .unwrap();
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_vnode_key_range_bytes,
            None
        );
        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100],
            &[MutableConfig::MaxVnodeKeyRangeBytes(1024)],
        )
        .await
        .unwrap();
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_vnode_key_range_bytes,
            Some(1024)
        );
    }

    /// Registers, unregisters and purges the tables of two stream jobs.
    ///
    /// After each step it checks the total number of member tables across all groups. It also
    /// checks that the group count is 2 after purging and after re-registering.
    #[tokio::test]
    async fn test_manager() {
        let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
        let table_fragment_1 = StreamJobFragments::for_test(
            JobId::new(10),
            BTreeMap::from([(
                1.into(),
                Fragment {
                    fragment_id: 1.into(),
                    state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
                    ..Default::default()
                },
            )]),
        );
        let table_fragment_2 = StreamJobFragments::for_test(
            JobId::new(20),
            BTreeMap::from([(
                2.into(),
                Fragment {
                    fragment_id: 2.into(),
                    state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
                    ..Default::default()
                },
            )]),
        );

        // Test register_table_fragments
        // Total number of member tables summed over every compaction group.
        let registered_number = || async {
            compaction_group_manager
                .list_compaction_group()
                .await
                .iter()
                .map(|cg| cg.member_table_ids.len())
                .sum::<usize>()
        };
        let group_number =
            || async { compaction_group_manager.list_compaction_group().await.len() };
        assert_eq!(registered_number().await, 0);

        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
                table_fragment_1
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_2.stream_job_id().as_mv_table_id()),
                table_fragment_2
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 8);

        // Test unregister_table_fragments
        compaction_group_manager
            .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
            .await;
        assert_eq!(registered_number().await, 4);

        // Test purge_stale_members: table fragments
        // Purging with job 2's table ids keeps its 4 tables; purging with an empty set drops all.
        compaction_group_manager
            .purge(&table_fragment_2.all_table_ids().collect())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        compaction_group_manager
            .purge(&HashSet::new())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 0);

        assert_eq!(group_number().await, 2);

        // Re-registering the same tables does not change the group count.
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
                table_fragment_1
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .unregister_table_fragments_vec(&[table_fragment_1])
            .await;
        assert_eq!(registered_number().await, 0);
        assert_eq!(group_number().await, 2);
    }
}