1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::config::meta::default::compaction_config as default_compaction_config;
22use risingwave_common::util::epoch::INVALID_EPOCH;
23use risingwave_hummock_sdk::CompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
25use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
26use risingwave_hummock_sdk::filter_utils::{
27 parse_sstable_filter_layout, parse_sstable_filter_type,
28};
29use risingwave_hummock_sdk::version::GroupDelta;
30use risingwave_meta_model::compaction_config;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::write_limits::WriteLimit;
33use risingwave_pb::hummock::{
34 CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
35};
36use sea_orm::EntityTrait;
37use tokio::sync::OnceCell;
38
39use super::CompactionGroupStatistic;
40use crate::hummock::compaction::compaction_config::{
41 CompactionConfigBuilder, validate_compaction_config,
42};
43use crate::hummock::error::{Error, Result};
44use crate::hummock::manager::transaction::HummockVersionTransaction;
45use crate::hummock::manager::versioning::Versioning;
46use crate::hummock::manager::{HummockManager, commit_multi_var};
47use crate::hummock::metrics_utils::remove_compaction_group_metrics;
48use crate::hummock::model::CompactionGroup;
49use crate::hummock::sequence::next_compaction_group_id;
50use crate::manager::MetaSrvEnv;
51use crate::model::{
52 BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
53};
54
55type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
56
57impl CompactionGroupManager {
58 pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
59 let default_config = match env.opts.compaction_config.as_ref() {
60 None => CompactionConfigBuilder::new().build(),
61 Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
62 };
63 Self::new_with_config(env, default_config).await
64 }
65
66 pub(crate) async fn new_with_config(
67 env: &MetaSrvEnv,
68 default_config: CompactionConfig,
69 ) -> Result<CompactionGroupManager> {
70 let mut compaction_group_manager = CompactionGroupManager {
71 compaction_groups: BTreeMap::new(),
72 default_config: Arc::new(default_config),
73 write_limit: Default::default(),
74 };
75
76 let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
77 compaction_config::Entity::find()
78 .all(&env.meta_store_ref().conn)
79 .await
80 .map_err(MetadataModelError::from)?
81 .into_iter()
82 .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
83 .collect();
84
85 compaction_group_manager.init(loaded_compaction_groups);
86 Ok(compaction_group_manager)
87 }
88
89 fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
90 if !loaded_compaction_groups.is_empty() {
91 self.compaction_groups = loaded_compaction_groups;
92 }
93 }
94}
95
96impl HummockManager {
97 pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
100 get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
101 }
102
103 pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
105 self.compaction_group_manager
106 .read()
107 .await
108 .compaction_groups
109 .clone()
110 }
111
112 #[cfg(test)]
113 pub async fn register_table_fragments(
115 &self,
116 mv_table: Option<TableId>,
117 mut internal_tables: Vec<TableId>,
118 ) -> Result<()> {
119 let mut pairs = vec![];
120 if let Some(mv_table) = mv_table {
121 if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
122 tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
123 }
124 pairs.push((mv_table, StaticCompactionGroupId::MaterializedView));
126 }
127 for table_id in internal_tables {
129 pairs.push((table_id, StaticCompactionGroupId::StateDefault));
130 }
131 self.register_table_ids_for_test(&pairs).await?;
132 Ok(())
133 }
134
135 #[cfg(test)]
136 pub async fn unregister_table_fragments_vec(
138 &self,
139 table_fragments: &[crate::model::StreamJobFragments],
140 ) {
141 self.unregister_table_ids(table_fragments.iter().flat_map(|t| t.all_table_ids()))
142 .await
143 .unwrap();
144 }
145
146 pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
150 let to_unregister = self
151 .versioning
152 .read()
153 .await
154 .current_version
155 .state_table_info
156 .info()
157 .keys()
158 .cloned()
159 .filter(|table_id| !valid_ids.contains(table_id))
160 .collect_vec();
161
162 self.unregister_table_ids(to_unregister).await
165 }
166
167 pub async fn register_table_ids_for_test(
172 &self,
173 pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
174 ) -> Result<()> {
175 if pairs.is_empty() {
176 return Ok(());
177 }
178 let mut versioning_guard = self.versioning.write().await;
179 let versioning = versioning_guard.deref_mut();
180 let mut compaction_group_manager = self.compaction_group_manager.write().await;
181 let current_version = &versioning.current_version;
182 let default_config = compaction_group_manager.default_compaction_config();
183 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
184
185 for (table_id, _) in pairs {
186 let table_id = (*table_id).into();
187 if let Some(info) = current_version.state_table_info.info().get(&table_id) {
188 return Err(Error::CompactionGroup(format!(
189 "table {} already {:?}",
190 table_id, info
191 )));
192 }
193 }
194 let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
196 let mut version = HummockVersionTransaction::new(
197 &mut versioning.current_version,
198 &mut versioning.hummock_version_deltas,
199 &mut versioning.table_change_log,
200 self.env.notification_manager(),
201 None,
202 &self.metrics,
203 &self.env.opts,
204 );
205 let mut new_version_delta = version.new_delta();
206
207 let committed_epoch = new_version_delta
208 .latest_version()
209 .state_table_info
210 .info()
211 .values()
212 .map(|info| info.committed_epoch)
213 .max()
214 .unwrap_or(INVALID_EPOCH);
215
216 for (table_id, raw_group_id) in pairs {
217 let table_id = (*table_id).into();
218 let mut group_id = *raw_group_id;
219 if group_id == StaticCompactionGroupId::NewCompactionGroup {
220 let mut is_group_init = false;
221 group_id = *new_compaction_group_id
222 .get_or_try_init(|| async {
223 next_compaction_group_id(&self.env).await.inspect(|_| {
224 is_group_init = true;
225 })
226 })
227 .await?;
228 if is_group_init {
229 let group_deltas = &mut new_version_delta
230 .group_deltas
231 .entry(group_id)
232 .or_default()
233 .group_deltas;
234
235 let config =
236 match compaction_groups_txn.try_get_compaction_group_config(group_id) {
237 Some(config) => config.compaction_config.as_ref().clone(),
238 None => {
239 compaction_groups_txn
240 .create_compaction_groups(group_id, default_config.clone());
241 default_config.as_ref().clone()
242 }
243 };
244
245 let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
246 group_config: Some(config),
247 group_id,
248 ..Default::default()
249 }));
250
251 group_deltas.push(group_delta);
252 }
253 }
254 assert!(
255 new_version_delta
256 .state_table_info_delta
257 .insert(
258 table_id,
259 PbStateTableInfoDelta {
260 committed_epoch,
261 compaction_group_id: *raw_group_id,
262 }
263 )
264 .is_none()
265 );
266 }
267 new_version_delta.pre_apply();
268 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
269
270 Ok(())
271 }
272
273 pub async fn unregister_table_ids(
274 &self,
275 table_ids: impl IntoIterator<Item = TableId>,
276 ) -> Result<()> {
277 let table_ids = table_ids.into_iter().collect_vec();
278 if table_ids.is_empty() {
279 return Ok(());
280 }
281
282 {
283 let mut table_write_throughput_statistic_manager =
287 self.table_write_throughput_statistic_manager.write();
288 for &table_id in table_ids.iter().unique() {
289 table_write_throughput_statistic_manager.remove_table(table_id);
290 }
291 }
292
293 let mut versioning_guard = self.versioning.write().await;
294 let versioning = versioning_guard.deref_mut();
295 let mut version = HummockVersionTransaction::new(
296 &mut versioning.current_version,
297 &mut versioning.hummock_version_deltas,
298 &mut versioning.table_change_log,
299 self.env.notification_manager(),
300 None,
301 &self.metrics,
302 &self.env.opts,
303 );
304 let mut new_version_delta = version.new_delta();
305 struct UnregisterGroupChange {
306 remaining_member_count: usize,
307 removed_table_ids: HashSet<TableId>,
308 }
309 let mut group_changes: HashMap<CompactionGroupId, UnregisterGroupChange> = HashMap::new();
310 for table_id in table_ids.into_iter().unique() {
312 let version = new_version_delta.latest_version();
313 let Some(info) = version.state_table_info.info().get(&table_id) else {
314 continue;
315 };
316 let compaction_group_id = info.compaction_group_id;
317
318 let group_change =
319 group_changes
320 .entry(compaction_group_id)
321 .or_insert_with(|| UnregisterGroupChange {
322 remaining_member_count: version
323 .state_table_info
324 .compaction_group_member_tables()
325 .get(&compaction_group_id)
326 .expect("should exist")
327 .len(),
328 removed_table_ids: HashSet::new(),
329 });
330 group_change.remaining_member_count = group_change
331 .remaining_member_count
332 .checked_sub(1)
333 .expect("member table count should be positive");
334 assert!(group_change.removed_table_ids.insert(table_id));
335 new_version_delta.removed_table_ids.insert(table_id);
336 }
337
338 for (group_id, change) in group_changes {
339 if change.remaining_member_count == 0 && group_id > StaticCompactionGroupId::End {
340 let max_level = new_version_delta
341 .latest_version()
342 .get_compaction_group_levels(group_id)
343 .levels
344 .len();
345 new_version_delta
346 .group_deltas
347 .entry(group_id)
348 .or_default()
349 .group_deltas
350 .push(GroupDelta::GroupDestroy(PbGroupDestroy {}));
351 remove_compaction_group_metrics(&self.metrics, group_id, max_level);
352 self.compaction_state.remove_compaction_group(group_id);
354 } else {
355 new_version_delta
356 .group_deltas
357 .entry(group_id)
358 .or_default()
359 .group_deltas
360 .push(GroupDelta::PruneTableIdsFromSsts(change.removed_table_ids));
361 }
362 }
363
364 new_version_delta.pre_apply();
365
366 let mut compaction_group_manager = self.compaction_group_manager.write().await;
369 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
370
371 compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
372 version.latest_version(),
373 )));
374 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
375
376 Ok(())
378 }
379
380 pub async fn update_compaction_config(
381 &self,
382 compaction_group_ids: &[CompactionGroupId],
383 config_to_update: &[MutableConfig],
384 ) -> Result<()> {
385 {
386 let mut compaction_group_manager = self.compaction_group_manager.write().await;
388 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
389 compaction_groups_txn
390 .update_compaction_config(compaction_group_ids, config_to_update)?;
391 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
392 }
393
394 if config_to_update
395 .iter()
396 .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
397 {
398 self.try_update_write_limits(compaction_group_ids).await;
400 }
401
402 Ok(())
403 }
404
405 pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
408 let mut versioning_guard = self.versioning.write().await;
409 let versioning = versioning_guard.deref_mut();
410 let current_version = &versioning.current_version;
411 let mut results = vec![];
412 let compaction_group_manager = self.compaction_group_manager.read().await;
413
414 for levels in current_version.levels.values() {
415 let compaction_config = compaction_group_manager
416 .try_get_compaction_group_config(levels.group_id)
417 .unwrap()
418 .compaction_config
419 .as_ref()
420 .clone();
421 let group = CompactionGroupInfo {
422 id: levels.group_id,
423 parent_id: levels.parent_group_id,
424 member_table_ids: current_version
425 .state_table_info
426 .compaction_group_member_table_ids(levels.group_id)
427 .iter()
428 .copied()
429 .collect_vec(),
430 compaction_config: Some(compaction_config),
431 };
432 results.push(group);
433 }
434 results
435 }
436
437 pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
438 let mut infos = vec![];
439 {
440 let versioning_guard = self.versioning.read().await;
441 let manager = self.compaction_group_manager.read().await;
442 let version = &versioning_guard.current_version;
443 for group_id in version.levels.keys() {
444 let mut group_info = CompactionGroupStatistic {
445 group_id: *group_id,
446 ..Default::default()
447 };
448 for table_id in version
449 .state_table_info
450 .compaction_group_member_table_ids(*group_id)
451 {
452 let stats_size = versioning_guard
453 .version_stats
454 .table_stats
455 .get(table_id)
456 .map(|stats| stats.total_key_size + stats.total_value_size)
457 .unwrap_or(0);
458 let table_size = std::cmp::max(stats_size, 0) as u64;
459 group_info.group_size += table_size;
460 group_info.table_statistic.insert(*table_id, table_size);
461 group_info.compaction_group_config =
462 manager.try_get_compaction_group_config(*group_id).unwrap();
463 }
464 infos.push(group_info);
465 }
466 };
467 infos
468 }
469
470 pub(crate) async fn initial_compaction_group_config_after_load(
471 &self,
472 versioning_guard: &Versioning,
473 compaction_group_manager: &mut CompactionGroupManager,
474 ) -> Result<()> {
475 let current_version = &versioning_guard.current_version;
477 let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
478 let default_config = compaction_group_manager.default_compaction_config();
479 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
480 compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
481 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
482
483 Ok(())
484 }
485}
486
487pub(crate) struct CompactionGroupManager {
495 compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
496 default_config: Arc<CompactionConfig>,
497 pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
499}
500
501impl CompactionGroupManager {
502 pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
504 CompactionGroupTransaction::new(&mut self.compaction_groups)
505 }
506
507 #[expect(clippy::type_complexity)]
508 pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
509 inner: P,
510 ) -> BTreeMapTransactionInner<
511 CompactionGroupId,
512 CompactionGroup,
513 DerefMutForward<
514 Self,
515 BTreeMap<CompactionGroupId, CompactionGroup>,
516 P,
517 impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
518 impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
519 >,
520 > {
521 BTreeMapTransactionInner::new(DerefMutForward::new(
522 inner,
523 |mgr| &mgr.compaction_groups,
524 |mgr| &mut mgr.compaction_groups,
525 ))
526 }
527
528 pub(crate) fn try_get_compaction_group_config(
530 &self,
531 compaction_group_id: impl Into<CompactionGroupId>,
532 ) -> Option<CompactionGroup> {
533 self.compaction_groups
534 .get(&compaction_group_id.into())
535 .cloned()
536 }
537
538 pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
540 self.default_config.clone()
541 }
542}
543
544fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) -> Result<()> {
545 for item in items {
546 match item {
547 MutableConfig::MaxBytesForLevelBase(c) => {
548 target.max_bytes_for_level_base = *c;
549 }
550 MutableConfig::MaxBytesForLevelMultiplier(c) => {
551 target.max_bytes_for_level_multiplier = *c;
552 }
553 MutableConfig::MaxCompactionBytes(c) => {
554 target.max_compaction_bytes = *c;
555 }
556 MutableConfig::SubLevelMaxCompactionBytes(c) => {
557 target.sub_level_max_compaction_bytes = *c;
558 }
559 MutableConfig::Level0TierCompactFileNumber(c) => {
560 target.level0_tier_compact_file_number = *c;
561 }
562 MutableConfig::TargetFileSizeBase(c) => {
563 target.target_file_size_base = *c;
564 }
565 MutableConfig::CompactionFilterMask(c) => {
566 target.compaction_filter_mask = *c;
567 }
568 MutableConfig::MaxSubCompaction(c) => {
569 target.max_sub_compaction = *c;
570 }
571 MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
572 target.level0_stop_write_threshold_sub_level_number = *c;
573 }
574 MutableConfig::Level0SubLevelCompactLevelCount(c) => {
575 target.level0_sub_level_compact_level_count = *c;
576 }
577 MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
578 target.level0_overlapping_sub_level_compact_level_count = *c;
579 }
580 MutableConfig::MaxSpaceReclaimBytes(c) => {
581 target.max_space_reclaim_bytes = *c;
582 }
583 MutableConfig::Level0MaxCompactFileNumber(c) => {
584 target.level0_max_compact_file_number = *c;
585 }
586 MutableConfig::EnableEmergencyPicker(c) => {
587 target.enable_emergency_picker = *c;
588 }
589 MutableConfig::TombstoneReclaimRatio(c) => {
590 target.tombstone_reclaim_ratio = *c;
591 }
592 MutableConfig::CompressionAlgorithm(c) => {
593 let level = c.get_level();
594 let max_level = try_u32_max_level(target.max_level)?;
595 if level > max_level {
596 return Err(Error::CompactionGroup(format!(
597 "invalid compression_algorithm level {}, max_level is {}",
598 level, target.max_level
599 )));
600 }
601
602 let Some(algorithm) = target.compression_algorithm.get_mut(level as usize) else {
603 return Err(Error::CompactionGroup(format!(
604 "invalid compression_algorithm level {}, compression_algorithm len is {}",
605 level,
606 target.compression_algorithm.len()
607 )));
608 };
609 algorithm.clone_from(&c.compression_algorithm);
610 }
611 MutableConfig::ResetCompressionAlgorithm(reset) => {
612 if *reset {
613 target.compression_algorithm =
614 default_compaction_config::compression_algorithm_vec(try_u32_max_level(
615 target.max_level,
616 )?);
617 }
618 }
619 MutableConfig::MaxL0CompactLevelCount(c) => {
620 target.max_l0_compact_level_count = Some(*c);
621 }
622 MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
623 target.sst_allowed_trivial_move_min_size = Some(*c);
624 }
625 MutableConfig::SplitWeightByVnode(c) => {
626 target.split_weight_by_vnode = *c;
627 }
628 MutableConfig::DisableAutoGroupScheduling(c) => {
629 target.disable_auto_group_scheduling = Some(*c);
630 }
631 MutableConfig::MaxOverlappingLevelSize(c) => {
632 target.max_overlapping_level_size = Some(*c);
633 }
634 MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
635 target.sst_allowed_trivial_move_max_count = Some(*c);
636 }
637 MutableConfig::EmergencyLevel0SstFileCount(c) => {
638 target.emergency_level0_sst_file_count = Some(*c);
639 }
640 MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
641 target.emergency_level0_sub_level_partition = Some(*c);
642 }
643 MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
644 target.level0_stop_write_threshold_max_sst_count = Some(*c);
645 }
646 MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
647 target.level0_stop_write_threshold_max_size = Some(*c);
648 }
649 MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
650 target.enable_optimize_l0_interval_selection = Some(*c);
651 }
652 #[expect(deprecated)]
653 MutableConfig::VnodeAlignedLevelSizeThreshold(_) => {
654 }
656 MutableConfig::MaxKvCountForXor16(c) => {
657 target.max_kv_count_for_xor16 = optional_non_sentinel_u64_config(*c);
658 }
659 MutableConfig::MaxVnodeKeyRangeBytes(c) => {
660 target.max_vnode_key_range_bytes = optional_positive_u64_config(*c);
661 }
662 MutableConfig::SstableFilterType(c) => {
663 parse_sstable_filter_type(&c.filter_type).map_err(Error::CompactionGroup)?;
664 if target.sstable_filter_type.is_empty() {
665 target.sstable_filter_type = default_compaction_config::sstable_filter_type();
666 target
667 .sstable_filter_type
668 .resize(target.max_level as usize + 1, "xor16".to_owned());
669 }
670 let idx = c.get_level() as usize;
671 let level_entry = target.sstable_filter_type.get_mut(idx).ok_or_else(|| {
672 Error::CompactionGroup(format!(
673 "sstable_filter_type level {} is out of range",
674 idx
675 ))
676 })?;
677 level_entry.clone_from(&c.filter_type);
678 }
679 MutableConfig::SstableFilterLayout(c) => {
680 parse_sstable_filter_layout(&c.layout).map_err(Error::CompactionGroup)?;
681 if target.sstable_filter_layout.is_empty() {
682 target.sstable_filter_layout =
683 default_compaction_config::sstable_filter_layout();
684 target
685 .sstable_filter_layout
686 .resize(target.max_level as usize + 1, "blocked".to_owned());
687 }
688 let idx = c.get_level() as usize;
689 let level_entry = target.sstable_filter_layout.get_mut(idx).ok_or_else(|| {
690 Error::CompactionGroup(format!(
691 "sstable_filter_layout level {} is out of range",
692 idx
693 ))
694 })?;
695 level_entry.clone_from(&c.layout);
696 }
697 }
698 }
699 Ok(())
700}
701
702fn optional_u64_config(value: u64) -> Option<u64> {
703 (value != u64::MIN && value != u64::MAX).then_some(value)
704}
705
706fn optional_non_sentinel_u64_config(value: u64) -> Option<u64> {
707 (value != u64::MAX).then_some(value)
708}
709
710fn optional_positive_u64_config(value: u64) -> Option<u64> {
711 optional_u64_config(value).filter(|value| *value > 0)
712}
713
714fn try_u32_max_level(max_level: u64) -> Result<u32> {
715 u32::try_from(max_level).map_err(|_| {
716 Error::CompactionGroup(format!(
717 "invalid max_level {}, expect <= {}",
718 max_level,
719 u32::MAX
720 ))
721 })
722}
723
724impl CompactionGroupTransaction<'_> {
725 pub fn try_create_compaction_groups(
727 &mut self,
728 compaction_group_ids: &[CompactionGroupId],
729 config: Arc<CompactionConfig>,
730 ) -> bool {
731 let mut trivial = true;
732 for id in compaction_group_ids {
733 if self.contains_key(id) {
734 continue;
735 }
736 let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
737 self.insert(*id, new_entry);
738
739 trivial = false;
740 }
741
742 !trivial
743 }
744
745 pub fn create_compaction_groups(
746 &mut self,
747 compaction_group_id: CompactionGroupId,
748 config: Arc<CompactionConfig>,
749 ) {
750 self.try_create_compaction_groups(&[compaction_group_id], config);
751 }
752
753 pub(crate) fn try_get_compaction_group_config(
755 &self,
756 compaction_group_id: CompactionGroupId,
757 ) -> Option<&CompactionGroup> {
758 self.get(&compaction_group_id)
759 }
760
761 pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
763 let stale_group = self
764 .tree_ref()
765 .keys()
766 .cloned()
767 .filter(|k| !existing_groups.contains(k))
768 .collect_vec();
769 if stale_group.is_empty() {
770 return;
771 }
772 for group in stale_group {
773 self.remove(group);
774 }
775 }
776
777 pub(crate) fn update_compaction_config(
778 &mut self,
779 compaction_group_ids: &[CompactionGroupId],
780 config_to_update: &[MutableConfig],
781 ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
782 let mut results = HashMap::default();
783 for compaction_group_id in compaction_group_ids.iter().unique() {
784 let group = self.get(compaction_group_id).ok_or_else(|| {
785 Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
786 })?;
787 let mut config = group.compaction_config.as_ref().clone();
788 update_compaction_config(&mut config, config_to_update)?;
789 if let Err(reason) = validate_compaction_config(&config) {
790 return Err(Error::CompactionGroup(reason));
791 }
792 let mut new_group = group.clone();
793 new_group.compaction_config = Arc::new(config);
794 self.insert(*compaction_group_id, new_group.clone());
795 results.insert(new_group.group_id(), new_group);
796 }
797
798 Ok(results)
799 }
800}
801
802#[cfg(test)]
803mod tests {
804 use std::collections::{BTreeMap, HashSet};
805 use std::sync::Arc;
806
807 use itertools::Itertools;
808 use risingwave_common::id::JobId;
809 use risingwave_hummock_sdk::CompactionGroupId;
810 use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
811 use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::{
812 CompressionAlgorithm, SstableFilterLayout, SstableFilterType,
813 };
814
815 use crate::controller::SqlMetaStore;
816 use crate::hummock::commit_multi_var;
817 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
818 use crate::hummock::error::Result;
819 use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
820 use crate::hummock::test_utils::setup_compute_env;
821 use crate::model::{Fragment, StreamJobFragments};
822
823 #[test]
824 fn test_update_compaction_config_filter_type_layout_backward_compat() {
825 let mut config = CompactionConfigBuilder::new().build();
826 config.sstable_filter_type.clear();
827 config.sstable_filter_layout.clear();
828
829 super::update_compaction_config(
830 &mut config,
831 &[MutableConfig::SstableFilterType(SstableFilterType {
832 level: 0,
833 filter_type: "xor8".to_owned(),
834 })],
835 )
836 .unwrap();
837 assert_eq!(
838 config.sstable_filter_type.len(),
839 config.max_level as usize + 1
840 );
841 assert_eq!(config.sstable_filter_type[0], "xor8");
842 assert_eq!(config.sstable_filter_type[5], "xor8");
843
844 super::update_compaction_config(
845 &mut config,
846 &[MutableConfig::SstableFilterType(SstableFilterType {
847 level: 1,
848 filter_type: "none".to_owned(),
849 })],
850 )
851 .unwrap();
852 assert_eq!(config.sstable_filter_type[1], "none");
853
854 super::update_compaction_config(
855 &mut config,
856 &[MutableConfig::SstableFilterLayout(SstableFilterLayout {
857 level: 1,
858 layout: "plain".to_owned(),
859 })],
860 )
861 .unwrap();
862 assert_eq!(
863 config.sstable_filter_layout.len(),
864 config.max_level as usize + 1
865 );
866 assert_eq!(config.sstable_filter_layout[1], "plain");
867 assert_eq!(config.sstable_filter_layout[2], "blocked");
868 }
869
870 #[test]
871 fn test_update_compaction_config_rejects_out_of_range_level() {
872 let mut config = CompactionConfigBuilder::new().build();
873 let oob = config.max_level as u32 + 1;
874
875 assert!(
876 super::update_compaction_config(
877 &mut config,
878 &[MutableConfig::SstableFilterType(SstableFilterType {
879 level: oob,
880 filter_type: "xor8".to_owned(),
881 })],
882 )
883 .is_err()
884 );
885
886 assert!(
887 super::update_compaction_config(
888 &mut config,
889 &[MutableConfig::SstableFilterLayout(SstableFilterLayout {
890 level: oob,
891 layout: "plain".to_owned(),
892 })],
893 )
894 .is_err()
895 );
896
897 assert!(
898 super::update_compaction_config(
899 &mut config,
900 &[MutableConfig::CompressionAlgorithm(CompressionAlgorithm {
901 level: oob,
902 compression_algorithm: "Zstd".to_owned(),
903 })],
904 )
905 .is_err()
906 );
907 }
908
909 #[test]
910 fn test_update_compaction_config_rejects_invalid_filter_metadata() {
911 let mut config = CompactionConfigBuilder::new().build();
912
913 assert!(
914 super::update_compaction_config(
915 &mut config,
916 &[MutableConfig::SstableFilterType(SstableFilterType {
917 level: 0,
918 filter_type: "unknown".to_owned(),
919 })],
920 )
921 .is_err()
922 );
923
924 let mut config = CompactionConfigBuilder::new().build();
925
926 assert!(
927 super::update_compaction_config(
928 &mut config,
929 &[MutableConfig::SstableFilterLayout(SstableFilterLayout {
930 level: 0,
931 layout: "unknown".to_owned(),
932 })],
933 )
934 .is_err()
935 );
936 }
937
938 #[test]
939 fn test_reset_compression_algorithm_false_is_noop() {
940 let mut config = CompactionConfigBuilder::new().build();
941 config.compression_algorithm[3] = "Zstd".to_owned();
942
943 super::update_compaction_config(
944 &mut config,
945 &[MutableConfig::ResetCompressionAlgorithm(false)],
946 )
947 .unwrap();
948
949 assert_eq!(config.compression_algorithm[3], "Zstd");
950 }
951
952 #[tokio::test]
953 async fn test_inner() {
954 let (env, ..) = setup_compute_env(8080).await;
955 let mut inner = CompactionGroupManager::new(&env).await.unwrap();
956 assert_eq!(inner.compaction_groups.len(), 2);
957
958 async fn update_compaction_config(
959 meta: &SqlMetaStore,
960 inner: &mut CompactionGroupManager,
961 cg_ids: &[impl Into<CompactionGroupId> + Copy],
962 config_to_update: &[MutableConfig],
963 ) -> Result<()> {
964 let cg_ids = cg_ids.iter().copied().map_into().collect_vec();
965 let mut compaction_groups_txn = inner.start_compaction_groups_txn();
966 compaction_groups_txn.update_compaction_config(&cg_ids, config_to_update)?;
967 commit_multi_var!(meta, compaction_groups_txn)
968 }
969
970 async fn insert_compaction_group_configs(
971 meta: &SqlMetaStore,
972 inner: &mut CompactionGroupManager,
973 cg_ids: &[u64],
974 ) {
975 let default_config = inner.default_compaction_config();
976 let mut compaction_groups_txn = inner.start_compaction_groups_txn();
977 if compaction_groups_txn.try_create_compaction_groups(
978 &cg_ids.iter().copied().map_into().collect_vec(),
979 default_config,
980 ) {
981 commit_multi_var!(meta, compaction_groups_txn).unwrap();
982 }
983 }
984
985 async fn insert_compaction_group_config_with_max_level(
986 meta: &SqlMetaStore,
987 inner: &mut CompactionGroupManager,
988 cg_id: u64,
989 max_level: u64,
990 ) {
991 let mut config = inner.default_compaction_config().as_ref().clone();
992 config.max_level = max_level;
993 config.compression_algorithm =
994 super::default_compaction_config::compression_algorithm_vec(
995 super::try_u32_max_level(max_level).expect("max_level should fit u32 in test"),
996 );
997 let mut compaction_groups_txn = inner.start_compaction_groups_txn();
998 compaction_groups_txn.create_compaction_groups(cg_id.into(), Arc::new(config));
999 commit_multi_var!(meta, compaction_groups_txn).unwrap();
1000 }
1001
1002 update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
1003 .await
1004 .unwrap_err();
1005 insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
1006 assert_eq!(inner.compaction_groups.len(), 4);
1007 let mut inner = CompactionGroupManager::new(&env).await.unwrap();
1008 assert_eq!(inner.compaction_groups.len(), 4);
1009
1010 update_compaction_config(
1011 env.meta_store_ref(),
1012 &mut inner,
1013 &[100, 200],
1014 &[MutableConfig::MaxSubCompaction(123)],
1015 )
1016 .await
1017 .unwrap();
1018 assert_eq!(inner.compaction_groups.len(), 4);
1019 assert_eq!(
1020 inner
1021 .try_get_compaction_group_config(100)
1022 .unwrap()
1023 .compaction_config
1024 .max_sub_compaction,
1025 123
1026 );
1027 assert_eq!(
1028 inner
1029 .try_get_compaction_group_config(200)
1030 .unwrap()
1031 .compaction_config
1032 .max_sub_compaction,
1033 123
1034 );
1035
1036 insert_compaction_group_config_with_max_level(env.meta_store_ref(), &mut inner, 300, 4)
1037 .await;
1038 update_compaction_config(
1039 env.meta_store_ref(),
1040 &mut inner,
1041 &[300],
1042 &[MutableConfig::ResetCompressionAlgorithm(true)],
1043 )
1044 .await
1045 .unwrap();
1046 assert_eq!(
1047 inner
1048 .try_get_compaction_group_config(300)
1049 .unwrap()
1050 .compaction_config
1051 .compression_algorithm,
1052 super::default_compaction_config::compression_algorithm_vec(4)
1053 );
1054 let err = update_compaction_config(
1055 env.meta_store_ref(),
1056 &mut inner,
1057 &[300],
1058 &[MutableConfig::CompressionAlgorithm(CompressionAlgorithm {
1059 level: 6,
1060 compression_algorithm: "Zstd".to_owned(),
1061 })],
1062 )
1063 .await
1064 .unwrap_err();
1065 assert!(
1066 err.to_string()
1067 .contains("invalid compression_algorithm level 6")
1068 );
1069
1070 update_compaction_config(
1071 env.meta_store_ref(),
1072 &mut inner,
1073 &[100],
1074 &[MutableConfig::MaxKvCountForXor16(0)],
1075 )
1076 .await
1077 .unwrap();
1078 assert_eq!(
1079 inner
1080 .try_get_compaction_group_config(100)
1081 .unwrap()
1082 .compaction_config
1083 .max_kv_count_for_xor16,
1084 Some(0)
1085 );
1086 update_compaction_config(
1087 env.meta_store_ref(),
1088 &mut inner,
1089 &[100],
1090 &[MutableConfig::MaxKvCountForXor16(1024)],
1091 )
1092 .await
1093 .unwrap();
1094 assert_eq!(
1095 inner
1096 .try_get_compaction_group_config(100)
1097 .unwrap()
1098 .compaction_config
1099 .max_kv_count_for_xor16,
1100 Some(1024)
1101 );
1102 update_compaction_config(
1103 env.meta_store_ref(),
1104 &mut inner,
1105 &[100],
1106 &[MutableConfig::MaxKvCountForXor16(u64::MAX)],
1107 )
1108 .await
1109 .unwrap();
1110 assert_eq!(
1111 inner
1112 .try_get_compaction_group_config(100)
1113 .unwrap()
1114 .compaction_config
1115 .max_kv_count_for_xor16,
1116 None
1117 );
1118
1119 update_compaction_config(
1120 env.meta_store_ref(),
1121 &mut inner,
1122 &[100],
1123 &[MutableConfig::MaxVnodeKeyRangeBytes(0)],
1124 )
1125 .await
1126 .unwrap();
1127 assert_eq!(
1128 inner
1129 .try_get_compaction_group_config(100)
1130 .unwrap()
1131 .compaction_config
1132 .max_vnode_key_range_bytes,
1133 None
1134 );
1135 update_compaction_config(
1136 env.meta_store_ref(),
1137 &mut inner,
1138 &[100],
1139 &[MutableConfig::MaxVnodeKeyRangeBytes(1024)],
1140 )
1141 .await
1142 .unwrap();
1143 assert_eq!(
1144 inner
1145 .try_get_compaction_group_config(100)
1146 .unwrap()
1147 .compaction_config
1148 .max_vnode_key_range_bytes,
1149 Some(1024)
1150 );
1151 }
1152
1153 #[tokio::test]
1154 async fn test_manager() {
1155 let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
1156 let table_fragment_1 = StreamJobFragments::for_test(
1157 JobId::new(10),
1158 BTreeMap::from([(
1159 1.into(),
1160 Fragment {
1161 fragment_id: 1.into(),
1162 state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
1163 ..Default::default()
1164 },
1165 )]),
1166 );
1167 let table_fragment_2 = StreamJobFragments::for_test(
1168 JobId::new(20),
1169 BTreeMap::from([(
1170 2.into(),
1171 Fragment {
1172 fragment_id: 2.into(),
1173 state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
1174 ..Default::default()
1175 },
1176 )]),
1177 );
1178
1179 let registered_number = || async {
1181 compaction_group_manager
1182 .list_compaction_group()
1183 .await
1184 .iter()
1185 .map(|cg| cg.member_table_ids.len())
1186 .sum::<usize>()
1187 };
1188 let group_number =
1189 || async { compaction_group_manager.list_compaction_group().await.len() };
1190 assert_eq!(registered_number().await, 0);
1191
1192 compaction_group_manager
1193 .register_table_fragments(
1194 Some(table_fragment_1.stream_job_id().as_mv_table_id()),
1195 table_fragment_1
1196 .internal_table_ids()
1197 .into_iter()
1198 .map_into()
1199 .collect(),
1200 )
1201 .await
1202 .unwrap();
1203 assert_eq!(registered_number().await, 4);
1204 compaction_group_manager
1205 .register_table_fragments(
1206 Some(table_fragment_2.stream_job_id().as_mv_table_id()),
1207 table_fragment_2
1208 .internal_table_ids()
1209 .into_iter()
1210 .map_into()
1211 .collect(),
1212 )
1213 .await
1214 .unwrap();
1215 assert_eq!(registered_number().await, 8);
1216
1217 compaction_group_manager
1219 .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
1220 .await;
1221 assert_eq!(registered_number().await, 4);
1222
1223 compaction_group_manager
1225 .purge(&table_fragment_2.all_table_ids().collect())
1226 .await
1227 .unwrap();
1228 assert_eq!(registered_number().await, 4);
1229 compaction_group_manager
1230 .purge(&HashSet::new())
1231 .await
1232 .unwrap();
1233 assert_eq!(registered_number().await, 0);
1234
1235 assert_eq!(group_number().await, 2);
1236
1237 compaction_group_manager
1238 .register_table_fragments(
1239 Some(table_fragment_1.stream_job_id().as_mv_table_id()),
1240 table_fragment_1
1241 .internal_table_ids()
1242 .into_iter()
1243 .map_into()
1244 .collect(),
1245 )
1246 .await
1247 .unwrap();
1248 assert_eq!(registered_number().await, 4);
1249 assert_eq!(group_number().await, 2);
1250
1251 compaction_group_manager
1252 .unregister_table_fragments_vec(&[table_fragment_1])
1253 .await;
1254 assert_eq!(registered_number().await, 0);
1255 assert_eq!(group_number().await, 2);
1256 }
1257}