1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26 StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33 CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::{CompactionGroupStatistic, GroupStateValidator};
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::transaction::HummockVersionTransaction;
40use crate::hummock::manager::versioning::Versioning;
41use crate::hummock::manager::{HummockManager, commit_multi_var};
42use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
43use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
44use crate::hummock::table_write_throughput_statistic::{
45 TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
46};
47use crate::manager::MetaOpts;
48
49impl HummockManager {
50 pub async fn merge_compaction_group(
51 &self,
52 group_1: CompactionGroupId,
53 group_2: CompactionGroupId,
54 ) -> Result<()> {
55 self.merge_compaction_group_impl(group_1, group_2, None)
56 .await
57 }
58
59 pub async fn merge_compaction_group_for_test(
60 &self,
61 group_1: CompactionGroupId,
62 group_2: CompactionGroupId,
63 created_tables: HashSet<TableId>,
64 ) -> Result<()> {
65 self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
66 .await
67 }
68
69 pub async fn merge_compaction_group_impl(
70 &self,
71 group_1: CompactionGroupId,
72 group_2: CompactionGroupId,
73 created_tables: Option<HashSet<TableId>>,
74 ) -> Result<()> {
75 let compaction_guard = self.compaction.write().await;
76 let mut versioning_guard = self.versioning.write().await;
77 let versioning = versioning_guard.deref_mut();
78 if !versioning.current_version.levels.contains_key(&group_1) {
80 return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
81 }
82
83 if !versioning.current_version.levels.contains_key(&group_2) {
84 return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
85 }
86
87 let state_table_info = versioning.current_version.state_table_info.clone();
88 let mut member_table_ids_1 = state_table_info
89 .compaction_group_member_table_ids(group_1)
90 .iter()
91 .cloned()
92 .collect_vec();
93
94 if member_table_ids_1.is_empty() {
95 return Err(Error::CompactionGroup(format!(
96 "group_1 {} is empty",
97 group_1
98 )));
99 }
100
101 let mut member_table_ids_2 = state_table_info
102 .compaction_group_member_table_ids(group_2)
103 .iter()
104 .cloned()
105 .collect_vec();
106
107 if member_table_ids_2.is_empty() {
108 return Err(Error::CompactionGroup(format!(
109 "group_2 {} is empty",
110 group_2
111 )));
112 }
113
114 debug_assert!(!member_table_ids_1.is_empty());
115 debug_assert!(!member_table_ids_2.is_empty());
116 assert!(member_table_ids_1.is_sorted());
117 assert!(member_table_ids_2.is_sorted());
118
119 let created_tables = if let Some(created_tables) = created_tables {
120 #[expect(clippy::assertions_on_constants)]
122 {
123 assert!(cfg!(debug_assertions));
124 }
125 created_tables
126 } else {
127 match self.metadata_manager.get_created_table_ids().await {
128 Ok(created_tables) => HashSet::from_iter(created_tables),
129 Err(err) => {
130 tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
131 return Err(Error::CompactionGroup(format!(
132 "merge group_1 {} group_2 {} failed to fetch created table ids",
133 group_1, group_2
134 )));
135 }
136 }
137 };
138
139 fn contains_creating_table(
140 table_ids: &Vec<TableId>,
141 created_tables: &HashSet<TableId>,
142 ) -> bool {
143 table_ids
144 .iter()
145 .any(|table_id| !created_tables.contains(table_id))
146 }
147
148 if contains_creating_table(&member_table_ids_1, &created_tables)
150 || contains_creating_table(&member_table_ids_2, &created_tables)
151 {
152 return Err(Error::CompactionGroup(format!(
153 "Not Merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
154 group_1, group_2, member_table_ids_1, member_table_ids_2
155 )));
156 }
157
158 let (left_group_id, right_group_id) =
160 if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
161 (group_1, group_2)
162 } else {
163 std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
164 (group_2, group_1)
165 };
166
167 if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
169 return Err(Error::CompactionGroup(format!(
170 "invalid merge group_1 {} group_2 {}",
171 left_group_id, right_group_id
172 )));
173 }
174
175 let combined_member_table_ids = member_table_ids_1
176 .iter()
177 .chain(member_table_ids_2.iter())
178 .collect_vec();
179 assert!(combined_member_table_ids.is_sorted());
180
181 let mut sst_id_set = HashSet::new();
183 for sst_id in versioning
184 .current_version
185 .get_sst_ids_by_group_id(left_group_id)
186 .chain(
187 versioning
188 .current_version
189 .get_sst_ids_by_group_id(right_group_id),
190 )
191 {
192 if !sst_id_set.insert(sst_id) {
193 return Err(Error::CompactionGroup(format!(
194 "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
195 left_group_id, right_group_id, sst_id
196 )));
197 }
198 }
199
200 {
202 let left_levels = versioning
203 .current_version
204 .get_compaction_group_levels(group_1);
205
206 let right_levels = versioning
207 .current_version
208 .get_compaction_group_levels(group_2);
209
210 let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
213 for level_idx in 1..=max_level {
214 let left_level = left_levels.get_level(level_idx);
215 let right_level = right_levels.get_level(level_idx);
216 if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
217 continue;
218 }
219
220 let left_last_sst = left_level.table_infos.last().unwrap().clone();
221 let right_first_sst = right_level.table_infos.first().unwrap().clone();
222 let left_sst_id = left_last_sst.sst_id;
223 let right_sst_id = right_first_sst.sst_id;
224 let left_obj_id = left_last_sst.object_id;
225 let right_obj_id = right_first_sst.object_id;
226
227 if !can_concat(&[left_last_sst, right_first_sst]) {
229 return Err(Error::CompactionGroup(format!(
230 "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
231 left_group_id,
232 right_group_id,
233 level_idx,
234 left_sst_id,
235 right_sst_id,
236 left_obj_id,
237 right_obj_id
238 )));
239 }
240 }
241 }
242
243 let mut version = HummockVersionTransaction::new(
244 &mut versioning.current_version,
245 &mut versioning.hummock_version_deltas,
246 self.env.notification_manager(),
247 None,
248 &self.metrics,
249 );
250 let mut new_version_delta = version.new_delta();
251
252 let target_compaction_group_id = {
253 new_version_delta.group_deltas.insert(
255 left_group_id,
256 GroupDeltas {
257 group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
258 left_group_id,
259 right_group_id,
260 })],
261 },
262 );
263 left_group_id
264 };
265
266 new_version_delta.with_latest_version(|version, new_version_delta| {
269 for &table_id in combined_member_table_ids {
270 let info = version
271 .state_table_info
272 .info()
273 .get(&table_id)
274 .expect("have check exist previously");
275 assert!(
276 new_version_delta
277 .state_table_info_delta
278 .insert(
279 table_id,
280 PbStateTableInfoDelta {
281 committed_epoch: info.committed_epoch,
282 compaction_group_id: target_compaction_group_id,
283 }
284 )
285 .is_none()
286 );
287 }
288 });
289
290 {
291 let mut compaction_group_manager = self.compaction_group_manager.write().await;
292 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
293
294 {
296 let right_group_max_level = new_version_delta
297 .latest_version()
298 .get_compaction_group_levels(right_group_id)
299 .levels
300 .len();
301
302 remove_compaction_group_in_sst_stat(
303 &self.metrics,
304 right_group_id,
305 right_group_max_level,
306 );
307 }
308
309 {
311 if let Err(err) = compaction_groups_txn.update_compaction_config(
312 &[left_group_id],
313 &[MutableConfig::SplitWeightByVnode(0)], ) {
315 tracing::error!(
316 error = %err.as_report(),
317 "failed to update compaction config for group-{}",
318 left_group_id
319 );
320 }
321 }
322
323 new_version_delta.pre_apply();
324
325 compaction_groups_txn.remove(right_group_id);
327 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
328 }
329
330 versioning.mark_next_time_travel_version_snapshot();
332
333 let mut canceled_tasks = vec![];
335 let compact_task_assignments =
338 compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
339 compact_task_assignments
340 .into_iter()
341 .for_each(|task_assignment| {
342 if let Some(task) = task_assignment.compact_task.as_ref() {
343 assert_eq!(task.compaction_group_id, right_group_id);
344 canceled_tasks.push(ReportTask {
345 task_id: task.task_id,
346 task_status: TaskStatus::ManualCanceled,
347 table_stats_change: HashMap::default(),
348 sorted_output_ssts: vec![],
349 object_timestamps: HashMap::default(),
350 });
351 }
352 });
353
354 if !canceled_tasks.is_empty() {
355 self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
356 .await?;
357 }
358
359 self.metrics
360 .merge_compaction_group_count
361 .with_label_values(&[&left_group_id.to_string()])
362 .inc();
363
364 Ok(())
365 }
366}
367
368impl HummockManager {
/// Splits `parent_group_id` in two at the key built from `table_id_to_split` and
/// `vnode_to_split`.
///
/// Returns the `(group id, member table ids)` pairs after the operation:
/// - if the split point leaves one side without tables, nothing is changed and a single
///   entry for `parent_group_id` with the non-empty side is returned;
/// - otherwise the left side stays in `parent_group_id` and the right side is moved to a
///   newly created compaction group; both entries are returned, left first.
///
/// `split_table_ids` is the set of tables the caller ultimately wants to isolate. It is used
/// to reject a request that would move every member table, and to decide which resulting
/// group receives `partition_vnode_count`.
///
/// # Errors
///
/// Returns [`Error::CompactionGroup`] if the parent group does not exist, if
/// `table_id_to_split` is not a member of it, or if `split_table_ids` equals the full member
/// list. Failures from id allocation, the commit, or task reporting are propagated.
async fn split_compaction_group_impl(
    &self,
    parent_group_id: CompactionGroupId,
    split_table_ids: &[StateTableId],
    table_id_to_split: StateTableId,
    vnode_to_split: VirtualNode,
    partition_vnode_count: Option<u32>,
) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
    let mut result = vec![];
    // Lock order: compaction first, then versioning.
    let compaction_guard = self.compaction.write().await;
    let mut versioning_guard = self.versioning.write().await;
    let versioning = versioning_guard.deref_mut();
    // The parent group must exist in the current version.
    if !versioning
        .current_version
        .levels
        .contains_key(&parent_group_id)
    {
        return Err(Error::CompactionGroup(format!(
            "invalid group {}",
            parent_group_id
        )));
    }

    // Collected into a `BTreeSet`, so the ids are ordered and de-duplicated.
    let member_table_ids = versioning
        .current_version
        .state_table_info
        .compaction_group_member_table_ids(parent_group_id)
        .iter()
        .copied()
        .collect::<BTreeSet<_>>();

    if !member_table_ids.contains(&table_id_to_split) {
        return Err(Error::CompactionGroup(format!(
            "table {} doesn't in group {}",
            table_id_to_split, parent_group_id
        )));
    }

    let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

    let table_ids = member_table_ids.into_iter().collect_vec();
    // Moving every member table would leave the parent group without tables.
    if table_ids == split_table_ids {
        return Err(Error::CompactionGroup(format!(
            "invalid split attempt for group {}: all member tables are moved",
            parent_group_id
        )));
    }
    let (table_ids_left, table_ids_right) =
        group_split::split_table_ids_with_table_id_and_vnode(
            &table_ids,
            split_full_key.user_key.table_id,
            split_full_key.user_key.get_vnode_id(),
        );
    // One side is empty: there is nothing to split, report the parent group unchanged.
    if table_ids_left.is_empty() || table_ids_right.is_empty() {
        if !table_ids_left.is_empty() {
            result.push((parent_group_id, table_ids_left));
        }

        if !table_ids_right.is_empty() {
            result.push((parent_group_id, table_ids_right));
        }
        return Ok(result);
    }

    // The left side stays in the parent group.
    result.push((parent_group_id, table_ids_left));

    let split_key: Bytes = split_full_key.encode().into();

    let mut version = HummockVersionTransaction::new(
        &mut versioning.current_version,
        &mut versioning.hummock_version_deltas,
        self.env.notification_manager(),
        None,
        &self.metrics,
    );
    let mut new_version_delta = version.new_delta();

    // Reserve as many new SST ids as the split is counted to need.
    let split_sst_count = new_version_delta
        .latest_version()
        .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

    let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
    let (new_compaction_group_id, config) = {
        let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
        // The new group starts from a copy of the default compaction config.
        let config = self
            .compaction_group_manager
            .read()
            .await
            .default_compaction_config()
            .as_ref()
            .clone();

        // `expect(deprecated)`: the construct message below sets at least one deprecated
        // field.
        #[expect(deprecated)]
        new_version_delta.group_deltas.insert(
            new_compaction_group_id,
            GroupDeltas {
                group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                    group_config: Some(config.clone()),
                    group_id: new_compaction_group_id,
                    parent_group_id,
                    new_sst_start_id: new_sst_start_id.inner(),
                    table_ids: vec![],
                    version: CompatibilityVersion::LATEST as _,
                    split_key: Some(split_key.into()),
                }))],
            },
        );
        (new_compaction_group_id, config)
    };

    // Re-assign every table on the right side to the new group, keeping its committed
    // epoch.
    new_version_delta.with_latest_version(|version, new_version_delta| {
        for &table_id in &table_ids_right {
            let info = version
                .state_table_info
                .info()
                .get(&table_id)
                .expect("have check exist previously");
            assert!(
                new_version_delta
                    .state_table_info_delta
                    .insert(
                        table_id,
                        PbStateTableInfoDelta {
                            committed_epoch: info.committed_epoch,
                            compaction_group_id: new_compaction_group_id,
                        }
                    )
                    .is_none()
            );
        }
    });

    result.push((new_compaction_group_id, table_ids_right));

    {
        let mut compaction_group_manager = self.compaction_group_manager.write().await;
        let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
        compaction_groups_txn
            .create_compaction_groups(new_compaction_group_id, Arc::new(config));

        for (cg_id, table_ids) in &result {
            // Only a resulting group that holds exactly the single requested table gets
            // the vnode partition count. A failed config update is logged, not
            // propagated.
            if let Some(partition_vnode_count) = partition_vnode_count
                && table_ids.len() == 1
                && table_ids == split_table_ids
                && let Err(err) = compaction_groups_txn.update_compaction_config(
                    &[*cg_id],
                    &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                )
            {
                tracing::error!(
                    error = %err.as_report(),
                    "failed to update compaction config for group-{}",
                    cg_id
                );
            }
        }

        new_version_delta.pre_apply();
        // Commit the version change and the compaction-group change together.
        commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
    }
    versioning.mark_next_time_travel_version_snapshot();

    // Cancel the tasks of the parent group that `is_compaction_task_expired` reports as
    // expired against the group's current version id.
    let mut canceled_tasks = vec![];
    let compact_task_assignments =
        compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
    let levels = versioning
        .current_version
        .get_compaction_group_levels(parent_group_id);
    compact_task_assignments
        .into_iter()
        .for_each(|task_assignment| {
            if let Some(task) = task_assignment.compact_task.as_ref() {
                let is_expired = is_compaction_task_expired(
                    task.compaction_group_version_id,
                    levels.compaction_group_version_id,
                );
                if is_expired {
                    canceled_tasks.push(ReportTask {
                        task_id: task.task_id,
                        task_status: TaskStatus::ManualCanceled,
                        table_stats_change: HashMap::default(),
                        sorted_output_ssts: vec![],
                        object_timestamps: HashMap::default(),
                    });
                }
            }
        });

    if !canceled_tasks.is_empty() {
        // Both guards are handed over to the reporting routine.
        self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
            .await?;
    }

    self.metrics
        .split_compaction_group_count
        .with_label_values(&[&parent_group_id.to_string()])
        .inc();

    Ok(result)
}
594
595 pub async fn move_state_tables_to_dedicated_compaction_group(
598 &self,
599 parent_group_id: CompactionGroupId,
600 table_ids: &[StateTableId],
601 partition_vnode_count: Option<u32>,
602 ) -> Result<(
603 CompactionGroupId,
604 BTreeMap<CompactionGroupId, Vec<StateTableId>>,
605 )> {
606 if table_ids.is_empty() {
607 return Err(Error::CompactionGroup(
608 "table_ids must not be empty".to_owned(),
609 ));
610 }
611
612 if !table_ids.is_sorted() {
613 return Err(Error::CompactionGroup(
614 "table_ids must be sorted".to_owned(),
615 ));
616 }
617
618 let parent_table_ids = {
619 let versioning_guard = self.versioning.read().await;
620 versioning_guard
621 .current_version
622 .state_table_info
623 .compaction_group_member_table_ids(parent_group_id)
624 .iter()
625 .copied()
626 .collect_vec()
627 };
628
629 if parent_table_ids == table_ids {
630 return Err(Error::CompactionGroup(format!(
631 "invalid split attempt for group {}: all member tables are moved",
632 parent_group_id
633 )));
634 }
635
636 fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<TableId>>) {
637 {
639 cg_id_to_table_ids
640 .iter()
641 .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
642 }
643
644 {
646 let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
647 table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
648 assert!(table_table_ids_vec.concat().is_sorted());
649 }
650
651 {
653 let mut all_table_ids = HashSet::new();
654 for table_ids in cg_id_to_table_ids.values() {
655 for table_id in table_ids {
656 assert!(all_table_ids.insert(*table_id));
657 }
658 }
659 }
660 }
661
662 let mut cg_id_to_table_ids: BTreeMap<u64, Vec<TableId>> = BTreeMap::new();
672 let table_id_to_split = *table_ids.first().unwrap();
673 let mut target_compaction_group_id = 0;
674 let result_vec = self
675 .split_compaction_group_impl(
676 parent_group_id,
677 table_ids,
678 table_id_to_split,
679 VirtualNode::ZERO,
680 partition_vnode_count,
681 )
682 .await?;
683 assert!(result_vec.len() <= 2);
684
685 let mut finish_move = false;
686 for (cg_id, table_ids_after_split) in result_vec {
687 if table_ids_after_split.contains(&table_id_to_split) {
688 target_compaction_group_id = cg_id;
689 }
690
691 if table_ids_after_split == table_ids {
692 finish_move = true;
693 }
694
695 cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
696 }
697 check_table_ids_valid(&cg_id_to_table_ids);
698
699 if finish_move {
700 return Ok((target_compaction_group_id, cg_id_to_table_ids));
701 }
702
703 let table_id_to_split = *table_ids.last().unwrap();
706 let result_vec = self
707 .split_compaction_group_impl(
708 target_compaction_group_id,
709 table_ids,
710 table_id_to_split,
711 VirtualNode::MAX_REPRESENTABLE,
712 partition_vnode_count,
713 )
714 .await?;
715 assert!(result_vec.len() <= 2);
716 for (cg_id, table_ids_after_split) in result_vec {
717 if table_ids_after_split.contains(&table_id_to_split) {
718 target_compaction_group_id = cg_id;
719 }
720 cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
721 }
722 check_table_ids_valid(&cg_id_to_table_ids);
723
724 Ok((target_compaction_group_id, cg_id_to_table_ids))
725 }
726}
727
728impl HummockManager {
729 pub async fn try_split_compaction_group(
731 &self,
732 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
733 group: CompactionGroupStatistic,
734 ) {
735 if group
736 .compaction_group_config
737 .compaction_config
738 .disable_auto_group_scheduling
739 .unwrap_or(false)
740 {
741 return;
742 }
743 for (table_id, table_size) in &group.table_statistic {
745 self.try_move_high_throughput_table_to_dedicated_cg(
746 table_write_throughput_statistic_manager,
747 *table_id,
748 table_size,
749 group.group_id,
750 )
751 .await;
752 }
753
754 self.try_split_huge_compaction_group(group).await;
756 }
757
758 pub async fn try_move_high_throughput_table_to_dedicated_cg(
760 &self,
761 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
762 table_id: TableId,
763 _table_size: &u64,
764 parent_group_id: u64,
765 ) {
766 let mut table_throughput = table_write_throughput_statistic_manager
767 .get_table_throughput_descending(
768 table_id,
769 self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
770 )
771 .peekable();
772
773 if table_throughput.peek().is_none() {
774 return;
775 }
776
777 let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
778 table_throughput,
779 self.env.opts.table_high_write_throughput_threshold,
780 self.env
781 .opts
782 .table_stat_high_write_throughput_ratio_for_split,
783 );
784
785 if !is_high_write_throughput {
787 return;
788 }
789
790 let ret = self
791 .move_state_tables_to_dedicated_compaction_group(
792 parent_group_id,
793 &[table_id],
794 Some(self.env.opts.partition_vnode_count),
795 )
796 .await;
797 match ret {
798 Ok(split_result) => {
799 tracing::info!(
800 "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
801 table_id,
802 parent_group_id,
803 self.env.opts.partition_vnode_count,
804 split_result
805 );
806 }
807 Err(e) => {
808 tracing::info!(
809 error = %e.as_report(),
810 "failed to split state table [{}] from group-{}",
811 table_id,
812 parent_group_id,
813 )
814 }
815 }
816 }
817
818 pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
819 let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
820 * self.env.opts.split_group_size_ratio) as u64;
821 let is_huge_hybrid_group =
822 group.group_size > group_max_size && group.table_statistic.len() > 1; if is_huge_hybrid_group {
824 let mut accumulated_size = 0;
825 let mut table_ids = Vec::default();
826 for (table_id, table_size) in &group.table_statistic {
827 accumulated_size += table_size;
828 table_ids.push(*table_id);
829 assert!(table_ids.is_sorted());
832 if accumulated_size * 2 > group_max_size {
833 let ret = self
834 .move_state_tables_to_dedicated_compaction_group(
835 group.group_id,
836 &table_ids,
837 None,
838 )
839 .await;
840 match ret {
841 Ok(split_result) => {
842 tracing::info!(
843 "split_huge_compaction_group success {:?}",
844 split_result
845 );
846 self.metrics
847 .split_compaction_group_count
848 .with_label_values(&[&group.group_id.to_string()])
849 .inc();
850 return;
851 }
852 Err(e) => {
853 tracing::error!(
854 error = %e.as_report(),
855 "failed to split_huge_compaction_group table {:?} from group-{}",
856 table_ids,
857 group.group_id
858 );
859
860 return;
861 }
862 }
863 }
864 }
865 }
866 }
867
868 pub async fn try_merge_compaction_group(
869 &self,
870 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
871 group: &CompactionGroupStatistic,
872 next_group: &CompactionGroupStatistic,
873 created_tables: &HashSet<TableId>,
874 ) -> Result<()> {
875 GroupMergeValidator::validate_group_merge(
876 group,
877 next_group,
878 created_tables,
879 table_write_throughput_statistic_manager,
880 &self.env.opts,
881 &self.versioning,
882 )
883 .await?;
884
885 match self
886 .merge_compaction_group(group.group_id, next_group.group_id)
887 .await
888 {
889 Ok(()) => {
890 tracing::info!(
891 "merge group-{} to group-{}",
892 next_group.group_id,
893 group.group_id,
894 );
895
896 self.metrics
897 .merge_compaction_group_count
898 .with_label_values(&[&group.group_id.to_string()])
899 .inc();
900 }
901 Err(e) => {
902 tracing::info!(
903 error = %e.as_report(),
904 "failed to merge group-{} group-{}",
905 next_group.group_id,
906 group.group_id,
907 )
908 }
909 }
910
911 Ok(())
912 }
913}
914
/// Stateless namespace for the checks that decide whether two compaction groups may be
/// merged, plus the table write-throughput classification helpers. All functionality is
/// exposed through associated functions; the struct itself carries no data.
#[derive(Debug, Default)]
struct GroupMergeValidator {}
917
918impl GroupMergeValidator {
919 pub fn is_table_high_write_throughput(
921 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
922 threshold: u64,
923 high_write_throughput_ratio: f64,
924 ) -> bool {
925 let mut sample_size = 0;
926 let mut high_write_throughput_count = 0;
927 for statistic in table_throughput {
928 sample_size += 1;
929 if statistic.throughput > threshold {
930 high_write_throughput_count += 1;
931 }
932 }
933
934 high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
935 }
936
937 pub fn is_table_low_write_throughput(
938 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
939 threshold: u64,
940 low_write_throughput_ratio: f64,
941 ) -> bool {
942 let mut sample_size = 0;
943 let mut low_write_throughput_count = 0;
944 for statistic in table_throughput {
945 sample_size += 1;
946 if statistic.throughput <= threshold {
947 low_write_throughput_count += 1;
948 }
949 }
950
951 low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
952 }
953
954 fn check_is_low_write_throughput_compaction_group(
955 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
956 group: &CompactionGroupStatistic,
957 opts: &Arc<MetaOpts>,
958 ) -> bool {
959 let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
960 for table_id in group.table_statistic.keys() {
961 let mut table_throughput = table_write_throughput_statistic_manager
962 .get_table_throughput_descending(
963 *table_id,
964 opts.table_stat_throuput_window_seconds_for_merge as i64,
965 )
966 .peekable();
967 if table_throughput.peek().is_none() {
968 continue;
969 }
970
971 table_with_statistic.push(table_throughput);
972 }
973
974 if table_with_statistic.is_empty() {
976 return true;
977 }
978
979 table_with_statistic.into_iter().all(|table_throughput| {
981 Self::is_table_low_write_throughput(
982 table_throughput,
983 opts.table_low_write_throughput_threshold,
984 opts.table_stat_low_write_throughput_ratio_for_merge,
985 )
986 })
987 }
988
989 fn check_is_creating_compaction_group(
990 group: &CompactionGroupStatistic,
991 created_tables: &HashSet<TableId>,
992 ) -> bool {
993 group
994 .table_statistic
995 .keys()
996 .any(|table_id| !created_tables.contains(table_id))
997 }
998
999 async fn validate_group_merge(
1000 group: &CompactionGroupStatistic,
1001 next_group: &CompactionGroupStatistic,
1002 created_tables: &HashSet<TableId>,
1003 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1004 opts: &Arc<MetaOpts>,
1005 versioning: &MonitoredRwLock<Versioning>,
1006 ) -> Result<()> {
1007 if (group.group_id == StaticCompactionGroupId::StateDefault as u64
1009 && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64)
1010 || (group.group_id == StaticCompactionGroupId::MaterializedView as u64
1011 && next_group.group_id == StaticCompactionGroupId::StateDefault as u64)
1012 {
1013 return Err(Error::CompactionGroup(format!(
1014 "group-{} and group-{} are both StaticCompactionGroupId",
1015 group.group_id, next_group.group_id
1016 )));
1017 }
1018
1019 if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1020 return Err(Error::CompactionGroup(format!(
1021 "group-{} or group-{} is empty",
1022 group.group_id, next_group.group_id
1023 )));
1024 }
1025
1026 if group
1027 .compaction_group_config
1028 .compaction_config
1029 .disable_auto_group_scheduling
1030 .unwrap_or(false)
1031 || next_group
1032 .compaction_group_config
1033 .compaction_config
1034 .disable_auto_group_scheduling
1035 .unwrap_or(false)
1036 {
1037 return Err(Error::CompactionGroup(format!(
1038 "group-{} or group-{} disable_auto_group_scheduling",
1039 group.group_id, next_group.group_id
1040 )));
1041 }
1042
1043 if Self::check_is_creating_compaction_group(group, created_tables) {
1045 return Err(Error::CompactionGroup(format!(
1046 "Not Merge creating group {} next_group {}",
1047 group.group_id, next_group.group_id
1048 )));
1049 }
1050
1051 if !Self::check_is_low_write_throughput_compaction_group(
1053 table_write_throughput_statistic_manager,
1054 group,
1055 opts,
1056 ) {
1057 return Err(Error::CompactionGroup(format!(
1058 "Not Merge high throughput group {} next_group {}",
1059 group.group_id, next_group.group_id
1060 )));
1061 }
1062
1063 let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1064 * opts.split_group_size_ratio) as u64;
1065
1066 if (group.group_size + next_group.group_size) > size_limit {
1067 return Err(Error::CompactionGroup(format!(
1068 "Not Merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1069 group.group_id,
1070 group.group_size,
1071 next_group.group_id,
1072 next_group.group_size,
1073 size_limit
1074 )));
1075 }
1076
1077 if Self::check_is_creating_compaction_group(next_group, created_tables) {
1078 return Err(Error::CompactionGroup(format!(
1079 "Not Merge creating group {} next group {}",
1080 group.group_id, next_group.group_id
1081 )));
1082 }
1083
1084 if !Self::check_is_low_write_throughput_compaction_group(
1085 table_write_throughput_statistic_manager,
1086 next_group,
1087 opts,
1088 ) {
1089 return Err(Error::CompactionGroup(format!(
1090 "Not Merge high throughput group {} next group {}",
1091 group.group_id, next_group.group_id
1092 )));
1093 }
1094
1095 {
1096 let versioning_guard = versioning.read().await;
1098 let levels = &versioning_guard.current_version.levels;
1099 if !levels.contains_key(&group.group_id) {
1100 return Err(Error::CompactionGroup(format!(
1101 "Not Merge group {} not exist",
1102 group.group_id
1103 )));
1104 }
1105
1106 if !levels.contains_key(&next_group.group_id) {
1107 return Err(Error::CompactionGroup(format!(
1108 "Not Merge next group {} not exist",
1109 next_group.group_id
1110 )));
1111 }
1112
1113 let group_levels = versioning_guard
1114 .current_version
1115 .get_compaction_group_levels(group.group_id);
1116
1117 let next_group_levels = versioning_guard
1118 .current_version
1119 .get_compaction_group_levels(next_group.group_id);
1120
1121 let group_state = GroupStateValidator::group_state(
1122 group_levels,
1123 group.compaction_group_config.compaction_config().deref(),
1124 );
1125
1126 if group_state.is_write_stop() || group_state.is_emergency() {
1127 return Err(Error::CompactionGroup(format!(
1128 "Not Merge write limit group {} next group {}",
1129 group.group_id, next_group.group_id
1130 )));
1131 }
1132
1133 let next_group_state = GroupStateValidator::group_state(
1134 next_group_levels,
1135 next_group
1136 .compaction_group_config
1137 .compaction_config()
1138 .deref(),
1139 );
1140
1141 if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1142 return Err(Error::CompactionGroup(format!(
1143 "Not Merge write limit next group {} group {}",
1144 next_group.group_id, group.group_id
1145 )));
1146 }
1147
1148 let l0_sub_level_count_after_merge =
1150 group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1151 if GroupStateValidator::write_stop_l0_file_count(
1152 (l0_sub_level_count_after_merge as f64
1153 * opts.compaction_group_merge_dimension_threshold) as usize,
1154 group.compaction_group_config.compaction_config().deref(),
1155 ) {
1156 return Err(Error::CompactionGroup(format!(
1157 "Not Merge write limit group {} next group {}, will trigger write stop after merge",
1158 group.group_id, next_group.group_id
1159 )));
1160 }
1161
1162 let l0_file_count_after_merge =
1163 group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1164 if GroupStateValidator::write_stop_l0_file_count(
1165 (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1166 as usize,
1167 group.compaction_group_config.compaction_config().deref(),
1168 ) {
1169 return Err(Error::CompactionGroup(format!(
1170 "Not Merge write limit next group {} group {}, will trigger write stop after merge",
1171 next_group.group_id, group.group_id
1172 )));
1173 }
1174
1175 let l0_size_after_merge =
1176 group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1177
1178 if GroupStateValidator::write_stop_l0_size(
1179 (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1180 as u64,
1181 group.compaction_group_config.compaction_config().deref(),
1182 ) {
1183 return Err(Error::CompactionGroup(format!(
1184 "Not Merge write limit next group {} group {}, will trigger write stop after merge",
1185 next_group.group_id, group.group_id
1186 )));
1187 }
1188
1189 if GroupStateValidator::emergency_l0_file_count(
1191 (l0_sub_level_count_after_merge as f64
1192 * opts.compaction_group_merge_dimension_threshold) as usize,
1193 group.compaction_group_config.compaction_config().deref(),
1194 ) {
1195 return Err(Error::CompactionGroup(format!(
1196 "Not Merge emergency group {} next group {}, will trigger emergency after merge",
1197 group.group_id, next_group.group_id
1198 )));
1199 }
1200 }
1201
1202 Ok(())
1203 }
1204}