risingwave_meta/hummock/manager/compaction/
compaction_group_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::util::epoch::INVALID_EPOCH;
22use risingwave_hummock_sdk::CompactionGroupId;
23use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
25use risingwave_hummock_sdk::version::GroupDelta;
26use risingwave_meta_model::compaction_config;
27use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30    CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
31};
32use sea_orm::EntityTrait;
33use tokio::sync::OnceCell;
34
35use super::CompactionGroupStatistic;
36use crate::hummock::compaction::compaction_config::{
37    CompactionConfigBuilder, validate_compaction_config,
38};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
44use crate::hummock::model::CompactionGroup;
45use crate::hummock::sequence::next_compaction_group_id;
46use crate::manager::MetaSrvEnv;
47use crate::model::{
48    BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
49};
50
51type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
52
53impl CompactionGroupManager {
54    pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
55        let default_config = match env.opts.compaction_config.as_ref() {
56            None => CompactionConfigBuilder::new().build(),
57            Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
58        };
59        Self::new_with_config(env, default_config).await
60    }
61
62    pub(crate) async fn new_with_config(
63        env: &MetaSrvEnv,
64        default_config: CompactionConfig,
65    ) -> Result<CompactionGroupManager> {
66        let mut compaction_group_manager = CompactionGroupManager {
67            compaction_groups: BTreeMap::new(),
68            default_config: Arc::new(default_config),
69            write_limit: Default::default(),
70        };
71
72        let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
73            compaction_config::Entity::find()
74                .all(&env.meta_store_ref().conn)
75                .await
76                .map_err(MetadataModelError::from)?
77                .into_iter()
78                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
79                .collect();
80
81        compaction_group_manager.init(loaded_compaction_groups);
82        Ok(compaction_group_manager)
83    }
84
85    fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
86        if !loaded_compaction_groups.is_empty() {
87            self.compaction_groups = loaded_compaction_groups;
88        }
89    }
90}
91
92impl HummockManager {
93    /// Should not be called inside [`HummockManager`], because it requests locks internally.
94    /// The implementation acquires `versioning` lock.
95    pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
96        get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
97    }
98
99    /// The implementation acquires `compaction_group_manager` lock.
100    pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
101        self.compaction_group_manager
102            .read()
103            .await
104            .compaction_groups
105            .clone()
106    }
107
108    #[cfg(test)]
109    /// Registers `table_fragments` to compaction groups.
110    pub async fn register_table_fragments(
111        &self,
112        mv_table: Option<TableId>,
113        mut internal_tables: Vec<TableId>,
114    ) -> Result<()> {
115        let mut pairs = vec![];
116        if let Some(mv_table) = mv_table {
117            if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
118                tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
119            }
120            // materialized_view
121            pairs.push((
122                mv_table,
123                CompactionGroupId::from(StaticCompactionGroupId::MaterializedView),
124            ));
125        }
126        // internal states
127        for table_id in internal_tables {
128            pairs.push((
129                table_id,
130                CompactionGroupId::from(StaticCompactionGroupId::StateDefault),
131            ));
132        }
133        self.register_table_ids_for_test(&pairs).await?;
134        Ok(())
135    }
136
137    #[cfg(test)]
138    /// Unregisters `table_fragments` from compaction groups
139    pub async fn unregister_table_fragments_vec(
140        &self,
141        table_fragments: &[crate::model::StreamJobFragments],
142    ) {
143        self.unregister_table_ids(table_fragments.iter().flat_map(|t| t.all_table_ids()))
144            .await
145            .unwrap();
146    }
147
148    /// Unregisters stale members and groups
149    /// The caller should ensure `table_fragments_list` remain unchanged during `purge`.
150    /// Currently `purge` is only called during meta service start ups.
151    pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
152        let to_unregister = self
153            .versioning
154            .read()
155            .await
156            .current_version
157            .state_table_info
158            .info()
159            .keys()
160            .cloned()
161            .filter(|table_id| !valid_ids.contains(table_id))
162            .collect_vec();
163
164        // As we have released versioning lock, the version that `to_unregister` is calculated from
165        // may not be the same as the one used in unregister_table_ids. It is OK.
166        self.unregister_table_ids(to_unregister).await
167    }
168
169    /// The implementation acquires `versioning` lock.
170    ///
171    /// The method name is temporarily added with a `_for_test` prefix to mark
172    /// that it's currently only used in test.
173    pub async fn register_table_ids_for_test(
174        &self,
175        pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
176    ) -> Result<()> {
177        if pairs.is_empty() {
178            return Ok(());
179        }
180        let mut versioning_guard = self.versioning.write().await;
181        let versioning = versioning_guard.deref_mut();
182        let mut compaction_group_manager = self.compaction_group_manager.write().await;
183        let current_version = &versioning.current_version;
184        let default_config = compaction_group_manager.default_compaction_config();
185        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
186
187        for (table_id, _) in pairs {
188            let table_id = (*table_id).into();
189            if let Some(info) = current_version.state_table_info.info().get(&table_id) {
190                return Err(Error::CompactionGroup(format!(
191                    "table {} already {:?}",
192                    table_id, info
193                )));
194            }
195        }
196        // All NewCompactionGroup pairs are mapped to one new compaction group.
197        let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
198        let mut version = HummockVersionTransaction::new(
199            &mut versioning.current_version,
200            &mut versioning.hummock_version_deltas,
201            self.env.notification_manager(),
202            None,
203            &self.metrics,
204        );
205        let mut new_version_delta = version.new_delta();
206
207        let committed_epoch = new_version_delta
208            .latest_version()
209            .state_table_info
210            .info()
211            .values()
212            .map(|info| info.committed_epoch)
213            .max()
214            .unwrap_or(INVALID_EPOCH);
215
216        for (table_id, raw_group_id) in pairs {
217            let table_id = (*table_id).into();
218            let mut group_id = *raw_group_id;
219            if group_id == StaticCompactionGroupId::NewCompactionGroup as u64 {
220                let mut is_group_init = false;
221                group_id = *new_compaction_group_id
222                    .get_or_try_init(|| async {
223                        next_compaction_group_id(&self.env).await.inspect(|_| {
224                            is_group_init = true;
225                        })
226                    })
227                    .await?;
228                if is_group_init {
229                    let group_deltas = &mut new_version_delta
230                        .group_deltas
231                        .entry(group_id)
232                        .or_default()
233                        .group_deltas;
234
235                    let config =
236                        match compaction_groups_txn.try_get_compaction_group_config(group_id) {
237                            Some(config) => config.compaction_config.as_ref().clone(),
238                            None => {
239                                compaction_groups_txn
240                                    .create_compaction_groups(group_id, default_config.clone());
241                                default_config.as_ref().clone()
242                            }
243                        };
244
245                    let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
246                        group_config: Some(config),
247                        group_id,
248                        ..Default::default()
249                    }));
250
251                    group_deltas.push(group_delta);
252                }
253            }
254            assert!(
255                new_version_delta
256                    .state_table_info_delta
257                    .insert(
258                        table_id,
259                        PbStateTableInfoDelta {
260                            committed_epoch,
261                            compaction_group_id: *raw_group_id,
262                        }
263                    )
264                    .is_none()
265            );
266        }
267        new_version_delta.pre_apply();
268        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
269
270        Ok(())
271    }
272
273    pub async fn unregister_table_ids(
274        &self,
275        table_ids: impl IntoIterator<Item = TableId>,
276    ) -> Result<()> {
277        let table_ids = table_ids.into_iter().collect_vec();
278        if table_ids.is_empty() {
279            return Ok(());
280        }
281
282        {
283            // Remove table write throughput statistics
284            // The Caller acquires `Send`, so we should safely use `write` lock before the await point.
285            // The table write throughput statistic accepts data inconsistencies (unregister table ids fail), so we can clean it up in advance.
286            let mut table_write_throughput_statistic_manager =
287                self.table_write_throughput_statistic_manager.write();
288            for &table_id in table_ids.iter().unique() {
289                table_write_throughput_statistic_manager.remove_table(table_id);
290            }
291        }
292
293        let mut versioning_guard = self.versioning.write().await;
294        let versioning = versioning_guard.deref_mut();
295        let mut version = HummockVersionTransaction::new(
296            &mut versioning.current_version,
297            &mut versioning.hummock_version_deltas,
298            self.env.notification_manager(),
299            None,
300            &self.metrics,
301        );
302        let mut new_version_delta = version.new_delta();
303        let mut modified_groups: HashMap<CompactionGroupId, /* #member table */ u64> =
304            HashMap::new();
305        // Remove member tables
306        for table_id in table_ids.into_iter().unique() {
307            let version = new_version_delta.latest_version();
308            let Some(info) = version.state_table_info.info().get(&table_id) else {
309                continue;
310            };
311
312            modified_groups
313                .entry(info.compaction_group_id)
314                .and_modify(|count| *count -= 1)
315                .or_insert(
316                    version
317                        .state_table_info
318                        .compaction_group_member_tables()
319                        .get(&info.compaction_group_id)
320                        .expect("should exist")
321                        .len() as u64
322                        - 1,
323                );
324            new_version_delta.removed_table_ids.insert(table_id);
325        }
326
327        let groups_to_remove = modified_groups
328            .into_iter()
329            .filter_map(|(group_id, member_count)| {
330                if member_count == 0 && group_id > StaticCompactionGroupId::End as CompactionGroupId
331                {
332                    return Some((
333                        group_id,
334                        new_version_delta
335                            .latest_version()
336                            .get_compaction_group_levels(group_id)
337                            .levels
338                            .len(),
339                    ));
340                }
341                None
342            })
343            .collect_vec();
344        for (group_id, _) in &groups_to_remove {
345            let group_deltas = &mut new_version_delta
346                .group_deltas
347                .entry(*group_id)
348                .or_default()
349                .group_deltas;
350
351            let group_delta = GroupDelta::GroupDestroy(PbGroupDestroy {});
352            group_deltas.push(group_delta);
353        }
354
355        for (group_id, max_level) in groups_to_remove {
356            remove_compaction_group_in_sst_stat(&self.metrics, group_id, max_level);
357        }
358
359        new_version_delta.pre_apply();
360
361        // Purge may cause write to meta store. If it hurts performance while holding versioning
362        // lock, consider to make it in batch.
363        let mut compaction_group_manager = self.compaction_group_manager.write().await;
364        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
365
366        compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
367            version.latest_version(),
368        )));
369        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
370
371        // No need to handle DeltaType::GroupDestroy during time travel.
372        Ok(())
373    }
374
375    pub async fn update_compaction_config(
376        &self,
377        compaction_group_ids: &[CompactionGroupId],
378        config_to_update: &[MutableConfig],
379    ) -> Result<()> {
380        {
381            // Avoid lock conflicts with `try_update_write_limits``
382            let mut compaction_group_manager = self.compaction_group_manager.write().await;
383            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
384            compaction_groups_txn
385                .update_compaction_config(compaction_group_ids, config_to_update)?;
386            commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
387        }
388
389        if config_to_update
390            .iter()
391            .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
392        {
393            // Update write limits with lock
394            self.try_update_write_limits(compaction_group_ids).await;
395        }
396
397        Ok(())
398    }
399
400    /// Gets complete compaction group info.
401    /// It is the aggregate of `HummockVersion` and `CompactionGroupConfig`
402    pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
403        let mut versioning_guard = self.versioning.write().await;
404        let versioning = versioning_guard.deref_mut();
405        let current_version = &versioning.current_version;
406        let mut results = vec![];
407        let compaction_group_manager = self.compaction_group_manager.read().await;
408
409        for levels in current_version.levels.values() {
410            let compaction_config = compaction_group_manager
411                .try_get_compaction_group_config(levels.group_id)
412                .unwrap()
413                .compaction_config
414                .as_ref()
415                .clone();
416            let group = CompactionGroupInfo {
417                id: levels.group_id,
418                parent_id: levels.parent_group_id,
419                member_table_ids: current_version
420                    .state_table_info
421                    .compaction_group_member_table_ids(levels.group_id)
422                    .iter()
423                    .copied()
424                    .collect_vec(),
425                compaction_config: Some(compaction_config),
426            };
427            results.push(group);
428        }
429        results
430    }
431
432    pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
433        let mut infos = vec![];
434        {
435            let versioning_guard = self.versioning.read().await;
436            let manager = self.compaction_group_manager.read().await;
437            let version = &versioning_guard.current_version;
438            for group_id in version.levels.keys() {
439                let mut group_info = CompactionGroupStatistic {
440                    group_id: *group_id,
441                    ..Default::default()
442                };
443                for table_id in version
444                    .state_table_info
445                    .compaction_group_member_table_ids(*group_id)
446                {
447                    let stats_size = versioning_guard
448                        .version_stats
449                        .table_stats
450                        .get(table_id)
451                        .map(|stats| stats.total_key_size + stats.total_value_size)
452                        .unwrap_or(0);
453                    let table_size = std::cmp::max(stats_size, 0) as u64;
454                    group_info.group_size += table_size;
455                    group_info.table_statistic.insert(*table_id, table_size);
456                    group_info.compaction_group_config =
457                        manager.try_get_compaction_group_config(*group_id).unwrap();
458                }
459                infos.push(group_info);
460            }
461        };
462        infos
463    }
464
465    pub(crate) async fn initial_compaction_group_config_after_load(
466        &self,
467        versioning_guard: &Versioning,
468        compaction_group_manager: &mut CompactionGroupManager,
469    ) -> Result<()> {
470        // 1. Due to version compatibility, we fix some of the configuration of older versions after hummock starts.
471        let current_version = &versioning_guard.current_version;
472        let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
473        let default_config = compaction_group_manager.default_compaction_config();
474        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
475        compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
476        commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
477
478        Ok(())
479    }
480}
481
482/// We muse ensure there is an entry exists in [`CompactionGroupManager`] for any
483/// compaction group found in current hummock version. That's done by invoking
484/// `get_or_insert_compaction_group_config` or `get_or_insert_compaction_group_configs` before
485/// adding any group in current hummock version:
486/// 1. initialize default static compaction group.
487/// 2. register new table to new compaction group.
488/// 3. move existent table to new compaction group.
489pub(crate) struct CompactionGroupManager {
490    compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
491    default_config: Arc<CompactionConfig>,
492    /// Tables that write limit is trigger for.
493    pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
494}
495
496impl CompactionGroupManager {
497    /// Starts a transaction to update compaction group configs.
498    pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
499        CompactionGroupTransaction::new(&mut self.compaction_groups)
500    }
501
502    #[expect(clippy::type_complexity)]
503    pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
504        inner: P,
505    ) -> BTreeMapTransactionInner<
506        CompactionGroupId,
507        CompactionGroup,
508        DerefMutForward<
509            Self,
510            BTreeMap<CompactionGroupId, CompactionGroup>,
511            P,
512            impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
513            impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
514        >,
515    > {
516        BTreeMapTransactionInner::new(DerefMutForward::new(
517            inner,
518            |mgr| &mgr.compaction_groups,
519            |mgr| &mut mgr.compaction_groups,
520        ))
521    }
522
523    /// Tries to get compaction group config for `compaction_group_id`.
524    pub(crate) fn try_get_compaction_group_config(
525        &self,
526        compaction_group_id: CompactionGroupId,
527    ) -> Option<CompactionGroup> {
528        self.compaction_groups.get(&compaction_group_id).cloned()
529    }
530
531    /// Tries to get compaction group config for `compaction_group_id`.
532    pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
533        self.default_config.clone()
534    }
535}
536
537fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) {
538    for item in items {
539        match item {
540            MutableConfig::MaxBytesForLevelBase(c) => {
541                target.max_bytes_for_level_base = *c;
542            }
543            MutableConfig::MaxBytesForLevelMultiplier(c) => {
544                target.max_bytes_for_level_multiplier = *c;
545            }
546            MutableConfig::MaxCompactionBytes(c) => {
547                target.max_compaction_bytes = *c;
548            }
549            MutableConfig::SubLevelMaxCompactionBytes(c) => {
550                target.sub_level_max_compaction_bytes = *c;
551            }
552            MutableConfig::Level0TierCompactFileNumber(c) => {
553                target.level0_tier_compact_file_number = *c;
554            }
555            MutableConfig::TargetFileSizeBase(c) => {
556                target.target_file_size_base = *c;
557            }
558            MutableConfig::CompactionFilterMask(c) => {
559                target.compaction_filter_mask = *c;
560            }
561            MutableConfig::MaxSubCompaction(c) => {
562                target.max_sub_compaction = *c;
563            }
564            MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
565                target.level0_stop_write_threshold_sub_level_number = *c;
566            }
567            MutableConfig::Level0SubLevelCompactLevelCount(c) => {
568                target.level0_sub_level_compact_level_count = *c;
569            }
570            MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
571                target.level0_overlapping_sub_level_compact_level_count = *c;
572            }
573            MutableConfig::MaxSpaceReclaimBytes(c) => {
574                target.max_space_reclaim_bytes = *c;
575            }
576            MutableConfig::Level0MaxCompactFileNumber(c) => {
577                target.level0_max_compact_file_number = *c;
578            }
579            MutableConfig::EnableEmergencyPicker(c) => {
580                target.enable_emergency_picker = *c;
581            }
582            MutableConfig::TombstoneReclaimRatio(c) => {
583                target.tombstone_reclaim_ratio = *c;
584            }
585            MutableConfig::CompressionAlgorithm(c) => {
586                target.compression_algorithm[c.get_level() as usize]
587                    .clone_from(&c.compression_algorithm);
588            }
589            MutableConfig::MaxL0CompactLevelCount(c) => {
590                target.max_l0_compact_level_count = Some(*c);
591            }
592            MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
593                target.sst_allowed_trivial_move_min_size = Some(*c);
594            }
595            MutableConfig::SplitWeightByVnode(c) => {
596                target.split_weight_by_vnode = *c;
597            }
598            MutableConfig::DisableAutoGroupScheduling(c) => {
599                target.disable_auto_group_scheduling = Some(*c);
600            }
601            MutableConfig::MaxOverlappingLevelSize(c) => {
602                target.max_overlapping_level_size = Some(*c);
603            }
604            MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
605                target.sst_allowed_trivial_move_max_count = Some(*c);
606            }
607            MutableConfig::EmergencyLevel0SstFileCount(c) => {
608                target.emergency_level0_sst_file_count = Some(*c);
609            }
610            MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
611                target.emergency_level0_sub_level_partition = Some(*c);
612            }
613            MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
614                target.level0_stop_write_threshold_max_sst_count = Some(*c);
615            }
616            MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
617                target.level0_stop_write_threshold_max_size = Some(*c);
618            }
619            MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
620                target.enable_optimize_l0_interval_selection = Some(*c);
621            }
622            MutableConfig::VnodeAlignedLevelSizeThreshold(c) => {
623                target.vnode_aligned_level_size_threshold =
624                    (*c != u64::MIN && *c != u64::MAX).then_some(*c);
625            }
626        }
627    }
628}
629
630impl CompactionGroupTransaction<'_> {
631    /// Inserts compaction group configs if they do not exist.
632    pub fn try_create_compaction_groups(
633        &mut self,
634        compaction_group_ids: &[CompactionGroupId],
635        config: Arc<CompactionConfig>,
636    ) -> bool {
637        let mut trivial = true;
638        for id in compaction_group_ids {
639            if self.contains_key(id) {
640                continue;
641            }
642            let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
643            self.insert(*id, new_entry);
644
645            trivial = false;
646        }
647
648        !trivial
649    }
650
651    pub fn create_compaction_groups(
652        &mut self,
653        compaction_group_id: CompactionGroupId,
654        config: Arc<CompactionConfig>,
655    ) {
656        self.try_create_compaction_groups(&[compaction_group_id], config);
657    }
658
659    /// Tries to get compaction group config for `compaction_group_id`.
660    pub(crate) fn try_get_compaction_group_config(
661        &self,
662        compaction_group_id: CompactionGroupId,
663    ) -> Option<&CompactionGroup> {
664        self.get(&compaction_group_id)
665    }
666
667    /// Removes stale group configs.
668    pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
669        let stale_group = self
670            .tree_ref()
671            .keys()
672            .cloned()
673            .filter(|k| !existing_groups.contains(k))
674            .collect_vec();
675        if stale_group.is_empty() {
676            return;
677        }
678        for group in stale_group {
679            self.remove(group);
680        }
681    }
682
683    pub(crate) fn update_compaction_config(
684        &mut self,
685        compaction_group_ids: &[CompactionGroupId],
686        config_to_update: &[MutableConfig],
687    ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
688        let mut results = HashMap::default();
689        for compaction_group_id in compaction_group_ids.iter().unique() {
690            let group = self.get(compaction_group_id).ok_or_else(|| {
691                Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
692            })?;
693            let mut config = group.compaction_config.as_ref().clone();
694            update_compaction_config(&mut config, config_to_update);
695            if let Err(reason) = validate_compaction_config(&config) {
696                return Err(Error::CompactionGroup(reason));
697            }
698            let mut new_group = group.clone();
699            new_group.compaction_config = Arc::new(config);
700            self.insert(*compaction_group_id, new_group.clone());
701            results.insert(new_group.group_id(), new_group);
702        }
703
704        Ok(results)
705    }
706}
707
708#[cfg(test)]
709mod tests {
710    use std::collections::{BTreeMap, HashSet};
711
712    use itertools::Itertools;
713    use risingwave_common::id::JobId;
714    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
715
716    use crate::controller::SqlMetaStore;
717    use crate::hummock::commit_multi_var;
718    use crate::hummock::error::Result;
719    use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
720    use crate::hummock::test_utils::setup_compute_env;
721    use crate::model::{Fragment, StreamJobFragments};
722
723    #[tokio::test]
724    async fn test_inner() {
725        let (env, ..) = setup_compute_env(8080).await;
726        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
727        assert_eq!(inner.compaction_groups.len(), 2);
728
729        async fn update_compaction_config(
730            meta: &SqlMetaStore,
731            inner: &mut CompactionGroupManager,
732            cg_ids: &[u64],
733            config_to_update: &[MutableConfig],
734        ) -> Result<()> {
735            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
736            compaction_groups_txn.update_compaction_config(cg_ids, config_to_update)?;
737            commit_multi_var!(meta, compaction_groups_txn)
738        }
739
740        async fn insert_compaction_group_configs(
741            meta: &SqlMetaStore,
742            inner: &mut CompactionGroupManager,
743            cg_ids: &[u64],
744        ) {
745            let default_config = inner.default_compaction_config();
746            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
747            if compaction_groups_txn.try_create_compaction_groups(cg_ids, default_config) {
748                commit_multi_var!(meta, compaction_groups_txn).unwrap();
749            }
750        }
751
752        update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
753            .await
754            .unwrap_err();
755        insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
756        assert_eq!(inner.compaction_groups.len(), 4);
757        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
758        assert_eq!(inner.compaction_groups.len(), 4);
759
760        update_compaction_config(
761            env.meta_store_ref(),
762            &mut inner,
763            &[100, 200],
764            &[MutableConfig::MaxSubCompaction(123)],
765        )
766        .await
767        .unwrap();
768        assert_eq!(inner.compaction_groups.len(), 4);
769        assert_eq!(
770            inner
771                .try_get_compaction_group_config(100)
772                .unwrap()
773                .compaction_config
774                .max_sub_compaction,
775            123
776        );
777        assert_eq!(
778            inner
779                .try_get_compaction_group_config(200)
780                .unwrap()
781                .compaction_config
782                .max_sub_compaction,
783            123
784        );
785    }
786
787    #[tokio::test]
788    async fn test_manager() {
789        let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
790        let table_fragment_1 = StreamJobFragments::for_test(
791            JobId::new(10),
792            BTreeMap::from([(
793                1.into(),
794                Fragment {
795                    fragment_id: 1.into(),
796                    state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
797                    ..Default::default()
798                },
799            )]),
800        );
801        let table_fragment_2 = StreamJobFragments::for_test(
802            JobId::new(20),
803            BTreeMap::from([(
804                2.into(),
805                Fragment {
806                    fragment_id: 2.into(),
807                    state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
808                    ..Default::default()
809                },
810            )]),
811        );
812
813        // Test register_table_fragments
814        let registered_number = || async {
815            compaction_group_manager
816                .list_compaction_group()
817                .await
818                .iter()
819                .map(|cg| cg.member_table_ids.len())
820                .sum::<usize>()
821        };
822        let group_number =
823            || async { compaction_group_manager.list_compaction_group().await.len() };
824        assert_eq!(registered_number().await, 0);
825
826        compaction_group_manager
827            .register_table_fragments(
828                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
829                table_fragment_1
830                    .internal_table_ids()
831                    .into_iter()
832                    .map_into()
833                    .collect(),
834            )
835            .await
836            .unwrap();
837        assert_eq!(registered_number().await, 4);
838        compaction_group_manager
839            .register_table_fragments(
840                Some(table_fragment_2.stream_job_id().as_mv_table_id()),
841                table_fragment_2
842                    .internal_table_ids()
843                    .into_iter()
844                    .map_into()
845                    .collect(),
846            )
847            .await
848            .unwrap();
849        assert_eq!(registered_number().await, 8);
850
851        // Test unregister_table_fragments
852        compaction_group_manager
853            .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
854            .await;
855        assert_eq!(registered_number().await, 4);
856
857        // Test purge_stale_members: table fragments
858        compaction_group_manager
859            .purge(&table_fragment_2.all_table_ids().collect())
860            .await
861            .unwrap();
862        assert_eq!(registered_number().await, 4);
863        compaction_group_manager
864            .purge(&HashSet::new())
865            .await
866            .unwrap();
867        assert_eq!(registered_number().await, 0);
868
869        assert_eq!(group_number().await, 2);
870
871        compaction_group_manager
872            .register_table_fragments(
873                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
874                table_fragment_1
875                    .internal_table_ids()
876                    .into_iter()
877                    .map_into()
878                    .collect(),
879            )
880            .await
881            .unwrap();
882        assert_eq!(registered_number().await, 4);
883        assert_eq!(group_number().await, 2);
884
885        compaction_group_manager
886            .unregister_table_fragments_vec(&[table_fragment_1])
887            .await;
888        assert_eq!(registered_number().await, 0);
889        assert_eq!(group_number().await, 2);
890    }
891}