Skip to main content

risingwave_meta/hummock/manager/compaction/
compaction_group_schedule.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26    StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas, HummockVersion};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::compaction_group_manager::CompactionGroupManager;
38use super::{CompactionGroupStatistic, GroupStateValidator};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_metrics;
44use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
45use crate::hummock::table_write_throughput_statistic::{
46    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
47};
48use crate::manager::MetaOpts;
49
50#[derive(Debug, PartialEq, Eq)]
51struct NormalizePlan {
52    parent_group_id: CompactionGroupId,
53    parent_table_ids: Vec<StateTableId>,
54    boundary_table_id: StateTableId,
55}
56
57impl NormalizePlan {
58    fn split_key(&self) -> Bytes {
59        group_split::build_split_full_key(self.boundary_table_id, VirtualNode::ZERO)
60            .encode()
61            .into()
62    }
63
64    fn split_table_ids(&self) -> (Vec<StateTableId>, Vec<StateTableId>) {
65        let split_full_key =
66            group_split::build_split_full_key(self.boundary_table_id, VirtualNode::ZERO);
67        let (table_ids_left, table_ids_right) =
68            group_split::split_table_ids_with_table_id_and_vnode(
69                &self.parent_table_ids,
70                split_full_key.user_key.table_id,
71                split_full_key.user_key.get_vnode_id(),
72            );
73        assert!(!table_ids_left.is_empty() && !table_ids_right.is_empty());
74        (table_ids_left, table_ids_right)
75    }
76}
77
78fn gen_normalize_plan(
79    left: &CompactionGroupStatistic,
80    right: &CompactionGroupStatistic,
81) -> Option<NormalizePlan> {
82    let left_table_ids = left.table_statistic.keys().copied().collect_vec();
83
84    if left_table_ids.len() <= 1 {
85        return None;
86    }
87
88    let left_max = *left_table_ids.last().unwrap();
89    let right_min = *right.table_statistic.keys().next().unwrap();
90    if left_max < right_min {
91        return None;
92    }
93
94    let boundary_index = left_table_ids.partition_point(|&table_id| table_id < right_min);
95    if boundary_index == 0 || boundary_index >= left_table_ids.len() {
96        return None;
97    }
98    let boundary_table_id = left_table_ids[boundary_index];
99
100    Some(NormalizePlan {
101        parent_group_id: left.group_id,
102        parent_table_ids: left_table_ids,
103        boundary_table_id,
104    })
105}
106
107fn build_normalize_plan_from_group_statistics(
108    groups: &[CompactionGroupStatistic],
109) -> Option<NormalizePlan> {
110    // `calculate_compaction_group_statistic()` iterates all version levels, so newly created or
111    // transiently empty groups can appear here without any member tables.
112    let mut groups = groups
113        .iter()
114        .filter(|group| !group.table_statistic.is_empty())
115        .collect_vec();
116    groups.sort_by_key(|group| *group.table_statistic.keys().next().unwrap());
117
118    groups
119        .split(|group| {
120            group
121                .compaction_group_config
122                .compaction_config
123                .disable_auto_group_scheduling
124                .unwrap_or(false)
125        })
126        .find_map(|segment| {
127            segment
128                .windows(2)
129                .find_map(|pair| gen_normalize_plan(pair[0], pair[1]))
130        })
131}
132
133fn collect_normalize_group_statistics(
134    version: &HummockVersion,
135    compaction_group_manager: &CompactionGroupManager,
136) -> Result<Vec<CompactionGroupStatistic>> {
137    let mut groups = vec![];
138    for group_id in version.levels.keys() {
139        let table_ids = version
140            .state_table_info
141            .compaction_group_member_table_ids(*group_id)
142            .iter()
143            .copied()
144            .collect_vec();
145        if table_ids.is_empty() {
146            continue;
147        }
148
149        let group_config = compaction_group_manager
150            .try_get_compaction_group_config(*group_id)
151            .ok_or_else(|| {
152                Error::CompactionGroup(format!(
153                    "group {} config not found during normalize",
154                    group_id
155                ))
156            })?;
157        groups.push(CompactionGroupStatistic {
158            group_id: *group_id,
159            group_size: 0,
160            table_statistic: table_ids
161                .into_iter()
162                .map(|table_id| (table_id, 0))
163                .collect(),
164            compaction_group_config: group_config,
165        });
166    }
167    Ok(groups)
168}
169
170impl HummockManager {
171    pub async fn merge_compaction_group(
172        &self,
173        group_1: CompactionGroupId,
174        group_2: CompactionGroupId,
175    ) -> Result<()> {
176        self.merge_compaction_group_impl(group_1, group_2, None)
177            .await
178    }
179
180    pub async fn merge_compaction_group_for_test(
181        &self,
182        group_1: CompactionGroupId,
183        group_2: CompactionGroupId,
184        created_tables: HashSet<TableId>,
185    ) -> Result<()> {
186        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
187            .await
188    }
189
190    pub async fn merge_compaction_group_impl(
191        &self,
192        group_1: CompactionGroupId,
193        group_2: CompactionGroupId,
194        created_tables: Option<HashSet<TableId>>,
195    ) -> Result<()> {
196        let compaction_guard = self.compaction.write().await;
197        let mut versioning_guard = self.versioning.write().await;
198        let versioning = versioning_guard.deref_mut();
199        // Validate parameters.
200        if !versioning.current_version.levels.contains_key(&group_1) {
201            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
202        }
203
204        if !versioning.current_version.levels.contains_key(&group_2) {
205            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
206        }
207
208        let state_table_info = versioning.current_version.state_table_info.clone();
209        let mut member_table_ids_1 = state_table_info
210            .compaction_group_member_table_ids(group_1)
211            .iter()
212            .cloned()
213            .collect_vec();
214
215        if member_table_ids_1.is_empty() {
216            return Err(Error::CompactionGroup(format!(
217                "group_1 {} is empty",
218                group_1
219            )));
220        }
221
222        let mut member_table_ids_2 = state_table_info
223            .compaction_group_member_table_ids(group_2)
224            .iter()
225            .cloned()
226            .collect_vec();
227
228        if member_table_ids_2.is_empty() {
229            return Err(Error::CompactionGroup(format!(
230                "group_2 {} is empty",
231                group_2
232            )));
233        }
234
235        debug_assert!(!member_table_ids_1.is_empty());
236        debug_assert!(!member_table_ids_2.is_empty());
237        assert!(member_table_ids_1.is_sorted());
238        assert!(member_table_ids_2.is_sorted());
239
240        let created_tables = if let Some(created_tables) = created_tables {
241            // if the created_tables is provided, use it directly, most for test
242            #[expect(clippy::assertions_on_constants)]
243            {
244                assert!(cfg!(debug_assertions));
245            }
246            created_tables
247        } else {
248            match self.metadata_manager.get_created_table_ids().await {
249                Ok(created_tables) => HashSet::from_iter(created_tables),
250                Err(err) => {
251                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
252                    return Err(Error::CompactionGroup(format!(
253                        "merge group_1 {} group_2 {} failed to fetch created table ids",
254                        group_1, group_2
255                    )));
256                }
257            }
258        };
259
260        fn contains_creating_table(
261            table_ids: &Vec<TableId>,
262            created_tables: &HashSet<TableId>,
263        ) -> bool {
264            table_ids
265                .iter()
266                .any(|table_id| !created_tables.contains(table_id))
267        }
268
269        // do not merge the compaction group which is creating
270        if contains_creating_table(&member_table_ids_1, &created_tables)
271            || contains_creating_table(&member_table_ids_2, &created_tables)
272        {
273            return Err(Error::CompactionGroup(format!(
274                "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
275                group_1, group_2, member_table_ids_1, member_table_ids_2
276            )));
277        }
278
279        // Make sure `member_table_ids_1` is smaller than `member_table_ids_2`
280        let (left_group_id, right_group_id) =
281            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
282                (group_1, group_2)
283            } else {
284                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
285                (group_2, group_1)
286            };
287
288        // We can only merge two groups with non-overlapping member table ids.
289        // After the swap above, member_table_ids_1 has the smaller first element.
290        // If the last element of member_table_ids_1 >= the first element of member_table_ids_2,
291        // the two groups' table id ranges overlap and cannot be merged.
292        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
293            return Err(Error::CompactionGroup(format!(
294                "invalid merge group_1 {} group_2 {}: table id ranges overlap",
295                left_group_id, right_group_id
296            )));
297        }
298
299        let combined_member_table_ids = member_table_ids_1
300            .iter()
301            .chain(member_table_ids_2.iter())
302            .collect_vec();
303        assert!(combined_member_table_ids.is_sorted());
304
305        // check duplicated sst_id
306        let mut sst_id_set = HashSet::new();
307        for sst_id in versioning
308            .current_version
309            .get_sst_ids_by_group_id(left_group_id)
310            .chain(
311                versioning
312                    .current_version
313                    .get_sst_ids_by_group_id(right_group_id),
314            )
315        {
316            if !sst_id_set.insert(sst_id) {
317                return Err(Error::CompactionGroup(format!(
318                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
319                    left_group_id, right_group_id, sst_id
320                )));
321            }
322        }
323
324        // check branched sst on non-overlap level
325        {
326            let left_levels = versioning
327                .current_version
328                .get_compaction_group_levels(group_1);
329
330            let right_levels = versioning
331                .current_version
332                .get_compaction_group_levels(group_2);
333
334            // we can not check the l0 sub level, because the sub level id will be rewritten when merge
335            // This check will ensure that other non-overlapping level ssts can be concat and that the key_range is correct.
336            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
337            for level_idx in 1..=max_level {
338                let left_level = left_levels.get_level(level_idx);
339                let right_level = right_levels.get_level(level_idx);
340                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
341                    continue;
342                }
343
344                let left_last_sst = left_level.table_infos.last().unwrap().clone();
345                let right_first_sst = right_level.table_infos.first().unwrap().clone();
346                let left_sst_id = left_last_sst.sst_id;
347                let right_sst_id = right_first_sst.sst_id;
348                let left_obj_id = left_last_sst.object_id;
349                let right_obj_id = right_first_sst.object_id;
350
351                // Since the sst key_range within a group is legal, we only need to check the ssts adjacent to the two groups.
352                if !can_concat(&[left_last_sst, right_first_sst]) {
353                    return Err(Error::CompactionGroup(format!(
354                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
355                        left_group_id,
356                        right_group_id,
357                        level_idx,
358                        left_sst_id,
359                        right_sst_id,
360                        left_obj_id,
361                        right_obj_id
362                    )));
363                }
364            }
365        }
366
367        let mut version = HummockVersionTransaction::new(
368            &mut versioning.current_version,
369            &mut versioning.hummock_version_deltas,
370            &mut versioning.table_change_log,
371            self.env.notification_manager(),
372            None,
373            &self.metrics,
374            &self.env.opts,
375        );
376        let mut new_version_delta = version.new_delta();
377
378        let target_compaction_group_id = {
379            // merge right_group_id to left_group_id and remove right_group_id
380            new_version_delta.group_deltas.insert(
381                left_group_id,
382                GroupDeltas {
383                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
384                        left_group_id,
385                        right_group_id,
386                    })],
387                },
388            );
389            left_group_id
390        };
391
392        // TODO: remove compaciton group_id from state_table_info
393        // rewrite compaction_group_id for all tables
394        new_version_delta.with_latest_version(|version, new_version_delta| {
395            for &table_id in combined_member_table_ids {
396                let info = version
397                    .state_table_info
398                    .info()
399                    .get(&table_id)
400                    .expect("have check exist previously");
401                assert!(
402                    new_version_delta
403                        .state_table_info_delta
404                        .insert(
405                            table_id,
406                            PbStateTableInfoDelta {
407                                committed_epoch: info.committed_epoch,
408                                compaction_group_id: target_compaction_group_id,
409                            }
410                        )
411                        .is_none()
412                );
413            }
414        });
415
416        {
417            let mut compaction_group_manager = self.compaction_group_manager.write().await;
418            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
419
420            // for metrics reclaim
421            {
422                let right_group_max_level = new_version_delta
423                    .latest_version()
424                    .get_compaction_group_levels(right_group_id)
425                    .levels
426                    .len();
427
428                remove_compaction_group_metrics(
429                    &self.metrics,
430                    right_group_id,
431                    right_group_max_level,
432                );
433            }
434
435            // clean up compaction schedule state for the merged group
436            self.compaction_state
437                .remove_compaction_group(right_group_id);
438
439            // clear `partition_vnode_count` for the hybrid group
440            {
441                if let Err(err) = compaction_groups_txn.update_compaction_config(
442                    &[left_group_id],
443                    &[MutableConfig::SplitWeightByVnode(0)], // default
444                ) {
445                    tracing::error!(
446                        error = %err.as_report(),
447                        "failed to update compaction config for group-{}",
448                        left_group_id
449                    );
450                }
451            }
452
453            new_version_delta.pre_apply();
454
455            // remove right_group_id
456            compaction_groups_txn.remove(right_group_id);
457            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
458        }
459
460        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
461        versioning.mark_next_time_travel_version_snapshot();
462
463        // cancel tasks
464        let mut canceled_tasks = vec![];
465        // after merge, all tasks in right_group_id should be canceled
466        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
467        let compact_task_assignments =
468            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
469        compact_task_assignments
470            .into_iter()
471            .for_each(|task_assignment| {
472                let task = &task_assignment.compact_task;
473                assert_eq!(task.compaction_group_id, right_group_id);
474                canceled_tasks.push(ReportTask {
475                    task_id: task.task_id,
476                    task_status: TaskStatus::ManualCanceled,
477                    table_stats_change: HashMap::default(),
478                    sorted_output_ssts: vec![],
479                    object_timestamps: HashMap::default(),
480                });
481            });
482
483        if !canceled_tasks.is_empty() {
484            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
485                .await?;
486        } else {
487            drop(versioning_guard);
488            drop(compaction_guard);
489        }
490
491        self.try_update_write_limits(&[left_group_id, right_group_id])
492            .await;
493
494        self.metrics
495            .merge_compaction_group_count
496            .with_label_values(&[&left_group_id.to_string()])
497            .inc();
498
499        Ok(())
500    }
501}
502
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_pb::hummock::CompactionConfig;

    use super::{
        CompactionGroupStatistic, NormalizePlan, build_normalize_plan_from_group_statistics,
        gen_normalize_plan,
    };
    use crate::hummock::model::CompactionGroup;

    /// Builds a zero-sized group statistic that owns `table_ids` and carries the given
    /// `disable_auto_group_scheduling` flag in its config.
    fn group(
        group_id: CompactionGroupId,
        table_ids: &[u32],
        disable_auto_group_scheduling: bool,
    ) -> CompactionGroupStatistic {
        let compaction_config = CompactionConfig {
            disable_auto_group_scheduling: Some(disable_auto_group_scheduling),
            ..Default::default()
        };
        CompactionGroupStatistic {
            group_id,
            group_size: 0,
            table_statistic: table_ids
                .iter()
                .map(|&table_id| (table_id.into(), 0_u64))
                .collect::<BTreeMap<_, _>>(),
            compaction_group_config: CompactionGroup::new(group_id, compaction_config),
        }
    }

    /// A parent with a single table can never be split.
    #[test]
    fn test_gen_normalize_plan_returns_none_for_single_table_group() {
        let single_table_group = group(1.into(), &[10], false);
        let neighbor = group(2.into(), &[5, 20], false);

        assert!(gen_normalize_plan(&single_table_group, &neighbor).is_none());
    }

    /// Disjoint table id ranges need no normalization.
    #[test]
    fn test_gen_normalize_plan_returns_none_for_non_overlapping_groups() {
        let lower = group(1.into(), &[1, 2, 3], false);
        let upper = group(2.into(), &[4, 5, 6], false);

        assert!(gen_normalize_plan(&lower, &upper).is_none());
    }

    /// When every table of the parent is not smaller than the neighbor's first table, there is
    /// no boundary that leaves a non-empty left side.
    #[test]
    fn test_gen_normalize_plan_returns_none_when_boundary_cannot_split_parent() {
        let parent = group(1.into(), &[5, 6, 7], false);
        let neighbor = group(2.into(), &[4, 8], false);

        assert!(gen_normalize_plan(&parent, &neighbor).is_none());
    }

    /// Interleaved ranges: the boundary is the first parent table not smaller than the
    /// neighbor's first table.
    #[test]
    fn test_gen_normalize_plan_generates_expected_boundary() {
        let parent = group(1.into(), &[1, 4, 7], false);
        let neighbor = group(2.into(), &[2, 5, 8], false);

        let expected = NormalizePlan {
            parent_group_id: 1.into(),
            parent_table_ids: vec![1.into(), 4.into(), 7.into()],
            boundary_table_id: 4.into(),
        };
        assert_eq!(gen_normalize_plan(&parent, &neighbor), Some(expected));
    }

    /// A group with auto scheduling disabled cuts the sequence; the plan comes from the later
    /// segment.
    #[test]
    fn test_build_normalize_plan_skips_disabled_boundary_and_continues_later_segment() {
        let statistics = vec![
            group(1.into(), &[1, 4, 7], false),
            group(2.into(), &[2, 5, 8], true),
            group(3.into(), &[10, 13, 16], false),
            group(4.into(), &[11, 14, 17], false),
        ];

        let expected = NormalizePlan {
            parent_group_id: 3.into(),
            parent_table_ids: vec![10.into(), 13.into(), 16.into()],
            boundary_table_id: 13.into(),
        };
        assert_eq!(
            build_normalize_plan_from_group_statistics(&statistics),
            Some(expected)
        );
    }
}
595
596impl HummockManager {
597    /// Split `table_ids` to a dedicated compaction group.(will be split by the `table_id` and `vnode`.)
598    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
599    /// The split will follow the following rules
600    /// 1. ssts with `key_range.left` greater than `split_key` will be split to the right group
601    /// 2. the sst containing `split_key` will be split into two separate ssts and their `key_range` will be changed `sst_1`: [`sst.key_range.left`, `split_key`) `sst_2`: [`split_key`, `sst.key_range.right`]
602    /// 3. currently only `vnode` 0 and `vnode` max is supported. (Due to the above rule, vnode max will be rewritten as `table_id` + 1, `vnode` 0)
603    ///   - `parent_group_id`: the `group_id` to split
604    ///   - `split_table_ids`: the `table_ids` to split, now we still support to split multiple tables to one group at once, pass `split_table_ids` for per `split` operation for checking
605    ///   - `table_id_to_split`: the `table_id` to split
606    ///   - `vnode_to_split`: the `vnode` to split
607    ///   - `partition_vnode_count`: the partition count for the single table group if need
608    async fn split_compaction_group_impl(
609        &self,
610        parent_group_id: CompactionGroupId,
611        split_table_ids: &[StateTableId],
612        table_id_to_split: StateTableId,
613        vnode_to_split: VirtualNode,
614        partition_vnode_count: Option<u32>,
615    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
616        let mut result = vec![];
617        let compaction_guard = self.compaction.write().await;
618        let mut versioning_guard = self.versioning.write().await;
619        let versioning = versioning_guard.deref_mut();
620        // Validate parameters.
621        if !versioning
622            .current_version
623            .levels
624            .contains_key(&parent_group_id)
625        {
626            return Err(Error::CompactionGroup(format!(
627                "invalid group {}",
628                parent_group_id
629            )));
630        }
631
632        let member_table_ids = versioning
633            .current_version
634            .state_table_info
635            .compaction_group_member_table_ids(parent_group_id)
636            .iter()
637            .copied()
638            .collect::<BTreeSet<_>>();
639
640        if !member_table_ids.contains(&table_id_to_split) {
641            return Err(Error::CompactionGroup(format!(
642                "table {} doesn't in group {}",
643                table_id_to_split, parent_group_id
644            )));
645        }
646
647        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);
648
649        // change to vec for partition
650        let table_ids = member_table_ids.into_iter().collect_vec();
651        if table_ids == split_table_ids {
652            return Err(Error::CompactionGroup(format!(
653                "invalid split attempt for group {}: all member tables are moved",
654                parent_group_id
655            )));
656        }
657        // avoid decode split_key when caller is aware of the table_id and vnode
658        let (table_ids_left, table_ids_right) =
659            group_split::split_table_ids_with_table_id_and_vnode(
660                &table_ids,
661                split_full_key.user_key.table_id,
662                split_full_key.user_key.get_vnode_id(),
663            );
664        if table_ids_left.is_empty() || table_ids_right.is_empty() {
665            // not need to split group if all tables are in the same side
666            if !table_ids_left.is_empty() {
667                result.push((parent_group_id, table_ids_left));
668            }
669
670            if !table_ids_right.is_empty() {
671                result.push((parent_group_id, table_ids_right));
672            }
673            return Ok(result);
674        }
675
676        result.push((parent_group_id, table_ids_left));
677
678        let split_key: Bytes = split_full_key.encode().into();
679
680        let mut version = HummockVersionTransaction::new(
681            &mut versioning.current_version,
682            &mut versioning.hummock_version_deltas,
683            &mut versioning.table_change_log,
684            self.env.notification_manager(),
685            None,
686            &self.metrics,
687            &self.env.opts,
688        );
689        let mut new_version_delta = version.new_delta();
690
691        let split_sst_count = new_version_delta
692            .latest_version()
693            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());
694
695        let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
696        let (new_compaction_group_id, config) = {
697            // All NewCompactionGroup pairs are mapped to one new compaction group.
698            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
699            // Inherit config from parent group
700            let config = self
701                .compaction_group_manager
702                .read()
703                .await
704                .try_get_compaction_group_config(parent_group_id)
705                .ok_or_else(|| {
706                    Error::CompactionGroup(format!(
707                        "parent group {} config not found",
708                        parent_group_id
709                    ))
710                })?
711                .compaction_config()
712                .as_ref()
713                .clone();
714
715            #[expect(deprecated)]
716            // fill the deprecated field with default value
717            new_version_delta.group_deltas.insert(
718                new_compaction_group_id,
719                GroupDeltas {
720                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
721                        group_config: Some(config.clone()),
722                        group_id: new_compaction_group_id,
723                        parent_group_id,
724                        new_sst_start_id,
725                        table_ids: vec![],
726                        version: CompatibilityVersion::LATEST as _, // for compatibility
727                        split_key: Some(split_key.into()),
728                    }))],
729                },
730            );
731            (new_compaction_group_id, config)
732        };
733
734        new_version_delta.with_latest_version(|version, new_version_delta| {
735            for &table_id in &table_ids_right {
736                let info = version
737                    .state_table_info
738                    .info()
739                    .get(&table_id)
740                    .expect("have check exist previously");
741                assert!(
742                    new_version_delta
743                        .state_table_info_delta
744                        .insert(
745                            table_id,
746                            PbStateTableInfoDelta {
747                                committed_epoch: info.committed_epoch,
748                                compaction_group_id: new_compaction_group_id,
749                            }
750                        )
751                        .is_none()
752                );
753            }
754        });
755
756        result.push((new_compaction_group_id, table_ids_right));
757
758        {
759            let mut compaction_group_manager = self.compaction_group_manager.write().await;
760            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
761            compaction_groups_txn
762                .create_compaction_groups(new_compaction_group_id, Arc::new(config));
763
764            // check if need to update the compaction config for the single table group and guarantee the operation atomicity
765            // `partition_vnode_count` only works inside a table, to avoid a lot of slicing sst, we only enable it in groups with high throughput and only one table.
766            // The target `table_ids` might be split to an existing group, so we need to try to update its config
767            for (cg_id, table_ids) in &result {
768                // check the split_tables had been place to the dedicated compaction group
769                if let Some(partition_vnode_count) = partition_vnode_count
770                    && table_ids.len() == 1
771                    && table_ids == split_table_ids
772                    && let Err(err) = compaction_groups_txn.update_compaction_config(
773                        &[*cg_id],
774                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
775                    )
776                {
777                    tracing::error!(
778                        error = %err.as_report(),
779                        "failed to update compaction config for group-{}",
780                        cg_id
781                    );
782                }
783            }
784
785            new_version_delta.pre_apply();
786            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
787        }
788        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
789        versioning.mark_next_time_travel_version_snapshot();
790
791        // The expired compact tasks will be canceled.
792        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
793        let mut canceled_tasks = vec![];
794        let compact_task_assignments =
795            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
796        let levels = versioning
797            .current_version
798            .get_compaction_group_levels(parent_group_id);
799        compact_task_assignments
800            .into_iter()
801            .for_each(|task_assignment| {
802                let task = &task_assignment.compact_task;
803                let is_expired = is_compaction_task_expired(
804                    task.compaction_group_version_id,
805                    levels.compaction_group_version_id,
806                );
807                if is_expired {
808                    canceled_tasks.push(ReportTask {
809                        task_id: task.task_id,
810                        task_status: TaskStatus::ManualCanceled,
811                        table_stats_change: HashMap::default(),
812                        sorted_output_ssts: vec![],
813                        object_timestamps: HashMap::default(),
814                    });
815                }
816            });
817
818        if !canceled_tasks.is_empty() {
819            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
820                .await?;
821        } else {
822            drop(versioning_guard);
823            drop(compaction_guard);
824        }
825
826        let affected_group_ids = result.iter().map(|(cg_id, _)| *cg_id).collect_vec();
827        self.try_update_write_limits(&affected_group_ids).await;
828
829        self.metrics
830            .split_compaction_group_count
831            .with_label_values(&[&parent_group_id.to_string()])
832            .inc();
833
834        Ok(result)
835    }
836
837    /// Split `table_ids` to a dedicated compaction group.
838    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
839    pub async fn move_state_tables_to_dedicated_compaction_group(
840        &self,
841        parent_group_id: CompactionGroupId,
842        table_ids: &[StateTableId],
843        partition_vnode_count: Option<u32>,
844    ) -> Result<(
845        CompactionGroupId,
846        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
847    )> {
848        if table_ids.is_empty() {
849            return Err(Error::CompactionGroup(
850                "table_ids must not be empty".to_owned(),
851            ));
852        }
853
854        if !table_ids.is_sorted() {
855            return Err(Error::CompactionGroup(
856                "table_ids must be sorted".to_owned(),
857            ));
858        }
859
860        let parent_table_ids = {
861            let versioning_guard = self.versioning.read().await;
862            versioning_guard
863                .current_version
864                .state_table_info
865                .compaction_group_member_table_ids(parent_group_id)
866                .iter()
867                .copied()
868                .collect_vec()
869        };
870
871        if parent_table_ids == table_ids {
872            return Err(Error::CompactionGroup(format!(
873                "invalid split attempt for group {}: all member tables are moved",
874                parent_group_id
875            )));
876        }
877
878        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<CompactionGroupId, Vec<TableId>>) {
879            // 1. table_ids in different cg are sorted.
880            {
881                cg_id_to_table_ids
882                    .iter()
883                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
884            }
885
886            // 2.table_ids in different cg are non-overlapping
887            {
888                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
889                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
890                assert!(table_table_ids_vec.concat().is_sorted());
891            }
892
893            // 3.table_ids belong to one and only one cg.
894            {
895                let mut all_table_ids = HashSet::new();
896                for table_ids in cg_id_to_table_ids.values() {
897                    for table_id in table_ids {
898                        assert!(all_table_ids.insert(*table_id));
899                    }
900                }
901            }
902        }
903
904        // move [3,4,5,6]
905        // [1,2,3,4,5,6,7,8,9,10] -> [1,2] [3,4,5,6] [7,8,9,10]
906        // split key
907        // 1. table_id = 3, vnode = 0, epoch = MAX
908        // 2. table_id = 7, vnode = 0, epoch = MAX
909
910        // The new compaction group id is always generate on the right side
911        // Hence, we return the first compaction group id as the result
912        // split 1
913        let mut cg_id_to_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
914        let table_id_to_split = *table_ids.first().unwrap();
915        let mut target_compaction_group_id: CompactionGroupId = 0.into();
916        let result_vec = self
917            .split_compaction_group_impl(
918                parent_group_id,
919                table_ids,
920                table_id_to_split,
921                VirtualNode::ZERO,
922                partition_vnode_count,
923            )
924            .await?;
925        assert!(result_vec.len() <= 2);
926
927        let mut finish_move = false;
928        for (cg_id, table_ids_after_split) in result_vec {
929            if table_ids_after_split.contains(&table_id_to_split) {
930                target_compaction_group_id = cg_id;
931            }
932
933            if table_ids_after_split == table_ids {
934                finish_move = true;
935            }
936
937            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
938        }
939        check_table_ids_valid(&cg_id_to_table_ids);
940
941        if finish_move {
942            return Ok((target_compaction_group_id, cg_id_to_table_ids));
943        }
944
945        // split 2
946        // See the example above and the split rule in `split_compaction_group_impl`.
947        let table_id_to_split = *table_ids.last().unwrap();
948        let result_vec = self
949            .split_compaction_group_impl(
950                target_compaction_group_id,
951                table_ids,
952                table_id_to_split,
953                VirtualNode::MAX_REPRESENTABLE,
954                partition_vnode_count,
955            )
956            .await?;
957        assert!(result_vec.len() <= 2);
958        for (cg_id, table_ids_after_split) in result_vec {
959            if table_ids_after_split.contains(&table_id_to_split) {
960                target_compaction_group_id = cg_id;
961            }
962            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
963        }
964        check_table_ids_valid(&cg_id_to_table_ids);
965
966        Ok((target_compaction_group_id, cg_id_to_table_ids))
967    }
968}
969
970impl HummockManager {
971    async fn build_normalize_plan(&self) -> Option<NormalizePlan> {
972        let groups = self.calculate_compaction_group_statistic().await;
973        build_normalize_plan_from_group_statistics(&groups)
974    }
975
976    async fn apply_normalize_plan(&self, plan: &NormalizePlan) -> Result<bool> {
977        let (table_ids_right, boundary_table_id, new_compaction_group_id) = {
978            let mut versioning_guard = self.versioning.write().await;
979            let versioning = versioning_guard.deref_mut();
980            let mut compaction_group_manager = self.compaction_group_manager.write().await;
981
982            let groups = collect_normalize_group_statistics(
983                &versioning.current_version,
984                &compaction_group_manager,
985            )?;
986            let Some(current_plan) = build_normalize_plan_from_group_statistics(&groups) else {
987                return Ok(false);
988            };
989
990            if &current_plan != plan {
991                return Ok(false);
992            }
993
994            let (_table_ids_left, table_ids_right) = plan.split_table_ids();
995
996            let config = compaction_group_manager
997                .try_get_compaction_group_config(plan.parent_group_id)
998                .ok_or_else(|| {
999                    Error::CompactionGroup(format!(
1000                        "parent group {} config not found",
1001                        plan.parent_group_id
1002                    ))
1003                })?
1004                .compaction_config()
1005                .as_ref()
1006                .clone();
1007
1008            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
1009            let mut version = HummockVersionTransaction::new(
1010                &mut versioning.current_version,
1011                &mut versioning.hummock_version_deltas,
1012                &mut versioning.table_change_log,
1013                self.env.notification_manager(),
1014                None,
1015                &self.metrics,
1016                &self.env.opts,
1017            );
1018            let mut new_version_delta = version.new_delta();
1019            let split_key = plan.split_key();
1020            let split_sst_count = new_version_delta
1021                .latest_version()
1022                .count_new_ssts_in_group_split(plan.parent_group_id, split_key.clone());
1023            let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
1024            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
1025
1026            #[expect(deprecated)]
1027            new_version_delta.group_deltas.insert(
1028                new_compaction_group_id,
1029                GroupDeltas {
1030                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
1031                        group_config: Some(config.clone()),
1032                        group_id: new_compaction_group_id,
1033                        parent_group_id: plan.parent_group_id,
1034                        new_sst_start_id,
1035                        table_ids: vec![],
1036                        version: CompatibilityVersion::LATEST as _,
1037                        split_key: Some(split_key.into()),
1038                    }))],
1039                },
1040            );
1041
1042            new_version_delta.with_latest_version(|version, new_version_delta| {
1043                for &table_id in &table_ids_right {
1044                    let info = version
1045                        .state_table_info
1046                        .info()
1047                        .get(&table_id)
1048                        .expect("table should exist before normalize split");
1049                    assert!(
1050                        new_version_delta
1051                            .state_table_info_delta
1052                            .insert(
1053                                table_id,
1054                                PbStateTableInfoDelta {
1055                                    committed_epoch: info.committed_epoch,
1056                                    compaction_group_id: new_compaction_group_id,
1057                                }
1058                            )
1059                            .is_none()
1060                    );
1061                }
1062            });
1063            new_version_delta.pre_apply();
1064            compaction_groups_txn
1065                .create_compaction_groups(new_compaction_group_id, Arc::new(config));
1066
1067            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
1068            versioning.mark_next_time_travel_version_snapshot();
1069
1070            (
1071                table_ids_right,
1072                plan.boundary_table_id,
1073                new_compaction_group_id,
1074            )
1075        };
1076
1077        self.cancel_expired_normalize_split_tasks(plan.parent_group_id)
1078            .await?;
1079        self.try_update_write_limits(&[plan.parent_group_id, new_compaction_group_id])
1080            .await;
1081        self.metrics
1082            .split_compaction_group_count
1083            .with_label_values(&[&plan.parent_group_id.to_string()])
1084            .inc();
1085        tracing::info!(
1086            "normalize split success: parent_group={} boundary_table_id={} moved_tables={:?} new_group_id={}",
1087            plan.parent_group_id,
1088            boundary_table_id,
1089            table_ids_right,
1090            new_compaction_group_id
1091        );
1092
1093        Ok(true)
1094    }
1095
1096    async fn cancel_expired_normalize_split_tasks(
1097        &self,
1098        parent_group_id: CompactionGroupId,
1099    ) -> Result<()> {
1100        let mut canceled_tasks = vec![];
1101        let compaction_guard = self.compaction.write().await;
1102        let mut versioning_guard = self.versioning.write().await;
1103        let versioning = versioning_guard.deref_mut();
1104        let compact_task_assignments =
1105            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
1106        let levels = versioning
1107            .current_version
1108            .get_compaction_group_levels(parent_group_id);
1109        compact_task_assignments
1110            .into_iter()
1111            .for_each(|task_assignment| {
1112                let task = &task_assignment.compact_task;
1113                if is_compaction_task_expired(
1114                    task.compaction_group_version_id,
1115                    levels.compaction_group_version_id,
1116                ) {
1117                    canceled_tasks.push(ReportTask {
1118                        task_id: task.task_id,
1119                        task_status: TaskStatus::ManualCanceled,
1120                        table_stats_change: HashMap::default(),
1121                        sorted_output_ssts: vec![],
1122                        object_timestamps: HashMap::default(),
1123                    });
1124                }
1125            });
1126        canceled_tasks.sort_by_key(|task| task.task_id);
1127        canceled_tasks.dedup_by_key(|task| task.task_id);
1128
1129        if !canceled_tasks.is_empty() {
1130            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
1131                .await?;
1132        }
1133
1134        Ok(())
1135    }
1136
1137    /// Normalize overlapping adjacent compaction groups by split only.
1138    ///
1139    /// The algorithm repeatedly scans adjacent groups by `min(table_id)` and if
1140    /// `max(left) >= min(right)`, it splits `left` at the first table id `>= min(right)`.
1141    /// Each step is planned from a read snapshot, then revalidated and applied with a short write
1142    /// transaction.
1143    pub async fn normalize_overlapping_compaction_groups(&self) -> Result<usize> {
1144        self.normalize_overlapping_compaction_groups_with_limit(usize::MAX)
1145            .await
1146    }
1147
1148    pub async fn normalize_overlapping_compaction_groups_with_limit(
1149        &self,
1150        max_splits: usize,
1151    ) -> Result<usize> {
1152        let mut split_count = 0usize;
1153        while split_count < max_splits {
1154            let Some(plan) = self.build_normalize_plan().await else {
1155                break;
1156            };
1157
1158            if !self.apply_normalize_plan(&plan).await? {
1159                tracing::debug!(
1160                    parent_group_id = %plan.parent_group_id,
1161                    boundary_table_id = %plan.boundary_table_id,
1162                    "normalize plan became stale before apply"
1163                );
1164                break;
1165            }
1166            split_count += 1;
1167        }
1168
1169        Ok(split_count)
1170    }
1171
1172    /// Split the compaction group if the group is too large or contains high throughput tables.
1173    pub async fn try_split_compaction_group(
1174        &self,
1175        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1176        group: CompactionGroupStatistic,
1177    ) {
1178        if group
1179            .compaction_group_config
1180            .compaction_config
1181            .disable_auto_group_scheduling
1182            .unwrap_or(false)
1183        {
1184            return;
1185        }
1186        // split high throughput table to dedicated compaction group
1187        for (table_id, table_size) in &group.table_statistic {
1188            self.try_move_high_throughput_table_to_dedicated_cg(
1189                table_write_throughput_statistic_manager,
1190                *table_id,
1191                table_size,
1192                group.group_id,
1193            )
1194            .await;
1195        }
1196
1197        // split the huge group to multiple groups
1198        self.try_split_huge_compaction_group(group).await;
1199    }
1200
1201    /// Try to move the high throughput table to a dedicated compaction group.
1202    pub async fn try_move_high_throughput_table_to_dedicated_cg(
1203        &self,
1204        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1205        table_id: TableId,
1206        _table_size: &u64,
1207        parent_group_id: CompactionGroupId,
1208    ) {
1209        let mut table_throughput = table_write_throughput_statistic_manager
1210            .get_table_throughput_descending(
1211                table_id,
1212                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
1213            )
1214            .peekable();
1215
1216        if table_throughput.peek().is_none() {
1217            return;
1218        }
1219
1220        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
1221            table_throughput,
1222            self.env.opts.table_high_write_throughput_threshold,
1223            self.env
1224                .opts
1225                .table_stat_high_write_throughput_ratio_for_split,
1226        );
1227
1228        // do not split a table to dedicated compaction group if it is not high write throughput
1229        if !is_high_write_throughput {
1230            return;
1231        }
1232
1233        let ret = self
1234            .move_state_tables_to_dedicated_compaction_group(
1235                parent_group_id,
1236                &[table_id],
1237                Some(self.env.opts.partition_vnode_count),
1238            )
1239            .await;
1240        match ret {
1241            Ok(split_result) => {
1242                tracing::info!(
1243                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
1244                    table_id,
1245                    parent_group_id,
1246                    self.env.opts.partition_vnode_count,
1247                    split_result
1248                );
1249            }
1250            Err(e) => {
1251                tracing::info!(
1252                    error = %e.as_report(),
1253                    "failed to split state table [{}] from group-{}",
1254                    table_id,
1255                    parent_group_id,
1256                )
1257            }
1258        }
1259    }
1260
1261    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
1262        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
1263            * self.env.opts.split_group_size_ratio) as u64;
1264        let is_huge_hybrid_group =
1265            group.group_size > group_max_size && group.table_statistic.len() > 1; // avoid split single table group
1266        if is_huge_hybrid_group {
1267            let mut accumulated_size = 0;
1268            let mut table_ids = Vec::default();
1269            for (table_id, table_size) in &group.table_statistic {
1270                accumulated_size += table_size;
1271                table_ids.push(*table_id);
1272                // split if the accumulated size is greater than half of the group size
1273                // avoid split a small table to dedicated compaction group and trigger multiple merge
1274                assert!(table_ids.is_sorted());
1275                let remaining_size = group.group_size.saturating_sub(accumulated_size);
1276                if accumulated_size > group_max_size / 2
1277                    && remaining_size > 0
1278                    && table_ids.len() < group.table_statistic.len()
1279                {
1280                    let ret = self
1281                        .move_state_tables_to_dedicated_compaction_group(
1282                            group.group_id,
1283                            &table_ids,
1284                            None,
1285                        )
1286                        .await;
1287                    match ret {
1288                        Ok(split_result) => {
1289                            tracing::info!(
1290                                "split_huge_compaction_group success {:?}",
1291                                split_result
1292                            );
1293                            self.metrics
1294                                .split_compaction_group_count
1295                                .with_label_values(&[&group.group_id.to_string()])
1296                                .inc();
1297                            return;
1298                        }
1299                        Err(e) => {
1300                            tracing::error!(
1301                                error = %e.as_report(),
1302                                "failed to split_huge_compaction_group table {:?} from group-{}",
1303                                table_ids,
1304                                group.group_id
1305                            );
1306
1307                            return;
1308                        }
1309                    }
1310                }
1311            }
1312        }
1313    }
1314
1315    pub async fn try_merge_compaction_group(
1316        &self,
1317        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1318        group: &CompactionGroupStatistic,
1319        next_group: &CompactionGroupStatistic,
1320        created_tables: &HashSet<TableId>,
1321    ) -> Result<()> {
1322        GroupMergeValidator::validate_group_merge(
1323            group,
1324            next_group,
1325            created_tables,
1326            table_write_throughput_statistic_manager,
1327            &self.env.opts,
1328            &self.versioning,
1329        )
1330        .await?;
1331
1332        let result = self
1333            .merge_compaction_group(group.group_id, next_group.group_id)
1334            .await;
1335
1336        match &result {
1337            Ok(()) => {
1338                tracing::info!(
1339                    "merge group-{} to group-{}",
1340                    next_group.group_id,
1341                    group.group_id,
1342                );
1343
1344                self.metrics
1345                    .merge_compaction_group_count
1346                    .with_label_values(&[&group.group_id.to_string()])
1347                    .inc();
1348            }
1349            Err(e) => {
1350                tracing::info!(
1351                    error = %e.as_report(),
1352                    "failed to merge group-{} group-{}",
1353                    next_group.group_id,
1354                    group.group_id,
1355                );
1356            }
1357        }
1358
1359        result
1360    }
1361}
1362
/// Stateless namespace for the checks that decide whether compaction groups may be merged.
#[derive(Debug, Default)]
struct GroupMergeValidator;
1365
1366impl GroupMergeValidator {
1367    /// Check if two groups have compatible compaction configs for merging.
1368    /// Ignores `split_weight_by_vnode` since it's per-table and will be reset after merge.
1369    fn is_merge_compatible_by_semantics(
1370        group: &CompactionGroupStatistic,
1371        next_group: &CompactionGroupStatistic,
1372    ) -> bool {
1373        let (mut left, mut right) = (
1374            group
1375                .compaction_group_config
1376                .compaction_config
1377                .as_ref()
1378                .clone(),
1379            next_group
1380                .compaction_group_config
1381                .compaction_config
1382                .as_ref()
1383                .clone(),
1384        );
1385        left.split_weight_by_vnode = 0;
1386        right.split_weight_by_vnode = 0;
1387        left == right
1388    }
1389
1390    /// Check if the table is high write throughput with the given threshold and ratio.
1391    pub fn is_table_high_write_throughput(
1392        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
1393        threshold: u64,
1394        high_write_throughput_ratio: f64,
1395    ) -> bool {
1396        let mut sample_size = 0;
1397        let mut high_write_throughput_count = 0;
1398        for statistic in table_throughput {
1399            sample_size += 1;
1400            if statistic.throughput > threshold {
1401                high_write_throughput_count += 1;
1402            }
1403        }
1404
1405        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
1406    }
1407
1408    pub fn is_table_low_write_throughput(
1409        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
1410        threshold: u64,
1411        low_write_throughput_ratio: f64,
1412    ) -> bool {
1413        let mut sample_size = 0;
1414        let mut low_write_throughput_count = 0;
1415        for statistic in table_throughput {
1416            sample_size += 1;
1417            if statistic.throughput <= threshold {
1418                low_write_throughput_count += 1;
1419            }
1420        }
1421
1422        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
1423    }
1424
1425    fn check_is_low_write_throughput_compaction_group(
1426        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1427        group: &CompactionGroupStatistic,
1428        opts: &Arc<MetaOpts>,
1429    ) -> bool {
1430        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
1431        for table_id in group.table_statistic.keys() {
1432            let mut table_throughput = table_write_throughput_statistic_manager
1433                .get_table_throughput_descending(
1434                    *table_id,
1435                    opts.table_stat_throuput_window_seconds_for_merge as i64,
1436                )
1437                .peekable();
1438            if table_throughput.peek().is_none() {
1439                continue;
1440            }
1441
1442            table_with_statistic.push(table_throughput);
1443        }
1444
1445        // if all tables in the group do not have enough statistics, return true
1446        if table_with_statistic.is_empty() {
1447            return true;
1448        }
1449
1450        // check if all tables in the group are low write throughput with enough statistics
1451        table_with_statistic.into_iter().all(|table_throughput| {
1452            Self::is_table_low_write_throughput(
1453                table_throughput,
1454                opts.table_low_write_throughput_threshold,
1455                opts.table_stat_low_write_throughput_ratio_for_merge,
1456            )
1457        })
1458    }
1459
1460    fn check_is_creating_compaction_group(
1461        group: &CompactionGroupStatistic,
1462        created_tables: &HashSet<TableId>,
1463    ) -> bool {
1464        group
1465            .table_statistic
1466            .keys()
1467            .any(|table_id| !created_tables.contains(table_id))
1468    }
1469
1470    async fn validate_group_merge(
1471        group: &CompactionGroupStatistic,
1472        next_group: &CompactionGroupStatistic,
1473        created_tables: &HashSet<TableId>,
1474        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1475        opts: &Arc<MetaOpts>,
1476        versioning: &MonitoredRwLock<Versioning>,
1477    ) -> Result<()> {
1478        // TODO: remove this check after refactor group id
1479        if (group.group_id == StaticCompactionGroupId::StateDefault
1480            && next_group.group_id == StaticCompactionGroupId::MaterializedView)
1481            || (group.group_id == StaticCompactionGroupId::MaterializedView
1482                && next_group.group_id == StaticCompactionGroupId::StateDefault)
1483        {
1484            return Err(Error::CompactionGroup(format!(
1485                "group-{} and group-{} are both StaticCompactionGroupId",
1486                group.group_id, next_group.group_id
1487            )));
1488        }
1489
1490        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1491            return Err(Error::CompactionGroup(format!(
1492                "group-{} or group-{} is empty",
1493                group.group_id, next_group.group_id
1494            )));
1495        }
1496
1497        // Check non-overlapping table ids early to avoid acquiring heavyweight write locks
1498        // in merge_compaction_group_impl only to fail at the overlap check.
1499        // Sort both sides and ensure table_ids_1 has the smaller first element,
1500        // then reject if table_ids_1's last element >= table_ids_2's first element (overlap).
1501        {
1502            let mut table_ids_1: Vec<TableId> = group.table_statistic.keys().cloned().collect_vec();
1503            let mut table_ids_2: Vec<TableId> =
1504                next_group.table_statistic.keys().cloned().collect_vec();
1505            table_ids_1.sort();
1506            table_ids_2.sort();
1507            if table_ids_1.first().unwrap() > table_ids_2.first().unwrap() {
1508                std::mem::swap(&mut table_ids_1, &mut table_ids_2);
1509            }
1510            if table_ids_1.last().unwrap() >= table_ids_2.first().unwrap() {
1511                return Err(Error::CompactionGroup(format!(
1512                    "group-{} and group-{} have overlapping table id ranges, not mergeable",
1513                    group.group_id, next_group.group_id
1514                )));
1515            }
1516        }
1517
1518        if group
1519            .compaction_group_config
1520            .compaction_config
1521            .disable_auto_group_scheduling
1522            .unwrap_or(false)
1523            || next_group
1524                .compaction_group_config
1525                .compaction_config
1526                .disable_auto_group_scheduling
1527                .unwrap_or(false)
1528        {
1529            return Err(Error::CompactionGroup(format!(
1530                "group-{} or group-{} disable_auto_group_scheduling",
1531                group.group_id, next_group.group_id
1532            )));
1533        }
1534
1535        // Keep merge compatibility as a feature, but ignore split_weight_by_vnode, because it is
1536        // only used for per-table split behavior and will be reset after merge.
1537        if !Self::is_merge_compatible_by_semantics(group, next_group) {
1538            let left_config = group.compaction_group_config.compaction_config.as_ref();
1539            let right_config = next_group
1540                .compaction_group_config
1541                .compaction_config
1542                .as_ref();
1543
1544            tracing::warn!(
1545                group_id = %group.group_id,
1546                next_group_id = %next_group.group_id,
1547                left_config = ?left_config,
1548                right_config = ?right_config,
1549                "compaction config semantic mismatch detected while merging compaction groups"
1550            );
1551
1552            return Err(Error::CompactionGroup(format!(
1553                "Cannot merge group {} and next_group {} with different compaction config (split_weight_by_vnode is excluded from comparison). left_config: {:?}, right_config: {:?}",
1554                group.group_id, next_group.group_id, left_config, right_config
1555            )));
1556        }
1557
1558        // do not merge the compaction group which is creating
1559        if Self::check_is_creating_compaction_group(group, created_tables) {
1560            return Err(Error::CompactionGroup(format!(
1561                "Cannot merge creating group {} next_group {}",
1562                group.group_id, next_group.group_id
1563            )));
1564        }
1565
1566        // do not merge high throughput group
1567        if !Self::check_is_low_write_throughput_compaction_group(
1568            table_write_throughput_statistic_manager,
1569            group,
1570            opts,
1571        ) {
1572            return Err(Error::CompactionGroup(format!(
1573                "Cannot merge high throughput group {} next_group {}",
1574                group.group_id, next_group.group_id
1575            )));
1576        }
1577
1578        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1579            * opts.split_group_size_ratio) as u64;
1580
1581        if (group.group_size + next_group.group_size) > size_limit {
1582            return Err(Error::CompactionGroup(format!(
1583                "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1584                group.group_id,
1585                group.group_size,
1586                next_group.group_id,
1587                next_group.group_size,
1588                size_limit
1589            )));
1590        }
1591
1592        if Self::check_is_creating_compaction_group(next_group, created_tables) {
1593            return Err(Error::CompactionGroup(format!(
1594                "Cannot merge creating group {} next group {}",
1595                group.group_id, next_group.group_id
1596            )));
1597        }
1598
1599        if !Self::check_is_low_write_throughput_compaction_group(
1600            table_write_throughput_statistic_manager,
1601            next_group,
1602            opts,
1603        ) {
1604            return Err(Error::CompactionGroup(format!(
1605                "Cannot merge high throughput group {} next group {}",
1606                group.group_id, next_group.group_id
1607            )));
1608        }
1609
1610        {
1611            // Avoid merge when the group is in emergency state
1612            let versioning_guard = versioning.read().await;
1613            let levels = &versioning_guard.current_version.levels;
1614            if !levels.contains_key(&group.group_id) {
1615                return Err(Error::CompactionGroup(format!(
1616                    "Cannot merge group {} not exist",
1617                    group.group_id
1618                )));
1619            }
1620
1621            if !levels.contains_key(&next_group.group_id) {
1622                return Err(Error::CompactionGroup(format!(
1623                    "Cannot merge next group {} not exist",
1624                    next_group.group_id
1625                )));
1626            }
1627
1628            let group_levels = versioning_guard
1629                .current_version
1630                .get_compaction_group_levels(group.group_id);
1631
1632            let next_group_levels = versioning_guard
1633                .current_version
1634                .get_compaction_group_levels(next_group.group_id);
1635
1636            let group_state = GroupStateValidator::group_state(
1637                group_levels,
1638                group.compaction_group_config.compaction_config().deref(),
1639            );
1640
1641            if group_state.is_write_stop() || group_state.is_emergency() {
1642                return Err(Error::CompactionGroup(format!(
1643                    "Cannot merge write limit group {} next group {}",
1644                    group.group_id, next_group.group_id
1645                )));
1646            }
1647
1648            let next_group_state = GroupStateValidator::group_state(
1649                next_group_levels,
1650                next_group
1651                    .compaction_group_config
1652                    .compaction_config()
1653                    .deref(),
1654            );
1655
1656            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1657                return Err(Error::CompactionGroup(format!(
1658                    "Cannot merge write limit next group {} group {}",
1659                    next_group.group_id, group.group_id
1660                )));
1661            }
1662
1663            // check whether the group is in the write stop state after merge
1664            let l0_sub_level_count_after_merge =
1665                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1666            if GroupStateValidator::write_stop_sub_level_count(
1667                (l0_sub_level_count_after_merge as f64
1668                    * opts.compaction_group_merge_dimension_threshold) as usize,
1669                group.compaction_group_config.compaction_config().deref(),
1670            ) {
1671                return Err(Error::CompactionGroup(format!(
1672                    "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
1673                    group.group_id, next_group.group_id
1674                )));
1675            }
1676
1677            let l0_file_count_after_merge = group_levels
1678                .l0
1679                .sub_levels
1680                .iter()
1681                .chain(next_group_levels.l0.sub_levels.iter())
1682                .map(|level| level.table_infos.len())
1683                .sum::<usize>();
1684            if GroupStateValidator::write_stop_l0_file_count(
1685                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1686                    as usize,
1687                group.compaction_group_config.compaction_config().deref(),
1688            ) {
1689                return Err(Error::CompactionGroup(format!(
1690                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1691                    next_group.group_id, group.group_id
1692                )));
1693            }
1694
1695            let l0_size_after_merge =
1696                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1697
1698            if GroupStateValidator::write_stop_l0_size(
1699                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1700                    as u64,
1701                group.compaction_group_config.compaction_config().deref(),
1702            ) {
1703                return Err(Error::CompactionGroup(format!(
1704                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1705                    next_group.group_id, group.group_id
1706                )));
1707            }
1708
1709            // check whether the group is in the emergency state after merge
1710            if GroupStateValidator::emergency_l0_file_count(
1711                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1712                    as usize,
1713                group.compaction_group_config.compaction_config().deref(),
1714            ) {
1715                return Err(Error::CompactionGroup(format!(
1716                    "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
1717                    group.group_id, next_group.group_id
1718                )));
1719            }
1720        }
1721
1722        Ok(())
1723    }
1724}