Skip to main content

risingwave_meta/hummock/manager/compaction/
compaction_group_manager.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::meta::default::compaction_config as default_compaction_config;
22use risingwave_common::util::epoch::INVALID_EPOCH;
23use risingwave_hummock_sdk::CompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
25use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
26use risingwave_hummock_sdk::filter_utils::{
27    parse_sstable_filter_layout, parse_sstable_filter_type,
28};
29use risingwave_hummock_sdk::version::GroupDelta;
30use risingwave_meta_model::compaction_config;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::write_limits::WriteLimit;
33use risingwave_pb::hummock::{
34    CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
35};
36use sea_orm::EntityTrait;
37use tokio::sync::OnceCell;
38
39use super::CompactionGroupStatistic;
40use crate::hummock::compaction::compaction_config::{
41    CompactionConfigBuilder, validate_compaction_config,
42};
43use crate::hummock::error::{Error, Result};
44use crate::hummock::manager::transaction::HummockVersionTransaction;
45use crate::hummock::manager::versioning::Versioning;
46use crate::hummock::manager::{HummockManager, commit_multi_var};
47use crate::hummock::metrics_utils::remove_compaction_group_metrics;
48use crate::hummock::model::CompactionGroup;
49use crate::hummock::sequence::next_compaction_group_id;
50use crate::manager::MetaSrvEnv;
51use crate::model::{
52    BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
53};
54
/// A [`BTreeMapTransaction`] over the map from [`CompactionGroupId`] to [`CompactionGroup`].
type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
56
57impl CompactionGroupManager {
58    pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
59        let default_config = match env.opts.compaction_config.as_ref() {
60            None => CompactionConfigBuilder::new().build(),
61            Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
62        };
63        Self::new_with_config(env, default_config).await
64    }
65
66    pub(crate) async fn new_with_config(
67        env: &MetaSrvEnv,
68        default_config: CompactionConfig,
69    ) -> Result<CompactionGroupManager> {
70        let mut compaction_group_manager = CompactionGroupManager {
71            compaction_groups: BTreeMap::new(),
72            default_config: Arc::new(default_config),
73            write_limit: Default::default(),
74        };
75
76        let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
77            compaction_config::Entity::find()
78                .all(&env.meta_store_ref().conn)
79                .await
80                .map_err(MetadataModelError::from)?
81                .into_iter()
82                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
83                .collect();
84
85        compaction_group_manager.init(loaded_compaction_groups);
86        Ok(compaction_group_manager)
87    }
88
89    fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
90        if !loaded_compaction_groups.is_empty() {
91            self.compaction_groups = loaded_compaction_groups;
92        }
93    }
94}
95
96impl HummockManager {
97    /// Should not be called inside [`HummockManager`], because it requests locks internally.
98    /// The implementation acquires `versioning` lock.
99    pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
100        get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
101    }
102
103    /// The implementation acquires `compaction_group_manager` lock.
104    pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
105        self.compaction_group_manager
106            .read()
107            .await
108            .compaction_groups
109            .clone()
110    }
111
112    #[cfg(test)]
113    /// Registers `table_fragments` to compaction groups.
114    pub async fn register_table_fragments(
115        &self,
116        mv_table: Option<TableId>,
117        mut internal_tables: Vec<TableId>,
118    ) -> Result<()> {
119        let mut pairs = vec![];
120        if let Some(mv_table) = mv_table {
121            if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
122                tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
123            }
124            // materialized_view
125            pairs.push((mv_table, StaticCompactionGroupId::MaterializedView));
126        }
127        // internal states
128        for table_id in internal_tables {
129            pairs.push((table_id, StaticCompactionGroupId::StateDefault));
130        }
131        self.register_table_ids_for_test(&pairs).await?;
132        Ok(())
133    }
134
135    #[cfg(test)]
136    /// Unregisters `table_fragments` from compaction groups
137    pub async fn unregister_table_fragments_vec(
138        &self,
139        table_fragments: &[crate::model::StreamJobFragments],
140    ) {
141        self.unregister_table_ids(table_fragments.iter().flat_map(|t| t.all_table_ids()))
142            .await
143            .unwrap();
144    }
145
146    /// Unregisters stale members and groups
147    /// The caller should ensure `table_fragments_list` remain unchanged during `purge`.
148    /// Currently `purge` is only called during meta service start ups.
149    pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
150        let to_unregister = self
151            .versioning
152            .read()
153            .await
154            .current_version
155            .state_table_info
156            .info()
157            .keys()
158            .cloned()
159            .filter(|table_id| !valid_ids.contains(table_id))
160            .collect_vec();
161
162        // As we have released versioning lock, the version that `to_unregister` is calculated from
163        // may not be the same as the one used in unregister_table_ids. It is OK.
164        self.unregister_table_ids(to_unregister).await
165    }
166
167    /// The implementation acquires `versioning` lock.
168    ///
169    /// The method name is temporarily added with a `_for_test` prefix to mark
170    /// that it's currently only used in test.
171    pub async fn register_table_ids_for_test(
172        &self,
173        pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
174    ) -> Result<()> {
175        if pairs.is_empty() {
176            return Ok(());
177        }
178        let mut versioning_guard = self.versioning.write().await;
179        let versioning = versioning_guard.deref_mut();
180        let mut compaction_group_manager = self.compaction_group_manager.write().await;
181        let current_version = &versioning.current_version;
182        let default_config = compaction_group_manager.default_compaction_config();
183        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
184
185        for (table_id, _) in pairs {
186            let table_id = (*table_id).into();
187            if let Some(info) = current_version.state_table_info.info().get(&table_id) {
188                return Err(Error::CompactionGroup(format!(
189                    "table {} already {:?}",
190                    table_id, info
191                )));
192            }
193        }
194        // All NewCompactionGroup pairs are mapped to one new compaction group.
195        let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
196        let mut version = HummockVersionTransaction::new(
197            &mut versioning.current_version,
198            &mut versioning.hummock_version_deltas,
199            &mut versioning.table_change_log,
200            self.env.notification_manager(),
201            None,
202            &self.metrics,
203            &self.env.opts,
204        );
205        let mut new_version_delta = version.new_delta();
206
207        let committed_epoch = new_version_delta
208            .latest_version()
209            .state_table_info
210            .info()
211            .values()
212            .map(|info| info.committed_epoch)
213            .max()
214            .unwrap_or(INVALID_EPOCH);
215
216        for (table_id, raw_group_id) in pairs {
217            let table_id = (*table_id).into();
218            let mut group_id = *raw_group_id;
219            if group_id == StaticCompactionGroupId::NewCompactionGroup {
220                let mut is_group_init = false;
221                group_id = *new_compaction_group_id
222                    .get_or_try_init(|| async {
223                        next_compaction_group_id(&self.env).await.inspect(|_| {
224                            is_group_init = true;
225                        })
226                    })
227                    .await?;
228                if is_group_init {
229                    let group_deltas = &mut new_version_delta
230                        .group_deltas
231                        .entry(group_id)
232                        .or_default()
233                        .group_deltas;
234
235                    let config =
236                        match compaction_groups_txn.try_get_compaction_group_config(group_id) {
237                            Some(config) => config.compaction_config.as_ref().clone(),
238                            None => {
239                                compaction_groups_txn
240                                    .create_compaction_groups(group_id, default_config.clone());
241                                default_config.as_ref().clone()
242                            }
243                        };
244
245                    let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
246                        group_config: Some(config),
247                        group_id,
248                        ..Default::default()
249                    }));
250
251                    group_deltas.push(group_delta);
252                }
253            }
254            assert!(
255                new_version_delta
256                    .state_table_info_delta
257                    .insert(
258                        table_id,
259                        PbStateTableInfoDelta {
260                            committed_epoch,
261                            compaction_group_id: *raw_group_id,
262                        }
263                    )
264                    .is_none()
265            );
266        }
267        new_version_delta.pre_apply();
268        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
269
270        Ok(())
271    }
272
    /// Unregisters `table_ids` from their compaction groups in a single new version delta.
    ///
    /// Duplicated ids are processed once, and ids that are not present in the latest version's
    /// state table info are skipped. For every affected group:
    /// - if no member table remains and the group id is greater than
    ///   `StaticCompactionGroupId::End`, a `GroupDestroy` delta is added, and the group's
    ///   metrics and compaction schedule state are removed;
    /// - otherwise a `PruneTableIdsFromSsts` delta carrying the removed table ids is added.
    ///
    /// Finally, compaction group configs whose group is not in the resulting version are
    /// purged, and the version and config changes are committed together.
    ///
    /// The implementation acquires the `versioning` lock and then the
    /// `compaction_group_manager` lock.
    ///
    /// # Errors
    ///
    /// Returns an error if committing to the meta store fails.
    pub async fn unregister_table_ids(
        &self,
        table_ids: impl IntoIterator<Item = TableId>,
    ) -> Result<()> {
        let table_ids = table_ids.into_iter().collect_vec();
        if table_ids.is_empty() {
            return Ok(());
        }

        {
            // Remove table write throughput statistics.
            // The caller requires `Send`, so this `write` guard is confined to a block that ends
            // before the next await point.
            // The table write throughput statistic accepts data inconsistencies (unregister table ids fail), so we can clean it up in advance.
            let mut table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.write();
            for &table_id in table_ids.iter().unique() {
                table_write_throughput_statistic_manager.remove_table(table_id);
            }
        }

        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut new_version_delta = version.new_delta();
        // Per-group bookkeeping: how many member tables are left after the removals, and which
        // tables were removed.
        struct UnregisterGroupChange {
            remaining_member_count: usize,
            removed_table_ids: HashSet<TableId>,
        }
        let mut group_changes: HashMap<CompactionGroupId, UnregisterGroupChange> = HashMap::new();
        // Remove member tables
        for table_id in table_ids.into_iter().unique() {
            let version = new_version_delta.latest_version();
            // Tables unknown to the latest version are silently skipped.
            let Some(info) = version.state_table_info.info().get(&table_id) else {
                continue;
            };
            let compaction_group_id = info.compaction_group_id;

            // The first removal seen for a group seeds the counter with the group's current
            // member count.
            let group_change =
                group_changes
                    .entry(compaction_group_id)
                    .or_insert_with(|| UnregisterGroupChange {
                        remaining_member_count: version
                            .state_table_info
                            .compaction_group_member_tables()
                            .get(&compaction_group_id)
                            .expect("should exist")
                            .len(),
                        removed_table_ids: HashSet::new(),
                    });
            group_change.remaining_member_count = group_change
                .remaining_member_count
                .checked_sub(1)
                .expect("member table count should be positive");
            assert!(group_change.removed_table_ids.insert(table_id));
            new_version_delta.removed_table_ids.insert(table_id);
        }

        for (group_id, change) in group_changes {
            // Only groups with an id above `StaticCompactionGroupId::End` are destroyed once
            // they are empty; all other groups only get their removed tables pruned.
            if change.remaining_member_count == 0 && group_id > StaticCompactionGroupId::End {
                let max_level = new_version_delta
                    .latest_version()
                    .get_compaction_group_levels(group_id)
                    .levels
                    .len();
                new_version_delta
                    .group_deltas
                    .entry(group_id)
                    .or_default()
                    .group_deltas
                    .push(GroupDelta::GroupDestroy(PbGroupDestroy {}));
                // NOTE(review): metrics and schedule state are removed here, before the commit
                // below has succeeded.
                remove_compaction_group_metrics(&self.metrics, group_id, max_level);
                // clean up compaction schedule state for the removed group
                self.compaction_state.remove_compaction_group(group_id);
            } else {
                new_version_delta
                    .group_deltas
                    .entry(group_id)
                    .or_default()
                    .group_deltas
                    .push(GroupDelta::PruneTableIdsFromSsts(change.removed_table_ids));
            }
        }

        new_version_delta.pre_apply();

        // Purge may cause write to meta store. If it hurts performance while holding versioning
        // lock, consider to make it in batch.
        let mut compaction_group_manager = self.compaction_group_manager.write().await;
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

        // Drop the configs of groups that are not in the version after `pre_apply`.
        compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
            version.latest_version(),
        )));
        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;

        // No need to handle DeltaType::GroupDestroy during time travel.
        Ok(())
    }
