1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ops::DerefMut;
17use std::sync::Arc;
18
19use itertools::Itertools;
20use risingwave_common::catalog::TableId;
21use risingwave_common::util::epoch::INVALID_EPOCH;
22use risingwave_hummock_sdk::CompactionGroupId;
23use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
24use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
25use risingwave_hummock_sdk::version::GroupDelta;
26use risingwave_meta_model::compaction_config;
27use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
28use risingwave_pb::hummock::write_limits::WriteLimit;
29use risingwave_pb::hummock::{
30 CompactionConfig, CompactionGroupInfo, PbGroupConstruct, PbGroupDestroy, PbStateTableInfoDelta,
31};
32use sea_orm::EntityTrait;
33use tokio::sync::OnceCell;
34
35use super::CompactionGroupStatistic;
36use crate::hummock::compaction::compaction_config::{
37 CompactionConfigBuilder, validate_compaction_config,
38};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
44use crate::hummock::model::CompactionGroup;
45use crate::hummock::sequence::next_compaction_group_id;
46use crate::manager::MetaSrvEnv;
47use crate::model::{
48 BTreeMapTransaction, BTreeMapTransactionInner, DerefMutForward, MetadataModelError,
49};
50
/// Alias for a [`BTreeMapTransaction`] over the map from [`CompactionGroupId`] to
/// [`CompactionGroup`]; this is the type returned by
/// `CompactionGroupManager::start_compaction_groups_txn`.
type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>;
52
53impl CompactionGroupManager {
54 pub(crate) async fn new(env: &MetaSrvEnv) -> Result<CompactionGroupManager> {
55 let default_config = match env.opts.compaction_config.as_ref() {
56 None => CompactionConfigBuilder::new().build(),
57 Some(opt) => CompactionConfigBuilder::with_opt(opt).build(),
58 };
59 Self::new_with_config(env, default_config).await
60 }
61
62 pub(crate) async fn new_with_config(
63 env: &MetaSrvEnv,
64 default_config: CompactionConfig,
65 ) -> Result<CompactionGroupManager> {
66 let mut compaction_group_manager = CompactionGroupManager {
67 compaction_groups: BTreeMap::new(),
68 default_config: Arc::new(default_config),
69 write_limit: Default::default(),
70 };
71
72 let loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup> =
73 compaction_config::Entity::find()
74 .all(&env.meta_store_ref().conn)
75 .await
76 .map_err(MetadataModelError::from)?
77 .into_iter()
78 .map(|m| (m.compaction_group_id as CompactionGroupId, m.into()))
79 .collect();
80
81 compaction_group_manager.init(loaded_compaction_groups);
82 Ok(compaction_group_manager)
83 }
84
85 fn init(&mut self, loaded_compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>) {
86 if !loaded_compaction_groups.is_empty() {
87 self.compaction_groups = loaded_compaction_groups;
88 }
89 }
90}
91
92impl HummockManager {
93 pub async fn compaction_group_ids(&self) -> Vec<CompactionGroupId> {
96 get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec()
97 }
98
99 pub async fn get_compaction_group_map(&self) -> BTreeMap<CompactionGroupId, CompactionGroup> {
101 self.compaction_group_manager
102 .read()
103 .await
104 .compaction_groups
105 .clone()
106 }
107
108 #[cfg(test)]
109 pub async fn register_table_fragments(
111 &self,
112 mv_table: Option<u32>,
113 mut internal_tables: Vec<u32>,
114 ) -> Result<Vec<StateTableId>> {
115 let mut pairs = vec![];
116 if let Some(mv_table) = mv_table {
117 if internal_tables.extract_if(.., |t| *t == mv_table).count() > 0 {
118 tracing::warn!("`mv_table` {} found in `internal_tables`", mv_table);
119 }
120 pairs.push((
122 mv_table,
123 CompactionGroupId::from(StaticCompactionGroupId::MaterializedView),
124 ));
125 }
126 for table_id in internal_tables {
128 pairs.push((
129 table_id,
130 CompactionGroupId::from(StaticCompactionGroupId::StateDefault),
131 ));
132 }
133 self.register_table_ids_for_test(&pairs).await?;
134 Ok(pairs.iter().map(|(table_id, ..)| *table_id).collect_vec())
135 }
136
137 #[cfg(test)]
138 pub async fn unregister_table_fragments_vec(
140 &self,
141 table_fragments: &[crate::model::StreamJobFragments],
142 ) {
143 self.unregister_table_ids(
144 table_fragments
145 .iter()
146 .flat_map(|t| t.all_table_ids().map(TableId::new)),
147 )
148 .await
149 .unwrap();
150 }
151
152 pub async fn purge(&self, valid_ids: &HashSet<TableId>) -> Result<()> {
156 let to_unregister = self
157 .versioning
158 .read()
159 .await
160 .current_version
161 .state_table_info
162 .info()
163 .keys()
164 .cloned()
165 .filter(|table_id| !valid_ids.contains(table_id))
166 .collect_vec();
167
168 self.unregister_table_ids(to_unregister).await
171 }
172
173 pub async fn register_table_ids_for_test(
178 &self,
179 pairs: &[(StateTableId, CompactionGroupId)],
180 ) -> Result<()> {
181 if pairs.is_empty() {
182 return Ok(());
183 }
184 let mut versioning_guard = self.versioning.write().await;
185 let versioning = versioning_guard.deref_mut();
186 let mut compaction_group_manager = self.compaction_group_manager.write().await;
187 let current_version = &versioning.current_version;
188 let default_config = compaction_group_manager.default_compaction_config();
189 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
190
191 for (table_id, _) in pairs {
192 if let Some(info) = current_version
193 .state_table_info
194 .info()
195 .get(&TableId::new(*table_id))
196 {
197 return Err(Error::CompactionGroup(format!(
198 "table {} already {:?}",
199 *table_id, info
200 )));
201 }
202 }
203 let new_compaction_group_id: OnceCell<CompactionGroupId> = OnceCell::new();
205 let mut version = HummockVersionTransaction::new(
206 &mut versioning.current_version,
207 &mut versioning.hummock_version_deltas,
208 self.env.notification_manager(),
209 None,
210 &self.metrics,
211 );
212 let mut new_version_delta = version.new_delta();
213
214 let committed_epoch = new_version_delta
215 .latest_version()
216 .state_table_info
217 .info()
218 .values()
219 .map(|info| info.committed_epoch)
220 .max()
221 .unwrap_or(INVALID_EPOCH);
222
223 for (table_id, raw_group_id) in pairs {
224 let mut group_id = *raw_group_id;
225 if group_id == StaticCompactionGroupId::NewCompactionGroup as u64 {
226 let mut is_group_init = false;
227 group_id = *new_compaction_group_id
228 .get_or_try_init(|| async {
229 next_compaction_group_id(&self.env).await.inspect(|_| {
230 is_group_init = true;
231 })
232 })
233 .await?;
234 if is_group_init {
235 let group_deltas = &mut new_version_delta
236 .group_deltas
237 .entry(group_id)
238 .or_default()
239 .group_deltas;
240
241 let config =
242 match compaction_groups_txn.try_get_compaction_group_config(group_id) {
243 Some(config) => config.compaction_config.as_ref().clone(),
244 None => {
245 compaction_groups_txn
246 .create_compaction_groups(group_id, default_config.clone());
247 default_config.as_ref().clone()
248 }
249 };
250
251 let group_delta = GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
252 group_config: Some(config),
253 group_id,
254 ..Default::default()
255 }));
256
257 group_deltas.push(group_delta);
258 }
259 }
260 assert!(
261 new_version_delta
262 .state_table_info_delta
263 .insert(
264 TableId::new(*table_id),
265 PbStateTableInfoDelta {
266 committed_epoch,
267 compaction_group_id: *raw_group_id,
268 }
269 )
270 .is_none()
271 );
272 }
273 new_version_delta.pre_apply();
274 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
275
276 Ok(())
277 }
278
    /// Removes the given tables from the current Hummock version and destroys every
    /// non-static compaction group that ends up with no member tables.
    ///
    /// Duplicate ids are handled once, and ids that are not present in the version are skipped.
    /// The version delta and the purge of stale compaction group configs are committed together.
    ///
    /// # Errors
    /// Returns an error if the commit fails.
    ///
    /// # Panics
    /// Panics if a table's compaction group has no entry in
    /// `compaction_group_member_tables()`.
    pub async fn unregister_table_ids(
        &self,
        table_ids: impl IntoIterator<Item = TableId> + Send,
    ) -> Result<()> {
        let table_ids = table_ids.into_iter().collect_vec();
        if table_ids.is_empty() {
            return Ok(());
        }

        // Drop the per-table write throughput statistics first; the guard is released at the
        // end of this scope, before any `.await` below.
        {
            let mut table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.write();
            for table_id in table_ids.iter().unique() {
                table_write_throughput_statistic_manager.remove_table(table_id.table_id);
            }
        }

        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();
        // Compaction group id -> number of member tables left after the removals below.
        let mut modified_groups: HashMap<CompactionGroupId, u64> =
            HashMap::new();
        for table_id in table_ids.into_iter().unique() {
            let version = new_version_delta.latest_version();
            let Some(info) = version.state_table_info.info().get(&table_id) else {
                // Table is not registered in this version: nothing to remove.
                continue;
            };

            // First removal in a group starts from its current member count minus one; later
            // removals decrement the running count.
            modified_groups
                .entry(info.compaction_group_id)
                .and_modify(|count| *count -= 1)
                .or_insert(
                    version
                        .state_table_info
                        .compaction_group_member_tables()
                        .get(&info.compaction_group_id)
                        .expect("should exist")
                        .len() as u64
                        - 1,
                );
            new_version_delta.removed_table_ids.insert(table_id);
        }

        // Emptied groups whose id is above `StaticCompactionGroupId::End` are removed; each is
        // paired with the length of its `levels`, which is passed to the metrics cleanup below.
        let groups_to_remove = modified_groups
            .into_iter()
            .filter_map(|(group_id, member_count)| {
                if member_count == 0 && group_id > StaticCompactionGroupId::End as CompactionGroupId
                {
                    return Some((
                        group_id,
                        new_version_delta
                            .latest_version()
                            .get_compaction_group_levels(group_id)
                            .levels
                            .len(),
                    ));
                }
                None
            })
            .collect_vec();
        // Record a `GroupDestroy` delta for every group being removed.
        for (group_id, _) in &groups_to_remove {
            let group_deltas = &mut new_version_delta
                .group_deltas
                .entry(*group_id)
                .or_default()
                .group_deltas;

            let group_delta = GroupDelta::GroupDestroy(PbGroupDestroy {});
            group_deltas.push(group_delta);
        }

        // Clear the SST metrics of the removed groups. Note this runs before the commit below.
        for (group_id, max_level) in groups_to_remove {
            remove_compaction_group_in_sst_stat(&self.metrics, group_id, max_level);
        }

        new_version_delta.pre_apply();

        let mut compaction_group_manager = self.compaction_group_manager.write().await;
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

        // Drop configs of groups that no longer exist in the latest version.
        compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids(
            version.latest_version(),
        )));
        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;

        Ok(())
    }
