risingwave_meta/hummock/manager/compaction/compaction_group_schedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
use risingwave_hummock_sdk::compaction_group::{
    StateTableId, StaticCompactionGroupId, group_split,
};
use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::{
    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
};
use thiserror_ext::AsReport;

use super::{CompactionGroupStatistic, GroupStateValidator};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{HummockManager, commit_multi_var};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::hummock::table_write_throughput_statistic::{
    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
};
use crate::manager::MetaOpts;

impl HummockManager {
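    /// Merge two compaction groups into one. The group whose smallest member table id
    /// is smaller becomes the merge target (the "left" group) and survives; the other
    /// group is removed. A minimal usage sketch, assuming hypothetical group ids 2 and 3
    /// that both exist, are fully created, and have non-overlapping member table id ranges:
    ///
    /// ```ignore
    /// hummock_manager.merge_compaction_group(2, 3).await?;
    /// ```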
    pub async fn merge_compaction_group(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, None)
            .await
    }

    pub async fn merge_compaction_group_for_test(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: HashSet<TableId>,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
            .await
    }

    pub async fn merge_compaction_group_impl(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: Option<HashSet<TableId>>,
    ) -> Result<()> {
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning.current_version.levels.contains_key(&group_1) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
        }

        if !versioning.current_version.levels.contains_key(&group_2) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
        }

        let state_table_info = versioning.current_version.state_table_info.clone();
        let mut member_table_ids_1 = state_table_info
            .compaction_group_member_table_ids(group_1)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_1.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_1 {} is empty",
                group_1
            )));
        }

        let mut member_table_ids_2 = state_table_info
            .compaction_group_member_table_ids(group_2)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_2.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_2 {} is empty",
                group_2
            )));
        }

        debug_assert!(!member_table_ids_1.is_empty());
        debug_assert!(!member_table_ids_2.is_empty());
        assert!(member_table_ids_1.is_sorted());
        assert!(member_table_ids_2.is_sorted());

        let created_tables = if let Some(created_tables) = created_tables {
            // if `created_tables` is provided, use it directly; this is mostly for tests
            #[expect(clippy::assertions_on_constants)]
            {
                assert!(cfg!(debug_assertions));
            }
            created_tables
        } else {
            match self.metadata_manager.get_created_table_ids().await {
                Ok(created_tables) => HashSet::from_iter(created_tables),
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
                    return Err(Error::CompactionGroup(format!(
                        "merge group_1 {} group_2 {} failed to fetch created table ids",
                        group_1, group_2
                    )));
                }
            }
        };

        fn contains_creating_table(
            table_ids: &[TableId],
            created_tables: &HashSet<TableId>,
        ) -> bool {
            table_ids
                .iter()
                .any(|table_id| !created_tables.contains(table_id))
        }

        // do not merge a compaction group whose tables are still being created
        if contains_creating_table(&member_table_ids_1, &created_tables)
            || contains_creating_table(&member_table_ids_2, &created_tables)
        {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
                group_1, group_2, member_table_ids_1, member_table_ids_2
            )));
        }

        // Ensure `member_table_ids_1` holds the side with the smaller first table id;
        // the corresponding group becomes the left group.
        let (left_group_id, right_group_id) =
            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
                (group_1, group_2)
            } else {
                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
                (group_2, group_1)
            };
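        // Illustrative (hypothetical ids): if group_1 holds tables [5, 6] and group_2
        // holds tables [1, 2], the two are swapped so that group_2 becomes the left
        // group and the merged key space stays in ascending table id order.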

        // We can only merge two groups with non-overlapping member table ids.
        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
            return Err(Error::CompactionGroup(format!(
                "invalid merge group_1 {} group_2 {}",
                left_group_id, right_group_id
            )));
        }

        let combined_member_table_ids = member_table_ids_1
            .iter()
            .chain(member_table_ids_2.iter())
            .collect_vec();
        assert!(combined_member_table_ids.is_sorted());

        // check for duplicated sst ids
        let mut sst_id_set = HashSet::new();
        for sst_id in versioning
            .current_version
            .get_sst_ids_by_group_id(left_group_id)
            .chain(
                versioning
                    .current_version
                    .get_sst_ids_by_group_id(right_group_id),
            )
        {
            if !sst_id_set.insert(sst_id) {
                return Err(Error::CompactionGroup(format!(
                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
                    left_group_id, right_group_id, sst_id
                )));
            }
        }

        // check branched ssts on non-overlapping levels
        {
            let left_levels = versioning
                .current_version
                .get_compaction_group_levels(group_1);

            let right_levels = versioning
                .current_version
                .get_compaction_group_levels(group_2);

            // We cannot check the L0 sub levels here, because sub level ids are rewritten during
            // the merge. This check ensures that the ssts on the other, non-overlapping levels
            // can be concatenated and that the key ranges stay correct.
            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
            for level_idx in 1..=max_level {
                let left_level = left_levels.get_level(level_idx);
                let right_level = right_levels.get_level(level_idx);
                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
                    continue;
                }

                let left_last_sst = left_level.table_infos.last().unwrap().clone();
                let right_first_sst = right_level.table_infos.first().unwrap().clone();
                let left_sst_id = left_last_sst.sst_id;
                let right_sst_id = right_first_sst.sst_id;
                let left_obj_id = left_last_sst.object_id;
                let right_obj_id = right_first_sst.object_id;

                // Since the sst key ranges within each group are already valid, we only need to
                // check the ssts adjacent across the two groups.
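                // Illustrative (hypothetical ranges): a left last sst covering keys up to
                // table 2 and a right first sst starting from table 5 can be concatenated;
                // overlapping or out-of-order key ranges reject the merge.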
                if !can_concat(&[left_last_sst, right_first_sst]) {
                    return Err(Error::CompactionGroup(format!(
                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
                        left_group_id,
                        right_group_id,
                        level_idx,
                        left_sst_id,
                        right_sst_id,
                        left_obj_id,
                        right_obj_id
                    )));
                }
            }
        }

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        let target_compaction_group_id = {
            // merge right_group_id into left_group_id and remove right_group_id
            new_version_delta.group_deltas.insert(
                left_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
                        left_group_id,
                        right_group_id,
                    })],
                },
            );
            left_group_id
        };

        // TODO: remove compaction group_id from state_table_info
        // rewrite compaction_group_id for all tables
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in combined_member_table_ids {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have checked existence previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: target_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

            // reclaim the sst-stat metrics of the removed group
            {
                let right_group_max_level = new_version_delta
                    .latest_version()
                    .get_compaction_group_levels(right_group_id)
                    .levels
                    .len();

                remove_compaction_group_in_sst_stat(
                    &self.metrics,
                    right_group_id,
                    right_group_max_level,
                );
            }

            // clear `partition_vnode_count` for the hybrid group
            {
                if let Err(err) = compaction_groups_txn.update_compaction_config(
                    &[left_group_id],
                    &[MutableConfig::SplitWeightByVnode(0)], // default
                ) {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        left_group_id
                    );
                }
            }

            new_version_delta.pre_apply();

            // remove right_group_id
            compaction_groups_txn.remove(right_group_id);
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }

        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // cancel tasks
        let mut canceled_tasks = vec![];
        // After the merge, all tasks in right_group_id should be canceled.
        // A failed cancellation does not harm correctness: report-task handling provides a
        // stronger interception point; canceling here just frees compactor compute resources sooner.
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    assert_eq!(task.compaction_group_id, right_group_id);
                    canceled_tasks.push(ReportTask {
                        task_id: task.task_id,
                        task_status: TaskStatus::ManualCanceled,
                        table_stats_change: HashMap::default(),
                        sorted_output_ssts: vec![],
                        object_timestamps: HashMap::default(),
                    });
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .merge_compaction_group_count
            .with_label_values(&[&left_group_id.to_string()])
            .inc();

        Ok(())
    }
}

impl HummockManager {
    /// Split `table_ids` into a dedicated compaction group (the split is performed by
    /// `table_id` and `vnode`).
    /// Returns the compaction group id containing the `table_ids` and the mapping of
    /// compaction group id to table ids.
    /// The split follows these rules:
    /// 1. ssts whose `key_range.left` is greater than `split_key` are moved to the right group
    /// 2. the sst containing `split_key` is split into two separate ssts, whose key ranges
    ///    become `sst_1`: [`sst.key_range.left`, `split_key`) and `sst_2`: [`split_key`, `sst.key_range.right`]
    /// 3. currently only `vnode` 0 and `vnode` max are supported (due to the rule above,
    ///    `vnode` max is rewritten as `table_id` + 1, `vnode` 0)
    ///
    /// Parameters:
    /// - `parent_group_id`: the `group_id` to split
    /// - `split_table_ids`: the `table_ids` to split; splitting multiple tables into one group
    ///   at once is still supported, and this is passed per `split` operation for checking
    /// - `table_id_to_split`: the `table_id` to split at
    /// - `vnode_to_split`: the `vnode` to split at
    /// - `partition_vnode_count`: the partition count for the single-table group, if needed
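    ///
    /// Illustrative example (hypothetical ids): splitting a group holding tables
    /// [1, 2, 3] at `table_id_to_split = 2`, `vnode_to_split = 0` leaves [1] in the
    /// parent group and moves [2, 3] into a newly created group on the right.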
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .copied()
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} is not in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);
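        // The split full key presumably encodes (table_id, vnode) at the maximum epoch, so it
        // sorts before every real key of that (table_id, vnode); see the worked split-key
        // example in `move_state_tables_to_dedicated_compaction_group`.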

        // convert to a vec for partitioning
        let table_ids = member_table_ids.into_iter().collect_vec();
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        // avoid decoding `split_key` since the caller already knows the `table_id` and `vnode`
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id,
                split_full_key.user_key.get_vnode_id(),
            );
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            // no need to split the group if all tables fall on the same side
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            // All NewCompactionGroup pairs are mapped to one new compaction group.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            // The new config will be persisted later.
            let config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config()
                .as_ref()
                .clone();

            #[expect(deprecated)]
            // fill the deprecated field with default value
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id: new_sst_start_id.inner(),
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _, // for compatibility
                        split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in &table_ids_right {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have checked existence previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // Check whether the compaction config of the single-table group needs updating,
            // inside the same transaction to guarantee atomicity.
            // `partition_vnode_count` only works within a table; to avoid excessive sst slicing,
            // we only enable it for groups with high throughput and exactly one table.
            // The target `table_ids` might be split into an existing group, so we need to try
            // to update its config as well.
            for (cg_id, table_ids) in &result {
                // check that the split tables have been placed into the dedicated compaction group
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    )
                {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // The expired compact tasks will be canceled.
        // A failed cancellation does not harm correctness: report-task handling provides a
        // stronger interception point; canceling here just frees compactor compute resources sooner.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    let is_expired = is_compaction_task_expired(
                        task.compaction_group_version_id,
                        levels.compaction_group_version_id,
                    );
                    if is_expired {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }

    /// Split `table_ids` into a dedicated compaction group.
    /// Returns the compaction group id containing the `table_ids` and the mapping of
    /// compaction group id to table ids.
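    ///
    /// Illustrative example (hypothetical ids): moving tables [3, 4, 5, 6] out of a group
    /// holding [1..=10] performs at most two splits, yielding groups [1, 2], [3, 4, 5, 6],
    /// and [7, 8, 9, 10]; the id of the group holding the moved tables is returned.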
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .copied()
                .collect_vec()
        };

        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

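        // Illustrative (hypothetical ids): {10: [1, 2], 11: [3, 4]} satisfies all three
        // invariants checked below, while {10: [1, 4], 11: [2, 3]} violates invariant 2,
        // since the concatenation ordered by first element, [1, 4, 2, 3], is not sorted.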
        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<TableId>>) {
            // 1. table_ids within each cg are sorted.
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            // 2. table_ids in different cgs are non-overlapping.
            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            // 3. each table_id belongs to one and only one cg.
            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        // move [3,4,5,6]
        // [1,2,3,4,5,6,7,8,9,10] -> [1,2] [3,4,5,6] [7,8,9,10]
        // split keys:
        // 1. table_id = 3, vnode = 0, epoch = MAX
        // 2. table_id = 7, vnode = 0, epoch = MAX

        // The new compaction group id is always generated on the right side.
        // Hence, we return the first compaction group id as the result.
        // split 1
        let mut cg_id_to_table_ids: BTreeMap<u64, Vec<TableId>> = BTreeMap::new();
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id = 0;
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // split 2
        // See the example above and the split rule in `split_compaction_group_impl`.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
}

impl HummockManager {
    /// Split the compaction group if the group is too large or contains high throughput tables.
    pub async fn try_split_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: CompactionGroupStatistic,
    ) {
        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
        {
            return;
        }
        // split high throughput tables into dedicated compaction groups
        for (table_id, table_size) in &group.table_statistic {
            self.try_move_high_throughput_table_to_dedicated_cg(
                table_write_throughput_statistic_manager,
                *table_id,
                table_size,
                group.group_id,
            )
            .await;
        }

        // split a huge group into multiple groups
        self.try_split_huge_compaction_group(group).await;
    }

    /// Try to move the high throughput table to a dedicated compaction group.
    pub async fn try_move_high_throughput_table_to_dedicated_cg(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        table_id: TableId,
        _table_size: &u64,
        parent_group_id: u64,
    ) {
        let mut table_throughput = table_write_throughput_statistic_manager
            .get_table_throughput_descending(
                table_id,
                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
            )
            .peekable();

        if table_throughput.peek().is_none() {
            return;
        }

        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
            table_throughput,
            self.env.opts.table_high_write_throughput_threshold,
            self.env
                .opts
                .table_stat_high_write_throughput_ratio_for_split,
        );

        // do not split a table into a dedicated compaction group if its write throughput is not high
        if !is_high_write_throughput {
            return;
        }

        let ret = self
            .move_state_tables_to_dedicated_compaction_group(
                parent_group_id,
                &[table_id],
                Some(self.env.opts.partition_vnode_count),
            )
            .await;
        match ret {
            Ok(split_result) => {
                tracing::info!(
                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
                    table_id,
                    parent_group_id,
                    self.env.opts.partition_vnode_count,
                    split_result
                );
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to split state table [{}] from group-{}",
                    table_id,
                    parent_group_id,
                )
            }
        }
    }

    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
            * self.env.opts.split_group_size_ratio) as u64;
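        // Illustrative (hypothetical numbers): with an estimated max group size of 100 GiB
        // and a `split_group_size_ratio` of 0.9, any hybrid group above 90 GiB qualifies
        // for a split.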
        let is_huge_hybrid_group =
            group.group_size > group_max_size && group.table_statistic.len() > 1; // avoid splitting a single-table group
        if is_huge_hybrid_group {
            let mut accumulated_size = 0;
            let mut table_ids = Vec::default();
            for (table_id, table_size) in &group.table_statistic {
                accumulated_size += table_size;
                table_ids.push(*table_id);
                // Split once the accumulated size exceeds half of the group size cap; this
                // avoids splitting off a small table into a dedicated group and triggering
                // repeated merges later.
                assert!(table_ids.is_sorted());
                if accumulated_size * 2 > group_max_size {
                    let ret = self
                        .move_state_tables_to_dedicated_compaction_group(
                            group.group_id,
                            &table_ids,
                            None,
                        )
                        .await;
                    match ret {
                        Ok(split_result) => {
                            tracing::info!(
                                "split_huge_compaction_group success {:?}",
                                split_result
                            );
                            self.metrics
                                .split_compaction_group_count
                                .with_label_values(&[&group.group_id.to_string()])
                                .inc();
                            return;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "failed to split_huge_compaction_group table {:?} from group-{}",
                                table_ids,
                                group.group_id
                            );

                            return;
                        }
                    }
                }
            }
        }
    }

    pub async fn try_merge_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
    ) -> Result<()> {
        GroupMergeValidator::validate_group_merge(
            group,
            next_group,
            created_tables,
            table_write_throughput_statistic_manager,
            &self.env.opts,
            &self.versioning,
        )
        .await?;

        match self
            .merge_compaction_group(group.group_id, next_group.group_id)
            .await
        {
            Ok(()) => {
                tracing::info!(
                    "merged group-{} into group-{}",
                    next_group.group_id,
                    group.group_id,
                );

                self.metrics
                    .merge_compaction_group_count
                    .with_label_values(&[&group.group_id.to_string()])
                    .inc();
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to merge group-{} into group-{}",
                    next_group.group_id,
                    group.group_id,
                )
            }
        }

        Ok(())
    }
}

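/// Stateless helper bundling the heuristics that decide whether two compaction groups
/// may be merged (throughput, size, creation state, and post-merge write stop /
/// emergency checks).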
#[derive(Debug, Default)]
struct GroupMergeValidator {}

impl GroupMergeValidator {
    /// Check if the table has high write throughput under the given threshold and ratio.
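    ///
    /// Worked example (hypothetical numbers): with `threshold = 100` and
    /// `high_write_throughput_ratio = 0.5`, samples `[120, 90, 150]` yield 2 high samples
    /// out of 3; since `2.0 > 3.0 * 0.5`, the table counts as high write throughput.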
    pub fn is_table_high_write_throughput(
        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
        threshold: u64,
        high_write_throughput_ratio: f64,
    ) -> bool {
        let mut sample_size = 0;
        let mut high_write_throughput_count = 0;
        for statistic in table_throughput {
            sample_size += 1;
            if statistic.throughput > threshold {
                high_write_throughput_count += 1;
            }
        }

        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
    }

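    /// Check if the table has low write throughput under the given threshold and ratio.
    /// Note the complement to the check above: samples with throughput less than or
    /// equal to the threshold count as low.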
    pub fn is_table_low_write_throughput(
        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
        threshold: u64,
        low_write_throughput_ratio: f64,
    ) -> bool {
        let mut sample_size = 0;
        let mut low_write_throughput_count = 0;
        for statistic in table_throughput {
            sample_size += 1;
            if statistic.throughput <= threshold {
                low_write_throughput_count += 1;
            }
        }

        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
    }

    fn check_is_low_write_throughput_compaction_group(
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        opts: &Arc<MetaOpts>,
    ) -> bool {
        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
        for table_id in group.table_statistic.keys() {
            let mut table_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(
                    *table_id,
                    opts.table_stat_throuput_window_seconds_for_merge as i64,
                )
                .peekable();
            if table_throughput.peek().is_none() {
                continue;
            }

            table_with_statistic.push(table_throughput);
        }

        // if no table in the group has enough statistics, treat the group as low throughput
        if table_with_statistic.is_empty() {
            return true;
        }

        // check that all tables in the group with enough statistics are low write throughput
        table_with_statistic.into_iter().all(|table_throughput| {
            Self::is_table_low_write_throughput(
                table_throughput,
                opts.table_low_write_throughput_threshold,
                opts.table_stat_low_write_throughput_ratio_for_merge,
            )
        })
    }

    fn check_is_creating_compaction_group(
        group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
    ) -> bool {
        group
            .table_statistic
            .keys()
            .any(|table_id| !created_tables.contains(table_id))
    }

    async fn validate_group_merge(
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        opts: &Arc<MetaOpts>,
        versioning: &MonitoredRwLock<Versioning>,
    ) -> Result<()> {
        // TODO: remove this check after refactoring group ids
        if (group.group_id == StaticCompactionGroupId::StateDefault as u64
            && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64)
            || (group.group_id == StaticCompactionGroupId::MaterializedView as u64
                && next_group.group_id == StaticCompactionGroupId::StateDefault as u64)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} and group-{} are both StaticCompactionGroupId",
                group.group_id, next_group.group_id
            )));
        }

        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} is empty",
                group.group_id, next_group.group_id
            )));
        }

        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
            || next_group
                .compaction_group_config
                .compaction_config
                .disable_auto_group_scheduling
                .unwrap_or(false)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} disable_auto_group_scheduling",
                group.group_id, next_group.group_id
            )));
        }

        // do not merge a compaction group whose tables are still being created
        if Self::check_is_creating_compaction_group(group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        // do not merge a high throughput group
        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge high throughput group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
            * opts.split_group_size_ratio) as u64;
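        // The merged group must stay below the same size threshold that triggers splits;
        // otherwise the merge would likely be undone by an immediate split.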

        if (group.group_size + next_group.group_size) > size_limit {
            return Err(Error::CompactionGroup(format!(
                "Not Merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
                group.group_id,
                group.group_size,
                next_group.group_id,
                next_group.group_size,
                size_limit
            )));
        }

        if Self::check_is_creating_compaction_group(next_group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            next_group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge high throughput group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        {
            // Avoid merging when either group is in a write-stop or emergency state.
            let versioning_guard = versioning.read().await;
            let levels = &versioning_guard.current_version.levels;
            if !levels.contains_key(&group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge group {} not exist",
                    group.group_id
                )));
            }

            if !levels.contains_key(&next_group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge next group {} not exist",
                    next_group.group_id
                )));
            }

            let group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(group.group_id);

            let next_group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(next_group.group_id);

            let group_state = GroupStateValidator::group_state(
                group_levels,
                group.compaction_group_config.compaction_config().deref(),
            );

            if group_state.is_write_stop() || group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit group {} next group {}",
                    group.group_id, next_group.group_id
                )));
            }

            let next_group_state = GroupStateValidator::group_state(
                next_group_levels,
                next_group
                    .compaction_group_config
                    .compaction_config()
                    .deref(),
            );

            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit next group {} group {}",
                    next_group.group_id, group.group_id
                )));
            }

            // check whether the group would be in the write stop state after the merge
            let l0_sub_level_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit group {} next group {}, will trigger write stop after merge",
                    group.group_id, next_group.group_id
                )));
            }

            let l0_file_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            let l0_size_after_merge =
                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;

            if GroupStateValidator::write_stop_l0_size(
                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as u64,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            // check whether the group would be in the emergency state after the merge
            if GroupStateValidator::emergency_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge emergency group {} next group {}, will trigger emergency after merge",
                    group.group_id, next_group.group_id
                )));
            }
        }

        Ok(())
    }
}