1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26 StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33 CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::{CompactionGroupStatistic, GroupStateValidator};
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::transaction::HummockVersionTransaction;
40use crate::hummock::manager::versioning::Versioning;
41use crate::hummock::manager::{HummockManager, commit_multi_var};
42use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
43use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
44use crate::hummock::table_write_throughput_statistic::{
45 TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
46};
47use crate::manager::MetaOpts;
48
impl HummockManager {
    /// Merges two compaction groups into one, fetching the set of already-created
    /// tables from the metadata manager.
    ///
    /// The "left/right" orientation is normalized internally by the smallest member
    /// table id, so the argument order of `group_1`/`group_2` does not matter.
    pub async fn merge_compaction_group(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, None)
            .await
    }

    /// Test-only variant of [`Self::merge_compaction_group`] that lets the caller
    /// inject the created-table set instead of querying the metadata manager.
    /// The impl asserts (via `cfg!(debug_assertions)`) that this injection path is
    /// only exercised in debug builds.
    pub async fn merge_compaction_group_for_test(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: HashSet<TableId>,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
            .await
    }

    /// Core merge routine. Validates that the two groups are mergeable, then
    /// applies a `GroupMerge` version delta moving everything into the group whose
    /// member table ids come first, and finally cancels compaction tasks that were
    /// assigned to the removed (right) group.
    ///
    /// `created_tables`: `Some` only for tests (see above); `None` means the set is
    /// fetched from the metadata manager.
    pub async fn merge_compaction_group_impl(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: Option<HashSet<TableId>>,
    ) -> Result<()> {
        // Take the compaction lock before the versioning lock; both are held for
        // the whole merge so the version and task assignments stay consistent.
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        if !versioning.current_version.levels.contains_key(&group_1) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
        }

        if !versioning.current_version.levels.contains_key(&group_2) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
        }

        let state_table_info = versioning.current_version.state_table_info.clone();
        let mut member_table_ids_1 = state_table_info
            .compaction_group_member_table_ids(group_1)
            .iter()
            .cloned()
            .collect_vec();

        // Merging an empty group is rejected: the ordering logic below relies on
        // `first()`/`last()` of both member lists.
        if member_table_ids_1.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_1 {} is empty",
                group_1
            )));
        }

        let mut member_table_ids_2 = state_table_info
            .compaction_group_member_table_ids(group_2)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_2.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_2 {} is empty",
                group_2
            )));
        }

        debug_assert!(!member_table_ids_1.is_empty());
        debug_assert!(!member_table_ids_2.is_empty());
        // Sortedness is assumed by the disjoint-range check further down.
        assert!(member_table_ids_1.is_sorted());
        assert!(member_table_ids_2.is_sorted());

        let created_tables = if let Some(created_tables) = created_tables {
            // Injected created-table sets are a test-only facility; fail loudly if
            // a release build ever takes this branch.
            #[expect(clippy::assertions_on_constants)]
            {
                assert!(cfg!(debug_assertions));
            }
            created_tables
        } else {
            match self.metadata_manager.get_created_table_ids().await {
                Ok(created_tables) => HashSet::from_iter(created_tables),
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
                    return Err(Error::CompactionGroup(format!(
                        "merge group_1 {} group_2 {} failed to fetch created table ids",
                        group_1, group_2
                    )));
                }
            }
        };

        // A table that is not in `created_tables` is still being created; merging
        // a group containing such a table is not allowed.
        fn contains_creating_table(
            table_ids: &Vec<TableId>,
            created_tables: &HashSet<TableId>,
        ) -> bool {
            table_ids
                .iter()
                .any(|table_id| !created_tables.contains(table_id))
        }

        if contains_creating_table(&member_table_ids_1, &created_tables)
            || contains_creating_table(&member_table_ids_2, &created_tables)
        {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
                group_1, group_2, member_table_ids_1, member_table_ids_2
            )));
        }

        // Normalize orientation: the group whose smallest member table id comes
        // first becomes the "left" (surviving) group. Member lists are swapped
        // alongside so `member_table_ids_1` always belongs to the left group.
        let (left_group_id, right_group_id) =
            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
                (group_1, group_2)
            } else {
                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
                (group_2, group_1)
            };

        // The two groups' table-id ranges must be disjoint (left entirely before
        // right) for the concatenation-based merge to be valid.
        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
            return Err(Error::CompactionGroup(format!(
                "invalid merge group_1 {} group_2 {}: table id ranges overlap",
                left_group_id, right_group_id
            )));
        }

        let combined_member_table_ids = member_table_ids_1
            .iter()
            .chain(member_table_ids_2.iter())
            .collect_vec();
        assert!(combined_member_table_ids.is_sorted());

        // Sanity check: no sst id may appear in both groups.
        let mut sst_id_set = HashSet::new();
        for sst_id in versioning
            .current_version
            .get_sst_ids_by_group_id(left_group_id)
            .chain(
                versioning
                    .current_version
                    .get_sst_ids_by_group_id(right_group_id),
            )
        {
            if !sst_id_set.insert(sst_id) {
                return Err(Error::CompactionGroup(format!(
                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
                    left_group_id, right_group_id, sst_id
                )));
            }
        }

        {
            // Per-level check: for every non-L0 level present in both groups, the
            // left group's last sst must sort strictly before the right group's
            // first sst so the level vectors can simply be concatenated.
            let left_levels = versioning
                .current_version
                .get_compaction_group_levels(group_1);

            let right_levels = versioning
                .current_version
                .get_compaction_group_levels(group_2);

            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
            for level_idx in 1..=max_level {
                let left_level = left_levels.get_level(level_idx);
                let right_level = right_levels.get_level(level_idx);
                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
                    continue;
                }

                let left_last_sst = left_level.table_infos.last().unwrap().clone();
                let right_first_sst = right_level.table_infos.first().unwrap().clone();
                let left_sst_id = left_last_sst.sst_id;
                let right_sst_id = right_first_sst.sst_id;
                let left_obj_id = left_last_sst.object_id;
                let right_obj_id = right_first_sst.object_id;

                if !can_concat(&[left_last_sst, right_first_sst]) {
                    return Err(Error::CompactionGroup(format!(
                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
                        left_group_id,
                        right_group_id,
                        level_idx,
                        left_sst_id,
                        right_sst_id,
                        left_obj_id,
                        right_obj_id
                    )));
                }
            }
        }

        // Build the version delta carrying the merge.
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        let target_compaction_group_id = {
            // The delta is keyed on the surviving (left) group.
            new_version_delta.group_deltas.insert(
                left_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
                        left_group_id,
                        right_group_id,
                    })],
                },
            );
            left_group_id
        };

        // Repoint every member table (from both groups) at the surviving group.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in combined_member_table_ids {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: target_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

            {
                // Drop the removed group's per-level sst metrics.
                let right_group_max_level = new_version_delta
                    .latest_version()
                    .get_compaction_group_levels(right_group_id)
                    .levels
                    .len();

                remove_compaction_group_in_sst_stat(
                    &self.metrics,
                    right_group_id,
                    right_group_max_level,
                );
            }

            self.compaction_state
                .remove_compaction_group(right_group_id);

            {
                // Reset vnode-based split weight on the surviving group; failure
                // here is logged but deliberately does not abort the merge.
                if let Err(err) = compaction_groups_txn.update_compaction_config(
                    &[left_group_id],
                    &[MutableConfig::SplitWeightByVnode(0)], ) {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        left_group_id
                    );
                }
            }

            new_version_delta.pre_apply();

            // Remove the right group's config and commit version + config atomically.
            compaction_groups_txn.remove(right_group_id);
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }

        versioning.mark_next_time_travel_version_snapshot();

        // Cancel every compaction task still assigned to the removed group; the
        // group no longer exists, so those tasks can never be reported normally.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    assert_eq!(task.compaction_group_id, right_group_id);
                    canceled_tasks.push(ReportTask {
                        task_id: task.task_id,
                        task_status: TaskStatus::ManualCanceled,
                        table_stats_change: HashMap::default(),
                        sorted_output_ssts: vec![],
                        object_timestamps: HashMap::default(),
                    });
                }
            });

        if !canceled_tasks.is_empty() {
            // Both guards are moved into the report path, which takes over lock
            // ownership for the remainder of the call.
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .merge_compaction_group_count
            .with_label_values(&[&left_group_id.to_string()])
            .inc();

        Ok(())
    }
}
374
impl HummockManager {
    /// Splits `parent_group_id` at the position given by
    /// (`table_id_to_split`, `vnode_to_split`): tables strictly before the split
    /// point stay in the parent group, tables at/after it move to a newly created
    /// group.
    ///
    /// `split_table_ids` is the full set of tables the caller ultimately wants to
    /// move (used to decide whether the new group should get a vnode-partitioned
    /// compaction config via `partition_vnode_count`).
    ///
    /// Returns `(group_id, member_table_ids)` pairs describing the resulting
    /// groups (at most two entries). If the split point falls on a group boundary
    /// (one side empty), no new group is created and the parent is returned as-is.
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        // Compaction lock first, then versioning — same order as the merge path.
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .copied()
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} doesn't in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        // The split point expressed as a full key (table id + vnode).
        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        // BTreeSet iteration yields the ids sorted.
        let table_ids = member_table_ids.into_iter().collect_vec();
        if table_ids == split_table_ids {
            // Moving *all* members would leave the parent empty — reject.
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id,
                split_full_key.user_key.get_vnode_id(),
            );
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            // Split point lies at a boundary: nothing actually moves, report the
            // (single) non-empty side still belonging to the parent group.
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        // Pre-allocate sst ids for ssts that will be cut in two by the split key.
        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            // The new group inherits the parent's compaction config.
            let config = self
                .compaction_group_manager
                .read()
                .await
                .try_get_compaction_group_config(parent_group_id)
                .ok_or_else(|| {
                    Error::CompactionGroup(format!(
                        "parent group {} config not found",
                        parent_group_id
                    ))
                })?
                .compaction_config()
                .as_ref()
                .clone();

            // `table_ids` below is a deprecated proto field, kept empty; the
            // membership is carried by the state-table-info delta instead.
            #[expect(deprecated)]
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id,
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _, split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

        // Repoint the right-hand tables at the newly constructed group.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in &table_ids_right {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // If the requested table set now forms a dedicated single-table group,
            // apply vnode-based partitioning to it. Failure is logged only —
            // the split itself still commits.
            for (cg_id, table_ids) in &result {
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                    &[*cg_id],
                    &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                )
                {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        versioning.mark_next_time_travel_version_snapshot();

        // Cancel parent-group tasks whose group version predates the split (they
        // were planned against the pre-split layout and are now expired).
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    let is_expired = is_compaction_task_expired(
                        task.compaction_group_version_id,
                        levels.compaction_group_version_id,
                    );
                    if is_expired {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }

    /// Moves a contiguous, sorted run of `table_ids` out of `parent_group_id` into
    /// a group of their own by performing up to two splits: one before the first
    /// table (at vnode ZERO) and, if the first split did not already isolate the
    /// set, one after the last table (at the max representable vnode).
    ///
    /// Returns the group id now holding `table_ids`, plus a map of every affected
    /// group to its post-split membership.
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        let parent_table_ids = {
            // Read lock is enough here; the split impl re-validates under the
            // write lock.
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .copied()
                .collect_vec()
        };

        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        // Invariant checks over the accumulated result map: each group's members
        // sorted, groups' id ranges in order, and no table in two groups.
        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<CompactionGroupId, Vec<TableId>>) {
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        let mut cg_id_to_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
        // First split: cut immediately before the first table to move.
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id: CompactionGroupId = 0.into();
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            // The group that received the first table is the (tentative) target.
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            // If some group now holds exactly the requested set, one split sufficed.
            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // Second split: cut after the last table to move (MAX_REPRESENTABLE vnode
        // places the boundary past every key of that table).
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
}
741
impl HummockManager {
    /// Best-effort, automatic split scheduling for one compaction group: tries to
    /// move each high-write-throughput member table into a dedicated group, then
    /// considers splitting the group if it has grown too large. No-op when the
    /// group's config disables auto scheduling. Errors are logged, not returned.
    pub async fn try_split_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: CompactionGroupStatistic,
    ) {
        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
        {
            return;
        }
        for (table_id, table_size) in &group.table_statistic {
            self.try_move_high_throughput_table_to_dedicated_cg(
                table_write_throughput_statistic_manager,
                *table_id,
                table_size,
                group.group_id,
            )
            .await;
        }

        self.try_split_huge_compaction_group(group).await;
    }

    /// Moves `table_id` into its own compaction group (with vnode partitioning)
    /// if its recent write throughput qualifies as "high" over the configured
    /// observation window. Outcome is logged either way; nothing is returned.
    pub async fn try_move_high_throughput_table_to_dedicated_cg(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        table_id: TableId,
        _table_size: &u64,
        parent_group_id: CompactionGroupId,
    ) {
        let mut table_throughput = table_write_throughput_statistic_manager
            .get_table_throughput_descending(
                table_id,
                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
            )
            .peekable();

        // No samples in the window — nothing to decide on.
        if table_throughput.peek().is_none() {
            return;
        }

        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
            table_throughput,
            self.env.opts.table_high_write_throughput_threshold,
            self.env
                .opts
                .table_stat_high_write_throughput_ratio_for_split,
        );

        if !is_high_write_throughput {
            return;
        }

        let ret = self
            .move_state_tables_to_dedicated_compaction_group(
                parent_group_id,
                &[table_id],
                Some(self.env.opts.partition_vnode_count),
            )
            .await;
        match ret {
            Ok(split_result) => {
                tracing::info!(
                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
                    table_id,
                    parent_group_id,
                    self.env.opts.partition_vnode_count,
                    split_result
                );
            }
            Err(e) => {
                // Best-effort: a failed move is only informational.
                tracing::info!(
                    error = %e.as_report(),
                    "failed to split state table [{}] from group-{}",
                    table_id,
                    parent_group_id,
                )
            }
        }
    }

    /// Splits a "huge hybrid" group (size above the configured ratio of the
    /// estimated max, and holding more than one table) roughly in half by table
    /// size: accumulates tables in id order until just past half the size limit,
    /// then moves that prefix into its own group. At most one split is attempted.
    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
            * self.env.opts.split_group_size_ratio) as u64;
        let is_huge_hybrid_group =
            group.group_size > group_max_size && group.table_statistic.len() > 1; if is_huge_hybrid_group {
            let mut accumulated_size = 0;
            let mut table_ids = Vec::default();
            for (table_id, table_size) in &group.table_statistic {
                accumulated_size += table_size;
                table_ids.push(*table_id);
                // table_statistic iteration order must keep ids sorted, since the
                // move API requires a sorted slice.
                assert!(table_ids.is_sorted());
                let remaining_size = group.group_size.saturating_sub(accumulated_size);
                // Split once the prefix passes half the limit, as long as both
                // sides stay non-empty.
                if accumulated_size > group_max_size / 2
                    && remaining_size > 0
                    && table_ids.len() < group.table_statistic.len()
                {
                    let ret = self
                        .move_state_tables_to_dedicated_compaction_group(
                            group.group_id,
                            &table_ids,
                            None,
                        )
                        .await;
                    match ret {
                        Ok(split_result) => {
                            tracing::info!(
                                "split_huge_compaction_group success {:?}",
                                split_result
                            );
                            self.metrics
                                .split_compaction_group_count
                                .with_label_values(&[&group.group_id.to_string()])
                                .inc();
                            return;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "failed to split_huge_compaction_group table {:?} from group-{}",
                                table_ids,
                                group.group_id
                            );

                            return;
                        }
                    }
                }
            }
        }
    }

    /// Validates that `next_group` can be merged into `group` (see
    /// [`GroupMergeValidator::validate_group_merge`]) and, if so, performs the
    /// merge, logging and recording metrics on the outcome. Returns the merge
    /// result so the caller can react to failures.
    pub async fn try_merge_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
    ) -> Result<()> {
        GroupMergeValidator::validate_group_merge(
            group,
            next_group,
            created_tables,
            table_write_throughput_statistic_manager,
            &self.env.opts,
            &self.versioning,
        )
        .await?;

        let result = self
            .merge_compaction_group(group.group_id, next_group.group_id)
            .await;

        match &result {
            Ok(()) => {
                tracing::info!(
                    "merge group-{} to group-{}",
                    next_group.group_id,
                    group.group_id,
                );

                self.metrics
                    .merge_compaction_group_count
                    .with_label_values(&[&group.group_id.to_string()])
                    .inc();
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to merge group-{} group-{}",
                    next_group.group_id,
                    group.group_id,
                );
            }
        }

        result
    }
}
933
#[derive(Debug, Default)]
/// Stateless namespace for the merge-eligibility checks used by the automatic
/// group-scheduling paths above.
struct GroupMergeValidator {}

