1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::util::epoch::INVALID_EPOCH;
22use risingwave_hummock_sdk::CompactionGroupId;
23use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
25use risingwave_hummock_sdk::version::GroupDelta;
26use risingwave_meta_model::compaction_config;
27use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
31};
32use sea_orm::EntityTrait;
33use tokio::sync::OnceCell;
34
35use super::CompactionGroupStatistic;
36use crate::hummock::compaction::compaction_config::{
37 CompactionConfigBuilder, validate_compaction_config,
38};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
44use crate::hummock::model::CompactionGroup;
45use crate::hummock::sequence::next_compaction_group_id;
46use crate::manager::MetaSrvEnv;
47use crate::model::{
48 BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
49};
50
51type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
52
53impl CompactionGroupManager {
54 pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
55 let default_config = match env.opts.compaction_config.as_ref() {
56 None => CompactionConfigBuilder::new().build(),
57 Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
58 };
59 Self::new_with_config(env, default_config).await
60 }
61
62 pub(crate) async fn new_with_config(
63 env: &MetaSrvEnv,
64 default_config: CompactionConfig,
65 ) -> Result<CompactionGroupManager> {
66 let mut compaction_group_manager = CompactionGroupManager {
67 compaction_groups: BTreeMap::new(),
68 default_config: Arc::new(default_config),
69 write_limit: Default::default(),
70 };
71
72 let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
73 compaction_config::Entity::find()
74 .all(&env.meta_store_ref().conn)
75 .await
76 .map_err(MetadataModelError::from)?
77 .into_iter()
78 .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
79 .collect();
80
81 compaction_group_manager.init(loaded_compaction_groups);
82 Ok(compaction_group_manager)
83 }
84
85 fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
86 if !loaded_compaction_groups.is_empty() {
87 self.compaction_groups = loaded_compaction_groups;
88 }
89 }
90}
91
92impl HummockManager {
93 pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
96 get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
97 }
98
99 pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
101 self.compaction_group_manager
102 .read()
103 .await
104 .compaction_groups
105 .clone()
106 }
107
108 #[cfg(test)]
109 pub async fn register_table_fragments(
111 &self,
112 mv_table: Option<TableId>,
113 mut internal_tables: Vec<TableId>,
114 ) -> Result<()> {
115 let mut pairs = vec![];
116 if let Some(mv_table) = mv_table {
117 if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
118 tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
119 }
120 pairs.push((
122 mv_table,
123 CompactionGroupId::from(StaticCompactionGroupId::MaterializedView),
124 ));
125 }
126 for table_id in internal_tables {
128 pairs.push((
129 table_id,
130 CompactionGroupId::from(StaticCompactionGroupId::StateDefault),
131 ));
132 }
133 self.register_table_ids_for_test(&pairs).await?;
134 Ok(())
135 }
136
137 #[cfg(test)]
138 pub async fn unregister_table_fragments_vec(
140 &self,
141 table_fragments: &[crate::model::StreamJobFragments],
142 ) {
143 self.unregister_table_ids(table_fragments.iter().flat_map(|t| t.all_table_ids()))
144 .await
145 .unwrap();
146 }
147
148 pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
152 let to_unregister = self
153 .versioning
154 .read()
155 .await
156 .current_version
157 .state_table_info
158 .info()
159 .keys()
160 .cloned()
161 .filter(|table_id| !valid_ids.contains(table_id))
162 .collect_vec();
163
164 self.unregister_table_ids(to_unregister).await
167 }
168
169 pub async fn register_table_ids_for_test(
174 &self,
175 pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
176 ) -> Result<()> {
177 if pairs.is_empty() {
178 return Ok(());
179 }
180 let mut versioning_guard = self.versioning.write().await;
181 let versioning = versioning_guard.deref_mut();
182 let mut compaction_group_manager = self.compaction_group_manager.write().await;
183 let current_version = &versioning.current_version;
184 let default_config = compaction_group_manager.default_compaction_config();
185 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
186
187 for (table_id, _) in pairs {
188 let table_id = (*table_id).into();
189 if let Some(info) = current_version.state_table_info.info().get(&table_id) {
190 return Err(Error::CompactionGroup(format!(
191 "table {} already {:?}",
192 table_id, info
193 )));
194 }
195 }
196 let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
198 let mut version = HummockVersionTransaction::new(
199 &mut versioning.current_version,
200 &mut versioning.hummock_version_deltas,
201 self.env.notification_manager(),
202 None,
203 &self.metrics,
204 );
205 let mut new_version_delta = version.new_delta();
206
207 let committed_epoch = new_version_delta
208 .latest_version()
209 .state_table_info
210 .info()
211 .values()
212 .map(|info| info.committed_epoch)
213 .max()
214 .unwrap_or(INVALID_EPOCH);
215
216 for (table_id, raw_group_id) in pairs {
217 let table_id = (*table_id).into();
218 let mut group_id = *raw_group_id;
219 if group_id == StaticCompactionGroupId::NewCompactionGroup as u64 {
220 let mut is_group_init = false;
221 group_id = *new_compaction_group_id
222 .get_or_try_init(|| async {
223 next_compaction_group_id(&self.env).await.inspect(|_| {
224 is_group_init = true;
225 })
226 })
227 .await?;
228 if is_group_init {
229 let group_deltas = &mut new_version_delta
230 .group_deltas
231 .entry(group_id)
232 .or_default()
233 .group_deltas;
234
235 let config =
236 match compaction_groups_txn.try_get_compaction_group_config(group_id) {
237 Some(config) => config.compaction_config.as_ref().clone(),
238 None => {
239 compaction_groups_txn
240 .create_compaction_groups(group_id, default_config.clone());
241 default_config.as_ref().clone()
242 }
243 };
244
245 let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
246 group_config: Some(config),
247 group_id,
248 ..Default::default()
249 }));
250
251 group_deltas.push(group_delta);
252 }
253 }
254 assert!(
255 new_version_delta
256 .state_table_info_delta
257 .insert(
258 table_id,
259 PbStateTableInfoDelta {
260 committed_epoch,
261 compaction_group_id: *raw_group_id,
262 }
263 )
264 .is_none()
265 );
266 }
267 new_version_delta.pre_apply();
268 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
269
270 Ok(())
271 }
272
273 pub async fn unregister_table_ids(
274 &self,
275 table_ids: impl IntoIterator<Item = TableId>,
276 ) -> Result<()> {
277 let table_ids = table_ids.into_iter().collect_vec();
278 if table_ids.is_empty() {
279 return Ok(());
280 }
281
282 {
283 let mut table_write_throughput_statistic_manager =
287 self.table_write_throughput_statistic_manager.write();
288 for &table_id in table_ids.iter().unique() {
289 table_write_throughput_statistic_manager.remove_table(table_id);
290 }
291 }
292
293 let mut versioning_guard = self.versioning.write().await;
294 let versioning = versioning_guard.deref_mut();
295 let mut version = HummockVersionTransaction::new(
296 &mut versioning.current_version,
297 &mut versioning.hummock_version_deltas,
298 self.env.notification_manager(),
299 None,
300 &self.metrics,
301 );
302 let mut new_version_delta = version.new_delta();
303 let mut modified_groups: HashMap<CompactionGroupId, u64> =
304 HashMap::new();
305 for table_id in table_ids.into_iter().unique() {
307 let version = new_version_delta.latest_version();
308 let Some(info) = version.state_table_info.info().get(&table_id) else {
309 continue;
310 };
311
312 modified_groups
313 .entry(info.compaction_group_id)
314 .and_modify(|count| *count -= 1)
315 .or_insert(
316 version
317 .state_table_info
318 .compaction_group_member_tables()
319 .get(&info.compaction_group_id)
320 .expect("should exist")
321 .len() as u64
322 - 1,
323 );
324 new_version_delta.removed_table_ids.insert(table_id);
325 }
326
327 let groups_to_remove = modified_groups
328 .into_iter()
329 .filter_map(|(group_id, member_count)| {
330 if member_count == 0 && group_id > StaticCompactionGroupId::End as CompactionGroupId
331 {
332 return Some((
333 group_id,
334 new_version_delta
335 .latest_version()
336 .get_compaction_group_levels(group_id)
337 .levels
338 .len(),
339 ));
340 }
341 None
342 })
343 .collect_vec();
344 for (group_id, _) in &groups_to_remove {
345 let group_deltas = &mut new_version_delta
346 .group_deltas
347 .entry(*group_id)
348 .or_default()
349 .group_deltas;
350
351 let group_delta = GroupDelta::GroupDestroy(PbGroupDestroy {});
352 group_deltas.push(group_delta);
353 }
354
355 for (group_id, max_level) in groups_to_remove {
356 remove_compaction_group_in_sst_stat(&self.metrics, group_id, max_level);
357 }
358
359 new_version_delta.pre_apply();
360
361 let mut compaction_group_manager = self.compaction_group_manager.write().await;
364 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
365
366 compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
367 version.latest_version(),
368 )));
369 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
370
371 Ok(())
373 }
374
375 pub async fn update_compaction_config(
376 &self,
377 compaction_group_ids: &[CompactionGroupId],
378 config_to_update: &[MutableConfig],
379 ) -> Result<()> {
380 {
381 let mut compaction_group_manager = self.compaction_group_manager.write().await;
383 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
384 compaction_groups_txn
385 .update_compaction_config(compaction_group_ids, config_to_update)?;
386 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
387 }
388
389 if config_to_update
390 .iter()
391 .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
392 {
393 self.try_update_write_limits(compaction_group_ids).await;
395 }
396
397 Ok(())
398 }
399
400 pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
403 let mut versioning_guard = self.versioning.write().await;
404 let versioning = versioning_guard.deref_mut();
405 let current_version = &versioning.current_version;
406 let mut results = vec![];
407 let compaction_group_manager = self.compaction_group_manager.read().await;
408
409 for levels in current_version.levels.values() {
410 let compaction_config = compaction_group_manager
411 .try_get_compaction_group_config(levels.group_id)
412 .unwrap()
413 .compaction_config
414 .as_ref()
415 .clone();
416 let group = CompactionGroupInfo {
417 id: levels.group_id,
418 parent_id: levels.parent_group_id,
419 member_table_ids: current_version
420 .state_table_info
421 .compaction_group_member_table_ids(levels.group_id)
422 .iter()
423 .copied()
424 .collect_vec(),
425 compaction_config: Some(compaction_config),
426 };
427 results.push(group);
428 }
429 results
430 }
431
432 pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
433 let mut infos = vec![];
434 {
435 let versioning_guard = self.versioning.read().await;
436 let manager = self.compaction_group_manager.read().await;
437 let version = &versioning_guard.current_version;
438 for group_id in version.levels.keys() {
439 let mut group_info = CompactionGroupStatistic {
440 group_id: *group_id,
441 ..Default::default()
442 };
443 for table_id in version
444 .state_table_info
445 .compaction_group_member_table_ids(*group_id)
446 {
447 let stats_size = versioning_guard
448 .version_stats
449 .table_stats
450 .get(table_id)
451 .map(|stats| stats.total_key_size + stats.total_value_size)
452 .unwrap_or(0);
453 let table_size = std::cmp::max(stats_size, 0) as u64;
454 group_info.group_size += table_size;
455 group_info.table_statistic.insert(*table_id, table_size);
456 group_info.compaction_group_config =
457 manager.try_get_compaction_group_config(*group_id).unwrap();
458 }
459 infos.push(group_info);
460 }
461 };
462 infos
463 }
464
465 pub(crate) async fn initial_compaction_group_config_after_load(
466 &self,
467 versioning_guard: &Versioning,
468 compaction_group_manager: &mut CompactionGroupManager,
469 ) -> Result<()> {
470 let current_version = &versioning_guard.current_version;
472 let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
473 let default_config = compaction_group_manager.default_compaction_config();
474 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
475 compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
476 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
477
478 Ok(())
479 }
480}
481
482pub(crate) struct CompactionGroupManager {
490 compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
491 default_config: Arc<CompactionConfig>,
492 pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
494}
495
496impl CompactionGroupManager {
497 pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
499 CompactionGroupTransaction::new(&mut self.compaction_groups)
500 }
501
502 #[expect(clippy::type_complexity)]
503 pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
504 inner: P,
505 ) -> BTreeMapTransactionInner<
506 CompactionGroupId,
507 CompactionGroup,
508 DerefMutForward<
509 Self,
510 BTreeMap<CompactionGroupId, CompactionGroup>,
511 P,
512 impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
513 impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
514 >,
515 > {
516 BTreeMapTransactionInner::new(DerefMutForward::new(
517 inner,
518 |mgr| &mgr.compaction_groups,
519 |mgr| &mut mgr.compaction_groups,
520 ))
521 }
522
523 pub(crate) fn try_get_compaction_group_config(
525 &self,
526 compaction_group_id: CompactionGroupId,
527 ) -> Option<CompactionGroup> {
528 self.compaction_groups.get(&compaction_group_id).cloned()
529 }
530
531 pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
533 self.default_config.clone()
534 }
535}
536
537fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) {
538 for item in items {
539 match item {
540 MutableConfig::MaxBytesForLevelBase(c) => {
541 target.max_bytes_for_level_base = *c;
542 }
543 MutableConfig::MaxBytesForLevelMultiplier(c) => {
544 target.max_bytes_for_level_multiplier = *c;
545 }
546 MutableConfig::MaxCompactionBytes(c) => {
547 target.max_compaction_bytes = *c;
548 }
549 MutableConfig::SubLevelMaxCompactionBytes(c) => {
550 target.sub_level_max_compaction_bytes = *c;
551 }
552 MutableConfig::Level0TierCompactFileNumber(c) => {
553 target.level0_tier_compact_file_number = *c;
554 }
555 MutableConfig::TargetFileSizeBase(c) => {
556 target.target_file_size_base = *c;
557 }
558 MutableConfig::CompactionFilterMask(c) => {
559 target.compaction_filter_mask = *c;
560 }
561 MutableConfig::MaxSubCompaction(c) => {
562 target.max_sub_compaction = *c;
563 }
564 MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
565 target.level0_stop_write_threshold_sub_level_number = *c;
566 }
567 MutableConfig::Level0SubLevelCompactLevelCount(c) => {
568 target.level0_sub_level_compact_level_count = *c;
569 }
570 MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
571 target.level0_overlapping_sub_level_compact_level_count = *c;
572 }
573 MutableConfig::MaxSpaceReclaimBytes(c) => {
574 target.max_space_reclaim_bytes = *c;
575 }
576 MutableConfig::Level0MaxCompactFileNumber(c) => {
577 target.level0_max_compact_file_number = *c;
578 }
579 MutableConfig::EnableEmergencyPicker(c) => {
580 target.enable_emergency_picker = *c;
581 }
582 MutableConfig::TombstoneReclaimRatio(c) => {
583 target.tombstone_reclaim_ratio = *c;
584 }
585 MutableConfig::CompressionAlgorithm(c) => {
586 target.compression_algorithm[c.get_level() as usize]
587 .clone_from(&c.compression_algorithm);
588 }
589 MutableConfig::MaxL0CompactLevelCount(c) => {
590 target.max_l0_compact_level_count = Some(*c);
591 }
592 MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
593 target.sst_allowed_trivial_move_min_size = Some(*c);
594 }
595 MutableConfig::SplitWeightByVnode(c) => {
596 target.split_weight_by_vnode = *c;
597 }
598 MutableConfig::DisableAutoGroupScheduling(c) => {
599 target.disable_auto_group_scheduling = Some(*c);
600 }
601 MutableConfig::MaxOverlappingLevelSize(c) => {
602 target.max_overlapping_level_size = Some(*c);
603 }
604 MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
605 target.sst_allowed_trivial_move_max_count = Some(*c);
606 }
607 MutableConfig::EmergencyLevel0SstFileCount(c) => {
608 target.emergency_level0_sst_file_count = Some(*c);
609 }
610 MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
611 target.emergency_level0_sub_level_partition = Some(*c);
612 }
613 MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
614 target.level0_stop_write_threshold_max_sst_count = Some(*c);
615 }
616 MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
617 target.level0_stop_write_threshold_max_size = Some(*c);
618 }
619 MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
620 target.enable_optimize_l0_interval_selection = Some(*c);
621 }
622 }
623 }
624}
625
626impl CompactionGroupTransaction<'_> {
627 pub fn try_create_compaction_groups(
629 &mut self,
630 compaction_group_ids: &[CompactionGroupId],
631 config: Arc<CompactionConfig>,
632 ) -> bool {
633 let mut trivial = true;
634 for id in compaction_group_ids {
635 if self.contains_key(id) {
636 continue;
637 }
638 let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
639 self.insert(*id, new_entry);
640
641 trivial = false;
642 }
643
644 !trivial
645 }
646
647 pub fn create_compaction_groups(
648 &mut self,
649 compaction_group_id: CompactionGroupId,
650 config: Arc<CompactionConfig>,
651 ) {
652 self.try_create_compaction_groups(&[compaction_group_id], config);
653 }
654
655 pub(crate) fn try_get_compaction_group_config(
657 &self,
658 compaction_group_id: CompactionGroupId,
659 ) -> Option<&CompactionGroup> {
660 self.get(&compaction_group_id)
661 }
662
663 pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
665 let stale_group = self
666 .tree_ref()
667 .keys()
668 .cloned()
669 .filter(|k| !existing_groups.contains(k))
670 .collect_vec();
671 if stale_group.is_empty() {
672 return;
673 }
674 for group in stale_group {
675 self.remove(group);
676 }
677 }
678
679 pub(crate) fn update_compaction_config(
680 &mut self,
681 compaction_group_ids: &[CompactionGroupId],
682 config_to_update: &[MutableConfig],
683 ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
684 let mut results = HashMap::default();
685 for compaction_group_id in compaction_group_ids.iter().unique() {
686 let group = self.get(compaction_group_id).ok_or_else(|| {
687 Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
688 })?;
689 let mut config = group.compaction_config.as_ref().clone();
690 update_compaction_config(&mut config, config_to_update);
691 if let Err(reason) = validate_compaction_config(&config) {
692 return Err(Error::CompactionGroup(reason));
693 }
694 let mut new_group = group.clone();
695 new_group.compaction_config = Arc::new(config);
696 self.insert(*compaction_group_id, new_group.clone());
697 results.insert(new_group.group_id(), new_group);
698 }
699
700 Ok(results)
701 }
702}
703
704#[cfg(test)]
705mod tests {
706 use std::collections::{BTreeMap, HashSet};
707
708 use itertools::Itertools;
709 use risingwave_common::id::JobId;
710 use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
711
712 use crate::controller::SqlMetaStore;
713 use crate::hummock::commit_multi_var;
714 use crate::hummock::error::Result;
715 use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
716 use crate::hummock::test_utils::setup_compute_env;
717 use crate::model::{Fragment, StreamJobFragments};
718
719 #[tokio::test]
720 async fn test_inner() {
721 let (env, ..) = setup_compute_env(8080).await;
722 let mut inner = CompactionGroupManager::new(&env).await.unwrap();
723 assert_eq!(inner.compaction_groups.len(), 2);
724
725 async fn update_compaction_config(
726 meta: &SqlMetaStore,
727 inner: &mut CompactionGroupManager,
728 cg_ids: &[u64],
729 config_to_update: &[MutableConfig],
730 ) -> Result<()> {
731 let mut compaction_groups_txn = inner.start_compaction_groups_txn();
732 compaction_groups_txn.update_compaction_config(cg_ids, config_to_update)?;
733 commit_multi_var!(meta, compaction_groups_txn)
734 }
735
736 async fn insert_compaction_group_configs(
737 meta: &SqlMetaStore,
738 inner: &mut CompactionGroupManager,
739 cg_ids: &[u64],
740 ) {
741 let default_config = inner.default_compaction_config();
742 let mut compaction_groups_txn = inner.start_compaction_groups_txn();
743 if compaction_groups_txn.try_create_compaction_groups(cg_ids, default_config) {
744 commit_multi_var!(meta, compaction_groups_txn).unwrap();
745 }
746 }
747
748 update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
749 .await
750 .unwrap_err();
751 insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
752 assert_eq!(inner.compaction_groups.len(), 4);
753 let mut inner = CompactionGroupManager::new(&env).await.unwrap();
754 assert_eq!(inner.compaction_groups.len(), 4);
755
756 update_compaction_config(
757 env.meta_store_ref(),
758 &mut inner,
759 &[100, 200],
760 &[MutableConfig::MaxSubCompaction(123)],
761 )
762 .await
763 .unwrap();
764 assert_eq!(inner.compaction_groups.len(), 4);
765 assert_eq!(
766 inner
767 .try_get_compaction_group_config(100)
768 .unwrap()
769 .compaction_config
770 .max_sub_compaction,
771 123
772 );
773 assert_eq!(
774 inner
775 .try_get_compaction_group_config(200)
776 .unwrap()
777 .compaction_config
778 .max_sub_compaction,
779 123
780 );
781 }
782
783 #[tokio::test]
784 async fn test_manager() {
785 let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
786 let table_fragment_1 = StreamJobFragments::for_test(
787 JobId::new(10),
788 BTreeMap::from([(
789 1.into(),
790 Fragment {
791 fragment_id: 1.into(),
792 state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
793 ..Default::default()
794 },
795 )]),
796 );
797 let table_fragment_2 = StreamJobFragments::for_test(
798 JobId::new(20),
799 BTreeMap::from([(
800 2.into(),
801 Fragment {
802 fragment_id: 2.into(),
803 state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
804 ..Default::default()
805 },
806 )]),
807 );
808
809 let registered_number = || async {
811 compaction_group_manager
812 .list_compaction_group()
813 .await
814 .iter()
815 .map(|cg| cg.member_table_ids.len())
816 .sum::<usize>()
817 };
818 let group_number =
819 || async { compaction_group_manager.list_compaction_group().await.len() };
820 assert_eq!(registered_number().await, 0);
821
822 compaction_group_manager
823 .register_table_fragments(
824 Some(table_fragment_1.stream_job_id().as_mv_table_id()),
825 table_fragment_1
826 .internal_table_ids()
827 .into_iter()
828 .map_into()
829 .collect(),
830 )
831 .await
832 .unwrap();
833 assert_eq!(registered_number().await, 4);
834 compaction_group_manager
835 .register_table_fragments(
836 Some(table_fragment_2.stream_job_id().as_mv_table_id()),
837 table_fragment_2
838 .internal_table_ids()
839 .into_iter()
840 .map_into()
841 .collect(),
842 )
843 .await
844 .unwrap();
845 assert_eq!(registered_number().await, 8);
846
847 compaction_group_manager
849 .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
850 .await;
851 assert_eq!(registered_number().await, 4);
852
853 compaction_group_manager
855 .purge(&table_fragment_2.all_table_ids().collect())
856 .await
857 .unwrap();
858 assert_eq!(registered_number().await, 4);
859 compaction_group_manager
860 .purge(&HashSet::new())
861 .await
862 .unwrap();
863 assert_eq!(registered_number().await, 0);
864
865 assert_eq!(group_number().await, 2);
866
867 compaction_group_manager
868 .register_table_fragments(
869 Some(table_fragment_1.stream_job_id().as_mv_table_id()),
870 table_fragment_1
871 .internal_table_ids()
872 .into_iter()
873 .map_into()
874 .collect(),
875 )
876 .await
877 .unwrap();
878 assert_eq!(registered_number().await, 4);
879 assert_eq!(group_number().await, 2);
880
881 compaction_group_manager
882 .unregister_table_fragments_vec(&[table_fragment_1])
883 .await;
884 assert_eq!(registered_number().await, 0);
885 assert_eq!(group_number().await, 2);
886 }
887}