1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::meta::default::compaction_config as default_compaction_config;
22use risingwave_common::util::epoch::INVALID_EPOCH;
23use risingwave_hummock_sdk::CompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
25use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
26use risingwave_hummock_sdk::version::GroupDelta;
27use risingwave_meta_model::compaction_config;
28use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
29use risingwave_pb::hummock::write_limits::WriteLimit;
30use risingwave_pb::hummock::{
31 CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
32};
33use sea_orm::EntityTrait;
34use tokio::sync::OnceCell;
35
36use super::CompactionGroupStatistic;
37use crate::hummock::compaction::compaction_config::{
38 CompactionConfigBuilder, validate_compaction_config,
39};
40use crate::hummock::error::{Error, Result};
41use crate::hummock::manager::transaction::HummockVersionTransaction;
42use crate::hummock::manager::versioning::Versioning;
43use crate::hummock::manager::{HummockManager, commit_multi_var};
44use crate::hummock::metrics_utils::remove_compaction_group_metrics;
45use crate::hummock::model::CompactionGroup;
46use crate::hummock::sequence::next_compaction_group_id;
47use crate::manager::MetaSrvEnv;
48use crate::model::{
49 BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
50};
51
52type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
53
54impl CompactionGroupManager {
55 pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
56 let default_config = match env.opts.compaction_config.as_ref() {
57 None => CompactionConfigBuilder::new().build(),
58 Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
59 };
60 Self::new_with_config(env, default_config).await
61 }
62
63 pub(crate) async fn new_with_config(
64 env: &MetaSrvEnv,
65 default_config: CompactionConfig,
66 ) -> Result<CompactionGroupManager> {
67 let mut compaction_group_manager = CompactionGroupManager {
68 compaction_groups: BTreeMap::new(),
69 default_config: Arc::new(default_config),
70 write_limit: Default::default(),
71 };
72
73 let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
74 compaction_config::Entity::find()
75 .all(&env.meta_store_ref().conn)
76 .await
77 .map_err(MetadataModelError::from)?
78 .into_iter()
79 .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
80 .collect();
81
82 compaction_group_manager.init(loaded_compaction_groups);
83 Ok(compaction_group_manager)
84 }
85
86 fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
87 if !loaded_compaction_groups.is_empty() {
88 self.compaction_groups = loaded_compaction_groups;
89 }
90 }
91}
92
93impl HummockManager {
94 pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
97 get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
98 }
99
100 pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
102 self.compaction_group_manager
103 .read()
104 .await
105 .compaction_groups
106 .clone()
107 }
108
109 #[cfg(test)]
110 pub async fn register_table_fragments(
112 &self,
113 mv_table: Option<TableId>,
114 mut internal_tables: Vec<TableId>,
115 ) -> Result<()> {
116 let mut pairs = vec![];
117 if let Some(mv_table) = mv_table {
118 if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
119 tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
120 }
121 pairs.push((mv_table, StaticCompactionGroupId::MaterializedView));
123 }
124 for table_id in internal_tables {
126 pairs.push((table_id, StaticCompactionGroupId::StateDefault));
127 }
128 self.register_table_ids_for_test(&pairs).await?;
129 Ok(())
130 }
131
132 #[cfg(test)]
133 pub async fn unregister_table_fragments_vec(
135 &self,
136 table_fragments: &[crate::model::StreamJobFragments],
137 ) {
138 self.unregister_table_ids(table_fragments.iter().flat_map(|t| t.all_table_ids()))
139 .await
140 .unwrap();
141 }
142
143 pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
147 let to_unregister = self
148 .versioning
149 .read()
150 .await
151 .current_version
152 .state_table_info
153 .info()
154 .keys()
155 .cloned()
156 .filter(|table_id| !valid_ids.contains(table_id))
157 .collect_vec();
158
159 self.unregister_table_ids(to_unregister).await
162 }
163
164 pub async fn register_table_ids_for_test(
169 &self,
170 pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
171 ) -> Result<()> {
172 if pairs.is_empty() {
173 return Ok(());
174 }
175 let mut versioning_guard = self.versioning.write().await;
176 let versioning = versioning_guard.deref_mut();
177 let mut compaction_group_manager = self.compaction_group_manager.write().await;
178 let current_version = &versioning.current_version;
179 let default_config = compaction_group_manager.default_compaction_config();
180 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
181
182 for (table_id, _) in pairs {
183 let table_id = (*table_id).into();
184 if let Some(info) = current_version.state_table_info.info().get(&table_id) {
185 return Err(Error::CompactionGroup(format!(
186 "table {} already {:?}",
187 table_id, info
188 )));
189 }
190 }
191 let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
193 let mut version = HummockVersionTransaction::new(
194 &mut versioning.current_version,
195 &mut versioning.hummock_version_deltas,
196 &mut versioning.table_change_log,
197 self.env.notification_manager(),
198 None,
199 &self.metrics,
200 &self.env.opts,
201 );
202 let mut new_version_delta = version.new_delta();
203
204 let committed_epoch = new_version_delta
205 .latest_version()
206 .state_table_info
207 .info()
208 .values()
209 .map(|info| info.committed_epoch)
210 .max()
211 .unwrap_or(INVALID_EPOCH);
212
213 for (table_id, raw_group_id) in pairs {
214 let table_id = (*table_id).into();
215 let mut group_id = *raw_group_id;
216 if group_id == StaticCompactionGroupId::NewCompactionGroup {
217 let mut is_group_init = false;
218 group_id = *new_compaction_group_id
219 .get_or_try_init(|| async {
220 next_compaction_group_id(&self.env).await.inspect(|_| {
221 is_group_init = true;
222 })
223 })
224 .await?;
225 if is_group_init {
226 let group_deltas = &mut new_version_delta
227 .group_deltas
228 .entry(group_id)
229 .or_default()
230 .group_deltas;
231
232 let config =
233 match compaction_groups_txn.try_get_compaction_group_config(group_id) {
234 Some(config) => config.compaction_config.as_ref().clone(),
235 None => {
236 compaction_groups_txn
237 .create_compaction_groups(group_id, default_config.clone());
238 default_config.as_ref().clone()
239 }
240 };
241
242 let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
243 group_config: Some(config),
244 group_id,
245 ..Default::default()
246 }));
247
248 group_deltas.push(group_delta);
249 }
250 }
251 assert!(
252 new_version_delta
253 .state_table_info_delta
254 .insert(
255 table_id,
256 PbStateTableInfoDelta {
257 committed_epoch,
258 compaction_group_id: *raw_group_id,
259 }
260 )
261 .is_none()
262 );
263 }
264 new_version_delta.pre_apply();
265 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
266
267 Ok(())
268 }
269
270 pub async fn unregister_table_ids(
271 &self,
272 table_ids: impl IntoIterator<Item = TableId>,
273 ) -> Result<()> {
274 let table_ids = table_ids.into_iter().collect_vec();
275 if table_ids.is_empty() {
276 return Ok(());
277 }
278
279 {
280 let mut table_write_throughput_statistic_manager =
284 self.table_write_throughput_statistic_manager.write();
285 for &table_id in table_ids.iter().unique() {
286 table_write_throughput_statistic_manager.remove_table(table_id);
287 }
288 }
289
290 let mut versioning_guard = self.versioning.write().await;
291 let versioning = versioning_guard.deref_mut();
292 let mut version = HummockVersionTransaction::new(
293 &mut versioning.current_version,
294 &mut versioning.hummock_version_deltas,
295 &mut versioning.table_change_log,
296 self.env.notification_manager(),
297 None,
298 &self.metrics,
299 &self.env.opts,
300 );
301 let mut new_version_delta = version.new_delta();
302 struct UnregisterGroupChange {
303 remaining_member_count: usize,
304 removed_table_ids: HashSet<TableId>,
305 }
306 let mut group_changes: HashMap<CompactionGroupId, UnregisterGroupChange> = HashMap::new();
307 for table_id in table_ids.into_iter().unique() {
309 let version = new_version_delta.latest_version();
310 let Some(info) = version.state_table_info.info().get(&table_id) else {
311 continue;
312 };
313 let compaction_group_id = info.compaction_group_id;
314
315 let group_change =
316 group_changes
317 .entry(compaction_group_id)
318 .or_insert_with(|| UnregisterGroupChange {
319 remaining_member_count: version
320 .state_table_info
321 .compaction_group_member_tables()
322 .get(&compaction_group_id)
323 .expect("should exist")
324 .len(),
325 removed_table_ids: HashSet::new(),
326 });
327 group_change.remaining_member_count = group_change
328 .remaining_member_count
329 .checked_sub(1)
330 .expect("member table count should be positive");
331 assert!(group_change.removed_table_ids.insert(table_id));
332 new_version_delta.removed_table_ids.insert(table_id);
333 }
334
335 for (group_id, change) in group_changes {
336 if change.remaining_member_count == 0 && group_id > StaticCompactionGroupId::End {
337 let max_level = new_version_delta
338 .latest_version()
339 .get_compaction_group_levels(group_id)
340 .levels
341 .len();
342 new_version_delta
343 .group_deltas
344 .entry(group_id)
345 .or_default()
346 .group_deltas
347 .push(GroupDelta::GroupDestroy(PbGroupDestroy {}));
348 remove_compaction_group_metrics(&self.metrics, group_id, max_level);
349 self.compaction_state.remove_compaction_group(group_id);
351 } else {
352 new_version_delta
353 .group_deltas
354 .entry(group_id)
355 .or_default()
356 .group_deltas
357 .push(GroupDelta::PruneTableIdsFromSsts(change.removed_table_ids));
358 }
359 }
360
361 new_version_delta.pre_apply();
362
363 let mut compaction_group_manager = self.compaction_group_manager.write().await;
366 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
367
368 compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
369 version.latest_version(),
370 )));
371 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
372
373 Ok(())
375 }
376
377 pub async fn update_compaction_config(
378 &self,
379 compaction_group_ids: &[CompactionGroupId],
380 config_to_update: &[MutableConfig],
381 ) -> Result<()> {
382 {
383 let mut compaction_group_manager = self.compaction_group_manager.write().await;
385 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
386 compaction_groups_txn
387 .update_compaction_config(compaction_group_ids, config_to_update)?;
388 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
389 }
390
391 if config_to_update
392 .iter()
393 .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
394 {
395 self.try_update_write_limits(compaction_group_ids).await;
397 }
398
399 Ok(())
400 }
401
402 pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
405 let mut versioning_guard = self.versioning.write().await;
406 let versioning = versioning_guard.deref_mut();
407 let current_version = &versioning.current_version;
408 let mut results = vec![];
409 let compaction_group_manager = self.compaction_group_manager.read().await;
410
411 for levels in current_version.levels.values() {
412 let compaction_config = compaction_group_manager
413 .try_get_compaction_group_config(levels.group_id)
414 .unwrap()
415 .compaction_config
416 .as_ref()
417 .clone();
418 let group = CompactionGroupInfo {
419 id: levels.group_id,
420 parent_id: levels.parent_group_id,
421 member_table_ids: current_version
422 .state_table_info
423 .compaction_group_member_table_ids(levels.group_id)
424 .iter()
425 .copied()
426 .collect_vec(),
427 compaction_config: Some(compaction_config),
428 };
429 results.push(group);
430 }
431 results
432 }
433
434 pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
435 let mut infos = vec![];
436 {
437 let versioning_guard = self.versioning.read().await;
438 let manager = self.compaction_group_manager.read().await;
439 let version = &versioning_guard.current_version;
440 for group_id in version.levels.keys() {
441 let mut group_info = CompactionGroupStatistic {
442 group_id: *group_id,
443 ..Default::default()
444 };
445 for table_id in version
446 .state_table_info
447 .compaction_group_member_table_ids(*group_id)
448 {
449 let stats_size = versioning_guard
450 .version_stats
451 .table_stats
452 .get(table_id)
453 .map(|stats| stats.total_key_size + stats.total_value_size)
454 .unwrap_or(0);
455 let table_size = std::cmp::max(stats_size, 0) as u64;
456 group_info.group_size += table_size;
457 group_info.table_statistic.insert(*table_id, table_size);
458 group_info.compaction_group_config =
459 manager.try_get_compaction_group_config(*group_id).unwrap();
460 }
461 infos.push(group_info);
462 }
463 };
464 infos
465 }
466
467 pub(crate) async fn initial_compaction_group_config_after_load(
468 &self,
469 versioning_guard: &Versioning,
470 compaction_group_manager: &mut CompactionGroupManager,
471 ) -> Result<()> {
472 let current_version = &versioning_guard.current_version;
474 let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
475 let default_config = compaction_group_manager.default_compaction_config();
476 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
477 compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
478 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
479
480 Ok(())
481 }
482}
483
484pub(crate) struct CompactionGroupManager {
492 compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
493 default_config: Arc<CompactionConfig>,
494 pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
496}
497
498impl CompactionGroupManager {
499 pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
501 CompactionGroupTransaction::new(&mut self.compaction_groups)
502 }
503
504 #[expect(clippy::type_complexity)]
505 pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
506 inner: P,
507 ) -> BTreeMapTransactionInner<
508 CompactionGroupId,
509 CompactionGroup,
510 DerefMutForward<
511 Self,
512 BTreeMap<CompactionGroupId, CompactionGroup>,
513 P,
514 impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
515 impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
516 >,
517 > {
518 BTreeMapTransactionInner::new(DerefMutForward::new(
519 inner,
520 |mgr| &mgr.compaction_groups,
521 |mgr| &mut mgr.compaction_groups,
522 ))
523 }
524
525 pub(crate) fn try_get_compaction_group_config(
527 &self,
528 compaction_group_id: impl Into<CompactionGroupId>,
529 ) -> Option<CompactionGroup> {
530 self.compaction_groups
531 .get(&compaction_group_id.into())
532 .cloned()
533 }
534
535 pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
537 self.default_config.clone()
538 }
539}
540
541fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) -> Result<()> {
542 for item in items {
543 match item {
544 MutableConfig::MaxBytesForLevelBase(c) => {
545 target.max_bytes_for_level_base = *c;
546 }
547 MutableConfig::MaxBytesForLevelMultiplier(c) => {
548 target.max_bytes_for_level_multiplier = *c;
549 }
550 MutableConfig::MaxCompactionBytes(c) => {
551 target.max_compaction_bytes = *c;
552 }
553 MutableConfig::SubLevelMaxCompactionBytes(c) => {
554 target.sub_level_max_compaction_bytes = *c;
555 }
556 MutableConfig::Level0TierCompactFileNumber(c) => {
557 target.level0_tier_compact_file_number = *c;
558 }
559 MutableConfig::TargetFileSizeBase(c) => {
560 target.target_file_size_base = *c;
561 }
562 MutableConfig::CompactionFilterMask(c) => {
563 target.compaction_filter_mask = *c;
564 }
565 MutableConfig::MaxSubCompaction(c) => {
566 target.max_sub_compaction = *c;
567 }
568 MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
569 target.level0_stop_write_threshold_sub_level_number = *c;
570 }
571 MutableConfig::Level0SubLevelCompactLevelCount(c) => {
572 target.level0_sub_level_compact_level_count = *c;
573 }
574 MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
575 target.level0_overlapping_sub_level_compact_level_count = *c;
576 }
577 MutableConfig::MaxSpaceReclaimBytes(c) => {
578 target.max_space_reclaim_bytes = *c;
579 }
580 MutableConfig::Level0MaxCompactFileNumber(c) => {
581 target.level0_max_compact_file_number = *c;
582 }
583 MutableConfig::EnableEmergencyPicker(c) => {
584 target.enable_emergency_picker = *c;
585 }
586 MutableConfig::TombstoneReclaimRatio(c) => {
587 target.tombstone_reclaim_ratio = *c;
588 }
589 MutableConfig::CompressionAlgorithm(c) => {
590 let level = c.get_level();
591 let max_level = try_u32_max_level(target.max_level)?;
592 if level > max_level {
593 return Err(Error::CompactionGroup(format!(
594 "invalid compression_algorithm level {}, max_level is {}",
595 level, target.max_level
596 )));
597 }
598
599 let Some(algorithm) = target.compression_algorithm.get_mut(level as usize) else {
600 return Err(Error::CompactionGroup(format!(
601 "invalid compression_algorithm level {}, compression_algorithm len is {}",
602 level,
603 target.compression_algorithm.len()
604 )));
605 };
606 algorithm.clone_from(&c.compression_algorithm);
607 }
608 MutableConfig::ResetCompressionAlgorithm(reset) => {
609 if *reset {
610 target.compression_algorithm =
611 default_compaction_config::compression_algorithm_vec(try_u32_max_level(
612 target.max_level,
613 )?);
614 }
615 }
616 MutableConfig::MaxL0CompactLevelCount(c) => {
617 target.max_l0_compact_level_count = Some(*c);
618 }
619 MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
620 target.sst_allowed_trivial_move_min_size = Some(*c);
621 }
622 MutableConfig::SplitWeightByVnode(c) => {
623 target.split_weight_by_vnode = *c;
624 }
625 MutableConfig::DisableAutoGroupScheduling(c) => {
626 target.disable_auto_group_scheduling = Some(*c);
627 }
628 MutableConfig::MaxOverlappingLevelSize(c) => {
629 target.max_overlapping_level_size = Some(*c);
630 }
631 MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
632 target.sst_allowed_trivial_move_max_count = Some(*c);
633 }
634 MutableConfig::EmergencyLevel0SstFileCount(c) => {
635 target.emergency_level0_sst_file_count = Some(*c);
636 }
637 MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
638 target.emergency_level0_sub_level_partition = Some(*c);
639 }
640 MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
641 target.level0_stop_write_threshold_max_sst_count = Some(*c);
642 }
643 MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
644 target.level0_stop_write_threshold_max_size = Some(*c);
645 }
646 MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
647 target.enable_optimize_l0_interval_selection = Some(*c);
648 }
649 #[expect(deprecated)]
650 MutableConfig::VnodeAlignedLevelSizeThreshold(_) => {
651 }
653 MutableConfig::MaxKvCountForXor16(c) => {
654 target.max_kv_count_for_xor16 = optional_non_sentinel_u64_config(*c);
655 }
656 MutableConfig::MaxVnodeKeyRangeBytes(c) => {
657 target.max_vnode_key_range_bytes = optional_positive_u64_config(*c);
658 }
659 MutableConfig::SstableFilterKind(c) => {
660 if target.sstable_filter_kind.is_empty() {
661 target.sstable_filter_kind =
662 vec!["xor16".to_owned(); target.max_level as usize + 1];
663 }
664 let idx = c.get_level() as usize;
665 let level_entry = target.sstable_filter_kind.get_mut(idx).ok_or_else(|| {
666 Error::CompactionGroup(format!(
667 "sstable_filter_kind level {} is out of range",
668 idx
669 ))
670 })?;
671 level_entry.clone_from(&c.filter_kind);
672 }
673 MutableConfig::SstableFilterLayout(c) => {
674 if target.sstable_filter_layout.is_empty() {
675 target.sstable_filter_layout =
676 vec!["auto".to_owned(); target.max_level as usize + 1];
677 }
678 let idx = c.get_level() as usize;
679 let level_entry = target.sstable_filter_layout.get_mut(idx).ok_or_else(|| {
680 Error::CompactionGroup(format!(
681 "sstable_filter_layout level {} is out of range",
682 idx
683 ))
684 })?;
685 level_entry.clone_from(&c.layout);
686 }
687 }
688 }
689 Ok(())
690}
691
692fn optional_u64_config(value: u64) -> Option<u64> {
693 (value != u64::MIN && value != u64::MAX).then_some(value)
694}
695
696fn optional_non_sentinel_u64_config(value: u64) -> Option<u64> {
697 (value != u64::MAX).then_some(value)
698}
699
700fn optional_positive_u64_config(value: u64) -> Option<u64> {
701 optional_u64_config(value).filter(|value| *value > 0)
702}
703
704fn try_u32_max_level(max_level: u64) -> Result<u32> {
705 u32::try_from(max_level).map_err(|_| {
706 Error::CompactionGroup(format!(
707 "invalid max_level {}, expect <= {}",
708 max_level,
709 u32::MAX
710 ))
711 })
712}
713
714impl CompactionGroupTransaction<'_> {
715 pub fn try_create_compaction_groups(
717 &mut self,
718 compaction_group_ids: &[CompactionGroupId],
719 config: Arc<CompactionConfig>,
720 ) -> bool {
721 let mut trivial = true;
722 for id in compaction_group_ids {
723 if self.contains_key(id) {
724 continue;
725 }
726 let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
727 self.insert(*id, new_entry);
728
729 trivial = false;
730 }
731
732 !trivial
733 }
734
735 pub fn create_compaction_groups(
736 &mut self,
737 compaction_group_id: CompactionGroupId,
738 config: Arc<CompactionConfig>,
739 ) {
740 self.try_create_compaction_groups(&[compaction_group_id], config);
741 }
742
743 pub(crate) fn try_get_compaction_group_config(
745 &self,
746 compaction_group_id: CompactionGroupId,
747 ) -> Option<&CompactionGroup> {
748 self.get(&compaction_group_id)
749 }
750
751 pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
753 let stale_group = self
754 .tree_ref()
755 .keys()
756 .cloned()
757 .filter(|k| !existing_groups.contains(k))
758 .collect_vec();
759 if stale_group.is_empty() {
760 return;
761 }
762 for group in stale_group {
763 self.remove(group);
764 }
765 }
766
767 pub(crate) fn update_compaction_config(
768 &mut self,
769 compaction_group_ids: &[CompactionGroupId],
770 config_to_update: &[MutableConfig],
771 ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
772 let mut results = HashMap::default();
773 for compaction_group_id in compaction_group_ids.iter().unique() {
774 let group = self.get(compaction_group_id).ok_or_else(|| {
775 Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
776 })?;
777 let mut config = group.compaction_config.as_ref().clone();
778 update_compaction_config(&mut config, config_to_update)?;
779 if let Err(reason) = validate_compaction_config(&config) {
780 return Err(Error::CompactionGroup(reason));
781 }
782 let mut new_group = group.clone();
783 new_group.compaction_config = Arc::new(config);
784 self.insert(*compaction_group_id, new_group.clone());
785 results.insert(new_group.group_id(), new_group);
786 }
787
788 Ok(results)
789 }
790}
791
792#[cfg(test)]
793mod tests {
794 use std::collections::{BTreeMap, HashSet};
795 use std::sync::Arc;
796
797 use itertools::Itertools;
798 use risingwave_common::id::JobId;
799 use risingwave_hummock_sdk::CompactionGroupId;
800 use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
801 use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::{
802 CompressionAlgorithm, SstableFilterKind, SstableFilterLayout,
803 };
804
805 use crate::controller::SqlMetaStore;
806 use crate::hummock::commit_multi_var;
807 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
808 use crate::hummock::error::Result;
809 use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
810 use crate::hummock::test_utils::setup_compute_env;
811 use crate::model::{Fragment, StreamJobFragments};
812
813 #[test]
814 fn test_update_compaction_config_filter_kind_layout_backward_compat() {
815 let mut config = CompactionConfigBuilder::new().build();
816 config.sstable_filter_kind.clear();
817 config.sstable_filter_layout.clear();
818
819 super::update_compaction_config(
820 &mut config,
821 &[MutableConfig::SstableFilterKind(SstableFilterKind {
822 level: 0,
823 filter_kind: "xor8".to_owned(),
824 })],
825 )
826 .unwrap();
827 assert_eq!(
828 config.sstable_filter_kind.len(),
829 config.max_level as usize + 1
830 );
831 assert_eq!(config.sstable_filter_kind[0], "xor8");
832
833 super::update_compaction_config(
834 &mut config,
835 &[MutableConfig::SstableFilterLayout(SstableFilterLayout {
836 level: 1,
837 layout: "plain".to_owned(),
838 })],
839 )
840 .unwrap();
841 assert_eq!(
842 config.sstable_filter_layout.len(),
843 config.max_level as usize + 1
844 );
845 assert_eq!(config.sstable_filter_layout[1], "plain");
846 }
847
848 #[test]
849 fn test_update_compaction_config_rejects_out_of_range_level() {
850 let mut config = CompactionConfigBuilder::new().build();
851 let oob = config.max_level as u32 + 1;
852
853 assert!(
854 super::update_compaction_config(
855 &mut config,
856 &[MutableConfig::SstableFilterKind(SstableFilterKind {
857 level: oob,
858 filter_kind: "xor8".to_owned(),
859 })],
860 )
861 .is_err()
862 );
863
864 assert!(
865 super::update_compaction_config(
866 &mut config,
867 &[MutableConfig::SstableFilterLayout(SstableFilterLayout {
868 level: oob,
869 layout: "plain".to_owned(),
870 })],
871 )
872 .is_err()
873 );
874
875 assert!(
876 super::update_compaction_config(
877 &mut config,
878 &[MutableConfig::CompressionAlgorithm(CompressionAlgorithm {
879 level: oob,
880 compression_algorithm: "Zstd".to_owned(),
881 })],
882 )
883 .is_err()
884 );
885 }
886
887 #[test]
888 fn test_reset_compression_algorithm_false_is_noop() {
889 let mut config = CompactionConfigBuilder::new().build();
890 config.compression_algorithm[3] = "Zstd".to_owned();
891
892 super::update_compaction_config(
893 &mut config,
894 &[MutableConfig::ResetCompressionAlgorithm(false)],
895 )
896 .unwrap();
897
898 assert_eq!(config.compression_algorithm[3], "Zstd");
899 }
900
901 #[tokio::test]
902 async fn test_inner() {
903 let (env, ..) = setup_compute_env(8080).await;
904 let mut inner = CompactionGroupManager::new(&env).await.unwrap();
905 assert_eq!(inner.compaction_groups.len(), 2);
906
907 async fn update_compaction_config(
908 meta: &SqlMetaStore,
909 inner: &mut CompactionGroupManager,
910 cg_ids: &[impl Into<CompactionGroupId> + Copy],
911 config_to_update: &[MutableConfig],
912 ) -> Result<()> {
913 let cg_ids = cg_ids.iter().copied().map_into().collect_vec();
914 let mut compaction_groups_txn = inner.start_compaction_groups_txn();
915 compaction_groups_txn.update_compaction_config(&cg_ids, config_to_update)?;
916 commit_multi_var!(meta, compaction_groups_txn)
917 }
918
919 async fn insert_compaction_group_configs(
920 meta: &SqlMetaStore,
921 inner: &mut CompactionGroupManager,
922 cg_ids: &[u64],
923 ) {
924 let default_config = inner.default_compaction_config();
925 let mut compaction_groups_txn = inner.start_compaction_groups_txn();
926 if compaction_groups_txn.try_create_compaction_groups(
927 &cg_ids.iter().copied().map_into().collect_vec(),
928 default_config,
929 ) {
930 commit_multi_var!(meta, compaction_groups_txn).unwrap();
931 }
932 }
933
934 async fn insert_compaction_group_config_with_max_level(
935 meta: &SqlMetaStore,
936 inner: &mut CompactionGroupManager,
937 cg_id: u64,
938 max_level: u64,
939 ) {
940 let mut config = inner.default_compaction_config().as_ref().clone();
941 config.max_level = max_level;
942 config.compression_algorithm =
943 super::default_compaction_config::compression_algorithm_vec(
944 super::try_u32_max_level(max_level).expect("max_level should fit u32 in test"),
945 );
946 let mut compaction_groups_txn = inner.start_compaction_groups_txn();
947 compaction_groups_txn.create_compaction_groups(cg_id.into(), Arc::new(config));
948 commit_multi_var!(meta, compaction_groups_txn).unwrap();
949 }
950
951 update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
952 .await
953 .unwrap_err();
954 insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
955 assert_eq!(inner.compaction_groups.len(), 4);
956 let mut inner = CompactionGroupManager::new(&env).await.unwrap();
957 assert_eq!(inner.compaction_groups.len(), 4);
958
959 update_compaction_config(
960 env.meta_store_ref(),
961 &mut inner,
962 &[100, 200],
963 &[MutableConfig::MaxSubCompaction(123)],
964 )
965 .await
966 .unwrap();
967 assert_eq!(inner.compaction_groups.len(), 4);
968 assert_eq!(
969 inner
970 .try_get_compaction_group_config(100)
971 .unwrap()
972 .compaction_config
973 .max_sub_compaction,
974 123
975 );
976 assert_eq!(
977 inner
978 .try_get_compaction_group_config(200)
979 .unwrap()
980 .compaction_config
981 .max_sub_compaction,
982 123
983 );
984
985 insert_compaction_group_config_with_max_level(env.meta_store_ref(), &mut inner, 300, 4)
986 .await;
987 update_compaction_config(
988 env.meta_store_ref(),
989 &mut inner,
990 &[300],
991 &[MutableConfig::ResetCompressionAlgorithm(true)],
992 )
993 .await
994 .unwrap();
995 assert_eq!(
996 inner
997 .try_get_compaction_group_config(300)
998 .unwrap()
999 .compaction_config
1000 .compression_algorithm,
1001 super::default_compaction_config::compression_algorithm_vec(4)
1002 );
1003 let err = update_compaction_config(
1004 env.meta_store_ref(),
1005 &mut inner,
1006 &[300],
1007 &[MutableConfig::CompressionAlgorithm(CompressionAlgorithm {
1008 level: 6,
1009 compression_algorithm: "Zstd".to_owned(),
1010 })],
1011 )
1012 .await
1013 .unwrap_err();
1014 assert!(
1015 err.to_string()
1016 .contains("invalid compression_algorithm level 6")
1017 );
1018
1019 update_compaction_config(
1020 env.meta_store_ref(),
1021 &mut inner,
1022 &[100],
1023 &[MutableConfig::MaxKvCountForXor16(0)],
1024 )
1025 .await
1026 .unwrap();
1027 assert_eq!(
1028 inner
1029 .try_get_compaction_group_config(100)
1030 .unwrap()
1031 .compaction_config
1032 .max_kv_count_for_xor16,
1033 Some(0)
1034 );
1035 update_compaction_config(
1036 env.meta_store_ref(),
1037 &mut inner,
1038 &[100],
1039 &[MutableConfig::MaxKvCountForXor16(1024)],
1040 )
1041 .await
1042 .unwrap();
1043 assert_eq!(
1044 inner
1045 .try_get_compaction_group_config(100)
1046 .unwrap()
1047 .compaction_config
1048 .max_kv_count_for_xor16,
1049 Some(1024)
1050 );
1051 update_compaction_config(
1052 env.meta_store_ref(),
1053 &mut inner,
1054 &[100],
1055 &[MutableConfig::MaxKvCountForXor16(u64::MAX)],
1056 )
1057 .await
1058 .unwrap();
1059 assert_eq!(
1060 inner
1061 .try_get_compaction_group_config(100)
1062 .unwrap()
1063 .compaction_config
1064 .max_kv_count_for_xor16,
1065 None
1066 );
1067
1068 update_compaction_config(
1069 env.meta_store_ref(),
1070 &mut inner,
1071 &[100],
1072 &[MutableConfig::MaxVnodeKeyRangeBytes(0)],
1073 )
1074 .await
1075 .unwrap();
1076 assert_eq!(
1077 inner
1078 .try_get_compaction_group_config(100)
1079 .unwrap()
1080 .compaction_config
1081 .max_vnode_key_range_bytes,
1082 None
1083 );
1084 update_compaction_config(
1085 env.meta_store_ref(),
1086 &mut inner,
1087 &[100],
1088 &[MutableConfig::MaxVnodeKeyRangeBytes(1024)],
1089 )
1090 .await
1091 .unwrap();
1092 assert_eq!(
1093 inner
1094 .try_get_compaction_group_config(100)
1095 .unwrap()
1096 .compaction_config
1097 .max_vnode_key_range_bytes,
1098 Some(1024)
1099 );
1100 }
1101
1102 #[tokio::test]
1103 async fn test_manager() {
1104 let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
1105 let table_fragment_1 = StreamJobFragments::for_test(
1106 JobId::new(10),
1107 BTreeMap::from([(
1108 1.into(),
1109 Fragment {
1110 fragment_id: 1.into(),
1111 state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
1112 ..Default::default()
1113 },
1114 )]),
1115 );
1116 let table_fragment_2 = StreamJobFragments::for_test(
1117 JobId::new(20),
1118 BTreeMap::from([(
1119 2.into(),
1120 Fragment {
1121 fragment_id: 2.into(),
1122 state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
1123 ..Default::default()
1124 },
1125 )]),
1126 );
1127
1128 let registered_number = || async {
1130 compaction_group_manager
1131 .list_compaction_group()
1132 .await
1133 .iter()
1134 .map(|cg| cg.member_table_ids.len())
1135 .sum::<usize>()
1136 };
1137 let group_number =
1138 || async { compaction_group_manager.list_compaction_group().await.len() };
1139 assert_eq!(registered_number().await, 0);
1140
1141 compaction_group_manager
1142 .register_table_fragments(
1143 Some(table_fragment_1.stream_job_id().as_mv_table_id()),
1144 table_fragment_1
1145 .internal_table_ids()
1146 .into_iter()
1147 .map_into()
1148 .collect(),
1149 )
1150 .await
1151 .unwrap();
1152 assert_eq!(registered_number().await, 4);
1153 compaction_group_manager
1154 .register_table_fragments(
1155 Some(table_fragment_2.stream_job_id().as_mv_table_id()),
1156 table_fragment_2
1157 .internal_table_ids()
1158 .into_iter()
1159 .map_into()
1160 .collect(),
1161 )
1162 .await
1163 .unwrap();
1164 assert_eq!(registered_number().await, 8);
1165
1166 compaction_group_manager
1168 .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
1169 .await;
1170 assert_eq!(registered_number().await, 4);
1171
1172 compaction_group_manager
1174 .purge(&table_fragment_2.all_table_ids().collect())
1175 .await
1176 .unwrap();
1177 assert_eq!(registered_number().await, 4);
1178 compaction_group_manager
1179 .purge(&HashSet::new())
1180 .await
1181 .unwrap();
1182 assert_eq!(registered_number().await, 0);
1183
1184 assert_eq!(group_number().await, 2);
1185
1186 compaction_group_manager
1187 .register_table_fragments(
1188 Some(table_fragment_1.stream_job_id().as_mv_table_id()),
1189 table_fragment_1
1190 .internal_table_ids()
1191 .into_iter()
1192 .map_into()
1193 .collect(),
1194 )
1195 .await
1196 .unwrap();
1197 assert_eq!(registered_number().await, 4);
1198 assert_eq!(group_number().await, 2);
1199
1200 compaction_group_manager
1201 .unregister_table_fragments_vec(&[table_fragment_1])
1202 .await;
1203 assert_eq!(registered_number().await, 0);
1204 assert_eq!(group_number().await, 2);
1205 }
1206}