risingwave_meta/hummock/manager/compaction/
compaction_group_schedule.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
use risingwave_hummock_sdk::compaction_group::{
    StateTableId, StaticCompactionGroupId, group_split,
};
use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::{
    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
};
use thiserror_ext::AsReport;

use super::{CompactionGroupStatistic, GroupStateValidator};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{HummockManager, commit_multi_var};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::hummock::table_write_throughput_statistic::{
    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
};
use crate::manager::MetaOpts;
48
49impl HummockManager {
50    pub async fn merge_compaction_group(
51        &self,
52        group_1: CompactionGroupId,
53        group_2: CompactionGroupId,
54    ) -> Result<()> {
55        self.merge_compaction_group_impl(group_1, group_2, None)
56            .await
57    }
58
59    pub async fn merge_compaction_group_for_test(
60        &self,
61        group_1: CompactionGroupId,
62        group_2: CompactionGroupId,
63        created_tables: HashSet<TableId>,
64    ) -> Result<()> {
65        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
66            .await
67    }
68
69    pub async fn merge_compaction_group_impl(
70        &self,
71        group_1: CompactionGroupId,
72        group_2: CompactionGroupId,
73        created_tables: Option<HashSet<TableId>>,
74    ) -> Result<()> {
75        let compaction_guard = self.compaction.write().await;
76        let mut versioning_guard = self.versioning.write().await;
77        let versioning = versioning_guard.deref_mut();
78        // Validate parameters.
79        if !versioning.current_version.levels.contains_key(&group_1) {
80            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
81        }
82
83        if !versioning.current_version.levels.contains_key(&group_2) {
84            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
85        }
86
87        let state_table_info = versioning.current_version.state_table_info.clone();
88        let mut member_table_ids_1 = state_table_info
89            .compaction_group_member_table_ids(group_1)
90            .iter()
91            .cloned()
92            .collect_vec();
93
94        if member_table_ids_1.is_empty() {
95            return Err(Error::CompactionGroup(format!(
96                "group_1 {} is empty",
97                group_1
98            )));
99        }
100
101        let mut member_table_ids_2 = state_table_info
102            .compaction_group_member_table_ids(group_2)
103            .iter()
104            .cloned()
105            .collect_vec();
106
107        if member_table_ids_2.is_empty() {
108            return Err(Error::CompactionGroup(format!(
109                "group_2 {} is empty",
110                group_2
111            )));
112        }
113
114        debug_assert!(!member_table_ids_1.is_empty());
115        debug_assert!(!member_table_ids_2.is_empty());
116        assert!(member_table_ids_1.is_sorted());
117        assert!(member_table_ids_2.is_sorted());
118
119        let created_tables = if let Some(created_tables) = created_tables {
120            // if the created_tables is provided, use it directly, most for test
121            #[expect(clippy::assertions_on_constants)]
122            {
123                assert!(cfg!(debug_assertions));
124            }
125            created_tables
126        } else {
127            match self.metadata_manager.get_created_table_ids().await {
128                Ok(created_tables) => HashSet::from_iter(created_tables),
129                Err(err) => {
130                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
131                    return Err(Error::CompactionGroup(format!(
132                        "merge group_1 {} group_2 {} failed to fetch created table ids",
133                        group_1, group_2
134                    )));
135                }
136            }
137        };
138
139        fn contains_creating_table(
140            table_ids: &Vec<TableId>,
141            created_tables: &HashSet<TableId>,
142        ) -> bool {
143            table_ids
144                .iter()
145                .any(|table_id| !created_tables.contains(table_id))
146        }
147
148        // do not merge the compaction group which is creating
149        if contains_creating_table(&member_table_ids_1, &created_tables)
150            || contains_creating_table(&member_table_ids_2, &created_tables)
151        {
152            return Err(Error::CompactionGroup(format!(
153                "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
154                group_1, group_2, member_table_ids_1, member_table_ids_2
155            )));
156        }
157
158        // Make sure `member_table_ids_1` is smaller than `member_table_ids_2`
159        let (left_group_id, right_group_id) =
160            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
161                (group_1, group_2)
162            } else {
163                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
164                (group_2, group_1)
165            };
166
167        // We can only merge two groups with non-overlapping member table ids
168        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
169            return Err(Error::CompactionGroup(format!(
170                "invalid merge group_1 {} group_2 {}",
171                left_group_id, right_group_id
172            )));
173        }
174
175        let combined_member_table_ids = member_table_ids_1
176            .iter()
177            .chain(member_table_ids_2.iter())
178            .collect_vec();
179        assert!(combined_member_table_ids.is_sorted());
180
181        // check duplicated sst_id
182        let mut sst_id_set = HashSet::new();
183        for sst_id in versioning
184            .current_version
185            .get_sst_ids_by_group_id(left_group_id)
186            .chain(
187                versioning
188                    .current_version
189                    .get_sst_ids_by_group_id(right_group_id),
190            )
191        {
192            if !sst_id_set.insert(sst_id) {
193                return Err(Error::CompactionGroup(format!(
194                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
195                    left_group_id, right_group_id, sst_id
196                )));
197            }
198        }
199
200        // check branched sst on non-overlap level
201        {
202            let left_levels = versioning
203                .current_version
204                .get_compaction_group_levels(group_1);
205
206            let right_levels = versioning
207                .current_version
208                .get_compaction_group_levels(group_2);
209
210            // we can not check the l0 sub level, because the sub level id will be rewritten when merge
211            // This check will ensure that other non-overlapping level ssts can be concat and that the key_range is correct.
212            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
213            for level_idx in 1..=max_level {
214                let left_level = left_levels.get_level(level_idx);
215                let right_level = right_levels.get_level(level_idx);
216                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
217                    continue;
218                }
219
220                let left_last_sst = left_level.table_infos.last().unwrap().clone();
221                let right_first_sst = right_level.table_infos.first().unwrap().clone();
222                let left_sst_id = left_last_sst.sst_id;
223                let right_sst_id = right_first_sst.sst_id;
224                let left_obj_id = left_last_sst.object_id;
225                let right_obj_id = right_first_sst.object_id;
226
227                // Since the sst key_range within a group is legal, we only need to check the ssts adjacent to the two groups.
228                if !can_concat(&[left_last_sst, right_first_sst]) {
229                    return Err(Error::CompactionGroup(format!(
230                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
231                        left_group_id,
232                        right_group_id,
233                        level_idx,
234                        left_sst_id,
235                        right_sst_id,
236                        left_obj_id,
237                        right_obj_id
238                    )));
239                }
240            }
241        }
242
243        let mut version = HummockVersionTransaction::new(
244            &mut versioning.current_version,
245            &mut versioning.hummock_version_deltas,
246            self.env.notification_manager(),
247            None,
248            &self.metrics,
249        );
250        let mut new_version_delta = version.new_delta();
251
252        let target_compaction_group_id = {
253            // merge right_group_id to left_group_id and remove right_group_id
254            new_version_delta.group_deltas.insert(
255                left_group_id,
256                GroupDeltas {
257                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
258                        left_group_id,
259                        right_group_id,
260                    })],
261                },
262            );
263            left_group_id
264        };
265
266        // TODO: remove compaciton group_id from state_table_info
267        // rewrite compaction_group_id for all tables
268        new_version_delta.with_latest_version(|version, new_version_delta| {
269            for &table_id in combined_member_table_ids {
270                let info = version
271                    .state_table_info
272                    .info()
273                    .get(&table_id)
274                    .expect("have check exist previously");
275                assert!(
276                    new_version_delta
277                        .state_table_info_delta
278                        .insert(
279                            table_id,
280                            PbStateTableInfoDelta {
281                                committed_epoch: info.committed_epoch,
282                                compaction_group_id: target_compaction_group_id,
283                            }
284                        )
285                        .is_none()
286                );
287            }
288        });
289
290        {
291            let mut compaction_group_manager = self.compaction_group_manager.write().await;
292            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
293
294            // for metrics reclaim
295            {
296                let right_group_max_level = new_version_delta
297                    .latest_version()
298                    .get_compaction_group_levels(right_group_id)
299                    .levels
300                    .len();
301
302                remove_compaction_group_in_sst_stat(
303                    &self.metrics,
304                    right_group_id,
305                    right_group_max_level,
306                );
307            }
308
309            // clear `partition_vnode_count` for the hybrid group
310            {
311                if let Err(err) = compaction_groups_txn.update_compaction_config(
312                    &[left_group_id],
313                    &[MutableConfig::SplitWeightByVnode(0)], // default
314                ) {
315                    tracing::error!(
316                        error = %err.as_report(),
317                        "failed to update compaction config for group-{}",
318                        left_group_id
319                    );
320                }
321            }
322
323            new_version_delta.pre_apply();
324
325            // remove right_group_id
326            compaction_groups_txn.remove(right_group_id);
327            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
328        }
329
330        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
331        versioning.mark_next_time_travel_version_snapshot();
332
333        // cancel tasks
334        let mut canceled_tasks = vec![];
335        // after merge, all tasks in right_group_id should be canceled
336        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
337        let compact_task_assignments =
338            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
339        compact_task_assignments
340            .into_iter()
341            .for_each(|task_assignment| {
342                if let Some(task) = task_assignment.compact_task.as_ref() {
343                    assert_eq!(task.compaction_group_id, right_group_id);
344                    canceled_tasks.push(ReportTask {
345                        task_id: task.task_id,
346                        task_status: TaskStatus::ManualCanceled,
347                        table_stats_change: HashMap::default(),
348                        sorted_output_ssts: vec![],
349                        object_timestamps: HashMap::default(),
350                    });
351                }
352            });
353
354        if !canceled_tasks.is_empty() {
355            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
356                .await?;
357        }
358
359        self.metrics
360            .merge_compaction_group_count
361            .with_label_values(&[&left_group_id.to_string()])
362            .inc();
363
364        Ok(())
365    }
366}
367
368impl HummockManager {
369    /// Split `table_ids` to a dedicated compaction group.(will be split by the `table_id` and `vnode`.)
370    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
371    /// The split will follow the following rules
372    /// 1. ssts with `key_range.left` greater than `split_key` will be split to the right group
373    /// 2. the sst containing `split_key` will be split into two separate ssts and their `key_range` will be changed `sst_1`: [`sst.key_range.left`, `split_key`) `sst_2`: [`split_key`, `sst.key_range.right`]
374    /// 3. currently only `vnode` 0 and `vnode` max is supported. (Due to the above rule, vnode max will be rewritten as `table_id` + 1, `vnode` 0)
375    ///   - `parent_group_id`: the `group_id` to split
376    ///   - `split_table_ids`: the `table_ids` to split, now we still support to split multiple tables to one group at once, pass `split_table_ids` for per `split` operation for checking
377    ///   - `table_id_to_split`: the `table_id` to split
378    ///   - `vnode_to_split`: the `vnode` to split
379    ///   - `partition_vnode_count`: the partition count for the single table group if need
380    async fn split_compaction_group_impl(
381        &self,
382        parent_group_id: CompactionGroupId,
383        split_table_ids: &[StateTableId],
384        table_id_to_split: StateTableId,
385        vnode_to_split: VirtualNode,
386        partition_vnode_count: Option<u32>,
387    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
388        let mut result = vec![];
389        let compaction_guard = self.compaction.write().await;
390        let mut versioning_guard = self.versioning.write().await;
391        let versioning = versioning_guard.deref_mut();
392        // Validate parameters.
393        if !versioning
394            .current_version
395            .levels
396            .contains_key(&parent_group_id)
397        {
398            return Err(Error::CompactionGroup(format!(
399                "invalid group {}",
400                parent_group_id
401            )));
402        }
403
404        let member_table_ids = versioning
405            .current_version
406            .state_table_info
407            .compaction_group_member_table_ids(parent_group_id)
408            .iter()
409            .copied()
410            .collect::<BTreeSet<_>>();
411
412        if !member_table_ids.contains(&table_id_to_split) {
413            return Err(Error::CompactionGroup(format!(
414                "table {} doesn't in group {}",
415                table_id_to_split, parent_group_id
416            )));
417        }
418
419        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);
420
421        // change to vec for partition
422        let table_ids = member_table_ids.into_iter().collect_vec();
423        if table_ids == split_table_ids {
424            return Err(Error::CompactionGroup(format!(
425                "invalid split attempt for group {}: all member tables are moved",
426                parent_group_id
427            )));
428        }
429        // avoid decode split_key when caller is aware of the table_id and vnode
430        let (table_ids_left, table_ids_right) =
431            group_split::split_table_ids_with_table_id_and_vnode(
432                &table_ids,
433                split_full_key.user_key.table_id,
434                split_full_key.user_key.get_vnode_id(),
435            );
436        if table_ids_left.is_empty() || table_ids_right.is_empty() {
437            // not need to split group if all tables are in the same side
438            if !table_ids_left.is_empty() {
439                result.push((parent_group_id, table_ids_left));
440            }
441
442            if !table_ids_right.is_empty() {
443                result.push((parent_group_id, table_ids_right));
444            }
445            return Ok(result);
446        }
447
448        result.push((parent_group_id, table_ids_left));
449
450        let split_key: Bytes = split_full_key.encode().into();
451
452        let mut version = HummockVersionTransaction::new(
453            &mut versioning.current_version,
454            &mut versioning.hummock_version_deltas,
455            self.env.notification_manager(),
456            None,
457            &self.metrics,
458        );
459        let mut new_version_delta = version.new_delta();
460
461        let split_sst_count = new_version_delta
462            .latest_version()
463            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());
464
465        let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
466        let (new_compaction_group_id, config) = {
467            // All NewCompactionGroup pairs are mapped to one new compaction group.
468            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
469            // Inherit config from parent group
470            let config = self
471                .compaction_group_manager
472                .read()
473                .await
474                .try_get_compaction_group_config(parent_group_id)
475                .ok_or_else(|| {
476                    Error::CompactionGroup(format!(
477                        "parent group {} config not found",
478                        parent_group_id
479                    ))
480                })?
481                .compaction_config()
482                .as_ref()
483                .clone();
484
485            #[expect(deprecated)]
486            // fill the deprecated field with default value
487            new_version_delta.group_deltas.insert(
488                new_compaction_group_id,
489                GroupDeltas {
490                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
491                        group_config: Some(config.clone()),
492                        group_id: new_compaction_group_id,
493                        parent_group_id,
494                        new_sst_start_id: new_sst_start_id.inner(),
495                        table_ids: vec![],
496                        version: CompatibilityVersion::LATEST as _, // for compatibility
497                        split_key: Some(split_key.into()),
498                    }))],
499                },
500            );
501            (new_compaction_group_id, config)
502        };
503
504        new_version_delta.with_latest_version(|version, new_version_delta| {
505            for &table_id in &table_ids_right {
506                let info = version
507                    .state_table_info
508                    .info()
509                    .get(&table_id)
510                    .expect("have check exist previously");
511                assert!(
512                    new_version_delta
513                        .state_table_info_delta
514                        .insert(
515                            table_id,
516                            PbStateTableInfoDelta {
517                                committed_epoch: info.committed_epoch,
518                                compaction_group_id: new_compaction_group_id,
519                            }
520                        )
521                        .is_none()
522                );
523            }
524        });
525
526        result.push((new_compaction_group_id, table_ids_right));
527
528        {
529            let mut compaction_group_manager = self.compaction_group_manager.write().await;
530            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
531            compaction_groups_txn
532                .create_compaction_groups(new_compaction_group_id, Arc::new(config));
533
534            // check if need to update the compaction config for the single table group and guarantee the operation atomicity
535            // `partition_vnode_count` only works inside a table, to avoid a lot of slicing sst, we only enable it in groups with high throughput and only one table.
536            // The target `table_ids` might be split to an existing group, so we need to try to update its config
537            for (cg_id, table_ids) in &result {
538                // check the split_tables had been place to the dedicated compaction group
539                if let Some(partition_vnode_count) = partition_vnode_count
540                    && table_ids.len() == 1
541                    && table_ids == split_table_ids
542                    && let Err(err) = compaction_groups_txn.update_compaction_config(
543                        &[*cg_id],
544                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
545                    )
546                {
547                    tracing::error!(
548                        error = %err.as_report(),
549                        "failed to update compaction config for group-{}",
550                        cg_id
551                    );
552                }
553            }
554
555            new_version_delta.pre_apply();
556            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
557        }
558        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
559        versioning.mark_next_time_travel_version_snapshot();
560
561        // The expired compact tasks will be canceled.
562        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
563        let mut canceled_tasks = vec![];
564        let compact_task_assignments =
565            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
566        let levels = versioning
567            .current_version
568            .get_compaction_group_levels(parent_group_id);
569        compact_task_assignments
570            .into_iter()
571            .for_each(|task_assignment| {
572                if let Some(task) = task_assignment.compact_task.as_ref() {
573                    let is_expired = is_compaction_task_expired(
574                        task.compaction_group_version_id,
575                        levels.compaction_group_version_id,
576                    );
577                    if is_expired {
578                        canceled_tasks.push(ReportTask {
579                            task_id: task.task_id,
580                            task_status: TaskStatus::ManualCanceled,
581                            table_stats_change: HashMap::default(),
582                            sorted_output_ssts: vec![],
583                            object_timestamps: HashMap::default(),
584                        });
585                    }
586                }
587            });
588
589        if !canceled_tasks.is_empty() {
590            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
591                .await?;
592        }
593
594        self.metrics
595            .split_compaction_group_count
596            .with_label_values(&[&parent_group_id.to_string()])
597            .inc();
598
599        Ok(result)
600    }
601
602    /// Split `table_ids` to a dedicated compaction group.
603    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
604    pub async fn move_state_tables_to_dedicated_compaction_group(
605        &self,
606        parent_group_id: CompactionGroupId,
607        table_ids: &[StateTableId],
608        partition_vnode_count: Option<u32>,
609    ) -> Result<(
610        CompactionGroupId,
611        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
612    )> {
613        if table_ids.is_empty() {
614            return Err(Error::CompactionGroup(
615                "table_ids must not be empty".to_owned(),
616            ));
617        }
618
619        if !table_ids.is_sorted() {
620            return Err(Error::CompactionGroup(
621                "table_ids must be sorted".to_owned(),
622            ));
623        }
624
625        let parent_table_ids = {
626            let versioning_guard = self.versioning.read().await;
627            versioning_guard
628                .current_version
629                .state_table_info
630                .compaction_group_member_table_ids(parent_group_id)
631                .iter()
632                .copied()
633                .collect_vec()
634        };
635
636        if parent_table_ids == table_ids {
637            return Err(Error::CompactionGroup(format!(
638                "invalid split attempt for group {}: all member tables are moved",
639                parent_group_id
640            )));
641        }
642
643        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<TableId>>) {
644            // 1. table_ids in different cg are sorted.
645            {
646                cg_id_to_table_ids
647                    .iter()
648                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
649            }
650
651            // 2.table_ids in different cg are non-overlapping
652            {
653                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
654                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
655                assert!(table_table_ids_vec.concat().is_sorted());
656            }
657
658            // 3.table_ids belong to one and only one cg.
659            {
660                let mut all_table_ids = HashSet::new();
661                for table_ids in cg_id_to_table_ids.values() {
662                    for table_id in table_ids {
663                        assert!(all_table_ids.insert(*table_id));
664                    }
665                }
666            }
667        }
668
669        // move [3,4,5,6]
670        // [1,2,3,4,5,6,7,8,9,10] -> [1,2] [3,4,5,6] [7,8,9,10]
671        // split key
672        // 1. table_id = 3, vnode = 0, epoch = MAX
673        // 2. table_id = 7, vnode = 0, epoch = MAX
674
675        // The new compaction group id is always generate on the right side
676        // Hence, we return the first compaction group id as the result
677        // split 1
678        let mut cg_id_to_table_ids: BTreeMap<u64, Vec<TableId>> = BTreeMap::new();
679        let table_id_to_split = *table_ids.first().unwrap();
680        let mut target_compaction_group_id = 0;
681        let result_vec = self
682            .split_compaction_group_impl(
683                parent_group_id,
684                table_ids,
685                table_id_to_split,
686                VirtualNode::ZERO,
687                partition_vnode_count,
688            )
689            .await?;
690        assert!(result_vec.len() <= 2);
691
692        let mut finish_move = false;
693        for (cg_id, table_ids_after_split) in result_vec {
694            if table_ids_after_split.contains(&table_id_to_split) {
695                target_compaction_group_id = cg_id;
696            }
697
698            if table_ids_after_split == table_ids {
699                finish_move = true;
700            }
701
702            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
703        }
704        check_table_ids_valid(&cg_id_to_table_ids);
705
706        if finish_move {
707            return Ok((target_compaction_group_id, cg_id_to_table_ids));
708        }
709
710        // split 2
711        // See the example above and the split rule in `split_compaction_group_impl`.
712        let table_id_to_split = *table_ids.last().unwrap();
713        let result_vec = self
714            .split_compaction_group_impl(
715                target_compaction_group_id,
716                table_ids,
717                table_id_to_split,
718                VirtualNode::MAX_REPRESENTABLE,
719                partition_vnode_count,
720            )
721            .await?;
722        assert!(result_vec.len() <= 2);
723        for (cg_id, table_ids_after_split) in result_vec {
724            if table_ids_after_split.contains(&table_id_to_split) {
725                target_compaction_group_id = cg_id;
726            }
727            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
728        }
729        check_table_ids_valid(&cg_id_to_table_ids);
730
731        Ok((target_compaction_group_id, cg_id_to_table_ids))
732    }
733}
734
735impl HummockManager {
736    /// Split the compaction group if the group is too large or contains high throughput tables.
737    pub async fn try_split_compaction_group(
738        &self,
739        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
740        group: CompactionGroupStatistic,
741    ) {
742        if group
743            .compaction_group_config
744            .compaction_config
745            .disable_auto_group_scheduling
746            .unwrap_or(false)
747        {
748            return;
749        }
750        // split high throughput table to dedicated compaction group
751        for (table_id, table_size) in &group.table_statistic {
752            self.try_move_high_throughput_table_to_dedicated_cg(
753                table_write_throughput_statistic_manager,
754                *table_id,
755                table_size,
756                group.group_id,
757            )
758            .await;
759        }
760
761        // split the huge group to multiple groups
762        self.try_split_huge_compaction_group(group).await;
763    }
764
765    /// Try to move the high throughput table to a dedicated compaction group.
766    pub async fn try_move_high_throughput_table_to_dedicated_cg(
767        &self,
768        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
769        table_id: TableId,
770        _table_size: &u64,
771        parent_group_id: u64,
772    ) {
773        let mut table_throughput = table_write_throughput_statistic_manager
774            .get_table_throughput_descending(
775                table_id,
776                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
777            )
778            .peekable();
779
780        if table_throughput.peek().is_none() {
781            return;
782        }
783
784        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
785            table_throughput,
786            self.env.opts.table_high_write_throughput_threshold,
787            self.env
788                .opts
789                .table_stat_high_write_throughput_ratio_for_split,
790        );
791
792        // do not split a table to dedicated compaction group if it is not high write throughput
793        if !is_high_write_throughput {
794            return;
795        }
796
797        let ret = self
798            .move_state_tables_to_dedicated_compaction_group(
799                parent_group_id,
800                &[table_id],
801                Some(self.env.opts.partition_vnode_count),
802            )
803            .await;
804        match ret {
805            Ok(split_result) => {
806                tracing::info!(
807                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
808                    table_id,
809                    parent_group_id,
810                    self.env.opts.partition_vnode_count,
811                    split_result
812                );
813            }
814            Err(e) => {
815                tracing::info!(
816                    error = %e.as_report(),
817                    "failed to split state table [{}] from group-{}",
818                    table_id,
819                    parent_group_id,
820                )
821            }
822        }
823    }
824
825    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
826        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
827            * self.env.opts.split_group_size_ratio) as u64;
828        let is_huge_hybrid_group =
829            group.group_size > group_max_size && group.table_statistic.len() > 1; // avoid split single table group
830        if is_huge_hybrid_group {
831            let mut accumulated_size = 0;
832            let mut table_ids = Vec::default();
833            for (table_id, table_size) in &group.table_statistic {
834                accumulated_size += table_size;
835                table_ids.push(*table_id);
836                // split if the accumulated size is greater than half of the group size
837                // avoid split a small table to dedicated compaction group and trigger multiple merge
838                assert!(table_ids.is_sorted());
839                let remaining_size = group.group_size.saturating_sub(accumulated_size);
840                if accumulated_size > group_max_size / 2
841                    && remaining_size > 0
842                    && table_ids.len() < group.table_statistic.len()
843                {
844                    let ret = self
845                        .move_state_tables_to_dedicated_compaction_group(
846                            group.group_id,
847                            &table_ids,
848                            None,
849                        )
850                        .await;
851                    match ret {
852                        Ok(split_result) => {
853                            tracing::info!(
854                                "split_huge_compaction_group success {:?}",
855                                split_result
856                            );
857                            self.metrics
858                                .split_compaction_group_count
859                                .with_label_values(&[&group.group_id.to_string()])
860                                .inc();
861                            return;
862                        }
863                        Err(e) => {
864                            tracing::error!(
865                                error = %e.as_report(),
866                                "failed to split_huge_compaction_group table {:?} from group-{}",
867                                table_ids,
868                                group.group_id
869                            );
870
871                            return;
872                        }
873                    }
874                }
875            }
876        }
877    }
878
879    pub async fn try_merge_compaction_group(
880        &self,
881        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
882        group: &CompactionGroupStatistic,
883        next_group: &CompactionGroupStatistic,
884        created_tables: &HashSet<TableId>,
885    ) -> Result<()> {
886        GroupMergeValidator::validate_group_merge(
887            group,
888            next_group,
889            created_tables,
890            table_write_throughput_statistic_manager,
891            &self.env.opts,
892            &self.versioning,
893        )
894        .await?;
895
896        match self
897            .merge_compaction_group(group.group_id, next_group.group_id)
898            .await
899        {
900            Ok(()) => {
901                tracing::info!(
902                    "merge group-{} to group-{}",
903                    next_group.group_id,
904                    group.group_id,
905                );
906
907                self.metrics
908                    .merge_compaction_group_count
909                    .with_label_values(&[&group.group_id.to_string()])
910                    .inc();
911            }
912            Err(e) => {
913                tracing::info!(
914                    error = %e.as_report(),
915                    "failed to merge group-{} group-{}",
916                    next_group.group_id,
917                    group.group_id,
918                )
919            }
920        }
921
922        Ok(())
923    }
924}
925
/// Stateless holder for the associated functions that decide whether two compaction groups
/// may be merged and whether a table counts as high or low write throughput.
#[derive(Default, Debug)]
struct GroupMergeValidator;
928
929impl GroupMergeValidator {
930    /// Check if the table is high write throughput with the given threshold and ratio.
931    pub fn is_table_high_write_throughput(
932        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
933        threshold: u64,
934        high_write_throughput_ratio: f64,
935    ) -> bool {
936        let mut sample_size = 0;
937        let mut high_write_throughput_count = 0;
938        for statistic in table_throughput {
939            sample_size += 1;
940            if statistic.throughput > threshold {
941                high_write_throughput_count += 1;
942            }
943        }
944
945        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
946    }
947
948    pub fn is_table_low_write_throughput(
949        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
950        threshold: u64,
951        low_write_throughput_ratio: f64,
952    ) -> bool {
953        let mut sample_size = 0;
954        let mut low_write_throughput_count = 0;
955        for statistic in table_throughput {
956            sample_size += 1;
957            if statistic.throughput <= threshold {
958                low_write_throughput_count += 1;
959            }
960        }
961
962        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
963    }
964
965    fn check_is_low_write_throughput_compaction_group(
966        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
967        group: &CompactionGroupStatistic,
968        opts: &Arc<MetaOpts>,
969    ) -> bool {
970        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
971        for table_id in group.table_statistic.keys() {
972            let mut table_throughput = table_write_throughput_statistic_manager
973                .get_table_throughput_descending(
974                    *table_id,
975                    opts.table_stat_throuput_window_seconds_for_merge as i64,
976                )
977                .peekable();
978            if table_throughput.peek().is_none() {
979                continue;
980            }
981
982            table_with_statistic.push(table_throughput);
983        }
984
985        // if all tables in the group do not have enough statistics, return true
986        if table_with_statistic.is_empty() {
987            return true;
988        }
989
990        // check if all tables in the group are low write throughput with enough statistics
991        table_with_statistic.into_iter().all(|table_throughput| {
992            Self::is_table_low_write_throughput(
993                table_throughput,
994                opts.table_low_write_throughput_threshold,
995                opts.table_stat_low_write_throughput_ratio_for_merge,
996            )
997        })
998    }
999
1000    fn check_is_creating_compaction_group(
1001        group: &CompactionGroupStatistic,
1002        created_tables: &HashSet<TableId>,
1003    ) -> bool {
1004        group
1005            .table_statistic
1006            .keys()
1007            .any(|table_id| !created_tables.contains(table_id))
1008    }
1009
1010    async fn validate_group_merge(
1011        group: &CompactionGroupStatistic,
1012        next_group: &CompactionGroupStatistic,
1013        created_tables: &HashSet<TableId>,
1014        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1015        opts: &Arc<MetaOpts>,
1016        versioning: &MonitoredRwLock<Versioning>,
1017    ) -> Result<()> {
1018        // TODO: remove this check after refactor group id
1019        if (group.group_id == StaticCompactionGroupId::StateDefault as u64
1020            && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64)
1021            || (group.group_id == StaticCompactionGroupId::MaterializedView as u64
1022                && next_group.group_id == StaticCompactionGroupId::StateDefault as u64)
1023        {
1024            return Err(Error::CompactionGroup(format!(
1025                "group-{} and group-{} are both StaticCompactionGroupId",
1026                group.group_id, next_group.group_id
1027            )));
1028        }
1029
1030        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1031            return Err(Error::CompactionGroup(format!(
1032                "group-{} or group-{} is empty",
1033                group.group_id, next_group.group_id
1034            )));
1035        }
1036
1037        if group
1038            .compaction_group_config
1039            .compaction_config
1040            .disable_auto_group_scheduling
1041            .unwrap_or(false)
1042            || next_group
1043                .compaction_group_config
1044                .compaction_config
1045                .disable_auto_group_scheduling
1046                .unwrap_or(false)
1047        {
1048            return Err(Error::CompactionGroup(format!(
1049                "group-{} or group-{} disable_auto_group_scheduling",
1050                group.group_id, next_group.group_id
1051            )));
1052        }
1053
1054        // Do not merge compaction groups with different compaction configs.
1055        // Different configs lead to different max_estimated_group_size calculations,
1056        // which can cause scheduling conflicts (continuous split/merge cycles).
1057        // The following fields in CompactionConfig affect max_estimated_group_size:
1058        //   - max_bytes_for_level_base
1059        //   - max_bytes_for_level_multiplier
1060        //   - max_compaction_bytes
1061        //   - sub_level_max_compaction_bytes
1062        // If any of these fields differ, the groups may have incompatible scheduling.
1063        if group.compaction_group_config.compaction_config
1064            != next_group.compaction_group_config.compaction_config
1065        {
1066            let left_config = group.compaction_group_config.compaction_config.as_ref();
1067            let right_config = next_group
1068                .compaction_group_config
1069                .compaction_config
1070                .as_ref();
1071
1072            tracing::warn!(
1073                group_id = group.group_id,
1074                next_group_id = next_group.group_id,
1075                left_config = ?left_config,
1076                right_config = ?right_config,
1077                "compaction config mismatch detected while merging compaction groups"
1078            );
1079
1080            return Err(Error::CompactionGroup(format!(
1081                "Cannot merge group {} and next_group {} with different compaction configs. left_config: {:?}, right_config: {:?}",
1082                group.group_id, next_group.group_id, left_config, right_config
1083            )));
1084        }
1085
1086        // do not merge the compaction group which is creating
1087        if Self::check_is_creating_compaction_group(group, created_tables) {
1088            return Err(Error::CompactionGroup(format!(
1089                "Cannot merge creating group {} next_group {}",
1090                group.group_id, next_group.group_id
1091            )));
1092        }
1093
1094        // do not merge high throughput group
1095        if !Self::check_is_low_write_throughput_compaction_group(
1096            table_write_throughput_statistic_manager,
1097            group,
1098            opts,
1099        ) {
1100            return Err(Error::CompactionGroup(format!(
1101                "Cannot merge high throughput group {} next_group {}",
1102                group.group_id, next_group.group_id
1103            )));
1104        }
1105
1106        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1107            * opts.split_group_size_ratio) as u64;
1108
1109        if (group.group_size + next_group.group_size) > size_limit {
1110            return Err(Error::CompactionGroup(format!(
1111                "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1112                group.group_id,
1113                group.group_size,
1114                next_group.group_id,
1115                next_group.group_size,
1116                size_limit
1117            )));
1118        }
1119
1120        if Self::check_is_creating_compaction_group(next_group, created_tables) {
1121            return Err(Error::CompactionGroup(format!(
1122                "Cannot merge creating group {} next group {}",
1123                group.group_id, next_group.group_id
1124            )));
1125        }
1126
1127        if !Self::check_is_low_write_throughput_compaction_group(
1128            table_write_throughput_statistic_manager,
1129            next_group,
1130            opts,
1131        ) {
1132            return Err(Error::CompactionGroup(format!(
1133                "Cannot merge high throughput group {} next group {}",
1134                group.group_id, next_group.group_id
1135            )));
1136        }
1137
1138        {
1139            // Avoid merge when the group is in emergency state
1140            let versioning_guard = versioning.read().await;
1141            let levels = &versioning_guard.current_version.levels;
1142            if !levels.contains_key(&group.group_id) {
1143                return Err(Error::CompactionGroup(format!(
1144                    "Cannot merge group {} not exist",
1145                    group.group_id
1146                )));
1147            }
1148
1149            if !levels.contains_key(&next_group.group_id) {
1150                return Err(Error::CompactionGroup(format!(
1151                    "Cannot merge next group {} not exist",
1152                    next_group.group_id
1153                )));
1154            }
1155
1156            let group_levels = versioning_guard
1157                .current_version
1158                .get_compaction_group_levels(group.group_id);
1159
1160            let next_group_levels = versioning_guard
1161                .current_version
1162                .get_compaction_group_levels(next_group.group_id);
1163
1164            let group_state = GroupStateValidator::group_state(
1165                group_levels,
1166                group.compaction_group_config.compaction_config().deref(),
1167            );
1168
1169            if group_state.is_write_stop() || group_state.is_emergency() {
1170                return Err(Error::CompactionGroup(format!(
1171                    "Cannot merge write limit group {} next group {}",
1172                    group.group_id, next_group.group_id
1173                )));
1174            }
1175
1176            let next_group_state = GroupStateValidator::group_state(
1177                next_group_levels,
1178                next_group
1179                    .compaction_group_config
1180                    .compaction_config()
1181                    .deref(),
1182            );
1183
1184            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1185                return Err(Error::CompactionGroup(format!(
1186                    "Cannot merge write limit next group {} group {}",
1187                    next_group.group_id, group.group_id
1188                )));
1189            }
1190
1191            // check whether the group is in the write stop state after merge
1192            let l0_sub_level_count_after_merge =
1193                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1194            if GroupStateValidator::write_stop_l0_file_count(
1195                (l0_sub_level_count_after_merge as f64
1196                    * opts.compaction_group_merge_dimension_threshold) as usize,
1197                group.compaction_group_config.compaction_config().deref(),
1198            ) {
1199                return Err(Error::CompactionGroup(format!(
1200                    "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
1201                    group.group_id, next_group.group_id
1202                )));
1203            }
1204
1205            let l0_file_count_after_merge =
1206                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1207            if GroupStateValidator::write_stop_l0_file_count(
1208                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1209                    as usize,
1210                group.compaction_group_config.compaction_config().deref(),
1211            ) {
1212                return Err(Error::CompactionGroup(format!(
1213                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1214                    next_group.group_id, group.group_id
1215                )));
1216            }
1217
1218            let l0_size_after_merge =
1219                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1220
1221            if GroupStateValidator::write_stop_l0_size(
1222                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1223                    as u64,
1224                group.compaction_group_config.compaction_config().deref(),
1225            ) {
1226                return Err(Error::CompactionGroup(format!(
1227                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1228                    next_group.group_id, group.group_id
1229                )));
1230            }
1231
1232            // check whether the group is in the emergency state after merge
1233            if GroupStateValidator::emergency_l0_file_count(
1234                (l0_sub_level_count_after_merge as f64
1235                    * opts.compaction_group_merge_dimension_threshold) as usize,
1236                group.compaction_group_config.compaction_config().deref(),
1237            ) {
1238                return Err(Error::CompactionGroup(format!(
1239                    "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
1240                    group.group_id, next_group.group_id
1241                )));
1242            }
1243        }
1244
1245        Ok(())
1246    }
1247}