1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26 StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33 CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::{CompactionGroupStatistic, GroupStateValidator};
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::transaction::HummockVersionTransaction;
40use crate::hummock::manager::versioning::Versioning;
41use crate::hummock::manager::{HummockManager, commit_multi_var};
42use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
43use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
44use crate::hummock::table_write_throughput_statistic::{
45 TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
46};
47use crate::manager::MetaOpts;
48
49impl HummockManager {
50 pub async fn merge_compaction_group(
51 &self,
52 group_1: CompactionGroupId,
53 group_2: CompactionGroupId,
54 ) -> Result<()> {
55 self.merge_compaction_group_impl(group_1, group_2, None)
56 .await
57 }
58
59 pub async fn merge_compaction_group_for_test(
60 &self,
61 group_1: CompactionGroupId,
62 group_2: CompactionGroupId,
63 created_tables: HashSet<TableId>,
64 ) -> Result<()> {
65 self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
66 .await
67 }
68
69 pub async fn merge_compaction_group_impl(
70 &self,
71 group_1: CompactionGroupId,
72 group_2: CompactionGroupId,
73 created_tables: Option<HashSet<TableId>>,
74 ) -> Result<()> {
75 let compaction_guard = self.compaction.write().await;
76 let mut versioning_guard = self.versioning.write().await;
77 let versioning = versioning_guard.deref_mut();
78 if !versioning.current_version.levels.contains_key(&group_1) {
80 return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
81 }
82
83 if !versioning.current_version.levels.contains_key(&group_2) {
84 return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
85 }
86
87 let state_table_info = versioning.current_version.state_table_info.clone();
88 let mut member_table_ids_1 = state_table_info
89 .compaction_group_member_table_ids(group_1)
90 .iter()
91 .cloned()
92 .collect_vec();
93
94 if member_table_ids_1.is_empty() {
95 return Err(Error::CompactionGroup(format!(
96 "group_1 {} is empty",
97 group_1
98 )));
99 }
100
101 let mut member_table_ids_2 = state_table_info
102 .compaction_group_member_table_ids(group_2)
103 .iter()
104 .cloned()
105 .collect_vec();
106
107 if member_table_ids_2.is_empty() {
108 return Err(Error::CompactionGroup(format!(
109 "group_2 {} is empty",
110 group_2
111 )));
112 }
113
114 debug_assert!(!member_table_ids_1.is_empty());
115 debug_assert!(!member_table_ids_2.is_empty());
116 assert!(member_table_ids_1.is_sorted());
117 assert!(member_table_ids_2.is_sorted());
118
119 let created_tables = if let Some(created_tables) = created_tables {
120 #[expect(clippy::assertions_on_constants)]
122 {
123 assert!(cfg!(debug_assertions));
124 }
125 created_tables
126 } else {
127 match self.metadata_manager.get_created_table_ids().await {
128 Ok(created_tables) => HashSet::from_iter(created_tables),
129 Err(err) => {
130 tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
131 return Err(Error::CompactionGroup(format!(
132 "merge group_1 {} group_2 {} failed to fetch created table ids",
133 group_1, group_2
134 )));
135 }
136 }
137 };
138
139 fn contains_creating_table(
140 table_ids: &Vec<TableId>,
141 created_tables: &HashSet<TableId>,
142 ) -> bool {
143 table_ids
144 .iter()
145 .any(|table_id| !created_tables.contains(table_id))
146 }
147
148 if contains_creating_table(&member_table_ids_1, &created_tables)
150 || contains_creating_table(&member_table_ids_2, &created_tables)
151 {
152 return Err(Error::CompactionGroup(format!(
153 "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
154 group_1, group_2, member_table_ids_1, member_table_ids_2
155 )));
156 }
157
158 let (left_group_id, right_group_id) =
160 if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
161 (group_1, group_2)
162 } else {
163 std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
164 (group_2, group_1)
165 };
166
167 if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
172 return Err(Error::CompactionGroup(format!(
173 "invalid merge group_1 {} group_2 {}: table id ranges overlap",
174 left_group_id, right_group_id
175 )));
176 }
177
178 let combined_member_table_ids = member_table_ids_1
179 .iter()
180 .chain(member_table_ids_2.iter())
181 .collect_vec();
182 assert!(combined_member_table_ids.is_sorted());
183
184 let mut sst_id_set = HashSet::new();
186 for sst_id in versioning
187 .current_version
188 .get_sst_ids_by_group_id(left_group_id)
189 .chain(
190 versioning
191 .current_version
192 .get_sst_ids_by_group_id(right_group_id),
193 )
194 {
195 if !sst_id_set.insert(sst_id) {
196 return Err(Error::CompactionGroup(format!(
197 "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
198 left_group_id, right_group_id, sst_id
199 )));
200 }
201 }
202
203 {
205 let left_levels = versioning
206 .current_version
207 .get_compaction_group_levels(group_1);
208
209 let right_levels = versioning
210 .current_version
211 .get_compaction_group_levels(group_2);
212
213 let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
216 for level_idx in 1..=max_level {
217 let left_level = left_levels.get_level(level_idx);
218 let right_level = right_levels.get_level(level_idx);
219 if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
220 continue;
221 }
222
223 let left_last_sst = left_level.table_infos.last().unwrap().clone();
224 let right_first_sst = right_level.table_infos.first().unwrap().clone();
225 let left_sst_id = left_last_sst.sst_id;
226 let right_sst_id = right_first_sst.sst_id;
227 let left_obj_id = left_last_sst.object_id;
228 let right_obj_id = right_first_sst.object_id;
229
230 if !can_concat(&[left_last_sst, right_first_sst]) {
232 return Err(Error::CompactionGroup(format!(
233 "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
234 left_group_id,
235 right_group_id,
236 level_idx,
237 left_sst_id,
238 right_sst_id,
239 left_obj_id,
240 right_obj_id
241 )));
242 }
243 }
244 }
245
246 let mut version = HummockVersionTransaction::new(
247 &mut versioning.current_version,
248 &mut versioning.hummock_version_deltas,
249 &mut versioning.table_change_log,
250 self.env.notification_manager(),
251 None,
252 &self.metrics,
253 &self.env.opts,
254 );
255 let mut new_version_delta = version.new_delta();
256
257 let target_compaction_group_id = {
258 new_version_delta.group_deltas.insert(
260 left_group_id,
261 GroupDeltas {
262 group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
263 left_group_id,
264 right_group_id,
265 })],
266 },
267 );
268 left_group_id
269 };
270
271 new_version_delta.with_latest_version(|version, new_version_delta| {
274 for &table_id in combined_member_table_ids {
275 let info = version
276 .state_table_info
277 .info()
278 .get(&table_id)
279 .expect("have check exist previously");
280 assert!(
281 new_version_delta
282 .state_table_info_delta
283 .insert(
284 table_id,
285 PbStateTableInfoDelta {
286 committed_epoch: info.committed_epoch,
287 compaction_group_id: target_compaction_group_id,
288 }
289 )
290 .is_none()
291 );
292 }
293 });
294
295 {
296 let mut compaction_group_manager = self.compaction_group_manager.write().await;
297 let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
298
299 {
301 let right_group_max_level = new_version_delta
302 .latest_version()
303 .get_compaction_group_levels(right_group_id)
304 .levels
305 .len();
306
307 remove_compaction_group_in_sst_stat(
308 &self.metrics,
309 right_group_id,
310 right_group_max_level,
311 );
312 }
313
314 self.compaction_state
316 .remove_compaction_group(right_group_id);
317
318 {
320 if let Err(err) = compaction_groups_txn.update_compaction_config(
321 &[left_group_id],
322 &[MutableConfig::SplitWeightByVnode(0)], ) {
324 tracing::error!(
325 error = %err.as_report(),
326 "failed to update compaction config for group-{}",
327 left_group_id
328 );
329 }
330 }
331
332 new_version_delta.pre_apply();
333
334 compaction_groups_txn.remove(right_group_id);
336 commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
337 }
338
339 versioning.mark_next_time_travel_version_snapshot();
341
342 let mut canceled_tasks = vec![];
344 let compact_task_assignments =
347 compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
348 compact_task_assignments
349 .into_iter()
350 .for_each(|task_assignment| {
351 if let Some(task) = task_assignment.compact_task.as_ref() {
352 assert_eq!(task.compaction_group_id, right_group_id);
353 canceled_tasks.push(ReportTask {
354 task_id: task.task_id,
355 task_status: TaskStatus::ManualCanceled,
356 table_stats_change: HashMap::default(),
357 sorted_output_ssts: vec![],
358 object_timestamps: HashMap::default(),
359 });
360 }
361 });
362
363 if !canceled_tasks.is_empty() {
364 self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
365 .await?;
366 }
367
368 self.metrics
369 .merge_compaction_group_count
370 .with_label_values(&[&left_group_id.to_string()])
371 .inc();
372
373 Ok(())
374 }
375}
376
377impl HummockManager {
    /// Splits `parent_group_id` at the key position `(table_id_to_split,
    /// vnode_to_split)`.
    ///
    /// Tables ordered before the split key stay in the parent group; tables at or
    /// after it move into a freshly created group that inherits the parent's
    /// compaction config. Returns the resulting `(group id, member tables)`
    /// pairs: a single entry (parent group unchanged) when the split key falls on
    /// a group boundary, two entries otherwise.
    ///
    /// `split_table_ids` is the full set of tables the caller ultimately wants to
    /// isolate; it is only used to decide whether `partition_vnode_count` should
    /// be applied to a resulting single-table group.
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        // Lock order: compaction guard before versioning guard (same as the
        // merge path; both are consumed by `report_compact_tasks_impl` below).
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .copied()
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} doesn't in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        // Full key marking the split position inside the parent's key space.
        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        // Sorted, since it comes out of a `BTreeSet`.
        let table_ids = member_table_ids.into_iter().collect_vec();
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id,
                split_full_key.user_key.get_vnode_id(),
            );
        // Split key falls on a group boundary: no table moves; report the parent
        // group's (unchanged) membership.
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut new_version_delta = version.new_delta();

        // Pre-allocate ids for the SSTs produced by cutting existing SSTs at the
        // split key.
        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            // The new group reuses the parent group's compaction config.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            let config = self
                .compaction_group_manager
                .read()
                .await
                .try_get_compaction_group_config(parent_group_id)
                .ok_or_else(|| {
                    Error::CompactionGroup(format!(
                        "parent group {} config not found",
                        parent_group_id
                    ))
                })?
                .compaction_config()
                .as_ref()
                .clone();

            #[expect(deprecated)]
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id,
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _, split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

        // Repoint every right-hand table's state entry at the new group.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in &table_ids_right {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // When the requested tables ended up alone in a group, apply the
            // caller-specified vnode partitioning to that group. Failure is
            // logged but does not abort the split.
            for (cg_id, table_ids) in &result {
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    )
                {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        versioning.mark_next_time_travel_version_snapshot();

        // Cancel in-flight compaction tasks that were planned against the
        // parent group's pre-split shape (detected via the group version id).
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    let is_expired = is_compaction_task_expired(
                        task.compaction_group_version_id,
                        levels.compaction_group_version_id,
                    );
                    if is_expired {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }
612
    /// Moves `table_ids` (a sorted, non-empty subset of `parent_group_id`'s
    /// members) into a dedicated compaction group.
    ///
    /// Implemented as up to two splits: first cut right before the first
    /// requested table (vnode 0); if the receiving group still holds trailing
    /// tables, cut again right after the last requested table (max vnode).
    /// Returns the id of the group now holding `table_ids` together with the
    /// final group-to-tables mapping produced by the splits.
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .copied()
                .collect_vec()
        };

        // Moving every member table would be a no-op split; reject it.
        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        // Invariants on the accumulated split results: ids sorted within each
        // group, globally sorted when groups are ordered by first table id, and
        // no table appears in more than one group.
        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<CompactionGroupId, Vec<TableId>>) {
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        let mut cg_id_to_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
        // First split: cut right before the first requested table so the
        // requested tables land in the right-hand part.
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id: CompactionGroupId = 0.into();
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            // The group that received the first requested table is the candidate
            // target group.
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            // Target group holds exactly the requested tables: no second split.
            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // Second split: cut right after the last requested table to shed any
        // trailing tables from the target group.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
744}
745
746impl HummockManager {
747 pub async fn try_split_compaction_group(
749 &self,
750 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
751 group: CompactionGroupStatistic,
752 ) {
753 if group
754 .compaction_group_config
755 .compaction_config
756 .disable_auto_group_scheduling
757 .unwrap_or(false)
758 {
759 return;
760 }
761 for (table_id, table_size) in &group.table_statistic {
763 self.try_move_high_throughput_table_to_dedicated_cg(
764 table_write_throughput_statistic_manager,
765 *table_id,
766 table_size,
767 group.group_id,
768 )
769 .await;
770 }
771
772 self.try_split_huge_compaction_group(group).await;
774 }
775
776 pub async fn try_move_high_throughput_table_to_dedicated_cg(
778 &self,
779 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
780 table_id: TableId,
781 _table_size: &u64,
782 parent_group_id: CompactionGroupId,
783 ) {
784 let mut table_throughput = table_write_throughput_statistic_manager
785 .get_table_throughput_descending(
786 table_id,
787 self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
788 )
789 .peekable();
790
791 if table_throughput.peek().is_none() {
792 return;
793 }
794
795 let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
796 table_throughput,
797 self.env.opts.table_high_write_throughput_threshold,
798 self.env
799 .opts
800 .table_stat_high_write_throughput_ratio_for_split,
801 );
802
803 if !is_high_write_throughput {
805 return;
806 }
807
808 let ret = self
809 .move_state_tables_to_dedicated_compaction_group(
810 parent_group_id,
811 &[table_id],
812 Some(self.env.opts.partition_vnode_count),
813 )
814 .await;
815 match ret {
816 Ok(split_result) => {
817 tracing::info!(
818 "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
819 table_id,
820 parent_group_id,
821 self.env.opts.partition_vnode_count,
822 split_result
823 );
824 }
825 Err(e) => {
826 tracing::info!(
827 error = %e.as_report(),
828 "failed to split state table [{}] from group-{}",
829 table_id,
830 parent_group_id,
831 )
832 }
833 }
834 }
835
    /// Splits `group` in two when it is a "huge hybrid group": its size exceeds
    /// the configured ratio of the maximum estimated group size and it hosts
    /// more than one table. Tables are accumulated in ascending id order until
    /// they outweigh half the size limit, then moved into a dedicated group.
    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
            * self.env.opts.split_group_size_ratio) as u64;
        let is_huge_hybrid_group =
            group.group_size > group_max_size && group.table_statistic.len() > 1; if is_huge_hybrid_group {
            let mut accumulated_size = 0;
            let mut table_ids = Vec::default();
            // Iteration must yield ascending table ids (asserted below), so the
            // accumulated prefix forms a contiguous split candidate.
            for (table_id, table_size) in &group.table_statistic {
                accumulated_size += table_size;
                table_ids.push(*table_id);
                assert!(table_ids.is_sorted());
                let remaining_size = group.group_size.saturating_sub(accumulated_size);
                // Split once the prefix outweighs half the limit, provided some
                // data and at least one table remain for the right-hand part.
                if accumulated_size > group_max_size / 2
                    && remaining_size > 0
                    && table_ids.len() < group.table_statistic.len()
                {
                    let ret = self
                        .move_state_tables_to_dedicated_compaction_group(
                            group.group_id,
                            &table_ids,
                            None,
                        )
                        .await;
                    // At most one split is attempted per call, success or not.
                    match ret {
                        Ok(split_result) => {
                            tracing::info!(
                                "split_huge_compaction_group success {:?}",
                                split_result
                            );
                            self.metrics
                                .split_compaction_group_count
                                .with_label_values(&[&group.group_id.to_string()])
                                .inc();
                            return;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "failed to split_huge_compaction_group table {:?} from group-{}",
                                table_ids,
                                group.group_id
                            );

                            return;
                        }
                    }
                }
            }
        }
    }