impl GroupMergeValidator {
    /// Two groups are semantically merge-compatible when their compaction
    /// configs are identical after zeroing `split_weight_by_vnode` (that field
    /// is reset on merge anyway, so it is excluded from the comparison).
    fn is_merge_compatible_by_semantics(
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
    ) -> bool {
        let (mut left, mut right) = (
            group
                .compaction_group_config
                .compaction_config
                .as_ref()
                .clone(),
            next_group
                .compaction_group_config
                .compaction_config
                .as_ref()
                .clone(),
        );
        left.split_weight_by_vnode = 0;
        right.split_weight_by_vnode = 0;
        left == right
    }

    /// True when the fraction of samples with throughput strictly above
    /// `threshold` exceeds `high_write_throughput_ratio`.
    /// Note: an empty iterator yields `0 > 0` == false.
    pub fn is_table_high_write_throughput(
        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
        threshold: u64,
        high_write_throughput_ratio: f64,
    ) -> bool {
        let mut sample_size = 0;
        let mut high_write_throughput_count = 0;
        for statistic in table_throughput {
            sample_size += 1;
            if statistic.throughput > threshold {
                high_write_throughput_count += 1;
            }
        }

        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
    }

    /// True when the fraction of samples with throughput at or below `threshold`
    /// exceeds `low_write_throughput_ratio`. Mirror image of
    /// `is_table_high_write_throughput` (boundary sample counts as "low" here).
    pub fn is_table_low_write_throughput(
        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
        threshold: u64,
        low_write_throughput_ratio: f64,
    ) -> bool {
        let mut sample_size = 0;
        let mut low_write_throughput_count = 0;
        for statistic in table_throughput {
            sample_size += 1;
            if statistic.throughput <= threshold {
                low_write_throughput_count += 1;
            }
        }

        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
    }

