1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::util::epoch::INVALID_EPOCH;
22use risingwave_hummock_sdk::CompactionGroupId;
23use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
25use risingwave_hummock_sdk::version::GroupDelta;
26use risingwave_meta_model::compaction_config;
27use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
31};
32use sea_orm::EntityTrait;
33use tokio::sync::OnceCell;
34
35use super::CompactionGroupStatistic;
36use crate::hummock::compaction::compaction_config::{
37 CompactionConfigBuilder, validate_compaction_config,
38};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
44use crate::hummock::model::CompactionGroup;
45use crate::hummock::sequence::next_compaction_group_id;
46use crate::manager::MetaSrvEnv;
47use crate::model::{
48 BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
49};
50
/// Transaction over the in-memory compaction-group map; changes are persisted
/// only when committed via `commit_multi_var!`.
type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
52
53impl CompactionGroupManager {
54 pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
55 let default_config = match env.opts.compaction_config.as_ref() {
56 None => CompactionConfigBuilder::new().build(),
57 Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
58 };
59 Self::new_with_config(env, default_config).await
60 }
61
62 pub(crate) async fn new_with_config(
63 env: &MetaSrvEnv,
64 default_config: CompactionConfig,
65 ) -> Result<CompactionGroupManager> {
66 let mut compaction_group_manager = CompactionGroupManager {
67 compaction_groups: BTreeMap::new(),
68 default_config: Arc::new(default_config),
69 write_limit: Default::default(),
70 };
71
72 let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
73 compaction_config::Entity::find()
74 .all(&env.meta_store_ref().conn)
75 .await
76 .map_err(MetadataModelError::from)?
77 .into_iter()
78 .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
79 .collect();
80
81 compaction_group_manager.init(loaded_compaction_groups);
82 Ok(compaction_group_manager)
83 }
84
85 fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
86 if !loaded_compaction_groups.is_empty() {
87 self.compaction_groups = loaded_compaction_groups;
88 }
89 }
90}
91
92impl HummockManager {
93 pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
96 get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
97 }
98
99 pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
101 self.compaction_group_manager
102 .read()
103 .await
104 .compaction_groups
105 .clone()
106 }
107
108 #[cfg(test)]
109 pub async fn register_table_fragments(
111 &self,
112 mv_table: Option<TableId>,
113 mut internal_tables: Vec<TableId>,
114 ) -> Result<()> {
115 let mut pairs = vec![];
116 if let Some(mv_table) = mv_table {
117 if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
118 tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
119 }
120 pairs.push((mv_table, StaticCompactionGroupId::MaterializedView));
122 }
123 for table_id in internal_tables {
125 pairs.push((table_id, StaticCompactionGroupId::StateDefault));
126 }
127 self.register_table_ids_for_test(&pairs).await?;
128 Ok(())
129 }
130
131 #[cfg(test)]
132 pub async fn unregister_table_fragments_vec(
134 &self,
135 table_fragments: &[crate::model::StreamJobFragments],
136 ) {
137 self.unregister_table_ids(table_fragments.iter().flat_map(|t| t.all_table_ids()))
138 .await
139 .unwrap();
140 }
141
142 pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
146 let to_unregister = self
147 .versioning
148 .read()
149 .await
150 .current_version
151 .state_table_info
152 .info()
153 .keys()
154 .cloned()
155 .filter(|table_id| !valid_ids.contains(table_id))
156 .collect_vec();
157
158 self.unregister_table_ids(to_unregister).await
161 }
162
163 pub async fn register_table_ids_for_test(
168 &self,
169 pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
170 ) -> Result<()> {
171 if pairs.is_empty() {
172 return Ok(());
173 }
174 let mut versioning_guard = self.versioning.write().await;
175 let versioning = versioning_guard.deref_mut();
176 let mut compaction_group_manager = self.compaction_group_manager.write().await;
177 let current_version = &versioning.current_version;
178 let default_config = compaction_group_manager.default_compaction_config();
179 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
180
181 for (table_id, _) in pairs {
182 let table_id = (*table_id).into();
183 if let Some(info) = current_version.state_table_info.info().get(&table_id) {
184 return Err(Error::CompactionGroup(format!(
185 "table {} already {:?}",
186 table_id, info
187 )));
188 }
189 }
190 let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
192 let mut version = HummockVersionTransaction::new(
193 &mut versioning.current_version,
194 &mut versioning.hummock_version_deltas,
195 self.env.notification_manager(),
196 None,
197 &self.metrics,
198 );
199 let mut new_version_delta = version.new_delta();
200
201 let committed_epoch = new_version_delta
202 .latest_version()
203 .state_table_info
204 .info()
205 .values()
206 .map(|info| info.committed_epoch)
207 .max()
208 .unwrap_or(INVALID_EPOCH);
209
210 for (table_id, raw_group_id) in pairs {
211 let table_id = (*table_id).into();
212 let mut group_id = *raw_group_id;
213 if group_id == StaticCompactionGroupId::NewCompactionGroup {
214 let mut is_group_init = false;
215 group_id = *new_compaction_group_id
216 .get_or_try_init(|| async {
217 next_compaction_group_id(&self.env).await.inspect(|_| {
218 is_group_init = true;
219 })
220 })
221 .await?;
222 if is_group_init {
223 let group_deltas = &mut new_version_delta
224 .group_deltas
225 .entry(group_id)
226 .or_default()
227 .group_deltas;
228
229 let config =
230 match compaction_groups_txn.try_get_compaction_group_config(group_id) {
231 Some(config) => config.compaction_config.as_ref().clone(),
232 None => {
233 compaction_groups_txn
234 .create_compaction_groups(group_id, default_config.clone());
235 default_config.as_ref().clone()
236 }
237 };
238
239 let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
240 group_config: Some(config),
241 group_id,
242 ..Default::default()
243 }));
244
245 group_deltas.push(group_delta);
246 }
247 }
248 assert!(
249 new_version_delta
250 .state_table_info_delta
251 .insert(
252 table_id,
253 PbStateTableInfoDelta {
254 committed_epoch,
255 compaction_group_id: *raw_group_id,
256 }
257 )
258 .is_none()
259 );
260 }
261 new_version_delta.pre_apply();
262 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
263
264 Ok(())
265 }
266
    /// Unregisters the given state tables from the current hummock version.
    ///
    /// Besides removing the tables, this drops their write-throughput
    /// statistics, emits a `GroupDestroy` delta for every dynamically-created
    /// group that becomes empty, and purges stale group configs that no longer
    /// correspond to a group in the resulting version.
    pub async fn unregister_table_ids(
        &self,
        table_ids: impl IntoIterator<Item = TableId>,
    ) -> Result<()> {
        let table_ids = table_ids.into_iter().collect_vec();
        if table_ids.is_empty() {
            return Ok(());
        }

        // Drop per-table throughput stats first, in a separate lock scope.
        {
            let mut table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.write();
            for &table_id in table_ids.iter().unique() {
                table_write_throughput_statistic_manager.remove_table(table_id);
            }
        }

        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();
        // Tracks, per touched group, how many member tables remain after the
        // removals below.
        let mut modified_groups: HashMap<CompactionGroupId, u64> =
            HashMap::new();
        for table_id in table_ids.into_iter().unique() {
            let version = new_version_delta.latest_version();
            // Silently skip tables that are not registered.
            let Some(info) = version.state_table_info.info().get(&table_id) else {
                continue;
            };

            // First removal from a group seeds the counter with the group's
            // current member count minus this table; later removals decrement.
            modified_groups
                .entry(info.compaction_group_id)
                .and_modify(|count| *count -= 1)
                .or_insert(
                    version
                        .state_table_info
                        .compaction_group_member_tables()
                        .get(&info.compaction_group_id)
                        .expect("should exist")
                        .len() as u64
                        - 1,
                );
            new_version_delta.removed_table_ids.insert(table_id);
        }

        // Groups that became empty AND were dynamically created (id beyond the
        // static range) are destroyed; record their level count for metrics.
        let groups_to_remove = modified_groups
            .into_iter()
            .filter_map(|(group_id, member_count)| {
                if member_count == 0 && group_id > StaticCompactionGroupId::End {
                    return Some((
                        group_id,
                        new_version_delta
                            .latest_version()
                            .get_compaction_group_levels(group_id)
                            .levels
                            .len(),
                    ));
                }
                None
            })
            .collect_vec();
        for (group_id, _) in &groups_to_remove {
            let group_deltas = &mut new_version_delta
                .group_deltas
                .entry(*group_id)
                .or_default()
                .group_deltas;

            let group_delta = GroupDelta::GroupDestroy(PbGroupDestroy {});
            group_deltas.push(group_delta);
        }

        // Clean up per-group metrics and in-flight compaction state.
        for (group_id, max_level) in groups_to_remove {
            remove_compaction_group_in_sst_stat(&self.metrics, group_id, max_level);
            self.compaction_state.remove_compaction_group(group_id);
        }

        new_version_delta.pre_apply();

        let mut compaction_group_manager = self.compaction_group_manager.write().await;
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

        // Drop configs for groups that no longer exist in the new version.
        compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
            version.latest_version(),
        )));
        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;

        Ok(())
    }
