use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
use risingwave_hummock_sdk::compaction_group::{
    StateTableId, StaticCompactionGroupId, group_split,
};
use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::{
    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
};
use thiserror_ext::AsReport;

use super::{CompactionGroupStatistic, GroupStateValidator};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{HummockManager, commit_multi_var};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::hummock::table_write_throughput_statistic::{
    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
};
use crate::manager::MetaOpts;

impl HummockManager {
    pub async fn merge_compaction_group(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, None)
            .await
    }

    pub async fn merge_compaction_group_for_test(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: HashSet<u32>,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
            .await
    }

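    /// Merges two compaction groups into one. The group whose member tables have the smaller
    /// table ids becomes the merge target (`left_group_id`); the other group is removed.
    /// Both groups must exist, be non-empty, contain only created tables, cover disjoint and
    /// ordered table id ranges, and their levels must be concatenable.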
    pub async fn merge_compaction_group_impl(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: Option<HashSet<u32>>,
    ) -> Result<()> {
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        if !versioning.current_version.levels.contains_key(&group_1) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
        }

        if !versioning.current_version.levels.contains_key(&group_2) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
        }

        let state_table_info = versioning.current_version.state_table_info.clone();
        let mut member_table_ids_1 = state_table_info
            .compaction_group_member_table_ids(group_1)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_1.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_1 {} is empty",
                group_1
            )));
        }

        let mut member_table_ids_2 = state_table_info
            .compaction_group_member_table_ids(group_2)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_2.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_2 {} is empty",
                group_2
            )));
        }

        debug_assert!(!member_table_ids_1.is_empty());
        debug_assert!(!member_table_ids_2.is_empty());
        assert!(member_table_ids_1.is_sorted());
        assert!(member_table_ids_2.is_sorted());

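        // `created_tables` is only injected by tests; in production, fetch the set of created
        // table ids from the metadata manager.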
        let created_tables = if let Some(created_tables) = created_tables {
            #[expect(clippy::assertions_on_constants)]
            {
                assert!(cfg!(debug_assertions));
            }
            created_tables
        } else {
            match self.metadata_manager.get_created_table_ids().await {
                Ok(created_tables) => HashSet::from_iter(created_tables),
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
                    return Err(Error::CompactionGroup(format!(
                        "merge group_1 {} group_2 {} failed to fetch created table ids",
                        group_1, group_2
                    )));
                }
            }
        };

        fn contains_creating_table(
            table_ids: &Vec<TableId>,
            created_tables: &HashSet<u32>,
        ) -> bool {
            table_ids
                .iter()
                .any(|table_id| !created_tables.contains(&table_id.table_id()))
        }

        if contains_creating_table(&member_table_ids_1, &created_tables)
            || contains_creating_table(&member_table_ids_2, &created_tables)
        {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
                group_1, group_2, member_table_ids_1, member_table_ids_2
            )));
        }

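        // Order the two groups so that `left_group_id` is the one whose member tables have the
        // smaller table ids; the merged group keeps `left_group_id`.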
        let (left_group_id, right_group_id) =
            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
                (group_1, group_2)
            } else {
                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
                (group_2, group_1)
            };

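        // The two groups must cover disjoint, ordered table id ranges: every table id in the
        // left group has to be smaller than every table id in the right group.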
        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
            return Err(Error::CompactionGroup(format!(
                "invalid merge group_1 {} group_2 {}",
                left_group_id, right_group_id
            )));
        }

        let combined_member_table_ids = member_table_ids_1
            .iter()
            .chain(member_table_ids_2.iter())
            .collect_vec();
        assert!(combined_member_table_ids.is_sorted());

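        // Sanity check: the two groups must not share any sst id.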
        let mut sst_id_set = HashSet::new();
        for sst_id in versioning
            .current_version
            .get_sst_ids_by_group_id(left_group_id)
            .chain(
                versioning
                    .current_version
                    .get_sst_ids_by_group_id(right_group_id),
            )
        {
            if !sst_id_set.insert(sst_id) {
                return Err(Error::CompactionGroup(format!(
                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
                    left_group_id, right_group_id, sst_id
                )));
            }
        }

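        // For every level, the key ranges of the two groups must be concatenable: the last SST
        // of the left group has to sort strictly before the first SST of the right group.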
        {
            let left_levels = versioning
                .current_version
                .get_compaction_group_levels(group_1);

            let right_levels = versioning
                .current_version
                .get_compaction_group_levels(group_2);

            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
            for level_idx in 1..=max_level {
                let left_level = left_levels.get_level(level_idx);
                let right_level = right_levels.get_level(level_idx);
                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
                    continue;
                }

                let left_last_sst = left_level.table_infos.last().unwrap().clone();
                let right_first_sst = right_level.table_infos.first().unwrap().clone();
                let left_sst_id = left_last_sst.sst_id;
                let right_sst_id = right_first_sst.sst_id;
                let left_obj_id = left_last_sst.object_id;
                let right_obj_id = right_first_sst.object_id;

                if !can_concat(&[left_last_sst, right_first_sst]) {
                    return Err(Error::CompactionGroup(format!(
                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
                        left_group_id,
                        right_group_id,
                        level_idx,
                        left_sst_id,
                        right_sst_id,
                        left_obj_id,
                        right_obj_id
                    )));
                }
            }
        }

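        // All checks passed: stage the merge in a new version delta.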
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        let target_compaction_group_id = {
            new_version_delta.group_deltas.insert(
                left_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
                        left_group_id,
                        right_group_id,
                    })],
                },
            );
            left_group_id
        };

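        // Re-point every member table of both groups to the merged group.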
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for table_id in combined_member_table_ids {
                let table_id = TableId::new(table_id.table_id());
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: target_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

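        // Persist the merge: reset the merged group's vnode-partition config, drop the right
        // group's config and metrics, and commit the version delta atomically.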
        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

            {
                let right_group_max_level = new_version_delta
                    .latest_version()
                    .get_compaction_group_levels(right_group_id)
                    .levels
                    .len();

                remove_compaction_group_in_sst_stat(
                    &self.metrics,
                    right_group_id,
                    right_group_max_level,
                );
            }

            {
                if let Err(err) = compaction_groups_txn.update_compaction_config(
                    &[left_group_id],
                    &[MutableConfig::SplitWeightByVnode(0)],
                ) {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        left_group_id
                    );
                }
            }

            new_version_delta.pre_apply();

            compaction_groups_txn.remove(right_group_id);
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }

        versioning.mark_next_time_travel_version_snapshot();

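        // Cancel any compaction task that is still assigned to the removed right group.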
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    assert_eq!(task.compaction_group_id, right_group_id);
                    canceled_tasks.push(ReportTask {
                        task_id: task.task_id,
                        task_status: TaskStatus::ManualCanceled,
                        table_stats_change: HashMap::default(),
                        sorted_output_ssts: vec![],
                        object_timestamps: HashMap::default(),
                    });
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .merge_compaction_group_count
            .with_label_values(&[&left_group_id.to_string()])
            .inc();

        Ok(())
    }
}

impl HummockManager {
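    /// Splits `parent_group_id` at the key `(table_id_to_split, vnode_to_split)`. Tables sorting
    /// before the split key stay in the parent group; the remaining tables are moved to a newly
    /// created group. If either side would be empty, no new group is created. Returns the
    /// resulting `(compaction_group_id, member_table_ids)` pairs.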
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .map(|table_id| table_id.table_id)
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} is not in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        let table_ids = member_table_ids.into_iter().collect_vec();
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id.table_id(),
                split_full_key.user_key.get_vnode_id(),
            );
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

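        // Pre-allocate sstable object ids for the SSTs that will be produced by cutting existing
        // SSTs at the split key, then record a `GroupConstruct` delta for the new group.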
        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config()
                .as_ref()
                .clone();

            #[expect(deprecated)]
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id: new_sst_start_id.inner(),
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _,
                        split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

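        // Move the state table info of the right-hand tables to the new group.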
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for table_id in &table_ids_right {
                let table_id = TableId::new(*table_id);
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

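        // Register the new group's config, optionally enable vnode partitioning for a group that
        // now contains exactly the single table being moved, and commit atomically.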
        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            for (cg_id, table_ids) in &result {
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    )
                {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        versioning.mark_next_time_travel_version_snapshot();

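        // Cancel compaction tasks that were issued against the pre-split parent group; their
        // compaction group version id is now stale.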
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    let is_expired = is_compaction_task_expired(
                        task.compaction_group_version_id,
                        levels.compaction_group_version_id,
                    );
                    if is_expired {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }

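    /// Moves the given state tables out of `parent_group_id` into a dedicated compaction group
    /// by splitting at most twice: once before the first table id and, if needed, once after the
    /// last one. Returns the id of the dedicated group together with the resulting
    /// group-to-tables mapping. `table_ids` must be sorted and must not cover the whole group.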
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .map(|table_id| table_id.table_id)
                .collect_vec()
        };

        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<u32>>) {
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

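        // First split: cut right before the first requested table id (vnode 0), so that all
        // requested tables land in the right-hand group produced by the split.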
        let mut cg_id_to_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id = 0;
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

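        // Second split: cut right after the last requested table id (max vnode) so that any
        // trailing tables are split off and only the requested tables stay in the target group.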
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
}

impl HummockManager {
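    /// Scheduling entry point: tries to move high-write-throughput tables of `group` into
    /// dedicated groups, then splits the group itself if it has grown too large.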
    pub async fn try_split_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: CompactionGroupStatistic,
    ) {
        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
        {
            return;
        }
        for (table_id, table_size) in &group.table_statistic {
            self.try_move_high_throughput_table_to_dedicated_cg(
                table_write_throughput_statistic_manager,
                *table_id,
                table_size,
                group.group_id,
            )
            .await;
        }

        self.try_split_huge_compaction_group(group).await;
    }

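    /// Moves `table_id` into its own compaction group if its recent write throughput samples
    /// qualify it as a high-write-throughput table.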
    pub async fn try_move_high_throughput_table_to_dedicated_cg(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        table_id: u32,
        _table_size: &u64,
        parent_group_id: u64,
    ) {
        let mut table_throughput = table_write_throughput_statistic_manager
            .get_table_throughput_descending(
                table_id,
                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
            )
            .peekable();

        if table_throughput.peek().is_none() {
            return;
        }

        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
            table_throughput,
            self.env.opts.table_high_write_throughput_threshold,
            self.env
                .opts
                .table_stat_high_write_throughput_ratio_for_split,
        );

        if !is_high_write_throughput {
            return;
        }

        let ret = self
            .move_state_tables_to_dedicated_compaction_group(
                parent_group_id,
                &[table_id],
                Some(self.env.opts.partition_vnode_count),
            )
            .await;
        match ret {
            Ok(split_result) => {
                tracing::info!(
                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
                    table_id,
                    parent_group_id,
                    self.env.opts.partition_vnode_count,
                    split_result
                );
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to split state table [{}] from group-{}",
                    table_id,
                    parent_group_id,
                )
            }
        }
    }

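    /// Splits a hybrid group whose size exceeds `max_estimated_group_size * split_group_size_ratio`
    /// by moving out a prefix of its tables that accounts for more than half of that limit.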
    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
            * self.env.opts.split_group_size_ratio) as u64;
        let is_huge_hybrid_group =
            group.group_size > group_max_size && group.table_statistic.len() > 1;
        if is_huge_hybrid_group {
            let mut accumulated_size = 0;
            let mut table_ids = Vec::default();
            for (table_id, table_size) in &group.table_statistic {
                accumulated_size += table_size;
                table_ids.push(*table_id);
                assert!(table_ids.is_sorted());
                if accumulated_size * 2 > group_max_size {
                    let ret = self
                        .move_state_tables_to_dedicated_compaction_group(
                            group.group_id,
                            &table_ids,
                            None,
                        )
                        .await;
                    match ret {
                        Ok(split_result) => {
                            tracing::info!(
                                "split_huge_compaction_group success {:?}",
                                split_result
                            );
                            self.metrics
                                .split_compaction_group_count
                                .with_label_values(&[&group.group_id.to_string()])
                                .inc();
                            return;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "failed to split_huge_compaction_group table {:?} from group-{}",
                                table_ids,
                                group.group_id
                            );

                            return;
                        }
                    }
                }
            }
        }
    }

    pub async fn try_merge_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<u32>,
    ) -> Result<()> {
        GroupMergeValidator::validate_group_merge(
            group,
            next_group,
            created_tables,
            table_write_throughput_statistic_manager,
            &self.env.opts,
            &self.versioning,
        )
        .await?;

        match self
            .merge_compaction_group(group.group_id, next_group.group_id)
            .await
        {
            Ok(()) => {
                tracing::info!(
                    "merge group-{} to group-{}",
                    next_group.group_id,
                    group.group_id,
                );

                self.metrics
                    .merge_compaction_group_count
                    .with_label_values(&[&group.group_id.to_string()])
                    .inc();
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to merge group-{} group-{}",
                    next_group.group_id,
                    group.group_id,
                )
            }
        }

        Ok(())
    }
}

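/// Stateless helper that bundles the throughput heuristics and the safety checks used to decide
/// whether two compaction groups can be merged.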
#[derive(Debug, Default)]
struct GroupMergeValidator {}

impl GroupMergeValidator {
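    /// Returns true if more than `high_write_throughput_ratio` of the throughput samples exceed
    /// `threshold`.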
    pub fn is_table_high_write_throughput(
        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
        threshold: u64,
        high_write_throughput_ratio: f64,
    ) -> bool {
        let mut sample_size = 0;
        let mut high_write_throughput_count = 0;
        for statistic in table_throughput {
            sample_size += 1;
            if statistic.throughput > threshold {
                high_write_throughput_count += 1;
            }
        }

        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
    }

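    /// Returns true if more than `low_write_throughput_ratio` of the throughput samples are at
    /// or below `threshold`.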
    pub fn is_table_low_write_throughput(
        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
        threshold: u64,
        low_write_throughput_ratio: f64,
    ) -> bool {
        let mut sample_size = 0;
        let mut low_write_throughput_count = 0;
        for statistic in table_throughput {
            sample_size += 1;
            if statistic.throughput <= threshold {
                low_write_throughput_count += 1;
            }
        }

        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
    }

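    /// A group counts as low-write-throughput only if every table that has recent statistics is
    /// low-throughput; a group with no statistics in the window also counts as low-throughput.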
    fn check_is_low_write_throughput_compaction_group(
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        opts: &Arc<MetaOpts>,
    ) -> bool {
        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
        for table_id in group.table_statistic.keys() {
            let mut table_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(
                    *table_id,
                    opts.table_stat_throuput_window_seconds_for_merge as i64,
                )
                .peekable();
            if table_throughput.peek().is_none() {
                continue;
            }

            table_with_statistic.push(table_throughput);
        }

        if table_with_statistic.is_empty() {
            return true;
        }

        table_with_statistic.into_iter().all(|table_throughput| {
            Self::is_table_low_write_throughput(
                table_throughput,
                opts.table_low_write_throughput_threshold,
                opts.table_stat_low_write_throughput_ratio_for_merge,
            )
        })
    }

    fn check_is_creating_compaction_group(
        group: &CompactionGroupStatistic,
        created_tables: &HashSet<u32>,
    ) -> bool {
        group
            .table_statistic
            .keys()
            .any(|table_id| !created_tables.contains(table_id))
    }

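    /// Safety checks for merging `group` with `next_group`: both must exist, be non-empty, be
    /// fully created, allow auto group scheduling, have low write throughput, not be
    /// write-stopped or in emergency, and the combined group must stay within the size, L0 file
    /// count, and L0 size limits (scaled by `compaction_group_merge_dimension_threshold`).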
    async fn validate_group_merge(
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<u32>,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        opts: &Arc<MetaOpts>,
        versioning: &MonitoredRwLock<Versioning>,
    ) -> Result<()> {
        if (group.group_id == StaticCompactionGroupId::StateDefault as u64
            && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64)
            || (group.group_id == StaticCompactionGroupId::MaterializedView as u64
                && next_group.group_id == StaticCompactionGroupId::StateDefault as u64)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} and group-{} are both StaticCompactionGroupId",
                group.group_id, next_group.group_id
            )));
        }

        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} is empty",
                group.group_id, next_group.group_id
            )));
        }

        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
            || next_group
                .compaction_group_config
                .compaction_config
                .disable_auto_group_scheduling
                .unwrap_or(false)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} disable_auto_group_scheduling",
                group.group_id, next_group.group_id
            )));
        }

        if Self::check_is_creating_compaction_group(group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge high throughput group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
            * opts.split_group_size_ratio) as u64;

        if (group.group_size + next_group.group_size) > size_limit {
            return Err(Error::CompactionGroup(format!(
                "Not Merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
                group.group_id,
                group.group_size,
                next_group.group_id,
                next_group.group_size,
                size_limit
            )));
        }

        if Self::check_is_creating_compaction_group(next_group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            next_group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge high throughput group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

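        // Finally, inspect the current version: both groups must still exist and be healthy, and
        // the merged group's L0 must stay below the write-stop and emergency thresholds (scaled
        // by `compaction_group_merge_dimension_threshold`).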
        {
            let versioning_guard = versioning.read().await;
            let levels = &versioning_guard.current_version.levels;
            if !levels.contains_key(&group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge group {} not exist",
                    group.group_id
                )));
            }

            if !levels.contains_key(&next_group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge next group {} not exist",
                    next_group.group_id
                )));
            }

            let group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(group.group_id);

            let next_group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(next_group.group_id);

            let group_state = GroupStateValidator::group_state(
                group_levels,
                group.compaction_group_config.compaction_config().deref(),
            );

            if group_state.is_write_stop() || group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit group {} next group {}",
                    group.group_id, next_group.group_id
                )));
            }

            let next_group_state = GroupStateValidator::group_state(
                next_group_levels,
                next_group
                    .compaction_group_config
                    .compaction_config()
                    .deref(),
            );

            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit next group {} group {}",
                    next_group.group_id, group.group_id
                )));
            }

            let l0_sub_level_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit group {} next group {}, will trigger write stop after merge",
                    group.group_id, next_group.group_id
                )));
            }

            let l0_file_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            let l0_size_after_merge =
                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;

            if GroupStateValidator::write_stop_l0_size(
                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as u64,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            if GroupStateValidator::emergency_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge emergency group {} next group {}, will trigger emergency after merge",
                    group.group_id, next_group.group_id
                )));
            }
        }

        Ok(())
    }
}