380
381 pub async fn update_compaction_config(
382 &self,
383 compaction_group_ids: &[CompactionGroupId],
384 config_to_update: &[MutableConfig],
385 ) -> Result<()> {
386 {
387 let mut compaction_group_manager = self.compaction_group_manager.write().await;
389 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
390 compaction_groups_txn
391 .update_compaction_config(compaction_group_ids, config_to_update)?;
392 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
393 }
394
395 if config_to_update
396 .iter()
397 .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_)))
398 {
399 self.try_update_write_limits(compaction_group_ids).await;
401 }
402
403 Ok(())
404 }
405
406 pub async fn list_compaction_group(&self) -> Vec<CompactionGroupInfo> {
409 let mut versioning_guard = self.versioning.write().await;
410 let versioning = versioning_guard.deref_mut();
411 let current_version = &versioning.current_version;
412 let mut results = vec![];
413 let compaction_group_manager = self.compaction_group_manager.read().await;
414
415 for levels in current_version.levels.values() {
416 let compaction_config = compaction_group_manager
417 .try_get_compaction_group_config(levels.group_id)
418 .unwrap()
419 .compaction_config
420 .as_ref()
421 .clone();
422 let group = CompactionGroupInfo {
423 id: levels.group_id,
424 parent_id: levels.parent_group_id,
425 member_table_ids: current_version
426 .state_table_info
427 .compaction_group_member_table_ids(levels.group_id)
428 .iter()
429 .map(|table_id| table_id.table_id)
430 .collect_vec(),
431 compaction_config: Some(compaction_config),
432 };
433 results.push(group);
434 }
435 results
436 }
437
438 pub async fn calculate_compaction_group_statistic(&self) -> Vec<CompactionGroupStatistic> {
439 let mut infos = vec![];
440 {
441 let versioning_guard = self.versioning.read().await;
442 let manager = self.compaction_group_manager.read().await;
443 let version = &versioning_guard.current_version;
444 for group_id in version.levels.keys() {
445 let mut group_info = CompactionGroupStatistic {
446 group_id: *group_id,
447 ..Default::default()
448 };
449 for table_id in version
450 .state_table_info
451 .compaction_group_member_table_ids(*group_id)
452 {
453 let stats_size = versioning_guard
454 .version_stats
455 .table_stats
456 .get(&table_id.table_id)
457 .map(|stats| stats.total_key_size + stats.total_value_size)
458 .unwrap_or(0);
459 let table_size = std::cmp::max(stats_size, 0) as u64;
460 group_info.group_size += table_size;
461 group_info
462 .table_statistic
463 .insert(table_id.table_id, table_size);
464 group_info.compaction_group_config =
465 manager.try_get_compaction_group_config(*group_id).unwrap();
466 }
467 infos.push(group_info);
468 }
469 };
470 infos
471 }
472
473 pub(crate) async fn initial_compaction_group_config_after_load(
474 &self,
475 versioning_guard: &Versioning,
476 compaction_group_manager: &mut CompactionGroupManager,
477 ) -> Result<()> {
478 let current_version = &versioning_guard.current_version;
480 let all_group_ids = get_compaction_group_ids(current_version).collect_vec();
481 let default_config = compaction_group_manager.default_compaction_config();
482 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
483 compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config);
484 commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?;
485
486 Ok(())
487 }
488}
489
/// In-memory state for compaction groups: their configs, the default config, and the write
/// limits.
pub(crate) struct CompactionGroupManager {
    /// Compaction groups keyed by id; modified through the transactions started by
    /// `start_compaction_groups_txn` / `start_owned_compaction_groups_txn`.
    compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
    /// Config returned by `default_compaction_config`.
    default_config: Arc<CompactionConfig>,
    /// Write limits keyed by compaction group id.
    pub write_limit: HashMap<CompactionGroupId, WriteLimit>,
}
503
504impl CompactionGroupManager {
505 pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> {
507 CompactionGroupTransaction::new(&mut self.compaction_groups)
508 }
509
510 #[expect(clippy::type_complexity)]
511 pub fn start_owned_compaction_groups_txn<P: DerefMut<Target = Self>>(
512 inner: P,
513 ) -> BTreeMapTransactionInner<
514 CompactionGroupId,
515 CompactionGroup,
516 DerefMutForward<
517 Self,
518 BTreeMap<CompactionGroupId, CompactionGroup>,
519 P,
520 impl Fn(&Self) -> &BTreeMap<CompactionGroupId, CompactionGroup>,
521 impl Fn(&mut Self) -> &mut BTreeMap<CompactionGroupId, CompactionGroup>,
522 >,
523 > {
524 BTreeMapTransactionInner::new(DerefMutForward::new(
525 inner,
526 |mgr| &mgr.compaction_groups,
527 |mgr| &mut mgr.compaction_groups,
528 ))
529 }
530
531 pub(crate) fn try_get_compaction_group_config(
533 &self,
534 compaction_group_id: CompactionGroupId,
535 ) -> Option<CompactionGroup> {
536 self.compaction_groups.get(&compaction_group_id).cloned()
537 }
538
539 pub(crate) fn default_compaction_config(&self) -> Arc<CompactionConfig> {
541 self.default_config.clone()
542 }
543}
544
545fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) {
546 for item in items {
547 match item {
548 MutableConfig::MaxBytesForLevelBase(c) => {
549 target.max_bytes_for_level_base = *c;
550 }
551 MutableConfig::MaxBytesForLevelMultiplier(c) => {
552 target.max_bytes_for_level_multiplier = *c;
553 }
554 MutableConfig::MaxCompactionBytes(c) => {
555 target.max_compaction_bytes = *c;
556 }
557 MutableConfig::SubLevelMaxCompactionBytes(c) => {
558 target.sub_level_max_compaction_bytes = *c;
559 }
560 MutableConfig::Level0TierCompactFileNumber(c) => {
561 target.level0_tier_compact_file_number = *c;
562 }
563 MutableConfig::TargetFileSizeBase(c) => {
564 target.target_file_size_base = *c;
565 }
566 MutableConfig::CompactionFilterMask(c) => {
567 target.compaction_filter_mask = *c;
568 }
569 MutableConfig::MaxSubCompaction(c) => {
570 target.max_sub_compaction = *c;
571 }
572 MutableConfig::Level0StopWriteThresholdSubLevelNumber(c) => {
573 target.level0_stop_write_threshold_sub_level_number = *c;
574 }
575 MutableConfig::Level0SubLevelCompactLevelCount(c) => {
576 target.level0_sub_level_compact_level_count = *c;
577 }
578 MutableConfig::Level0OverlappingSubLevelCompactLevelCount(c) => {
579 target.level0_overlapping_sub_level_compact_level_count = *c;
580 }
581 MutableConfig::MaxSpaceReclaimBytes(c) => {
582 target.max_space_reclaim_bytes = *c;
583 }
584 MutableConfig::Level0MaxCompactFileNumber(c) => {
585 target.level0_max_compact_file_number = *c;
586 }
587 MutableConfig::EnableEmergencyPicker(c) => {
588 target.enable_emergency_picker = *c;
589 }
590 MutableConfig::TombstoneReclaimRatio(c) => {
591 target.tombstone_reclaim_ratio = *c;
592 }
593 MutableConfig::CompressionAlgorithm(c) => {
594 target.compression_algorithm[c.get_level() as usize]
595 .clone_from(&c.compression_algorithm);
596 }
597 MutableConfig::MaxL0CompactLevelCount(c) => {
598 target.max_l0_compact_level_count = Some(*c);
599 }
600 MutableConfig::SstAllowedTrivialMoveMinSize(c) => {
601 target.sst_allowed_trivial_move_min_size = Some(*c);
602 }
603 MutableConfig::SplitWeightByVnode(c) => {
604 target.split_weight_by_vnode = *c;
605 }
606 MutableConfig::DisableAutoGroupScheduling(c) => {
607 target.disable_auto_group_scheduling = Some(*c);
608 }
609 MutableConfig::MaxOverlappingLevelSize(c) => {
610 target.max_overlapping_level_size = Some(*c);
611 }
612 MutableConfig::SstAllowedTrivialMoveMaxCount(c) => {
613 target.sst_allowed_trivial_move_max_count = Some(*c);
614 }
615 MutableConfig::EmergencyLevel0SstFileCount(c) => {
616 target.emergency_level0_sst_file_count = Some(*c);
617 }
618 MutableConfig::EmergencyLevel0SubLevelPartition(c) => {
619 target.emergency_level0_sub_level_partition = Some(*c);
620 }
621 MutableConfig::Level0StopWriteThresholdMaxSstCount(c) => {
622 target.level0_stop_write_threshold_max_sst_count = Some(*c);
623 }
624 MutableConfig::Level0StopWriteThresholdMaxSize(c) => {
625 target.level0_stop_write_threshold_max_size = Some(*c);
626 }
627 }
628 }
629}
630
631impl CompactionGroupTransaction<'_> {
632 pub fn try_create_compaction_groups(
634 &mut self,
635 compaction_group_ids: &[CompactionGroupId],
636 config: Arc<CompactionConfig>,
637 ) -> bool {
638 let mut trivial = true;
639 for id in compaction_group_ids {
640 if self.contains_key(id) {
641 continue;
642 }
643 let new_entry = CompactionGroup::new(*id, config.as_ref().clone());
644 self.insert(*id, new_entry);
645
646 trivial = false;
647 }
648
649 !trivial
650 }
651
652 pub fn create_compaction_groups(
653 &mut self,
654 compaction_group_id: CompactionGroupId,
655 config: Arc<CompactionConfig>,
656 ) {
657 self.try_create_compaction_groups(&[compaction_group_id], config);
658 }
659
660 pub(crate) fn try_get_compaction_group_config(
662 &self,
663 compaction_group_id: CompactionGroupId,
664 ) -> Option<&CompactionGroup> {
665 self.get(&compaction_group_id)
666 }
667
668 pub fn purge(&mut self, existing_groups: HashSet<CompactionGroupId>) {
670 let stale_group = self
671 .tree_ref()
672 .keys()
673 .cloned()
674 .filter(|k| !existing_groups.contains(k))
675 .collect_vec();
676 if stale_group.is_empty() {
677 return;
678 }
679 for group in stale_group {
680 self.remove(group);
681 }
682 }
683
684 pub(crate) fn update_compaction_config(
685 &mut self,
686 compaction_group_ids: &[CompactionGroupId],
687 config_to_update: &[MutableConfig],
688 ) -> Result<HashMap<CompactionGroupId, CompactionGroup>> {
689 let mut results = HashMap::default();
690 for compaction_group_id in compaction_group_ids.iter().unique() {
691 let group = self.get(compaction_group_id).ok_or_else(|| {
692 Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
693 })?;
694 let mut config = group.compaction_config.as_ref().clone();
695 update_compaction_config(&mut config, config_to_update);
696 if let Err(reason) = validate_compaction_config(&config) {
697 return Err(Error::CompactionGroup(reason));
698 }
699 let mut new_group = group.clone();
700 new_group.compaction_config = Arc::new(config);
701 self.insert(*compaction_group_id, new_group.clone());
702 results.insert(new_group.group_id(), new_group);
703 }
704
705 Ok(results)
706 }
707}
708
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashSet};

    use risingwave_common::catalog::TableId;
    use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;

    use crate::controller::SqlMetaStore;
    use crate::hummock::commit_multi_var;
    use crate::hummock::error::Result;
    use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
    use crate::hummock::test_utils::setup_compute_env;
    use crate::model::{Fragment, StreamJobFragments};

    /// Exercises `CompactionGroupManager` directly: creating group configs, reloading them
    /// from the meta store and updating them through a transaction.
    #[tokio::test]
    async fn test_inner() {
        let (env, ..) = setup_compute_env(8080).await;
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        // The freshly set-up environment is expected to hold two compaction groups.
        assert_eq!(inner.compaction_groups.len(), 2);

        // Updates the configs of `cg_ids` in a transaction and commits it.
        async fn update_compaction_config(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[u64],
            config_to_update: &[MutableConfig],
        ) -> Result<()> {
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            compaction_groups_txn.update_compaction_config(cg_ids, config_to_update)?;
            commit_multi_var!(meta, compaction_groups_txn)
        }

        // Creates default configs for `cg_ids`, committing only if something was created.
        async fn insert_compaction_group_configs(
            meta: &SqlMetaStore,
            inner: &mut CompactionGroupManager,
            cg_ids: &[u64],
        ) {
            let default_config = inner.default_compaction_config();
            let mut compaction_groups_txn = inner.start_compaction_groups_txn();
            if compaction_groups_txn.try_create_compaction_groups(cg_ids, default_config) {
                commit_multi_var!(meta, compaction_groups_txn).unwrap();
            }
        }

        // Updating groups that do not exist yet must fail.
        update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[])
            .await
            .unwrap_err();
        insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await;
        assert_eq!(inner.compaction_groups.len(), 4);
        // A new manager reloads the persisted groups from the meta store.
        let mut inner = CompactionGroupManager::new(&env).await.unwrap();
        assert_eq!(inner.compaction_groups.len(), 4);

        update_compaction_config(
            env.meta_store_ref(),
            &mut inner,
            &[100, 200],
            &[MutableConfig::MaxSubCompaction(123)],
        )
        .await
        .unwrap();
        // The update changes the configs in place without adding groups.
        assert_eq!(inner.compaction_groups.len(), 4);
        assert_eq!(
            inner
                .try_get_compaction_group_config(100)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
        assert_eq!(
            inner
                .try_get_compaction_group_config(200)
                .unwrap()
                .compaction_config
                .max_sub_compaction,
            123
        );
    }

    /// Exercises table registration, unregistration and purge through the manager returned
    /// by `setup_compute_env`. The assertions depend on the order of the steps.
    #[tokio::test]
    async fn test_manager() {
        let (_, compaction_group_manager, ..) = setup_compute_env(8080).await;
        let table_fragment_1 = StreamJobFragments::for_test(
            TableId::new(10),
            BTreeMap::from([(
                1,
                Fragment {
                    fragment_id: 1,
                    state_table_ids: vec![10, 11, 12, 13],
                    ..Default::default()
                },
            )]),
        );
        let table_fragment_2 = StreamJobFragments::for_test(
            TableId::new(20),
            BTreeMap::from([(
                2,
                Fragment {
                    fragment_id: 2,
                    state_table_ids: vec![20, 21, 22, 23],
                    ..Default::default()
                },
            )]),
        );

        // Total number of member tables summed over all listed compaction groups.
        let registered_number = || async {
            compaction_group_manager
                .list_compaction_group()
                .await
                .iter()
                .map(|cg| cg.member_table_ids.len())
                .sum::<usize>()
        };
        // Number of listed compaction groups.
        let group_number =
            || async { compaction_group_manager.list_compaction_group().await.len() };
        assert_eq!(registered_number().await, 0);

        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().table_id),
                table_fragment_1.internal_table_ids(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_2.stream_job_id().table_id),
                table_fragment_2.internal_table_ids(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 8);

        // Unregistering the first fragment leaves only the second fragment's tables.
        compaction_group_manager
            .unregister_table_fragments_vec(&[table_fragment_1.clone()])
            .await;
        assert_eq!(registered_number().await, 4);

        // Purging with all remaining tables marked valid removes nothing.
        compaction_group_manager
            .purge(&table_fragment_2.all_table_ids().map(TableId::new).collect())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        // Purging with no valid tables removes every registered table.
        compaction_group_manager
            .purge(&HashSet::new())
            .await
            .unwrap();
        assert_eq!(registered_number().await, 0);

        // Two groups are still listed after all tables are gone.
        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .register_table_fragments(
                Some(table_fragment_1.stream_job_id().table_id),
                table_fragment_1.internal_table_ids(),
            )
            .await
            .unwrap();
        assert_eq!(registered_number().await, 4);
        assert_eq!(group_number().await, 2);

        compaction_group_manager
            .unregister_table_fragments_vec(&[table_fragment_1])
            .await;
        assert_eq!(registered_number().await, 0);
        assert_eq!(group_number().await, 2);
    }
}