risingwave_meta/hummock/manager/compaction/
compaction_group_schedule.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26    StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas, HummockVersion};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::compaction_group_manager::CompactionGroupManager;
38use super::{CompactionGroupStatistic, GroupStateValidator};
39use crate::hummock::error::{Error, Result};
40use crate::hummock::manager::transaction::HummockVersionTransaction;
41use crate::hummock::manager::versioning::Versioning;
42use crate::hummock::manager::{HummockManager, commit_multi_var};
43use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
44use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
45use crate::hummock::table_write_throughput_statistic::{
46    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
47};
48use crate::manager::MetaOpts;
49
/// A plan to split one compaction group at a boundary table so that its
/// table-id range no longer interleaves with the next group's range.
/// Produced by `gen_normalize_plan`.
#[derive(Debug, PartialEq, Eq)]
struct NormalizePlan {
    // The group whose key space will be split.
    parent_group_id: CompactionGroupId,
    // All member table ids of the parent group, in ascending order
    // (collected from sorted `BTreeMap` keys).
    parent_table_ids: Vec<StateTableId>,
    // The smallest member table id that moves to the right side of the
    // split; the split key is `(boundary_table_id, vnode 0)`.
    boundary_table_id: StateTableId,
}
56
57impl NormalizePlan {
58    fn split_key(&self) -> Bytes {
59        group_split::build_split_full_key(self.boundary_table_id, VirtualNode::ZERO)
60            .encode()
61            .into()
62    }
63
64    fn split_table_ids(&self) -> (Vec<StateTableId>, Vec<StateTableId>) {
65        let split_full_key =
66            group_split::build_split_full_key(self.boundary_table_id, VirtualNode::ZERO);
67        let (table_ids_left, table_ids_right) =
68            group_split::split_table_ids_with_table_id_and_vnode(
69                &self.parent_table_ids,
70                split_full_key.user_key.table_id,
71                split_full_key.user_key.get_vnode_id(),
72            );
73        assert!(!table_ids_left.is_empty() && !table_ids_right.is_empty());
74        (table_ids_left, table_ids_right)
75    }
76}
77
78fn gen_normalize_plan(
79    left: &CompactionGroupStatistic,
80    right: &CompactionGroupStatistic,
81) -> Option<NormalizePlan> {
82    let left_table_ids = left.table_statistic.keys().copied().collect_vec();
83
84    if left_table_ids.len() <= 1 {
85        return None;
86    }
87
88    let left_max = *left_table_ids.last().unwrap();
89    let right_min = *right.table_statistic.keys().next().unwrap();
90    if left_max < right_min {
91        return None;
92    }
93
94    let boundary_index = left_table_ids.partition_point(|&table_id| table_id < right_min);
95    if boundary_index == 0 || boundary_index >= left_table_ids.len() {
96        return None;
97    }
98    let boundary_table_id = left_table_ids[boundary_index];
99
100    Some(NormalizePlan {
101        parent_group_id: left.group_id,
102        parent_table_ids: left_table_ids,
103        boundary_table_id,
104    })
105}
106
107fn build_normalize_plan_from_group_statistics(
108    groups: &[CompactionGroupStatistic],
109) -> Option<NormalizePlan> {
110    // `calculate_compaction_group_statistic()` iterates all version levels, so newly created or
111    // transiently empty groups can appear here without any member tables.
112    let mut groups = groups
113        .iter()
114        .filter(|group| !group.table_statistic.is_empty())
115        .collect_vec();
116    groups.sort_by_key(|group| *group.table_statistic.keys().next().unwrap());
117
118    groups
119        .split(|group| {
120            group
121                .compaction_group_config
122                .compaction_config
123                .disable_auto_group_scheduling
124                .unwrap_or(false)
125        })
126        .find_map(|segment| {
127            segment
128                .windows(2)
129                .find_map(|pair| gen_normalize_plan(pair[0], pair[1]))
130        })
131}
132
133fn collect_normalize_group_statistics(
134    version: &HummockVersion,
135    compaction_group_manager: &CompactionGroupManager,
136) -> Result<Vec<CompactionGroupStatistic>> {
137    let mut groups = vec![];
138    for group_id in version.levels.keys() {
139        let table_ids = version
140            .state_table_info
141            .compaction_group_member_table_ids(*group_id)
142            .iter()
143            .copied()
144            .collect_vec();
145        if table_ids.is_empty() {
146            continue;
147        }
148
149        let group_config = compaction_group_manager
150            .try_get_compaction_group_config(*group_id)
151            .ok_or_else(|| {
152                Error::CompactionGroup(format!(
153                    "group {} config not found during normalize",
154                    group_id
155                ))
156            })?;
157        groups.push(CompactionGroupStatistic {
158            group_id: *group_id,
159            group_size: 0,
160            table_statistic: table_ids
161                .into_iter()
162                .map(|table_id| (table_id, 0))
163                .collect(),
164            compaction_group_config: group_config,
165        });
166    }
167    Ok(groups)
168}
169
impl HummockManager {
    /// Merges two compaction groups into one.
    ///
    /// The actual merge direction (which group survives) is decided by the
    /// order of their member table ids; see [`Self::merge_compaction_group_impl`].
    pub async fn merge_compaction_group(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, None)
            .await
    }

    /// Test-only wrapper that lets the caller inject the set of already
    /// created tables instead of querying the metadata manager.
    pub async fn merge_compaction_group_for_test(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: HashSet<TableId>,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
            .await
    }

    /// Merges two compaction groups into one.
    ///
    /// The group whose members start at the larger table id (the "right"
    /// group) is merged into the other (the "left" group), and the right
    /// group is then removed.
    ///
    /// Preconditions checked here (each failure returns [`Error::CompactionGroup`]):
    /// - both groups exist in the current version and are non-empty;
    /// - no member table of either group is still being created;
    /// - the member table-id ranges of the two groups do not overlap;
    /// - the two groups share no sst id;
    /// - on every non-overlapping level (L1 and deeper), the last sst of the
    ///   left group and the first sst of the right group can be concatenated.
    ///
    /// Side effects on success:
    /// - a `GroupMerge` delta is committed and all member tables are
    ///   re-pointed to the left group;
    /// - the left group's `partition_vnode_count` is reset to 0;
    /// - the right group's config, sst-stat metrics, and schedule state are
    ///   removed;
    /// - a time-travel version snapshot is enforced;
    /// - outstanding compact tasks of the right group are canceled
    ///   (best effort).
    pub async fn merge_compaction_group_impl(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: Option<HashSet<TableId>>,
    ) -> Result<()> {
        // Hold both the compaction and versioning write locks for the whole
        // merge so that no task assignment or version change can interleave.
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning.current_version.levels.contains_key(&group_1) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
        }

        if !versioning.current_version.levels.contains_key(&group_2) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
        }

        // Snapshot the table -> group membership from the current version.
        let state_table_info = versioning.current_version.state_table_info.clone();
        let mut member_table_ids_1 = state_table_info
            .compaction_group_member_table_ids(group_1)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_1.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_1 {} is empty",
                group_1
            )));
        }

        let mut member_table_ids_2 = state_table_info
            .compaction_group_member_table_ids(group_2)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_2.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_2 {} is empty",
                group_2
            )));
        }

        debug_assert!(!member_table_ids_1.is_empty());
        debug_assert!(!member_table_ids_2.is_empty());
        assert!(member_table_ids_1.is_sorted());
        assert!(member_table_ids_2.is_sorted());

        let created_tables = if let Some(created_tables) = created_tables {
            // If `created_tables` is provided, use it directly; this path is
            // only expected in tests (hence the debug-build assertion).
            #[expect(clippy::assertions_on_constants)]
            {
                assert!(cfg!(debug_assertions));
            }
            created_tables
        } else {
            match self.metadata_manager.get_created_table_ids().await {
                Ok(created_tables) => HashSet::from_iter(created_tables),
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
                    return Err(Error::CompactionGroup(format!(
                        "merge group_1 {} group_2 {} failed to fetch created table ids",
                        group_1, group_2
                    )));
                }
            }
        };

        // True if any table in `table_ids` has not finished creation.
        fn contains_creating_table(
            table_ids: &Vec<TableId>,
            created_tables: &HashSet<TableId>,
        ) -> bool {
            table_ids
                .iter()
                .any(|table_id| !created_tables.contains(table_id))
        }

        // Do not merge a compaction group that still contains creating tables.
        if contains_creating_table(&member_table_ids_1, &created_tables)
            || contains_creating_table(&member_table_ids_2, &created_tables)
        {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
                group_1, group_2, member_table_ids_1, member_table_ids_2
            )));
        }

        // Make sure `member_table_ids_1` is smaller than `member_table_ids_2`
        // (swap both the id vectors and the group-id labels if necessary).
        let (left_group_id, right_group_id) =
            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
                (group_1, group_2)
            } else {
                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
                (group_2, group_1)
            };

        // We can only merge two groups with non-overlapping member table ids.
        // After the swap above, member_table_ids_1 has the smaller first element.
        // If the last element of member_table_ids_1 >= the first element of member_table_ids_2,
        // the two groups' table id ranges overlap and cannot be merged.
        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
            return Err(Error::CompactionGroup(format!(
                "invalid merge group_1 {} group_2 {}: table id ranges overlap",
                left_group_id, right_group_id
            )));
        }

        // Disjoint + ordered ranges means the concatenation is still sorted.
        let combined_member_table_ids = member_table_ids_1
            .iter()
            .chain(member_table_ids_2.iter())
            .collect_vec();
        assert!(combined_member_table_ids.is_sorted());

        // check duplicated sst_id
        let mut sst_id_set = HashSet::new();
        for sst_id in versioning
            .current_version
            .get_sst_ids_by_group_id(left_group_id)
            .chain(
                versioning
                    .current_version
                    .get_sst_ids_by_group_id(right_group_id),
            )
        {
            if !sst_id_set.insert(sst_id) {
                return Err(Error::CompactionGroup(format!(
                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
                    left_group_id, right_group_id, sst_id
                )));
            }
        }

        // check branched sst on non-overlap level
        {
            let left_levels = versioning
                .current_version
                .get_compaction_group_levels(group_1);

            let right_levels = versioning
                .current_version
                .get_compaction_group_levels(group_2);

            // we can not check the l0 sub level, because the sub level id will be rewritten when merge
            // This check will ensure that other non-overlapping level ssts can be concat and that the key_range is correct.
            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
            for level_idx in 1..=max_level {
                let left_level = left_levels.get_level(level_idx);
                let right_level = right_levels.get_level(level_idx);
                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
                    continue;
                }

                let left_last_sst = left_level.table_infos.last().unwrap().clone();
                let right_first_sst = right_level.table_infos.first().unwrap().clone();
                let left_sst_id = left_last_sst.sst_id;
                let right_sst_id = right_first_sst.sst_id;
                let left_obj_id = left_last_sst.object_id;
                let right_obj_id = right_first_sst.object_id;

                // Since the sst key_range within a group is legal, we only need to check the ssts adjacent to the two groups.
                if !can_concat(&[left_last_sst, right_first_sst]) {
                    return Err(Error::CompactionGroup(format!(
                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
                        left_group_id,
                        right_group_id,
                        level_idx,
                        left_sst_id,
                        right_sst_id,
                        left_obj_id,
                        right_obj_id
                    )));
                }
            }
        }

        // All checks passed; build the version delta that performs the merge.
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut new_version_delta = version.new_delta();

        let target_compaction_group_id = {
            // merge right_group_id to left_group_id and remove right_group_id
            new_version_delta.group_deltas.insert(
                left_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
                        left_group_id,
                        right_group_id,
                    })],
                },
            );
            left_group_id
        };

        // TODO: remove compaction group_id from state_table_info
        // rewrite compaction_group_id for all tables
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in combined_member_table_ids {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: target_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

            // for metrics reclaim
            {
                let right_group_max_level = new_version_delta
                    .latest_version()
                    .get_compaction_group_levels(right_group_id)
                    .levels
                    .len();

                remove_compaction_group_in_sst_stat(
                    &self.metrics,
                    right_group_id,
                    right_group_max_level,
                );
            }

            // clean up compaction schedule state for the merged group
            self.compaction_state
                .remove_compaction_group(right_group_id);

            // clear `partition_vnode_count` for the hybrid group
            {
                if let Err(err) = compaction_groups_txn.update_compaction_config(
                    &[left_group_id],
                    &[MutableConfig::SplitWeightByVnode(0)], // default
                ) {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        left_group_id
                    );
                }
            }

            new_version_delta.pre_apply();

            // remove right_group_id
            compaction_groups_txn.remove(right_group_id);
            // Commit the version delta and config changes atomically.
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }

        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // cancel tasks
        let mut canceled_tasks = vec![];
        // after merge, all tasks in right_group_id should be canceled
        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    assert_eq!(task.compaction_group_id, right_group_id);
                    canceled_tasks.push(ReportTask {
                        task_id: task.task_id,
                        task_status: TaskStatus::ManualCanceled,
                        table_stats_change: HashMap::default(),
                        sorted_output_ssts: vec![],
                        object_timestamps: HashMap::default(),
                    });
                }
            });

        if !canceled_tasks.is_empty() {
            // Hands both guards back to the report path, which consumes them.
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .merge_compaction_group_count
            .with_label_values(&[&left_group_id.to_string()])
            .inc();

        Ok(())
    }
}
497
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_pb::hummock::CompactionConfig;

    use super::{
        CompactionGroupStatistic, NormalizePlan, build_normalize_plan_from_group_statistics,
        gen_normalize_plan,
    };
    use crate::hummock::model::CompactionGroup;

    /// Builds a zero-sized `CompactionGroupStatistic` fixture for the given
    /// member tables.
    fn group(
        group_id: CompactionGroupId,
        table_ids: &[u32],
        disable_auto_group_scheduling: bool,
    ) -> CompactionGroupStatistic {
        let table_statistic: BTreeMap<_, _> = table_ids
            .iter()
            .map(|&table_id| (table_id.into(), 0_u64))
            .collect();
        let compaction_config = CompactionConfig {
            disable_auto_group_scheduling: Some(disable_auto_group_scheduling),
            ..Default::default()
        };
        CompactionGroupStatistic {
            group_id,
            group_size: 0,
            table_statistic,
            compaction_group_config: CompactionGroup::new(group_id, compaction_config),
        }
    }

    #[test]
    fn test_gen_normalize_plan_returns_none_for_single_table_group() {
        // A one-table left group cannot be split.
        let lhs = group(1.into(), &[10], false);
        let rhs = group(2.into(), &[5, 20], false);

        assert_eq!(gen_normalize_plan(&lhs, &rhs), None);
    }

    #[test]
    fn test_gen_normalize_plan_returns_none_for_non_overlapping_groups() {
        // Ranges are already disjoint — nothing to normalize.
        let lhs = group(1.into(), &[1, 2, 3], false);
        let rhs = group(2.into(), &[4, 5, 6], false);

        assert_eq!(gen_normalize_plan(&lhs, &rhs), None);
    }

    #[test]
    fn test_gen_normalize_plan_returns_none_when_boundary_cannot_split_parent() {
        // Every left table id is >= the right minimum, so splitting would
        // leave the left side empty.
        let lhs = group(1.into(), &[5, 6, 7], false);
        let rhs = group(2.into(), &[4, 8], false);

        assert_eq!(gen_normalize_plan(&lhs, &rhs), None);
    }

    #[test]
    fn test_gen_normalize_plan_generates_expected_boundary() {
        let lhs = group(1.into(), &[1, 4, 7], false);
        let rhs = group(2.into(), &[2, 5, 8], false);

        // 4 is the smallest left table id >= the right minimum (2).
        let expected = NormalizePlan {
            parent_group_id: 1.into(),
            parent_table_ids: vec![1.into(), 4.into(), 7.into()],
            boundary_table_id: 4.into(),
        };
        assert_eq!(gen_normalize_plan(&lhs, &rhs), Some(expected));
    }

    #[test]
    fn test_build_normalize_plan_skips_disabled_boundary_and_continues_later_segment() {
        // Group 2 disables auto scheduling, so the overlapping (1, 2) pair is
        // skipped and the plan comes from the later (3, 4) pair.
        let groups = vec![
            group(1.into(), &[1, 4, 7], false),
            group(2.into(), &[2, 5, 8], true),
            group(3.into(), &[10, 13, 16], false),
            group(4.into(), &[11, 14, 17], false),
        ];

        let expected = NormalizePlan {
            parent_group_id: 3.into(),
            parent_table_ids: vec![10.into(), 13.into(), 16.into()],
            boundary_table_id: 13.into(),
        };
        assert_eq!(build_normalize_plan_from_group_statistics(&groups), Some(expected));
    }
}
590
591impl HummockManager {
592    /// Split `table_ids` to a dedicated compaction group.(will be split by the `table_id` and `vnode`.)
593    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
594    /// The split will follow the following rules
595    /// 1. ssts with `key_range.left` greater than `split_key` will be split to the right group
596    /// 2. the sst containing `split_key` will be split into two separate ssts and their `key_range` will be changed `sst_1`: [`sst.key_range.left`, `split_key`) `sst_2`: [`split_key`, `sst.key_range.right`]
597    /// 3. currently only `vnode` 0 and `vnode` max is supported. (Due to the above rule, vnode max will be rewritten as `table_id` + 1, `vnode` 0)
598    ///   - `parent_group_id`: the `group_id` to split
599    ///   - `split_table_ids`: the `table_ids` to split, now we still support to split multiple tables to one group at once, pass `split_table_ids` for per `split` operation for checking
600    ///   - `table_id_to_split`: the `table_id` to split
601    ///   - `vnode_to_split`: the `vnode` to split
602    ///   - `partition_vnode_count`: the partition count for the single table group if need
603    async fn split_compaction_group_impl(
604        &self,
605        parent_group_id: CompactionGroupId,
606        split_table_ids: &[StateTableId],
607        table_id_to_split: StateTableId,
608        vnode_to_split: VirtualNode,
609        partition_vnode_count: Option<u32>,
610    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
611        let mut result = vec![];
612        let compaction_guard = self.compaction.write().await;
613        let mut versioning_guard = self.versioning.write().await;
614        let versioning = versioning_guard.deref_mut();
615        // Validate parameters.
616        if !versioning
617            .current_version
618            .levels
619            .contains_key(&parent_group_id)
620        {
621            return Err(Error::CompactionGroup(format!(
622                "invalid group {}",
623                parent_group_id
624            )));
625        }
626
627        let member_table_ids = versioning
628            .current_version
629            .state_table_info
630            .compaction_group_member_table_ids(parent_group_id)
631            .iter()
632            .copied()
633            .collect::<BTreeSet<_>>();
634
635        if !member_table_ids.contains(&table_id_to_split) {
636            return Err(Error::CompactionGroup(format!(
637                "table {} doesn't in group {}",
638                table_id_to_split, parent_group_id
639            )));
640        }
641
642        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);
643
644        // change to vec for partition
645        let table_ids = member_table_ids.into_iter().collect_vec();
646        if table_ids == split_table_ids {
647            return Err(Error::CompactionGroup(format!(
648                "invalid split attempt for group {}: all member tables are moved",
649                parent_group_id
650            )));
651        }
652        // avoid decode split_key when caller is aware of the table_id and vnode
653        let (table_ids_left, table_ids_right) =
654            group_split::split_table_ids_with_table_id_and_vnode(
655                &table_ids,
656                split_full_key.user_key.table_id,
657                split_full_key.user_key.get_vnode_id(),
658            );
659        if table_ids_left.is_empty() || table_ids_right.is_empty() {
660            // not need to split group if all tables are in the same side
661            if !table_ids_left.is_empty() {
662                result.push((parent_group_id, table_ids_left));
663            }
664
665            if !table_ids_right.is_empty() {
666                result.push((parent_group_id, table_ids_right));
667            }
668            return Ok(result);
669        }
670
671        result.push((parent_group_id, table_ids_left));
672
673        let split_key: Bytes = split_full_key.encode().into();
674
675        let mut version = HummockVersionTransaction::new(
676            &mut versioning.current_version,
677            &mut versioning.hummock_version_deltas,
678            &mut versioning.table_change_log,
679            self.env.notification_manager(),
680            None,
681            &self.metrics,
682            &self.env.opts,
683        );
684        let mut new_version_delta = version.new_delta();
685
686        let split_sst_count = new_version_delta
687            .latest_version()
688            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());
689
690        let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
691        let (new_compaction_group_id, config) = {
692            // All NewCompactionGroup pairs are mapped to one new compaction group.
693            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
694            // Inherit config from parent group
695            let config = self
696                .compaction_group_manager
697                .read()
698                .await
699                .try_get_compaction_group_config(parent_group_id)
700                .ok_or_else(|| {
701                    Error::CompactionGroup(format!(
702                        "parent group {} config not found",
703                        parent_group_id
704                    ))
705                })?
706                .compaction_config()
707                .as_ref()
708                .clone();
709
710            #[expect(deprecated)]
711            // fill the deprecated field with default value
712            new_version_delta.group_deltas.insert(
713                new_compaction_group_id,
714                GroupDeltas {
715                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
716                        group_config: Some(config.clone()),
717                        group_id: new_compaction_group_id,
718                        parent_group_id,
719                        new_sst_start_id,
720                        table_ids: vec![],
721                        version: CompatibilityVersion::LATEST as _, // for compatibility
722                        split_key: Some(split_key.into()),
723                    }))],
724                },
725            );
726            (new_compaction_group_id, config)
727        };
728
729        new_version_delta.with_latest_version(|version, new_version_delta| {
730            for &table_id in &table_ids_right {
731                let info = version
732                    .state_table_info
733                    .info()
734                    .get(&table_id)
735                    .expect("have check exist previously");
736                assert!(
737                    new_version_delta
738                        .state_table_info_delta
739                        .insert(
740                            table_id,
741                            PbStateTableInfoDelta {
742                                committed_epoch: info.committed_epoch,
743                                compaction_group_id: new_compaction_group_id,
744                            }
745                        )
746                        .is_none()
747                );
748            }
749        });
750
751        result.push((new_compaction_group_id, table_ids_right));
752
753        {
754            let mut compaction_group_manager = self.compaction_group_manager.write().await;
755            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
756            compaction_groups_txn
757                .create_compaction_groups(new_compaction_group_id, Arc::new(config));
758
759            // check if need to update the compaction config for the single table group and guarantee the operation atomicity
760            // `partition_vnode_count` only works inside a table, to avoid a lot of slicing sst, we only enable it in groups with high throughput and only one table.
761            // The target `table_ids` might be split to an existing group, so we need to try to update its config
762            for (cg_id, table_ids) in &result {
763                // check the split_tables had been place to the dedicated compaction group
764                if let Some(partition_vnode_count) = partition_vnode_count
765                    && table_ids.len() == 1
766                    && table_ids == split_table_ids
767                    && let Err(err) = compaction_groups_txn.update_compaction_config(
768                        &[*cg_id],
769                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
770                    )
771                {
772                    tracing::error!(
773                        error = %err.as_report(),
774                        "failed to update compaction config for group-{}",
775                        cg_id
776                    );
777                }
778            }
779
780            new_version_delta.pre_apply();
781            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
782        }
783        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
784        versioning.mark_next_time_travel_version_snapshot();
785
786        // The expired compact tasks will be canceled.
787        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
788        let mut canceled_tasks = vec![];
789        let compact_task_assignments =
790            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
791        let levels = versioning
792            .current_version
793            .get_compaction_group_levels(parent_group_id);
794        compact_task_assignments
795            .into_iter()
796            .for_each(|task_assignment| {
797                if let Some(task) = task_assignment.compact_task.as_ref() {
798                    let is_expired = is_compaction_task_expired(
799                        task.compaction_group_version_id,
800                        levels.compaction_group_version_id,
801                    );
802                    if is_expired {
803                        canceled_tasks.push(ReportTask {
804                            task_id: task.task_id,
805                            task_status: TaskStatus::ManualCanceled,
806                            table_stats_change: HashMap::default(),
807                            sorted_output_ssts: vec![],
808                            object_timestamps: HashMap::default(),
809                        });
810                    }
811                }
812            });
813
814        if !canceled_tasks.is_empty() {
815            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
816                .await?;
817        }
818
819        self.metrics
820            .split_compaction_group_count
821            .with_label_values(&[&parent_group_id.to_string()])
822            .inc();
823
824        Ok(result)
825    }
826
    /// Split `table_ids` to a dedicated compaction group.
    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction
    /// group id to table ids after the split(s).
    ///
    /// `table_ids` must be non-empty, sorted, and a proper subset of the parent group's members
    /// (moving *all* members would be a no-op split and is rejected).
    ///
    /// # Errors
    /// Returns `Error::CompactionGroup` when `table_ids` is empty, unsorted, or covers every
    /// member table of `parent_group_id`; propagates errors from the underlying split operations.
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        // Snapshot the parent group's member tables under a short read lock.
        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .copied()
                .collect_vec()
        };

        // Moving every member table out of the group is a no-op split; reject it.
        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        // Sanity checks over the split result: sorted per group, non-overlapping
        // between groups, and every table assigned to exactly one group.
        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<CompactionGroupId, Vec<TableId>>) {
            // 1. table_ids in different cg are sorted.
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            // 2.table_ids in different cg are non-overlapping
            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            // 3.table_ids belong to one and only one cg.
            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        // Example: move [3,4,5,6]
        // [1,2,3,4,5,6,7,8,9,10] -> [1,2] [3,4,5,6] [7,8,9,10]
        // split key
        // 1. table_id = 3, vnode = 0, epoch = MAX   (cuts off the left remainder)
        // 2. table_id = 7, vnode = 0, epoch = MAX   (cuts off the right remainder)

        // The new compaction group id is always generate on the right side
        // Hence, we return the first compaction group id as the result
        // split 1: cut at the first table to move.
        let mut cg_id_to_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
        let table_id_to_split = *table_ids.first().unwrap();
        // Sentinel; overwritten below — `split_compaction_group_impl` always returns the
        // group that now contains `table_id_to_split`.
        let mut target_compaction_group_id: CompactionGroupId = 0.into();
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            // If the right side already holds exactly `table_ids`, no second cut is needed.
            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // split 2: cut after the last table to move.
        // See the example above and the split rule in `split_compaction_group_impl`.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
958}
959
960impl HummockManager {
961    async fn build_normalize_plan(&self) -> Option<NormalizePlan> {
962        let groups = self.calculate_compaction_group_statistic().await;
963        build_normalize_plan_from_group_statistics(&groups)
964    }
965
    /// Re-validate `plan` under the write locks and, if it is still current, apply it:
    /// construct a new compaction group on the right side of the plan's split key and
    /// move the affected tables into it.
    ///
    /// Returns `Ok(false)` when the plan became stale between planning and application
    /// (the group statistics changed), `Ok(true)` on success.
    async fn apply_normalize_plan(&self, plan: &NormalizePlan) -> Result<bool> {
        let (table_ids_right, boundary_table_id, new_compaction_group_id) = {
            let mut versioning_guard = self.versioning.write().await;
            let versioning = versioning_guard.deref_mut();
            let mut compaction_group_manager = self.compaction_group_manager.write().await;

            // Rebuild the plan from the current state and compare with the one computed
            // from the read snapshot: apply only if nothing changed in between.
            let groups = collect_normalize_group_statistics(
                &versioning.current_version,
                &compaction_group_manager,
            )?;
            let Some(current_plan) = build_normalize_plan_from_group_statistics(&groups) else {
                return Ok(false);
            };

            if &current_plan != plan {
                return Ok(false);
            }

            let (_table_ids_left, table_ids_right) = plan.split_table_ids();

            // The new group inherits the parent group's compaction config.
            let config = compaction_group_manager
                .try_get_compaction_group_config(plan.parent_group_id)
                .ok_or_else(|| {
                    Error::CompactionGroup(format!(
                        "parent group {} config not found",
                        plan.parent_group_id
                    ))
                })?
                .compaction_config()
                .as_ref()
                .clone();

            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            let mut version = HummockVersionTransaction::new(
                &mut versioning.current_version,
                &mut versioning.hummock_version_deltas,
                &mut versioning.table_change_log,
                self.env.notification_manager(),
                None,
                &self.metrics,
                &self.env.opts,
            );
            let mut new_version_delta = version.new_delta();
            let split_key = plan.split_key();
            // Pre-allocate sst ids for the ssts that the split at `split_key` will produce.
            let split_sst_count = new_version_delta
                .latest_version()
                .count_new_ssts_in_group_split(plan.parent_group_id, split_key.clone());
            let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;

            #[expect(deprecated)]
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id: plan.parent_group_id,
                        new_sst_start_id,
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _,
                        split_key: Some(split_key.into()),
                    }))],
                },
            );

            // Re-point every moved table's state-table info at the new group.
            new_version_delta.with_latest_version(|version, new_version_delta| {
                for &table_id in &table_ids_right {
                    let info = version
                        .state_table_info
                        .info()
                        .get(&table_id)
                        .expect("table should exist before normalize split");
                    assert!(
                        new_version_delta
                            .state_table_info_delta
                            .insert(
                                table_id,
                                PbStateTableInfoDelta {
                                    committed_epoch: info.committed_epoch,
                                    compaction_group_id: new_compaction_group_id,
                                }
                            )
                            .is_none()
                    );
                }
            });
            new_version_delta.pre_apply();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // Commit version delta and group config atomically.
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
            // Instead of handling GroupConstruct for time travel, enforce a version snapshot.
            versioning.mark_next_time_travel_version_snapshot();

            (
                table_ids_right,
                plan.boundary_table_id,
                new_compaction_group_id,
            )
        };

        // Best-effort cleanup outside the write locks: cancel compact tasks made
        // stale by the split.
        self.cancel_expired_normalize_split_tasks(plan.parent_group_id)
            .await?;
        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&plan.parent_group_id.to_string()])
            .inc();
        tracing::info!(
            "normalize split success: parent_group={} boundary_table_id={} moved_tables={:?} new_group_id={}",
            plan.parent_group_id,
            boundary_table_id,
            table_ids_right,
            new_compaction_group_id
        );

        Ok(true)
    }
