1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::util::epoch::INVALID_EPOCH;
22use risingwave_hummock_sdk::CompactionGroupId;
23use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
25use risingwave_hummock_sdk::version::GroupDelta;
26use risingwave_meta_model::compaction_config;
27use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
31};
32use sea_orm::EntityTrait;
33use tokio::sync::OnceCell;
34
35use super::CompactionGroupStatistic;
36use crate::hummock::compaction::compaction_config::{
37 CompactionConfigBuilder, validate_compaction_config,
38};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
44use crate::hummock::model::CompactionGroup;
45use crate::hummock::sequence::next_compaction_group_id;
46use crate::manager::MetaSrvEnv;
47use crate::model::{
48 BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
49};
50
/// Transaction over the manager's in-memory compaction-group map; committed to
/// the meta store together with other vars via `commit_multi_var!`.
type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
52
53impl CompactionGroupManager {
54 pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
55 let default_config = match env.opts.compaction_config.as_ref() {
56 None => CompactionConfigBuilder::new().build(),
57 Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
58 };
59 Self::new_with_config(env, default_config).await
60 }
61
62 pub(crate) async fn new_with_config(
63 env: &MetaSrvEnv,
64 default_config: CompactionConfig,
65 ) -> Result<CompactionGroupManager> {
66 let mut compaction_group_manager = CompactionGroupManager {
67 compaction_groups: BTreeMap::new(),
68 default_config: Arc::new(default_config),
69 write_limit: Default::default(),
70 };
71
72 let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
73 compaction_config::Entity::find()
74 .all(&env.meta_store_ref().conn)
75 .await
76 .map_err(MetadataModelError::from)?
77 .into_iter()
78 .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
79 .collect();
80
81 compaction_group_manager.init(loaded_compaction_groups);
82 Ok(compaction_group_manager)
83 }
84
85 fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
86 if !loaded_compaction_groups.is_empty() {
87 self.compaction_groups = loaded_compaction_groups;
88 }
89 }
90}
91
impl HummockManager {
    /// Lists the ids of all compaction groups present in the current version.
    pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
        get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
    }

    /// Returns a snapshot (clone) of all compaction group configs keyed by id.
    pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
        self.compaction_group_manager
            .read()
            .await
            .compaction_groups
            .clone()
    }

    /// Test helper: registers the mv table into the `MaterializedView` group
    /// and all internal tables into the `StateDefault` group.
    #[cfg(test)]
    pub async fn register_table_fragments(
        &self,
        mv_table: Option<TableId>,
        mut internal_tables: Vec<TableId>,
    ) -> Result<()> {
        let mut pairs = vec![];
        if let Some(mv_table) = mv_table {
            // The mv table must not also be listed as an internal table; drop
            // any duplicates and warn, then register it once below.
            if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
                tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
            }
            pairs.push((mv_table, StaticCompactionGroupId::MaterializedView));
        }
        for table_id in internal_tables {
            pairs.push((table_id, StaticCompactionGroupId::StateDefault));
        }
        self.register_table_ids_for_test(&pairs).await?;
        Ok(())
    }

    /// Test helper: unregisters every state table of the given fragments.
    #[cfg(test)]
    pub async fn unregister_table_fragments_vec(
        &self,
        table_fragments: &[crate::model::StreamJobFragments],
    ) {
        self.unregister_table_ids(table_fragments.iter().flat_map(|t| t.all_table_ids()))
            .await
            .unwrap();
    }

    /// Unregisters every table id known to the current version but absent from
    /// `valid_ids`, reclaiming compaction groups that become empty.
    pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
        // The read guard is a temporary that is dropped at the end of this
        // statement, before `unregister_table_ids` takes the write lock.
        let to_unregister = self
            .versioning
            .read()
            .await
            .current_version
            .state_table_info
            .info()
            .keys()
            .cloned()
            .filter(|table_id| !valid_ids.contains(table_id))
            .collect_vec();

        self.unregister_table_ids(to_unregister).await
    }

    /// Test helper: registers `(table id, compaction group)` pairs into the
    /// current version.
    ///
    /// Fails if any table is already registered. Pairs targeting
    /// `NewCompactionGroup` share one lazily-allocated new group for the whole
    /// call; that group also gets a `GroupConstruct` delta and a config entry
    /// in the compaction-group transaction.
    pub async fn register_table_ids_for_test(
        &self,
        pairs: &[(impl Into<TableId> + Copy, CompactionGroupId)],
    ) -> Result<()> {
        if pairs.is_empty() {
            return Ok(());
        }
        // Lock order: versioning (write) first, then compaction_group_manager
        // (write) — matches the order used elsewhere in this impl.
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let mut compaction_group_manager = self.compaction_group_manager.write().await;
        let current_version = &versioning.current_version;
        let default_config = compaction_group_manager.default_compaction_config();
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

        // Reject the whole batch if any table is already registered.
        for (table_id, _) in pairs {
            let table_id = (*table_id).into();
            if let Some(info) = current_version.state_table_info.info().get(&table_id) {
                return Err(Error::CompactionGroup(format!(
                    "table {} already {:?}",
                    table_id, info
                )));
            }
        }
        // At most one new group id is allocated per call, shared by all pairs
        // that ask for `NewCompactionGroup`.
        let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut new_version_delta = version.new_delta();

        // New tables inherit the max committed epoch across existing tables
        // (or INVALID_EPOCH when there are none).
        let committed_epoch = new_version_delta
            .latest_version()
            .state_table_info
            .info()
            .values()
            .map(|info| info.committed_epoch)
            .max()
            .unwrap_or(INVALID_EPOCH);

        for (table_id, raw_group_id) in pairs {
            let table_id = (*table_id).into();
            let mut group_id = *raw_group_id;
            if group_id == StaticCompactionGroupId::NewCompactionGroup {
                // `is_group_init` is set only by the closure that actually wins
                // the allocation, so the GroupConstruct delta is emitted once.
                let mut is_group_init = false;
                group_id = *new_compaction_group_id
                    .get_or_try_init(|| async {
                        next_compaction_group_id(&self.env).await.inspect(|_| {
                            is_group_init = true;
                        })
                    })
                    .await?;
                if is_group_init {
                    let group_deltas = &mut new_version_delta
                        .group_deltas
                        .entry(group_id)
                        .or_default()
                        .group_deltas;

                    // Reuse an existing config for this id if present;
                    // otherwise create the group with the default config.
                    let config =
                        match compaction_groups_txn.try_get_compaction_group_config(group_id) {
                            Some(config) => config.compaction_config.as_ref().clone(),
                            None => {
                                compaction_groups_txn
                                    .create_compaction_groups(group_id, default_config.clone());
                                default_config.as_ref().clone()
                            }
                        };

                    let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config),
                        group_id,
                        ..Default::default()
                    }));

                    group_deltas.push(group_delta);
                }
            }
            // Duplicate table ids in `pairs` would violate the insert-once
            // invariant checked here.
            assert!(
                new_version_delta
                    .state_table_info_delta
                    .insert(
                        table_id,
                        PbStateTableInfoDelta {
                            committed_epoch,
                            compaction_group_id: *raw_group_id,
                        }
                    )
                    .is_none()
            );
        }
        new_version_delta.pre_apply();
        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;

        Ok(())
    }

    /// Unregisters the given table ids from the current version.
    ///
    /// Non-static compaction groups whose last member table is removed are
    /// destroyed (version delta + metrics + compaction state + persisted
    /// config). No-op for an empty input.
    pub async fn unregister_table_ids(
        &self,
        table_ids: impl IntoIterator<Item = TableId>,
    ) -> Result<()> {
        let table_ids = table_ids.into_iter().collect_vec();
        if table_ids.is_empty() {
            return Ok(());
        }

        // Drop per-table write-throughput statistics first, in its own scope so
        // that lock is released before taking the versioning write lock.
        {
            let mut table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.write();
            for &table_id in table_ids.iter().unique() {
                table_write_throughput_statistic_manager.remove_table(table_id);
            }
        }

        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut new_version_delta = version.new_delta();
        // Tracks, per touched group, how many member tables remain after the
        // removals in this call.
        let mut modified_groups: HashMap<CompactionGroupId, u64> =
            HashMap::new();
        for table_id in table_ids.into_iter().unique() {
            let version = new_version_delta.latest_version();
            // Silently skip ids that are not registered.
            let Some(info) = version.state_table_info.info().get(&table_id) else {
                continue;
            };

            // First removal seeds the counter with (member count - 1);
            // subsequent removals from the same group decrement it.
            modified_groups
                .entry(info.compaction_group_id)
                .and_modify(|count| *count -= 1)
                .or_insert(
                    version
                        .state_table_info
                        .compaction_group_member_tables()
                        .get(&info.compaction_group_id)
                        .expect("should exist")
                        .len() as u64
                        - 1,
                );
            new_version_delta.removed_table_ids.insert(table_id);
        }

        // Only non-static (id > End) groups that became empty are destroyed;
        // the recorded level count is used for metrics cleanup below.
        let groups_to_remove = modified_groups
            .into_iter()
            .filter_map(|(group_id, member_count)| {
                if member_count == 0 && group_id > StaticCompactionGroupId::End {
                    return Some((
                        group_id,
                        new_version_delta
                            .latest_version()
                            .get_compaction_group_levels(group_id)
                            .levels
                            .len(),
                    ));
                }
                None
            })
            .collect_vec();
        for (group_id, _) in &groups_to_remove {
            let group_deltas = &mut new_version_delta
                .group_deltas
                .entry(*group_id)
                .or_default()
                .group_deltas;

            let group_delta = GroupDelta::GroupDestroy(PbGroupDestroy {});
            group_deltas.push(group_delta);
        }

        for (group_id, max_level) in groups_to_remove {
            remove_compaction_group_in_sst_stat(&self.metrics, group_id, max_level);
            self.compaction_state.remove_compaction_group(group_id);
        }

        new_version_delta.pre_apply();

        // Purge persisted configs for any group no longer present in the
        // post-apply version (versioning lock is still held at this point).
        let mut compaction_group_manager = self.compaction_group_manager.write().await;
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

        compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
            version.latest_version(),
        )));
        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;

        Ok(())
    }

    /// Applies mutable config changes to the given groups and persists them.
    /// Recomputes write limits afterwards when the L0 stop-write threshold
    /// (sub-level number) was among the changes.
    pub async fn update_compaction_config(
        &self,
        compaction_group_ids: &[CompactionGroupId],
        config_to_update: &[MutableConfig],
    ) -> Result<()> {
        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .update_compaction_config(compaction_group_ids, config_to_update)?;
            commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
        }

        if config_to_update
            .iter()
            .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
        {
            self.try_update_write_limits(compaction_group_ids).await;
        }

        Ok(())
    }

    /// Lists every compaction group of the current version together with its
    /// member tables and config.
    ///
    /// NOTE(review): takes the versioning *write* lock although nothing is
    /// mutated — a read lock looks sufficient; confirm before changing.
    pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let current_version = &versioning.current_version;
        let mut results = vec![];
        let compaction_group_manager = self.compaction_group_manager.read().await;

        for levels in current_version.levels.values() {
            // Every group in the version is expected to have a config entry.
            let compaction_config = compaction_group_manager
                .try_get_compaction_group_config(levels.group_id)
                .unwrap()
                .compaction_config
                .as_ref()
                .clone();
            let group = CompactionGroupInfo {
                id: levels.group_id,
                parent_id: levels.parent_group_id,
                member_table_ids: current_version
                    .state_table_info
                    .compaction_group_member_table_ids(levels.group_id)
                    .iter()
                    .copied()
                    .collect_vec(),
                compaction_config: Some(compaction_config),
            };
            results.push(group);
        }
        results
    }

    /// Computes per-group size statistics: per-table size (key + value bytes
    /// from the version stats) and the group total.
    pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
        let mut infos = vec![];
        {
            let versioning_guard = self.versioning.read().await;
            let manager = self.compaction_group_manager.read().await;
            let version = &versioning_guard.current_version;
            for group_id in version.levels.keys() {
                let mut group_info = CompactionGroupStatistic {
                    group_id: *group_id,
                    ..Default::default()
                };
                for table_id in version
                    .state_table_info
                    .compaction_group_member_table_ids(*group_id)
                {
                    // Tables without stats contribute 0; clamp guards against
                    // negative stat values.
                    let stats_size = versioning_guard
                        .version_stats
                        .table_stats
                        .get(table_id)
                        .map(|stats| stats.total_key_size + stats.total_value_size)
                        .unwrap_or(0);
                    let table_size = std::cmp::max(stats_size, 0) as u64;
                    group_info.group_size += table_size;
                    group_info.table_statistic.insert(*table_id, table_size);
                    // NOTE(review): re-assigned on every member table (same
                    // value each time) and left unset for empty groups — could
                    // be hoisted out of the inner loop; confirm before moving.
                    group_info.compaction_group_config =
                        manager.try_get_compaction_group_config(*group_id).unwrap();
                }
                infos.push(group_info);
            }
        };
        infos
    }

    /// After loading a version at startup, ensures every group referenced by
    /// the version has a persisted config (creating missing ones with the
    /// default config).
    pub(crate) async fn initial_compaction_group_config_after_load(
        &self,
        versioning_guard: &Versioning,
        compaction_group_manager: &mut CompactionGroupManager,
    ) -> Result<()> {
        let current_version = &versioning_guard.current_version;
        let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
        let default_config = compaction_group_manager.default_compaction_config();
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
        compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
        commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;

        Ok(())
    }
}
480
/// In-memory registry of compaction-group configs, kept in sync with the meta
/// store through transactions started via `start_compaction_groups_txn`.
pub(crate) struct CompactionGroupManager {
    // All known compaction groups keyed by id.
    compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
    // Config applied when creating a group that has no persisted entry.
    default_config: Arc<CompactionConfig>,
    /// Current write limits per compaction group.
    pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
}
494
impl CompactionGroupManager {
    /// Starts a transaction over the group map, borrowing `self` mutably.
    pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
        CompactionGroupTransaction::new(&mut self.compaction_groups)
    }

    /// Starts a transaction that owns its access to the manager through any
    /// `DerefMut` smart pointer `P`, forwarding reads/writes to the inner
    /// group map via a pair of projection closures.
    #[expect(clippy::type_complexity)]
    pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
        inner: P,
    ) -> BTreeMapTransactionInner<
        CompactionGroupId,
        CompactionGroup,
        DerefMutForward<
            Self,
            BTreeMap<CompactionGroupId, CompactionGroup>,
            P,
            impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
            impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
        >,
    > {
        BTreeMapTransactionInner::new(DerefMutForward::new(
            inner,
            |mgr| &mgr.compaction_groups,
            |mgr| &mut mgr.compaction_groups,
        ))
    }

    /// Returns a clone of the group's config, or `None` for an unknown id.
    pub(crate) fn try_get_compaction_group_config(
        &self,
        compaction_group_id: impl Into<CompactionGroupId>,
    ) -> Option<CompactionGroup> {
        self.compaction_groups
            .get(&compaction_group_id.into())
            .cloned()
    }

    /// The config used when creating new compaction groups.
    pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
        self.default_config.clone()
    }
}
537
/// Copies each `MutableConfig` item onto the corresponding field of `target`.
/// Purely a field-by-field assignment; validation happens at the caller
/// (`CompactionGroupTransaction::update_compaction_config`).
fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) {
    for item in items {
        match item {
            MutableConfig::MaxBytesForLevelBase(c) => {
                target.max_bytes_for_level_base = *c;
            }
            MutableConfig::MaxBytesForLevelMultiplier(c) => {
                target.max_bytes_for_level_multiplier = *c;
            }
            MutableConfig::MaxCompactionBytes(c) => {
                target.max_compaction_bytes = *c;
            }
            MutableConfig::SubLevelMaxCompactionBytes(c) => {
                target.sub_level_max_compaction_bytes = *c;
            }
            MutableConfig::Level0TierCompactFileNumber(c) => {
                target.level0_tier_compact_file_number = *c;
            }
            MutableConfig::TargetFileSizeBase(c) => {
                target.target_file_size_base = *c;
            }
            MutableConfig::CompactionFilterMask(c) => {
                target.compaction_filter_mask = *c;
            }
            MutableConfig::MaxSubCompaction(c) => {
                target.max_sub_compaction = *c;
            }
            MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
                target.level0_stop_write_threshold_sub_level_number = *c;
            }
            MutableConfig::Level0SubLevelCompactLevelCount(c) => {
                target.level0_sub_level_compact_level_count = *c;
            }
            MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
                target.level0_overlapping_sub_level_compact_level_count = *c;
            }
            MutableConfig::MaxSpaceReclaimBytes(c) => {
                target.max_space_reclaim_bytes = *c;
            }
            MutableConfig::Level0MaxCompactFileNumber(c) => {
                target.level0_max_compact_file_number = *c;
            }
            MutableConfig::EnableEmergencyPicker(c) => {
                target.enable_emergency_picker = *c;
            }
            MutableConfig::TombstoneReclaimRatio(c) => {
                target.tombstone_reclaim_ratio = *c;
            }
            MutableConfig::CompressionAlgorithm(c) => {
                // Only the algorithm entry for the level carried by the item
                // is overwritten; other levels keep their algorithm.
                target.compression_algorithm[c.get_level() as usize]
                    .clone_from(&c.compression_algorithm);
            }
            MutableConfig::MaxL0CompactLevelCount(c) => {
                target.max_l0_compact_level_count = Some(*c);
            }
            MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
                target.sst_allowed_trivial_move_min_size = Some(*c);
            }
            MutableConfig::SplitWeightByVnode(c) => {
                target.split_weight_by_vnode = *c;
            }
            MutableConfig::DisableAutoGroupScheduling(c) => {
                target.disable_auto_group_scheduling = Some(*c);
            }
            MutableConfig::MaxOverlappingLevelSize(c) => {
                target.max_overlapping_level_size = Some(*c);
            }
            MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
                target.sst_allowed_trivial_move_max_count = Some(*c);
            }
            MutableConfig::EmergencyLevel0SstFileCount(c) => {
                target.emergency_level0_sst_file_count = Some(*c);
            }
            MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
                target.emergency_level0_sub_level_partition = Some(*c);
            }
            MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
                target.level0_stop_write_threshold_max_sst_count = Some(*c);
            }
            MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
                target.level0_stop_write_threshold_max_size = Some(*c);
            }
            MutableConfig::EnableOptimizeL0IntervalSelection(c) => {
                target.enable_optimize_l0_interval_selection = Some(*c);
            }
            #[allow(deprecated)]
            MutableConfig::VnodeAlignedLevelSizeThreshold(_) => {
                // Deprecated knob: accepted but intentionally ignored.
            }
            MutableConfig::MaxKvCountForXor16(c) => {
                // u64::MIN (0) and u64::MAX act as "unset" sentinels.
                target.max_kv_count_for_xor16 = (*c != u64::MIN && *c != u64::MAX).then_some(*c);
            }
            MutableConfig::MaxVnodeKeyRangeBytes(c) => {
                // 0 acts as an "unset" sentinel.
                target.max_vnode_key_range_bytes = (*c > 0).then_some(*c);
            }
        }
    }
}
636
637impl CompactionGroupTransaction<'_> {
638 pub fn try_create_compaction_groups(
640 &mut self,
641 compaction_group_ids: &[CompactionGroupId],
642 config: Arc<CompactionConfig>,
643 ) -> bool {
644 let mut trivial = true;
645 for id in compaction_group_ids {
646 if self.contains_key(id) {
647 continue;
648 }
649 let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
650 self.insert(*id, new_entry);
651
652 trivial = false;
653 }
654
655 !trivial
656 }
657
658 pub fn create_compaction_groups(
659 &mut self,
660 compaction_group_id: CompactionGroupId,
661 config: Arc<CompactionConfig>,
662 ) {
663 self.try_create_compaction_groups(&[compaction_group_id], config);
664 }
665
666 pub(crate) fn try_get_compaction_group_config(
668 &self,
669 compaction_group_id: CompactionGroupId,
670 ) -> Option<&CompactionGroup> {
671 self.get(&compaction_group_id)
672 }
673
674 pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
676 let stale_group = self
677 .tree_ref()
678 .keys()
679 .cloned()
680 .filter(|k| !existing_groups.contains(k))
681 .collect_vec();
682 if stale_group.is_empty() {
683 return;
684 }
685 for group in stale_group {
686 self.remove(group);
687 }
688 }
689
690 pub(crate) fn update_compaction_config(
691 &mut self,
692 compaction_group_ids: &[CompactionGroupId],
693 config_to_update: &[MutableConfig],
694 ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
695 let mut results = HashMap::default();
696 for compaction_group_id in compaction_group_ids.iter().unique() {
697 let group = self.get(compaction_group_id).ok_or_else(|| {
698 Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
699 })?;
700 let mut config = group.compaction_config.as_ref().clone();
701 update_compaction_config(&mut config, config_to_update);
702 if let Err(reason) = validate_compaction_config(&config) {
703 return Err(Error::CompactionGroup(reason));
704 }
705 let mut new_group = group.clone();
706 new_group.compaction_config = Arc::new(config);
707 self.insert(*compaction_group_id, new_group.clone());
708 results.insert(new_group.group_id(), new_group);
709 }
710
711 Ok(results)
712 }
713}
714
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashSet};

    use itertools::Itertools;
    use risingwave_common::id::JobId;
    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;

    use crate::controller::SqlMetaStore;
    use crate::hummock::commit_multi_var;
    use crate::hummock::error::Result;
    use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
    use crate::hummock::test_utils::setup_compute_env;
    use crate::model::{Fragment, StreamJobFragments};

    /// Exercises `CompactionGroupManager` directly: config updates fail for
    /// unknown groups, created groups survive a reload from the meta store,
    /// and mutable config changes are applied to every listed group.
    #[tokio::test]
    async fn test_inner() {
        let (env, ..) = setup_compute_env(8080).await;
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        // Fresh env starts with the two static groups.
        assert_eq!(inner.compaction_groups.len(), 2);

        // Applies `config_to_update` to `cg_ids` and commits the transaction.
        async fn update_compaction_config(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[impl Into<CompactionGroupId> + Copy],
            config_to_update: &[MutableConfig],
        ) -> Result<()> {
            let cg_ids = cg_ids.iter().copied().map_into().collect_vec();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            compaction_groups_txn.update_compaction_config(&cg_ids, config_to_update)?;
            commit_multi_var!(meta, compaction_groups_txn)
        }

        // Creates groups with the default config, committing only when
        // something new was actually created.
        async fn insert_compaction_group_configs(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[u64],
        ) {
            let default_config = inner.default_compaction_config();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            if compaction_groups_txn.try_create_compaction_groups(
                &cg_ids.iter().copied().map_into().collect_vec(),
                default_config,
            ) {
                commit_multi_var!(meta, compaction_groups_txn).unwrap();
            }
        }

        // Updating unknown groups must fail.
        update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
            .await
            .unwrap_err();
        insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
        assert_eq!(inner.compaction_groups.len(), 4);
        // Reload from the meta store; the created groups must persist.
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);

        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100, 200],
            &[MutableConfig::MaxSubCompaction(123)],
        )
        .await
        .unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
        assert_eq!(
            inner
                .try_get_compaction_group_config(200)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
    }

    /// End-to-end register/unregister/purge flow through `HummockManager`,
    /// checking registered-table counts and that the static group count stays
    /// at two throughout.
    #[tokio::test]
    async fn test_manager() {
        let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
        let table_fragment_1 = StreamJobFragments::for_test(
            JobId::new(10),
            BTreeMap::from([(
                1.into(),
                Fragment {
                    fragment_id: 1.into(),
                    state_table_ids: vec![10.into(), 11.into(), 12.into(), 13.into()],
                    ..Default::default()
                },
            )]),
        );
        let table_fragment_2 = StreamJobFragments::for_test(
            JobId::new(20),
            BTreeMap::from([(
                2.into(),
                Fragment {
                    fragment_id: 2.into(),
                    state_table_ids: vec![20.into(), 21.into(), 22.into(), 23.into()],
                    ..Default::default()
                },
            )]),
        );

        // Total member tables across all groups.
        let registered_number = || async {
            compaction_group_manager
                .list_compaction_group()
                .await
                .iter()
                .map(|cg| cg.member_table_ids.len())
                .sum::<usize>()
        };
        let group_number =
            || async { compaction_group_manager.list_compaction_group().await.len() };
        assert_eq!(registered_number().await, 0);

        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
                table_fragment_1
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_2.stream_job_id().as_mv_table_id()),
                table_fragment_2
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 8);

        // Unregistering fragment 1 drops its 4 tables.
        compaction_group_manager
            .unregister_table_fragments_vec(std::slice::from_ref(&table_fragment_1))
            .await;
        assert_eq!(registered_number().await, 4);

        // Purge with fragment 2's tables as valid ids removes nothing...
        compaction_group_manager
            .purge(&table_fragment_2.all_table_ids().collect())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        // ...while purging with an empty valid set removes everything.
        compaction_group_manager
            .purge(&HashSet::new())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 0);

        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().as_mv_table_id()),
                table_fragment_1
                    .internal_table_ids()
                    .into_iter()
                    .map_into()
                    .collect(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .unregister_table_fragments_vec(&[table_fragment_1])
            .await;
        assert_eq!(registered_number().await, 0);
        assert_eq!(group_number().await, 2);
    }
}