1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26 StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas, HummockVersion};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33 CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::compaction_group_manager::CompactionGroupManager;
38use super::{CompactionGroupStatistic, GroupStateValidator};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_metrics;
44use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
45use crate::hummock::table_write_throughput_statistic::{
46 TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
47};
48use crate::manager::MetaOpts;
49
50#[derive(Debug, PartialEq, Eq)]
51struct NormalizePlan {
52 parent_group_id: CompactionGroupId,
53 parent_table_ids: Vec<StateTableId>,
54 boundary_table_id: StateTableId,
55}
56
57impl NormalizePlan {
58 fn split_key(&self) -> Bytes {
59 group_split::build_split_full_key(self.boundary_table_id, VirtualNode::ZERO)
60 .encode()
61 .into()
62 }
63
64 fn split_table_ids(&self) -> (Vec<StateTableId>, Vec<StateTableId>) {
65 let split_full_key =
66 group_split::build_split_full_key(self.boundary_table_id, VirtualNode::ZERO);
67 let (table_ids_left, table_ids_right) =
68 group_split::split_table_ids_with_table_id_and_vnode(
69 &self.parent_table_ids,
70 split_full_key.user_key.table_id,
71 split_full_key.user_key.get_vnode_id(),
72 );
73 assert!(!table_ids_left.is_empty() && !table_ids_right.is_empty());
74 (table_ids_left, table_ids_right)
75 }
76}
77
78fn gen_normalize_plan(
79 left: &CompactionGroupStatistic,
80 right: &CompactionGroupStatistic,
81) -> Option<NormalizePlan> {
82 let left_table_ids = left.table_statistic.keys().copied().collect_vec();
83
84 if left_table_ids.len() <= 1 {
85 return None;
86 }
87
88 let left_max = *left_table_ids.last().unwrap();
89 let right_min = *right.table_statistic.keys().next().unwrap();
90 if left_max < right_min {
91 return None;
92 }
93
94 let boundary_index = left_table_ids.partition_point(|&table_id| table_id < right_min);
95 if boundary_index == 0 || boundary_index >= left_table_ids.len() {
96 return None;
97 }
98 let boundary_table_id = left_table_ids[boundary_index];
99
100 Some(NormalizePlan {
101 parent_group_id: left.group_id,
102 parent_table_ids: left_table_ids,
103 boundary_table_id,
104 })
105}
106
107fn build_normalize_plan_from_group_statistics(
108 groups: &[CompactionGroupStatistic],
109) -> Option<NormalizePlan> {
110 let mut groups = groups
113 .iter()
114 .filter(|group| !group.table_statistic.is_empty())
115 .collect_vec();
116 groups.sort_by_key(|group| *group.table_statistic.keys().next().unwrap());
117
118 groups
119 .split(|group| {
120 group
121 .compaction_group_config
122 .compaction_config
123 .disable_auto_group_scheduling
124 .unwrap_or(false)
125 })
126 .find_map(|segment| {
127 segment
128 .windows(2)
129 .find_map(|pair| gen_normalize_plan(pair[0], pair[1]))
130 })
131}
132
133fn collect_normalize_group_statistics(
134 version: &HummockVersion,
135 compaction_group_manager: &CompactionGroupManager,
136) -> Result<Vec<CompactionGroupStatistic>> {
137 let mut groups = vec![];
138 for group_id in version.levels.keys() {
139 let table_ids = version
140 .state_table_info
141 .compaction_group_member_table_ids(*group_id)
142 .iter()
143 .copied()
144 .collect_vec();
145 if table_ids.is_empty() {
146 continue;
147 }
148
149 let group_config = compaction_group_manager
150 .try_get_compaction_group_config(*group_id)
151 .ok_or_else(|| {
152 Error::CompactionGroup(format!(
153 "group {} config not found during normalize",
154 group_id
155 ))
156 })?;
157 groups.push(CompactionGroupStatistic {
158 group_id: *group_id,
159 group_size: 0,
160 table_statistic: table_ids
161 .into_iter()
162 .map(|table_id| (table_id, 0))
163 .collect(),
164 compaction_group_config: group_config,
165 });
166 }
167 Ok(groups)
168}
169
170impl HummockManager {
171 pub async fn merge_compaction_group(
172 &self,
173 group_1: CompactionGroupId,
174 group_2: CompactionGroupId,
175 ) -> Result<()> {
176 self.merge_compaction_group_impl(group_1, group_2, None)
177 .await
178 }
179
180 pub async fn merge_compaction_group_for_test(
181 &self,
182 group_1: CompactionGroupId,
183 group_2: CompactionGroupId,
184 created_tables: HashSet<TableId>,
185 ) -> Result<()> {
186 self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
187 .await
188 }
189
190 pub async fn merge_compaction_group_impl(
191 &self,
192 group_1: CompactionGroupId,
193 group_2: CompactionGroupId,
194 created_tables: Option<HashSet<TableId>>,
195 ) -> Result<()> {
196 let compaction_guard = self.compaction.write().await;
197 let mut versioning_guard = self.versioning.write().await;
198 let versioning = versioning_guard.deref_mut();
199 if !versioning.current_version.levels.contains_key(&group_1) {
201 return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
202 }
203
204 if !versioning.current_version.levels.contains_key(&group_2) {
205 return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
206 }
207
208 let state_table_info = versioning.current_version.state_table_info.clone();
209 let mut member_table_ids_1 = state_table_info
210 .compaction_group_member_table_ids(group_1)
211 .iter()
212 .cloned()
213 .collect_vec();
214
215 if member_table_ids_1.is_empty() {
216 return Err(Error::CompactionGroup(format!(
217 "group_1 {} is empty",
218 group_1
219 )));
220 }
221
222 let mut member_table_ids_2 = state_table_info
223 .compaction_group_member_table_ids(group_2)
224 .iter()
225 .cloned()
226 .collect_vec();
227
228 if member_table_ids_2.is_empty() {
229 return Err(Error::CompactionGroup(format!(
230 "group_2 {} is empty",
231 group_2
232 )));
233 }
234
235 debug_assert!(!member_table_ids_1.is_empty());
236 debug_assert!(!member_table_ids_2.is_empty());
237 assert!(member_table_ids_1.is_sorted());
238 assert!(member_table_ids_2.is_sorted());
239
240 let created_tables = if let Some(created_tables) = created_tables {
241 #[expect(clippy::assertions_on_constants)]
243 {
244 assert!(cfg!(debug_assertions));
245 }
246 created_tables
247 } else {
248 match self.metadata_manager.get_created_table_ids().await {
249 Ok(created_tables) => HashSet::from_iter(created_tables),
250 Err(err) => {
251 tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
252 return Err(Error::CompactionGroup(format!(
253 "merge group_1 {} group_2 {} failed to fetch created table ids",
254 group_1, group_2
255 )));
256 }
257 }
258 };
259
260 fn contains_creating_table(
261 table_ids: &Vec<TableId>,
262 created_tables: &HashSet<TableId>,
263 ) -> bool {
264 table_ids
265 .iter()
266 .any(|table_id| !created_tables.contains(table_id))
267 }
268
269 if contains_creating_table(&member_table_ids_1, &created_tables)
271 || contains_creating_table(&member_table_ids_2, &created_tables)
272 {
273 return Err(Error::CompactionGroup(format!(
274 "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
275 group_1, group_2, member_table_ids_1, member_table_ids_2
276 )));
277 }
278
279 let (left_group_id, right_group_id) =
281 if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
282 (group_1, group_2)
283 } else {
284 std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
285 (group_2, group_1)
286 };
287
288 if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
293 return Err(Error::CompactionGroup(format!(
294 "invalid merge group_1 {} group_2 {}: table id ranges overlap",
295 left_group_id, right_group_id
296 )));
297 }
298
299 let combined_member_table_ids = member_table_ids_1
300 .iter()
301 .chain(member_table_ids_2.iter())
302 .collect_vec();
303 assert!(combined_member_table_ids.is_sorted());
304
305 let mut sst_id_set = HashSet::new();
307 for sst_id in versioning
308 .current_version
309 .get_sst_ids_by_group_id(left_group_id)
310 .chain(
311 versioning
312 .current_version
313 .get_sst_ids_by_group_id(right_group_id),
314 )
315 {
316 if !sst_id_set.insert(sst_id) {
317 return Err(Error::CompactionGroup(format!(
318 "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
319 left_group_id, right_group_id, sst_id
320 )));
321 }
322 }
323
324 {
326 let left_levels = versioning
327 .current_version
328 .get_compaction_group_levels(group_1);
329
330 let right_levels = versioning
331 .current_version
332 .get_compaction_group_levels(group_2);
333
334 let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
337 for level_idx in 1..=max_level {
338 let left_level = left_levels.get_level(level_idx);
339 let right_level = right_levels.get_level(level_idx);
340 if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
341 continue;
342 }
343
344 let left_last_sst = left_level.table_infos.last().unwrap().clone();
345 let right_first_sst = right_level.table_infos.first().unwrap().clone();
346 let left_sst_id = left_last_sst.sst_id;
347 let right_sst_id = right_first_sst.sst_id;
348 let left_obj_id = left_last_sst.object_id;
349 let right_obj_id = right_first_sst.object_id;
350
351 if !can_concat(&[left_last_sst, right_first_sst]) {
353 return Err(Error::CompactionGroup(format!(
354 "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
355 left_group_id,
356 right_group_id,
357 level_idx,
358 left_sst_id,
359 right_sst_id,
360 left_obj_id,
361 right_obj_id
362 )));
363 }
364 }
365 }
366
367 let mut version = HummockVersionTransaction::new(
368 &mut versioning.current_version,
369 &mut versioning.hummock_version_deltas,
370 &mut versioning.table_change_log,
371 self.env.notification_manager(),
372 None,
373 &self.metrics,
374 &self.env.opts,
375 );
376 let mut new_version_delta = version.new_delta();
377
378 let target_compaction_group_id = {
379 new_version_delta.group_deltas.insert(
381 left_group_id,
382 GroupDeltas {
383 group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
384 left_group_id,
385 right_group_id,
386 })],
387 },
388 );
389 left_group_id
390 };
391
392 new_version_delta.with_latest_version(|version, new_version_delta| {
395 for &table_id in combined_member_table_ids {
396 let info = version
397 .state_table_info
398 .info()
399 .get(&table_id)
400 .expect("have check exist previously");
401 assert!(
402 new_version_delta
403 .state_table_info_delta
404 .insert(
405 table_id,
406 PbStateTableInfoDelta {
407 committed_epoch: info.committed_epoch,
408 compaction_group_id: target_compaction_group_id,
409 }
410 )
411 .is_none()
412 );
413 }
414 });
415
416 {
417 let mut compaction_group_manager = self.compaction_group_manager.write().await;
418 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
419
420 {
422 let right_group_max_level = new_version_delta
423 .latest_version()
424 .get_compaction_group_levels(right_group_id)
425 .levels
426 .len();
427
428 remove_compaction_group_metrics(
429 &self.metrics,
430 right_group_id,
431 right_group_max_level,
432 );
433 }
434
435 self.compaction_state
437 .remove_compaction_group(right_group_id);
438
439 {
441 if let Err(err) = compaction_groups_txn.update_compaction_config(
442 &[left_group_id],
443 &[MutableConfig::SplitWeightByVnode(0)], ) {
445 tracing::error!(
446 error = %err.as_report(),
447 "failed to update compaction config for group-{}",
448 left_group_id
449 );
450 }
451 }
452
453 new_version_delta.pre_apply();
454
455 compaction_groups_txn.remove(right_group_id);
457 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
458 }
459
460 versioning.mark_next_time_travel_version_snapshot();
462
463 let mut canceled_tasks = vec![];
465 let compact_task_assignments =
468 compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
469 compact_task_assignments
470 .into_iter()
471 .for_each(|task_assignment| {
472 let task = &task_assignment.compact_task;
473 assert_eq!(task.compaction_group_id, right_group_id);
474 canceled_tasks.push(ReportTask {
475 task_id: task.task_id,
476 task_status: TaskStatus::ManualCanceled,
477 table_stats_change: HashMap::default(),
478 sorted_output_ssts: vec![],
479 object_timestamps: HashMap::default(),
480 });
481 });
482
483 if !canceled_tasks.is_empty() {
484 self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
485 .await?;
486 } else {
487 drop(versioning_guard);
488 drop(compaction_guard);
489 }
490
491 self.try_update_write_limits(&[left_group_id, right_group_id])
492 .await;
493
494 self.metrics
495 .merge_compaction_group_count
496 .with_label_values(&[&left_group_id.to_string()])
497 .inc();
498
499 Ok(())
500 }
501}
502
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_pb::hummock::CompactionConfig;

    use super::{
        CompactionGroupStatistic, NormalizePlan, build_normalize_plan_from_group_statistics,
        gen_normalize_plan,
    };
    use crate::hummock::model::CompactionGroup;

    /// Fixture: a group statistic whose member tables all have size 0, with the given
    /// `disable_auto_group_scheduling` flag.
    fn group(
        group_id: CompactionGroupId,
        table_ids: &[u32],
        disable_auto_group_scheduling: bool,
    ) -> CompactionGroupStatistic {
        let table_statistic: BTreeMap<_, u64> = table_ids
            .iter()
            .map(|&table_id| (table_id.into(), 0_u64))
            .collect();
        let compaction_config = CompactionConfig {
            disable_auto_group_scheduling: Some(disable_auto_group_scheduling),
            ..Default::default()
        };
        CompactionGroupStatistic {
            group_id,
            group_size: 0,
            table_statistic,
            compaction_group_config: CompactionGroup::new(group_id, compaction_config),
        }
    }

    #[test]
    fn test_gen_normalize_plan_returns_none_for_single_table_group() {
        let plan = gen_normalize_plan(
            &group(1.into(), &[10], false),
            &group(2.into(), &[5, 20], false),
        );
        assert!(plan.is_none());
    }

    #[test]
    fn test_gen_normalize_plan_returns_none_for_non_overlapping_groups() {
        let plan = gen_normalize_plan(
            &group(1.into(), &[1, 2, 3], false),
            &group(2.into(), &[4, 5, 6], false),
        );
        assert!(plan.is_none());
    }

    #[test]
    fn test_gen_normalize_plan_returns_none_when_boundary_cannot_split_parent() {
        let plan = gen_normalize_plan(
            &group(1.into(), &[5, 6, 7], false),
            &group(2.into(), &[4, 8], false),
        );
        assert!(plan.is_none());
    }

    #[test]
    fn test_gen_normalize_plan_generates_expected_boundary() {
        let plan = gen_normalize_plan(
            &group(1.into(), &[1, 4, 7], false),
            &group(2.into(), &[2, 5, 8], false),
        );
        let expected = NormalizePlan {
            parent_group_id: 1.into(),
            parent_table_ids: vec![1.into(), 4.into(), 7.into()],
            boundary_table_id: 4.into(),
        };
        assert_eq!(plan, Some(expected));
    }

    #[test]
    fn test_build_normalize_plan_skips_disabled_boundary_and_continues_later_segment() {
        // Group 2 has auto scheduling disabled, so it separates {1} from {3, 4}; the only
        // pair left to normalize is (3, 4).
        let groups = [
            group(1.into(), &[1, 4, 7], false),
            group(2.into(), &[2, 5, 8], true),
            group(3.into(), &[10, 13, 16], false),
            group(4.into(), &[11, 14, 17], false),
        ];
        let expected = NormalizePlan {
            parent_group_id: 3.into(),
            parent_table_ids: vec![10.into(), 13.into(), 16.into()],
            boundary_table_id: 13.into(),
        };
        assert_eq!(
            build_normalize_plan_from_group_statistics(&groups),
            Some(expected)
        );
    }
}
595
impl HummockManager {
    /// Splits `parent_group_id` at the key built from `(table_id_to_split, vnode_to_split)`.
    ///
    /// Member tables on the right side of the split key are moved into a newly created
    /// compaction group that inherits the parent's compaction config. The parent keeps the
    /// left side.
    ///
    /// Returns the resulting `(group id, member table ids)` pairs:
    /// - if the split key leaves one side empty, a single entry for the parent, and nothing is
    ///   changed;
    /// - otherwise, the parent followed by the new group.
    ///
    /// When `partition_vnode_count` is `Some`, every resulting group whose members are exactly
    /// `split_table_ids` and consist of one table gets `SplitWeightByVnode` set to that count.
    /// A failure there is only logged.
    ///
    /// After the commit, compact tasks of the parent group whose compaction group version is
    /// expired are reported as `ManualCanceled`.
    ///
    /// # Errors
    /// Returns `Error::CompactionGroup` when:
    /// - the parent group does not exist;
    /// - it does not contain `table_id_to_split`;
    /// - its members equal `split_table_ids`;
    /// - its config is missing.
    ///
    /// Propagates id allocation, meta store commit and task reporting errors.
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        // Lock order: compaction, then versioning (and later the compaction group manager).
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .copied()
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} doesn't in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        // Sorted member ids (collected from the BTreeSet above).
        let table_ids = member_table_ids.into_iter().collect_vec();
        // Moving every member would leave the parent group empty.
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id,
                split_full_key.user_key.get_vnode_id(),
            );
        // The split key does not partition the members: nothing to split. Report the parent's
        // unchanged membership.
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut new_version_delta = version.new_delta();

        // Reserve a contiguous range of SST ids sized by `count_new_ssts_in_group_split`
        // (presumably the SSTs that must be re-id'd for the new group — TODO confirm).
        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            // The new group inherits the parent's compaction config.
            let config = self
                .compaction_group_manager
                .read()
                .await
                .try_get_compaction_group_config(parent_group_id)
                .ok_or_else(|| {
                    Error::CompactionGroup(format!(
                        "parent group {} config not found",
                        parent_group_id
                    ))
                })?
                .compaction_config()
                .as_ref()
                .clone();

            #[expect(deprecated)]
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id,
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _,
                        split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

        // Reassign the right-hand tables to the new group, keeping each committed epoch.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in &table_ids_right {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

        // Persist the version delta and the new group config in a single meta store commit.
        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // A group holding exactly the single requested table gets vnode-based split
            // weighting; errors are logged, not propagated.
            for (cg_id, table_ids) in &result {
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    )
                {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        versioning.mark_next_time_travel_version_snapshot();

        // Cancel parent-group tasks whose compaction group version no longer matches the
        // post-split version.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                let task = &task_assignment.compact_task;
                let is_expired = is_compaction_task_expired(
                    task.compaction_group_version_id,
                    levels.compaction_group_version_id,
                );
                if is_expired {
                    canceled_tasks.push(ReportTask {
                        task_id: task.task_id,
                        task_status: TaskStatus::ManualCanceled,
                        table_stats_change: HashMap::default(),
                        sorted_output_ssts: vec![],
                        object_timestamps: HashMap::default(),
                    });
                }
            });

        // Reporting consumes both guards; otherwise release them before the write-limit update.
        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        } else {
            drop(versioning_guard);
            drop(compaction_guard);
        }

        let affected_group_ids = result.iter().map(|(cg_id, _)| *cg_id).collect_vec();
        self.try_update_write_limits(&affected_group_ids).await;

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }

    /// Moves `table_ids` out of `parent_group_id` into a dedicated compaction group.
    ///
    /// `table_ids` must be non-empty and sorted, and must not be every member of the parent.
    /// The move takes up to two splits:
    /// 1. at `(first table, VirtualNode::ZERO)`;
    /// 2. unless step 1 already produced a group whose members equal `table_ids`, at
    ///    `(last table, VirtualNode::MAX_REPRESENTABLE)`, on the group that received the first
    ///    table.
    ///
    /// Returns the id of the group holding the table used for the last split, plus the member
    /// table ids of every group produced by the split(s).
    ///
    /// NOTE(review): `is_sorted` does not reject duplicate ids, and the function does not check
    /// that `table_ids` is a contiguous run of the parent's members. Tables lying between the
    /// first and last id are moved as well.
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .copied()
                .collect_vec()
        };

        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        // Sanity checks on the split results; panics on violation.
        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<CompactionGroupId, Vec<TableId>>) {
            // Each group's member ids are sorted.
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            // Ordered by their first id, the groups' ranges do not interleave.
            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            // No table appears in more than one group.
            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        let mut cg_id_to_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
        // First split: cut in front of the first requested table.
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id: CompactionGroupId = 0.into();
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // Second split: cut after the last requested table (all of its vnodes), inside the
        // group that received the first table.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
}
969
970impl HummockManager {
971 async fn build_normalize_plan(&self) -> Option<NormalizePlan> {
972 let groups = self.calculate_compaction_group_statistic().await;
973 build_normalize_plan_from_group_statistics(&groups)
974 }
975
976 async fn apply_normalize_plan(&self, plan: &NormalizePlan) -> Result<bool> {
977 let (table_ids_right, boundary_table_id, new_compaction_group_id) = {
978 let mut versioning_guard = self.versioning.write().await;
979 let versioning = versioning_guard.deref_mut();
980 let mut compaction_group_manager = self.compaction_group_manager.write().await;
981
982 let groups = collect_normalize_group_statistics(
983 &versioning.current_version,
984 &compaction_group_manager,
985 )?;
986 let Some(current_plan) = build_normalize_plan_from_group_statistics(&groups) else {
987 return Ok(false);
988 };
989
990 if ¤t_plan != plan {
991 return Ok(false);
992 }
993
994 let (_table_ids_left, table_ids_right) = plan.split_table_ids();
995
996 let config = compaction_group_manager
997 .try_get_compaction_group_config(plan.parent_group_id)
998 .ok_or_else(|| {
999 Error::CompactionGroup(format!(
1000 "parent group {} config not found",
1001 plan.parent_group_id
1002 ))
1003 })?
1004 .compaction_config()
1005 .as_ref()
1006 .clone();
1007
1008 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
1009 let mut version = HummockVersionTransaction::new(
1010 &mut versioning.current_version,
1011 &mut versioning.hummock_version_deltas,
1012 &mut versioning.table_change_log,
1013 self.env.notification_manager(),
1014 None,
1015 &self.metrics,
1016 &self.env.opts,
1017 );
1018 let mut new_version_delta = version.new_delta();
1019 let split_key = plan.split_key();
1020 let split_sst_count = new_version_delta
1021 .latest_version()
1022 .count_new_ssts_in_group_split(plan.parent_group_id, split_key.clone());
1023 let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
1024 let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
1025
1026 #[expect(deprecated)]
1027 new_version_delta.group_deltas.insert(
1028 new_compaction_group_id,
1029 GroupDeltas {
1030 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
1031 group_config: Some(config.clone()),
1032 group_id: new_compaction_group_id,
1033 parent_group_id: plan.parent_group_id,
1034 new_sst_start_id,
1035 table_ids: vec![],
1036 version: CompatibilityVersion::LATEST as _,
1037 split_key: Some(split_key.into()),
1038 }))],
1039 },
1040 );
1041
1042 new_version_delta.with_latest_version(|version, new_version_delta| {
1043 for &table_id in &table_ids_right {
1044 let info = version
1045 .state_table_info
1046 .info()
1047 .get(&table_id)
1048 .expect("table should exist before normalize split");
1049 assert!(
1050 new_version_delta
1051 .state_table_info_delta
1052 .insert(
1053 table_id,
1054 PbStateTableInfoDelta {
1055 committed_epoch: info.committed_epoch,
1056 compaction_group_id: new_compaction_group_id,
1057 }
1058 )
1059 .is_none()
1060 );
1061 }
1062 });
1063 new_version_delta.pre_apply();
1064 compaction_groups_txn
1065 .create_compaction_groups(new_compaction_group_id, Arc::new(config));
1066
1067 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
1068 versioning.mark_next_time_travel_version_snapshot();
1069
1070 (
1071 table_ids_right,
1072 plan.boundary_table_id,
1073 new_compaction_group_id,
1074 )
1075 };
1076
1077 self.cancel_expired_normalize_split_tasks(plan.parent_group_id)
1078 .await?;
1079 self.try_update_write_limits(&[plan.parent_group_id, new_compaction_group_id])
1080 .await;
1081 self.metrics
1082 .split_compaction_group_count
1083 .with_label_values(&[&plan.parent_group_id.to_string()])
1084 .inc();
1085 tracing::info!(
1086 "normalize split success: parent_group={} boundary_table_id={} moved_tables={:?} new_group_id={}",
1087 plan.parent_group_id,
1088 boundary_table_id,
1089 table_ids_right,
1090 new_compaction_group_id
1091 );
1092
1093 Ok(true)
1094 }
1095
1096 async fn cancel_expired_normalize_split_tasks(
1097 &self,
1098 parent_group_id: CompactionGroupId,
1099 ) -> Result<()> {
1100 let mut canceled_tasks = vec![];
1101 let compaction_guard = self.compaction.write().await;
1102 let mut versioning_guard = self.versioning.write().await;
1103 let versioning = versioning_guard.deref_mut();
1104 let compact_task_assignments =
1105 compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
1106 let levels = versioning
1107 .current_version
1108 .get_compaction_group_levels(parent_group_id);
1109 compact_task_assignments
1110 .into_iter()
1111 .for_each(|task_assignment| {
1112 let task = &task_assignment.compact_task;
1113 if is_compaction_task_expired(
1114 task.compaction_group_version_id,
1115 levels.compaction_group_version_id,
1116 ) {
1117 canceled_tasks.push(ReportTask {
1118 task_id: task.task_id,
1119 task_status: TaskStatus::ManualCanceled,
1120 table_stats_change: HashMap::default(),
1121 sorted_output_ssts: vec![],
1122 object_timestamps: HashMap::default(),
1123 });
1124 }
1125 });
1126 canceled_tasks.sort_by_key(|task| task.task_id);
1127 canceled_tasks.dedup_by_key(|task| task.task_id);
1128
1129 if !canceled_tasks.is_empty() {
1130 self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
1131 .await?;
1132 }
1133
1134 Ok(())
1135 }
1136
1137 pub async fn normalize_overlapping_compaction_groups(&self) -> Result<usize> {
1144 self.normalize_overlapping_compaction_groups_with_limit(usize::MAX)
1145 .await
1146 }
1147
1148 pub async fn normalize_overlapping_compaction_groups_with_limit(
1149 &self,
1150 max_splits: usize,
1151 ) -> Result<usize> {
1152 let mut split_count = 0usize;
1153 while split_count < max_splits {
1154 let Some(plan) = self.build_normalize_plan().await else {
1155 break;
1156 };
1157
1158 if !self.apply_normalize_plan(&plan).await? {
1159 tracing::debug!(
1160 parent_group_id = %plan.parent_group_id,
1161 boundary_table_id = %plan.boundary_table_id,
1162 "normalize plan became stale before apply"
1163 );
1164 break;
1165 }
1166 split_count += 1;
1167 }
1168
1169 Ok(split_count)
1170 }
1171
1172 pub async fn try_split_compaction_group(
1174 &self,
1175 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1176 group: CompactionGroupStatistic,
1177 ) {
1178 if group
1179 .compaction_group_config
1180 .compaction_config
1181 .disable_auto_group_scheduling
1182 .unwrap_or(false)
1183 {
1184 return;
1185 }
1186 for (table_id, table_size) in &group.table_statistic {
1188 self.try_move_high_throughput_table_to_dedicated_cg(
1189 table_write_throughput_statistic_manager,
1190 *table_id,
1191 table_size,
1192 group.group_id,
1193 )
1194 .await;
1195 }
1196
1197 self.try_split_huge_compaction_group(group).await;
1199 }
1200
1201 pub async fn try_move_high_throughput_table_to_dedicated_cg(
1203 &self,
1204 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1205 table_id: TableId,
1206 _table_size: &u64,
1207 parent_group_id: CompactionGroupId,
1208 ) {
1209 let mut table_throughput = table_write_throughput_statistic_manager
1210 .get_table_throughput_descending(
1211 table_id,
1212 self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
1213 )
1214 .peekable();
1215
1216 if table_throughput.peek().is_none() {
1217 return;
1218 }
1219
1220 let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
1221 table_throughput,
1222 self.env.opts.table_high_write_throughput_threshold,
1223 self.env
1224 .opts
1225 .table_stat_high_write_throughput_ratio_for_split,
1226 );
1227
1228 if !is_high_write_throughput {
1230 return;
1231 }
1232
1233 let ret = self
1234 .move_state_tables_to_dedicated_compaction_group(
1235 parent_group_id,
1236 &[table_id],
1237 Some(self.env.opts.partition_vnode_count),
1238 )
1239 .await;
1240 match ret {
1241 Ok(split_result) => {
1242 tracing::info!(
1243 "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
1244 table_id,
1245 parent_group_id,
1246 self.env.opts.partition_vnode_count,
1247 split_result
1248 );
1249 }
1250 Err(e) => {
1251 tracing::info!(
1252 error = %e.as_report(),
1253 "failed to split state table [{}] from group-{}",
1254 table_id,
1255 parent_group_id,
1256 )
1257 }
1258 }
1259 }
1260
1261 pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
1262 let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
1263 * self.env.opts.split_group_size_ratio) as u64;
1264 let is_huge_hybrid_group =
1265 group.group_size > group_max_size && group.table_statistic.len() > 1; if is_huge_hybrid_group {
1267 let mut accumulated_size = 0;
1268 let mut table_ids = Vec::default();
1269 for (table_id, table_size) in &group.table_statistic {
1270 accumulated_size += table_size;
1271 table_ids.push(*table_id);
1272 assert!(table_ids.is_sorted());
1275 let remaining_size = group.group_size.saturating_sub(accumulated_size);
1276 if accumulated_size > group_max_size / 2
1277 && remaining_size > 0
1278 && table_ids.len() < group.table_statistic.len()
1279 {
1280 let ret = self
1281 .move_state_tables_to_dedicated_compaction_group(
1282 group.group_id,
1283 &table_ids,
1284 None,
1285 )
1286 .await;
1287 match ret {
1288 Ok(split_result) => {
1289 tracing::info!(
1290 "split_huge_compaction_group success {:?}",
1291 split_result
1292 );
1293 self.metrics
1294 .split_compaction_group_count
1295 .with_label_values(&[&group.group_id.to_string()])
1296 .inc();
1297 return;
1298 }
1299 Err(e) => {
1300 tracing::error!(
1301 error = %e.as_report(),
1302 "failed to split_huge_compaction_group table {:?} from group-{}",
1303 table_ids,
1304 group.group_id
1305 );
1306
1307 return;
1308 }
1309 }
1310 }
1311 }
1312 }
1313 }
1314
1315 pub async fn try_merge_compaction_group(
1316 &self,
1317 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1318 group: &CompactionGroupStatistic,
1319 next_group: &CompactionGroupStatistic,
1320 created_tables: &HashSet<TableId>,
1321 ) -> Result<()> {
1322 GroupMergeValidator::validate_group_merge(
1323 group,
1324 next_group,
1325 created_tables,
1326 table_write_throughput_statistic_manager,
1327 &self.env.opts,
1328 &self.versioning,
1329 )
1330 .await?;
1331
1332 let result = self
1333 .merge_compaction_group(group.group_id, next_group.group_id)
1334 .await;
1335
1336 match &result {
1337 Ok(()) => {
1338 tracing::info!(
1339 "merge group-{} to group-{}",
1340 next_group.group_id,
1341 group.group_id,
1342 );
1343
1344 self.metrics
1345 .merge_compaction_group_count
1346 .with_label_values(&[&group.group_id.to_string()])
1347 .inc();
1348 }
1349 Err(e) => {
1350 tracing::info!(
1351 error = %e.as_report(),
1352 "failed to merge group-{} group-{}",
1353 next_group.group_id,
1354 group.group_id,
1355 );
1356 }
1357 }
1358
1359 result
1360 }
1361}
1362
1363#[derive(Debug, Default)]
1364struct GroupMergeValidator {}
1365
1366impl GroupMergeValidator {
1367 fn is_merge_compatible_by_semantics(
1370 group: &CompactionGroupStatistic,
1371 next_group: &CompactionGroupStatistic,
1372 ) -> bool {
1373 let (mut left, mut right) = (
1374 group
1375 .compaction_group_config
1376 .compaction_config
1377 .as_ref()
1378 .clone(),
1379 next_group
1380 .compaction_group_config
1381 .compaction_config
1382 .as_ref()
1383 .clone(),
1384 );
1385 left.split_weight_by_vnode = 0;
1386 right.split_weight_by_vnode = 0;
1387 left == right
1388 }
1389
1390 pub fn is_table_high_write_throughput(
1392 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
1393 threshold: u64,
1394 high_write_throughput_ratio: f64,
1395 ) -> bool {
1396 let mut sample_size = 0;
1397 let mut high_write_throughput_count = 0;
1398 for statistic in table_throughput {
1399 sample_size += 1;
1400 if statistic.throughput > threshold {
1401 high_write_throughput_count += 1;
1402 }
1403 }
1404
1405 high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
1406 }
1407
1408 pub fn is_table_low_write_throughput(
1409 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
1410 threshold: u64,
1411 low_write_throughput_ratio: f64,
1412 ) -> bool {
1413 let mut sample_size = 0;
1414 let mut low_write_throughput_count = 0;
1415 for statistic in table_throughput {
1416 sample_size += 1;
1417 if statistic.throughput <= threshold {
1418 low_write_throughput_count += 1;
1419 }
1420 }
1421
1422 low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
1423 }
1424
1425 fn check_is_low_write_throughput_compaction_group(
1426 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1427 group: &CompactionGroupStatistic,
1428 opts: &Arc<MetaOpts>,
1429 ) -> bool {
1430 let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
1431 for table_id in group.table_statistic.keys() {
1432 let mut table_throughput = table_write_throughput_statistic_manager
1433 .get_table_throughput_descending(
1434 *table_id,
1435 opts.table_stat_throuput_window_seconds_for_merge as i64,
1436 )
1437 .peekable();
1438 if table_throughput.peek().is_none() {
1439 continue;
1440 }
1441
1442 table_with_statistic.push(table_throughput);
1443 }
1444
1445 if table_with_statistic.is_empty() {
1447 return true;
1448 }
1449
1450 table_with_statistic.into_iter().all(|table_throughput| {
1452 Self::is_table_low_write_throughput(
1453 table_throughput,
1454 opts.table_low_write_throughput_threshold,
1455 opts.table_stat_low_write_throughput_ratio_for_merge,
1456 )
1457 })
1458 }
1459
1460 fn check_is_creating_compaction_group(
1461 group: &CompactionGroupStatistic,
1462 created_tables: &HashSet<TableId>,
1463 ) -> bool {
1464 group
1465 .table_statistic
1466 .keys()
1467 .any(|table_id| !created_tables.contains(table_id))
1468 }
1469
1470 async fn validate_group_merge(
1471 group: &CompactionGroupStatistic,
1472 next_group: &CompactionGroupStatistic,
1473 created_tables: &HashSet<TableId>,
1474 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1475 opts: &Arc<MetaOpts>,
1476 versioning: &MonitoredRwLock<Versioning>,
1477 ) -> Result<()> {
1478 if (group.group_id == StaticCompactionGroupId::StateDefault
1480 && next_group.group_id == StaticCompactionGroupId::MaterializedView)
1481 || (group.group_id == StaticCompactionGroupId::MaterializedView
1482 && next_group.group_id == StaticCompactionGroupId::StateDefault)
1483 {
1484 return Err(Error::CompactionGroup(format!(
1485 "group-{} and group-{} are both StaticCompactionGroupId",
1486 group.group_id, next_group.group_id
1487 )));
1488 }
1489
1490 if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1491 return Err(Error::CompactionGroup(format!(
1492 "group-{} or group-{} is empty",
1493 group.group_id, next_group.group_id
1494 )));
1495 }
1496
1497 {
1502 let mut table_ids_1: Vec<TableId> = group.table_statistic.keys().cloned().collect_vec();
1503 let mut table_ids_2: Vec<TableId> =
1504 next_group.table_statistic.keys().cloned().collect_vec();
1505 table_ids_1.sort();
1506 table_ids_2.sort();
1507 if table_ids_1.first().unwrap() > table_ids_2.first().unwrap() {
1508 std::mem::swap(&mut table_ids_1, &mut table_ids_2);
1509 }
1510 if table_ids_1.last().unwrap() >= table_ids_2.first().unwrap() {
1511 return Err(Error::CompactionGroup(format!(
1512 "group-{} and group-{} have overlapping table id ranges, not mergeable",
1513 group.group_id, next_group.group_id
1514 )));
1515 }
1516 }
1517
1518 if group
1519 .compaction_group_config
1520 .compaction_config
1521 .disable_auto_group_scheduling
1522 .unwrap_or(false)
1523 || next_group
1524 .compaction_group_config
1525 .compaction_config
1526 .disable_auto_group_scheduling
1527 .unwrap_or(false)
1528 {
1529 return Err(Error::CompactionGroup(format!(
1530 "group-{} or group-{} disable_auto_group_scheduling",
1531 group.group_id, next_group.group_id
1532 )));
1533 }
1534
1535 if !Self::is_merge_compatible_by_semantics(group, next_group) {
1538 let left_config = group.compaction_group_config.compaction_config.as_ref();
1539 let right_config = next_group
1540 .compaction_group_config
1541 .compaction_config
1542 .as_ref();
1543
1544 tracing::warn!(
1545 group_id = %group.group_id,
1546 next_group_id = %next_group.group_id,
1547 left_config = ?left_config,
1548 right_config = ?right_config,
1549 "compaction config semantic mismatch detected while merging compaction groups"
1550 );
1551
1552 return Err(Error::CompactionGroup(format!(
1553 "Cannot merge group {} and next_group {} with different compaction config (split_weight_by_vnode is excluded from comparison). left_config: {:?}, right_config: {:?}",
1554 group.group_id, next_group.group_id, left_config, right_config
1555 )));
1556 }
1557
1558 if Self::check_is_creating_compaction_group(group, created_tables) {
1560 return Err(Error::CompactionGroup(format!(
1561 "Cannot merge creating group {} next_group {}",
1562 group.group_id, next_group.group_id
1563 )));
1564 }
1565
1566 if !Self::check_is_low_write_throughput_compaction_group(
1568 table_write_throughput_statistic_manager,
1569 group,
1570 opts,
1571 ) {
1572 return Err(Error::CompactionGroup(format!(
1573 "Cannot merge high throughput group {} next_group {}",
1574 group.group_id, next_group.group_id
1575 )));
1576 }
1577
1578 let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1579 * opts.split_group_size_ratio) as u64;
1580
1581 if (group.group_size + next_group.group_size) > size_limit {
1582 return Err(Error::CompactionGroup(format!(
1583 "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1584 group.group_id,
1585 group.group_size,
1586 next_group.group_id,
1587 next_group.group_size,
1588 size_limit
1589 )));
1590 }
1591
1592 if Self::check_is_creating_compaction_group(next_group, created_tables) {
1593 return Err(Error::CompactionGroup(format!(
1594 "Cannot merge creating group {} next group {}",
1595 group.group_id, next_group.group_id
1596 )));
1597 }
1598
1599 if !Self::check_is_low_write_throughput_compaction_group(
1600 table_write_throughput_statistic_manager,
1601 next_group,
1602 opts,
1603 ) {
1604 return Err(Error::CompactionGroup(format!(
1605 "Cannot merge high throughput group {} next group {}",
1606 group.group_id, next_group.group_id
1607 )));
1608 }
1609
1610 {
1611 let versioning_guard = versioning.read().await;
1613 let levels = &versioning_guard.current_version.levels;
1614 if !levels.contains_key(&group.group_id) {
1615 return Err(Error::CompactionGroup(format!(
1616 "Cannot merge group {} not exist",
1617 group.group_id
1618 )));
1619 }
1620
1621 if !levels.contains_key(&next_group.group_id) {
1622 return Err(Error::CompactionGroup(format!(
1623 "Cannot merge next group {} not exist",
1624 next_group.group_id
1625 )));
1626 }
1627
1628 let group_levels = versioning_guard
1629 .current_version
1630 .get_compaction_group_levels(group.group_id);
1631
1632 let next_group_levels = versioning_guard
1633 .current_version
1634 .get_compaction_group_levels(next_group.group_id);
1635
1636 let group_state = GroupStateValidator::group_state(
1637 group_levels,
1638 group.compaction_group_config.compaction_config().deref(),
1639 );
1640
1641 if group_state.is_write_stop() || group_state.is_emergency() {
1642 return Err(Error::CompactionGroup(format!(
1643 "Cannot merge write limit group {} next group {}",
1644 group.group_id, next_group.group_id
1645 )));
1646 }
1647
1648 let next_group_state = GroupStateValidator::group_state(
1649 next_group_levels,
1650 next_group
1651 .compaction_group_config
1652 .compaction_config()
1653 .deref(),
1654 );
1655
1656 if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1657 return Err(Error::CompactionGroup(format!(
1658 "Cannot merge write limit next group {} group {}",
1659 next_group.group_id, group.group_id
1660 )));
1661 }
1662
1663 let l0_sub_level_count_after_merge =
1665 group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1666 if GroupStateValidator::write_stop_sub_level_count(
1667 (l0_sub_level_count_after_merge as f64
1668 * opts.compaction_group_merge_dimension_threshold) as usize,
1669 group.compaction_group_config.compaction_config().deref(),
1670 ) {
1671 return Err(Error::CompactionGroup(format!(
1672 "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
1673 group.group_id, next_group.group_id
1674 )));
1675 }
1676
1677 let l0_file_count_after_merge = group_levels
1678 .l0
1679 .sub_levels
1680 .iter()
1681 .chain(next_group_levels.l0.sub_levels.iter())
1682 .map(|level| level.table_infos.len())
1683 .sum::<usize>();
1684 if GroupStateValidator::write_stop_l0_file_count(
1685 (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1686 as usize,
1687 group.compaction_group_config.compaction_config().deref(),
1688 ) {
1689 return Err(Error::CompactionGroup(format!(
1690 "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1691 next_group.group_id, group.group_id
1692 )));
1693 }
1694
1695 let l0_size_after_merge =
1696 group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1697
1698 if GroupStateValidator::write_stop_l0_size(
1699 (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1700 as u64,
1701 group.compaction_group_config.compaction_config().deref(),
1702 ) {
1703 return Err(Error::CompactionGroup(format!(
1704 "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1705 next_group.group_id, group.group_id
1706 )));
1707 }
1708
1709 if GroupStateValidator::emergency_l0_file_count(
1711 (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1712 as usize,
1713 group.compaction_group_config.compaction_config().deref(),
1714 ) {
1715 return Err(Error::CompactionGroup(format!(
1716 "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
1717 group.group_id, next_group.group_id
1718 )));
1719 }
1720 }
1721
1722 Ok(())
1723 }
1724}