1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26 StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas, HummockVersion};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33 CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::compaction_group_manager::CompactionGroupManager;
38use super::{CompactionGroupStatistic, GroupStateValidator};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
44use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
45use crate::hummock::table_write_throughput_statistic::{
46 TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
47};
48use crate::manager::MetaOpts;
49
/// A prepared plan for "normalizing" a pair of compaction groups whose
/// state-table id ranges interleave: the parent (left) group is split at
/// `boundary_table_id` so the resulting groups' id ranges no longer overlap.
#[derive(Debug, PartialEq, Eq)]
struct NormalizePlan {
    // Group to split (the left group of the overlapping pair).
    parent_group_id: CompactionGroupId,
    // All member table ids of the parent group, in ascending order.
    parent_table_ids: Vec<StateTableId>,
    // First table id that moves into the newly created right-hand group.
    boundary_table_id: StateTableId,
}
56
57impl NormalizePlan {
58 fn split_key(&self) -> Bytes {
59 group_split::build_split_full_key(self.boundary_table_id, VirtualNode::ZERO)
60 .encode()
61 .into()
62 }
63
64 fn split_table_ids(&self) -> (Vec<StateTableId>, Vec<StateTableId>) {
65 let split_full_key =
66 group_split::build_split_full_key(self.boundary_table_id, VirtualNode::ZERO);
67 let (table_ids_left, table_ids_right) =
68 group_split::split_table_ids_with_table_id_and_vnode(
69 &self.parent_table_ids,
70 split_full_key.user_key.table_id,
71 split_full_key.user_key.get_vnode_id(),
72 );
73 assert!(!table_ids_left.is_empty() && !table_ids_right.is_empty());
74 (table_ids_left, table_ids_right)
75 }
76}
77
78fn gen_normalize_plan(
79 left: &CompactionGroupStatistic,
80 right: &CompactionGroupStatistic,
81) -> Option<NormalizePlan> {
82 let left_table_ids = left.table_statistic.keys().copied().collect_vec();
83
84 if left_table_ids.len() <= 1 {
85 return None;
86 }
87
88 let left_max = *left_table_ids.last().unwrap();
89 let right_min = *right.table_statistic.keys().next().unwrap();
90 if left_max < right_min {
91 return None;
92 }
93
94 let boundary_index = left_table_ids.partition_point(|&table_id| table_id < right_min);
95 if boundary_index == 0 || boundary_index >= left_table_ids.len() {
96 return None;
97 }
98 let boundary_table_id = left_table_ids[boundary_index];
99
100 Some(NormalizePlan {
101 parent_group_id: left.group_id,
102 parent_table_ids: left_table_ids,
103 boundary_table_id,
104 })
105}
106
107fn build_normalize_plan_from_group_statistics(
108 groups: &[CompactionGroupStatistic],
109) -> Option<NormalizePlan> {
110 let mut groups = groups
113 .iter()
114 .filter(|group| !group.table_statistic.is_empty())
115 .collect_vec();
116 groups.sort_by_key(|group| *group.table_statistic.keys().next().unwrap());
117
118 groups
119 .split(|group| {
120 group
121 .compaction_group_config
122 .compaction_config
123 .disable_auto_group_scheduling
124 .unwrap_or(false)
125 })
126 .find_map(|segment| {
127 segment
128 .windows(2)
129 .find_map(|pair| gen_normalize_plan(pair[0], pair[1]))
130 })
131}
132
133fn collect_normalize_group_statistics(
134 version: &HummockVersion,
135 compaction_group_manager: &CompactionGroupManager,
136) -> Result<Vec<CompactionGroupStatistic>> {
137 let mut groups = vec![];
138 for group_id in version.levels.keys() {
139 let table_ids = version
140 .state_table_info
141 .compaction_group_member_table_ids(*group_id)
142 .iter()
143 .copied()
144 .collect_vec();
145 if table_ids.is_empty() {
146 continue;
147 }
148
149 let group_config = compaction_group_manager
150 .try_get_compaction_group_config(*group_id)
151 .ok_or_else(|| {
152 Error::CompactionGroup(format!(
153 "group {} config not found during normalize",
154 group_id
155 ))
156 })?;
157 groups.push(CompactionGroupStatistic {
158 group_id: *group_id,
159 group_size: 0,
160 table_statistic: table_ids
161 .into_iter()
162 .map(|table_id| (table_id, 0))
163 .collect(),
164 compaction_group_config: group_config,
165 });
166 }
167 Ok(groups)
168}
169
impl HummockManager {
    /// Merges two compaction groups into one. The created-table set is
    /// fetched from the metadata manager; see
    /// [`Self::merge_compaction_group_impl`] for the checked preconditions.
    pub async fn merge_compaction_group(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, None)
            .await
    }

    /// Test-only merge entry point that takes the created-table set directly
    /// instead of querying the metadata manager.
    pub async fn merge_compaction_group_for_test(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: HashSet<TableId>,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
            .await
    }

    /// Merges `group_1` and `group_2` into the group holding the smaller
    /// table ids (the "left" group), after validating that:
    /// - both groups exist and are non-empty;
    /// - no member table is still in creation;
    /// - their table-id ranges do not overlap;
    /// - their SST id sets are disjoint and, per level, the left group's
    ///   last SST can be concatenated in front of the right group's first.
    ///
    /// On success the right group is removed, its assigned compaction tasks
    /// are canceled, and the merge is committed as a version delta.
    pub async fn merge_compaction_group_impl(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: Option<HashSet<TableId>>,
    ) -> Result<()> {
        // Lock order: compaction before versioning (consistent across the
        // manager to avoid deadlocks).
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        if !versioning.current_version.levels.contains_key(&group_1) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
        }

        if !versioning.current_version.levels.contains_key(&group_2) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
        }

        let state_table_info = versioning.current_version.state_table_info.clone();
        let mut member_table_ids_1 = state_table_info
            .compaction_group_member_table_ids(group_1)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_1.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_1 {} is empty",
                group_1
            )));
        }

        let mut member_table_ids_2 = state_table_info
            .compaction_group_member_table_ids(group_2)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_2.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_2 {} is empty",
                group_2
            )));
        }

        debug_assert!(!member_table_ids_1.is_empty());
        debug_assert!(!member_table_ids_2.is_empty());
        assert!(member_table_ids_1.is_sorted());
        assert!(member_table_ids_2.is_sorted());

        let created_tables = if let Some(created_tables) = created_tables {
            // An explicitly supplied set is only expected from tests, hence
            // the debug-build assertion.
            #[expect(clippy::assertions_on_constants)]
            {
                assert!(cfg!(debug_assertions));
            }
            created_tables
        } else {
            match self.metadata_manager.get_created_table_ids().await {
                Ok(created_tables) => HashSet::from_iter(created_tables),
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
                    return Err(Error::CompactionGroup(format!(
                        "merge group_1 {} group_2 {} failed to fetch created table ids",
                        group_1, group_2
                    )));
                }
            }
        };

        // True when any member table has not finished creation yet.
        fn contains_creating_table(
            table_ids: &Vec<TableId>,
            created_tables: &HashSet<TableId>,
        ) -> bool {
            table_ids
                .iter()
                .any(|table_id| !created_tables.contains(table_id))
        }

        // Refuse to merge while any member table is still being created.
        if contains_creating_table(&member_table_ids_1, &created_tables)
            || contains_creating_table(&member_table_ids_2, &created_tables)
        {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
                group_1, group_2, member_table_ids_1, member_table_ids_2
            )));
        }

        // Order the pair so `left` holds the smaller table ids; the merged
        // group keeps the left group's id.
        let (left_group_id, right_group_id) =
            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
                (group_1, group_2)
            } else {
                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
                (group_2, group_1)
            };

        // The table-id ranges must be strictly separated to merge.
        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
            return Err(Error::CompactionGroup(format!(
                "invalid merge group_1 {} group_2 {}: table id ranges overlap",
                left_group_id, right_group_id
            )));
        }

        let combined_member_table_ids = member_table_ids_1
            .iter()
            .chain(member_table_ids_2.iter())
            .collect_vec();
        assert!(combined_member_table_ids.is_sorted());

        // Sanity check: the two groups must not share any SST id.
        let mut sst_id_set = HashSet::new();
        for sst_id in versioning
            .current_version
            .get_sst_ids_by_group_id(left_group_id)
            .chain(
                versioning
                    .current_version
                    .get_sst_ids_by_group_id(right_group_id),
            )
        {
            if !sst_id_set.insert(sst_id) {
                return Err(Error::CompactionGroup(format!(
                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
                    left_group_id, right_group_id, sst_id
                )));
            }
        }

        {
            // Per level, the left group's last SST must sort strictly before
            // the right group's first SST so the levels can be concatenated.
            let left_levels = versioning
                .current_version
                .get_compaction_group_levels(group_1);

            let right_levels = versioning
                .current_version
                .get_compaction_group_levels(group_2);

            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
            for level_idx in 1..=max_level {
                let left_level = left_levels.get_level(level_idx);
                let right_level = right_levels.get_level(level_idx);
                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
                    continue;
                }

                let left_last_sst = left_level.table_infos.last().unwrap().clone();
                let right_first_sst = right_level.table_infos.first().unwrap().clone();
                let left_sst_id = left_last_sst.sst_id;
                let right_sst_id = right_first_sst.sst_id;
                let left_obj_id = left_last_sst.object_id;
                let right_obj_id = right_first_sst.object_id;

                if !can_concat(&[left_last_sst, right_first_sst]) {
                    return Err(Error::CompactionGroup(format!(
                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
                        left_group_id,
                        right_group_id,
                        level_idx,
                        left_sst_id,
                        right_sst_id,
                        left_obj_id,
                        right_obj_id
                    )));
                }
            }
        }

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut new_version_delta = version.new_delta();

        // The merge is recorded as a `GroupMerge` delta keyed by the left
        // (surviving) group.
        let target_compaction_group_id = {
            new_version_delta.group_deltas.insert(
                left_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
                        left_group_id,
                        right_group_id,
                    })],
                },
            );
            left_group_id
        };

        // Repoint every member table of both groups to the surviving group.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in combined_member_table_ids {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: target_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

            {
                // Drop per-level SST metrics of the group that disappears.
                let right_group_max_level = new_version_delta
                    .latest_version()
                    .get_compaction_group_levels(right_group_id)
                    .levels
                    .len();

                remove_compaction_group_in_sst_stat(
                    &self.metrics,
                    right_group_id,
                    right_group_max_level,
                );
            }

            self.compaction_state
                .remove_compaction_group(right_group_id);

            {
                // Reset vnode-based partitioning on the merged group; a
                // failure here is logged but does not abort the merge.
                if let Err(err) = compaction_groups_txn.update_compaction_config(
                    &[left_group_id],
                    &[MutableConfig::SplitWeightByVnode(0)],
                ) {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        left_group_id
                    );
                }
            }

            new_version_delta.pre_apply();

            compaction_groups_txn.remove(right_group_id);
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }

        versioning.mark_next_time_travel_version_snapshot();

        // Cancel all compaction tasks still assigned to the removed group.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    assert_eq!(task.compaction_group_id, right_group_id);
                    canceled_tasks.push(ReportTask {
                        task_id: task.task_id,
                        task_status: TaskStatus::ManualCanceled,
                        table_stats_change: HashMap::default(),
                        sorted_output_ssts: vec![],
                        object_timestamps: HashMap::default(),
                    });
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .merge_compaction_group_count
            .with_label_values(&[&left_group_id.to_string()])
            .inc();

        Ok(())
    }
}
497
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_pb::hummock::CompactionConfig;

    use super::{
        CompactionGroupStatistic, NormalizePlan, build_normalize_plan_from_group_statistics,
        gen_normalize_plan,
    };
    use crate::hummock::model::CompactionGroup;

    /// Builds a minimal `CompactionGroupStatistic` fixture holding the given
    /// member table ids (all sizes zeroed) and the auto-scheduling flag.
    fn group(
        group_id: CompactionGroupId,
        table_ids: &[u32],
        disable_auto_group_scheduling: bool,
    ) -> CompactionGroupStatistic {
        let config = CompactionConfig {
            disable_auto_group_scheduling: Some(disable_auto_group_scheduling),
            ..Default::default()
        };
        CompactionGroupStatistic {
            group_id,
            group_size: 0,
            table_statistic: table_ids
                .iter()
                .copied()
                .map(|table_id| (table_id.into(), 0_u64))
                .collect::<BTreeMap<_, _>>(),
            compaction_group_config: CompactionGroup::new(group_id, config),
        }
    }

    // A single-table group cannot be split, so no plan is produced.
    #[test]
    fn test_gen_normalize_plan_returns_none_for_single_table_group() {
        let left = group(1.into(), &[10], false);
        let right = group(2.into(), &[5, 20], false);

        assert_eq!(None, gen_normalize_plan(&left, &right));
    }

    // Disjoint table-id ranges need no normalization.
    #[test]
    fn test_gen_normalize_plan_returns_none_for_non_overlapping_groups() {
        let left = group(1.into(), &[1, 2, 3], false);
        let right = group(2.into(), &[4, 5, 6], false);

        assert_eq!(None, gen_normalize_plan(&left, &right));
    }

    // Every table id in `left` is >= `right`'s minimum, so the cut would
    // land at index 0 and leave one side empty — no valid plan.
    #[test]
    fn test_gen_normalize_plan_returns_none_when_boundary_cannot_split_parent() {
        let left = group(1.into(), &[5, 6, 7], false);
        let right = group(2.into(), &[4, 8], false);

        assert_eq!(None, gen_normalize_plan(&left, &right));
    }

    // Overlapping ranges: the plan splits `left` at the first table id not
    // below `right`'s minimum (table 4).
    #[test]
    fn test_gen_normalize_plan_generates_expected_boundary() {
        let left = group(1.into(), &[1, 4, 7], false);
        let right = group(2.into(), &[2, 5, 8], false);

        assert_eq!(
            Some(NormalizePlan {
                parent_group_id: 1.into(),
                parent_table_ids: vec![1.into(), 4.into(), 7.into()],
                boundary_table_id: 4.into(),
            }),
            gen_normalize_plan(&left, &right)
        );
    }

    // Group 2 has auto scheduling disabled, so the overlapping pair (1, 2)
    // is skipped and the plan comes from the later segment (3, 4).
    #[test]
    fn test_build_normalize_plan_skips_disabled_boundary_and_continues_later_segment() {
        let groups = vec![
            group(1.into(), &[1, 4, 7], false),
            group(2.into(), &[2, 5, 8], true),
            group(3.into(), &[10, 13, 16], false),
            group(4.into(), &[11, 14, 17], false),
        ];

        assert_eq!(
            Some(NormalizePlan {
                parent_group_id: 3.into(),
                parent_table_ids: vec![10.into(), 13.into(), 16.into()],
                boundary_table_id: 13.into(),
            }),
            build_normalize_plan_from_group_statistics(&groups)
        );
    }
}
590
impl HummockManager {
    /// Splits `parent_group_id` at the key (`table_id_to_split`,
    /// `vnode_to_split`). Tables whose data sorts below the split key stay
    /// in the parent group; the rest move into a newly created group that
    /// inherits the parent's compaction config.
    ///
    /// Returns `(group_id, member_table_ids)` pairs describing the layout
    /// after the split. When the split key does not actually partition the
    /// member tables (one side would be empty), no new group is created and
    /// only the parent group is returned.
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        // Lock order: compaction before versioning.
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .copied()
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} doesn't in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        // Moving *all* member tables would merely rename the group — refuse.
        let table_ids = member_table_ids.into_iter().collect_vec();
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id,
                split_full_key.user_key.get_vnode_id(),
            );
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            // Degenerate split: all tables fall on one side, so the group
            // layout is unchanged and no delta is committed.
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut new_version_delta = version.new_delta();

        // Pre-allocate SST ids for the SSTs that will be re-created on the
        // new-group side of the split.
        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            // The new group inherits the parent group's compaction config.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let config = self
                .compaction_group_manager
                .read()
                .await
                .try_get_compaction_group_config(parent_group_id)
                .ok_or_else(|| {
                    Error::CompactionGroup(format!(
                        "parent group {} config not found",
                        parent_group_id
                    ))
                })?
                .compaction_config()
                .as_ref()
                .clone();

            #[expect(deprecated)]
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id,
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _,
                        split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

        // Repoint the moved tables to the new group.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in &table_ids_right {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // When the requested tables end up alone in a dedicated group,
            // apply the requested vnode partitioning to that group; failures
            // are logged but do not abort the split.
            for (cg_id, table_ids) in &result {
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    )
                {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        versioning.mark_next_time_travel_version_snapshot();

        // Cancel tasks that targeted the parent group's pre-split version.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    let is_expired = is_compaction_task_expired(
                        task.compaction_group_version_id,
                        levels.compaction_group_version_id,
                    );
                    if is_expired {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }

    /// Moves the given (sorted) `table_ids` out of `parent_group_id` into a
    /// dedicated compaction group by performing up to two splits: one in
    /// front of the first table and, if necessary, a second one behind the
    /// last table.
    ///
    /// Returns the id of the group that now holds `table_ids`, together with
    /// the final `group -> member tables` layout of all groups touched.
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .copied()
                .collect_vec()
        };

        // Moving every member table is a no-op rename — refuse early.
        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        // Invariants on a split result: per-group ids sorted, groups ordered
        // by key space, and no table id present in two groups.
        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<CompactionGroupId, Vec<TableId>>) {
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        let mut cg_id_to_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
        // First split: cut right before the first table to move.
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id: CompactionGroupId = 0.into();
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        // The first split already isolated exactly `table_ids`.
        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // Second split: cut right behind the last table to move, detaching
        // trailing tables that were dragged along by the first split.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
}
959
960impl HummockManager {
961 async fn build_normalize_plan(&self) -> Option<NormalizePlan> {
962 let groups = self.calculate_compaction_group_statistic().await;
963 build_normalize_plan_from_group_statistics(&groups)
964 }
965
966 async fn apply_normalize_plan(&self, plan: &NormalizePlan) -> Result<bool> {
967 let (table_ids_right, boundary_table_id, new_compaction_group_id) = {
968 let mut versioning_guard = self.versioning.write().await;
969 let versioning = versioning_guard.deref_mut();
970 let mut compaction_group_manager = self.compaction_group_manager.write().await;
971
972 let groups = collect_normalize_group_statistics(
973 &versioning.current_version,
974 &compaction_group_manager,
975 )?;
976 let Some(current_plan) = build_normalize_plan_from_group_statistics(&groups) else {
977 return Ok(false);
978 };
979
980 if ¤t_plan != plan {
981 return Ok(false);
982 }
983
984 let (_table_ids_left, table_ids_right) = plan.split_table_ids();
985
986 let config = compaction_group_manager
987 .try_get_compaction_group_config(plan.parent_group_id)
988 .ok_or_else(|| {
989 Error::CompactionGroup(format!(
990 "parent group {} config not found",
991 plan.parent_group_id
992 ))
993 })?
994 .compaction_config()
995 .as_ref()
996 .clone();
997
998 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
999 let mut version = HummockVersionTransaction::new(
1000 &mut versioning.current_version,
1001 &mut versioning.hummock_version_deltas,
1002 &mut versioning.table_change_log,
1003 self.env.notification_manager(),
1004 None,
1005 &self.metrics,
1006 &self.env.opts,
1007 );
1008 let mut new_version_delta = version.new_delta();
1009 let split_key = plan.split_key();
1010 let split_sst_count = new_version_delta
1011 .latest_version()
1012 .count_new_ssts_in_group_split(plan.parent_group_id, split_key.clone());
1013 let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
1014 let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
1015
1016 #[expect(deprecated)]
1017 new_version_delta.group_deltas.insert(
1018 new_compaction_group_id,
1019 GroupDeltas {
1020 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
1021 group_config: Some(config.clone()),
1022 group_id: new_compaction_group_id,
1023 parent_group_id: plan.parent_group_id,
1024 new_sst_start_id,
1025 table_ids: vec![],
1026 version: CompatibilityVersion::LATEST as _,
1027 split_key: Some(split_key.into()),
1028 }))],
1029 },
1030 );
1031
1032 new_version_delta.with_latest_version(|version, new_version_delta| {
1033 for &table_id in &table_ids_right {
1034 let info = version
1035 .state_table_info
1036 .info()
1037 .get(&table_id)
1038 .expect("table should exist before normalize split");
1039 assert!(
1040 new_version_delta
1041 .state_table_info_delta
1042 .insert(
1043 table_id,
1044 PbStateTableInfoDelta {
1045 committed_epoch: info.committed_epoch,
1046 compaction_group_id: new_compaction_group_id,
1047 }
1048 )
1049 .is_none()
1050 );
1051 }
1052 });
1053 new_version_delta.pre_apply();
1054 compaction_groups_txn
1055 .create_compaction_groups(new_compaction_group_id, Arc::new(config));
1056
1057 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
1058 versioning.mark_next_time_travel_version_snapshot();
1059
1060 (
1061 table_ids_right,
1062 plan.boundary_table_id,
1063 new_compaction_group_id,
1064 )
1065 };
1066
1067 self.cancel_expired_normalize_split_tasks(plan.parent_group_id)
1068 .await?;
1069 self.metrics
1070 .split_compaction_group_count
1071 .with_label_values(&[&plan.parent_group_id.to_string()])
1072 .inc();
1073 tracing::info!(
1074 "normalize split success: parent_group={} boundary_table_id={} moved_tables={:?} new_group_id={}",
1075 plan.parent_group_id,
1076 boundary_table_id,
1077 table_ids_right,
1078 new_compaction_group_id
1079 );
1080
1081 Ok(true)
1082 }
1083
1084 async fn cancel_expired_normalize_split_tasks(
1085 &self,
1086 parent_group_id: CompactionGroupId,
1087 ) -> Result<()> {
1088 let mut canceled_tasks = vec![];
1089 let compaction_guard = self.compaction.write().await;
1090 let mut versioning_guard = self.versioning.write().await;
1091 let versioning = versioning_guard.deref_mut();
1092 let compact_task_assignments =
1093 compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
1094 let levels = versioning
1095 .current_version
1096 .get_compaction_group_levels(parent_group_id);
1097 compact_task_assignments
1098 .into_iter()
1099 .for_each(|task_assignment| {
1100 if let Some(task) = task_assignment.compact_task.as_ref()
1101 && is_compaction_task_expired(
1102 task.compaction_group_version_id,
1103 levels.compaction_group_version_id,
1104 )
1105 {
1106 canceled_tasks.push(ReportTask {
1107 task_id: task.task_id,
1108 task_status: TaskStatus::ManualCanceled,
1109 table_stats_change: HashMap::default(),
1110 sorted_output_ssts: vec![],
1111 object_timestamps: HashMap::default(),
1112 });
1113 }
1114 });
1115 canceled_tasks.sort_by_key(|task| task.task_id);
1116 canceled_tasks.dedup_by_key(|task| task.task_id);
1117
1118 if !canceled_tasks.is_empty() {
1119 self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
1120 .await?;
1121 }
1122
1123 Ok(())
1124 }
1125
    /// Repeatedly splits overlapping compaction groups until no overlap
    /// remains (no split limit); returns the number of splits performed.
    pub async fn normalize_overlapping_compaction_groups(&self) -> Result<usize> {
        self.normalize_overlapping_compaction_groups_with_limit(usize::MAX)
            .await
    }