369
370 pub async fn update_compaction_config(
371 &self,
372 compaction_group_ids: &[CompactionGroupId],
373 config_to_update: &[MutableConfig],
374 ) -> Result<()> {
375 {
376 let mut compaction_group_manager = self.compaction_group_manager.write().await;
378 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
379 compaction_groups_txn
380 .update_compaction_config(compaction_group_ids, config_to_update)?;
381 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
382 }
383
384 if config_to_update
385 .iter()
386 .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
387 {
388 self.try_update_write_limits(compaction_group_ids).await;
390 }
391
392 Ok(())
393 }
394
395 pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
398 let mut versioning_guard = self.versioning.write().await;
399 let versioning = versioning_guard.deref_mut();
400 let current_version = &versioning.current_version;
401 let mut results = vec![];
402 let compaction_group_manager = self.compaction_group_manager.read().await;
403
404 for levels in current_version.levels.values() {
405 let compaction_config = compaction_group_manager
406 .try_get_compaction_group_config(levels.group_id)
407 .unwrap()
408 .compaction_config
409 .as_ref()
410 .clone();
411 let group = CompactionGroupInfo {
412 id: levels.group_id,
413 parent_id: levels.parent_group_id,
414 member_table_ids: current_version
415 .state_table_info
416 .compaction_group_member_table_ids(levels.group_id)
417 .iter()
418 .copied()
419 .collect_vec(),
420 compaction_config: Some(compaction_config),
421 };
422 results.push(group);
423 }
424 results
425 }
426
427 pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
428 let mut infos = vec![];
429 {
430 let versioning_guard = self.versioning.read().await;
431 let manager = self.compaction_group_manager.read().await;
432 let version = &versioning_guard.current_version;
433 for group_id in version.levels.keys() {
434 let mut group_info = CompactionGroupStatistic {
435 group_id: *group_id,
436 ..Default::default()
437 };
438 for table_id in version
439 .state_table_info
440 .compaction_group_member_table_ids(*group_id)
441 {
442 let stats_size = versioning_guard
443 .version_stats
444 .table_stats
445 .get(table_id)
446 .map(|stats| stats.total_key_size + stats.total_value_size)
447 .unwrap_or(0);
448 let table_size = std::cmp::max(stats_size, 0) as u64;
449 group_info.group_size += table_size;
450 group_info.table_statistic.insert(*table_id, table_size);
451 group_info.compaction_group_config =
452 manager.try_get_compaction_group_config(*group_id).unwrap();
453 }
454 infos.push(group_info);
455 }
456 };
457 infos
458 }
459
460 pub(crate) async fn initial_compaction_group_config_after_load(
461 &self,
462 versioning_guard: &Versioning,
463 compaction_group_manager: &mut CompactionGroupManager,
464 ) -> Result<()> {
465 let current_version = &versioning_guard.current_version;
467 let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
468 let default_config = compaction_group_manager.default_compaction_config();
469 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
470 compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
471 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
472
473 Ok(())
474 }
475}
476
/// In-memory view of compaction-group configs, loaded from and persisted to
/// the meta store via `commit_multi_var!` transactions.
pub(crate) struct CompactionGroupManager {
    /// All known compaction groups keyed by id.
    compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
    /// Config applied to newly created groups that have no explicit config.
    default_config: Arc<CompactionConfig>,
    /// Per-group write limits (crate-visible for write-limit bookkeeping).
    pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
}
490
impl CompactionGroupManager {
    /// Starts a transaction borrowing the in-memory group map mutably.
    /// Changes become durable only when committed via `commit_multi_var!`.
    pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
        CompactionGroupTransaction::new(&mut self.compaction_groups)
    }

    /// Like `start_compaction_groups_txn`, but takes an owning smart pointer
    /// to the manager (e.g. a lock guard) and forwards map access through
    /// `DerefMutForward`, so the transaction is not tied to a plain `&mut`
    /// borrow of `self`.
    #[expect(clippy::type_complexity)]
    pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
        inner: P,
    ) -> BTreeMapTransactionInner<
        CompactionGroupId,
        CompactionGroup,
        DerefMutForward<
            Self,
            BTreeMap<CompactionGroupId, CompactionGroup>,
            P,
            impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
            impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
        >,
    > {
        BTreeMapTransactionInner::new(DerefMutForward::new(
            inner,
            |mgr| &mgr.compaction_groups,
            |mgr| &mut mgr.compaction_groups,
        ))
    }

    /// Returns a clone of the config for `compaction_group_id`, if known.
    pub(crate) fn try_get_compaction_group_config(
        &self,
        compaction_group_id: impl Into<CompactionGroupId>,
    ) -> Option<CompactionGroup> {
        self.compaction_groups
            .get(&compaction_group_id.into())
            .cloned()
    }

    /// Returns the default config used for groups created without one.
    pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
        self.default_config.clone()
    }
}
533
/// Applies each `MutableConfig` item onto `target`, in order; when several
/// items touch the same field, the last one wins.
fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) {
    for item in items {
        match item {
            MutableConfig::MaxBytesForLevelBase(c) => {
                target.max_bytes_for_level_base = *c;
            }
            MutableConfig::MaxBytesForLevelMultiplier(c) => {
                target.max_bytes_for_level_multiplier = *c;
            }
            MutableConfig::MaxCompactionBytes(c) => {
                target.max_compaction_bytes = *c;
            }
            MutableConfig::SubLevelMaxCompactionBytes(c) => {
                target.sub_level_max_compaction_bytes = *c;
            }
            MutableConfig::Level0TierCompactFileNumber(c) => {
                target.level0_tier_compact_file_number = *c;
            }
            MutableConfig::TargetFileSizeBase(c) => {
                target.target_file_size_base = *c;
            }
            MutableConfig::CompactionFilterMask(c) => {
                target.compaction_filter_mask = *c;
            }
            MutableConfig::MaxSubCompaction(c) => {
                target.max_sub_compaction = *c;
            }
            MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
                target.level0_stop_write_threshold_sub_level_number = *c;
            }
            MutableConfig::Level0SubLevelCompactLevelCount(c) => {
                target.level0_sub_level_compact_level_count = *c;
            }
            MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
                target.level0_overlapping_sub_level_compact_level_count = *c;
            }
            MutableConfig::MaxSpaceReclaimBytes(c) => {
                target.max_space_reclaim_bytes = *c;
            }
            MutableConfig::Level0MaxCompactFileNumber(c) => {
                target.level0_max_compact_file_number = *c;
            }
            MutableConfig::EnableEmergencyPicker(c) => {
                target.enable_emergency_picker = *c;
            }
            MutableConfig::TombstoneReclaimRatio(c) => {
                target.tombstone_reclaim_ratio = *c;
            }
            MutableConfig::CompressionAlgorithm(c) => {
                // Only the algorithm entry for the level named in the request
                // is replaced; other levels keep their current algorithm.
                target.compression_algorithm[c.get_level() as usize]
                    .clone_from(&c.compression_algorithm);
            }
            MutableConfig::MaxL0CompactLevelCount(c) => {
                target.max_l0_compact_level_count = Some(*c);
            }
            MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
                target.sst_allowed_trivial_move_min_size = Some(*c);
            }
            MutableConfig::SplitWeightByVnode(c) => {
                target.split_weight_by_vnode = *c;
            }
            MutableConfig::DisableAutoGroupScheduling(c) => {
                target.disable_auto_group_scheduling = Some(*c);
            }
            MutableConfig::MaxOverlappingLevelSize(c) => {
                target.max_overlapping_level_size = Some(*c);
            }
            MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
                target.sst_allowed_trivial_move_max_count = Some(*c);
            }
            MutableConfig::EmergencyLevel0SstFileCount(c) => {
                target.emergency_level0_sst_file_count = Some(*c);
            }
            MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
                target.emergency_level0_sub_level_partition = Some(*c);
            }
            MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
                target.level0_stop_write_threshold_max_sst_count = Some(*c);
            }
            MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
                target.level0_stop_write_threshold_max_size = Some(*c);
            }
            MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
                target.enable_optimize_l0_interval_selection = Some(*c);
            }
            #[allow(deprecated)]
            MutableConfig::VnodeAlignedLevelSizeThreshold(_) => {
                // Deprecated knob: accepted for compatibility, intentionally ignored.
            }
            MutableConfig::MaxKvCountForXor16(c) => {
                // 0 (u64::MIN) and u64::MAX act as sentinels meaning "unset".
                target.max_kv_count_for_xor16 = (*c != u64::MIN && *c != u64::MAX).then_some(*c);
            }
            MutableConfig::MaxVnodeKeyRangeBytes(c) => {
                // 0 acts as a sentinel meaning "unset".
                target.max_vnode_key_range_bytes = (*c > 0).then_some(*c);
            }
        }
    }
}
632
633impl CompactionGroupTransaction<'_> {
634 pub fn try_create_compaction_groups(
636 &mut self,
637 compaction_group_ids: &[CompactionGroupId],
638 config: Arc<CompactionConfig>,
639 ) -> bool {
640 let mut trivial = true;
641 for id in compaction_group_ids {
642 if self.contains_key(id) {
643 continue;
644 }
645 let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
646 self.insert(*id, new_entry);
647
648 trivial = false;
649 }
650
651 !trivial
652 }
653
654 pub fn create_compaction_groups(
655 &mut self,
656 compaction_group_id: CompactionGroupId,
657 config: Arc<CompactionConfig>,
658 ) {
659 self.try_create_compaction_groups(&[compaction_group_id], config);
660 }
661
662 pub(crate) fn try_get_compaction_group_config(
664 &self,
665 compaction_group_id: CompactionGroupId,
666 ) -> Option<&CompactionGroup> {
667 self.get(&compaction_group_id)
668 }
669
670 pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
672 let stale_group = self
673 .tree_ref()
674 .keys()
675 .cloned()
676 .filter(|k| !existing_groups.contains(k))
677 .collect_vec();
678 if stale_group.is_empty() {
679 return;
680 }
681 for group in stale_group {
682 self.remove(group);
683 }
684 }
685
686 pub(crate) fn update_compaction_config(
687 &mut self,
688 compaction_group_ids: &[CompactionGroupId],
689 config_to_update: &[MutableConfig],
690 ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
691 let mut results = HashMap::default();
692 for compaction_group_id in compaction_group_ids.iter().unique() {
693 let group = self.get(compaction_group_id).ok_or_else(|| {
694 Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
695 })?;
696 let mut config = group.compaction_config.as_ref().clone();
697 update_compaction_config(&mut config, config_to_update);
698 if let Err(reason) = validate_compaction_config(&config) {
699 return Err(Error::CompactionGroup(reason));
700 }
701 let mut new_group = group.clone();
702 new_group.compaction_config = Arc::new(config);
703 self.insert(*compaction_group_id, new_group.clone());
704 results.insert(new_group.group_id(), new_group);
705 }
706
707 Ok(results)
708 }
709}
710
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::id::JobId;
    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;

    use crate::controller::SqlMetaStore;
    use crate::hummock::commit_multi_var;
    use crate::hummock::error::Result;
    use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
    use crate::hummock::test_utils::setup_compute_env;
    use crate::model::{Fragment, StreamJobFragments};

    /// Exercises `CompactionGroupManager` directly: default loading, group
    /// creation, persistence across reloads, and mutable-config updates.
    #[tokio::test]
    async fn test_inner() {
        let (env, ..) = setup_compute_env(8080).await;
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        // A fresh manager starts with two pre-created groups.
        assert_eq!(inner.compaction_groups.len(), 2);

        // Helper: apply `config_to_update` to `cg_ids` and persist the txn.
        async fn update_compaction_config(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[impl Into<CompactionGroupId> + Copy],
            config_to_update: &[MutableConfig],
        ) -> Result<()> {
            let cg_ids = cg_ids.iter().copied().map_into().collect_vec();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            compaction_groups_txn.update_compaction_config(&cg_ids, config_to_update)?;
            commit_multi_var!(meta, compaction_groups_txn)
        }

        // Helper: create configs for `cg_ids` (no-op for existing groups) and
        // persist only if something was actually created.
        async fn insert_compaction_group_configs(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[u64],
        ) {
            let default_config = inner.default_compaction_config();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            if compaction_groups_txn.try_create_compaction_groups(
                &cg_ids.iter().copied().map_into().collect_vec(),
                default_config,
            ) {
                commit_multi_var!(meta, compaction_groups_txn).unwrap();
            }
        }

        // Updating groups that do not exist yet must fail.
        update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
            .await
            .unwrap_err();
        insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
        assert_eq!(inner.compaction_groups.len(), 4);
        // Reload from the meta store to verify the new groups were persisted.
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);

        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100, 200],
            &[MutableConfig::MaxSubCompaction(123)],
        )
        .await
        .unwrap();
        // The update mutates existing groups in place; no new groups appear.
        assert_eq!(inner.compaction_groups.len(), 4);
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
        assert_eq!(
            inner
                .try_get_compaction_group_config(200)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
    }

    /// End-to-end register / unregister / purge flow through the hummock
    /// manager, checking table counts and group counts after each step.
    #[tokio::test]
    async fn test_manager() {
        let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
        // Two stream jobs with 4 state tables each (1 mv + 3 internal).
        let table_fragment_1 = StreamJobFragments::for_test(
            JobId::new(10),
            BTreeMap::from([(
                1.into(),
                Fragment {
                    fragment_id: 1.into(),
                    state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
                    ..Default::default()
                },
            )]),
        );
        let table_fragment_2 = StreamJobFragments::for_test(
            JobId::new(20),
            BTreeMap::from([(
                2.into(),
                Fragment {
                    fragment_id: 2.into(),
                    state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
                    ..Default::default()
                },
            )]),
        );

        // Total registered member tables across all compaction groups.
        let registered_number = || async {
            compaction_group_manager
                .list_compaction_group()
                .await
                .iter()
                .map(|cg| cg.member_table_ids.len())
                .sum::<usize>()
        };
        let group_number =
            || async { compaction_group_manager.list_compaction_group().await.len() };
        assert_eq!(registered_number().await, 0);

        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
                table_fragment_1
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_2.stream_job_id().as_mv_table_id()),
                table_fragment_2
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 8);

        // Unregistering job 1 leaves only job 2's tables.
        compaction_group_manager
            .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
            .await;
        assert_eq!(registered_number().await, 4);

        // Purging with job 2's ids as the valid set is a no-op ...
        compaction_group_manager
            .purge(&table_fragment_2.all_table_ids().collect())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        // ... while purging with an empty valid set removes everything.
        compaction_group_manager
            .purge(&HashSet::new())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 0);

        assert_eq!(group_number().await, 2);

        // Re-registering and unregistering must not change the group count.
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
                table_fragment_1
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .unregister_table_fragments_vec(&[table_fragment_1])
            .await;
        assert_eq!(registered_number().await, 0);
        assert_eq!(group_number().await, 2);
    }
}