risingwave_meta/hummock/manager/compaction/compaction_group_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::DerefMut;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
use risingwave_hummock_sdk::version::GroupDelta;
use risingwave_meta_model::compaction_config;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
};
use sea_orm::EntityTrait;
use tokio::sync::OnceCell;

use super::CompactionGroupStatistic;
use crate::hummock::compaction::compaction_config::{
    CompactionConfigBuilder, validate_compaction_config,
};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{HummockManager, commit_multi_var};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_group_id;
use crate::manager::MetaSrvEnv;
use crate::model::{
    BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
};

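/// A transaction over the in-memory compaction group configs, built on [`BTreeMapTransaction`].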
type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;

impl CompactionGroupManager {
    pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
        let default_config = match env.opts.compaction_config.as_ref() {
            None => CompactionConfigBuilder::new().build(),
            Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
        };
        Self::new_with_config(env, default_config).await
    }

    pub(crate) async fn new_with_config(
        env: &MetaSrvEnv,
        default_config: CompactionConfig,
    ) -> Result<CompactionGroupManager> {
        let mut compaction_group_manager = CompactionGroupManager {
            compaction_groups: BTreeMap::new(),
            default_config: Arc::new(default_config),
            write_limit: Default::default(),
        };

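        // Load all compaction group configs persisted in the meta store.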
        let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
            compaction_config::Entity::find()
                .all(&env.meta_store_ref().conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();

        compaction_group_manager.init(loaded_compaction_groups);
        Ok(compaction_group_manager)
    }

    fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
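        // Only overwrite the in-memory map when persisted configs exist.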
        if !loaded_compaction_groups.is_empty() {
            self.compaction_groups = loaded_compaction_groups;
        }
    }
}

impl HummockManager {
    /// Should not be called inside [`HummockManager`], because it requests locks internally.
    /// The implementation acquires `versioning` lock.
    pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
        get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
    }

    /// The implementation acquires `compaction_group_manager` lock.
    pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
        self.compaction_group_manager
            .read()
            .await
            .compaction_groups
            .clone()
    }

    #[cfg(test)]
    /// Registers `table_fragments` to compaction groups.
    pub async fn register_table_fragments(
        &self,
        mv_table: Option<u32>,
        mut internal_tables: Vec<u32>,
    ) -> Result<Vec<StateTableId>> {
        let mut pairs = vec![];
        if let Some(mv_table) = mv_table {
            if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
                tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
            }
            // materialized_view
            pairs.push((
                mv_table,
                CompactionGroupId::from(StaticCompactionGroupId::MaterializedView),
            ));
        }
        // internal states
        for table_id in internal_tables {
            pairs.push((
                table_id,
                CompactionGroupId::from(StaticCompactionGroupId::StateDefault),
            ));
        }
        self.register_table_ids_for_test(&pairs).await?;
        Ok(pairs.iter().map(|(table_id, ..)| *table_id).collect_vec())
    }

    #[cfg(test)]
    /// Unregisters `table_fragments` from compaction groups.
    pub async fn unregister_table_fragments_vec(
        &self,
        table_fragments: &[crate::model::StreamJobFragments],
    ) {
        self.unregister_table_ids(
            table_fragments
                .iter()
                .flat_map(|t| t.all_table_ids().map(TableId::new)),
        )
        .await
        .unwrap();
    }

    /// Unregisters stale members and groups.
    /// The caller should ensure `table_fragments_list` remains unchanged during `purge`.
    /// Currently `purge` is only called during meta service startup.
    pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
        let to_unregister = self
            .versioning
            .read()
            .await
            .current_version
            .state_table_info
            .info()
            .keys()
            .cloned()
            .filter(|table_id| !valid_ids.contains(table_id))
            .collect_vec();

        // Since the `versioning` lock has been released, the version that `to_unregister` was
        // computed from may differ from the one used in `unregister_table_ids`. That is OK.
        self.unregister_table_ids(to_unregister).await
    }

    /// The implementation acquires `versioning` lock.
    ///
    /// The `_for_test` suffix is temporarily appended to the method name to mark
    /// that it's currently only used in tests.
    pub async fn register_table_ids_for_test(
        &self,
        pairs: &[(StateTableId, CompactionGroupId)],
    ) -> Result<()> {
        if pairs.is_empty() {
            return Ok(());
        }
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let mut compaction_group_manager = self.compaction_group_manager.write().await;
        let current_version = &versioning.current_version;
        let default_config = compaction_group_manager.default_compaction_config();
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

        for (table_id, _) in pairs {
            if let Some(info) = current_version
                .state_table_info
                .info()
                .get(&TableId::new(*table_id))
            {
                return Err(Error::CompactionGroup(format!(
                    "table {} already {:?}",
                    *table_id, info
                )));
            }
        }
        // All NewCompactionGroup pairs are mapped to one new compaction group.
        let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

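        // Newly registered tables start from the largest committed epoch among existing tables,
        // or `INVALID_EPOCH` if there is none.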
        let committed_epoch = new_version_delta
            .latest_version()
            .state_table_info
            .info()
            .values()
            .map(|info| info.committed_epoch)
            .max()
            .unwrap_or(INVALID_EPOCH);

        for (table_id, raw_group_id) in pairs {
            let mut group_id = *raw_group_id;
            if group_id == StaticCompactionGroupId::NewCompactionGroup as u64 {
                let mut is_group_init = false;
                group_id = *new_compaction_group_id
                    .get_or_try_init(|| async {
                        next_compaction_group_id(&self.env).await.inspect(|_| {
                            is_group_init = true;
                        })
                    })
                    .await?;
                if is_group_init {
                    let group_deltas = &mut new_version_delta
                        .group_deltas
                        .entry(group_id)
                        .or_default()
                        .group_deltas;

                    let config =
                        match compaction_groups_txn.try_get_compaction_group_config(group_id) {
                            Some(config) => config.compaction_config.as_ref().clone(),
                            None => {
                                compaction_groups_txn
                                    .create_compaction_groups(group_id, default_config.clone());
                                default_config.as_ref().clone()
                            }
                        };

                    let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config),
                        group_id,
                        ..Default::default()
                    }));

                    group_deltas.push(group_delta);
                }
            }
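            // A table id must not be registered twice in the same version delta.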
            assert!(
                new_version_delta
                    .state_table_info_delta
                    .insert(
                        TableId::new(*table_id),
                        PbStateTableInfoDelta {
                            committed_epoch,
                            compaction_group_id: *raw_group_id,
                        }
                    )
                    .is_none()
            );
        }
        new_version_delta.pre_apply();
        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;

        Ok(())
    }

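    /// Unregisters `table_ids` from their compaction groups, and destroys any non-static
    /// compaction group that becomes empty as a result.
    ///
    /// The implementation acquires `versioning` and `compaction_group_manager` locks.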
    pub async fn unregister_table_ids(
        &self,
        table_ids: impl IntoIterator<Item = TableId> + Send,
    ) -> Result<()> {
        let table_ids = table_ids.into_iter().collect_vec();
        if table_ids.is_empty() {
            return Ok(());
        }

        {
            // Remove table write throughput statistics.
            // The caller requires `Send`, so the `write` guard must be taken and released before the next await point.
            // The table write throughput statistics tolerate inconsistency (e.g. if unregistering the table ids fails later), so they can be cleaned up in advance.
            let mut table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.write();
            for table_id in table_ids.iter().unique() {
                table_write_throughput_statistic_manager.remove_table(table_id.table_id);
            }
        }

        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();
        let mut modified_groups: HashMap<CompactionGroupId, /* #member table */ u64> =
            HashMap::new();
        // Remove member tables
        for table_id in table_ids.into_iter().unique() {
            let version = new_version_delta.latest_version();
            let Some(info) = version.state_table_info.info().get(&table_id) else {
                continue;
            };

            modified_groups
                .entry(info.compaction_group_id)
                .and_modify(|count| *count -= 1)
                .or_insert(
                    version
                        .state_table_info
                        .compaction_group_member_tables()
                        .get(&info.compaction_group_id)
                        .expect("should exist")
                        .len() as u64
                        - 1,
                );
            new_version_delta.removed_table_ids.insert(table_id);
        }

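        // Destroy non-static compaction groups that no longer have any member table.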
        let groups_to_remove = modified_groups
            .into_iter()
            .filter_map(|(group_id, member_count)| {
                if member_count == 0 && group_id > StaticCompactionGroupId::End as CompactionGroupId
                {
                    return Some((
                        group_id,
                        new_version_delta
                            .latest_version()
                            .get_compaction_group_levels(group_id)
                            .levels
                            .len(),
                    ));
                }
                None
            })
            .collect_vec();
        for (group_id, _) in &groups_to_remove {
            let group_deltas = &mut new_version_delta
                .group_deltas
                .entry(*group_id)
                .or_default()
                .group_deltas;

            let group_delta = GroupDelta::GroupDestroy(PbGroupDestroy {});
            group_deltas.push(group_delta);
        }

        for (group_id, max_level) in groups_to_remove {
            remove_compaction_group_in_sst_stat(&self.metrics, group_id, max_level);
        }

        new_version_delta.pre_apply();

        // Purge may cause writes to the meta store. If this hurts performance while holding the
        // versioning lock, consider batching it.
        let mut compaction_group_manager = self.compaction_group_manager.write().await;
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

        compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
            version.latest_version(),
        )));
        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;

        // No need to handle DeltaType::GroupDestroy during time travel.
        Ok(())
    }

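    /// Applies `config_to_update` to the given compaction groups and persists the updated
    /// configs. Write limits are re-evaluated if `Level0StopWriteThresholdSubLevelNumber` changes.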
    pub async fn update_compaction_config(
        &self,
        compaction_group_ids: &[CompactionGroupId],
        config_to_update: &[MutableConfig],
    ) -> Result<()> {
        {
            // Avoid lock conflicts with `try_update_write_limits`.
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .update_compaction_config(compaction_group_ids, config_to_update)?;
            commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
        }

        if config_to_update
            .iter()
            .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
        {
            // Update write limits with lock
            self.try_update_write_limits(compaction_group_ids).await;
        }

        Ok(())
    }

    /// Gets complete compaction group info.
    /// It is the aggregate of `HummockVersion` and `CompactionGroupConfig`.
    pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let current_version = &versioning.current_version;
        let mut results = vec![];
        let compaction_group_manager = self.compaction_group_manager.read().await;

        for levels in current_version.levels.values() {
            let compaction_config = compaction_group_manager
                .try_get_compaction_group_config(levels.group_id)
                .unwrap()
                .compaction_config
                .as_ref()
                .clone();
            let group = CompactionGroupInfo {
                id: levels.group_id,
                parent_id: levels.parent_group_id,
                member_table_ids: current_version
                    .state_table_info
                    .compaction_group_member_table_ids(levels.group_id)
                    .iter()
                    .map(|table_id| table_id.table_id)
                    .collect_vec(),
                compaction_config: Some(compaction_config),
            };
            results.push(group);
        }
        results
    }

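    /// Aggregates per-table sizes (total key size plus total value size from version stats) into
    /// per-group statistics.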
    pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
        let mut infos = vec![];
        {
            let versioning_guard = self.versioning.read().await;
            let manager = self.compaction_group_manager.read().await;
            let version = &versioning_guard.current_version;
            for group_id in version.levels.keys() {
                let mut group_info = CompactionGroupStatistic {
                    group_id: *group_id,
                    ..Default::default()
                };
                for table_id in version
                    .state_table_info
                    .compaction_group_member_table_ids(*group_id)
                {
                    let stats_size = versioning_guard
                        .version_stats
                        .table_stats
                        .get(&table_id.table_id)
                        .map(|stats| stats.total_key_size + stats.total_value_size)
                        .unwrap_or(0);
                    let table_size = std::cmp::max(stats_size, 0) as u64;
                    group_info.group_size += table_size;
                    group_info
                        .table_statistic
                        .insert(table_id.table_id, table_size);
                    group_info.compaction_group_config =
                        manager.try_get_compaction_group_config(*group_id).unwrap();
                }
                infos.push(group_info);
            }
        };
        infos
    }

    pub(crate) async fn initial_compaction_group_config_after_load(
        &self,
        versioning_guard: &Versioning,
        compaction_group_manager: &mut CompactionGroupManager,
    ) -> Result<()> {
        // Due to version compatibility, we fix up some configurations of older versions after Hummock starts.
        let current_version = &versioning_guard.current_version;
        let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
        let default_config = compaction_group_manager.default_compaction_config();
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
        compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
        commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;

        Ok(())
    }
}

/// We must ensure an entry exists in [`CompactionGroupManager`] for any compaction group found in
/// the current hummock version. That's done by invoking `get_or_insert_compaction_group_config` or
/// `get_or_insert_compaction_group_configs` before adding any group to the current hummock version:
/// 1. initialize the default static compaction groups.
/// 2. register a new table to a new compaction group.
/// 3. move an existing table to a new compaction group.
pub(crate) struct CompactionGroupManager {
    compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
    default_config: Arc<CompactionConfig>,
    /// Compaction groups for which a write limit is currently triggered.
    pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
}

impl CompactionGroupManager {
    /// Starts a transaction to update compaction group configs.
    pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
        CompactionGroupTransaction::new(&mut self.compaction_groups)
    }

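    /// Like [`Self::start_compaction_groups_txn`], but the transaction owns the guard over the
    /// manager (any `DerefMut<Target = Self>`) instead of borrowing it.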
    #[expect(clippy::type_complexity)]
    pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
        inner: P,
    ) -> BTreeMapTransactionInner<
        CompactionGroupId,
        CompactionGroup,
        DerefMutForward<
            Self,
            BTreeMap<CompactionGroupId, CompactionGroup>,
            P,
            impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
            impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
        >,
    > {
        BTreeMapTransactionInner::new(DerefMutForward::new(
            inner,
            |mgr| &mgr.compaction_groups,
            |mgr| &mut mgr.compaction_groups,
        ))
    }

    /// Tries to get compaction group config for `compaction_group_id`.
    pub(crate) fn try_get_compaction_group_config(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Option<CompactionGroup> {
        self.compaction_groups.get(&compaction_group_id).cloned()
    }

    /// Returns the default compaction config.
    pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
        self.default_config.clone()
    }
}

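/// Applies each item in `items` to the corresponding field of `target` in place.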
fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) {
    for item in items {
        match item {
            MutableConfig::MaxBytesForLevelBase(c) => {
                target.max_bytes_for_level_base = *c;
            }
            MutableConfig::MaxBytesForLevelMultiplier(c) => {
                target.max_bytes_for_level_multiplier = *c;
            }
            MutableConfig::MaxCompactionBytes(c) => {
                target.max_compaction_bytes = *c;
            }
            MutableConfig::SubLevelMaxCompactionBytes(c) => {
                target.sub_level_max_compaction_bytes = *c;
            }
            MutableConfig::Level0TierCompactFileNumber(c) => {
                target.level0_tier_compact_file_number = *c;
            }
            MutableConfig::TargetFileSizeBase(c) => {
                target.target_file_size_base = *c;
            }
            MutableConfig::CompactionFilterMask(c) => {
                target.compaction_filter_mask = *c;
            }
            MutableConfig::MaxSubCompaction(c) => {
                target.max_sub_compaction = *c;
            }
            MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
                target.level0_stop_write_threshold_sub_level_number = *c;
            }
            MutableConfig::Level0SubLevelCompactLevelCount(c) => {
                target.level0_sub_level_compact_level_count = *c;
            }
            MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
                target.level0_overlapping_sub_level_compact_level_count = *c;
            }
            MutableConfig::MaxSpaceReclaimBytes(c) => {
                target.max_space_reclaim_bytes = *c;
            }
            MutableConfig::Level0MaxCompactFileNumber(c) => {
                target.level0_max_compact_file_number = *c;
            }
            MutableConfig::EnableEmergencyPicker(c) => {
                target.enable_emergency_picker = *c;
            }
            MutableConfig::TombstoneReclaimRatio(c) => {
                target.tombstone_reclaim_ratio = *c;
            }
            MutableConfig::CompressionAlgorithm(c) => {
                target.compression_algorithm[c.get_level() as usize]
                    .clone_from(&c.compression_algorithm);
            }
            MutableConfig::MaxL0CompactLevelCount(c) => {
                target.max_l0_compact_level_count = Some(*c);
            }
            MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
                target.sst_allowed_trivial_move_min_size = Some(*c);
            }
            MutableConfig::SplitWeightByVnode(c) => {
                target.split_weight_by_vnode = *c;
            }
            MutableConfig::DisableAutoGroupScheduling(c) => {
                target.disable_auto_group_scheduling = Some(*c);
            }
            MutableConfig::MaxOverlappingLevelSize(c) => {
                target.max_overlapping_level_size = Some(*c);
            }
            MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
                target.sst_allowed_trivial_move_max_count = Some(*c);
            }
            MutableConfig::EmergencyLevel0SstFileCount(c) => {
                target.emergency_level0_sst_file_count = Some(*c);
            }
            MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
                target.emergency_level0_sub_level_partition = Some(*c);
            }
            MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
                target.level0_stop_write_threshold_max_sst_count = Some(*c);
            }
            MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
                target.level0_stop_write_threshold_max_size = Some(*c);
            }
        }
    }
}

impl CompactionGroupTransaction<'_> {
    /// Inserts compaction group configs if they do not exist.
    /// Returns `true` if at least one new config was inserted.
    pub fn try_create_compaction_groups(
        &mut self,
        compaction_group_ids: &[CompactionGroupId],
        config: Arc<CompactionConfig>,
    ) -> bool {
        let mut trivial = true;
        for id in compaction_group_ids {
            if self.contains_key(id) {
                continue;
            }
            let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
            self.insert(*id, new_entry);

            trivial = false;
        }

        !trivial
    }

    pub fn create_compaction_groups(
        &mut self,
        compaction_group_id: CompactionGroupId,
        config: Arc<CompactionConfig>,
    ) {
        self.try_create_compaction_groups(&[compaction_group_id], config);
    }

    /// Tries to get compaction group config for `compaction_group_id`.
    pub(crate) fn try_get_compaction_group_config(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Option<&CompactionGroup> {
        self.get(&compaction_group_id)
    }

    /// Removes stale group configs.
    pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
        let stale_group = self
            .tree_ref()
            .keys()
            .cloned()
            .filter(|k| !existing_groups.contains(k))
            .collect_vec();
        if stale_group.is_empty() {
            return;
        }
        for group in stale_group {
            self.remove(group);
        }
    }

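    /// Applies `config_to_update` to every group in `compaction_group_ids`, validates the
    /// resulting configs, and returns the updated groups keyed by group id.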
    pub(crate) fn update_compaction_config(
        &mut self,
        compaction_group_ids: &[CompactionGroupId],
        config_to_update: &[MutableConfig],
    ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
        let mut results = HashMap::default();
        for compaction_group_id in compaction_group_ids.iter().unique() {
            let group = self.get(compaction_group_id).ok_or_else(|| {
                Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
            })?;
            let mut config = group.compaction_config.as_ref().clone();
            update_compaction_config(&mut config, config_to_update);
            if let Err(reason) = validate_compaction_config(&config) {
                return Err(Error::CompactionGroup(reason));
            }
            let mut new_group = group.clone();
            new_group.compaction_config = Arc::new(config);
            self.insert(*compaction_group_id, new_group.clone());
            results.insert(new_group.group_id(), new_group);
        }

        Ok(results)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashSet};

    use risingwave_common::catalog::TableId;
    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;

    use crate::controller::SqlMetaStore;
    use crate::hummock::commit_multi_var;
    use crate::hummock::error::Result;
    use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
    use crate::hummock::test_utils::setup_compute_env;
    use crate::model::{Fragment, StreamJobFragments};

    #[tokio::test]
    async fn test_inner() {
        let (env, ..) = setup_compute_env(8080).await;
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        assert_eq!(inner.compaction_groups.len(), 2);

        async fn update_compaction_config(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[u64],
            config_to_update: &[MutableConfig],
        ) -> Result<()> {
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            compaction_groups_txn.update_compaction_config(cg_ids, config_to_update)?;
            commit_multi_var!(meta, compaction_groups_txn)
        }

        async fn insert_compaction_group_configs(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[u64],
        ) {
            let default_config = inner.default_compaction_config();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            if compaction_groups_txn.try_create_compaction_groups(cg_ids, default_config) {
                commit_multi_var!(meta, compaction_groups_txn).unwrap();
            }
        }

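        // Updating configs of groups that do not exist yet must fail.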
        update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
            .await
            .unwrap_err();
        insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
        assert_eq!(inner.compaction_groups.len(), 4);
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);

        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100, 200],
            &[MutableConfig::MaxSubCompaction(123)],
        )
        .await
        .unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
        assert_eq!(
            inner
                .try_get_compaction_group_config(200)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
    }

    #[tokio::test]
    async fn test_manager() {
        let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
        let table_fragment_1 = StreamJobFragments::for_test(
            TableId::new(10),
            BTreeMap::from([(
                1,
                Fragment {
                    fragment_id: 1,
                    state_table_ids: vec![10, 11, 12, 13],
                    ..Default::default()
                },
            )]),
        );
        let table_fragment_2 = StreamJobFragments::for_test(
            TableId::new(20),
            BTreeMap::from([(
                2,
                Fragment {
                    fragment_id: 2,
                    state_table_ids: vec![20, 21, 22, 23],
                    ..Default::default()
                },
            )]),
        );

        // Test register_table_fragments
        let registered_number = || async {
            compaction_group_manager
                .list_compaction_group()
                .await
                .iter()
                .map(|cg| cg.member_table_ids.len())
                .sum::<usize>()
        };
        let group_number =
            || async { compaction_group_manager.list_compaction_group().await.len() };
        assert_eq!(registered_number().await, 0);

        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().table_id),
                table_fragment_1.internal_table_ids(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_2.stream_job_id().table_id),
                table_fragment_2.internal_table_ids(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 8);

        // Test unregister_table_fragments
        compaction_group_manager
            .unregister_table_fragments_vec(&[table_fragment_1.clone()])
            .await;
        assert_eq!(registered_number().await, 4);

        // Test purge_stale_members: table fragments
        compaction_group_manager
            .purge(&table_fragment_2.all_table_ids().map(TableId::new).collect())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        compaction_group_manager
            .purge(&HashSet::new())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 0);

        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().table_id),
                table_fragment_1.internal_table_ids(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .unregister_table_fragments_vec(&[table_fragment_1])
            .await;
        assert_eq!(registered_number().await, 0);
        assert_eq!(group_number().await, 2);
    }
}
879}