379
380    pub async fn update_compaction_config(
381        &self,
382        compaction_group_ids: &[CompactionGroupId],
383        config_to_update: &[MutableConfig],
384    ) -> Result<()> {
385        {
386            // Avoid lock conflicts with `try_update_write_limits``
387            let mut compaction_group_manager = self.compaction_group_manager.write().await;
388            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
389            compaction_groups_txn
390                .update_compaction_config(compaction_group_ids, config_to_update)?;
391            commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
392        }
393
394        if config_to_update
395            .iter()
396            .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
397        {
398            // Update write limits with lock
399            self.try_update_write_limits(compaction_group_ids).await;
400        }
401
402        Ok(())
403    }
404
405    /// Gets complete compaction group info.
406    /// It is the aggregate of `HummockVersion` and `CompactionGroupConfig`
407    pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
408        let mut versioning_guard = self.versioning.write().await;
409        let versioning = versioning_guard.deref_mut();
410        let current_version = &versioning.current_version;
411        let mut results = vec![];
412        let compaction_group_manager = self.compaction_group_manager.read().await;
413
414        for levels in current_version.levels.values() {
415            let compaction_config = compaction_group_manager
416                .try_get_compaction_group_config(levels.group_id)
417                .unwrap()
418                .compaction_config
419                .as_ref()
420                .clone();
421            let group = CompactionGroupInfo {
422                id: levels.group_id,
423                parent_id: levels.parent_group_id,
424                member_table_ids: current_version
425                    .state_table_info
426                    .compaction_group_member_table_ids(levels.group_id)
427                    .iter()
428                    .copied()
429                    .collect_vec(),
430                compaction_config: Some(compaction_config),
431            };
432            results.push(group);
433        }
434        results
435    }
436
437    pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
438        let mut infos = vec![];
439        {
440            let versioning_guard = self.versioning.read().await;
441            let manager = self.compaction_group_manager.read().await;
442            let version = &versioning_guard.current_version;
443            for group_id in version.levels.keys() {
444                let mut group_info = CompactionGroupStatistic {
445                    group_id: *group_id,
446                    ..Default::default()
447                };
448                for table_id in version
449                    .state_table_info
450                    .compaction_group_member_table_ids(*group_id)
451                {
452                    let stats_size = versioning_guard
453                        .version_stats
454                        .table_stats
455                        .get(table_id)
456                        .map(|stats| stats.total_key_size + stats.total_value_size)
457                        .unwrap_or(0);
458                    let table_size = std::cmp::max(stats_size, 0) as u64;
459                    group_info.group_size += table_size;
460                    group_info.table_statistic.insert(*table_id, table_size);
461                    group_info.compaction_group_config =
462                        manager.try_get_compaction_group_config(*group_id).unwrap();
463                }
464                infos.push(group_info);
465            }
466        };
467        infos
468    }
469
470    pub(crate) async fn initial_compaction_group_config_after_load(
471        &self,
472        versioning_guard: &Versioning,
473        compaction_group_manager: &mut CompactionGroupManager,
474    ) -> Result<()> {
475        // 1. Due to version compatibility, we fix some of the configuration of older versions after hummock starts.
476        let current_version = &versioning_guard.current_version;
477        let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
478        let default_config = compaction_group_manager.default_compaction_config();
479        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
480        compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
481        commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
482
483        Ok(())
484    }
485}
486
/// We must ensure that an entry exists in [`CompactionGroupManager`] for any
/// compaction group found in current hummock version. That's done by invoking
/// `get_or_insert_compaction_group_config` or `get_or_insert_compaction_group_configs` before
/// adding any group in current hummock version:
/// 1. initialize default static compaction group.
/// 2. register new table to new compaction group.
/// 3. move existent table to new compaction group.
///
/// NOTE(review): the two helper names above are not defined in the visible part of this file;
/// entries appear to be created through `try_create_compaction_groups` /
/// `create_compaction_groups` on the transaction instead. Confirm and update the names.
pub(crate) struct CompactionGroupManager {
    /// Config of every known compaction group, keyed by compaction group id.
    compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
    /// Config returned by `default_compaction_config`.
    default_config: Arc<CompactionConfig>,
    /// Write limits, keyed by the id of the compaction group they apply to.
    pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
}
500
501impl CompactionGroupManager {
502    /// Starts a transaction to update compaction group configs.
503    pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
504        CompactionGroupTransaction::new(&mut self.compaction_groups)
505    }
506
507    #[expect(clippy::type_complexity)]
508    pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
509        inner: P,
510    ) -> BTreeMapTransactionInner<
511        CompactionGroupId,
512        CompactionGroup,
513        DerefMutForward<
514            Self,
515            BTreeMap<CompactionGroupId, CompactionGroup>,
516            P,
517            impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
518            impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
519        >,
520    > {
521        BTreeMapTransactionInner::new(DerefMutForward::new(
522            inner,
523            |mgr| &mgr.compaction_groups,
524            |mgr| &mut mgr.compaction_groups,
525        ))
526    }
527
528    /// Tries to get compaction group config for `compaction_group_id`.
529    pub(crate) fn try_get_compaction_group_config(
530        &self,
531        compaction_group_id: impl Into<CompactionGroupId>,
532    ) -> Option<CompactionGroup> {
533        self.compaction_groups
534            .get(&compaction_group_id.into())
535            .cloned()
536    }
537
538    /// Tries to get compaction group config for `compaction_group_id`.
539    pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
540        self.default_config.clone()
541    }
542}
543
544fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) -> Result<()> {
545    for item in items {
546        match item {
547            MutableConfig::MaxBytesForLevelBase(c) => {
548                target.max_bytes_for_level_base = *c;
549            }
550            MutableConfig::MaxBytesForLevelMultiplier(c) => {
551                target.max_bytes_for_level_multiplier = *c;
552            }
553            MutableConfig::MaxCompactionBytes(c) => {
554                target.max_compaction_bytes = *c;
555            }
556            MutableConfig::SubLevelMaxCompactionBytes(c) => {
557                target.sub_level_max_compaction_bytes = *c;
558            }
559            MutableConfig::Level0TierCompactFileNumber(c) => {
560                target.level0_tier_compact_file_number = *c;
561            }
562            MutableConfig::TargetFileSizeBase(c) => {
563                target.target_file_size_base = *c;
564            }
565            MutableConfig::CompactionFilterMask(c) => {
566                target.compaction_filter_mask = *c;
567            }
568            MutableConfig::MaxSubCompaction(c) => {
569                target.max_sub_compaction = *c;
570            }
571            MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
572                target.level0_stop_write_threshold_sub_level_number = *c;
573            }
574            MutableConfig::Level0SubLevelCompactLevelCount(c) => {
575                target.level0_sub_level_compact_level_count = *c;
576            }
577            MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
578                target.level0_overlapping_sub_level_compact_level_count = *c;
579            }
580            MutableConfig::MaxSpaceReclaimBytes(c) => {
581                target.max_space_reclaim_bytes = *c;
582            }
583            MutableConfig::Level0MaxCompactFileNumber(c) => {
584                target.level0_max_compact_file_number = *c;
585            }
586            MutableConfig::EnableEmergencyPicker(c) => {
587                target.enable_emergency_picker = *c;
588            }
589            MutableConfig::TombstoneReclaimRatio(c) => {
590                target.tombstone_reclaim_ratio = *c;
591            }
592            MutableConfig::CompressionAlgorithm(c) => {
593                let level = c.get_level();
594                let max_level = try_u32_max_level(target.max_level)?;
595                if level > max_level {
596                    return Err(Error::CompactionGroup(format!(
597                        "invalid compression_algorithm level {}, max_level is {}",
598                        level, target.max_level
599                    )));
600                }
601
602                let Some(algorithm) = target.compression_algorithm.get_mut(level as usize) else {
603                    return Err(Error::CompactionGroup(format!(
604                        "invalid compression_algorithm level {}, compression_algorithm len is {}",
605                        level,
606                        target.compression_algorithm.len()
607                    )));
608                };
609                algorithm.clone_from(&c.compression_algorithm);
610            }
611            MutableConfig::ResetCompressionAlgorithm(reset) => {
612                if *reset {
613                    target.compression_algorithm =
614                        default_compaction_config::compression_algorithm_vec(try_u32_max_level(
615                            target.max_level,
616                        )?);
617                }
618            }
619            MutableConfig::MaxL0CompactLevelCount(c) => {
620                target.max_l0_compact_level_count = Some(*c);
621            }
622            MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
623                target.sst_allowed_trivial_move_min_size = Some(*c);
624            }
625            MutableConfig::SplitWeightByVnode(c) => {
626                target.split_weight_by_vnode = *c;
627            }
628            MutableConfig::DisableAutoGroupScheduling(c) => {
629                target.disable_auto_group_scheduling = Some(*c);
630            }
631            MutableConfig::MaxOverlappingLevelSize(c) => {
632                target.max_overlapping_level_size = Some(*c);
633            }
634            MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
635                target.sst_allowed_trivial_move_max_count = Some(*c);
636            }
637            MutableConfig::EmergencyLevel0SstFileCount(c) => {
638                target.emergency_level0_sst_file_count = Some(*c);
639            }
640            MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
641                target.emergency_level0_sub_level_partition = Some(*c);
642            }
643            MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
644                target.level0_stop_write_threshold_max_sst_count = Some(*c);
645            }
646            MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
647                target.level0_stop_write_threshold_max_size = Some(*c);
648            }
649            MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
650                target.enable_optimize_l0_interval_selection = Some(*c);
651            }
652            #[expect(deprecated)]
653            MutableConfig::VnodeAlignedLevelSizeThreshold(_) => {
654                // Deprecated. Keep accepting the field for old clients but do not apply it.
655            }
656            MutableConfig::MaxKvCountForXor16(c) => {
657                target.max_kv_count_for_xor16 = optional_non_sentinel_u64_config(*c);
658            }
659            MutableConfig::MaxVnodeKeyRangeBytes(c) => {
660                target.max_vnode_key_range_bytes = optional_positive_u64_config(*c);
661            }
662            MutableConfig::SstableFilterType(c) => {
663                parse_sstable_filter_type(&c.filter_type).map_err(Error::CompactionGroup)?;
664                if target.sstable_filter_type.is_empty() {
665                    target.sstable_filter_type = default_compaction_config::sstable_filter_type();
666                    target
667                        .sstable_filter_type
668                        .resize(target.max_level as usize + 1, "xor16".to_owned());
669                }
670                let idx = c.get_level() as usize;
671                let level_entry = target.sstable_filter_type.get_mut(idx).ok_or_else(|| {
672                    Error::CompactionGroup(format!(
673                        "sstable_filter_type level {} is out of range",
674                        idx
675                    ))
676                })?;
677                level_entry.clone_from(&c.filter_type);
678            }
679            MutableConfig::SstableFilterLayout(c) => {
680                parse_sstable_filter_layout(&c.layout).map_err(Error::CompactionGroup)?;
681                if target.sstable_filter_layout.is_empty() {
682                    target.sstable_filter_layout =
683                        default_compaction_config::sstable_filter_layout();
684                    target
685                        .sstable_filter_layout
686                        .resize(target.max_level as usize + 1, "blocked".to_owned());
687                }
688                let idx = c.get_level() as usize;
689                let level_entry = target.sstable_filter_layout.get_mut(idx).ok_or_else(|| {
690                    Error::CompactionGroup(format!(
691                        "sstable_filter_layout level {} is out of range",
692                        idx
693                    ))
694                })?;
695                level_entry.clone_from(&c.layout);
696            }
697        }
698    }
699    Ok(())
700}
701
/// Maps the sentinel values `u64::MIN` and `u64::MAX` to `None`; any other value is kept.
fn optional_u64_config(value: u64) -> Option<u64> {
    match value {
        u64::MIN | u64::MAX => None,
        configured => Some(configured),
    }
}
705
/// Maps the sentinel value `u64::MAX` to `None`; any other value, including 0, is kept.
fn optional_non_sentinel_u64_config(value: u64) -> Option<u64> {
    if value == u64::MAX {
        None
    } else {
        Some(value)
    }
}
709
/// Converts a raw `u64` setting into an optional, strictly positive value.
///
/// Returns `None` for `0` and for `u64::MAX`; every other input is returned inside `Some`.
fn optional_positive_u64_config(value: u64) -> Option<u64> {
    let is_positive = value > 0;
    let is_not_max = value != u64::MAX;
    (is_positive && is_not_max).then_some(value)
}
713
714fn try_u32_max_level(max_level: u64) -> Result<u32> {
715    u32::try_from(max_level).map_err(|_| {
716        Error::CompactionGroup(format!(
717            "invalid max_level {}, expect <= {}",
718            max_level,
719            u32::MAX
720        ))
721    })
722}
723
724impl CompactionGroupTransaction<'_> {
725    /// Inserts compaction group configs if they do not exist.
726    pub fn try_create_compaction_groups(
727        &mut self,
728        compaction_group_ids: &[CompactionGroupId],
729        config: Arc<CompactionConfig>,
730    ) -> bool {
731        let mut trivial = true;
732        for id in compaction_group_ids {
733            if self.contains_key(id) {
734                continue;
735            }
736            let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
737            self.insert(*id, new_entry);
738
739            trivial = false;
740        }
741
742        !trivial
743    }
744
745    pub fn create_compaction_groups(
746        &mut self,
747        compaction_group_id: CompactionGroupId,
748        config: Arc<CompactionConfig>,
749    ) {
750        self.try_create_compaction_groups(&[compaction_group_id], config);
751    }
752
753    /// Tries to get compaction group config for `compaction_group_id`.
754    pub(crate) fn try_get_compaction_group_config(
755        &self,
756        compaction_group_id: CompactionGroupId,
757    ) -> Option<&CompactionGroup> {
758        self.get(&compaction_group_id)
759    }
760
761    /// Removes stale group configs.
762    pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
763        let stale_group = self
764            .tree_ref()
765            .keys()
766            .cloned()
767            .filter(|k| !existing_groups.contains(k))
768            .collect_vec();
769        if stale_group.is_empty() {
770            return;
771        }
772        for group in stale_group {
773            self.remove(group);
774        }
775    }
776
777    pub(crate) fn update_compaction_config(
778        &mut self,
779        compaction_group_ids: &[CompactionGroupId],
780        config_to_update: &[MutableConfig],
781    ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
782        let mut results = HashMap::default();
783        for compaction_group_id in compaction_group_ids.iter().unique() {
784            let group = self.get(compaction_group_id).ok_or_else(|| {
785                Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
786            })?;
787            let mut config = group.compaction_config.as_ref().clone();
788            update_compaction_config(&mut config, config_to_update)?;
789            if let Err(reason) = validate_compaction_config(&config) {
790                return Err(Error::CompactionGroup(reason));
791            }
792            let mut new_group = group.clone();
793            new_group.compaction_config = Arc::new(config);
794            self.insert(*compaction_group_id, new_group.clone());
795            results.insert(new_group.group_id(), new_group);
796        }
797
798        Ok(results)
799    }
800}
801
802#[cfg(test)]
803mod tests {
804    use std::collections::{BTreeMap, HashSet};
805    use std::sync::Arc;
806
807    use itertools::Itertools;
808    use risingwave_common::id::JobId;
809    use risingwave_hummock_sdk::CompactionGroupId;
810    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
811    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::{
812        CompressionAlgorithm, SstableFilterLayout, SstableFilterType,
813    };
814
815    use crate::controller::SqlMetaStore;
816    use crate::hummock::commit_multi_var;
817    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
818    use crate::hummock::error::Result;
819    use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
820    use crate::hummock::test_utils::setup_compute_env;
821    use crate::model::{Fragment, StreamJobFragments};
822
823    #[test]
824    fn test_update_compaction_config_filter_type_layout_backward_compat() {
825        let mut config = CompactionConfigBuilder::new().build();
826        config.sstable_filter_type.clear();
827        config.sstable_filter_layout.clear();
828
829        super::update_compaction_config(
830            &mut config,
831            &[MutableConfig::SstableFilterType(SstableFilterType {
832                level: 0,
833                filter_type: "xor8".to_owned(),
834            })],
835        )
836        .unwrap();
837        assert_eq!(
838            config.sstable_filter_type.len(),
839            config.max_level as usize + 1
840        );
841        assert_eq!(config.sstable_filter_type[0], "xor8");
842        assert_eq!(config.sstable_filter_type[5], "xor8");
843
844        super::update_compaction_config(
845            &mut config,
846            &[MutableConfig::SstableFilterType(SstableFilterType {
847                level: 1,
848                filter_type: "none".to_owned(),
849            })],
850        )
851        .unwrap();
852        assert_eq!(config.sstable_filter_type[1], "none");
853
854        super::update_compaction_config(
855            &mut config,
856            &[MutableConfig::SstableFilterLayout(SstableFilterLayout {
857                level: 1,
858                layout: "plain".to_owned(),
859            })],
860        )
861        .unwrap();
862        assert_eq!(
863            config.sstable_filter_layout.len(),
864            config.max_level as usize + 1
865        );
866        assert_eq!(config.sstable_filter_layout[1], "plain");
867        assert_eq!(config.sstable_filter_layout[2], "blocked");
868    }
869
870    #[test]
871    fn test_update_compaction_config_rejects_out_of_range_level() {
872        let mut config = CompactionConfigBuilder::new().build();
873        let oob = config.max_level as u32 + 1;
874
875        assert!(
876            super::update_compaction_config(
877                &mut config,
878                &[MutableConfig::SstableFilterType(SstableFilterType {
879                    level: oob,
880                    filter_type: "xor8".to_owned(),
881                })],
882            )
883            .is_err()
884        );
885
886        assert!(
887            super::update_compaction_config(
888                &mut config,
889                &[MutableConfig::SstableFilterLayout(SstableFilterLayout {
890                    level: oob,
891                    layout: "plain".to_owned(),
892                })],
893            )
894            .is_err()
895        );
896
897        assert!(
898            super::update_compaction_config(
899                &mut config,
900                &[MutableConfig::CompressionAlgorithm(CompressionAlgorithm {
901                    level: oob,
902                    compression_algorithm: "Zstd".to_owned(),
903                })],
904            )
905            .is_err()
906        );
907    }
908
909    #[test]
910    fn test_update_compaction_config_rejects_invalid_filter_metadata() {
911        let mut config = CompactionConfigBuilder::new().build();
912
913        assert!(
914            super::update_compaction_config(
915                &mut config,
916                &[MutableConfig::SstableFilterType(SstableFilterType {
917                    level: 0,
918                    filter_type: "unknown".to_owned(),
919                })],
920            )
921            .is_err()
922        );
923
924        let mut config = CompactionConfigBuilder::new().build();
925
926        assert!(
927            super::update_compaction_config(
928                &mut config,
929                &[MutableConfig::SstableFilterLayout(SstableFilterLayout {
930                    level: 0,
931                    layout: "unknown".to_owned(),
932                })],
933            )
934            .is_err()
935        );
936    }
937
938    #[test]
939    fn test_reset_compression_algorithm_false_is_noop() {
940        let mut config = CompactionConfigBuilder::new().build();
941        config.compression_algorithm[3] = "Zstd".to_owned();
942
943        super::update_compaction_config(
944            &mut config,
945            &[MutableConfig::ResetCompressionAlgorithm(false)],
946        )
947        .unwrap();
948
949        assert_eq!(config.compression_algorithm[3], "Zstd");
950    }
951
952    #[tokio::test]
953    async fn test_inner() {
954        let (env, ..) = setup_compute_env(8080).await;
955        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
956        assert_eq!(inner.compaction_groups.len(), 2);
957
958        async fn update_compaction_config(
959            meta: &SqlMetaStore,
960            inner: &mut CompactionGroupManager,
961            cg_ids: &[impl Into<CompactionGroupId> + Copy],
962            config_to_update: &[MutableConfig],
963        ) -> Result<()> {
964            let cg_ids = cg_ids.iter().copied().map_into().collect_vec();
965            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
966            compaction_groups_txn.update_compaction_config(&cg_ids, config_to_update)?;
967            commit_multi_var!(meta, compaction_groups_txn)
968        }
969
970        async fn insert_compaction_group_configs(
971            meta: &SqlMetaStore,
972            inner: &mut CompactionGroupManager,
973            cg_ids: &[u64],
974        ) {
975            let default_config = inner.default_compaction_config();
976            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
977            if compaction_groups_txn.try_create_compaction_groups(
978                &cg_ids.iter().copied().map_into().collect_vec(),
979                default_config,
980            ) {
981                commit_multi_var!(meta, compaction_groups_txn).unwrap();
982            }
983        }
984
985        async fn insert_compaction_group_config_with_max_level(
986            meta: &SqlMetaStore,
987            inner: &mut CompactionGroupManager,
988            cg_id: u64,
989            max_level: u64,
990        ) {
991            let mut config = inner.default_compaction_config().as_ref().clone();
992            config.max_level = max_level;
993            config.compression_algorithm =
994                super::default_compaction_config::compression_algorithm_vec(
995                    super::try_u32_max_level(max_level).expect("max_level should fit u32 in test"),
996                );
997            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
998            compaction_groups_txn.create_compaction_groups(cg_id.into(), Arc::new(config));
999            commit_multi_var!(meta, compaction_groups_txn).unwrap();
1000        }
1001
1002        update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
1003            .await
1004            .unwrap_err();
1005        insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
1006        assert_eq!(inner.compaction_groups.len(), 4);
1007        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
1008        assert_eq!(inner.compaction_groups.len(), 4);
1009
1010        update_compaction_config(
1011            env.meta_store_ref(),
1012            &mut inner,
1013            &[100, 200],
1014            &[MutableConfig::MaxSubCompaction(123)],
1015        )
1016        .await
1017        .unwrap();
1018        assert_eq!(inner.compaction_groups.len(), 4);
1019        assert_eq!(
1020            inner
1021                .try_get_compaction_group_config(100)
1022                .unwrap()
1023                .compaction_config
1024                .max_sub_compaction,
1025            123
1026        );
1027        assert_eq!(
1028            inner
1029                .try_get_compaction_group_config(200)
1030                .unwrap()
1031                .compaction_config
1032                .max_sub_compaction,
1033            123
1034        );
1035
1036        insert_compaction_group_config_with_max_level(env.meta_store_ref(), &mut inner, 300, 4)
1037            .await;
1038        update_compaction_config(
1039            env.meta_store_ref(),
1040            &mut inner,
1041            &[300],
1042            &[MutableConfig::ResetCompressionAlgorithm(true)],
1043        )
1044        .await
1045        .unwrap();
1046        assert_eq!(
1047            inner
1048                .try_get_compaction_group_config(300)
1049                .unwrap()
1050                .compaction_config
1051                .compression_algorithm,
1052            super::default_compaction_config::compression_algorithm_vec(4)
1053        );
1054        let err = update_compaction_config(
1055            env.meta_store_ref(),
1056            &mut inner,
1057            &[300],
1058            &[MutableConfig::CompressionAlgorithm(CompressionAlgorithm {
1059                level: 6,
1060                compression_algorithm: "Zstd".to_owned(),
1061            })],
1062        )
1063        .await
1064        .unwrap_err();
1065        assert!(
1066            err.to_string()
1067                .contains("invalid compression_algorithm level 6")
1068        );
1069
1070        update_compaction_config(
1071            env.meta_store_ref(),
1072            &mut inner,
1073            &[100],
1074            &[MutableConfig::MaxKvCountForXor16(0)],
1075        )
1076        .await
1077        .unwrap();
1078        assert_eq!(
1079            inner
1080                .try_get_compaction_group_config(100)
1081                .unwrap()
1082                .compaction_config
1083                .max_kv_count_for_xor16,
1084            Some(0)
1085        );
1086        update_compaction_config(
1087            env.meta_store_ref(),
1088            &mut inner,
1089            &[100],
1090            &[MutableConfig::MaxKvCountForXor16(1024)],
1091        )
1092        .await
1093        .unwrap();
1094        assert_eq!(
1095            inner
1096                .try_get_compaction_group_config(100)
1097                .unwrap()
1098                .compaction_config
1099                .max_kv_count_for_xor16,
1100            Some(1024)
1101        );
1102        update_compaction_config(
1103            env.meta_store_ref(),
1104            &mut inner,
1105            &[100],
1106            &[MutableConfig::MaxKvCountForXor16(u64::MAX)],
1107        )
1108        .await
1109        .unwrap();
1110        assert_eq!(
1111            inner
1112                .try_get_compaction_group_config(100)
1113                .unwrap()
1114                .compaction_config
1115                .max_kv_count_for_xor16,
1116            None
1117        );
1118
1119        update_compaction_config(
1120            env.meta_store_ref(),
1121            &mut inner,
1122            &[100],
1123            &[MutableConfig::MaxVnodeKeyRangeBytes(0)],
1124        )
1125        .await
1126        .unwrap();
1127        assert_eq!(
1128            inner
1129                .try_get_compaction_group_config(100)
1130                .unwrap()
1131                .compaction_config
1132                .max_vnode_key_range_bytes,
1133            None
1134        );
1135        update_compaction_config(
1136            env.meta_store_ref(),
1137            &mut inner,
1138            &[100],
1139            &[MutableConfig::MaxVnodeKeyRangeBytes(1024)],
1140        )
1141        .await
1142        .unwrap();
1143        assert_eq!(
1144            inner
1145                .try_get_compaction_group_config(100)
1146                .unwrap()
1147                .compaction_config
1148                .max_vnode_key_range_bytes,
1149            Some(1024)
1150        );
1151    }
1152
1153    #[tokio::test]
1154    async fn test_manager() {
1155        let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
1156        let table_fragment_1 = StreamJobFragments::for_test(
1157            JobId::new(10),
1158            BTreeMap::from([(
1159                1.into(),
1160                Fragment {
1161                    fragment_id: 1.into(),
1162                    state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
1163                    ..Default::default()
1164                },
1165            )]),
1166        );
1167        let table_fragment_2 = StreamJobFragments::for_test(
1168            JobId::new(20),
1169            BTreeMap::from([(
1170                2.into(),
1171                Fragment {
1172                    fragment_id: 2.into(),
1173                    state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
1174                    ..Default::default()
1175                },
1176            )]),
1177        );
1178
1179        // Test register_table_fragments
1180        let registered_number = || async {
1181            compaction_group_manager
1182                .list_compaction_group()
1183                .await
1184                .iter()
1185                .map(|cg| cg.member_table_ids.len())
1186                .sum::<usize>()
1187        };
1188        let group_number =
1189            || async { compaction_group_manager.list_compaction_group().await.len() };
1190        assert_eq!(registered_number().await, 0);
1191
1192        compaction_group_manager
1193            .register_table_fragments(
1194                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
1195                table_fragment_1
1196                    .internal_table_ids()
1197                    .into_iter()
1198                    .map_into()
1199                    .collect(),
1200            )
1201            .await
1202            .unwrap();
1203        assert_eq!(registered_number().await, 4);
1204        compaction_group_manager
1205            .register_table_fragments(
1206                Some(table_fragment_2.stream_job_id().as_mv_table_id()),
1207                table_fragment_2
1208                    .internal_table_ids()
1209                    .into_iter()
1210                    .map_into()
1211                    .collect(),
1212            )
1213            .await
1214            .unwrap();
1215        assert_eq!(registered_number().await, 8);
1216
1217        // Test unregister_table_fragments
1218        compaction_group_manager
1219            .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
1220            .await;
1221        assert_eq!(registered_number().await, 4);
1222
1223        // Test purge_stale_members: table fragments
1224        compaction_group_manager
1225            .purge(&table_fragment_2.all_table_ids().collect())
1226            .await
1227            .unwrap();
1228        assert_eq!(registered_number().await, 4);
1229        compaction_group_manager
1230            .purge(&HashSet::new())
1231            .await
1232            .unwrap();
1233        assert_eq!(registered_number().await, 0);
1234
1235        assert_eq!(group_number().await, 2);
1236
1237        compaction_group_manager
1238            .register_table_fragments(
1239                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
1240                table_fragment_1
1241                    .internal_table_ids()
1242                    .into_iter()
1243                    .map_into()
1244                    .collect(),
1245            )
1246            .await
1247            .unwrap();
1248        assert_eq!(registered_number().await, 4);
1249        assert_eq!(group_number().await, 2);
1250
1251        compaction_group_manager
1252            .unregister_table_fragments_vec(&[table_fragment_1])
1253            .await;
1254        assert_eq!(registered_number().await, 0);
1255        assert_eq!(group_number().await, 2);
1256    }
1257}