889
890 pub async fn try_merge_compaction_group(
891 &self,
892 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
893 group: &CompactionGroupStatistic,
894 next_group: &CompactionGroupStatistic,
895 created_tables: &HashSet<TableId>,
896 ) -> Result<()> {
897 GroupMergeValidator::validate_group_merge(
898 group,
899 next_group,
900 created_tables,
901 table_write_throughput_statistic_manager,
902 &self.env.opts,
903 &self.versioning,
904 )
905 .await?;
906
907 let result = self
908 .merge_compaction_group(group.group_id, next_group.group_id)
909 .await;
910
911 match &result {
912 Ok(()) => {
913 tracing::info!(
914 "merge group-{} to group-{}",
915 next_group.group_id,
916 group.group_id,
917 );
918
919 self.metrics
920 .merge_compaction_group_count
921 .with_label_values(&[&group.group_id.to_string()])
922 .inc();
923 }
924 Err(e) => {
925 tracing::info!(
926 error = %e.as_report(),
927 "failed to merge group-{} group-{}",
928 next_group.group_id,
929 group.group_id,
930 );
931 }
932 }
933
934 result
935 }
936}
937
/// Stateless namespace for the checks that decide whether two compaction groups
/// may be merged, plus throughput helpers shared with the split heuristics.
#[derive(Debug, Default)]
struct GroupMergeValidator {}
940
941impl GroupMergeValidator {
942 fn is_merge_compatible_by_semantics(
945 group: &CompactionGroupStatistic,
946 next_group: &CompactionGroupStatistic,
947 ) -> bool {
948 let (mut left, mut right) = (
949 group
950 .compaction_group_config
951 .compaction_config
952 .as_ref()
953 .clone(),
954 next_group
955 .compaction_group_config
956 .compaction_config
957 .as_ref()
958 .clone(),
959 );
960 left.split_weight_by_vnode = 0;
961 right.split_weight_by_vnode = 0;
962 left == right
963 }
964
965 pub fn is_table_high_write_throughput(
967 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
968 threshold: u64,
969 high_write_throughput_ratio: f64,
970 ) -> bool {
971 let mut sample_size = 0;
972 let mut high_write_throughput_count = 0;
973 for statistic in table_throughput {
974 sample_size += 1;
975 if statistic.throughput > threshold {
976 high_write_throughput_count += 1;
977 }
978 }
979
980 high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
981 }
982
983 pub fn is_table_low_write_throughput(
984 table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
985 threshold: u64,
986 low_write_throughput_ratio: f64,
987 ) -> bool {
988 let mut sample_size = 0;
989 let mut low_write_throughput_count = 0;
990 for statistic in table_throughput {
991 sample_size += 1;
992 if statistic.throughput <= threshold {
993 low_write_throughput_count += 1;
994 }
995 }
996
997 low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
998 }
999
1000 fn check_is_low_write_throughput_compaction_group(
1001 table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1002 group: &CompactionGroupStatistic,
1003 opts: &Arc<MetaOpts>,
1004 ) -> bool {
1005 let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
1006 for table_id in group.table_statistic.keys() {
1007 let mut table_throughput = table_write_throughput_statistic_manager
1008 .get_table_throughput_descending(
1009 *table_id,
1010 opts.table_stat_throuput_window_seconds_for_merge as i64,
1011 )
1012 .peekable();
1013 if table_throughput.peek().is_none() {
1014 continue;
1015 }
1016
1017 table_with_statistic.push(table_throughput);
1018 }
1019
1020 if table_with_statistic.is_empty() {
1022 return true;
1023 }
1024
1025 table_with_statistic.into_iter().all(|table_throughput| {
1027 Self::is_table_low_write_throughput(
1028 table_throughput,
1029 opts.table_low_write_throughput_threshold,
1030 opts.table_stat_low_write_throughput_ratio_for_merge,
1031 )
1032 })
1033 }
1034
1035 fn check_is_creating_compaction_group(
1036 group: &CompactionGroupStatistic,
1037 created_tables: &HashSet<TableId>,
1038 ) -> bool {
1039 group
1040 .table_statistic
1041 .keys()
1042 .any(|table_id| !created_tables.contains(table_id))
1043 }
1044
    /// Runs every precondition for merging `next_group` into `group`.
    ///
    /// Checks, in order: the two reserved static groups are never merged with
    /// each other; both groups are non-empty; their table id ranges are
    /// disjoint; auto scheduling is enabled on both; compaction configs match
    /// semantically; neither group contains creating tables; both groups are
    /// low write throughput; the combined size stays under the split threshold;
    /// and the merged group would not immediately enter write-stop or emergency
    /// state. Returns `Ok(())` when the merge may proceed.
    async fn validate_group_merge(
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        opts: &Arc<MetaOpts>,
        versioning: &MonitoredRwLock<Versioning>,
    ) -> Result<()> {
        // The two built-in static groups must stay separate.
        if (group.group_id == StaticCompactionGroupId::StateDefault
            && next_group.group_id == StaticCompactionGroupId::MaterializedView)
            || (group.group_id == StaticCompactionGroupId::MaterializedView
                && next_group.group_id == StaticCompactionGroupId::StateDefault)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} and group-{} are both StaticCompactionGroupId",
                group.group_id, next_group.group_id
            )));
        }

        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} is empty",
                group.group_id, next_group.group_id
            )));
        }

        // The groups' table id ranges must be disjoint, since the merged key
        // space is ordered by table id.
        {
            let mut table_ids_1: Vec<TableId> = group.table_statistic.keys().cloned().collect_vec();
            let mut table_ids_2: Vec<TableId> =
                next_group.table_statistic.keys().cloned().collect_vec();
            table_ids_1.sort();
            table_ids_2.sort();
            // Normalize so `table_ids_1` holds the smaller ids.
            if table_ids_1.first().unwrap() > table_ids_2.first().unwrap() {
                std::mem::swap(&mut table_ids_1, &mut table_ids_2);
            }
            if table_ids_1.last().unwrap() >= table_ids_2.first().unwrap() {
                return Err(Error::CompactionGroup(format!(
                    "group-{} and group-{} have overlapping table id ranges, not mergeable",
                    group.group_id, next_group.group_id
                )));
            }
        }

        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
            || next_group
                .compaction_group_config
                .compaction_config
                .disable_auto_group_scheduling
                .unwrap_or(false)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} disable_auto_group_scheduling",
                group.group_id, next_group.group_id
            )));
        }

        // Configs must agree except for `split_weight_by_vnode`.
        if !Self::is_merge_compatible_by_semantics(group, next_group) {
            let left_config = group.compaction_group_config.compaction_config.as_ref();
            let right_config = next_group
                .compaction_group_config
                .compaction_config
                .as_ref();

            tracing::warn!(
                group_id = %group.group_id,
                next_group_id = %next_group.group_id,
                left_config = ?left_config,
                right_config = ?right_config,
                "compaction config semantic mismatch detected while merging compaction groups"
            );

            return Err(Error::CompactionGroup(format!(
                "Cannot merge group {} and next_group {} with different compaction config (split_weight_by_vnode is excluded from comparison). left_config: {:?}, right_config: {:?}",
                group.group_id, next_group.group_id, left_config, right_config
            )));
        }

        if Self::check_is_creating_compaction_group(group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge high throughput group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        // The merged group must stay below the size at which groups get split,
        // otherwise the merge would immediately be undone.
        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
            * opts.split_group_size_ratio) as u64;

        if (group.group_size + next_group.group_size) > size_limit {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
                group.group_id,
                group.group_size,
                next_group.group_id,
                next_group.group_size,
                size_limit
            )));
        }

        if Self::check_is_creating_compaction_group(next_group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            next_group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge high throughput group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        {
            let versioning_guard = versioning.read().await;
            let levels = &versioning_guard.current_version.levels;
            if !levels.contains_key(&group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge group {} not exist",
                    group.group_id
                )));
            }

            if !levels.contains_key(&next_group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge next group {} not exist",
                    next_group.group_id
                )));
            }

            let group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(group.group_id);

            let next_group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(next_group.group_id);

            // Neither group may currently be write-stopped or in emergency state.
            let group_state = GroupStateValidator::group_state(
                group_levels,
                group.compaction_group_config.compaction_config().deref(),
            );

            if group_state.is_write_stop() || group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit group {} next group {}",
                    group.group_id, next_group.group_id
                )));
            }

            let next_group_state = GroupStateValidator::group_state(
                next_group_levels,
                next_group
                    .compaction_group_config
                    .compaction_config()
                    .deref(),
            );

            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}",
                    next_group.group_id, group.group_id
                )));
            }

            // Project the merged group's L0 shape (scaled by the merge dimension
            // threshold) and reject the merge when it would immediately trip the
            // write-stop thresholds.
            let l0_sub_level_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
                    group.group_id, next_group.group_id
                )));
            }

            // NOTE(review): this recomputes exactly the same sub-level sum and
            // repeats the same `write_stop_l0_file_count` check as above, so it
            // can never fire independently. Presumably one of the two was meant
            // to use the total L0 *file* count rather than the sub-level count —
            // confirm the intended dimension.
            let l0_file_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            // Same projection for the write-stop L0 size threshold.
            let l0_size_after_merge =
                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;

            if GroupStateValidator::write_stop_l0_size(
                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as u64,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            // And against the (higher) emergency file-count threshold.
            if GroupStateValidator::emergency_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
                    group.group_id, next_group.group_id
                )));
            }
        }

        Ok(())
    }
1294}