// risingwave_meta/hummock/manager/compaction/compaction_group_schedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26    StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::{CompactionGroupStatistic, GroupStateValidator};
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::transaction::HummockVersionTransaction;
40use crate::hummock::manager::versioning::Versioning;
41use crate::hummock::manager::{HummockManager, commit_multi_var};
42use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
43use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
44use crate::hummock::table_write_throughput_statistic::{
45    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
46};
47use crate::manager::MetaOpts;
48
impl HummockManager {
    /// Merges two compaction groups into one.
    ///
    /// Production entry point: passes `None` so that the set of created tables
    /// is fetched from the metadata manager inside
    /// [`Self::merge_compaction_group_impl`].
    pub async fn merge_compaction_group(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, None)
            .await
    }

    /// Test-only variant of [`Self::merge_compaction_group`] that lets the
    /// caller supply the created-table set directly instead of querying the
    /// metadata manager (the impl asserts this path is only taken in debug
    /// builds).
    pub async fn merge_compaction_group_for_test(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: HashSet<u32>,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
            .await
    }

    /// Merges the group whose member tables have larger ids (the "right" group)
    /// into the one with smaller ids (the "left" group), then removes the right
    /// group.
    ///
    /// Validation performed before any mutation:
    /// - both groups exist in the current version and are non-empty;
    /// - neither group contains a table that is still being created;
    /// - the member-table-id ranges of the two groups do not overlap;
    /// - no sst id appears in both groups, and on every non-overlapping level
    ///   the adjacent ssts of the two groups can be concatenated.
    ///
    /// On success this commits a `GroupMerge` version delta, repoints all
    /// member tables to the merged group, resets the merged group's
    /// `partition_vnode_count` to 0, drops the right group's config and sst
    /// metrics, and cancels compact tasks assigned to the right group.
    ///
    /// - `created_tables`: `Some` supplies the created-table set directly
    ///   (test path); `None` fetches it from the metadata manager.
    pub async fn merge_compaction_group_impl(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: Option<HashSet<u32>>,
    ) -> Result<()> {
        // Lock order: `compaction` before `versioning` — the same order used by
        // `split_compaction_group_impl` below; keep it consistent to avoid
        // deadlocks.
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning.current_version.levels.contains_key(&group_1) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
        }

        if !versioning.current_version.levels.contains_key(&group_2) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
        }

        let state_table_info = versioning.current_version.state_table_info.clone();
        let mut member_table_ids_1 = state_table_info
            .compaction_group_member_table_ids(group_1)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_1.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_1 {} is empty",
                group_1
            )));
        }

        let mut member_table_ids_2 = state_table_info
            .compaction_group_member_table_ids(group_2)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_2.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_2 {} is empty",
                group_2
            )));
        }

        debug_assert!(!member_table_ids_1.is_empty());
        debug_assert!(!member_table_ids_2.is_empty());
        // Sorted order is relied on below: `first()`/`last()` are used to test
        // for range overlap between the two groups.
        assert!(member_table_ids_1.is_sorted());
        assert!(member_table_ids_2.is_sorted());

        let created_tables = if let Some(created_tables) = created_tables {
            // if the created_tables is provided, use it directly, most for test
            #[expect(clippy::assertions_on_constants)]
            {
                assert!(cfg!(debug_assertions));
            }
            created_tables
        } else {
            match self.metadata_manager.get_created_table_ids().await {
                Ok(created_tables) => HashSet::from_iter(created_tables),
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
                    return Err(Error::CompactionGroup(format!(
                        "merge group_1 {} group_2 {} failed to fetch created table ids",
                        group_1, group_2
                    )));
                }
            }
        };

        // Returns true if any of `table_ids` is absent from `created_tables`,
        // i.e. at least one member table is still being created.
        fn contains_creating_table(
            table_ids: &Vec<TableId>,
            created_tables: &HashSet<u32>,
        ) -> bool {
            table_ids
                .iter()
                .any(|table_id| !created_tables.contains(&table_id.table_id()))
        }

        // do not merge the compaction group which is creating
        if contains_creating_table(&member_table_ids_1, &created_tables)
            || contains_creating_table(&member_table_ids_2, &created_tables)
        {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
                group_1, group_2, member_table_ids_1, member_table_ids_2
            )));
        }

        // Make sure `member_table_ids_1` is smaller than `member_table_ids_2`
        let (left_group_id, right_group_id) =
            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
                (group_1, group_2)
            } else {
                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
                (group_2, group_1)
            };

        // We can only merge two groups with non-overlapping member table ids
        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
            return Err(Error::CompactionGroup(format!(
                "invalid merge group_1 {} group_2 {}",
                left_group_id, right_group_id
            )));
        }

        // Non-overlap plus per-group sortedness implies the concatenation is
        // sorted, asserted below.
        let combined_member_table_ids = member_table_ids_1
            .iter()
            .chain(member_table_ids_2.iter())
            .collect_vec();
        assert!(combined_member_table_ids.is_sorted());

        // check duplicated sst_id
        let mut sst_id_set = HashSet::new();
        for sst_id in versioning
            .current_version
            .get_sst_ids_by_group_id(left_group_id)
            .chain(
                versioning
                    .current_version
                    .get_sst_ids_by_group_id(right_group_id),
            )
        {
            if !sst_id_set.insert(sst_id) {
                return Err(Error::CompactionGroup(format!(
                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
                    left_group_id, right_group_id, sst_id
                )));
            }
        }

        // check branched sst on non-overlap level
        {
            let left_levels = versioning
                .current_version
                .get_compaction_group_levels(group_1);

            let right_levels = versioning
                .current_version
                .get_compaction_group_levels(group_2);

            // we can not check the l0 sub level, because the sub level id will be rewritten when merge
            // This check will ensure that other non-overlapping level ssts can be concat and that the key_range is correct.
            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
            // Levels are 1-based here; level 0 (l0) is intentionally excluded,
            // see the comment above.
            for level_idx in 1..=max_level {
                let left_level = left_levels.get_level(level_idx);
                let right_level = right_levels.get_level(level_idx);
                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
                    continue;
                }

                let left_last_sst = left_level.table_infos.last().unwrap().clone();
                let right_first_sst = right_level.table_infos.first().unwrap().clone();
                let left_sst_id = left_last_sst.sst_id;
                let right_sst_id = right_first_sst.sst_id;
                let left_obj_id = left_last_sst.object_id;
                let right_obj_id = right_first_sst.object_id;

                // Since the sst key_range within a group is legal, we only need to check the ssts adjacent to the two groups.
                if !can_concat(&[left_last_sst, right_first_sst]) {
                    return Err(Error::CompactionGroup(format!(
                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
                        left_group_id,
                        right_group_id,
                        level_idx,
                        left_sst_id,
                        right_sst_id,
                        left_obj_id,
                        right_obj_id
                    )));
                }
            }
        }

        // All checks passed: start building the version delta. Nothing is
        // persisted until `commit_multi_var!` below.
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        let target_compaction_group_id = {
            // merge right_group_id to left_group_id and remove right_group_id
            new_version_delta.group_deltas.insert(
                left_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
                        left_group_id,
                        right_group_id,
                    })],
                },
            );
            left_group_id
        };

        // TODO: remove compaction group_id from state_table_info
        // rewrite compaction_group_id for all tables
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for table_id in combined_member_table_ids {
                let table_id = TableId::new(table_id.table_id());
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                // Each table must be inserted exactly once into the delta.
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: target_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

            // for metrics reclaim
            {
                let right_group_max_level = new_version_delta
                    .latest_version()
                    .get_compaction_group_levels(right_group_id)
                    .levels
                    .len();

                remove_compaction_group_in_sst_stat(
                    &self.metrics,
                    right_group_id,
                    right_group_max_level,
                );
            }

            // clear `partition_vnode_count` for the hybrid group
            {
                // Best-effort: a failure here is only logged, the merge still
                // proceeds.
                if let Err(err) = compaction_groups_txn.update_compaction_config(
                    &[left_group_id],
                    &[MutableConfig::SplitWeightByVnode(0)], // default
                ) {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        left_group_id
                    );
                }
            }

            new_version_delta.pre_apply();

            // remove right_group_id
            compaction_groups_txn.remove(right_group_id);
            // Atomically persist the version delta and the config changes.
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }

        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // cancel tasks
        let mut canceled_tasks = vec![];
        // after merge, all tasks in right_group_id should be canceled
        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    assert_eq!(task.compaction_group_id, right_group_id);
                    canceled_tasks.push(ReportTask {
                        task_id: task.task_id,
                        task_status: TaskStatus::ManualCanceled,
                        table_stats_change: HashMap::default(),
                        sorted_output_ssts: vec![],
                        object_timestamps: HashMap::default(),
                    });
                }
            });

        if !canceled_tasks.is_empty() {
            // Hands both guards over to the report path, which consumes them.
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .merge_compaction_group_count
            .with_label_values(&[&left_group_id.to_string()])
            .inc();

        Ok(())
    }
}
368
impl HummockManager {
    /// Split `table_ids` to a dedicated compaction group.(will be split by the `table_id` and `vnode`.)
    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
    /// The split will follow the following rules
    /// 1. ssts with `key_range.left` greater than `split_key` will be split to the right group
    /// 2. the sst containing `split_key` will be split into two separate ssts and their `key_range` will be changed `sst_1`: [`sst.key_range.left`, `split_key`) `sst_2`: [`split_key`, `sst.key_range.right`]
    /// 3. currently only `vnode` 0 and `vnode` max is supported. (Due to the above rule, vnode max will be rewritten as `table_id` + 1, `vnode` 0)
    ///   - `parent_group_id`: the `group_id` to split
    ///   - `split_table_ids`: the `table_ids` to split, now we still support to split multiple tables to one group at once, pass `split_table_ids` for per `split` operation for checking
    ///   - `table_id_to_split`: the `table_id` to split
    ///   - `vnode_to_split`: the `vnode` to split
    ///   - `partition_vnode_count`: the partition count for the single table group if need
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        // Lock order: `compaction` before `versioning`, matching
        // `merge_compaction_group_impl`.
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        // BTreeSet keeps member table ids sorted for the split below.
        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .map(|table_id| table_id.table_id)
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} doesn't in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        // See rule 3 in the doc comment: vnode max is normalized by
        // `build_split_full_key`.
        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        // change to vec for partition
        let table_ids = member_table_ids.into_iter().collect_vec();
        // Splitting every member table out is a no-op move and is rejected.
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        // avoid decode split_key when caller is aware of the table_id and vnode
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id.table_id(),
                split_full_key.user_key.get_vnode_id(),
            );
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            // not need to split group if all tables are in the same side
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        // Left side stays in the parent group; the right side moves to a newly
        // constructed group below.
        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        // Nothing is persisted until `commit_multi_var!` below.
        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        // Pre-count the ssts the split will create so a contiguous id range can
        // be reserved in one call.
        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            // All NewCompactionGroup pairs are mapped to one new compaction group.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            // The new config will be persisted later.
            let config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config()
                .as_ref()
                .clone();

            #[expect(deprecated)]
            // fill the deprecated field with default value
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id: new_sst_start_id.inner(),
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _, // for compatibility
                        split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

        // Repoint the right-side tables to the new compaction group.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for table_id in &table_ids_right {
                let table_id = TableId::new(*table_id);
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                // Each table must be inserted exactly once into the delta.
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // check if need to update the compaction config for the single table group and guarantee the operation atomicity
            // `partition_vnode_count` only works inside a table, to avoid a lot of slicing sst, we only enable it in groups with high throughput and only one table.
            // The target `table_ids` might be split to an existing group, so we need to try to update its config
            for (cg_id, table_ids) in &result {
                // check the split_tables had been place to the dedicated compaction group
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    )
                {
                    // Best-effort: a failure here is only logged.
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            new_version_delta.pre_apply();
            // Atomically persist the version delta and the group configs.
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // The expired compact tasks will be canceled.
        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    // A task is expired when its group version id no longer
                    // matches the (post-split) group version.
                    let is_expired = is_compaction_task_expired(
                        task.compaction_group_version_id,
                        levels.compaction_group_version_id,
                    );
                    if is_expired {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        if !canceled_tasks.is_empty() {
            // Hands both guards over to the report path, which consumes them.
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }

    /// Split `table_ids` to a dedicated compaction group.
    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
    ///
    /// Implemented as up to two calls to [`Self::split_compaction_group_impl`]:
    /// the first cuts before the smallest table id, the second (skipped when the
    /// first already isolated exactly `table_ids`) cuts after the largest one.
    ///
    /// Errors when `table_ids` is empty, unsorted, or equal to the parent
    /// group's full member set.
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        // Read lock only: the actual mutation happens inside
        // `split_compaction_group_impl`, which re-validates under write locks.
        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .map(|table_id| table_id.table_id)
                .collect_vec()
        };

        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        // Debug-style sanity check of a split outcome; panics on violation.
        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<u32>>) {
            // 1. table_ids in different cg are sorted.
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            // 2.table_ids in different cg are non-overlapping
            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            // 3.table_ids belong to one and only one cg.
            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        // move [3,4,5,6]
        // [1,2,3,4,5,6,7,8,9,10] -> [1,2] [3,4,5,6] [7,8,9,10]
        // split key
        // 1. table_id = 3, vnode = 0, epoch = MAX
        // 2. table_id = 7, vnode = 0, epoch = MAX

        // The new compaction group id is always generated on the right side
        // Hence, we return the first compaction group id as the result
        // split 1
        let mut cg_id_to_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
        let table_id_to_split = *table_ids.first().unwrap();
        // NOTE(review): 0 is a placeholder; it is overwritten below because the
        // split result always contains the group holding `table_id_to_split` —
        // verify this invariant holds for `split_compaction_group_impl`.
        let mut target_compaction_group_id = 0;
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            // If the first cut already isolated exactly `table_ids`, the second
            // split is unnecessary.
            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // split 2
        // See the example above and the split rule in `split_compaction_group_impl`.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
}
729
730impl HummockManager {
731    /// Split the compaction group if the group is too large or contains high throughput tables.
732    pub async fn try_split_compaction_group(
733        &self,
734        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
735        group: CompactionGroupStatistic,
736    ) {
737        if group
738            .compaction_group_config
739            .compaction_config
740            .disable_auto_group_scheduling
741            .unwrap_or(false)
742        {
743            return;
744        }
745        // split high throughput table to dedicated compaction group
746        for (table_id, table_size) in &group.table_statistic {
747            self.try_move_high_throughput_table_to_dedicated_cg(
748                table_write_throughput_statistic_manager,
749                *table_id,
750                table_size,
751                group.group_id,
752            )
753            .await;
754        }
755
756        // split the huge group to multiple groups
757        self.try_split_huge_compaction_group(group).await;
758    }
759
760    /// Try to move the high throughput table to a dedicated compaction group.
761    pub async fn try_move_high_throughput_table_to_dedicated_cg(
762        &self,
763        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
764        table_id: u32,
765        _table_size: &u64,
766        parent_group_id: u64,
767    ) {
768        let mut table_throughput = table_write_throughput_statistic_manager
769            .get_table_throughput_descending(
770                table_id,
771                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
772            )
773            .peekable();
774
775        if table_throughput.peek().is_none() {
776            return;
777        }
778
779        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
780            table_throughput,
781            self.env.opts.table_high_write_throughput_threshold,
782            self.env
783                .opts
784                .table_stat_high_write_throughput_ratio_for_split,
785        );
786
787        // do not split a table to dedicated compaction group if it is not high write throughput
788        if !is_high_write_throughput {
789            return;
790        }
791
792        let ret = self
793            .move_state_tables_to_dedicated_compaction_group(
794                parent_group_id,
795                &[table_id],
796                Some(self.env.opts.partition_vnode_count),
797            )
798            .await;
799        match ret {
800            Ok(split_result) => {
801                tracing::info!(
802                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
803                    table_id,
804                    parent_group_id,
805                    self.env.opts.partition_vnode_count,
806                    split_result
807                );
808            }
809            Err(e) => {
810                tracing::info!(
811                    error = %e.as_report(),
812                    "failed to split state table [{}] from group-{}",
813                    table_id,
814                    parent_group_id,
815                )
816            }
817        }
818    }
819
820    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
821        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
822            * self.env.opts.split_group_size_ratio) as u64;
823        let is_huge_hybrid_group =
824            group.group_size > group_max_size && group.table_statistic.len() > 1; // avoid split single table group
825        if is_huge_hybrid_group {
826            let mut accumulated_size = 0;
827            let mut table_ids = Vec::default();
828            for (table_id, table_size) in &group.table_statistic {
829                accumulated_size += table_size;
830                table_ids.push(*table_id);
831                // split if the accumulated size is greater than half of the group size
832                // avoid split a small table to dedicated compaction group and trigger multiple merge
833                assert!(table_ids.is_sorted());
834                if accumulated_size * 2 > group_max_size {
835                    let ret = self
836                        .move_state_tables_to_dedicated_compaction_group(
837                            group.group_id,
838                            &table_ids,
839                            None,
840                        )
841                        .await;
842                    match ret {
843                        Ok(split_result) => {
844                            tracing::info!(
845                                "split_huge_compaction_group success {:?}",
846                                split_result
847                            );
848                            self.metrics
849                                .split_compaction_group_count
850                                .with_label_values(&[&group.group_id.to_string()])
851                                .inc();
852                            return;
853                        }
854                        Err(e) => {
855                            tracing::error!(
856                                error = %e.as_report(),
857                                "failed to split_huge_compaction_group table {:?} from group-{}",
858                                table_ids,
859                                group.group_id
860                            );
861
862                            return;
863                        }
864                    }
865                }
866            }
867        }
868    }
869
870    pub async fn try_merge_compaction_group(
871        &self,
872        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
873        group: &CompactionGroupStatistic,
874        next_group: &CompactionGroupStatistic,
875        created_tables: &HashSet<u32>,
876    ) -> Result<()> {
877        GroupMergeValidator::validate_group_merge(
878            group,
879            next_group,
880            created_tables,
881            table_write_throughput_statistic_manager,
882            &self.env.opts,
883            &self.versioning,
884        )
885        .await?;
886
887        match self
888            .merge_compaction_group(group.group_id, next_group.group_id)
889            .await
890        {
891            Ok(()) => {
892                tracing::info!(
893                    "merge group-{} to group-{}",
894                    next_group.group_id,
895                    group.group_id,
896                );
897
898                self.metrics
899                    .merge_compaction_group_count
900                    .with_label_values(&[&group.group_id.to_string()])
901                    .inc();
902            }
903            Err(e) => {
904                tracing::info!(
905                    error = %e.as_report(),
906                    "failed to merge group-{} group-{}",
907                    next_group.group_id,
908                    group.group_id,
909                )
910            }
911        }
912
913        Ok(())
914    }
915}
916
/// Stateless namespace for compaction-group merge heuristics: the
/// write-throughput classification helpers and the `validate_group_merge`
/// precondition check. All methods are associated functions; no instance state.
#[derive(Debug, Default)]
struct GroupMergeValidator {}
919
920impl GroupMergeValidator {
921    /// Check if the table is high write throughput with the given threshold and ratio.
922    pub fn is_table_high_write_throughput(
923        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
924        threshold: u64,
925        high_write_throughput_ratio: f64,
926    ) -> bool {
927        let mut sample_size = 0;
928        let mut high_write_throughput_count = 0;
929        for statistic in table_throughput {
930            sample_size += 1;
931            if statistic.throughput > threshold {
932                high_write_throughput_count += 1;
933            }
934        }
935
936        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
937    }
938
939    pub fn is_table_low_write_throughput(
940        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
941        threshold: u64,
942        low_write_throughput_ratio: f64,
943    ) -> bool {
944        let mut sample_size = 0;
945        let mut low_write_throughput_count = 0;
946        for statistic in table_throughput {
947            sample_size += 1;
948            if statistic.throughput <= threshold {
949                low_write_throughput_count += 1;
950            }
951        }
952
953        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
954    }
955
956    fn check_is_low_write_throughput_compaction_group(
957        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
958        group: &CompactionGroupStatistic,
959        opts: &Arc<MetaOpts>,
960    ) -> bool {
961        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
962        for table_id in group.table_statistic.keys() {
963            let mut table_throughput = table_write_throughput_statistic_manager
964                .get_table_throughput_descending(
965                    *table_id,
966                    opts.table_stat_throuput_window_seconds_for_merge as i64,
967                )
968                .peekable();
969            if table_throughput.peek().is_none() {
970                continue;
971            }
972
973            table_with_statistic.push(table_throughput);
974        }
975
976        // if all tables in the group do not have enough statistics, return true
977        if table_with_statistic.is_empty() {
978            return true;
979        }
980
981        // check if all tables in the group are low write throughput with enough statistics
982        table_with_statistic.into_iter().all(|table_throughput| {
983            Self::is_table_low_write_throughput(
984                table_throughput,
985                opts.table_low_write_throughput_threshold,
986                opts.table_stat_low_write_throughput_ratio_for_merge,
987            )
988        })
989    }
990
991    fn check_is_creating_compaction_group(
992        group: &CompactionGroupStatistic,
993        created_tables: &HashSet<u32>,
994    ) -> bool {
995        group
996            .table_statistic
997            .keys()
998            .any(|table_id| !created_tables.contains(table_id))
999    }
1000
1001    async fn validate_group_merge(
1002        group: &CompactionGroupStatistic,
1003        next_group: &CompactionGroupStatistic,
1004        created_tables: &HashSet<u32>,
1005        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1006        opts: &Arc<MetaOpts>,
1007        versioning: &MonitoredRwLock<Versioning>,
1008    ) -> Result<()> {
1009        // TODO: remove this check after refactor group id
1010        if (group.group_id == StaticCompactionGroupId::StateDefault as u64
1011            && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64)
1012            || (group.group_id == StaticCompactionGroupId::MaterializedView as u64
1013                && next_group.group_id == StaticCompactionGroupId::StateDefault as u64)
1014        {
1015            return Err(Error::CompactionGroup(format!(
1016                "group-{} and group-{} are both StaticCompactionGroupId",
1017                group.group_id, next_group.group_id
1018            )));
1019        }
1020
1021        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1022            return Err(Error::CompactionGroup(format!(
1023                "group-{} or group-{} is empty",
1024                group.group_id, next_group.group_id
1025            )));
1026        }
1027
1028        if group
1029            .compaction_group_config
1030            .compaction_config
1031            .disable_auto_group_scheduling
1032            .unwrap_or(false)
1033            || next_group
1034                .compaction_group_config
1035                .compaction_config
1036                .disable_auto_group_scheduling
1037                .unwrap_or(false)
1038        {
1039            return Err(Error::CompactionGroup(format!(
1040                "group-{} or group-{} disable_auto_group_scheduling",
1041                group.group_id, next_group.group_id
1042            )));
1043        }
1044
1045        // do not merge the compaction group which is creating
1046        if Self::check_is_creating_compaction_group(group, created_tables) {
1047            return Err(Error::CompactionGroup(format!(
1048                "Not Merge creating group {} next_group {}",
1049                group.group_id, next_group.group_id
1050            )));
1051        }
1052
1053        // do not merge high throughput group
1054        if !Self::check_is_low_write_throughput_compaction_group(
1055            table_write_throughput_statistic_manager,
1056            group,
1057            opts,
1058        ) {
1059            return Err(Error::CompactionGroup(format!(
1060                "Not Merge high throughput group {} next_group {}",
1061                group.group_id, next_group.group_id
1062            )));
1063        }
1064
1065        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1066            * opts.split_group_size_ratio) as u64;
1067
1068        if (group.group_size + next_group.group_size) > size_limit {
1069            return Err(Error::CompactionGroup(format!(
1070                "Not Merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1071                group.group_id,
1072                group.group_size,
1073                next_group.group_id,
1074                next_group.group_size,
1075                size_limit
1076            )));
1077        }
1078
1079        if Self::check_is_creating_compaction_group(next_group, created_tables) {
1080            return Err(Error::CompactionGroup(format!(
1081                "Not Merge creating group {} next group {}",
1082                group.group_id, next_group.group_id
1083            )));
1084        }
1085
1086        if !Self::check_is_low_write_throughput_compaction_group(
1087            table_write_throughput_statistic_manager,
1088            next_group,
1089            opts,
1090        ) {
1091            return Err(Error::CompactionGroup(format!(
1092                "Not Merge high throughput group {} next group {}",
1093                group.group_id, next_group.group_id
1094            )));
1095        }
1096
1097        {
1098            // Avoid merge when the group is in emergency state
1099            let versioning_guard = versioning.read().await;
1100            let levels = &versioning_guard.current_version.levels;
1101            if !levels.contains_key(&group.group_id) {
1102                return Err(Error::CompactionGroup(format!(
1103                    "Not Merge group {} not exist",
1104                    group.group_id
1105                )));
1106            }
1107
1108            if !levels.contains_key(&next_group.group_id) {
1109                return Err(Error::CompactionGroup(format!(
1110                    "Not Merge next group {} not exist",
1111                    next_group.group_id
1112                )));
1113            }
1114
1115            let group_levels = versioning_guard
1116                .current_version
1117                .get_compaction_group_levels(group.group_id);
1118
1119            let next_group_levels = versioning_guard
1120                .current_version
1121                .get_compaction_group_levels(next_group.group_id);
1122
1123            let group_state = GroupStateValidator::group_state(
1124                group_levels,
1125                group.compaction_group_config.compaction_config().deref(),
1126            );
1127
1128            if group_state.is_write_stop() || group_state.is_emergency() {
1129                return Err(Error::CompactionGroup(format!(
1130                    "Not Merge write limit group {} next group {}",
1131                    group.group_id, next_group.group_id
1132                )));
1133            }
1134
1135            let next_group_state = GroupStateValidator::group_state(
1136                next_group_levels,
1137                next_group
1138                    .compaction_group_config
1139                    .compaction_config()
1140                    .deref(),
1141            );
1142
1143            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1144                return Err(Error::CompactionGroup(format!(
1145                    "Not Merge write limit next group {} group {}",
1146                    next_group.group_id, group.group_id
1147                )));
1148            }
1149
1150            // check whether the group is in the write stop state after merge
1151            let l0_sub_level_count_after_merge =
1152                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1153            if GroupStateValidator::write_stop_l0_file_count(
1154                (l0_sub_level_count_after_merge as f64
1155                    * opts.compaction_group_merge_dimension_threshold) as usize,
1156                group.compaction_group_config.compaction_config().deref(),
1157            ) {
1158                return Err(Error::CompactionGroup(format!(
1159                    "Not Merge write limit group {} next group {}, will trigger write stop after merge",
1160                    group.group_id, next_group.group_id
1161                )));
1162            }
1163
1164            let l0_file_count_after_merge =
1165                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1166            if GroupStateValidator::write_stop_l0_file_count(
1167                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1168                    as usize,
1169                group.compaction_group_config.compaction_config().deref(),
1170            ) {
1171                return Err(Error::CompactionGroup(format!(
1172                    "Not Merge write limit next group {} group {}, will trigger write stop after merge",
1173                    next_group.group_id, group.group_id
1174                )));
1175            }
1176
1177            let l0_size_after_merge =
1178                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1179
1180            if GroupStateValidator::write_stop_l0_size(
1181                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1182                    as u64,
1183                group.compaction_group_config.compaction_config().deref(),
1184            ) {
1185                return Err(Error::CompactionGroup(format!(
1186                    "Not Merge write limit next group {} group {}, will trigger write stop after merge",
1187                    next_group.group_id, group.group_id
1188                )));
1189            }
1190
1191            // check whether the group is in the emergency state after merge
1192            if GroupStateValidator::emergency_l0_file_count(
1193                (l0_sub_level_count_after_merge as f64
1194                    * opts.compaction_group_merge_dimension_threshold) as usize,
1195                group.compaction_group_config.compaction_config().deref(),
1196            ) {
1197                return Err(Error::CompactionGroup(format!(
1198                    "Not Merge emergency group {} next group {}, will trigger emergency after merge",
1199                    group.group_id, next_group.group_id
1200                )));
1201            }
1202        }
1203
1204        Ok(())
1205    }
1206}