risingwave_meta/hummock/manager/compaction/
compaction_group_manager.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::util::epoch::INVALID_EPOCH;
22use risingwave_hummock_sdk::CompactionGroupId;
23use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
25use risingwave_hummock_sdk::version::GroupDelta;
26use risingwave_meta_model::compaction_config;
27use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30    CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
31};
32use sea_orm::EntityTrait;
33use tokio::sync::OnceCell;
34
35use super::CompactionGroupStatistic;
36use crate::hummock::compaction::compaction_config::{
37    CompactionConfigBuilder, validate_compaction_config,
38};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_metrics;
44use crate::hummock::model::CompactionGroup;
45use crate::hummock::sequence::next_compaction_group_id;
46use crate::manager::MetaSrvEnv;
47use crate::model::{
48    BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
49};
50
51type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
52
53impl CompactionGroupManager {
54    pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
55        let default_config = match env.opts.compaction_config.as_ref() {
56            None => CompactionConfigBuilder::new().build(),
57            Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
58        };
59        Self::new_with_config(env, default_config).await
60    }
61
62    pub(crate) async fn new_with_config(
63        env: &MetaSrvEnv,
64        default_config: CompactionConfig,
65    ) -> Result<CompactionGroupManager> {
66        let mut compaction_group_manager = CompactionGroupManager {
67            compaction_groups: BTreeMap::new(),
68            default_config: Arc::new(default_config),
69            write_limit: Default::default(),
70        };
71
72        let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
73            compaction_config::Entity::find()
74                .all(&env.meta_store_ref().conn)
75                .await
76                .map_err(MetadataModelError::from)?
77                .into_iter()
78                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
79                .collect();
80
81        compaction_group_manager.init(loaded_compaction_groups);
82        Ok(compaction_group_manager)
83    }
84
85    fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
86        if !loaded_compaction_groups.is_empty() {
87            self.compaction_groups = loaded_compaction_groups;
88        }
89    }
90}
91
92impl HummockManager {
93    /// Should not be called inside [`HummockManager`], because it requests locks internally.
94    /// The implementation acquires `versioning` lock.
95    pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
96        get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
97    }
98
99    /// The implementation acquires `compaction_group_manager` lock.
100    pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
101        self.compaction_group_manager
102            .read()
103            .await
104            .compaction_groups
105            .clone()
106    }
107
108    #[cfg(test)]
109    /// Registers `table_fragments` to compaction groups.
110    pub async fn register_table_fragments(
111        &self,
112        mv_table: Option<TableId>,
113        mut internal_tables: Vec<TableId>,
114    ) -> Result<()> {
115        let mut pairs = vec![];
116        if let Some(mv_table) = mv_table {
117            if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
118                tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
119            }
120            // materialized_view
121            pairs.push((mv_table, StaticCompactionGroupId::MaterializedView));
122        }
123        // internal states
124        for table_id in internal_tables {
125            pairs.push((table_id, StaticCompactionGroupId::StateDefault));
126        }
127        self.register_table_ids_for_test(&pairs).await?;
128        Ok(())
129    }
130
131    #[cfg(test)]
132    /// Unregisters `table_fragments` from compaction groups
133    pub async fn unregister_table_fragments_vec(
134        &self,
135        table_fragments: &[crate::model::StreamJobFragments],
136    ) {
137        self.unregister_table_ids(table_fragments.iter().flat_map(|t| t.all_table_ids()))
138            .await
139            .unwrap();
140    }
141
142    /// Unregisters stale members and groups
143    /// The caller should ensure `table_fragments_list` remain unchanged during `purge`.
144    /// Currently `purge` is only called during meta service start ups.
145    pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
146        let to_unregister = self
147            .versioning
148            .read()
149            .await
150            .current_version
151            .state_table_info
152            .info()
153            .keys()
154            .cloned()
155            .filter(|table_id| !valid_ids.contains(table_id))
156            .collect_vec();
157
158        // As we have released versioning lock, the version that `to_unregister` is calculated from
159        // may not be the same as the one used in unregister_table_ids. It is OK.
160        self.unregister_table_ids(to_unregister).await
161    }
162
163    /// The implementation acquires `versioning` lock.
164    ///
165    /// The method name is temporarily added with a `_for_test` prefix to mark
166    /// that it's currently only used in test.
167    pub async fn register_table_ids_for_test(
168        &self,
169        pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
170    ) -> Result<()> {
171        if pairs.is_empty() {
172            return Ok(());
173        }
174        let mut versioning_guard = self.versioning.write().await;
175        let versioning = versioning_guard.deref_mut();
176        let mut compaction_group_manager = self.compaction_group_manager.write().await;
177        let current_version = &versioning.current_version;
178        let default_config = compaction_group_manager.default_compaction_config();
179        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
180
181        for (table_id, _) in pairs {
182            let table_id = (*table_id).into();
183            if let Some(info) = current_version.state_table_info.info().get(&table_id) {
184                return Err(Error::CompactionGroup(format!(
185                    "table {} already {:?}",
186                    table_id, info
187                )));
188            }
189        }
190        // All NewCompactionGroup pairs are mapped to one new compaction group.
191        let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
192        let mut version = HummockVersionTransaction::new(
193            &mut versioning.current_version,
194            &mut versioning.hummock_version_deltas,
195            &mut versioning.table_change_log,
196            self.env.notification_manager(),
197            None,
198            &self.metrics,
199            &self.env.opts,
200        );
201        let mut new_version_delta = version.new_delta();
202
203        let committed_epoch = new_version_delta
204            .latest_version()
205            .state_table_info
206            .info()
207            .values()
208            .map(|info| info.committed_epoch)
209            .max()
210            .unwrap_or(INVALID_EPOCH);
211
212        for (table_id, raw_group_id) in pairs {
213            let table_id = (*table_id).into();
214            let mut group_id = *raw_group_id;
215            if group_id == StaticCompactionGroupId::NewCompactionGroup {
216                let mut is_group_init = false;
217                group_id = *new_compaction_group_id
218                    .get_or_try_init(|| async {
219                        next_compaction_group_id(&self.env).await.inspect(|_| {
220                            is_group_init = true;
221                        })
222                    })
223                    .await?;
224                if is_group_init {
225                    let group_deltas = &mut new_version_delta
226                        .group_deltas
227                        .entry(group_id)
228                        .or_default()
229                        .group_deltas;
230
231                    let config =
232                        match compaction_groups_txn.try_get_compaction_group_config(group_id) {
233                            Some(config) => config.compaction_config.as_ref().clone(),
234                            None => {
235                                compaction_groups_txn
236                                    .create_compaction_groups(group_id, default_config.clone());
237                                default_config.as_ref().clone()
238                            }
239                        };
240
241                    let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
242                        group_config: Some(config),
243                        group_id,
244                        ..Default::default()
245                    }));
246
247                    group_deltas.push(group_delta);
248                }
249            }
250            assert!(
251                new_version_delta
252                    .state_table_info_delta
253                    .insert(
254                        table_id,
255                        PbStateTableInfoDelta {
256                            committed_epoch,
257                            compaction_group_id: *raw_group_id,
258                        }
259                    )
260                    .is_none()
261            );
262        }
263        new_version_delta.pre_apply();
264        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
265
266        Ok(())
267    }
268
269    pub async fn unregister_table_ids(
270        &self,
271        table_ids: impl IntoIterator<Item = TableId>,
272    ) -> Result<()> {
273        let table_ids = table_ids.into_iter().collect_vec();
274        if table_ids.is_empty() {
275            return Ok(());
276        }
277
278        {
279            // Remove table write throughput statistics
280            // The Caller acquires `Send`, so we should safely use `write` lock before the await point.
281            // The table write throughput statistic accepts data inconsistencies (unregister table ids fail), so we can clean it up in advance.
282            let mut table_write_throughput_statistic_manager =
283                self.table_write_throughput_statistic_manager.write();
284            for &table_id in table_ids.iter().unique() {
285                table_write_throughput_statistic_manager.remove_table(table_id);
286            }
287        }
288
289        let mut versioning_guard = self.versioning.write().await;
290        let versioning = versioning_guard.deref_mut();
291        let mut version = HummockVersionTransaction::new(
292            &mut versioning.current_version,
293            &mut versioning.hummock_version_deltas,
294            &mut versioning.table_change_log,
295            self.env.notification_manager(),
296            None,
297            &self.metrics,
298            &self.env.opts,
299        );
300        let mut new_version_delta = version.new_delta();
301        struct UnregisterGroupChange {
302            remaining_member_count: usize,
303            removed_table_ids: HashSet<TableId>,
304        }
305        let mut group_changes: HashMap<CompactionGroupId, UnregisterGroupChange> = HashMap::new();
306        // Remove member tables
307        for table_id in table_ids.into_iter().unique() {
308            let version = new_version_delta.latest_version();
309            let Some(info) = version.state_table_info.info().get(&table_id) else {
310                continue;
311            };
312            let compaction_group_id = info.compaction_group_id;
313
314            let group_change =
315                group_changes
316                    .entry(compaction_group_id)
317                    .or_insert_with(|| UnregisterGroupChange {
318                        remaining_member_count: version
319                            .state_table_info
320                            .compaction_group_member_tables()
321                            .get(&compaction_group_id)
322                            .expect("should exist")
323                            .len(),
324                        removed_table_ids: HashSet::new(),
325                    });
326            group_change.remaining_member_count = group_change
327                .remaining_member_count
328                .checked_sub(1)
329                .expect("member table count should be positive");
330            assert!(group_change.removed_table_ids.insert(table_id));
331            new_version_delta.removed_table_ids.insert(table_id);
332        }
333
334        for (group_id, change) in group_changes {
335            if change.remaining_member_count == 0 && group_id > StaticCompactionGroupId::End {
336                let max_level = new_version_delta
337                    .latest_version()
338                    .get_compaction_group_levels(group_id)
339                    .levels
340                    .len();
341                new_version_delta
342                    .group_deltas
343                    .entry(group_id)
344                    .or_default()
345                    .group_deltas
346                    .push(GroupDelta::GroupDestroy(PbGroupDestroy {}));
347                remove_compaction_group_metrics(&self.metrics, group_id, max_level);
348                // clean up compaction schedule state for the removed group
349                self.compaction_state.remove_compaction_group(group_id);
350            } else {
351                new_version_delta
352                    .group_deltas
353                    .entry(group_id)
354                    .or_default()
355                    .group_deltas
356                    .push(GroupDelta::PruneTableIdsFromSsts(change.removed_table_ids));
357            }
358        }
359
360        new_version_delta.pre_apply();
361
362        // Purge may cause write to meta store. If it hurts performance while holding versioning
363        // lock, consider to make it in batch.
364        let mut compaction_group_manager = self.compaction_group_manager.write().await;
365        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
366
367        compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
368            version.latest_version(),
369        )));
370        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
371
372        // No need to handle DeltaType::GroupDestroy during time travel.
373        Ok(())
374    }
375
376    pub async fn update_compaction_config(
377        &self,
378        compaction_group_ids: &[CompactionGroupId],
379        config_to_update: &[MutableConfig],
380    ) -> Result<()> {
381        {
382            // Avoid lock conflicts with `try_update_write_limits``
383            let mut compaction_group_manager = self.compaction_group_manager.write().await;
384            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
385            compaction_groups_txn
386                .update_compaction_config(compaction_group_ids, config_to_update)?;
387            commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
388        }
389
390        if config_to_update
391            .iter()
392            .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
393        {
394            // Update write limits with lock
395            self.try_update_write_limits(compaction_group_ids).await;
396        }
397
398        Ok(())
399    }
400
401    /// Gets complete compaction group info.
402    /// It is the aggregate of `HummockVersion` and `CompactionGroupConfig`
403    pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
404        let mut versioning_guard = self.versioning.write().await;
405        let versioning = versioning_guard.deref_mut();
406        let current_version = &versioning.current_version;
407        let mut results = vec![];
408        let compaction_group_manager = self.compaction_group_manager.read().await;
409
410        for levels in current_version.levels.values() {
411            let compaction_config = compaction_group_manager
412                .try_get_compaction_group_config(levels.group_id)
413                .unwrap()
414                .compaction_config
415                .as_ref()
416                .clone();
417            let group = CompactionGroupInfo {
418                id: levels.group_id,
419                parent_id: levels.parent_group_id,
420                member_table_ids: current_version
421                    .state_table_info
422                    .compaction_group_member_table_ids(levels.group_id)
423                    .iter()
424                    .copied()
425                    .collect_vec(),
426                compaction_config: Some(compaction_config),
427            };
428            results.push(group);
429        }
430        results
431    }
432
433    pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
434        let mut infos = vec![];
435        {
436            let versioning_guard = self.versioning.read().await;
437            let manager = self.compaction_group_manager.read().await;
438            let version = &versioning_guard.current_version;
439            for group_id in version.levels.keys() {
440                let mut group_info = CompactionGroupStatistic {
441                    group_id: *group_id,
442                    ..Default::default()
443                };
444                for table_id in version
445                    .state_table_info
446                    .compaction_group_member_table_ids(*group_id)
447                {
448                    let stats_size = versioning_guard
449                        .version_stats
450                        .table_stats
451                        .get(table_id)
452                        .map(|stats| stats.total_key_size + stats.total_value_size)
453                        .unwrap_or(0);
454                    let table_size = std::cmp::max(stats_size, 0) as u64;
455                    group_info.group_size += table_size;
456                    group_info.table_statistic.insert(*table_id, table_size);
457                    group_info.compaction_group_config =
458                        manager.try_get_compaction_group_config(*group_id).unwrap();
459                }
460                infos.push(group_info);
461            }
462        };
463        infos
464    }
465
466    pub(crate) async fn initial_compaction_group_config_after_load(
467        &self,
468        versioning_guard: &Versioning,
469        compaction_group_manager: &mut CompactionGroupManager,
470    ) -> Result<()> {
471        // 1. Due to version compatibility, we fix some of the configuration of older versions after hummock starts.
472        let current_version = &versioning_guard.current_version;
473        let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
474        let default_config = compaction_group_manager.default_compaction_config();
475        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
476        compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
477        commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
478
479        Ok(())
480    }
481}
482
/// We must ensure an entry exists in [`CompactionGroupManager`] for any
/// compaction group found in current hummock version. That's done by invoking
/// `get_or_insert_compaction_group_config` or `get_or_insert_compaction_group_configs` before
/// adding any group in current hummock version:
/// 1. initialize default static compaction group.
/// 2. register new table to new compaction group.
/// 3. move existent table to new compaction group.
pub(crate) struct CompactionGroupManager {
    /// Per-group compaction configs, keyed by compaction group id.
    compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
    /// Fallback config used for groups that have no persisted config of their own.
    default_config: Arc<CompactionConfig>,
    /// Write limits currently in effect, keyed by compaction group id.
    pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
}
496
impl CompactionGroupManager {
    /// Starts a transaction to update compaction group configs.
    pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
        CompactionGroupTransaction::new(&mut self.compaction_groups)
    }

    /// Like [`Self::start_compaction_groups_txn`], but the transaction owns the
    /// handle to the manager (e.g. an owned lock guard) instead of borrowing
    /// `&mut self`, forwarding reads/writes to `compaction_groups`.
    #[expect(clippy::type_complexity)]
    pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
        inner: P,
    ) -> BTreeMapTransactionInner<
        CompactionGroupId,
        CompactionGroup,
        DerefMutForward<
            Self,
            BTreeMap<CompactionGroupId, CompactionGroup>,
            P,
            impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
            impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
        >,
    > {
        BTreeMapTransactionInner::new(DerefMutForward::new(
            inner,
            |mgr| &mgr.compaction_groups,
            |mgr| &mut mgr.compaction_groups,
        ))
    }

    /// Tries to get compaction group config for `compaction_group_id`.
    /// Returns a clone of the entry, or `None` if the group is unknown.
    pub(crate) fn try_get_compaction_group_config(
        &self,
        compaction_group_id: impl Into<CompactionGroupId>,
    ) -> Option<CompactionGroup> {
        self.compaction_groups
            .get(&compaction_group_id.into())
            .cloned()
    }

    /// Returns the default compaction config applied to groups that have no
    /// dedicated config.
    pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
        self.default_config.clone()
    }
}
539
/// Applies each mutable-config entry in `items` onto `target`, in order.
///
/// Fields not mentioned in `items` are left untouched; a later entry for the
/// same field overrides an earlier one.
fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) {
    for item in items {
        match item {
            MutableConfig::MaxBytesForLevelBase(c) => {
                target.max_bytes_for_level_base = *c;
            }
            MutableConfig::MaxBytesForLevelMultiplier(c) => {
                target.max_bytes_for_level_multiplier = *c;
            }
            MutableConfig::MaxCompactionBytes(c) => {
                target.max_compaction_bytes = *c;
            }
            MutableConfig::SubLevelMaxCompactionBytes(c) => {
                target.sub_level_max_compaction_bytes = *c;
            }
            MutableConfig::Level0TierCompactFileNumber(c) => {
                target.level0_tier_compact_file_number = *c;
            }
            MutableConfig::TargetFileSizeBase(c) => {
                target.target_file_size_base = *c;
            }
            MutableConfig::CompactionFilterMask(c) => {
                target.compaction_filter_mask = *c;
            }
            MutableConfig::MaxSubCompaction(c) => {
                target.max_sub_compaction = *c;
            }
            MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
                target.level0_stop_write_threshold_sub_level_number = *c;
            }
            MutableConfig::Level0SubLevelCompactLevelCount(c) => {
                target.level0_sub_level_compact_level_count = *c;
            }
            MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
                target.level0_overlapping_sub_level_compact_level_count = *c;
            }
            MutableConfig::MaxSpaceReclaimBytes(c) => {
                target.max_space_reclaim_bytes = *c;
            }
            MutableConfig::Level0MaxCompactFileNumber(c) => {
                target.level0_max_compact_file_number = *c;
            }
            MutableConfig::EnableEmergencyPicker(c) => {
                target.enable_emergency_picker = *c;
            }
            MutableConfig::TombstoneReclaimRatio(c) => {
                target.tombstone_reclaim_ratio = *c;
            }
            MutableConfig::CompressionAlgorithm(c) => {
                // Per-level override: only the algorithm of the addressed level changes.
                target.compression_algorithm[c.get_level() as usize]
                    .clone_from(&c.compression_algorithm);
            }
            MutableConfig::MaxL0CompactLevelCount(c) => {
                target.max_l0_compact_level_count = Some(*c);
            }
            MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
                target.sst_allowed_trivial_move_min_size = Some(*c);
            }
            MutableConfig::SplitWeightByVnode(c) => {
                target.split_weight_by_vnode = *c;
            }
            MutableConfig::DisableAutoGroupScheduling(c) => {
                target.disable_auto_group_scheduling = Some(*c);
            }
            MutableConfig::MaxOverlappingLevelSize(c) => {
                target.max_overlapping_level_size = Some(*c);
            }
            MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
                target.sst_allowed_trivial_move_max_count = Some(*c);
            }
            MutableConfig::EmergencyLevel0SstFileCount(c) => {
                target.emergency_level0_sst_file_count = Some(*c);
            }
            MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
                target.emergency_level0_sub_level_partition = Some(*c);
            }
            MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
                target.level0_stop_write_threshold_max_sst_count = Some(*c);
            }
            MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
                target.level0_stop_write_threshold_max_size = Some(*c);
            }
            MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
                target.enable_optimize_l0_interval_selection = Some(*c);
            }
            #[expect(deprecated)]
            MutableConfig::VnodeAlignedLevelSizeThreshold(_) => {
                // Deprecated. Keep accepting the field for old clients but do not apply it.
            }
            MutableConfig::MaxKvCountForXor16(c) => {
                // `u64::MIN` (0) and `u64::MAX` act as sentinels that clear the setting.
                target.max_kv_count_for_xor16 = (*c != u64::MIN && *c != u64::MAX).then_some(*c);
            }
            MutableConfig::MaxVnodeKeyRangeBytes(c) => {
                // A value of 0 clears the setting.
                target.max_vnode_key_range_bytes = (*c > 0).then_some(*c);
            }
        }
    }
}
638
639impl CompactionGroupTransaction<'_> {
640    /// Inserts compaction group configs if they do not exist.
641    pub fn try_create_compaction_groups(
642        &mut self,
643        compaction_group_ids: &[CompactionGroupId],
644        config: Arc<CompactionConfig>,
645    ) -> bool {
646        let mut trivial = true;
647        for id in compaction_group_ids {
648            if self.contains_key(id) {
649                continue;
650            }
651            let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
652            self.insert(*id, new_entry);
653
654            trivial = false;
655        }
656
657        !trivial
658    }
659
660    pub fn create_compaction_groups(
661        &mut self,
662        compaction_group_id: CompactionGroupId,
663        config: Arc<CompactionConfig>,
664    ) {
665        self.try_create_compaction_groups(&[compaction_group_id], config);
666    }
667
668    /// Tries to get compaction group config for `compaction_group_id`.
669    pub(crate) fn try_get_compaction_group_config(
670        &self,
671        compaction_group_id: CompactionGroupId,
672    ) -> Option<&CompactionGroup> {
673        self.get(&compaction_group_id)
674    }
675
676    /// Removes stale group configs.
677    pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
678        let stale_group = self
679            .tree_ref()
680            .keys()
681            .cloned()
682            .filter(|k| !existing_groups.contains(k))
683            .collect_vec();
684        if stale_group.is_empty() {
685            return;
686        }
687        for group in stale_group {
688            self.remove(group);
689        }
690    }
691
692    pub(crate) fn update_compaction_config(
693        &mut self,
694        compaction_group_ids: &[CompactionGroupId],
695        config_to_update: &[MutableConfig],
696    ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
697        let mut results = HashMap::default();
698        for compaction_group_id in compaction_group_ids.iter().unique() {
699            let group = self.get(compaction_group_id).ok_or_else(|| {
700                Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
701            })?;
702            let mut config = group.compaction_config.as_ref().clone();
703            update_compaction_config(&mut config, config_to_update);
704            if let Err(reason) = validate_compaction_config(&config) {
705                return Err(Error::CompactionGroup(reason));
706            }
707            let mut new_group = group.clone();
708            new_group.compaction_config = Arc::new(config);
709            self.insert(*compaction_group_id, new_group.clone());
710            results.insert(new_group.group_id(), new_group);
711        }
712
713        Ok(results)
714    }
715}
716
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::id::JobId;
    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;

    use crate::controller::SqlMetaStore;
    use crate::hummock::commit_multi_var;
    use crate::hummock::error::Result;
    use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
    use crate::hummock::test_utils::setup_compute_env;
    use crate::model::{Fragment, StreamJobFragments};

    /// Exercises `CompactionGroupManager` at the transaction level: creating
    /// group configs, updating them through a committed transaction, and
    /// verifying they survive a reload from the meta store.
    #[tokio::test]
    async fn test_inner() {
        let (env, ..) = setup_compute_env(8080).await;
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        // A freshly constructed manager holds 2 compaction groups.
        assert_eq!(inner.compaction_groups.len(), 2);

        // Helper: apply `config_to_update` to `cg_ids` inside a compaction-group
        // transaction and commit the result to the meta store.
        async fn update_compaction_config(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[impl Into<CompactionGroupId> + Copy],
            config_to_update: &[MutableConfig],
        ) -> Result<()> {
            let cg_ids = cg_ids.iter().copied().map_into().collect_vec();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            compaction_groups_txn.update_compaction_config(&cg_ids, config_to_update)?;
            commit_multi_var!(meta, compaction_groups_txn)
        }

        // Helper: create default configs for `cg_ids` and commit them, but only
        // if the transaction reports that at least one group was newly created.
        async fn insert_compaction_group_configs(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[u64],
        ) {
            let default_config = inner.default_compaction_config();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            if compaction_groups_txn.try_create_compaction_groups(
                &cg_ids.iter().copied().map_into().collect_vec(),
                default_config,
            ) {
                commit_multi_var!(meta, compaction_groups_txn).unwrap();
            }
        }

        // Updating groups that do not exist yet must fail.
        update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
            .await
            .unwrap_err();
        insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
        assert_eq!(inner.compaction_groups.len(), 4);
        // Rebuild the manager from the meta store: the two new groups must persist.
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);

        // Update a single mutable field on both groups and verify it took effect.
        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100, 200],
            &[MutableConfig::MaxSubCompaction(123)],
        )
        .await
        .unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
        assert_eq!(
            inner
                .try_get_compaction_group_config(200)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
    }

    /// Exercises the manager-level registration API: registering and
    /// unregistering table fragments, and purging members not in a valid set.
    #[tokio::test]
    async fn test_manager() {
        let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
        // Two test stream jobs, each with one fragment owning four state tables.
        let table_fragment_1 = StreamJobFragments::for_test(
            JobId::new(10),
            BTreeMap::from([(
                1.into(),
                Fragment {
                    fragment_id: 1.into(),
                    state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
                    ..Default::default()
                },
            )]),
        );
        let table_fragment_2 = StreamJobFragments::for_test(
            JobId::new(20),
            BTreeMap::from([(
                2.into(),
                Fragment {
                    fragment_id: 2.into(),
                    state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
                    ..Default::default()
                },
            )]),
        );

        // Test register_table_fragments
        // Total member tables across all compaction groups.
        let registered_number = || async {
            compaction_group_manager
                .list_compaction_group()
                .await
                .iter()
                .map(|cg| cg.member_table_ids.len())
                .sum::<usize>()
        };
        // Number of compaction groups currently known to the manager.
        let group_number =
            || async { compaction_group_manager.list_compaction_group().await.len() };
        assert_eq!(registered_number().await, 0);

        // Each registration adds the job's 4 state tables as members.
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
                table_fragment_1
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_2.stream_job_id().as_mv_table_id()),
                table_fragment_2
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 8);

        // Test unregister_table_fragments
        compaction_group_manager
            .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
            .await;
        assert_eq!(registered_number().await, 4);

        // Test purge_stale_members: table fragments
        // With fragment 2's tables as the valid set, nothing extra is removed.
        compaction_group_manager
            .purge(&table_fragment_2.all_table_ids().collect())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        // With an empty valid set, every remaining member table is purged.
        compaction_group_manager
            .purge(&HashSet::new())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 0);

        // The 2 groups themselves survive even with no member tables left.
        assert_eq!(group_number().await, 2);

        // Re-registering after a purge works and creates no additional groups.
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
                table_fragment_1
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .unregister_table_fragments_vec(&[table_fragment_1])
            .await;
        assert_eq!(registered_number().await, 0);
        assert_eq!(group_number().await, 2);
    }
}