use std::collections::{BTreeMap, HashMap, HashSet};
use std::ops::DerefMut;
use std::sync::Arc;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
use risingwave_hummock_sdk::version::GroupDelta;
use risingwave_meta_model::compaction_config;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
    CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
};
use sea_orm::EntityTrait;
use tokio::sync::OnceCell;

use super::CompactionGroupStatistic;
use crate::hummock::compaction::compaction_config::{
    CompactionConfigBuilder, validate_compaction_config,
};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{HummockManager, commit_multi_var};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_group_id;
use crate::manager::MetaSrvEnv;
use crate::model::{
    BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
};

type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;

impl CompactionGroupManager {
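    /// Creates a `CompactionGroupManager` whose default compaction config is built from the meta
    /// server options, falling back to the builder defaults when none is configured.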
    pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
        let default_config = match env.opts.compaction_config.as_ref() {
            None => CompactionConfigBuilder::new().build(),
            Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
        };
        Self::new_with_config(env, default_config).await
    }

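    /// Creates a `CompactionGroupManager` with the given default config and loads all persisted
    /// per-group configs from the meta store.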
    pub(crate) async fn new_with_config(
        env: &MetaSrvEnv,
        default_config: CompactionConfig,
    ) -> Result<CompactionGroupManager> {
        let mut compaction_group_manager = CompactionGroupManager {
            compaction_groups: BTreeMap::new(),
            default_config: Arc::new(default_config),
            write_limit: Default::default(),
        };

        let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
            compaction_config::Entity::find()
                .all(&env.meta_store_ref().conn)
                .await
                .map_err(MetadataModelError::from)?
                .into_iter()
                .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
                .collect();

        compaction_group_manager.init(loaded_compaction_groups);
        Ok(compaction_group_manager)
    }

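    /// Seeds the in-memory map with the compaction groups loaded from the meta store, keeping the
    /// empty default map when nothing has been persisted yet.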
    fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
        if !loaded_compaction_groups.is_empty() {
            self.compaction_groups = loaded_compaction_groups;
        }
    }
}

impl HummockManager {
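    /// Returns the ids of all compaction groups in the current hummock version.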
    pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
        get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
    }

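    /// Returns a snapshot of all compaction groups together with their configs.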
    pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
        self.compaction_group_manager
            .read()
            .await
            .compaction_groups
            .clone()
    }

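    /// Registers `mv_table` to the `MaterializedView` group and `internal_tables` to the
    /// `StateDefault` group (test only).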
    #[cfg(test)]
    pub async fn register_table_fragments(
        &self,
        mv_table: Option<TableId>,
        mut internal_tables: Vec<TableId>,
    ) -> Result<()> {
        let mut pairs = vec![];
        if let Some(mv_table) = mv_table {
            if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
                tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
            }
            pairs.push((mv_table, StaticCompactionGroupId::MaterializedView));
        }
        for table_id in internal_tables {
            pairs.push((table_id, StaticCompactionGroupId::StateDefault));
        }
        self.register_table_ids_for_test(&pairs).await?;
        Ok(())
    }

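    /// Unregisters every state table of the given stream job fragments (test only).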
    #[cfg(test)]
    pub async fn unregister_table_fragments_vec(
        &self,
        table_fragments: &[crate::model::StreamJobFragments],
    ) {
        self.unregister_table_ids(table_fragments.iter().flat_map(|t| t.all_table_ids()))
            .await
            .unwrap();
    }

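    /// Unregisters every table id in the current version that is not contained in `valid_ids`.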
    pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
        let to_unregister = self
            .versioning
            .read()
            .await
            .current_version
            .state_table_info
            .info()
            .keys()
            .cloned()
            .filter(|table_id| !valid_ids.contains(table_id))
            .collect_vec();

        self.unregister_table_ids(to_unregister).await
    }

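    /// Registers the given table ids to their target compaction groups. A fresh group is created
    /// (at most once per call) when `StaticCompactionGroupId::NewCompactionGroup` is requested.
    /// Acquires the `versioning` and `compaction_group_manager` write locks.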
    pub async fn register_table_ids_for_test(
        &self,
        pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
    ) -> Result<()> {
        if pairs.is_empty() {
            return Ok(());
        }
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let mut compaction_group_manager = self.compaction_group_manager.write().await;
        let current_version = &versioning.current_version;
        let default_config = compaction_group_manager.default_compaction_config();
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

        for (table_id, _) in pairs {
            let table_id = (*table_id).into();
            if let Some(info) = current_version.state_table_info.info().get(&table_id) {
                return Err(Error::CompactionGroup(format!(
                    "table {} already {:?}",
                    table_id, info
                )));
            }
        }
        let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        let committed_epoch = new_version_delta
            .latest_version()
            .state_table_info
            .info()
            .values()
            .map(|info| info.committed_epoch)
            .max()
            .unwrap_or(INVALID_EPOCH);

        for (table_id, raw_group_id) in pairs {
            let table_id = (*table_id).into();
            let mut group_id = *raw_group_id;
            if group_id == StaticCompactionGroupId::NewCompactionGroup {
                let mut is_group_init = false;
                group_id = *new_compaction_group_id
                    .get_or_try_init(|| async {
                        next_compaction_group_id(&self.env).await.inspect(|_| {
                            is_group_init = true;
                        })
                    })
                    .await?;
                if is_group_init {
                    let group_deltas = &mut new_version_delta
                        .group_deltas
                        .entry(group_id)
                        .or_default()
                        .group_deltas;

                    let config =
                        match compaction_groups_txn.try_get_compaction_group_config(group_id) {
                            Some(config) => config.compaction_config.as_ref().clone(),
                            None => {
                                compaction_groups_txn
                                    .create_compaction_groups(group_id, default_config.clone());
                                default_config.as_ref().clone()
                            }
                        };

                    let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config),
                        group_id,
                        ..Default::default()
                    }));

                    group_deltas.push(group_delta);
                }
            }
            assert!(
                new_version_delta
                    .state_table_info_delta
                    .insert(
                        table_id,
                        PbStateTableInfoDelta {
                            committed_epoch,
                            compaction_group_id: group_id,
                        }
                    )
                    .is_none()
            );
        }
        new_version_delta.pre_apply();
        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;

        Ok(())
    }

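    /// Unregisters the given table ids and destroys any non-static compaction group that becomes
    /// empty as a result.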
    pub async fn unregister_table_ids(
        &self,
        table_ids: impl IntoIterator<Item = TableId>,
    ) -> Result<()> {
        let table_ids = table_ids.into_iter().collect_vec();
        if table_ids.is_empty() {
            return Ok(());
        }

        {
            let mut table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.write();
            for &table_id in table_ids.iter().unique() {
                table_write_throughput_statistic_manager.remove_table(table_id);
            }
        }

        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();
        let mut modified_groups: HashMap<CompactionGroupId, u64> = HashMap::new();
        for table_id in table_ids.into_iter().unique() {
            let version = new_version_delta.latest_version();
            let Some(info) = version.state_table_info.info().get(&table_id) else {
                continue;
            };

            modified_groups
                .entry(info.compaction_group_id)
                .and_modify(|count| *count -= 1)
                .or_insert(
                    version
                        .state_table_info
                        .compaction_group_member_tables()
                        .get(&info.compaction_group_id)
                        .expect("should exist")
                        .len() as u64
                        - 1,
                );
            new_version_delta.removed_table_ids.insert(table_id);
        }

        let groups_to_remove = modified_groups
            .into_iter()
            .filter_map(|(group_id, member_count)| {
                if member_count == 0 && group_id > StaticCompactionGroupId::End {
                    return Some((
                        group_id,
                        new_version_delta
                            .latest_version()
                            .get_compaction_group_levels(group_id)
                            .levels
                            .len(),
                    ));
                }
                None
            })
            .collect_vec();
        for (group_id, _) in &groups_to_remove {
            let group_deltas = &mut new_version_delta
                .group_deltas
                .entry(*group_id)
                .or_default()
                .group_deltas;

            let group_delta = GroupDelta::GroupDestroy(PbGroupDestroy {});
            group_deltas.push(group_delta);
        }

        for (group_id, max_level) in groups_to_remove {
            remove_compaction_group_in_sst_stat(&self.metrics, group_id, max_level);
            self.compaction_state.remove_compaction_group(group_id);
        }

        new_version_delta.pre_apply();

        let mut compaction_group_manager = self.compaction_group_manager.write().await;
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

        compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
            version.latest_version(),
        )));
        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;

        Ok(())
    }

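    /// Applies `config_to_update` to every group in `compaction_group_ids` and persists the
    /// result. Write limits are re-evaluated when the level-0 stop-write threshold is among the
    /// updated items.
    ///
    /// A minimal usage sketch (assuming an existing `hummock_manager` handle and a valid
    /// `group_id`):
    ///
    /// ```ignore
    /// hummock_manager
    ///     .update_compaction_config(&[group_id], &[MutableConfig::MaxSubCompaction(4)])
    ///     .await?;
    /// ```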
    pub async fn update_compaction_config(
        &self,
        compaction_group_ids: &[CompactionGroupId],
        config_to_update: &[MutableConfig],
    ) -> Result<()> {
        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .update_compaction_config(compaction_group_ids, config_to_update)?;
            commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
        }

        if config_to_update
            .iter()
            .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
        {
            self.try_update_write_limits(compaction_group_ids).await;
        }

        Ok(())
    }

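    /// Lists every compaction group in the current version along with its member tables and
    /// compaction config.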
    pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let current_version = &versioning.current_version;
        let mut results = vec![];
        let compaction_group_manager = self.compaction_group_manager.read().await;

        for levels in current_version.levels.values() {
            let compaction_config = compaction_group_manager
                .try_get_compaction_group_config(levels.group_id)
                .unwrap()
                .compaction_config
                .as_ref()
                .clone();
            let group = CompactionGroupInfo {
                id: levels.group_id,
                parent_id: levels.parent_group_id,
                member_table_ids: current_version
                    .state_table_info
                    .compaction_group_member_table_ids(levels.group_id)
                    .iter()
                    .copied()
                    .collect_vec(),
                compaction_config: Some(compaction_config),
            };
            results.push(group);
        }
        results
    }

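    /// Computes per-group statistics (total size and per-table sizes) from the table stats of the
    /// current version.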
    pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
        let mut infos = vec![];
        {
            let versioning_guard = self.versioning.read().await;
            let manager = self.compaction_group_manager.read().await;
            let version = &versioning_guard.current_version;
            for group_id in version.levels.keys() {
                let mut group_info = CompactionGroupStatistic {
                    group_id: *group_id,
                    ..Default::default()
                };
                for table_id in version
                    .state_table_info
                    .compaction_group_member_table_ids(*group_id)
                {
                    let stats_size = versioning_guard
                        .version_stats
                        .table_stats
                        .get(table_id)
                        .map(|stats| stats.total_key_size + stats.total_value_size)
                        .unwrap_or(0);
                    let table_size = std::cmp::max(stats_size, 0) as u64;
                    group_info.group_size += table_size;
                    group_info.table_statistic.insert(*table_id, table_size);
                    group_info.compaction_group_config =
                        manager.try_get_compaction_group_config(*group_id).unwrap();
                }
                infos.push(group_info);
            }
        };
        infos
    }

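    /// Ensures every compaction group referenced by the loaded hummock version has a persisted
    /// compaction config, creating entries with the default config where missing.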
    pub(crate) async fn initial_compaction_group_config_after_load(
        &self,
        versioning_guard: &Versioning,
        compaction_group_manager: &mut CompactionGroupManager,
    ) -> Result<()> {
        let current_version = &versioning_guard.current_version;
        let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
        let default_config = compaction_group_manager.default_compaction_config();
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
        compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
        commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;

        Ok(())
    }
}

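/// In-memory cache of per-compaction-group configs, backed by the meta store. Mutations go
/// through a `CompactionGroupTransaction` and are persisted with `commit_multi_var!`.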
pub(crate) struct CompactionGroupManager {
    compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
    default_config: Arc<CompactionConfig>,
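    /// Currently effective write limits, keyed by compaction group.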
    pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
}

impl CompactionGroupManager {
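    /// Starts a transaction over the in-memory compaction group map; changes take effect once the
    /// transaction is committed via `commit_multi_var!`.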
    pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
        CompactionGroupTransaction::new(&mut self.compaction_groups)
    }

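    /// Variant of `start_compaction_groups_txn` that takes an owned guard (anything that derefs
    /// mutably to `Self`), so the returned transaction is not tied to a borrowed reference.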
    #[expect(clippy::type_complexity)]
    pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
        inner: P,
    ) -> BTreeMapTransactionInner<
        CompactionGroupId,
        CompactionGroup,
        DerefMutForward<
            Self,
            BTreeMap<CompactionGroupId, CompactionGroup>,
            P,
            impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
            impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
        >,
    > {
        BTreeMapTransactionInner::new(DerefMutForward::new(
            inner,
            |mgr| &mgr.compaction_groups,
            |mgr| &mut mgr.compaction_groups,
        ))
    }

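    /// Returns a clone of the group's config, or `None` if the group is unknown.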
    pub(crate) fn try_get_compaction_group_config(
        &self,
        compaction_group_id: impl Into<CompactionGroupId>,
    ) -> Option<CompactionGroup> {
        self.compaction_groups
            .get(&compaction_group_id.into())
            .cloned()
    }

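    /// Returns the default config used when creating new compaction groups.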
    pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
        self.default_config.clone()
    }
}

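/// Applies each item of a mutable-config update to `target` in place.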
fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) {
    for item in items {
        match item {
            MutableConfig::MaxBytesForLevelBase(c) => {
                target.max_bytes_for_level_base = *c;
            }
            MutableConfig::MaxBytesForLevelMultiplier(c) => {
                target.max_bytes_for_level_multiplier = *c;
            }
            MutableConfig::MaxCompactionBytes(c) => {
                target.max_compaction_bytes = *c;
            }
            MutableConfig::SubLevelMaxCompactionBytes(c) => {
                target.sub_level_max_compaction_bytes = *c;
            }
            MutableConfig::Level0TierCompactFileNumber(c) => {
                target.level0_tier_compact_file_number = *c;
            }
            MutableConfig::TargetFileSizeBase(c) => {
                target.target_file_size_base = *c;
            }
            MutableConfig::CompactionFilterMask(c) => {
                target.compaction_filter_mask = *c;
            }
            MutableConfig::MaxSubCompaction(c) => {
                target.max_sub_compaction = *c;
            }
            MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
                target.level0_stop_write_threshold_sub_level_number = *c;
            }
            MutableConfig::Level0SubLevelCompactLevelCount(c) => {
                target.level0_sub_level_compact_level_count = *c;
            }
            MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
                target.level0_overlapping_sub_level_compact_level_count = *c;
            }
            MutableConfig::MaxSpaceReclaimBytes(c) => {
                target.max_space_reclaim_bytes = *c;
            }
            MutableConfig::Level0MaxCompactFileNumber(c) => {
                target.level0_max_compact_file_number = *c;
            }
            MutableConfig::EnableEmergencyPicker(c) => {
                target.enable_emergency_picker = *c;
            }
            MutableConfig::TombstoneReclaimRatio(c) => {
                target.tombstone_reclaim_ratio = *c;
            }
            MutableConfig::CompressionAlgorithm(c) => {
                target.compression_algorithm[c.get_level() as usize]
                    .clone_from(&c.compression_algorithm);
            }
            MutableConfig::MaxL0CompactLevelCount(c) => {
                target.max_l0_compact_level_count = Some(*c);
            }
            MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
                target.sst_allowed_trivial_move_min_size = Some(*c);
            }
            MutableConfig::SplitWeightByVnode(c) => {
                target.split_weight_by_vnode = *c;
            }
            MutableConfig::DisableAutoGroupScheduling(c) => {
                target.disable_auto_group_scheduling = Some(*c);
            }
            MutableConfig::MaxOverlappingLevelSize(c) => {
                target.max_overlapping_level_size = Some(*c);
            }
            MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
                target.sst_allowed_trivial_move_max_count = Some(*c);
            }
            MutableConfig::EmergencyLevel0SstFileCount(c) => {
                target.emergency_level0_sst_file_count = Some(*c);
            }
            MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
                target.emergency_level0_sub_level_partition = Some(*c);
            }
            MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
                target.level0_stop_write_threshold_max_sst_count = Some(*c);
            }
            MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
                target.level0_stop_write_threshold_max_size = Some(*c);
            }
            MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
                target.enable_optimize_l0_interval_selection = Some(*c);
            }
            MutableConfig::VnodeAlignedLevelSizeThreshold(c) => {
                target.vnode_aligned_level_size_threshold =
                    (*c != u64::MIN && *c != u64::MAX).then_some(*c);
            }
            MutableConfig::MaxKvCountForXor16(c) => {
                target.max_kv_count_for_xor16 = (*c != u64::MIN && *c != u64::MAX).then_some(*c);
            }
        }
    }
}

impl CompactionGroupTransaction<'_> {
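    /// Inserts a config for every listed group that does not exist yet; returns `true` if at
    /// least one new group was created (i.e. the transaction is non-trivial and worth committing).
    ///
    /// A minimal sketch of the create-then-commit pattern used elsewhere in this module (names
    /// such as `meta_store` and `group_ids` are placeholders):
    ///
    /// ```ignore
    /// let default_config = compaction_group_manager.default_compaction_config();
    /// let mut txn = compaction_group_manager.start_compaction_groups_txn();
    /// if txn.try_create_compaction_groups(&group_ids, default_config) {
    ///     commit_multi_var!(meta_store, txn)?;
    /// }
    /// ```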
    pub fn try_create_compaction_groups(
        &mut self,
        compaction_group_ids: &[CompactionGroupId],
        config: Arc<CompactionConfig>,
    ) -> bool {
        let mut trivial = true;
        for id in compaction_group_ids {
            if self.contains_key(id) {
                continue;
            }
            let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
            self.insert(*id, new_entry);

            trivial = false;
        }

        !trivial
    }

    pub fn create_compaction_groups(
        &mut self,
        compaction_group_id: CompactionGroupId,
        config: Arc<CompactionConfig>,
    ) {
        self.try_create_compaction_groups(&[compaction_group_id], config);
    }

    pub(crate) fn try_get_compaction_group_config(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Option<&CompactionGroup> {
        self.get(&compaction_group_id)
    }

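    /// Removes every group that is not present in `existing_groups`.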
    pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
        let stale_group = self
            .tree_ref()
            .keys()
            .cloned()
            .filter(|k| !existing_groups.contains(k))
            .collect_vec();
        if stale_group.is_empty() {
            return;
        }
        for group in stale_group {
            self.remove(group);
        }
    }

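    /// Rebuilds the config of each listed group from `config_to_update`, validates it, and
    /// returns the updated groups. Fails if any group id is unknown or the resulting config is
    /// invalid.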
    pub(crate) fn update_compaction_config(
        &mut self,
        compaction_group_ids: &[CompactionGroupId],
        config_to_update: &[MutableConfig],
    ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
        let mut results = HashMap::default();
        for compaction_group_id in compaction_group_ids.iter().unique() {
            let group = self.get(compaction_group_id).ok_or_else(|| {
                Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
            })?;
            let mut config = group.compaction_config.as_ref().clone();
            update_compaction_config(&mut config, config_to_update);
            if let Err(reason) = validate_compaction_config(&config) {
                return Err(Error::CompactionGroup(reason));
            }
            let mut new_group = group.clone();
            new_group.compaction_config = Arc::new(config);
            self.insert(*compaction_group_id, new_group.clone());
            results.insert(new_group.group_id(), new_group);
        }

        Ok(results)
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::id::JobId;
    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;

    use crate::controller::SqlMetaStore;
    use crate::hummock::commit_multi_var;
    use crate::hummock::error::Result;
    use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
    use crate::hummock::test_utils::setup_compute_env;
    use crate::model::{Fragment, StreamJobFragments};

    #[tokio::test]
    async fn test_inner() {
        let (env, ..) = setup_compute_env(8080).await;
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        assert_eq!(inner.compaction_groups.len(), 2);

        async fn update_compaction_config(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[impl Into<CompactionGroupId> + Copy],
            config_to_update: &[MutableConfig],
        ) -> Result<()> {
            let cg_ids = cg_ids.iter().copied().map_into().collect_vec();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            compaction_groups_txn.update_compaction_config(&cg_ids, config_to_update)?;
            commit_multi_var!(meta, compaction_groups_txn)
        }

        async fn insert_compaction_group_configs(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[u64],
        ) {
            let default_config = inner.default_compaction_config();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            if compaction_groups_txn.try_create_compaction_groups(
                &cg_ids.iter().copied().map_into().collect_vec(),
                default_config,
            ) {
                commit_multi_var!(meta, compaction_groups_txn).unwrap();
            }
        }

        update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
            .await
            .unwrap_err();
        insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
        assert_eq!(inner.compaction_groups.len(), 4);
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);

        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100, 200],
            &[MutableConfig::MaxSubCompaction(123)],
        )
        .await
        .unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
        assert_eq!(
            inner
                .try_get_compaction_group_config(200)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
    }

    #[tokio::test]
    async fn test_manager() {
        let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
        let table_fragment_1 = StreamJobFragments::for_test(
            JobId::new(10),
            BTreeMap::from([(
                1.into(),
                Fragment {
                    fragment_id: 1.into(),
                    state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
                    ..Default::default()
                },
            )]),
        );
        let table_fragment_2 = StreamJobFragments::for_test(
            JobId::new(20),
            BTreeMap::from([(
                2.into(),
                Fragment {
                    fragment_id: 2.into(),
                    state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
                    ..Default::default()
                },
            )]),
        );

        let registered_number = || async {
            compaction_group_manager
                .list_compaction_group()
                .await
                .iter()
                .map(|cg| cg.member_table_ids.len())
                .sum::<usize>()
        };
        let group_number =
            || async { compaction_group_manager.list_compaction_group().await.len() };
        assert_eq!(registered_number().await, 0);

        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
                table_fragment_1
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_2.stream_job_id().as_mv_table_id()),
                table_fragment_2
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 8);

        compaction_group_manager
            .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
            .await;
        assert_eq!(registered_number().await, 4);

        compaction_group_manager
            .purge(&table_fragment_2.all_table_ids().collect())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        compaction_group_manager
            .purge(&HashSet::new())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 0);

        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
                table_fragment_1
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .unregister_table_fragments_vec(&[table_fragment_1])
            .await;
        assert_eq!(registered_number().await, 0);
        assert_eq!(group_number().await, 2);
    }
}