risingwave_meta/hummock/manager/compaction/compaction_group_schedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
use risingwave_hummock_sdk::compaction_group::{
    StateTableId, StaticCompactionGroupId, group_split,
};
use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::{
    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
};
use thiserror_ext::AsReport;

use super::{CompactionGroupStatistic, GroupStateValidator};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{HummockManager, commit_multi_var};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::hummock::table_write_throughput_statistic::{
    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
};
use crate::manager::MetaOpts;

impl HummockManager {
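    /// Merge two compaction groups; the group whose member table ids sort first survives
    /// as the merge target and the other group is removed (see `merge_compaction_group_impl`).
    ///
    /// A hedged usage sketch (the group ids are hypothetical):
    ///
    /// ```ignore
    /// // Merge compaction groups 2 and 3; the surviving group id is decided
    /// // by the member table ids, not by the argument order.
    /// hummock_manager.merge_compaction_group(2, 3).await?;
    /// ```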
    pub async fn merge_compaction_group(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, None)
            .await
    }

    pub async fn merge_compaction_group_for_test(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: HashSet<TableId>,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
            .await
    }

    pub async fn merge_compaction_group_impl(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: Option<HashSet<TableId>>,
    ) -> Result<()> {
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning.current_version.levels.contains_key(&group_1) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
        }

        if !versioning.current_version.levels.contains_key(&group_2) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
        }

        let state_table_info = versioning.current_version.state_table_info.clone();
        let mut member_table_ids_1 = state_table_info
            .compaction_group_member_table_ids(group_1)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_1.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_1 {} is empty",
                group_1
            )));
        }

        let mut member_table_ids_2 = state_table_info
            .compaction_group_member_table_ids(group_2)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_2.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_2 {} is empty",
                group_2
            )));
        }

        debug_assert!(!member_table_ids_1.is_empty());
        debug_assert!(!member_table_ids_2.is_empty());
        assert!(member_table_ids_1.is_sorted());
        assert!(member_table_ids_2.is_sorted());

        let created_tables = if let Some(created_tables) = created_tables {
            // if `created_tables` is provided, use it directly; this is mostly for tests
            #[expect(clippy::assertions_on_constants)]
            {
                assert!(cfg!(debug_assertions));
            }
            created_tables
        } else {
            match self.metadata_manager.get_created_table_ids().await {
                Ok(created_tables) => HashSet::from_iter(created_tables),
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
                    return Err(Error::CompactionGroup(format!(
                        "merge group_1 {} group_2 {} failed to fetch created table ids",
                        group_1, group_2
                    )));
                }
            }
        };

        fn contains_creating_table(
            table_ids: &Vec<TableId>,
            created_tables: &HashSet<TableId>,
        ) -> bool {
            table_ids
                .iter()
                .any(|table_id| !created_tables.contains(table_id))
        }

        // do not merge compaction groups that still contain creating tables
        if contains_creating_table(&member_table_ids_1, &created_tables)
            || contains_creating_table(&member_table_ids_2, &created_tables)
        {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
                group_1, group_2, member_table_ids_1, member_table_ids_2
            )));
        }

        // Make sure `member_table_ids_1` starts before `member_table_ids_2`.
        let (left_group_id, right_group_id) =
            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
                (group_1, group_2)
            } else {
                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
                (group_2, group_1)
            };

        // We can only merge two groups with non-overlapping member table ids
        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
            return Err(Error::CompactionGroup(format!(
                "invalid merge group_1 {} group_2 {}",
                left_group_id, right_group_id
            )));
        }

        let combined_member_table_ids = member_table_ids_1
            .iter()
            .chain(member_table_ids_2.iter())
            .collect_vec();
        assert!(combined_member_table_ids.is_sorted());

        // check duplicated sst_id
        let mut sst_id_set = HashSet::new();
        for sst_id in versioning
            .current_version
            .get_sst_ids_by_group_id(left_group_id)
            .chain(
                versioning
                    .current_version
                    .get_sst_ids_by_group_id(right_group_id),
            )
        {
            if !sst_id_set.insert(sst_id) {
                return Err(Error::CompactionGroup(format!(
                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
                    left_group_id, right_group_id, sst_id
                )));
            }
        }
        // check branched ssts on non-overlapping levels
        {
            let left_levels = versioning
                .current_version
                .get_compaction_group_levels(group_1);

            let right_levels = versioning
                .current_version
                .get_compaction_group_levels(group_2);

            // we cannot check the l0 sub levels, because the sub level ids will be rewritten during the merge
            // This check ensures that ssts on the other, non-overlapping levels can be concatenated and that the key_range is correct.
            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
            for level_idx in 1..=max_level {
                let left_level = left_levels.get_level(level_idx);
                let right_level = right_levels.get_level(level_idx);
                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
                    continue;
                }

                let left_last_sst = left_level.table_infos.last().unwrap().clone();
                let right_first_sst = right_level.table_infos.first().unwrap().clone();
                let left_sst_id = left_last_sst.sst_id;
                let right_sst_id = right_first_sst.sst_id;
                let left_obj_id = left_last_sst.object_id;
                let right_obj_id = right_first_sst.object_id;

                // Since the sst key_ranges within each group are valid, we only need to check the two ssts adjacent at the group boundary.
                if !can_concat(&[left_last_sst, right_first_sst]) {
                    return Err(Error::CompactionGroup(format!(
                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
                        left_group_id,
                        right_group_id,
                        level_idx,
                        left_sst_id,
                        right_sst_id,
                        left_obj_id,
                        right_obj_id
                    )));
                }
            }
        }

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        let target_compaction_group_id = {
            // merge right_group_id to left_group_id and remove right_group_id
            new_version_delta.group_deltas.insert(
                left_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
                        left_group_id,
                        right_group_id,
                    })],
                },
            );
            left_group_id
        };
        // TODO: remove compaction group_id from state_table_info
        // rewrite compaction_group_id for all tables
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in combined_member_table_ids {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: target_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });
        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

            // for metrics reclaim
            {
                let right_group_max_level = new_version_delta
                    .latest_version()
                    .get_compaction_group_levels(right_group_id)
                    .levels
                    .len();

                remove_compaction_group_in_sst_stat(
                    &self.metrics,
                    right_group_id,
                    right_group_max_level,
                );
            }

            // clear `partition_vnode_count` for the hybrid group
            {
                if let Err(err) = compaction_groups_txn.update_compaction_config(
                    &[left_group_id],
                    &[MutableConfig::SplitWeightByVnode(0)], // default
                ) {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        left_group_id
                    );
                }
            }

            new_version_delta.pre_apply();

            // remove right_group_id
            compaction_groups_txn.remove(right_group_id);
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }

        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // cancel tasks
        let mut canceled_tasks = vec![];
        // after merge, all tasks in right_group_id should be canceled
        // Failure to cancel does not cause correctness problems: the task report path provides a stronger interception, and canceling here just frees up compactor compute resources more quickly.
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    assert_eq!(task.compaction_group_id, right_group_id);
                    canceled_tasks.push(ReportTask {
                        task_id: task.task_id,
                        task_status: TaskStatus::ManualCanceled,
                        table_stats_change: HashMap::default(),
                        sorted_output_ssts: vec![],
                        object_timestamps: HashMap::default(),
                    });
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .merge_compaction_group_count
            .with_label_values(&[&left_group_id.to_string()])
            .inc();

        Ok(())
    }
}

impl HummockManager {
    /// Split `table_ids` into a dedicated compaction group (the split point is determined by `table_id` and `vnode`).
    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
    /// The split follows these rules:
    /// 1. ssts with `key_range.left` greater than `split_key` are moved to the right group
    /// 2. the sst containing `split_key` is split into two separate ssts, whose `key_range`s become `sst_1`: [`sst.key_range.left`, `split_key`) and `sst_2`: [`split_key`, `sst.key_range.right`]
    /// 3. currently only `vnode` 0 and `vnode` max are supported (due to the rules above, `vnode` max is rewritten as `table_id` + 1, `vnode` 0)
    ///   - `parent_group_id`: the `group_id` to split
    ///   - `split_table_ids`: the `table_ids` to split; splitting multiple tables into one group at once is still supported, and `split_table_ids` is passed per `split` operation for checking
    ///   - `table_id_to_split`: the `table_id` to split
    ///   - `vnode_to_split`: the `vnode` to split
    ///   - `partition_vnode_count`: the partition count for the single table group, if needed
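    ///
    /// A minimal sketch of how the split key is derived from `table_id_to_split` and
    /// `vnode_to_split`, mirroring the calls in the body below:
    ///
    /// ```ignore
    /// // Keys ordered before (table_id_to_split, vnode_to_split) stay in the parent
    /// // (left) group; keys at or after it move to the new (right) group.
    /// let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);
    /// let split_key: Bytes = split_full_key.encode().into();
    /// ```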
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .copied()
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} is not in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        // change to a vec for partitioning
        let table_ids = member_table_ids.into_iter().collect_vec();
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        // avoid decoding split_key when the caller is aware of the table_id and vnode
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id,
                split_full_key.user_key.get_vnode_id(),
            );
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            // no need to split the group if all tables fall on the same side
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            // All NewCompactionGroup pairs are mapped to one new compaction group.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            // Inherit config from parent group
            let config = self
                .compaction_group_manager
                .read()
                .await
                .try_get_compaction_group_config(parent_group_id)
                .ok_or_else(|| {
                    Error::CompactionGroup(format!(
                        "parent group {} config not found",
                        parent_group_id
                    ))
                })?
                .compaction_config()
                .as_ref()
                .clone();

            #[expect(deprecated)]
            // fill the deprecated field with default value
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id: new_sst_start_id.inner(),
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _, // for compatibility
                        split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in &table_ids_right {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // check whether the compaction config for the single table group needs updating, and guarantee the atomicity of the operation
            // `partition_vnode_count` only works inside a table; to avoid a lot of sst slicing, we only enable it in groups with high throughput and only one table.
            // The target `table_ids` might be split into an existing group, so we need to try to update its config
            for (cg_id, table_ids) in &result {
                // check that the split tables have been placed in the dedicated compaction group
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    )
                {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // The expired compact tasks will be canceled.
        // Failure to cancel does not cause correctness problems: the task report path provides a stronger interception, and canceling here just frees up compactor compute resources more quickly.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    let is_expired = is_compaction_task_expired(
                        task.compaction_group_version_id,
                        levels.compaction_group_version_id,
                    );
                    if is_expired {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }

    /// Split `table_ids` into a dedicated compaction group.
    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
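    ///
    /// A hedged usage sketch (the caller and ids are hypothetical):
    ///
    /// ```ignore
    /// // Move `table_ids` out of `parent_group_id` into a dedicated group without
    /// // vnode partitioning; the first element of the returned tuple is the group
    /// // that now holds `table_ids`.
    /// let (dedicated_group_id, cg_id_to_table_ids) = hummock_manager
    ///     .move_state_tables_to_dedicated_compaction_group(parent_group_id, &table_ids, None)
    ///     .await?;
    /// ```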
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .copied()
                .collect_vec()
        };

        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<TableId>>) {
            // 1. table_ids within each cg are sorted.
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            // 2. table_ids in different cgs are non-overlapping.
            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            // 3. each table_id belongs to one and only one cg.
            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        // Example: move [3,4,5,6]
        // [1,2,3,4,5,6,7,8,9,10] -> [1,2] [3,4,5,6] [7,8,9,10]
        // split keys:
        // 1. table_id = 3, vnode = 0, epoch = MAX
        // 2. table_id = 7, vnode = 0, epoch = MAX

        // The new compaction group id is always generated on the right side.
        // Hence, we return the first compaction group id as the result.
        // split 1
        let mut cg_id_to_table_ids: BTreeMap<u64, Vec<TableId>> = BTreeMap::new();
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id = 0;
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // split 2
        // See the example above and the split rule in `split_compaction_group_impl`.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
}

impl HummockManager {
    /// Split the compaction group if the group is too large or contains high throughput tables.
    pub async fn try_split_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: CompactionGroupStatistic,
    ) {
        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
        {
            return;
        }
        // split high throughput tables to dedicated compaction groups
        for (table_id, table_size) in &group.table_statistic {
            self.try_move_high_throughput_table_to_dedicated_cg(
                table_write_throughput_statistic_manager,
                *table_id,
                table_size,
                group.group_id,
            )
            .await;
        }

        // split the huge group into multiple groups
        self.try_split_huge_compaction_group(group).await;
    }

    /// Try to move a high throughput table to a dedicated compaction group.
    pub async fn try_move_high_throughput_table_to_dedicated_cg(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        table_id: TableId,
        _table_size: &u64,
        parent_group_id: u64,
    ) {
        let mut table_throughput = table_write_throughput_statistic_manager
            .get_table_throughput_descending(
                table_id,
                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
            )
            .peekable();

        if table_throughput.peek().is_none() {
            return;
        }

        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
            table_throughput,
            self.env.opts.table_high_write_throughput_threshold,
            self.env
                .opts
                .table_stat_high_write_throughput_ratio_for_split,
        );

        // do not split a table to a dedicated compaction group if it is not high write throughput
        if !is_high_write_throughput {
            return;
        }

        let ret = self
            .move_state_tables_to_dedicated_compaction_group(
                parent_group_id,
                &[table_id],
                Some(self.env.opts.partition_vnode_count),
            )
            .await;
        match ret {
            Ok(split_result) => {
                tracing::info!(
                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
                    table_id,
                    parent_group_id,
                    self.env.opts.partition_vnode_count,
                    split_result
                );
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to split state table [{}] from group-{}",
                    table_id,
                    parent_group_id,
                )
            }
        }
    }

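    /// Try to split a huge hybrid group by moving a prefix of its tables to a dedicated
    /// group once the accumulated size of that prefix exceeds half of the group size limit.
    ///
    /// A worked example with hypothetical numbers: if `max_estimated_group_size()` is
    /// 100 GB and `split_group_size_ratio` is 1.2, the limit is 120 GB. For a 150 GB
    /// group with table sizes [50 GB, 40 GB, 60 GB], the prefix [50] gives
    /// 50 * 2 <= 120, but [50, 40] gives 90 * 2 > 120, so the first two tables are
    /// moved out together.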
    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
            * self.env.opts.split_group_size_ratio) as u64;
        let is_huge_hybrid_group =
            group.group_size > group_max_size && group.table_statistic.len() > 1; // avoid splitting a single table group
        if is_huge_hybrid_group {
            let mut accumulated_size = 0;
            let mut table_ids = Vec::default();
            for (table_id, table_size) in &group.table_statistic {
                accumulated_size += table_size;
                table_ids.push(*table_id);
                // split once the accumulated size is greater than half of the group size limit,
                // to avoid splitting a small table to a dedicated compaction group and triggering multiple merges
                assert!(table_ids.is_sorted());
                if accumulated_size * 2 > group_max_size {
                    let ret = self
                        .move_state_tables_to_dedicated_compaction_group(
                            group.group_id,
                            &table_ids,
                            None,
                        )
                        .await;
                    match ret {
                        Ok(split_result) => {
                            tracing::info!(
                                "split_huge_compaction_group success {:?}",
                                split_result
                            );
                            self.metrics
                                .split_compaction_group_count
                                .with_label_values(&[&group.group_id.to_string()])
                                .inc();
                            return;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "failed to split_huge_compaction_group table {:?} from group-{}",
                                table_ids,
                                group.group_id
                            );

                            return;
                        }
                    }
                }
            }
        }
    }

    pub async fn try_merge_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
    ) -> Result<()> {
        GroupMergeValidator::validate_group_merge(
            group,
            next_group,
            created_tables,
            table_write_throughput_statistic_manager,
            &self.env.opts,
            &self.versioning,
        )
        .await?;

        match self
            .merge_compaction_group(group.group_id, next_group.group_id)
            .await
        {
            Ok(()) => {
                tracing::info!(
                    "merged group-{} into group-{}",
                    next_group.group_id,
                    group.group_id,
                );

                self.metrics
                    .merge_compaction_group_count
                    .with_label_values(&[&group.group_id.to_string()])
                    .inc();
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to merge group-{} into group-{}",
                    next_group.group_id,
                    group.group_id,
                )
            }
        }

        Ok(())
    }
}

#[derive(Debug, Default)]
struct GroupMergeValidator {}

impl GroupMergeValidator {
    /// Check if the table is high write throughput with the given threshold and ratio.
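    ///
    /// A hypothetical example: with `threshold = 100` and `high_write_throughput_ratio = 0.5`,
    /// samples `[120, 80, 130, 140]` give 3 of 4 readings above the threshold, and
    /// `3.0 > 4.0 * 0.5`, so the table is considered high write throughput.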
    pub fn is_table_high_write_throughput(
        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
        threshold: u64,
        high_write_throughput_ratio: f64,
    ) -> bool {
        let mut sample_size = 0;
        let mut high_write_throughput_count = 0;
        for statistic in table_throughput {
            sample_size += 1;
            if statistic.throughput > threshold {
                high_write_throughput_count += 1;
            }
        }

        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
    }

    pub fn is_table_low_write_throughput(
        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
        threshold: u64,
        low_write_throughput_ratio: f64,
    ) -> bool {
        let mut sample_size = 0;
        let mut low_write_throughput_count = 0;
        for statistic in table_throughput {
            sample_size += 1;
            if statistic.throughput <= threshold {
                low_write_throughput_count += 1;
            }
        }

        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
    }

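    /// A group counts as low write throughput when every table that has statistics within
    /// the merge window is low write throughput; a group where no table has any statistics
    /// also qualifies.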
    fn check_is_low_write_throughput_compaction_group(
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        opts: &Arc<MetaOpts>,
    ) -> bool {
        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
        for table_id in group.table_statistic.keys() {
            let mut table_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(
                    *table_id,
                    opts.table_stat_throuput_window_seconds_for_merge as i64,
                )
                .peekable();
            if table_throughput.peek().is_none() {
                continue;
            }

            table_with_statistic.push(table_throughput);
        }

        // if no table in the group has enough statistics, return true
        if table_with_statistic.is_empty() {
            return true;
        }

        // check if all tables in the group with enough statistics are low write throughput
        table_with_statistic.into_iter().all(|table_throughput| {
            Self::is_table_low_write_throughput(
                table_throughput,
                opts.table_low_write_throughput_threshold,
                opts.table_stat_low_write_throughput_ratio_for_merge,
            )
        })
    }

    fn check_is_creating_compaction_group(
        group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
    ) -> bool {
        group
            .table_statistic
            .keys()
            .any(|table_id| !created_tables.contains(table_id))
    }

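    /// Validate that `group` and `next_group` can be merged: both groups must exist and
    /// be non-empty, contain only created tables, share the same compaction config, be
    /// low write throughput, fit under the size limit, and not enter a write-stop or
    /// emergency state after the merge.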
    async fn validate_group_merge(
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        opts: &Arc<MetaOpts>,
        versioning: &MonitoredRwLock<Versioning>,
    ) -> Result<()> {
        // TODO: remove this check after refactoring group ids
        if (group.group_id == StaticCompactionGroupId::StateDefault as u64
            && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64)
            || (group.group_id == StaticCompactionGroupId::MaterializedView as u64
                && next_group.group_id == StaticCompactionGroupId::StateDefault as u64)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} and group-{} are both StaticCompactionGroupId",
                group.group_id, next_group.group_id
            )));
        }

        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} is empty",
                group.group_id, next_group.group_id
            )));
        }

        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
            || next_group
                .compaction_group_config
                .compaction_config
                .disable_auto_group_scheduling
                .unwrap_or(false)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} disable_auto_group_scheduling",
                group.group_id, next_group.group_id
            )));
        }

        // Do not merge compaction groups with different compaction configs.
        // Different configs lead to different max_estimated_group_size calculations,
        // which can cause scheduling conflicts (continuous split/merge cycles).
        // The following fields in CompactionConfig affect max_estimated_group_size:
        //   - max_bytes_for_level_base
        //   - max_bytes_for_level_multiplier
        //   - max_compaction_bytes
        //   - sub_level_max_compaction_bytes
        // If any of these fields differ, the groups may have incompatible scheduling.
        if group.compaction_group_config.compaction_config
            != next_group.compaction_group_config.compaction_config
        {
            let left_config = group.compaction_group_config.compaction_config.as_ref();
            let right_config = next_group
                .compaction_group_config
                .compaction_config
                .as_ref();

            tracing::warn!(
                group_id = group.group_id,
                next_group_id = next_group.group_id,
                left_config = ?left_config,
                right_config = ?right_config,
                "compaction config mismatch detected while merging compaction groups"
            );

            return Err(Error::CompactionGroup(format!(
                "Cannot merge group {} and next_group {} with different compaction configs. left_config: {:?}, right_config: {:?}",
                group.group_id, next_group.group_id, left_config, right_config
            )));
        }

        // do not merge a compaction group that still contains creating tables
        if Self::check_is_creating_compaction_group(group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        // do not merge high throughput group
        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge high throughput group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
            * opts.split_group_size_ratio) as u64;

        if (group.group_size + next_group.group_size) > size_limit {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
                group.group_id,
                group.group_size,
                next_group.group_id,
                next_group.group_size,
                size_limit
            )));
        }

        if Self::check_is_creating_compaction_group(next_group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            next_group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge high throughput group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        {
            // Avoid merging when the group is in an emergency state.
            let versioning_guard = versioning.read().await;
            let levels = &versioning_guard.current_version.levels;
            if !levels.contains_key(&group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge: group {} does not exist",
                    group.group_id
                )));
            }

            if !levels.contains_key(&next_group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge: next group {} does not exist",
                    next_group.group_id
                )));
            }

            let group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(group.group_id);

            let next_group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(next_group.group_id);

            let group_state = GroupStateValidator::group_state(
                group_levels,
                group.compaction_group_config.compaction_config().deref(),
            );

            if group_state.is_write_stop() || group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit group {} next group {}",
                    group.group_id, next_group.group_id
                )));
            }

            let next_group_state = GroupStateValidator::group_state(
                next_group_levels,
                next_group
                    .compaction_group_config
                    .compaction_config()
                    .deref(),
            );

            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}",
                    next_group.group_id, group.group_id
                )));
            }

            // check whether the group is in the write stop state after merge
            let l0_sub_level_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
                    group.group_id, next_group.group_id
                )));
            }

            let l0_file_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            let l0_size_after_merge =
                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;

            if GroupStateValidator::write_stop_l0_size(
                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as u64,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            // check whether the group is in the emergency state after merge
            if GroupStateValidator::emergency_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
                    group.group_id, next_group.group_id
                )));
            }
        }

        Ok(())
    }
}