1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26 StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33 CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::{CompactionGroupStatistic, GroupStateValidator};
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::transaction::HummockVersionTransaction;
40use crate::hummock::manager::versioning::Versioning;
41use crate::hummock::manager::{HummockManager, commit_multi_var};
42use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
43use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
44use crate::hummock::table_write_throughput_statistic::{
45 TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
46};
47use crate::manager::MetaOpts;
48
49impl HummockManager {
50 pub async fn merge_compaction_group(
51 &self,
52 group_1: CompactionGroupId,
53 group_2: CompactionGroupId,
54 ) -> Result<()> {
55 self.merge_compaction_group_impl(group_1, group_2, None)
56 .await
57 }
58
59 pub async fn merge_compaction_group_for_test(
60 &self,
61 group_1: CompactionGroupId,
62 group_2: CompactionGroupId,
63 created_tables: HashSet<u32>,
64 ) -> Result<()> {
65 self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
66 .await
67 }
68
69 pub async fn merge_compaction_group_impl(
70 &self,
71 group_1: CompactionGroupId,
72 group_2: CompactionGroupId,
73 created_tables: Option<HashSet<u32>>,
74 ) -> Result<()> {
75 let compaction_guard = self.compaction.write().await;
76 let mut versioning_guard = self.versioning.write().await;
77 let versioning = versioning_guard.deref_mut();
78 if !versioning.current_version.levels.contains_key(&group_1) {
80 return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
81 }
82
83 if !versioning.current_version.levels.contains_key(&group_2) {
84 return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
85 }
86
87 let state_table_info = versioning.current_version.state_table_info.clone();
88 let mut member_table_ids_1 = state_table_info
89 .compaction_group_member_table_ids(group_1)
90 .iter()
91 .cloned()
92 .collect_vec();
93
94 if member_table_ids_1.is_empty() {
95 return Err(Error::CompactionGroup(format!(
96 "group_1 {} is empty",
97 group_1
98 )));
99 }
100
101 let mut member_table_ids_2 = state_table_info
102 .compaction_group_member_table_ids(group_2)
103 .iter()
104 .cloned()
105 .collect_vec();
106
107 if member_table_ids_2.is_empty() {
108 return Err(Error::CompactionGroup(format!(
109 "group_2 {} is empty",
110 group_2
111 )));
112 }
113
114 debug_assert!(!member_table_ids_1.is_empty());
115 debug_assert!(!member_table_ids_2.is_empty());
116 assert!(member_table_ids_1.is_sorted());
117 assert!(member_table_ids_2.is_sorted());
118
119 let created_tables = if let Some(created_tables) = created_tables {
120 assert!(cfg!(debug_assertions));
122 created_tables
123 } else {
124 match self.metadata_manager.get_created_table_ids().await {
125 Ok(created_tables) => HashSet::from_iter(created_tables),
126 Err(err) => {
127 tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
128 return Err(Error::CompactionGroup(format!(
129 "merge group_1 {} group_2 {} failed to fetch created table ids",
130 group_1, group_2
131 )));
132 }
133 }
134 };
135
136 fn contains_creating_table(
137 table_ids: &Vec<TableId>,
138 created_tables: &HashSet<u32>,
139 ) -> bool {
140 table_ids
141 .iter()
142 .any(|table_id| !created_tables.contains(&table_id.table_id()))
143 }
144
145 if contains_creating_table(&member_table_ids_1, &created_tables)
147 || contains_creating_table(&member_table_ids_2, &created_tables)
148 {
149 return Err(Error::CompactionGroup(format!(
150 "Not Merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
151 group_1, group_2, member_table_ids_1, member_table_ids_2
152 )));
153 }
154
155 let (left_group_id, right_group_id) =
157 if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
158 (group_1, group_2)
159 } else {
160 std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
161 (group_2, group_1)
162 };
163
164 if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
166 return Err(Error::CompactionGroup(format!(
167 "invalid merge group_1 {} group_2 {}",
168 left_group_id, right_group_id
169 )));
170 }
171
172 let combined_member_table_ids = member_table_ids_1
173 .iter()
174 .chain(member_table_ids_2.iter())
175 .collect_vec();
176 assert!(combined_member_table_ids.is_sorted());
177
178 let mut sst_id_set = HashSet::new();
180 for sst_id in versioning
181 .current_version
182 .get_sst_ids_by_group_id(left_group_id)
183 .chain(
184 versioning
185 .current_version
186 .get_sst_ids_by_group_id(right_group_id),
187 )
188 {
189 if !sst_id_set.insert(sst_id) {
190 return Err(Error::CompactionGroup(format!(
191 "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
192 left_group_id, right_group_id, sst_id
193 )));
194 }
195 }
196
197 {
199 let left_levels = versioning
200 .current_version
201 .get_compaction_group_levels(group_1);
202
203 let right_levels = versioning
204 .current_version
205 .get_compaction_group_levels(group_2);
206
207 let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
210 for level_idx in 1..=max_level {
211 let left_level = left_levels.get_level(level_idx);
212 let right_level = right_levels.get_level(level_idx);
213 if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
214 continue;
215 }
216
217 let left_last_sst = left_level.table_infos.last().unwrap().clone();
218 let right_first_sst = right_level.table_infos.first().unwrap().clone();
219 let left_sst_id = left_last_sst.sst_id;
220 let right_sst_id = right_first_sst.sst_id;
221 let left_obj_id = left_last_sst.object_id;
222 let right_obj_id = right_first_sst.object_id;
223
224 if !can_concat(&[left_last_sst, right_first_sst]) {
226 return Err(Error::CompactionGroup(format!(
227 "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
228 left_group_id,
229 right_group_id,
230 level_idx,
231 left_sst_id,
232 right_sst_id,
233 left_obj_id,
234 right_obj_id
235 )));
236 }
237 }
238 }
239
240 let mut version = HummockVersionTransaction::new(
241 &mut versioning.current_version,
242 &mut versioning.hummock_version_deltas,
243 self.env.notification_manager(),
244 None,
245 &self.metrics,
246 );
247 let mut new_version_delta = version.new_delta();
248
249 let target_compaction_group_id = {
250 new_version_delta.group_deltas.insert(
252 left_group_id,
253 GroupDeltas {
254 group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
255 left_group_id,
256 right_group_id,
257 })],
258 },
259 );
260 left_group_id
261 };
262
263 new_version_delta.with_latest_version(|version, new_version_delta| {
266 for table_id in combined_member_table_ids {
267 let table_id = TableId::new(table_id.table_id());
268 let info = version
269 .state_table_info
270 .info()
271 .get(&table_id)
272 .expect("have check exist previously");
273 assert!(
274 new_version_delta
275 .state_table_info_delta
276 .insert(
277 table_id,
278 PbStateTableInfoDelta {
279 committed_epoch: info.committed_epoch,
280 compaction_group_id: target_compaction_group_id,
281 }
282 )
283 .is_none()
284 );
285 }
286 });
287
288 {
289 let mut compaction_group_manager = self.compaction_group_manager.write().await;
290 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
291
292 {
294 let right_group_max_level = new_version_delta
295 .latest_version()
296 .get_compaction_group_levels(right_group_id)
297 .levels
298 .len();
299
300 remove_compaction_group_in_sst_stat(
301 &self.metrics,
302 right_group_id,
303 right_group_max_level,
304 );
305 }
306
307 {
309 if let Err(err) = compaction_groups_txn.update_compaction_config(
310 &[left_group_id],
311 &[MutableConfig::SplitWeightByVnode(0)], ) {
313 tracing::error!(
314 error = %err.as_report(),
315 "failed to update compaction config for group-{}",
316 left_group_id
317 );
318 }
319 }
320
321 new_version_delta.pre_apply();
322
323 compaction_groups_txn.remove(right_group_id);
325 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
326 }
327
328 versioning.mark_next_time_travel_version_snapshot();
330
331 let mut canceled_tasks = vec![];
333 let compact_task_assignments =
336 compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
337 compact_task_assignments
338 .into_iter()
339 .for_each(|task_assignment| {
340 if let Some(task) = task_assignment.compact_task.as_ref() {
341 assert_eq!(task.compaction_group_id, right_group_id);
342 canceled_tasks.push(ReportTask {
343 task_id: task.task_id,
344 task_status: TaskStatus::ManualCanceled,
345 table_stats_change: HashMap::default(),
346 sorted_output_ssts: vec![],
347 object_timestamps: HashMap::default(),
348 });
349 }
350 });
351
352 if !canceled_tasks.is_empty() {
353 self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
354 .await?;
355 }
356
357 self.metrics
358 .merge_compaction_group_count
359 .with_label_values(&[&left_group_id.to_string()])
360 .inc();
361
362 Ok(())
363 }
364}
365
366impl HummockManager {
367 async fn split_compaction_group_impl(
379 &self,
380 parent_group_id: CompactionGroupId,
381 split_table_ids: &[StateTableId],
382 table_id_to_split: StateTableId,
383 vnode_to_split: VirtualNode,
384 partition_vnode_count: Option<u32>,
385 ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
386 let mut result = vec![];
387 let compaction_guard = self.compaction.write().await;
388 let mut versioning_guard = self.versioning.write().await;
389 let versioning = versioning_guard.deref_mut();
390 if !versioning
392 .current_version
393 .levels
394 .contains_key(&parent_group_id)
395 {
396 return Err(Error::CompactionGroup(format!(
397 "invalid group {}",
398 parent_group_id
399 )));
400 }
401
402 let member_table_ids = versioning
403 .current_version
404 .state_table_info
405 .compaction_group_member_table_ids(parent_group_id)
406 .iter()
407 .map(|table_id| table_id.table_id)
408 .collect::<BTreeSet<_>>();
409
410 if !member_table_ids.contains(&table_id_to_split) {
411 return Err(Error::CompactionGroup(format!(
412 "table {} doesn't in group {}",
413 table_id_to_split, parent_group_id
414 )));
415 }
416
417 let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);
418
419 let table_ids = member_table_ids.into_iter().collect_vec();
421 if table_ids == split_table_ids {
422 return Err(Error::CompactionGroup(format!(
423 "invalid split attempt for group {}: all member tables are moved",
424 parent_group_id
425 )));
426 }
427 let (table_ids_left, table_ids_right) =
429 group_split::split_table_ids_with_table_id_and_vnode(
430 &table_ids,
431 split_full_key.user_key.table_id.table_id(),
432 split_full_key.user_key.get_vnode_id(),
433 );
434 if table_ids_left.is_empty() || table_ids_right.is_empty() {
435 if !table_ids_left.is_empty() {
437 result.push((parent_group_id, table_ids_left));
438 }
439
440 if !table_ids_right.is_empty() {
441 result.push((parent_group_id, table_ids_right));
442 }
443 return Ok(result);
444 }
445
446 result.push((parent_group_id, table_ids_left));
447
448 let split_key: Bytes = split_full_key.encode().into();
449
450 let mut version = HummockVersionTransaction::new(
451 &mut versioning.current_version,
452 &mut versioning.hummock_version_deltas,
453 self.env.notification_manager(),
454 None,
455 &self.metrics,
456 );
457 let mut new_version_delta = version.new_delta();
458
459 let split_sst_count = new_version_delta
460 .latest_version()
461 .count_new_ssts_in_group_split(parent_group_id, split_key.clone());
462
463 let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
464 let (new_compaction_group_id, config) = {
465 let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
467 let config = self
469 .compaction_group_manager
470 .read()
471 .await
472 .default_compaction_config()
473 .as_ref()
474 .clone();
475
476 #[expect(deprecated)]
477 new_version_delta.group_deltas.insert(
479 new_compaction_group_id,
480 GroupDeltas {
481 group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
482 group_config: Some(config.clone()),
483 group_id: new_compaction_group_id,
484 parent_group_id,
485 new_sst_start_id: new_sst_start_id.inner(),
486 table_ids: vec![],
487 version: CompatibilityVersion::LATEST as _, split_key: Some(split_key.into()),
489 }))],
490 },
491 );
492 (new_compaction_group_id, config)
493 };
494
495 new_version_delta.with_latest_version(|version, new_version_delta| {
496 for table_id in &table_ids_right {
497 let table_id = TableId::new(*table_id);
498 let info = version
499 .state_table_info
500 .info()
501 .get(&table_id)
502 .expect("have check exist previously");
503 assert!(
504 new_version_delta
505 .state_table_info_delta
506 .insert(
507 table_id,
508 PbStateTableInfoDelta {
509 committed_epoch: info.committed_epoch,
510 compaction_group_id: new_compaction_group_id,
511 }
512 )
513 .is_none()
514 );
515 }
516 });
517
518 result.push((new_compaction_group_id, table_ids_right));
519
520 {
521 let mut compaction_group_manager = self.compaction_group_manager.write().await;
522 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
523 compaction_groups_txn
524 .create_compaction_groups(new_compaction_group_id, Arc::new(config));
525
526 for (cg_id, table_ids) in &result {
530 if let Some(partition_vnode_count) = partition_vnode_count
532 && table_ids.len() == 1
533 && table_ids == split_table_ids
534 && let Err(err) = compaction_groups_txn.update_compaction_config(
535 &[*cg_id],
536 &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
537 )
538 {
539 tracing::error!(
540 error = %err.as_report(),
541 "failed to update compaction config for group-{}",
542 cg_id
543 );
544 }
545 }
546
547 new_version_delta.pre_apply();
548 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
549 }
550 versioning.mark_next_time_travel_version_snapshot();
552
553 let mut canceled_tasks = vec![];
556 let compact_task_assignments =
557 compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
558 let levels = versioning
559 .current_version
560 .get_compaction_group_levels(parent_group_id);
561 compact_task_assignments
562 .into_iter()
563 .for_each(|task_assignment| {
564 if let Some(task) = task_assignment.compact_task.as_ref() {
565 let is_expired = is_compaction_task_expired(
566 task.compaction_group_version_id,
567 levels.compaction_group_version_id,
568 );
569 if is_expired {
570 canceled_tasks.push(ReportTask {
571 task_id: task.task_id,
572 task_status: TaskStatus::ManualCanceled,
573 table_stats_change: HashMap::default(),
574 sorted_output_ssts: vec![],
575 object_timestamps: HashMap::default(),
576 });
577 }
578 }
579 });
580
581 if !canceled_tasks.is_empty() {
582 self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
583 .await?;
584 }
585
586 self.metrics
587 .split_compaction_group_count
588 .with_label_values(&[&parent_group_id.to_string()])
589 .inc();
590
591 Ok(result)
592 }
593
594 pub async fn move_state_tables_to_dedicated_compaction_group(
597 &self,
598 parent_group_id: CompactionGroupId,
599 table_ids: &[StateTableId],
600 partition_vnode_count: Option<u32>,
601 ) -> Result<(
602 CompactionGroupId,
603 BTreeMap<CompactionGroupId, Vec<StateTableId>>,
604 )> {
605 if table_ids.is_empty() {
606 return Err(Error::CompactionGroup(
607 "table_ids must not be empty".to_owned(),
608 ));
609 }
610
611 if !table_ids.is_sorted() {
612 return Err(Error::CompactionGroup(
613 "table_ids must be sorted".to_owned(),
614 ));
615 }
616
617 let parent_table_ids = {
618 let versioning_guard = self.versioning.read().await;
619 versioning_guard
620 .current_version
621 .state_table_info
622 .compaction_group_member_table_ids(parent_group_id)
623 .iter()
624 .map(|table_id| table_id.table_id)
625 .collect_vec()
626 };
627
628 if parent_table_ids == table_ids {
629 return Err(Error::CompactionGroup(format!(
630 "invalid split attempt for group {}: all member tables are moved",
631 parent_group_id
632 )));
633 }
634
635 fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<u32>>) {
636 {
638 cg_id_to_table_ids
639 .iter()
640 .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
641 }
642
643 {
645 let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
646 table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
647 assert!(table_table_ids_vec.concat().is_sorted());
648 }
649
650 {
652 let mut all_table_ids = HashSet::new();
653 for table_ids in cg_id_to_table_ids.values() {
654 for table_id in table_ids {
655 assert!(all_table_ids.insert(*table_id));
656 }
657 }
658 }
659 }
660
661 let mut cg_id_to_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
671 let table_id_to_split = *table_ids.first().unwrap();
672 let mut target_compaction_group_id = 0;
673 let result_vec = self
674 .split_compaction_group_impl(
675 parent_group_id,
676 table_ids,
677 table_id_to_split,
678 VirtualNode::ZERO,
679 partition_vnode_count,
680 )
681 .await?;
682 assert!(result_vec.len() <= 2);
683
684 let mut finish_move = false;
685 for (cg_id, table_ids_after_split) in result_vec {
686 if table_ids_after_split.contains(&table_id_to_split) {
687 target_compaction_group_id = cg_id;
688 }
689
690 if table_ids_after_split == table_ids {
691 finish_move = true;
692 }
693
694 cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
695 }
696 check_table_ids_valid(&cg_id_to_table_ids);
697
698 if finish_move {
699 return Ok((target_compaction_group_id, cg_id_to_table_ids));
700 }
701
702 let table_id_to_split = *table_ids.last().unwrap();
705 let result_vec = self
706 .split_compaction_group_impl(
707 target_compaction_group_id,
708 table_ids,
709 table_id_to_split,
710 VirtualNode::MAX_REPRESENTABLE,
711 partition_vnode_count,
712 )
713 .await?;
714 assert!(result_vec.len() <= 2);
715 for (cg_id, table_ids_after_split) in result_vec {
716 if table_ids_after_split.contains(&table_id_to_split) {
717 target_compaction_group_id = cg_id;
718 }
719 cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
720 }
721 check_table_ids_valid(&cg_id_to_table_ids);
722
723 Ok((target_compaction_group_id, cg_id_to_table_ids))
724 }
725}
726
727impl HummockManager {
728 pub async fn try_split_compaction_group(
730 &self,
731 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
732 group: CompactionGroupStatistic,
733 ) {
734 if group
735 .compaction_group_config
736 .compaction_config
737 .disable_auto_group_scheduling
738 .unwrap_or(false)
739 {
740 return;
741 }
742 for (table_id, table_size) in &group.table_statistic {
744 self.try_move_high_throughput_table_to_dedicated_cg(
745 table_write_throughput_statistic_manager,
746 *table_id,
747 table_size,
748 group.group_id,
749 )
750 .await;
751 }
752
753 self.try_split_huge_compaction_group(group).await;
755 }
756
757 pub async fn try_move_high_throughput_table_to_dedicated_cg(
759 &self,
760 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
761 table_id: u32,
762 _table_size: &u64,
763 parent_group_id: u64,
764 ) {
765 let mut table_throughput = table_write_throughput_statistic_manager
766 .get_table_throughput_descending(
767 table_id,
768 self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
769 )
770 .peekable();
771
772 if table_throughput.peek().is_none() {
773 return;
774 }
775
776 let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
777 table_throughput,
778 self.env.opts.table_high_write_throughput_threshold,
779 self.env
780 .opts
781 .table_stat_high_write_throughput_ratio_for_split,
782 );
783
784 if !is_high_write_throughput {
786 return;
787 }
788
789 let ret = self
790 .move_state_tables_to_dedicated_compaction_group(
791 parent_group_id,
792 &[table_id],
793 Some(self.env.opts.partition_vnode_count),
794 )
795 .await;
796 match ret {
797 Ok(split_result) => {
798 tracing::info!(
799 "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
800 table_id,
801 parent_group_id,
802 self.env.opts.partition_vnode_count,
803 split_result
804 );
805 }
806 Err(e) => {
807 tracing::info!(
808 error = %e.as_report(),
809 "failed to split state table [{}] from group-{}",
810 table_id,
811 parent_group_id,
812 )
813 }
814 }
815 }
816
817 pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
818 let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
819 * self.env.opts.split_group_size_ratio) as u64;
820 let is_huge_hybrid_group =
821 group.group_size > group_max_size && group.table_statistic.len() > 1; if is_huge_hybrid_group {
823 let mut accumulated_size = 0;
824 let mut table_ids = Vec::default();
825 for (table_id, table_size) in &group.table_statistic {
826 accumulated_size += table_size;
827 table_ids.push(*table_id);
828 assert!(table_ids.is_sorted());
831 if accumulated_size * 2 > group_max_size {
832 let ret = self
833 .move_state_tables_to_dedicated_compaction_group(
834 group.group_id,
835 &table_ids,
836 None,
837 )
838 .await;
839 match ret {
840 Ok(split_result) => {
841 tracing::info!(
842 "split_huge_compaction_group success {:?}",
843 split_result
844 );
845 self.metrics
846 .split_compaction_group_count
847 .with_label_values(&[&group.group_id.to_string()])
848 .inc();
849 return;
850 }
851 Err(e) => {
852 tracing::error!(
853 error = %e.as_report(),
854 "failed to split_huge_compaction_group table {:?} from group-{}",
855 table_ids,
856 group.group_id
857 );
858
859 return;
860 }
861 }
862 }
863 }
864 }
865 }
866
867 pub async fn try_merge_compaction_group(
868 &self,
869 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
870 group: &CompactionGroupStatistic,
871 next_group: &CompactionGroupStatistic,
872 created_tables: &HashSet<u32>,
873 ) -> Result<()> {
874 GroupMergeValidator::validate_group_merge(
875 group,
876 next_group,
877 created_tables,
878 table_write_throughput_statistic_manager,
879 &self.env.opts,
880 &self.versioning,
881 )
882 .await?;
883
884 match self
885 .merge_compaction_group(group.group_id, next_group.group_id)
886 .await
887 {
888 Ok(()) => {
889 tracing::info!(
890 "merge group-{} to group-{}",
891 next_group.group_id,
892 group.group_id,
893 );
894
895 self.metrics
896 .merge_compaction_group_count
897 .with_label_values(&[&group.group_id.to_string()])
898 .inc();
899 }
900 Err(e) => {
901 tracing::info!(
902 error = %e.as_report(),
903 "failed to merge group-{} group-{}",
904 next_group.group_id,
905 group.group_id,
906 )
907 }
908 }
909
910 Ok(())
911 }
912}
913
/// Stateless namespace for the checks that decide whether tables should be split out of
/// a compaction group and whether two compaction groups may be merged.
///
/// All functionality is exposed through associated functions; the type has no fields.
#[derive(Debug, Default)]
struct GroupMergeValidator {}
916
917impl GroupMergeValidator {
918 pub fn is_table_high_write_throughput(
920 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
921 threshold: u64,
922 high_write_throughput_ratio: f64,
923 ) -> bool {
924 let mut sample_size = 0;
925 let mut high_write_throughput_count = 0;
926 for statistic in table_throughput {
927 sample_size += 1;
928 if statistic.throughput > threshold {
929 high_write_throughput_count += 1;
930 }
931 }
932
933 high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
934 }
935
936 pub fn is_table_low_write_throughput(
937 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
938 threshold: u64,
939 low_write_throughput_ratio: f64,
940 ) -> bool {
941 let mut sample_size = 0;
942 let mut low_write_throughput_count = 0;
943 for statistic in table_throughput {
944 sample_size += 1;
945 if statistic.throughput <= threshold {
946 low_write_throughput_count += 1;
947 }
948 }
949
950 low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
951 }
952
953 fn check_is_low_write_throughput_compaction_group(
954 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
955 group: &CompactionGroupStatistic,
956 opts: &Arc<MetaOpts>,
957 ) -> bool {
958 let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
959 for table_id in group.table_statistic.keys() {
960 let mut table_throughput = table_write_throughput_statistic_manager
961 .get_table_throughput_descending(
962 *table_id,
963 opts.table_stat_throuput_window_seconds_for_merge as i64,
964 )
965 .peekable();
966 if table_throughput.peek().is_none() {
967 continue;
968 }
969
970 table_with_statistic.push(table_throughput);
971 }
972
973 if table_with_statistic.is_empty() {
975 return true;
976 }
977
978 table_with_statistic.into_iter().all(|table_throughput| {
980 Self::is_table_low_write_throughput(
981 table_throughput,
982 opts.table_low_write_throughput_threshold,
983 opts.table_stat_low_write_throughput_ratio_for_merge,
984 )
985 })
986 }
987
988 fn check_is_creating_compaction_group(
989 group: &CompactionGroupStatistic,
990 created_tables: &HashSet<u32>,
991 ) -> bool {
992 group
993 .table_statistic
994 .keys()
995 .any(|table_id| !created_tables.contains(table_id))
996 }
997
998 async fn validate_group_merge(
999 group: &CompactionGroupStatistic,
1000 next_group: &CompactionGroupStatistic,
1001 created_tables: &HashSet<u32>,
1002 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1003 opts: &Arc<MetaOpts>,
1004 versioning: &MonitoredRwLock<Versioning>,
1005 ) -> Result<()> {
1006 if (group.group_id == StaticCompactionGroupId::StateDefault as u64
1008 && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64)
1009 || (group.group_id == StaticCompactionGroupId::MaterializedView as u64
1010 && next_group.group_id == StaticCompactionGroupId::StateDefault as u64)
1011 {
1012 return Err(Error::CompactionGroup(format!(
1013 "group-{} and group-{} are both StaticCompactionGroupId",
1014 group.group_id, next_group.group_id
1015 )));
1016 }
1017
1018 if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1019 return Err(Error::CompactionGroup(format!(
1020 "group-{} or group-{} is empty",
1021 group.group_id, next_group.group_id
1022 )));
1023 }
1024
1025 if group
1026 .compaction_group_config
1027 .compaction_config
1028 .disable_auto_group_scheduling
1029 .unwrap_or(false)
1030 || next_group
1031 .compaction_group_config
1032 .compaction_config
1033 .disable_auto_group_scheduling
1034 .unwrap_or(false)
1035 {
1036 return Err(Error::CompactionGroup(format!(
1037 "group-{} or group-{} disable_auto_group_scheduling",
1038 group.group_id, next_group.group_id
1039 )));
1040 }
1041
1042 if Self::check_is_creating_compaction_group(group, created_tables) {
1044 return Err(Error::CompactionGroup(format!(
1045 "Not Merge creating group {} next_group {}",
1046 group.group_id, next_group.group_id
1047 )));
1048 }
1049
1050 if !Self::check_is_low_write_throughput_compaction_group(
1052 table_write_throughput_statistic_manager,
1053 group,
1054 opts,
1055 ) {
1056 return Err(Error::CompactionGroup(format!(
1057 "Not Merge high throughput group {} next_group {}",
1058 group.group_id, next_group.group_id
1059 )));
1060 }
1061
1062 let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1063 * opts.split_group_size_ratio) as u64;
1064
1065 if (group.group_size + next_group.group_size) > size_limit {
1066 return Err(Error::CompactionGroup(format!(
1067 "Not Merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1068 group.group_id,
1069 group.group_size,
1070 next_group.group_id,
1071 next_group.group_size,
1072 size_limit
1073 )));
1074 }
1075
1076 if Self::check_is_creating_compaction_group(next_group, created_tables) {
1077 return Err(Error::CompactionGroup(format!(
1078 "Not Merge creating group {} next group {}",
1079 group.group_id, next_group.group_id
1080 )));
1081 }
1082
1083 if !Self::check_is_low_write_throughput_compaction_group(
1084 table_write_throughput_statistic_manager,
1085 next_group,
1086 opts,
1087 ) {
1088 return Err(Error::CompactionGroup(format!(
1089 "Not Merge high throughput group {} next group {}",
1090 group.group_id, next_group.group_id
1091 )));
1092 }
1093
1094 {
1095 let versioning_guard = versioning.read().await;
1097 let levels = &versioning_guard.current_version.levels;
1098 if !levels.contains_key(&group.group_id) {
1099 return Err(Error::CompactionGroup(format!(
1100 "Not Merge group {} not exist",
1101 group.group_id
1102 )));
1103 }
1104
1105 if !levels.contains_key(&next_group.group_id) {
1106 return Err(Error::CompactionGroup(format!(
1107 "Not Merge next group {} not exist",
1108 next_group.group_id
1109 )));
1110 }
1111
1112 let group_levels = versioning_guard
1113 .current_version
1114 .get_compaction_group_levels(group.group_id);
1115
1116 let next_group_levels = versioning_guard
1117 .current_version
1118 .get_compaction_group_levels(next_group.group_id);
1119
1120 let group_state = GroupStateValidator::group_state(
1121 group_levels,
1122 group.compaction_group_config.compaction_config().deref(),
1123 );
1124
1125 if group_state.is_write_stop() || group_state.is_emergency() {
1126 return Err(Error::CompactionGroup(format!(
1127 "Not Merge write limit group {} next group {}",
1128 group.group_id, next_group.group_id
1129 )));
1130 }
1131
1132 let next_group_state = GroupStateValidator::group_state(
1133 next_group_levels,
1134 next_group
1135 .compaction_group_config
1136 .compaction_config()
1137 .deref(),
1138 );
1139
1140 if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1141 return Err(Error::CompactionGroup(format!(
1142 "Not Merge write limit next group {} group {}",
1143 next_group.group_id, group.group_id
1144 )));
1145 }
1146
1147 let l0_sub_level_count_after_merge =
1149 group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1150 if GroupStateValidator::write_stop_l0_file_count(
1151 (l0_sub_level_count_after_merge as f64
1152 * opts.compaction_group_merge_dimension_threshold) as usize,
1153 group.compaction_group_config.compaction_config().deref(),
1154 ) {
1155 return Err(Error::CompactionGroup(format!(
1156 "Not Merge write limit group {} next group {}, will trigger write stop after merge",
1157 group.group_id, next_group.group_id
1158 )));
1159 }
1160
1161 let l0_file_count_after_merge =
1162 group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1163 if GroupStateValidator::write_stop_l0_file_count(
1164 (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1165 as usize,
1166 group.compaction_group_config.compaction_config().deref(),
1167 ) {
1168 return Err(Error::CompactionGroup(format!(
1169 "Not Merge write limit next group {} group {}, will trigger write stop after merge",
1170 next_group.group_id, group.group_id
1171 )));
1172 }
1173
1174 let l0_size_after_merge =
1175 group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1176
1177 if GroupStateValidator::write_stop_l0_size(
1178 (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1179 as u64,
1180 group.compaction_group_config.compaction_config().deref(),
1181 ) {
1182 return Err(Error::CompactionGroup(format!(
1183 "Not Merge write limit next group {} group {}, will trigger write stop after merge",
1184 next_group.group_id, group.group_id
1185 )));
1186 }
1187
1188 if GroupStateValidator::emergency_l0_file_count(
1190 (l0_sub_level_count_after_merge as f64
1191 * opts.compaction_group_merge_dimension_threshold) as usize,
1192 group.compaction_group_config.compaction_config().deref(),
1193 ) {
1194 return Err(Error::CompactionGroup(format!(
1195 "Not Merge emergency group {} next group {}, will trigger emergency after merge",
1196 group.group_id, next_group.group_id
1197 )));
1198 }
1199 }
1200
1201 Ok(())
1202 }
1203}