1083
1084    async fn cancel_expired_normalize_split_tasks(
1085        &self,
1086        parent_group_id: CompactionGroupId,
1087    ) -> Result<()> {
1088        let mut canceled_tasks = vec![];
1089        let compaction_guard = self.compaction.write().await;
1090        let mut versioning_guard = self.versioning.write().await;
1091        let versioning = versioning_guard.deref_mut();
1092        let compact_task_assignments =
1093            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
1094        let levels = versioning
1095            .current_version
1096            .get_compaction_group_levels(parent_group_id);
1097        compact_task_assignments
1098            .into_iter()
1099            .for_each(|task_assignment| {
1100                if let Some(task) = task_assignment.compact_task.as_ref()
1101                    && is_compaction_task_expired(
1102                        task.compaction_group_version_id,
1103                        levels.compaction_group_version_id,
1104                    )
1105                {
1106                    canceled_tasks.push(ReportTask {
1107                        task_id: task.task_id,
1108                        task_status: TaskStatus::ManualCanceled,
1109                        table_stats_change: HashMap::default(),
1110                        sorted_output_ssts: vec![],
1111                        object_timestamps: HashMap::default(),
1112                    });
1113                }
1114            });
1115        canceled_tasks.sort_by_key(|task| task.task_id);
1116        canceled_tasks.dedup_by_key(|task| task.task_id);
1117
1118        if !canceled_tasks.is_empty() {
1119            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
1120                .await?;
1121        }
1122
1123        Ok(())
1124    }
1125
1126    /// Normalize overlapping adjacent compaction groups by split only.
1127    ///
1128    /// The algorithm repeatedly scans adjacent groups by `min(table_id)` and if
1129    /// `max(left) >= min(right)`, it splits `left` at the first table id `>= min(right)`.
1130    /// Each step is planned from a read snapshot, then revalidated and applied with a short write
1131    /// transaction.
1132    pub async fn normalize_overlapping_compaction_groups(&self) -> Result<usize> {
1133        self.normalize_overlapping_compaction_groups_with_limit(usize::MAX)
1134            .await
1135    }
1136
1137    pub async fn normalize_overlapping_compaction_groups_with_limit(
1138        &self,
1139        max_splits: usize,
1140    ) -> Result<usize> {
1141        let mut split_count = 0usize;
1142        while split_count < max_splits {
1143            let Some(plan) = self.build_normalize_plan().await else {
1144                break;
1145            };
1146
1147            if !self.apply_normalize_plan(&plan).await? {
1148                tracing::debug!(
1149                    parent_group_id = %plan.parent_group_id,
1150                    boundary_table_id = %plan.boundary_table_id,
1151                    "normalize plan became stale before apply"
1152                );
1153                break;
1154            }
1155            split_count += 1;
1156        }
1157
1158        Ok(split_count)
1159    }
1160
1161    /// Split the compaction group if the group is too large or contains high throughput tables.
1162    pub async fn try_split_compaction_group(
1163        &self,
1164        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1165        group: CompactionGroupStatistic,
1166    ) {
1167        if group
1168            .compaction_group_config
1169            .compaction_config
1170            .disable_auto_group_scheduling
1171            .unwrap_or(false)
1172        {
1173            return;
1174        }
1175        // split high throughput table to dedicated compaction group
1176        for (table_id, table_size) in &group.table_statistic {
1177            self.try_move_high_throughput_table_to_dedicated_cg(
1178                table_write_throughput_statistic_manager,
1179                *table_id,
1180                table_size,
1181                group.group_id,
1182            )
1183            .await;
1184        }
1185
1186        // split the huge group to multiple groups
1187        self.try_split_huge_compaction_group(group).await;
1188    }
1189
1190    /// Try to move the high throughput table to a dedicated compaction group.
1191    pub async fn try_move_high_throughput_table_to_dedicated_cg(
1192        &self,
1193        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1194        table_id: TableId,
1195        _table_size: &u64,
1196        parent_group_id: CompactionGroupId,
1197    ) {
1198        let mut table_throughput = table_write_throughput_statistic_manager
1199            .get_table_throughput_descending(
1200                table_id,
1201                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
1202            )
1203            .peekable();
1204
1205        if table_throughput.peek().is_none() {
1206            return;
1207        }
1208
1209        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
1210            table_throughput,
1211            self.env.opts.table_high_write_throughput_threshold,
1212            self.env
1213                .opts
1214                .table_stat_high_write_throughput_ratio_for_split,
1215        );
1216
1217        // do not split a table to dedicated compaction group if it is not high write throughput
1218        if !is_high_write_throughput {
1219            return;
1220        }
1221
1222        let ret = self
1223            .move_state_tables_to_dedicated_compaction_group(
1224                parent_group_id,
1225                &[table_id],
1226                Some(self.env.opts.partition_vnode_count),
1227            )
1228            .await;
1229        match ret {
1230            Ok(split_result) => {
1231                tracing::info!(
1232                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
1233                    table_id,
1234                    parent_group_id,
1235                    self.env.opts.partition_vnode_count,
1236                    split_result
1237                );
1238            }
1239            Err(e) => {
1240                tracing::info!(
1241                    error = %e.as_report(),
1242                    "failed to split state table [{}] from group-{}",
1243                    table_id,
1244                    parent_group_id,
1245                )
1246            }
1247        }
1248    }
1249
1250    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
1251        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
1252            * self.env.opts.split_group_size_ratio) as u64;
1253        let is_huge_hybrid_group =
1254            group.group_size > group_max_size && group.table_statistic.len() > 1; // avoid split single table group
1255        if is_huge_hybrid_group {
1256            let mut accumulated_size = 0;
1257            let mut table_ids = Vec::default();
1258            for (table_id, table_size) in &group.table_statistic {
1259                accumulated_size += table_size;
1260                table_ids.push(*table_id);
1261                // split if the accumulated size is greater than half of the group size
1262                // avoid split a small table to dedicated compaction group and trigger multiple merge
1263                assert!(table_ids.is_sorted());
1264                let remaining_size = group.group_size.saturating_sub(accumulated_size);
1265                if accumulated_size > group_max_size / 2
1266                    && remaining_size > 0
1267                    && table_ids.len() < group.table_statistic.len()
1268                {
1269                    let ret = self
1270                        .move_state_tables_to_dedicated_compaction_group(
1271                            group.group_id,
1272                            &table_ids,
1273                            None,
1274                        )
1275                        .await;
1276                    match ret {
1277                        Ok(split_result) => {
1278                            tracing::info!(
1279                                "split_huge_compaction_group success {:?}",
1280                                split_result
1281                            );
1282                            self.metrics
1283                                .split_compaction_group_count
1284                                .with_label_values(&[&group.group_id.to_string()])
1285                                .inc();
1286                            return;
1287                        }
1288                        Err(e) => {
1289                            tracing::error!(
1290                                error = %e.as_report(),
1291                                "failed to split_huge_compaction_group table {:?} from group-{}",
1292                                table_ids,
1293                                group.group_id
1294                            );
1295
1296                            return;
1297                        }
1298                    }
1299                }
1300            }
1301        }
1302    }
1303
1304    pub async fn try_merge_compaction_group(
1305        &self,
1306        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1307        group: &CompactionGroupStatistic,
1308        next_group: &CompactionGroupStatistic,
1309        created_tables: &HashSet<TableId>,
1310    ) -> Result<()> {
1311        GroupMergeValidator::validate_group_merge(
1312            group,
1313            next_group,
1314            created_tables,
1315            table_write_throughput_statistic_manager,
1316            &self.env.opts,
1317            &self.versioning,
1318        )
1319        .await?;
1320
1321        let result = self
1322            .merge_compaction_group(group.group_id, next_group.group_id)
1323            .await;
1324
1325        match &result {
1326            Ok(()) => {
1327                tracing::info!(
1328                    "merge group-{} to group-{}",
1329                    next_group.group_id,
1330                    group.group_id,
1331                );
1332
1333                self.metrics
1334                    .merge_compaction_group_count
1335                    .with_label_values(&[&group.group_id.to_string()])
1336                    .inc();
1337            }
1338            Err(e) => {
1339                tracing::info!(
1340                    error = %e.as_report(),
1341                    "failed to merge group-{} group-{}",
1342                    next_group.group_id,
1343                    group.group_id,
1344                );
1345            }
1346        }
1347
1348        result
1349    }
1350}
1351
/// Stateless namespace for the checks that decide whether two compaction
/// groups may be merged (config compatibility, throughput, overlap, ...).
#[derive(Debug, Default)]
struct GroupMergeValidator {}
1354
1355impl GroupMergeValidator {
1356    /// Check if two groups have compatible compaction configs for merging.
1357    /// Ignores `split_weight_by_vnode` since it's per-table and will be reset after merge.
1358    fn is_merge_compatible_by_semantics(
1359        group: &CompactionGroupStatistic,
1360        next_group: &CompactionGroupStatistic,
1361    ) -> bool {
1362        let (mut left, mut right) = (
1363            group
1364                .compaction_group_config
1365                .compaction_config
1366                .as_ref()
1367                .clone(),
1368            next_group
1369                .compaction_group_config
1370                .compaction_config
1371                .as_ref()
1372                .clone(),
1373        );
1374        left.split_weight_by_vnode = 0;
1375        right.split_weight_by_vnode = 0;
1376        left == right
1377    }
1378
1379    /// Check if the table is high write throughput with the given threshold and ratio.
1380    pub fn is_table_high_write_throughput(
1381        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
1382        threshold: u64,
1383        high_write_throughput_ratio: f64,
1384    ) -> bool {
1385        let mut sample_size = 0;
1386        let mut high_write_throughput_count = 0;
1387        for statistic in table_throughput {
1388            sample_size += 1;
1389            if statistic.throughput > threshold {
1390                high_write_throughput_count += 1;
1391            }
1392        }
1393
1394        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
1395    }
1396
1397    pub fn is_table_low_write_throughput(
1398        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
1399        threshold: u64,
1400        low_write_throughput_ratio: f64,
1401    ) -> bool {
1402        let mut sample_size = 0;
1403        let mut low_write_throughput_count = 0;
1404        for statistic in table_throughput {
1405            sample_size += 1;
1406            if statistic.throughput <= threshold {
1407                low_write_throughput_count += 1;
1408            }
1409        }
1410
1411        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
1412    }
1413
1414    fn check_is_low_write_throughput_compaction_group(
1415        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1416        group: &CompactionGroupStatistic,
1417        opts: &Arc<MetaOpts>,
1418    ) -> bool {
1419        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
1420        for table_id in group.table_statistic.keys() {
1421            let mut table_throughput = table_write_throughput_statistic_manager
1422                .get_table_throughput_descending(
1423                    *table_id,
1424                    opts.table_stat_throuput_window_seconds_for_merge as i64,
1425                )
1426                .peekable();
1427            if table_throughput.peek().is_none() {
1428                continue;
1429            }
1430
1431            table_with_statistic.push(table_throughput);
1432        }
1433
1434        // if all tables in the group do not have enough statistics, return true
1435        if table_with_statistic.is_empty() {
1436            return true;
1437        }
1438
1439        // check if all tables in the group are low write throughput with enough statistics
1440        table_with_statistic.into_iter().all(|table_throughput| {
1441            Self::is_table_low_write_throughput(
1442                table_throughput,
1443                opts.table_low_write_throughput_threshold,
1444                opts.table_stat_low_write_throughput_ratio_for_merge,
1445            )
1446        })
1447    }
1448
1449    fn check_is_creating_compaction_group(
1450        group: &CompactionGroupStatistic,
1451        created_tables: &HashSet<TableId>,
1452    ) -> bool {
1453        group
1454            .table_statistic
1455            .keys()
1456            .any(|table_id| !created_tables.contains(table_id))
1457    }
1458
1459    async fn validate_group_merge(
1460        group: &CompactionGroupStatistic,
1461        next_group: &CompactionGroupStatistic,
1462        created_tables: &HashSet<TableId>,
1463        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1464        opts: &Arc<MetaOpts>,
1465        versioning: &MonitoredRwLock<Versioning>,
1466    ) -> Result<()> {
1467        // TODO: remove this check after refactor group id
1468        if (group.group_id == StaticCompactionGroupId::StateDefault
1469            && next_group.group_id == StaticCompactionGroupId::MaterializedView)
1470            || (group.group_id == StaticCompactionGroupId::MaterializedView
1471                && next_group.group_id == StaticCompactionGroupId::StateDefault)
1472        {
1473            return Err(Error::CompactionGroup(format!(
1474                "group-{} and group-{} are both StaticCompactionGroupId",
1475                group.group_id, next_group.group_id
1476            )));
1477        }
1478
1479        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1480            return Err(Error::CompactionGroup(format!(
1481                "group-{} or group-{} is empty",
1482                group.group_id, next_group.group_id
1483            )));
1484        }
1485
1486        // Check non-overlapping table ids early to avoid acquiring heavyweight write locks
1487        // in merge_compaction_group_impl only to fail at the overlap check.
1488        // Sort both sides and ensure table_ids_1 has the smaller first element,
1489        // then reject if table_ids_1's last element >= table_ids_2's first element (overlap).
1490        {
1491            let mut table_ids_1: Vec<TableId> = group.table_statistic.keys().cloned().collect_vec();
1492            let mut table_ids_2: Vec<TableId> =
1493                next_group.table_statistic.keys().cloned().collect_vec();
1494            table_ids_1.sort();
1495            table_ids_2.sort();
1496            if table_ids_1.first().unwrap() > table_ids_2.first().unwrap() {
1497                std::mem::swap(&mut table_ids_1, &mut table_ids_2);
1498            }
1499            if table_ids_1.last().unwrap() >= table_ids_2.first().unwrap() {
1500                return Err(Error::CompactionGroup(format!(
1501                    "group-{} and group-{} have overlapping table id ranges, not mergeable",
1502                    group.group_id, next_group.group_id
1503                )));
1504            }
1505        }
1506
1507        if group
1508            .compaction_group_config
1509            .compaction_config
1510            .disable_auto_group_scheduling
1511            .unwrap_or(false)
1512            || next_group
1513                .compaction_group_config
1514                .compaction_config
1515                .disable_auto_group_scheduling
1516                .unwrap_or(false)
1517        {
1518            return Err(Error::CompactionGroup(format!(
1519                "group-{} or group-{} disable_auto_group_scheduling",
1520                group.group_id, next_group.group_id
1521            )));
1522        }
1523
1524        // Keep merge compatibility as a feature, but ignore split_weight_by_vnode, because it is
1525        // only used for per-table split behavior and will be reset after merge.
1526        if !Self::is_merge_compatible_by_semantics(group, next_group) {
1527            let left_config = group.compaction_group_config.compaction_config.as_ref();
1528            let right_config = next_group
1529                .compaction_group_config
1530                .compaction_config
1531                .as_ref();
1532
1533            tracing::warn!(
1534                group_id = %group.group_id,
1535                next_group_id = %next_group.group_id,
1536                left_config = ?left_config,
1537                right_config = ?right_config,
1538                "compaction config semantic mismatch detected while merging compaction groups"
1539            );
1540
1541            return Err(Error::CompactionGroup(format!(
1542                "Cannot merge group {} and next_group {} with different compaction config (split_weight_by_vnode is excluded from comparison). left_config: {:?}, right_config: {:?}",
1543                group.group_id, next_group.group_id, left_config, right_config
1544            )));
1545        }
1546
1547        // do not merge the compaction group which is creating
1548        if Self::check_is_creating_compaction_group(group, created_tables) {
1549            return Err(Error::CompactionGroup(format!(
1550                "Cannot merge creating group {} next_group {}",
1551                group.group_id, next_group.group_id
1552            )));
1553        }
1554
1555        // do not merge high throughput group
1556        if !Self::check_is_low_write_throughput_compaction_group(
1557            table_write_throughput_statistic_manager,
1558            group,
1559            opts,
1560        ) {
1561            return Err(Error::CompactionGroup(format!(
1562                "Cannot merge high throughput group {} next_group {}",
1563                group.group_id, next_group.group_id
1564            )));
1565        }
1566
1567        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1568            * opts.split_group_size_ratio) as u64;
1569
1570        if (group.group_size + next_group.group_size) > size_limit {
1571            return Err(Error::CompactionGroup(format!(
1572                "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1573                group.group_id,
1574                group.group_size,
1575                next_group.group_id,
1576                next_group.group_size,
1577                size_limit
1578            )));
1579        }
1580
1581        if Self::check_is_creating_compaction_group(next_group, created_tables) {
1582            return Err(Error::CompactionGroup(format!(
1583                "Cannot merge creating group {} next group {}",
1584                group.group_id, next_group.group_id
1585            )));
1586        }
1587
1588        if !Self::check_is_low_write_throughput_compaction_group(
1589            table_write_throughput_statistic_manager,
1590            next_group,
1591            opts,
1592        ) {
1593            return Err(Error::CompactionGroup(format!(
1594                "Cannot merge high throughput group {} next group {}",
1595                group.group_id, next_group.group_id
1596            )));
1597        }
1598
1599        {
1600            // Avoid merge when the group is in emergency state
1601            let versioning_guard = versioning.read().await;
1602            let levels = &versioning_guard.current_version.levels;
1603            if !levels.contains_key(&group.group_id) {
1604                return Err(Error::CompactionGroup(format!(
1605                    "Cannot merge group {} not exist",
1606                    group.group_id
1607                )));
1608            }
1609
1610            if !levels.contains_key(&next_group.group_id) {
1611                return Err(Error::CompactionGroup(format!(
1612                    "Cannot merge next group {} not exist",
1613                    next_group.group_id
1614                )));
1615            }
1616
1617            let group_levels = versioning_guard
1618                .current_version
1619                .get_compaction_group_levels(group.group_id);
1620
1621            let next_group_levels = versioning_guard
1622                .current_version
1623                .get_compaction_group_levels(next_group.group_id);
1624
1625            let group_state = GroupStateValidator::group_state(
1626                group_levels,
1627                group.compaction_group_config.compaction_config().deref(),
1628            );
1629
1630            if group_state.is_write_stop() || group_state.is_emergency() {
1631                return Err(Error::CompactionGroup(format!(
1632                    "Cannot merge write limit group {} next group {}",
1633                    group.group_id, next_group.group_id
1634                )));
1635            }
1636
1637            let next_group_state = GroupStateValidator::group_state(
1638                next_group_levels,
1639                next_group
1640                    .compaction_group_config
1641                    .compaction_config()
1642                    .deref(),
1643            );
1644
1645            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1646                return Err(Error::CompactionGroup(format!(
1647                    "Cannot merge write limit next group {} group {}",
1648                    next_group.group_id, group.group_id
1649                )));
1650            }
1651
1652            // check whether the group is in the write stop state after merge
1653            let l0_sub_level_count_after_merge =
1654                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1655            if GroupStateValidator::write_stop_l0_file_count(
1656                (l0_sub_level_count_after_merge as f64
1657                    * opts.compaction_group_merge_dimension_threshold) as usize,
1658                group.compaction_group_config.compaction_config().deref(),
1659            ) {
1660                return Err(Error::CompactionGroup(format!(
1661                    "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
1662                    group.group_id, next_group.group_id
1663                )));
1664            }
1665
1666            let l0_file_count_after_merge =
1667                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1668            if GroupStateValidator::write_stop_l0_file_count(
1669                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1670                    as usize,
1671                group.compaction_group_config.compaction_config().deref(),
1672            ) {
1673                return Err(Error::CompactionGroup(format!(
1674                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1675                    next_group.group_id, group.group_id
1676                )));
1677            }
1678
1679            let l0_size_after_merge =
1680                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1681
1682            if GroupStateValidator::write_stop_l0_size(
1683                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1684                    as u64,
1685                group.compaction_group_config.compaction_config().deref(),
1686            ) {
1687                return Err(Error::CompactionGroup(format!(
1688                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1689                    next_group.group_id, group.group_id
1690                )));
1691            }
1692
1693            // check whether the group is in the emergency state after merge
1694            if GroupStateValidator::emergency_l0_file_count(
1695                (l0_sub_level_count_after_merge as f64
1696                    * opts.compaction_group_merge_dimension_threshold) as usize,
1697                group.compaction_group_config.compaction_config().deref(),
1698            ) {
1699                return Err(Error::CompactionGroup(format!(
1700                    "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
1701                    group.group_id, next_group.group_id
1702                )));
1703            }
1704        }
1705
1706        Ok(())
1707    }
1708}