    /// A group counts as low-write-throughput when every member table that has
    /// samples in the merge window is individually low-throughput. A group with
    /// no sampled tables at all is treated as low-throughput (mergeable).
    fn check_is_low_write_throughput_compaction_group(
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        opts: &Arc<MetaOpts>,
    ) -> bool {
        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
        for table_id in group.table_statistic.keys() {
            let mut table_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(
                    *table_id,
                    opts.table_stat_throuput_window_seconds_for_merge as i64,
                )
                .peekable();
            // Tables without samples in the window are skipped entirely.
            if table_throughput.peek().is_none() {
                continue;
            }

            table_with_statistic.push(table_throughput);
        }

        if table_with_statistic.is_empty() {
            return true;
        }

        table_with_statistic.into_iter().all(|table_throughput| {
            Self::is_table_low_write_throughput(
                table_throughput,
                opts.table_low_write_throughput_threshold,
                opts.table_stat_low_write_throughput_ratio_for_merge,
            )
        })
    }

    /// A group is still "creating" if any member table is absent from the
    /// created-table set.
    fn check_is_creating_compaction_group(
        group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
    ) -> bool {
        group
            .table_statistic
            .keys()
            .any(|table_id| !created_tables.contains(table_id))
    }

    /// Runs every merge-eligibility rule for merging `next_group` into `group`.
    /// Returns `Ok(())` only when all rules pass; each failure carries a
    /// human-readable reason. Checks, in order: static-group pairing, emptiness,
    /// disjoint table-id ranges, auto-scheduling flags, config compatibility,
    /// creating state, throughput, merged-size limit, and (under the versioning
    /// read lock) group existence plus projected post-merge write-stop/emergency
    /// state.
    async fn validate_group_merge(
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        opts: &Arc<MetaOpts>,
        versioning: &MonitoredRwLock<Versioning>,
    ) -> Result<()> {
        // Never merge the two built-in static groups with each other.
        if (group.group_id == StaticCompactionGroupId::StateDefault
            && next_group.group_id == StaticCompactionGroupId::MaterializedView)
            || (group.group_id == StaticCompactionGroupId::MaterializedView
                && next_group.group_id == StaticCompactionGroupId::StateDefault)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} and group-{} are both StaticCompactionGroupId",
                group.group_id, next_group.group_id
            )));
        }

        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} is empty",
                group.group_id, next_group.group_id
            )));
        }

        {
            // The merge requires the groups' table-id ranges to be disjoint;
            // normalize so table_ids_1 starts first, then compare boundaries.
            let mut table_ids_1: Vec<TableId> = group.table_statistic.keys().cloned().collect_vec();
            let mut table_ids_2: Vec<TableId> =
                next_group.table_statistic.keys().cloned().collect_vec();
            table_ids_1.sort();
            table_ids_2.sort();
            if table_ids_1.first().unwrap() > table_ids_2.first().unwrap() {
                std::mem::swap(&mut table_ids_1, &mut table_ids_2);
            }
            if table_ids_1.last().unwrap() >= table_ids_2.first().unwrap() {
                return Err(Error::CompactionGroup(format!(
                    "group-{} and group-{} have overlapping table id ranges, not mergeable",
                    group.group_id, next_group.group_id
                )));
            }
        }

        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
            || next_group
                .compaction_group_config
                .compaction_config
                .disable_auto_group_scheduling
                .unwrap_or(false)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} disable_auto_group_scheduling",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::is_merge_compatible_by_semantics(group, next_group) {
            let left_config = group.compaction_group_config.compaction_config.as_ref();
            let right_config = next_group
                .compaction_group_config
                .compaction_config
                .as_ref();

            tracing::warn!(
                group_id = %group.group_id,
                next_group_id = %next_group.group_id,
                left_config = ?left_config,
                right_config = ?right_config,
                "compaction config semantic mismatch detected while merging compaction groups"
            );

            return Err(Error::CompactionGroup(format!(
                "Cannot merge group {} and next_group {} with different compaction config (split_weight_by_vnode is excluded from comparison). left_config: {:?}, right_config: {:?}",
                group.group_id, next_group.group_id, left_config, right_config
            )));
        }

        if Self::check_is_creating_compaction_group(group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge high throughput group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        // Size limit is derived from the *left* group's config only.
        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
            * opts.split_group_size_ratio) as u64;

        if (group.group_size + next_group.group_size) > size_limit {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
                group.group_id,
                group.group_size,
                next_group.group_id,
                next_group.group_size,
                size_limit
            )));
        }

        if Self::check_is_creating_compaction_group(next_group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            next_group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge high throughput group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        {
            // Remaining checks need the live version; take the read lock once.
            let versioning_guard = versioning.read().await;
            let levels = &versioning_guard.current_version.levels;
            if !levels.contains_key(&group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge group {} not exist",
                    group.group_id
                )));
            }

            if !levels.contains_key(&next_group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge next group {} not exist",
                    next_group.group_id
                )));
            }

            let group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(group.group_id);

            let next_group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(next_group.group_id);

            let group_state = GroupStateValidator::group_state(
                group_levels,
                group.compaction_group_config.compaction_config().deref(),
            );

            if group_state.is_write_stop() || group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit group {} next group {}",
                    group.group_id, next_group.group_id
                )));
            }

            let next_group_state = GroupStateValidator::group_state(
                next_group_levels,
                next_group
                    .compaction_group_config
                    .compaction_config()
                    .deref(),
            );

            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}",
                    next_group.group_id, group.group_id
                )));
            }

            // Projected post-merge L0 sub-level count, scaled by the configured
            // safety threshold, must not trip the write-stop rule.
            let l0_sub_level_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
                    group.group_id, next_group.group_id
                )));
            }

            // NOTE(review): this block recomputes the exact same sub_levels.len()
            // sum as `l0_sub_level_count_after_merge` above and feeds it to the
            // same `write_stop_l0_file_count` check, so it is currently redundant.
            // Presumably one of the two was meant to use the actual L0 *file*
            // count rather than the sub-level count — TODO confirm and fix.
            let l0_file_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            // Projected post-merge L0 size must not trip the size write-stop rule.
            let l0_size_after_merge =
                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;

            if GroupStateValidator::write_stop_l0_size(
                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as u64,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            // Finally, the merged sub-level count must also stay below the
            // emergency threshold.
            if GroupStateValidator::emergency_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
                    group.group_id, next_group.group_id
                )));
            }
        }

        Ok(())
    }
}