1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26 StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33 CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::{CompactionGroupStatistic, GroupStateValidator};
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::transaction::HummockVersionTransaction;
40use crate::hummock::manager::versioning::Versioning;
41use crate::hummock::manager::{HummockManager, commit_multi_var};
42use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
43use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
44use crate::hummock::table_write_throughput_statistic::{
45 TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
46};
47use crate::manager::MetaOpts;
48
49impl HummockManager {
50 pub async fn merge_compaction_group(
51 &self,
52 group_1: CompactionGroupId,
53 group_2: CompactionGroupId,
54 ) -> Result<()> {
55 self.merge_compaction_group_impl(group_1, group_2, None)
56 .await
57 }
58
59 pub async fn merge_compaction_group_for_test(
60 &self,
61 group_1: CompactionGroupId,
62 group_2: CompactionGroupId,
63 created_tables: HashSet<TableId>,
64 ) -> Result<()> {
65 self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
66 .await
67 }
68
69 pub async fn merge_compaction_group_impl(
70 &self,
71 group_1: CompactionGroupId,
72 group_2: CompactionGroupId,
73 created_tables: Option<HashSet<TableId>>,
74 ) -> Result<()> {
75 let compaction_guard = self.compaction.write().await;
76 let mut versioning_guard = self.versioning.write().await;
77 let versioning = versioning_guard.deref_mut();
78 if !versioning.current_version.levels.contains_key(&group_1) {
80 return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
81 }
82
83 if !versioning.current_version.levels.contains_key(&group_2) {
84 return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
85 }
86
87 let state_table_info = versioning.current_version.state_table_info.clone();
88 let mut member_table_ids_1 = state_table_info
89 .compaction_group_member_table_ids(group_1)
90 .iter()
91 .cloned()
92 .collect_vec();
93
94 if member_table_ids_1.is_empty() {
95 return Err(Error::CompactionGroup(format!(
96 "group_1 {} is empty",
97 group_1
98 )));
99 }
100
101 let mut member_table_ids_2 = state_table_info
102 .compaction_group_member_table_ids(group_2)
103 .iter()
104 .cloned()
105 .collect_vec();
106
107 if member_table_ids_2.is_empty() {
108 return Err(Error::CompactionGroup(format!(
109 "group_2 {} is empty",
110 group_2
111 )));
112 }
113
114 debug_assert!(!member_table_ids_1.is_empty());
115 debug_assert!(!member_table_ids_2.is_empty());
116 assert!(member_table_ids_1.is_sorted());
117 assert!(member_table_ids_2.is_sorted());
118
119 let created_tables = if let Some(created_tables) = created_tables {
120 #[expect(clippy::assertions_on_constants)]
122 {
123 assert!(cfg!(debug_assertions));
124 }
125 created_tables
126 } else {
127 match self.metadata_manager.get_created_table_ids().await {
128 Ok(created_tables) => HashSet::from_iter(created_tables),
129 Err(err) => {
130 tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
131 return Err(Error::CompactionGroup(format!(
132 "merge group_1 {} group_2 {} failed to fetch created table ids",
133 group_1, group_2
134 )));
135 }
136 }
137 };
138
139 fn contains_creating_table(
140 table_ids: &Vec<TableId>,
141 created_tables: &HashSet<TableId>,
142 ) -> bool {
143 table_ids
144 .iter()
145 .any(|table_id| !created_tables.contains(table_id))
146 }
147
148 if contains_creating_table(&member_table_ids_1, &created_tables)
150 || contains_creating_table(&member_table_ids_2, &created_tables)
151 {
152 return Err(Error::CompactionGroup(format!(
153 "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
154 group_1, group_2, member_table_ids_1, member_table_ids_2
155 )));
156 }
157
158 let (left_group_id, right_group_id) =
160 if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
161 (group_1, group_2)
162 } else {
163 std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
164 (group_2, group_1)
165 };
166
167 if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
169 return Err(Error::CompactionGroup(format!(
170 "invalid merge group_1 {} group_2 {}",
171 left_group_id, right_group_id
172 )));
173 }
174
175 let combined_member_table_ids = member_table_ids_1
176 .iter()
177 .chain(member_table_ids_2.iter())
178 .collect_vec();
179 assert!(combined_member_table_ids.is_sorted());
180
181 let mut sst_id_set = HashSet::new();
183 for sst_id in versioning
184 .current_version
185 .get_sst_ids_by_group_id(left_group_id)
186 .chain(
187 versioning
188 .current_version
189 .get_sst_ids_by_group_id(right_group_id),
190 )
191 {
192 if !sst_id_set.insert(sst_id) {
193 return Err(Error::CompactionGroup(format!(
194 "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
195 left_group_id, right_group_id, sst_id
196 )));
197 }
198 }
199
200 {
202 let left_levels = versioning
203 .current_version
204 .get_compaction_group_levels(group_1);
205
206 let right_levels = versioning
207 .current_version
208 .get_compaction_group_levels(group_2);
209
210 let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
213 for level_idx in 1..=max_level {
214 let left_level = left_levels.get_level(level_idx);
215 let right_level = right_levels.get_level(level_idx);
216 if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
217 continue;
218 }
219
220 let left_last_sst = left_level.table_infos.last().unwrap().clone();
221 let right_first_sst = right_level.table_infos.first().unwrap().clone();
222 let left_sst_id = left_last_sst.sst_id;
223 let right_sst_id = right_first_sst.sst_id;
224 let left_obj_id = left_last_sst.object_id;
225 let right_obj_id = right_first_sst.object_id;
226
227 if !can_concat(&[left_last_sst, right_first_sst]) {
229 return Err(Error::CompactionGroup(format!(
230 "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
231 left_group_id,
232 right_group_id,
233 level_idx,
234 left_sst_id,
235 right_sst_id,
236 left_obj_id,
237 right_obj_id
238 )));
239 }
240 }
241 }
242
243 let mut version = HummockVersionTransaction::new(
244 &mut versioning.current_version,
245 &mut versioning.hummock_version_deltas,
246 self.env.notification_manager(),
247 None,
248 &self.metrics,
249 );
250 let mut new_version_delta = version.new_delta();
251
252 let target_compaction_group_id = {
253 new_version_delta.group_deltas.insert(
255 left_group_id,
256 GroupDeltas {
257 group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
258 left_group_id,
259 right_group_id,
260 })],
261 },
262 );
263 left_group_id
264 };
265
266 new_version_delta.with_latest_version(|version, new_version_delta| {
269 for &table_id in combined_member_table_ids {
270 let info = version
271 .state_table_info
272 .info()
273 .get(&table_id)
274 .expect("have check exist previously");
275 assert!(
276 new_version_delta
277 .state_table_info_delta
278 .insert(
279 table_id,
280 PbStateTableInfoDelta {
281 committed_epoch: info.committed_epoch,
282 compaction_group_id: target_compaction_group_id,
283 }
284 )
285 .is_none()
286 );
287 }
288 });
289
290 {
291 let mut compaction_group_manager = self.compaction_group_manager.write().await;
292 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
293
294 {
296 let right_group_max_level = new_version_delta
297 .latest_version()
298 .get_compaction_group_levels(right_group_id)
299 .levels
300 .len();
301
302 remove_compaction_group_in_sst_stat(
303 &self.metrics,
304 right_group_id,
305 right_group_max_level,
306 );
307 }
308
309 {
311 if let Err(err) = compaction_groups_txn.update_compaction_config(
312 &[left_group_id],
313 &[MutableConfig::SplitWeightByVnode(0)], ) {
315 tracing::error!(
316 error = %err.as_report(),
317 "failed to update compaction config for group-{}",
318 left_group_id
319 );
320 }
321 }
322
323 new_version_delta.pre_apply();
324
325 compaction_groups_txn.remove(right_group_id);
327 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
328 }
329
330 versioning.mark_next_time_travel_version_snapshot();
332
333 let mut canceled_tasks = vec![];
335 let compact_task_assignments =
338 compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
339 compact_task_assignments
340 .into_iter()
341 .for_each(|task_assignment| {
342 if let Some(task) = task_assignment.compact_task.as_ref() {
343 assert_eq!(task.compaction_group_id, right_group_id);
344 canceled_tasks.push(ReportTask {
345 task_id: task.task_id,
346 task_status: TaskStatus::ManualCanceled,
347 table_stats_change: HashMap::default(),
348 sorted_output_ssts: vec![],
349 object_timestamps: HashMap::default(),
350 });
351 }
352 });
353
354 if !canceled_tasks.is_empty() {
355 self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
356 .await?;
357 }
358
359 self.metrics
360 .merge_compaction_group_count
361 .with_label_values(&[&left_group_id.to_string()])
362 .inc();
363
364 Ok(())
365 }
366}
367
368impl HummockManager {
369 async fn split_compaction_group_impl(
381 &self,
382 parent_group_id: CompactionGroupId,
383 split_table_ids: &[StateTableId],
384 table_id_to_split: StateTableId,
385 vnode_to_split: VirtualNode,
386 partition_vnode_count: Option<u32>,
387 ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
388 let mut result = vec![];
389 let compaction_guard = self.compaction.write().await;
390 let mut versioning_guard = self.versioning.write().await;
391 let versioning = versioning_guard.deref_mut();
392 if !versioning
394 .current_version
395 .levels
396 .contains_key(&parent_group_id)
397 {
398 return Err(Error::CompactionGroup(format!(
399 "invalid group {}",
400 parent_group_id
401 )));
402 }
403
404 let member_table_ids = versioning
405 .current_version
406 .state_table_info
407 .compaction_group_member_table_ids(parent_group_id)
408 .iter()
409 .copied()
410 .collect::<BTreeSet<_>>();
411
412 if !member_table_ids.contains(&table_id_to_split) {
413 return Err(Error::CompactionGroup(format!(
414 "table {} doesn't in group {}",
415 table_id_to_split, parent_group_id
416 )));
417 }
418
419 let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);
420
421 let table_ids = member_table_ids.into_iter().collect_vec();
423 if table_ids == split_table_ids {
424 return Err(Error::CompactionGroup(format!(
425 "invalid split attempt for group {}: all member tables are moved",
426 parent_group_id
427 )));
428 }
429 let (table_ids_left, table_ids_right) =
431 group_split::split_table_ids_with_table_id_and_vnode(
432 &table_ids,
433 split_full_key.user_key.table_id,
434 split_full_key.user_key.get_vnode_id(),
435 );
436 if table_ids_left.is_empty() || table_ids_right.is_empty() {
437 if !table_ids_left.is_empty() {
439 result.push((parent_group_id, table_ids_left));
440 }
441
442 if !table_ids_right.is_empty() {
443 result.push((parent_group_id, table_ids_right));
444 }
445 return Ok(result);
446 }
447
448 result.push((parent_group_id, table_ids_left));
449
450 let split_key: Bytes = split_full_key.encode().into();
451
452 let mut version = HummockVersionTransaction::new(
453 &mut versioning.current_version,
454 &mut versioning.hummock_version_deltas,
455 self.env.notification_manager(),
456 None,
457 &self.metrics,
458 );
459 let mut new_version_delta = version.new_delta();
460
461 let split_sst_count = new_version_delta
462 .latest_version()
463 .count_new_ssts_in_group_split(parent_group_id, split_key.clone());
464
465 let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
466 let (new_compaction_group_id, config) = {
467 let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
469 let config = self
471 .compaction_group_manager
472 .read()
473 .await
474 .try_get_compaction_group_config(parent_group_id)
475 .ok_or_else(|| {
476 Error::CompactionGroup(format!(
477 "parent group {} config not found",
478 parent_group_id
479 ))
480 })?
481 .compaction_config()
482 .as_ref()
483 .clone();
484
485 #[expect(deprecated)]
486 new_version_delta.group_deltas.insert(
488 new_compaction_group_id,
489 GroupDeltas {
490 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
491 group_config: Some(config.clone()),
492 group_id: new_compaction_group_id,
493 parent_group_id,
494 new_sst_start_id: new_sst_start_id.inner(),
495 table_ids: vec![],
496 version: CompatibilityVersion::LATEST as _, split_key: Some(split_key.into()),
498 }))],
499 },
500 );
501 (new_compaction_group_id, config)
502 };
503
504 new_version_delta.with_latest_version(|version, new_version_delta| {
505 for &table_id in &table_ids_right {
506 let info = version
507 .state_table_info
508 .info()
509 .get(&table_id)
510 .expect("have check exist previously");
511 assert!(
512 new_version_delta
513 .state_table_info_delta
514 .insert(
515 table_id,
516 PbStateTableInfoDelta {
517 committed_epoch: info.committed_epoch,
518 compaction_group_id: new_compaction_group_id,
519 }
520 )
521 .is_none()
522 );
523 }
524 });
525
526 result.push((new_compaction_group_id, table_ids_right));
527
528 {
529 let mut compaction_group_manager = self.compaction_group_manager.write().await;
530 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
531 compaction_groups_txn
532 .create_compaction_groups(new_compaction_group_id, Arc::new(config));
533
534 for (cg_id, table_ids) in &result {
538 if let Some(partition_vnode_count) = partition_vnode_count
540 && table_ids.len() == 1
541 && table_ids == split_table_ids
542 && let Err(err) = compaction_groups_txn.update_compaction_config(
543 &[*cg_id],
544 &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
545 )
546 {
547 tracing::error!(
548 error = %err.as_report(),
549 "failed to update compaction config for group-{}",
550 cg_id
551 );
552 }
553 }
554
555 new_version_delta.pre_apply();
556 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
557 }
558 versioning.mark_next_time_travel_version_snapshot();
560
561 let mut canceled_tasks = vec![];
564 let compact_task_assignments =
565 compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
566 let levels = versioning
567 .current_version
568 .get_compaction_group_levels(parent_group_id);
569 compact_task_assignments
570 .into_iter()
571 .for_each(|task_assignment| {
572 if let Some(task) = task_assignment.compact_task.as_ref() {
573 let is_expired = is_compaction_task_expired(
574 task.compaction_group_version_id,
575 levels.compaction_group_version_id,
576 );
577 if is_expired {
578 canceled_tasks.push(ReportTask {
579 task_id: task.task_id,
580 task_status: TaskStatus::ManualCanceled,
581 table_stats_change: HashMap::default(),
582 sorted_output_ssts: vec![],
583 object_timestamps: HashMap::default(),
584 });
585 }
586 }
587 });
588
589 if !canceled_tasks.is_empty() {
590 self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
591 .await?;
592 }
593
594 self.metrics
595 .split_compaction_group_count
596 .with_label_values(&[&parent_group_id.to_string()])
597 .inc();
598
599 Ok(result)
600 }
601
602 pub async fn move_state_tables_to_dedicated_compaction_group(
605 &self,
606 parent_group_id: CompactionGroupId,
607 table_ids: &[StateTableId],
608 partition_vnode_count: Option<u32>,
609 ) -> Result<(
610 CompactionGroupId,
611 BTreeMap<CompactionGroupId, Vec<StateTableId>>,
612 )> {
613 if table_ids.is_empty() {
614 return Err(Error::CompactionGroup(
615 "table_ids must not be empty".to_owned(),
616 ));
617 }
618
619 if !table_ids.is_sorted() {
620 return Err(Error::CompactionGroup(
621 "table_ids must be sorted".to_owned(),
622 ));
623 }
624
625 let parent_table_ids = {
626 let versioning_guard = self.versioning.read().await;
627 versioning_guard
628 .current_version
629 .state_table_info
630 .compaction_group_member_table_ids(parent_group_id)
631 .iter()
632 .copied()
633 .collect_vec()
634 };
635
636 if parent_table_ids == table_ids {
637 return Err(Error::CompactionGroup(format!(
638 "invalid split attempt for group {}: all member tables are moved",
639 parent_group_id
640 )));
641 }
642
643 fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<TableId>>) {
644 {
646 cg_id_to_table_ids
647 .iter()
648 .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
649 }
650
651 {
653 let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
654 table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
655 assert!(table_table_ids_vec.concat().is_sorted());
656 }
657
658 {
660 let mut all_table_ids = HashSet::new();
661 for table_ids in cg_id_to_table_ids.values() {
662 for table_id in table_ids {
663 assert!(all_table_ids.insert(*table_id));
664 }
665 }
666 }
667 }
668
669 let mut cg_id_to_table_ids: BTreeMap<u64, Vec<TableId>> = BTreeMap::new();
679 let table_id_to_split = *table_ids.first().unwrap();
680 let mut target_compaction_group_id = 0;
681 let result_vec = self
682 .split_compaction_group_impl(
683 parent_group_id,
684 table_ids,
685 table_id_to_split,
686 VirtualNode::ZERO,
687 partition_vnode_count,
688 )
689 .await?;
690 assert!(result_vec.len() <= 2);
691
692 let mut finish_move = false;
693 for (cg_id, table_ids_after_split) in result_vec {
694 if table_ids_after_split.contains(&table_id_to_split) {
695 target_compaction_group_id = cg_id;
696 }
697
698 if table_ids_after_split == table_ids {
699 finish_move = true;
700 }
701
702 cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
703 }
704 check_table_ids_valid(&cg_id_to_table_ids);
705
706 if finish_move {
707 return Ok((target_compaction_group_id, cg_id_to_table_ids));
708 }
709
710 let table_id_to_split = *table_ids.last().unwrap();
713 let result_vec = self
714 .split_compaction_group_impl(
715 target_compaction_group_id,
716 table_ids,
717 table_id_to_split,
718 VirtualNode::MAX_REPRESENTABLE,
719 partition_vnode_count,
720 )
721 .await?;
722 assert!(result_vec.len() <= 2);
723 for (cg_id, table_ids_after_split) in result_vec {
724 if table_ids_after_split.contains(&table_id_to_split) {
725 target_compaction_group_id = cg_id;
726 }
727 cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
728 }
729 check_table_ids_valid(&cg_id_to_table_ids);
730
731 Ok((target_compaction_group_id, cg_id_to_table_ids))
732 }
733}
734
735impl HummockManager {
736 pub async fn try_split_compaction_group(
738 &self,
739 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
740 group: CompactionGroupStatistic,
741 ) {
742 if group
743 .compaction_group_config
744 .compaction_config
745 .disable_auto_group_scheduling
746 .unwrap_or(false)
747 {
748 return;
749 }
750 for (table_id, table_size) in &group.table_statistic {
752 self.try_move_high_throughput_table_to_dedicated_cg(
753 table_write_throughput_statistic_manager,
754 *table_id,
755 table_size,
756 group.group_id,
757 )
758 .await;
759 }
760
761 self.try_split_huge_compaction_group(group).await;
763 }
764
765 pub async fn try_move_high_throughput_table_to_dedicated_cg(
767 &self,
768 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
769 table_id: TableId,
770 _table_size: &u64,
771 parent_group_id: u64,
772 ) {
773 let mut table_throughput = table_write_throughput_statistic_manager
774 .get_table_throughput_descending(
775 table_id,
776 self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
777 )
778 .peekable();
779
780 if table_throughput.peek().is_none() {
781 return;
782 }
783
784 let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
785 table_throughput,
786 self.env.opts.table_high_write_throughput_threshold,
787 self.env
788 .opts
789 .table_stat_high_write_throughput_ratio_for_split,
790 );
791
792 if !is_high_write_throughput {
794 return;
795 }
796
797 let ret = self
798 .move_state_tables_to_dedicated_compaction_group(
799 parent_group_id,
800 &[table_id],
801 Some(self.env.opts.partition_vnode_count),
802 )
803 .await;
804 match ret {
805 Ok(split_result) => {
806 tracing::info!(
807 "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
808 table_id,
809 parent_group_id,
810 self.env.opts.partition_vnode_count,
811 split_result
812 );
813 }
814 Err(e) => {
815 tracing::info!(
816 error = %e.as_report(),
817 "failed to split state table [{}] from group-{}",
818 table_id,
819 parent_group_id,
820 )
821 }
822 }
823 }
824
825 pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
826 let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
827 * self.env.opts.split_group_size_ratio) as u64;
828 let is_huge_hybrid_group =
829 group.group_size > group_max_size && group.table_statistic.len() > 1; if is_huge_hybrid_group {
831 let mut accumulated_size = 0;
832 let mut table_ids = Vec::default();
833 for (table_id, table_size) in &group.table_statistic {
834 accumulated_size += table_size;
835 table_ids.push(*table_id);
836 assert!(table_ids.is_sorted());
839 if accumulated_size * 2 > group_max_size {
840 let ret = self
841 .move_state_tables_to_dedicated_compaction_group(
842 group.group_id,
843 &table_ids,
844 None,
845 )
846 .await;
847 match ret {
848 Ok(split_result) => {
849 tracing::info!(
850 "split_huge_compaction_group success {:?}",
851 split_result
852 );
853 self.metrics
854 .split_compaction_group_count
855 .with_label_values(&[&group.group_id.to_string()])
856 .inc();
857 return;
858 }
859 Err(e) => {
860 tracing::error!(
861 error = %e.as_report(),
862 "failed to split_huge_compaction_group table {:?} from group-{}",
863 table_ids,
864 group.group_id
865 );
866
867 return;
868 }
869 }
870 }
871 }
872 }
873 }
874
875 pub async fn try_merge_compaction_group(
876 &self,
877 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
878 group: &CompactionGroupStatistic,
879 next_group: &CompactionGroupStatistic,
880 created_tables: &HashSet<TableId>,
881 ) -> Result<()> {
882 GroupMergeValidator::validate_group_merge(
883 group,
884 next_group,
885 created_tables,
886 table_write_throughput_statistic_manager,
887 &self.env.opts,
888 &self.versioning,
889 )
890 .await?;
891
892 match self
893 .merge_compaction_group(group.group_id, next_group.group_id)
894 .await
895 {
896 Ok(()) => {
897 tracing::info!(
898 "merge group-{} to group-{}",
899 next_group.group_id,
900 group.group_id,
901 );
902
903 self.metrics
904 .merge_compaction_group_count
905 .with_label_values(&[&group.group_id.to_string()])
906 .inc();
907 }
908 Err(e) => {
909 tracing::info!(
910 error = %e.as_report(),
911 "failed to merge group-{} group-{}",
912 next_group.group_id,
913 group.group_id,
914 )
915 }
916 }
917
918 Ok(())
919 }
920}
921
/// Field-less namespace for the checks that decide whether two compaction groups may be
/// merged and whether a table's write throughput counts as high or low.
#[derive(Debug, Default)]
struct GroupMergeValidator;
924
925impl GroupMergeValidator {
926 pub fn is_table_high_write_throughput(
928 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
929 threshold: u64,
930 high_write_throughput_ratio: f64,
931 ) -> bool {
932 let mut sample_size = 0;
933 let mut high_write_throughput_count = 0;
934 for statistic in table_throughput {
935 sample_size += 1;
936 if statistic.throughput > threshold {
937 high_write_throughput_count += 1;
938 }
939 }
940
941 high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
942 }
943
944 pub fn is_table_low_write_throughput(
945 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
946 threshold: u64,
947 low_write_throughput_ratio: f64,
948 ) -> bool {
949 let mut sample_size = 0;
950 let mut low_write_throughput_count = 0;
951 for statistic in table_throughput {
952 sample_size += 1;
953 if statistic.throughput <= threshold {
954 low_write_throughput_count += 1;
955 }
956 }
957
958 low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
959 }
960
961 fn check_is_low_write_throughput_compaction_group(
962 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
963 group: &CompactionGroupStatistic,
964 opts: &Arc<MetaOpts>,
965 ) -> bool {
966 let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
967 for table_id in group.table_statistic.keys() {
968 let mut table_throughput = table_write_throughput_statistic_manager
969 .get_table_throughput_descending(
970 *table_id,
971 opts.table_stat_throuput_window_seconds_for_merge as i64,
972 )
973 .peekable();
974 if table_throughput.peek().is_none() {
975 continue;
976 }
977
978 table_with_statistic.push(table_throughput);
979 }
980
981 if table_with_statistic.is_empty() {
983 return true;
984 }
985
986 table_with_statistic.into_iter().all(|table_throughput| {
988 Self::is_table_low_write_throughput(
989 table_throughput,
990 opts.table_low_write_throughput_threshold,
991 opts.table_stat_low_write_throughput_ratio_for_merge,
992 )
993 })
994 }
995
996 fn check_is_creating_compaction_group(
997 group: &CompactionGroupStatistic,
998 created_tables: &HashSet<TableId>,
999 ) -> bool {
1000 group
1001 .table_statistic
1002 .keys()
1003 .any(|table_id| !created_tables.contains(table_id))
1004 }
1005
1006 async fn validate_group_merge(
1007 group: &CompactionGroupStatistic,
1008 next_group: &CompactionGroupStatistic,
1009 created_tables: &HashSet<TableId>,
1010 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1011 opts: &Arc<MetaOpts>,
1012 versioning: &MonitoredRwLock<Versioning>,
1013 ) -> Result<()> {
1014 if (group.group_id == StaticCompactionGroupId::StateDefault as u64
1016 && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64)
1017 || (group.group_id == StaticCompactionGroupId::MaterializedView as u64
1018 && next_group.group_id == StaticCompactionGroupId::StateDefault as u64)
1019 {
1020 return Err(Error::CompactionGroup(format!(
1021 "group-{} and group-{} are both StaticCompactionGroupId",
1022 group.group_id, next_group.group_id
1023 )));
1024 }
1025
1026 if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1027 return Err(Error::CompactionGroup(format!(
1028 "group-{} or group-{} is empty",
1029 group.group_id, next_group.group_id
1030 )));
1031 }
1032
1033 if group
1034 .compaction_group_config
1035 .compaction_config
1036 .disable_auto_group_scheduling
1037 .unwrap_or(false)
1038 || next_group
1039 .compaction_group_config
1040 .compaction_config
1041 .disable_auto_group_scheduling
1042 .unwrap_or(false)
1043 {
1044 return Err(Error::CompactionGroup(format!(
1045 "group-{} or group-{} disable_auto_group_scheduling",
1046 group.group_id, next_group.group_id
1047 )));
1048 }
1049
1050 if group.compaction_group_config.compaction_config
1060 != next_group.compaction_group_config.compaction_config
1061 {
1062 let left_config = group.compaction_group_config.compaction_config.as_ref();
1063 let right_config = next_group
1064 .compaction_group_config
1065 .compaction_config
1066 .as_ref();
1067
1068 tracing::warn!(
1069 group_id = group.group_id,
1070 next_group_id = next_group.group_id,
1071 left_config = ?left_config,
1072 right_config = ?right_config,
1073 "compaction config mismatch detected while merging compaction groups"
1074 );
1075
1076 return Err(Error::CompactionGroup(format!(
1077 "Cannot merge group {} and next_group {} with different compaction configs. left_config: {:?}, right_config: {:?}",
1078 group.group_id, next_group.group_id, left_config, right_config
1079 )));
1080 }
1081
1082 if Self::check_is_creating_compaction_group(group, created_tables) {
1084 return Err(Error::CompactionGroup(format!(
1085 "Cannot merge creating group {} next_group {}",
1086 group.group_id, next_group.group_id
1087 )));
1088 }
1089
1090 if !Self::check_is_low_write_throughput_compaction_group(
1092 table_write_throughput_statistic_manager,
1093 group,
1094 opts,
1095 ) {
1096 return Err(Error::CompactionGroup(format!(
1097 "Cannot merge high throughput group {} next_group {}",
1098 group.group_id, next_group.group_id
1099 )));
1100 }
1101
1102 let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1103 * opts.split_group_size_ratio) as u64;
1104
1105 if (group.group_size + next_group.group_size) > size_limit {
1106 return Err(Error::CompactionGroup(format!(
1107 "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1108 group.group_id,
1109 group.group_size,
1110 next_group.group_id,
1111 next_group.group_size,
1112 size_limit
1113 )));
1114 }
1115
1116 if Self::check_is_creating_compaction_group(next_group, created_tables) {
1117 return Err(Error::CompactionGroup(format!(
1118 "Cannot merge creating group {} next group {}",
1119 group.group_id, next_group.group_id
1120 )));
1121 }
1122
1123 if !Self::check_is_low_write_throughput_compaction_group(
1124 table_write_throughput_statistic_manager,
1125 next_group,
1126 opts,
1127 ) {
1128 return Err(Error::CompactionGroup(format!(
1129 "Cannot merge high throughput group {} next group {}",
1130 group.group_id, next_group.group_id
1131 )));
1132 }
1133
1134 {
1135 let versioning_guard = versioning.read().await;
1137 let levels = &versioning_guard.current_version.levels;
1138 if !levels.contains_key(&group.group_id) {
1139 return Err(Error::CompactionGroup(format!(
1140 "Cannot merge group {} not exist",
1141 group.group_id
1142 )));
1143 }
1144
1145 if !levels.contains_key(&next_group.group_id) {
1146 return Err(Error::CompactionGroup(format!(
1147 "Cannot merge next group {} not exist",
1148 next_group.group_id
1149 )));
1150 }
1151
1152 let group_levels = versioning_guard
1153 .current_version
1154 .get_compaction_group_levels(group.group_id);
1155
1156 let next_group_levels = versioning_guard
1157 .current_version
1158 .get_compaction_group_levels(next_group.group_id);
1159
1160 let group_state = GroupStateValidator::group_state(
1161 group_levels,
1162 group.compaction_group_config.compaction_config().deref(),
1163 );
1164
1165 if group_state.is_write_stop() || group_state.is_emergency() {
1166 return Err(Error::CompactionGroup(format!(
1167 "Cannot merge write limit group {} next group {}",
1168 group.group_id, next_group.group_id
1169 )));
1170 }
1171
1172 let next_group_state = GroupStateValidator::group_state(
1173 next_group_levels,
1174 next_group
1175 .compaction_group_config
1176 .compaction_config()
1177 .deref(),
1178 );
1179
1180 if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1181 return Err(Error::CompactionGroup(format!(
1182 "Cannot merge write limit next group {} group {}",
1183 next_group.group_id, group.group_id
1184 )));
1185 }
1186
1187 let l0_sub_level_count_after_merge =
1189 group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1190 if GroupStateValidator::write_stop_l0_file_count(
1191 (l0_sub_level_count_after_merge as f64
1192 * opts.compaction_group_merge_dimension_threshold) as usize,
1193 group.compaction_group_config.compaction_config().deref(),
1194 ) {
1195 return Err(Error::CompactionGroup(format!(
1196 "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
1197 group.group_id, next_group.group_id
1198 )));
1199 }
1200
1201 let l0_file_count_after_merge =
1202 group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1203 if GroupStateValidator::write_stop_l0_file_count(
1204 (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1205 as usize,
1206 group.compaction_group_config.compaction_config().deref(),
1207 ) {
1208 return Err(Error::CompactionGroup(format!(
1209 "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1210 next_group.group_id, group.group_id
1211 )));
1212 }
1213
1214 let l0_size_after_merge =
1215 group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1216
1217 if GroupStateValidator::write_stop_l0_size(
1218 (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1219 as u64,
1220 group.compaction_group_config.compaction_config().deref(),
1221 ) {
1222 return Err(Error::CompactionGroup(format!(
1223 "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1224 next_group.group_id, group.group_id
1225 )));
1226 }
1227
1228 if GroupStateValidator::emergency_l0_file_count(
1230 (l0_sub_level_count_after_merge as f64
1231 * opts.compaction_group_merge_dimension_threshold) as usize,
1232 group.compaction_group_config.compaction_config().deref(),
1233 ) {
1234 return Err(Error::CompactionGroup(format!(
1235 "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
1236 group.group_id, next_group.group_id
1237 )));
1238 }
1239 }
1240
1241 Ok(())
1242 }
1243}