1136
1137 pub async fn normalize_overlapping_compaction_groups_with_limit(
1138 &self,
1139 max_splits: usize,
1140 ) -> Result<usize> {
1141 let mut split_count = 0usize;
1142 while split_count < max_splits {
1143 let Some(plan) = self.build_normalize_plan().await else {
1144 break;
1145 };
1146
1147 if !self.apply_normalize_plan(&plan).await? {
1148 tracing::debug!(
1149 parent_group_id = %plan.parent_group_id,
1150 boundary_table_id = %plan.boundary_table_id,
1151 "normalize plan became stale before apply"
1152 );
1153 break;
1154 }
1155 split_count += 1;
1156 }
1157
1158 Ok(split_count)
1159 }
1160
1161 pub async fn try_split_compaction_group(
1163 &self,
1164 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1165 group: CompactionGroupStatistic,
1166 ) {
1167 if group
1168 .compaction_group_config
1169 .compaction_config
1170 .disable_auto_group_scheduling
1171 .unwrap_or(false)
1172 {
1173 return;
1174 }
1175 for (table_id, table_size) in &group.table_statistic {
1177 self.try_move_high_throughput_table_to_dedicated_cg(
1178 table_write_throughput_statistic_manager,
1179 *table_id,
1180 table_size,
1181 group.group_id,
1182 )
1183 .await;
1184 }
1185
1186 self.try_split_huge_compaction_group(group).await;
1188 }
1189
1190 pub async fn try_move_high_throughput_table_to_dedicated_cg(
1192 &self,
1193 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1194 table_id: TableId,
1195 _table_size: &u64,
1196 parent_group_id: CompactionGroupId,
1197 ) {
1198 let mut table_throughput = table_write_throughput_statistic_manager
1199 .get_table_throughput_descending(
1200 table_id,
1201 self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
1202 )
1203 .peekable();
1204
1205 if table_throughput.peek().is_none() {
1206 return;
1207 }
1208
1209 let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
1210 table_throughput,
1211 self.env.opts.table_high_write_throughput_threshold,
1212 self.env
1213 .opts
1214 .table_stat_high_write_throughput_ratio_for_split,
1215 );
1216
1217 if !is_high_write_throughput {
1219 return;
1220 }
1221
1222 let ret = self
1223 .move_state_tables_to_dedicated_compaction_group(
1224 parent_group_id,
1225 &[table_id],
1226 Some(self.env.opts.partition_vnode_count),
1227 )
1228 .await;
1229 match ret {
1230 Ok(split_result) => {
1231 tracing::info!(
1232 "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
1233 table_id,
1234 parent_group_id,
1235 self.env.opts.partition_vnode_count,
1236 split_result
1237 );
1238 }
1239 Err(e) => {
1240 tracing::info!(
1241 error = %e.as_report(),
1242 "failed to split state table [{}] from group-{}",
1243 table_id,
1244 parent_group_id,
1245 )
1246 }
1247 }
1248 }
1249
1250 pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
1251 let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
1252 * self.env.opts.split_group_size_ratio) as u64;
1253 let is_huge_hybrid_group =
1254 group.group_size > group_max_size && group.table_statistic.len() > 1; if is_huge_hybrid_group {
1256 let mut accumulated_size = 0;
1257 let mut table_ids = Vec::default();
1258 for (table_id, table_size) in &group.table_statistic {
1259 accumulated_size += table_size;
1260 table_ids.push(*table_id);
1261 assert!(table_ids.is_sorted());
1264 let remaining_size = group.group_size.saturating_sub(accumulated_size);
1265 if accumulated_size > group_max_size / 2
1266 && remaining_size > 0
1267 && table_ids.len() < group.table_statistic.len()
1268 {
1269 let ret = self
1270 .move_state_tables_to_dedicated_compaction_group(
1271 group.group_id,
1272 &table_ids,
1273 None,
1274 )
1275 .await;
1276 match ret {
1277 Ok(split_result) => {
1278 tracing::info!(
1279 "split_huge_compaction_group success {:?}",
1280 split_result
1281 );
1282 self.metrics
1283 .split_compaction_group_count
1284 .with_label_values(&[&group.group_id.to_string()])
1285 .inc();
1286 return;
1287 }
1288 Err(e) => {
1289 tracing::error!(
1290 error = %e.as_report(),
1291 "failed to split_huge_compaction_group table {:?} from group-{}",
1292 table_ids,
1293 group.group_id
1294 );
1295
1296 return;
1297 }
1298 }
1299 }
1300 }
1301 }
1302 }
1303
1304 pub async fn try_merge_compaction_group(
1305 &self,
1306 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1307 group: &CompactionGroupStatistic,
1308 next_group: &CompactionGroupStatistic,
1309 created_tables: &HashSet<TableId>,
1310 ) -> Result<()> {
1311 GroupMergeValidator::validate_group_merge(
1312 group,
1313 next_group,
1314 created_tables,
1315 table_write_throughput_statistic_manager,
1316 &self.env.opts,
1317 &self.versioning,
1318 )
1319 .await?;
1320
1321 let result = self
1322 .merge_compaction_group(group.group_id, next_group.group_id)
1323 .await;
1324
1325 match &result {
1326 Ok(()) => {
1327 tracing::info!(
1328 "merge group-{} to group-{}",
1329 next_group.group_id,
1330 group.group_id,
1331 );
1332
1333 self.metrics
1334 .merge_compaction_group_count
1335 .with_label_values(&[&group.group_id.to_string()])
1336 .inc();
1337 }
1338 Err(e) => {
1339 tracing::info!(
1340 error = %e.as_report(),
1341 "failed to merge group-{} group-{}",
1342 next_group.group_id,
1343 group.group_id,
1344 );
1345 }
1346 }
1347
1348 result
1349 }
1350}
1351
/// Stateless namespace for the heuristics that gate merging two compaction
/// groups, plus the per-table write-throughput classification helpers.
#[derive(Debug, Default)]
struct GroupMergeValidator {}
1354
1355impl GroupMergeValidator {
1356 fn is_merge_compatible_by_semantics(
1359 group: &CompactionGroupStatistic,
1360 next_group: &CompactionGroupStatistic,
1361 ) -> bool {
1362 let (mut left, mut right) = (
1363 group
1364 .compaction_group_config
1365 .compaction_config
1366 .as_ref()
1367 .clone(),
1368 next_group
1369 .compaction_group_config
1370 .compaction_config
1371 .as_ref()
1372 .clone(),
1373 );
1374 left.split_weight_by_vnode = 0;
1375 right.split_weight_by_vnode = 0;
1376 left == right
1377 }
1378
1379 pub fn is_table_high_write_throughput(
1381 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
1382 threshold: u64,
1383 high_write_throughput_ratio: f64,
1384 ) -> bool {
1385 let mut sample_size = 0;
1386 let mut high_write_throughput_count = 0;
1387 for statistic in table_throughput {
1388 sample_size += 1;
1389 if statistic.throughput > threshold {
1390 high_write_throughput_count += 1;
1391 }
1392 }
1393
1394 high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
1395 }
1396
1397 pub fn is_table_low_write_throughput(
1398 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
1399 threshold: u64,
1400 low_write_throughput_ratio: f64,
1401 ) -> bool {
1402 let mut sample_size = 0;
1403 let mut low_write_throughput_count = 0;
1404 for statistic in table_throughput {
1405 sample_size += 1;
1406 if statistic.throughput <= threshold {
1407 low_write_throughput_count += 1;
1408 }
1409 }
1410
1411 low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
1412 }
1413
1414 fn check_is_low_write_throughput_compaction_group(
1415 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1416 group: &CompactionGroupStatistic,
1417 opts: &Arc<MetaOpts>,
1418 ) -> bool {
1419 let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
1420 for table_id in group.table_statistic.keys() {
1421 let mut table_throughput = table_write_throughput_statistic_manager
1422 .get_table_throughput_descending(
1423 *table_id,
1424 opts.table_stat_throuput_window_seconds_for_merge as i64,
1425 )
1426 .peekable();
1427 if table_throughput.peek().is_none() {
1428 continue;
1429 }
1430
1431 table_with_statistic.push(table_throughput);
1432 }
1433
1434 if table_with_statistic.is_empty() {
1436 return true;
1437 }
1438
1439 table_with_statistic.into_iter().all(|table_throughput| {
1441 Self::is_table_low_write_throughput(
1442 table_throughput,
1443 opts.table_low_write_throughput_threshold,
1444 opts.table_stat_low_write_throughput_ratio_for_merge,
1445 )
1446 })
1447 }
1448
1449 fn check_is_creating_compaction_group(
1450 group: &CompactionGroupStatistic,
1451 created_tables: &HashSet<TableId>,
1452 ) -> bool {
1453 group
1454 .table_statistic
1455 .keys()
1456 .any(|table_id| !created_tables.contains(table_id))
1457 }
1458
1459 async fn validate_group_merge(
1460 group: &CompactionGroupStatistic,
1461 next_group: &CompactionGroupStatistic,
1462 created_tables: &HashSet<TableId>,
1463 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1464 opts: &Arc<MetaOpts>,
1465 versioning: &MonitoredRwLock<Versioning>,
1466 ) -> Result<()> {
1467 if (group.group_id == StaticCompactionGroupId::StateDefault
1469 && next_group.group_id == StaticCompactionGroupId::MaterializedView)
1470 || (group.group_id == StaticCompactionGroupId::MaterializedView
1471 && next_group.group_id == StaticCompactionGroupId::StateDefault)
1472 {
1473 return Err(Error::CompactionGroup(format!(
1474 "group-{} and group-{} are both StaticCompactionGroupId",
1475 group.group_id, next_group.group_id
1476 )));
1477 }
1478
1479 if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1480 return Err(Error::CompactionGroup(format!(
1481 "group-{} or group-{} is empty",
1482 group.group_id, next_group.group_id
1483 )));
1484 }
1485
1486 {
1491 let mut table_ids_1: Vec<TableId> = group.table_statistic.keys().cloned().collect_vec();
1492 let mut table_ids_2: Vec<TableId> =
1493 next_group.table_statistic.keys().cloned().collect_vec();
1494 table_ids_1.sort();
1495 table_ids_2.sort();
1496 if table_ids_1.first().unwrap() > table_ids_2.first().unwrap() {
1497 std::mem::swap(&mut table_ids_1, &mut table_ids_2);
1498 }
1499 if table_ids_1.last().unwrap() >= table_ids_2.first().unwrap() {
1500 return Err(Error::CompactionGroup(format!(
1501 "group-{} and group-{} have overlapping table id ranges, not mergeable",
1502 group.group_id, next_group.group_id
1503 )));
1504 }
1505 }
1506
1507 if group
1508 .compaction_group_config
1509 .compaction_config
1510 .disable_auto_group_scheduling
1511 .unwrap_or(false)
1512 || next_group
1513 .compaction_group_config
1514 .compaction_config
1515 .disable_auto_group_scheduling
1516 .unwrap_or(false)
1517 {
1518 return Err(Error::CompactionGroup(format!(
1519 "group-{} or group-{} disable_auto_group_scheduling",
1520 group.group_id, next_group.group_id
1521 )));
1522 }
1523
1524 if !Self::is_merge_compatible_by_semantics(group, next_group) {
1527 let left_config = group.compaction_group_config.compaction_config.as_ref();
1528 let right_config = next_group
1529 .compaction_group_config
1530 .compaction_config
1531 .as_ref();
1532
1533 tracing::warn!(
1534 group_id = %group.group_id,
1535 next_group_id = %next_group.group_id,
1536 left_config = ?left_config,
1537 right_config = ?right_config,
1538 "compaction config semantic mismatch detected while merging compaction groups"
1539 );
1540
1541 return Err(Error::CompactionGroup(format!(
1542 "Cannot merge group {} and next_group {} with different compaction config (split_weight_by_vnode is excluded from comparison). left_config: {:?}, right_config: {:?}",
1543 group.group_id, next_group.group_id, left_config, right_config
1544 )));
1545 }
1546
1547 if Self::check_is_creating_compaction_group(group, created_tables) {
1549 return Err(Error::CompactionGroup(format!(
1550 "Cannot merge creating group {} next_group {}",
1551 group.group_id, next_group.group_id
1552 )));
1553 }
1554
1555 if !Self::check_is_low_write_throughput_compaction_group(
1557 table_write_throughput_statistic_manager,
1558 group,
1559 opts,
1560 ) {
1561 return Err(Error::CompactionGroup(format!(
1562 "Cannot merge high throughput group {} next_group {}",
1563 group.group_id, next_group.group_id
1564 )));
1565 }
1566
1567 let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1568 * opts.split_group_size_ratio) as u64;
1569
1570 if (group.group_size + next_group.group_size) > size_limit {
1571 return Err(Error::CompactionGroup(format!(
1572 "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1573 group.group_id,
1574 group.group_size,
1575 next_group.group_id,
1576 next_group.group_size,
1577 size_limit
1578 )));
1579 }
1580
1581 if Self::check_is_creating_compaction_group(next_group, created_tables) {
1582 return Err(Error::CompactionGroup(format!(
1583 "Cannot merge creating group {} next group {}",
1584 group.group_id, next_group.group_id
1585 )));
1586 }
1587
1588 if !Self::check_is_low_write_throughput_compaction_group(
1589 table_write_throughput_statistic_manager,
1590 next_group,
1591 opts,
1592 ) {
1593 return Err(Error::CompactionGroup(format!(
1594 "Cannot merge high throughput group {} next group {}",
1595 group.group_id, next_group.group_id
1596 )));
1597 }
1598
1599 {
1600 let versioning_guard = versioning.read().await;
1602 let levels = &versioning_guard.current_version.levels;
1603 if !levels.contains_key(&group.group_id) {
1604 return Err(Error::CompactionGroup(format!(
1605 "Cannot merge group {} not exist",
1606 group.group_id
1607 )));
1608 }
1609
1610 if !levels.contains_key(&next_group.group_id) {
1611 return Err(Error::CompactionGroup(format!(
1612 "Cannot merge next group {} not exist",
1613 next_group.group_id
1614 )));
1615 }
1616
1617 let group_levels = versioning_guard
1618 .current_version
1619 .get_compaction_group_levels(group.group_id);
1620
1621 let next_group_levels = versioning_guard
1622 .current_version
1623 .get_compaction_group_levels(next_group.group_id);
1624
1625 let group_state = GroupStateValidator::group_state(
1626 group_levels,
1627 group.compaction_group_config.compaction_config().deref(),
1628 );
1629
1630 if group_state.is_write_stop() || group_state.is_emergency() {
1631 return Err(Error::CompactionGroup(format!(
1632 "Cannot merge write limit group {} next group {}",
1633 group.group_id, next_group.group_id
1634 )));
1635 }
1636
1637 let next_group_state = GroupStateValidator::group_state(
1638 next_group_levels,
1639 next_group
1640 .compaction_group_config
1641 .compaction_config()
1642 .deref(),
1643 );
1644
1645 if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1646 return Err(Error::CompactionGroup(format!(
1647 "Cannot merge write limit next group {} group {}",
1648 next_group.group_id, group.group_id
1649 )));
1650 }
1651
1652 let l0_sub_level_count_after_merge =
1654 group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1655 if GroupStateValidator::write_stop_l0_file_count(
1656 (l0_sub_level_count_after_merge as f64
1657 * opts.compaction_group_merge_dimension_threshold) as usize,
1658 group.compaction_group_config.compaction_config().deref(),
1659 ) {
1660 return Err(Error::CompactionGroup(format!(
1661 "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
1662 group.group_id, next_group.group_id
1663 )));
1664 }
1665
1666 let l0_file_count_after_merge =
1667 group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1668 if GroupStateValidator::write_stop_l0_file_count(
1669 (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1670 as usize,
1671 group.compaction_group_config.compaction_config().deref(),
1672 ) {
1673 return Err(Error::CompactionGroup(format!(
1674 "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1675 next_group.group_id, group.group_id
1676 )));
1677 }
1678
1679 let l0_size_after_merge =
1680 group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1681
1682 if GroupStateValidator::write_stop_l0_size(
1683 (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1684 as u64,
1685 group.compaction_group_config.compaction_config().deref(),
1686 ) {
1687 return Err(Error::CompactionGroup(format!(
1688 "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1689 next_group.group_id, group.group_id
1690 )));
1691 }
1692
1693 if GroupStateValidator::emergency_l0_file_count(
1695 (l0_sub_level_count_after_merge as f64
1696 * opts.compaction_group_merge_dimension_threshold) as usize,
1697 group.compaction_group_config.compaction_config().deref(),
1698 ) {
1699 return Err(Error::CompactionGroup(format!(
1700 "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
1701 group.group_id, next_group.group_id
1702 )));
1703 }
1704 }
1705
1706 Ok(())
1707 }
1708}