1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::util::epoch::INVALID_EPOCH;
22use risingwave_hummock_sdk::CompactionGroupId;
23use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
25use risingwave_hummock_sdk::version::GroupDelta;
26use risingwave_meta_model::compaction_config;
27use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
31};
32use sea_orm::EntityTrait;
33use tokio::sync::OnceCell;
34
35use super::CompactionGroupStatistic;
36use crate::hummock::compaction::compaction_config::{
37 CompactionConfigBuilder, validate_compaction_config,
38};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_metrics;
44use crate::hummock::model::CompactionGroup;
45use crate::hummock::sequence::next_compaction_group_id;
46use crate::manager::MetaSrvEnv;
47use crate::model::{
48 BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
49};
50
/// Transaction over a `BTreeMap` keyed by [`CompactionGroupId`] whose values
/// are [`CompactionGroup`]s.
type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
52
53impl CompactionGroupManager {
54 pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
55 let default_config = match env.opts.compaction_config.as_ref() {
56 None => CompactionConfigBuilder::new().build(),
57 Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
58 };
59 Self::new_with_config(env, default_config).await
60 }
61
62 pub(crate) async fn new_with_config(
63 env: &MetaSrvEnv,
64 default_config: CompactionConfig,
65 ) -> Result<CompactionGroupManager> {
66 let mut compaction_group_manager = CompactionGroupManager {
67 compaction_groups: BTreeMap::new(),
68 default_config: Arc::new(default_config),
69 write_limit: Default::default(),
70 };
71
72 let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
73 compaction_config::Entity::find()
74 .all(&env.meta_store_ref().conn)
75 .await
76 .map_err(MetadataModelError::from)?
77 .into_iter()
78 .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
79 .collect();
80
81 compaction_group_manager.init(loaded_compaction_groups);
82 Ok(compaction_group_manager)
83 }
84
85 fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
86 if !loaded_compaction_groups.is_empty() {
87 self.compaction_groups = loaded_compaction_groups;
88 }
89 }
90}
91
92impl HummockManager {
93 pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
96 get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
97 }
98
99 pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
101 self.compaction_group_manager
102 .read()
103 .await
104 .compaction_groups
105 .clone()
106 }
107
/// Test helper: registers `mv_table` (if any) under
/// `StaticCompactionGroupId::MaterializedView` and every remaining entry of
/// `internal_tables` under `StaticCompactionGroupId::StateDefault`.
///
/// If `mv_table` also appears in `internal_tables`, it is removed from that
/// list and a warning is logged.
#[cfg(test)]
pub async fn register_table_fragments(
    &self,
    mv_table: Option<TableId>,
    mut internal_tables: Vec<TableId>,
) -> Result<()> {
    let mut pairs = Vec::with_capacity(internal_tables.len() + 1);
    if let Some(mv_table) = mv_table {
        // Drop any duplicate of the MV table from the internal list.
        let count_before = internal_tables.len();
        internal_tables.retain(|t| *t != mv_table);
        if internal_tables.len() < count_before {
            tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
        }
        pairs.push((mv_table, StaticCompactionGroupId::MaterializedView));
    }
    pairs.extend(
        internal_tables
            .into_iter()
            .map(|table_id| (table_id, StaticCompactionGroupId::StateDefault)),
    );
    self.register_table_ids_for_test(&pairs).await
}
130
/// Test helper: unregisters every table id of the given stream job
/// fragments.
///
/// # Panics
/// Panics if unregistering fails.
#[cfg(test)]
pub async fn unregister_table_fragments_vec(
    &self,
    table_fragments: &[crate::model::StreamJobFragments],
) {
    let table_ids = table_fragments
        .iter()
        .flat_map(|fragments| fragments.all_table_ids());
    self.unregister_table_ids(table_ids).await.unwrap();
}
141
142 pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
146 let to_unregister = self
147 .versioning
148 .read()
149 .await
150 .current_version
151 .state_table_info
152 .info()
153 .keys()
154 .cloned()
155 .filter(|table_id| !valid_ids.contains(table_id))
156 .collect_vec();
157
158 self.unregister_table_ids(to_unregister).await
161 }
162
163 pub async fn register_table_ids_for_test(
168 &self,
169 pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
170 ) -> Result<()> {
171 if pairs.is_empty() {
172 return Ok(());
173 }
174 let mut versioning_guard = self.versioning.write().await;
175 let versioning = versioning_guard.deref_mut();
176 let mut compaction_group_manager = self.compaction_group_manager.write().await;
177 let current_version = &versioning.current_version;
178 let default_config = compaction_group_manager.default_compaction_config();
179 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
180
181 for (table_id, _) in pairs {
182 let table_id = (*table_id).into();
183 if let Some(info) = current_version.state_table_info.info().get(&table_id) {
184 return Err(Error::CompactionGroup(format!(
185 "table {} already {:?}",
186 table_id, info
187 )));
188 }
189 }
190 let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
192 let mut version = HummockVersionTransaction::new(
193 &mut versioning.current_version,
194 &mut versioning.hummock_version_deltas,
195 &mut versioning.table_change_log,
196 self.env.notification_manager(),
197 None,
198 &self.metrics,
199 &self.env.opts,
200 );
201 let mut new_version_delta = version.new_delta();
202
203 let committed_epoch = new_version_delta
204 .latest_version()
205 .state_table_info
206 .info()
207 .values()
208 .map(|info| info.committed_epoch)
209 .max()
210 .unwrap_or(INVALID_EPOCH);
211
212 for (table_id, raw_group_id) in pairs {
213 let table_id = (*table_id).into();
214 let mut group_id = *raw_group_id;
215 if group_id == StaticCompactionGroupId::NewCompactionGroup {
216 let mut is_group_init = false;
217 group_id = *new_compaction_group_id
218 .get_or_try_init(|| async {
219 next_compaction_group_id(&self.env).await.inspect(|_| {
220 is_group_init = true;
221 })
222 })
223 .await?;
224 if is_group_init {
225 let group_deltas = &mut new_version_delta
226 .group_deltas
227 .entry(group_id)
228 .or_default()
229 .group_deltas;
230
231 let config =
232 match compaction_groups_txn.try_get_compaction_group_config(group_id) {
233 Some(config) => config.compaction_config.as_ref().clone(),
234 None => {
235 compaction_groups_txn
236 .create_compaction_groups(group_id, default_config.clone());
237 default_config.as_ref().clone()
238 }
239 };
240
241 let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
242 group_config: Some(config),
243 group_id,
244 ..Default::default()
245 }));
246
247 group_deltas.push(group_delta);
248 }
249 }
250 assert!(
251 new_version_delta
252 .state_table_info_delta
253 .insert(
254 table_id,
255 PbStateTableInfoDelta {
256 committed_epoch,
257 compaction_group_id: *raw_group_id,
258 }
259 )
260 .is_none()
261 );
262 }
263 new_version_delta.pre_apply();
264 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
265
266 Ok(())
267 }
268
269 pub async fn unregister_table_ids(
270 &self,
271 table_ids: impl IntoIterator<Item = TableId>,
272 ) -> Result<()> {
273 let table_ids = table_ids.into_iter().collect_vec();
274 if table_ids.is_empty() {
275 return Ok(());
276 }
277
278 {
279 let mut table_write_throughput_statistic_manager =
283 self.table_write_throughput_statistic_manager.write();
284 for &table_id in table_ids.iter().unique() {
285 table_write_throughput_statistic_manager.remove_table(table_id);
286 }
287 }
288
289 let mut versioning_guard = self.versioning.write().await;
290 let versioning = versioning_guard.deref_mut();
291 let mut version = HummockVersionTransaction::new(
292 &mut versioning.current_version,
293 &mut versioning.hummock_version_deltas,
294 &mut versioning.table_change_log,
295 self.env.notification_manager(),
296 None,
297 &self.metrics,
298 &self.env.opts,
299 );
300 let mut new_version_delta = version.new_delta();
301 struct UnregisterGroupChange {
302 remaining_member_count: usize,
303 removed_table_ids: HashSet<TableId>,
304 }
305 let mut group_changes: HashMap<CompactionGroupId, UnregisterGroupChange> = HashMap::new();
306 for table_id in table_ids.into_iter().unique() {
308 let version = new_version_delta.latest_version();
309 let Some(info) = version.state_table_info.info().get(&table_id) else {
310 continue;
311 };
312 let compaction_group_id = info.compaction_group_id;
313
314 let group_change =
315 group_changes
316 .entry(compaction_group_id)
317 .or_insert_with(|| UnregisterGroupChange {
318 remaining_member_count: version
319 .state_table_info
320 .compaction_group_member_tables()
321 .get(&compaction_group_id)
322 .expect("should exist")
323 .len(),
324 removed_table_ids: HashSet::new(),
325 });
326 group_change.remaining_member_count = group_change
327 .remaining_member_count
328 .checked_sub(1)
329 .expect("member table count should be positive");
330 assert!(group_change.removed_table_ids.insert(table_id));
331 new_version_delta.removed_table_ids.insert(table_id);
332 }
333
334 for (group_id, change) in group_changes {
335 if change.remaining_member_count == 0 && group_id > StaticCompactionGroupId::End {
336 let max_level = new_version_delta
337 .latest_version()
338 .get_compaction_group_levels(group_id)
339 .levels
340 .len();
341 new_version_delta
342 .group_deltas
343 .entry(group_id)
344 .or_default()
345 .group_deltas
346 .push(GroupDelta::GroupDestroy(PbGroupDestroy {}));
347 remove_compaction_group_metrics(&self.metrics, group_id, max_level);
348 self.compaction_state.remove_compaction_group(group_id);
350 } else {
351 new_version_delta
352 .group_deltas
353 .entry(group_id)
354 .or_default()
355 .group_deltas
356 .push(GroupDelta::PruneTableIdsFromSsts(change.removed_table_ids));
357 }
358 }
359
360 new_version_delta.pre_apply();
361
362 let mut compaction_group_manager = self.compaction_group_manager.write().await;
365 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
366
367 compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
368 version.latest_version(),
369 )));
370 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
371
372 Ok(())
374 }
375
376 pub async fn update_compaction_config(
377 &self,
378 compaction_group_ids: &[CompactionGroupId],
379 config_to_update: &[MutableConfig],
380 ) -> Result<()> {
381 {
382 let mut compaction_group_manager = self.compaction_group_manager.write().await;
384 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
385 compaction_groups_txn
386 .update_compaction_config(compaction_group_ids, config_to_update)?;
387 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
388 }
389
390 if config_to_update
391 .iter()
392 .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
393 {
394 self.try_update_write_limits(compaction_group_ids).await;
396 }
397
398 Ok(())
399 }
400
401 pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
404 let mut versioning_guard = self.versioning.write().await;
405 let versioning = versioning_guard.deref_mut();
406 let current_version = &versioning.current_version;
407 let mut results = vec![];
408 let compaction_group_manager = self.compaction_group_manager.read().await;
409
410 for levels in current_version.levels.values() {
411 let compaction_config = compaction_group_manager
412 .try_get_compaction_group_config(levels.group_id)
413 .unwrap()
414 .compaction_config
415 .as_ref()
416 .clone();
417 let group = CompactionGroupInfo {
418 id: levels.group_id,
419 parent_id: levels.parent_group_id,
420 member_table_ids: current_version
421 .state_table_info
422 .compaction_group_member_table_ids(levels.group_id)
423 .iter()
424 .copied()
425 .collect_vec(),
426 compaction_config: Some(compaction_config),
427 };
428 results.push(group);
429 }
430 results
431 }
432
433 pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
434 let mut infos = vec![];
435 {
436 let versioning_guard = self.versioning.read().await;
437 let manager = self.compaction_group_manager.read().await;
438 let version = &versioning_guard.current_version;
439 for group_id in version.levels.keys() {
440 let mut group_info = CompactionGroupStatistic {
441 group_id: *group_id,
442 ..Default::default()
443 };
444 for table_id in version
445 .state_table_info
446 .compaction_group_member_table_ids(*group_id)
447 {
448 let stats_size = versioning_guard
449 .version_stats
450 .table_stats
451 .get(table_id)
452 .map(|stats| stats.total_key_size + stats.total_value_size)
453 .unwrap_or(0);
454 let table_size = std::cmp::max(stats_size, 0) as u64;
455 group_info.group_size += table_size;
456 group_info.table_statistic.insert(*table_id, table_size);
457 group_info.compaction_group_config =
458 manager.try_get_compaction_group_config(*group_id).unwrap();
459 }
460 infos.push(group_info);
461 }
462 };
463 infos
464 }
465
466 pub(crate) async fn initial_compaction_group_config_after_load(
467 &self,
468 versioning_guard: &Versioning,
469 compaction_group_manager: &mut CompactionGroupManager,
470 ) -> Result<()> {
471 let current_version = &versioning_guard.current_version;
473 let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
474 let default_config = compaction_group_manager.default_compaction_config();
475 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
476 compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
477 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
478
479 Ok(())
480 }
481}
482
/// Holds the compaction groups known to the meta node, the default
/// compaction config used for newly created groups, and per-group write
/// limits.
pub(crate) struct CompactionGroupManager {
    /// Compaction groups keyed by group id; seeded from the meta store in
    /// `new_with_config` and modified through compaction group transactions.
    compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
    /// Config handed out by `default_compaction_config`.
    default_config: Arc<CompactionConfig>,
    /// Write limits keyed by compaction group id.
    /// NOTE(review): not written anywhere in this part of the file; presumably
    /// maintained by the write-limit logic elsewhere — confirm.
    pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
}
496
497impl CompactionGroupManager {
498 pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
500 CompactionGroupTransaction::new(&mut self.compaction_groups)
501 }
502
503 #[expect(clippy::type_complexity)]
504 pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
505 inner: P,
506 ) -> BTreeMapTransactionInner<
507 CompactionGroupId,
508 CompactionGroup,
509 DerefMutForward<
510 Self,
511 BTreeMap<CompactionGroupId, CompactionGroup>,
512 P,
513 impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
514 impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
515 >,
516 > {
517 BTreeMapTransactionInner::new(DerefMutForward::new(
518 inner,
519 |mgr| &mgr.compaction_groups,
520 |mgr| &mut mgr.compaction_groups,
521 ))
522 }
523
524 pub(crate) fn try_get_compaction_group_config(
526 &self,
527 compaction_group_id: impl Into<CompactionGroupId>,
528 ) -> Option<CompactionGroup> {
529 self.compaction_groups
530 .get(&compaction_group_id.into())
531 .cloned()
532 }
533
534 pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
536 self.default_config.clone()
537 }
538}
539
540fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) {
541 for item in items {
542 match item {
543 MutableConfig::MaxBytesForLevelBase(c) => {
544 target.max_bytes_for_level_base = *c;
545 }
546 MutableConfig::MaxBytesForLevelMultiplier(c) => {
547 target.max_bytes_for_level_multiplier = *c;
548 }
549 MutableConfig::MaxCompactionBytes(c) => {
550 target.max_compaction_bytes = *c;
551 }
552 MutableConfig::SubLevelMaxCompactionBytes(c) => {
553 target.sub_level_max_compaction_bytes = *c;
554 }
555 MutableConfig::Level0TierCompactFileNumber(c) => {
556 target.level0_tier_compact_file_number = *c;
557 }
558 MutableConfig::TargetFileSizeBase(c) => {
559 target.target_file_size_base = *c;
560 }
561 MutableConfig::CompactionFilterMask(c) => {
562 target.compaction_filter_mask = *c;
563 }
564 MutableConfig::MaxSubCompaction(c) => {
565 target.max_sub_compaction = *c;
566 }
567 MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
568 target.level0_stop_write_threshold_sub_level_number = *c;
569 }
570 MutableConfig::Level0SubLevelCompactLevelCount(c) => {
571 target.level0_sub_level_compact_level_count = *c;
572 }
573 MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
574 target.level0_overlapping_sub_level_compact_level_count = *c;
575 }
576 MutableConfig::MaxSpaceReclaimBytes(c) => {
577 target.max_space_reclaim_bytes = *c;
578 }
579 MutableConfig::Level0MaxCompactFileNumber(c) => {
580 target.level0_max_compact_file_number = *c;
581 }
582 MutableConfig::EnableEmergencyPicker(c) => {
583 target.enable_emergency_picker = *c;
584 }
585 MutableConfig::TombstoneReclaimRatio(c) => {
586 target.tombstone_reclaim_ratio = *c;
587 }
588 MutableConfig::CompressionAlgorithm(c) => {
589 target.compression_algorithm[c.get_level() as usize]
590 .clone_from(&c.compression_algorithm);
591 }
592 MutableConfig::MaxL0CompactLevelCount(c) => {
593 target.max_l0_compact_level_count = Some(*c);
594 }
595 MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
596 target.sst_allowed_trivial_move_min_size = Some(*c);
597 }
598 MutableConfig::SplitWeightByVnode(c) => {
599 target.split_weight_by_vnode = *c;
600 }
601 MutableConfig::DisableAutoGroupScheduling(c) => {
602 target.disable_auto_group_scheduling = Some(*c);
603 }
604 MutableConfig::MaxOverlappingLevelSize(c) => {
605 target.max_overlapping_level_size = Some(*c);
606 }
607 MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
608 target.sst_allowed_trivial_move_max_count = Some(*c);
609 }
610 MutableConfig::EmergencyLevel0SstFileCount(c) => {
611 target.emergency_level0_sst_file_count = Some(*c);
612 }
613 MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
614 target.emergency_level0_sub_level_partition = Some(*c);
615 }
616 MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
617 target.level0_stop_write_threshold_max_sst_count = Some(*c);
618 }
619 MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
620 target.level0_stop_write_threshold_max_size = Some(*c);
621 }
622 MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
623 target.enable_optimize_l0_interval_selection = Some(*c);
624 }
625 #[expect(deprecated)]
626 MutableConfig::VnodeAlignedLevelSizeThreshold(_) => {
627 }
629 MutableConfig::MaxKvCountForXor16(c) => {
630 target.max_kv_count_for_xor16 = (*c != u64::MIN && *c != u64::MAX).then_some(*c);
631 }
632 MutableConfig::MaxVnodeKeyRangeBytes(c) => {
633 target.max_vnode_key_range_bytes = (*c > 0).then_some(*c);
634 }
635 }
636 }
637}
638
639impl CompactionGroupTransaction<'_> {
640 pub fn try_create_compaction_groups(
642 &mut self,
643 compaction_group_ids: &[CompactionGroupId],
644 config: Arc<CompactionConfig>,
645 ) -> bool {
646 let mut trivial = true;
647 for id in compaction_group_ids {
648 if self.contains_key(id) {
649 continue;
650 }
651 let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
652 self.insert(*id, new_entry);
653
654 trivial = false;
655 }
656
657 !trivial
658 }
659
660 pub fn create_compaction_groups(
661 &mut self,
662 compaction_group_id: CompactionGroupId,
663 config: Arc<CompactionConfig>,
664 ) {
665 self.try_create_compaction_groups(&[compaction_group_id], config);
666 }
667
668 pub(crate) fn try_get_compaction_group_config(
670 &self,
671 compaction_group_id: CompactionGroupId,
672 ) -> Option<&CompactionGroup> {
673 self.get(&compaction_group_id)
674 }
675
676 pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
678 let stale_group = self
679 .tree_ref()
680 .keys()
681 .cloned()
682 .filter(|k| !existing_groups.contains(k))
683 .collect_vec();
684 if stale_group.is_empty() {
685 return;
686 }
687 for group in stale_group {
688 self.remove(group);
689 }
690 }
691
692 pub(crate) fn update_compaction_config(
693 &mut self,
694 compaction_group_ids: &[CompactionGroupId],
695 config_to_update: &[MutableConfig],
696 ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
697 let mut results = HashMap::default();
698 for compaction_group_id in compaction_group_ids.iter().unique() {
699 let group = self.get(compaction_group_id).ok_or_else(|| {
700 Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
701 })?;
702 let mut config = group.compaction_config.as_ref().clone();
703 update_compaction_config(&mut config, config_to_update);
704 if let Err(reason) = validate_compaction_config(&config) {
705 return Err(Error::CompactionGroup(reason));
706 }
707 let mut new_group = group.clone();
708 new_group.compaction_config = Arc::new(config);
709 self.insert(*compaction_group_id, new_group.clone());
710 results.insert(new_group.group_id(), new_group);
711 }
712
713 Ok(results)
714 }
715}
716
717#[cfg(test)]
718mod tests {
719 use std::collections::{BTreeMap, HashSet};
720
721 use itertools::Itertools;
722 use risingwave_common::id::JobId;
723 use risingwave_hummock_sdk::CompactionGroupId;
724 use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
725
726 use crate::controller::SqlMetaStore;
727 use crate::hummock::commit_multi_var;
728 use crate::hummock::error::Result;
729 use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
730 use crate::hummock::test_utils::setup_compute_env;
731 use crate::model::{Fragment, StreamJobFragments};
732
733 #[tokio::test]
734 async fn test_inner() {
735 let (env, ..) = setup_compute_env(8080).await;
736 let mut inner = CompactionGroupManager::new(&env).await.unwrap();
737 assert_eq!(inner.compaction_groups.len(), 2);
738
739 async fn update_compaction_config(
740 meta: &SqlMetaStore,
741 inner: &mut CompactionGroupManager,
742 cg_ids: &[impl Into<CompactionGroupId> + Copy],
743 config_to_update: &[MutableConfig],
744 ) -> Result<()> {
745 let cg_ids = cg_ids.iter().copied().map_into().collect_vec();
746 let mut compaction_groups_txn = inner.start_compaction_groups_txn();
747 compaction_groups_txn.update_compaction_config(&cg_ids, config_to_update)?;
748 commit_multi_var!(meta, compaction_groups_txn)
749 }
750
751 async fn insert_compaction_group_configs(
752 meta: &SqlMetaStore,
753 inner: &mut CompactionGroupManager,
754 cg_ids: &[u64],
755 ) {
756 let default_config = inner.default_compaction_config();
757 let mut compaction_groups_txn = inner.start_compaction_groups_txn();
758 if compaction_groups_txn.try_create_compaction_groups(
759 &cg_ids.iter().copied().map_into().collect_vec(),
760 default_config,
761 ) {
762 commit_multi_var!(meta, compaction_groups_txn).unwrap();
763 }
764 }
765
766 update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
767 .await
768 .unwrap_err();
769 insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
770 assert_eq!(inner.compaction_groups.len(), 4);
771 let mut inner = CompactionGroupManager::new(&env).await.unwrap();
772 assert_eq!(inner.compaction_groups.len(), 4);
773
774 update_compaction_config(
775 env.meta_store_ref(),
776 &mut inner,
777 &[100, 200],
778 &[MutableConfig::MaxSubCompaction(123)],
779 )
780 .await
781 .unwrap();
782 assert_eq!(inner.compaction_groups.len(), 4);
783 assert_eq!(
784 inner
785 .try_get_compaction_group_config(100)
786 .unwrap()
787 .compaction_config
788 .max_sub_compaction,
789 123
790 );
791 assert_eq!(
792 inner
793 .try_get_compaction_group_config(200)
794 .unwrap()
795 .compaction_config
796 .max_sub_compaction,
797 123
798 );
799 }
800
801 #[tokio::test]
802 async fn test_manager() {
803 let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
804 let table_fragment_1 = StreamJobFragments::for_test(
805 JobId::new(10),
806 BTreeMap::from([(
807 1.into(),
808 Fragment {
809 fragment_id: 1.into(),
810 state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
811 ..Default::default()
812 },
813 )]),
814 );
815 let table_fragment_2 = StreamJobFragments::for_test(
816 JobId::new(20),
817 BTreeMap::from([(
818 2.into(),
819 Fragment {
820 fragment_id: 2.into(),
821 state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
822 ..Default::default()
823 },
824 )]),
825 );
826
827 let registered_number = || async {
829 compaction_group_manager
830 .list_compaction_group()
831 .await
832 .iter()
833 .map(|cg| cg.member_table_ids.len())
834 .sum::<usize>()
835 };
836 let group_number =
837 || async { compaction_group_manager.list_compaction_group().await.len() };
838 assert_eq!(registered_number().await, 0);
839
840 compaction_group_manager
841 .register_table_fragments(
842 Some(table_fragment_1.stream_job_id().as_mv_table_id()),
843 table_fragment_1
844 .internal_table_ids()
845 .into_iter()
846 .map_into()
847 .collect(),
848 )
849 .await
850 .unwrap();
851 assert_eq!(registered_number().await, 4);
852 compaction_group_manager
853 .register_table_fragments(
854 Some(table_fragment_2.stream_job_id().as_mv_table_id()),
855 table_fragment_2
856 .internal_table_ids()
857 .into_iter()
858 .map_into()
859 .collect(),
860 )
861 .await
862 .unwrap();
863 assert_eq!(registered_number().await, 8);
864
865 compaction_group_manager
867 .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
868 .await;
869 assert_eq!(registered_number().await, 4);
870
871 compaction_group_manager
873 .purge(&table_fragment_2.all_table_ids().collect())
874 .await
875 .unwrap();
876 assert_eq!(registered_number().await, 4);
877 compaction_group_manager
878 .purge(&HashSet::new())
879 .await
880 .unwrap();
881 assert_eq!(registered_number().await, 0);
882
883 assert_eq!(group_number().await, 2);
884
885 compaction_group_manager
886 .register_table_fragments(
887 Some(table_fragment_1.stream_job_id().as_mv_table_id()),
888 table_fragment_1
889 .internal_table_ids()
890 .into_iter()
891 .map_into()
892 .collect(),
893 )
894 .await
895 .unwrap();
896 assert_eq!(registered_number().await, 4);
897 assert_eq!(group_number().await, 2);
898
899 compaction_group_manager
900 .unregister_table_fragments_vec(&[table_fragment_1])
901 .await;
902 assert_eq!(registered_number().await, 0);
903 assert_eq!(group_number().await, 2);
904 }
905}