risingwave_meta/hummock/manager/compaction/
compaction_group_schedule.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_common::monitor::MonitoredRwLock;
use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
use risingwave_hummock_sdk::compaction_group::{
    StateTableId, StaticCompactionGroupId, group_split,
};
use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::{
    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
};
use thiserror_ext::AsReport;

use super::{CompactionGroupStatistic, GroupStateValidator};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{HummockManager, commit_multi_var};
use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
use crate::hummock::table_write_throughput_statistic::{
    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
};
use crate::manager::MetaOpts;

impl HummockManager {
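    /// Merges `group_2` into `group_1`. The two groups must both exist and hold
    /// non-overlapping, non-creating member tables; internally, the group whose
    /// first table id is smaller becomes the left group of the merge.
    ///
    /// A minimal sketch of a call, assuming two existing groups (the ids are hypothetical):
    ///
    /// ```ignore
    /// // Tables in group 1001 all sort before the tables in group 1002.
    /// hummock_manager.merge_compaction_group(1001, 1002).await?;
    /// ```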
    pub async fn merge_compaction_group(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, None)
            .await
    }

    pub async fn merge_compaction_group_for_test(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: HashSet<u32>,
    ) -> Result<()> {
        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
            .await
    }

    pub async fn merge_compaction_group_impl(
        &self,
        group_1: CompactionGroupId,
        group_2: CompactionGroupId,
        created_tables: Option<HashSet<u32>>,
    ) -> Result<()> {
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning.current_version.levels.contains_key(&group_1) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
        }

        if !versioning.current_version.levels.contains_key(&group_2) {
            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
        }

        let state_table_info = versioning.current_version.state_table_info.clone();
        let mut member_table_ids_1 = state_table_info
            .compaction_group_member_table_ids(group_1)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_1.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_1 {} is empty",
                group_1
            )));
        }

        let mut member_table_ids_2 = state_table_info
            .compaction_group_member_table_ids(group_2)
            .iter()
            .cloned()
            .collect_vec();

        if member_table_ids_2.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group_2 {} is empty",
                group_2
            )));
        }

        debug_assert!(!member_table_ids_1.is_empty());
        debug_assert!(!member_table_ids_2.is_empty());
        assert!(member_table_ids_1.is_sorted());
        assert!(member_table_ids_2.is_sorted());
        let created_tables = if let Some(created_tables) = created_tables {
            // If `created_tables` is provided, use it directly (mostly for tests).
            assert!(cfg!(debug_assertions));
            created_tables
        } else {
            match self.metadata_manager.get_created_table_ids().await {
                Ok(created_tables) => HashSet::from_iter(created_tables),
                Err(err) => {
                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
                    return Err(Error::CompactionGroup(format!(
                        "merge group_1 {} group_2 {} failed to fetch created table ids",
                        group_1, group_2
                    )));
                }
            }
        };

        fn contains_creating_table(
            table_ids: &[TableId],
            created_tables: &HashSet<u32>,
        ) -> bool {
            table_ids
                .iter()
                .any(|table_id| !created_tables.contains(&table_id.table_id()))
        }

        // Do not merge a compaction group that still contains creating tables.
        if contains_creating_table(&member_table_ids_1, &created_tables)
            || contains_creating_table(&member_table_ids_2, &created_tables)
        {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
                group_1, group_2, member_table_ids_1, member_table_ids_2
            )));
        }

        // Order the two groups so that `member_table_ids_1` sorts before `member_table_ids_2`.
        let (left_group_id, right_group_id) =
            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
                (group_1, group_2)
            } else {
                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
                (group_2, group_1)
            };

        // We can only merge two groups with non-overlapping member table ids.
        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
            return Err(Error::CompactionGroup(format!(
                "invalid merge group_1 {} group_2 {}",
                left_group_id, right_group_id
            )));
        }

        let combined_member_table_ids = member_table_ids_1
            .iter()
            .chain(member_table_ids_2.iter())
            .collect_vec();
        assert!(combined_member_table_ids.is_sorted());

        // Check for duplicated sst ids.
        let mut sst_id_set = HashSet::new();
        for sst_id in versioning
            .current_version
            .get_sst_ids_by_group_id(left_group_id)
            .chain(
                versioning
                    .current_version
                    .get_sst_ids_by_group_id(right_group_id),
            )
        {
            if !sst_id_set.insert(sst_id) {
                return Err(Error::CompactionGroup(format!(
                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
                    left_group_id, right_group_id, sst_id
                )));
            }
        }

        // Check branched ssts on the non-overlapping levels.
        {
            let left_levels = versioning
                .current_version
                .get_compaction_group_levels(left_group_id);

            let right_levels = versioning
                .current_version
                .get_compaction_group_levels(right_group_id);

            // We cannot check the L0 sub-levels here, because the sub-level ids are rewritten during the merge.
            // This check ensures that the ssts of the other, non-overlapping levels can be concatenated with a correct key_range.
            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
            for level_idx in 1..=max_level {
                let left_level = left_levels.get_level(level_idx);
                let right_level = right_levels.get_level(level_idx);
                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
                    continue;
                }

                let left_last_sst = left_level.table_infos.last().unwrap().clone();
                let right_first_sst = right_level.table_infos.first().unwrap().clone();
                let left_sst_id = left_last_sst.sst_id;
                let right_sst_id = right_first_sst.sst_id;
                let left_obj_id = left_last_sst.object_id;
                let right_obj_id = right_first_sst.object_id;

                // The sst key_ranges within each group are already valid, so it suffices to check the two ssts adjacent across the group boundary.
                if !can_concat(&[left_last_sst, right_first_sst]) {
                    return Err(Error::CompactionGroup(format!(
                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
                        left_group_id,
                        right_group_id,
                        level_idx,
                        left_sst_id,
                        right_sst_id,
                        left_obj_id,
                        right_obj_id
                    )));
                }
            }
        }

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        let target_compaction_group_id = {
            // Merge right_group_id into left_group_id and remove right_group_id.
            new_version_delta.group_deltas.insert(
                left_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
                        left_group_id,
                        right_group_id,
                    })],
                },
            );
            left_group_id
        };

        // TODO: remove compaction group_id from state_table_info.
        // Rewrite the compaction_group_id for all member tables.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for table_id in combined_member_table_ids {
                let table_id = TableId::new(table_id.table_id());
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have checked existence previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: target_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();

            // Reclaim the sst-stat metrics of the removed group.
            {
                let right_group_max_level = new_version_delta
                    .latest_version()
                    .get_compaction_group_levels(right_group_id)
                    .levels
                    .len();

                remove_compaction_group_in_sst_stat(
                    &self.metrics,
                    right_group_id,
                    right_group_max_level,
                );
            }

            // Clear `partition_vnode_count` for the merged hybrid group.
            {
                if let Err(err) = compaction_groups_txn.update_compaction_config(
                    &[left_group_id],
                    &[MutableConfig::SplitWeightByVnode(0)], // default
                ) {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        left_group_id
                    );
                }
            }

            new_version_delta.pre_apply();

            // Remove right_group_id.
            compaction_groups_txn.remove(right_group_id);
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }

        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // Cancel tasks: after the merge, all tasks on right_group_id should be canceled.
        // A failed cancellation does not affect correctness, since task reporting will
        // intercept stale tasks anyway; canceling here just frees compactor compute resources sooner.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    assert_eq!(task.compaction_group_id, right_group_id);
                    canceled_tasks.push(ReportTask {
                        task_id: task.task_id,
                        task_status: TaskStatus::ManualCanceled,
                        table_stats_change: HashMap::default(),
                        sorted_output_ssts: vec![],
                        object_timestamps: HashMap::default(),
                    });
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .merge_compaction_group_count
            .with_label_values(&[&left_group_id.to_string()])
            .inc();

        Ok(())
    }
}

impl HummockManager {
    /// Splits `table_ids` into a dedicated compaction group (the split point is determined by `table_id` and `vnode`).
    /// Returns the compaction group ids containing the `table_ids`, as a mapping from compaction group id to table ids.
    /// The split follows these rules:
    /// 1. ssts whose `key_range.left` is greater than `split_key` are moved to the right group
    /// 2. the sst containing `split_key` is split into two separate ssts, whose `key_range`s become `sst_1`: [`sst.key_range.left`, `split_key`) and `sst_2`: [`split_key`, `sst.key_range.right`]
    /// 3. currently only `vnode` 0 and `vnode` max are supported (due to rule 2, `vnode` max is rewritten as `table_id` + 1, `vnode` 0)
    ///
    /// Parameters:
    ///   - `parent_group_id`: the `group_id` to split
    ///   - `split_table_ids`: the `table_ids` to split; splitting multiple tables into one group at once is still supported, and `split_table_ids` is passed per `split` operation for checking
    ///   - `table_id_to_split`: the `table_id` to split at
    ///   - `vnode_to_split`: the `vnode` to split at
    ///   - `partition_vnode_count`: the partition count for the resulting single-table group, if needed
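    ///
    /// A minimal sketch of a call, assuming hypothetical ids (tables `>= 7` move to
    /// the newly created right group, the rest stay in the parent group):
    ///
    /// ```ignore
    /// let result = self
    ///     .split_compaction_group_impl(parent_group_id, &[7, 8], 7, VirtualNode::ZERO, None)
    ///     .await?;
    /// // result: [(parent_group_id, [..ids < 7..]), (new_group_id, [7, 8])]
    /// ```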
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .map(|table_id| table_id.table_id)
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} doesn't belong to group {}",
                table_id_to_split, parent_group_id
            )));
        }

        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        // Collect into a vec for partitioning.
        let table_ids = member_table_ids.into_iter().collect_vec();
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        // Avoid decoding split_key, since the caller already knows the table_id and vnode.
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id.table_id(),
                split_full_key.user_key.get_vnode_id(),
            );
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            // No need to split the group if all tables fall on the same side.
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            // All NewCompactionGroup pairs are mapped to one new compaction group.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            // The new config will be persisted later.
            let config = self
                .compaction_group_manager
                .read()
                .await
                .default_compaction_config()
                .as_ref()
                .clone();

            #[expect(deprecated)]
            // Fill the deprecated field with its default value.
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id: new_sst_start_id.inner(),
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _, // for compatibility
                        split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

        new_version_delta.with_latest_version(|version, new_version_delta| {
            for table_id in &table_ids_right {
                let table_id = TableId::new(*table_id);
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have checked existence previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // Check whether the compaction config of the single-table group needs updating, and keep the operation atomic.
            // `partition_vnode_count` only takes effect within a table; to avoid excessive sst slicing, we only enable it for groups with high throughput and a single member table.
            // The target `table_ids` might be split into an existing group, so we also need to try to update that group's config.
            for (cg_id, table_ids) in &result {
                // Check that the split tables have been placed in the dedicated compaction group.
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    )
                {
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // Cancel the expired compact tasks.
        // A failed cancellation does not affect correctness, since task reporting will
        // intercept stale tasks anyway; canceling here just frees compactor compute resources sooner.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    let is_expired = is_compaction_task_expired(
                        task.compaction_group_version_id,
                        levels.compaction_group_version_id,
                    );
                    if is_expired {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }

    /// Splits `table_ids` into a dedicated compaction group.
    /// Returns the compaction group id containing the `table_ids`, along with a mapping from compaction group id to table ids.
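    ///
    /// A minimal sketch of a call, assuming hypothetical group and table ids:
    ///
    /// ```ignore
    /// // Move tables 3..=6 out of group 2 into a dedicated group.
    /// let (dedicated_cg_id, cg_id_to_table_ids) = hummock_manager
    ///     .move_state_tables_to_dedicated_compaction_group(2, &[3, 4, 5, 6], None)
    ///     .await?;
    /// ```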
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .map(|table_id| table_id.table_id)
                .collect_vec()
        };

        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<u32>>) {
            // 1. table_ids within each cg are sorted.
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            // 2. table_ids in different cgs are non-overlapping.
            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            // 3. Each table_id belongs to one and only one cg.
            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        // Example: moving [3,4,5,6] out of
        // [1,2,3,4,5,6,7,8,9,10] -> [1,2] [3,4,5,6] [7,8,9,10]
        // uses two split keys:
        // 1. table_id = 3, vnode = 0, epoch = MAX
        // 2. table_id = 7, vnode = 0, epoch = MAX

        // The new compaction group id is always generated on the right side,
        // hence we return the first compaction group id as the result.
        // split 1
        let mut cg_id_to_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id = 0;
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // split 2
        // See the example above and the split rules in `split_compaction_group_impl`.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
}

impl HummockManager {
    /// Splits the compaction group if it is too large or contains high-throughput tables.
    pub async fn try_split_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: CompactionGroupStatistic,
    ) {
        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
        {
            return;
        }
        // Split high-throughput tables into dedicated compaction groups.
        for (table_id, table_size) in &group.table_statistic {
            self.try_move_high_throughput_table_to_dedicated_cg(
                table_write_throughput_statistic_manager,
                *table_id,
                table_size,
                group.group_id,
            )
            .await;
        }

        // Split a huge group into multiple groups.
        self.try_split_huge_compaction_group(group).await;
    }

    /// Tries to move a high-throughput table to a dedicated compaction group.
    pub async fn try_move_high_throughput_table_to_dedicated_cg(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        table_id: u32,
        _table_size: &u64,
        parent_group_id: u64,
    ) {
        let mut table_throughput = table_write_throughput_statistic_manager
            .get_table_throughput_descending(
                table_id,
                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
            )
            .peekable();

        if table_throughput.peek().is_none() {
            return;
        }

        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
            table_throughput,
            self.env.opts.table_high_write_throughput_threshold,
            self.env
                .opts
                .table_stat_high_write_throughput_ratio_for_split,
        );

        // Do not move a table to a dedicated compaction group unless its write throughput is high.
        if !is_high_write_throughput {
            return;
        }

        let ret = self
            .move_state_tables_to_dedicated_compaction_group(
                parent_group_id,
                &[table_id],
                Some(self.env.opts.partition_vnode_count),
            )
            .await;
        match ret {
            Ok(split_result) => {
                tracing::info!(
                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
                    table_id,
                    parent_group_id,
                    self.env.opts.partition_vnode_count,
                    split_result
                );
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to split state table [{}] from group-{}",
                    table_id,
                    parent_group_id,
                )
            }
        }
    }

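    /// Splits an oversized hybrid group roughly in half: member tables are accumulated
    /// in `table_id` order until they account for more than half of the size threshold
    /// (`max_estimated_group_size * split_group_size_ratio`), and that prefix is then
    /// moved to a dedicated group.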
    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
            * self.env.opts.split_group_size_ratio) as u64;
        let is_huge_hybrid_group =
            group.group_size > group_max_size && group.table_statistic.len() > 1; // avoid splitting a single-table group
        if is_huge_hybrid_group {
            let mut accumulated_size = 0;
            let mut table_ids = Vec::default();
            for (table_id, table_size) in &group.table_statistic {
                accumulated_size += table_size;
                table_ids.push(*table_id);
                // Split once the accumulated size exceeds half of the group size limit;
                // this avoids splitting off a small table into a dedicated compaction group and triggering repeated merges.
                assert!(table_ids.is_sorted());
                if accumulated_size * 2 > group_max_size {
                    let ret = self
                        .move_state_tables_to_dedicated_compaction_group(
                            group.group_id,
                            &table_ids,
                            None,
                        )
                        .await;
                    match ret {
                        Ok(split_result) => {
                            tracing::info!(
                                "split_huge_compaction_group success {:?}",
                                split_result
                            );
                            self.metrics
                                .split_compaction_group_count
                                .with_label_values(&[&group.group_id.to_string()])
                                .inc();
                            return;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "failed to split_huge_compaction_group table {:?} from group-{}",
                                table_ids,
                                group.group_id
                            );

                            return;
                        }
                    }
                }
            }
        }
    }

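    /// Tries to merge `next_group` into `group`. Validation failures from
    /// `GroupMergeValidator::validate_group_merge` are returned as errors, while a
    /// failure of the merge itself is only logged.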
    pub async fn try_merge_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<u32>,
    ) -> Result<()> {
        GroupMergeValidator::validate_group_merge(
            group,
            next_group,
            created_tables,
            table_write_throughput_statistic_manager,
            &self.env.opts,
            &self.versioning,
        )
        .await?;

        match self
            .merge_compaction_group(group.group_id, next_group.group_id)
            .await
        {
            Ok(()) => {
                tracing::info!(
                    "merge group-{} to group-{}",
                    next_group.group_id,
                    group.group_id,
                );

                self.metrics
                    .merge_compaction_group_count
                    .with_label_values(&[&group.group_id.to_string()])
                    .inc();
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to merge group-{} group-{}",
                    next_group.group_id,
                    group.group_id,
                )
            }
        }

        Ok(())
    }
}

#[derive(Debug, Default)]
struct GroupMergeValidator {}

impl GroupMergeValidator {
    /// Checks whether the table has high write throughput under the given threshold and ratio.
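    ///
    /// For example (the numbers are hypothetical): with `threshold = 100` and
    /// `high_write_throughput_ratio = 0.5`, the table qualifies when more than half
    /// of its samples report a throughput above 100.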
    pub fn is_table_high_write_throughput(
        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
        threshold: u64,
        high_write_throughput_ratio: f64,
    ) -> bool {
        let mut sample_size = 0;
        let mut high_write_throughput_count = 0;
        for statistic in table_throughput {
            sample_size += 1;
            if statistic.throughput > threshold {
                high_write_throughput_count += 1;
            }
        }

        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
    }

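    /// Checks whether the table has low write throughput: the complement of
    /// `is_table_high_write_throughput`, counting the samples at or below the threshold.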
    pub fn is_table_low_write_throughput(
        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
        threshold: u64,
        low_write_throughput_ratio: f64,
    ) -> bool {
        let mut sample_size = 0;
        let mut low_write_throughput_count = 0;
        for statistic in table_throughput {
            sample_size += 1;
            if statistic.throughput <= threshold {
                low_write_throughput_count += 1;
            }
        }

        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
    }

    fn check_is_low_write_throughput_compaction_group(
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        opts: &Arc<MetaOpts>,
    ) -> bool {
        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
        for table_id in group.table_statistic.keys() {
            let mut table_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(
                    *table_id,
                    opts.table_stat_throuput_window_seconds_for_merge as i64,
                )
                .peekable();
            if table_throughput.peek().is_none() {
                continue;
            }

            table_with_statistic.push(table_throughput);
        }

        // If no table in the group has enough statistics, treat the group as low-throughput.
        if table_with_statistic.is_empty() {
            return true;
        }

        // Check that every table with enough statistics has low write throughput.
        table_with_statistic.into_iter().all(|table_throughput| {
            Self::is_table_low_write_throughput(
                table_throughput,
                opts.table_low_write_throughput_threshold,
                opts.table_stat_low_write_throughput_ratio_for_merge,
            )
        })
    }

    fn check_is_creating_compaction_group(
        group: &CompactionGroupStatistic,
        created_tables: &HashSet<u32>,
    ) -> bool {
        group
            .table_statistic
            .keys()
            .any(|table_id| !created_tables.contains(table_id))
    }

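    /// Validates that `group` and `next_group` can be merged. The merge is rejected if
    /// either group is empty, still creating, disables auto group scheduling, or has
    /// high write throughput; if the combined size exceeds the size limit; or if either
    /// group is (or, per the post-merge estimates below, would be) in a write-stop or
    /// emergency state.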
    async fn validate_group_merge(
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<u32>,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        opts: &Arc<MetaOpts>,
        versioning: &MonitoredRwLock<Versioning>,
    ) -> Result<()> {
        // TODO: remove this check after refactoring the group id.
        if (group.group_id == StaticCompactionGroupId::StateDefault as u64
            && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64)
            || (group.group_id == StaticCompactionGroupId::MaterializedView as u64
                && next_group.group_id == StaticCompactionGroupId::StateDefault as u64)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} and group-{} are both StaticCompactionGroupId",
                group.group_id, next_group.group_id
            )));
        }

        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} is empty",
                group.group_id, next_group.group_id
            )));
        }

        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
            || next_group
                .compaction_group_config
                .compaction_config
                .disable_auto_group_scheduling
                .unwrap_or(false)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} disable_auto_group_scheduling",
                group.group_id, next_group.group_id
            )));
        }

        // Do not merge a compaction group that still contains creating tables.
        if Self::check_is_creating_compaction_group(group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        // Do not merge a high-throughput group.
        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge high throughput group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
            * opts.split_group_size_ratio) as u64;

        if (group.group_size + next_group.group_size) > size_limit {
            return Err(Error::CompactionGroup(format!(
                "Not Merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
                group.group_id,
                group.group_size,
                next_group.group_id,
                next_group.group_size,
                size_limit
            )));
        }

        if Self::check_is_creating_compaction_group(next_group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge creating group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            next_group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Not Merge high throughput group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        {
            // Avoid merging when either group is in a write-stop or emergency state.
            let versioning_guard = versioning.read().await;
            let levels = &versioning_guard.current_version.levels;
            if !levels.contains_key(&group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge group {} not exist",
                    group.group_id
                )));
            }

            if !levels.contains_key(&next_group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge next group {} not exist",
                    next_group.group_id
                )));
            }

            let group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(group.group_id);

            let next_group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(next_group.group_id);

            let group_state = GroupStateValidator::group_state(
                group_levels,
                group.compaction_group_config.compaction_config().deref(),
            );

            if group_state.is_write_stop() || group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit group {} next group {}",
                    group.group_id, next_group.group_id
                )));
            }

            let next_group_state = GroupStateValidator::group_state(
                next_group_levels,
                next_group
                    .compaction_group_config
                    .compaction_config()
                    .deref(),
            );

            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit next group {} group {}",
                    next_group.group_id, group.group_id
                )));
            }

            // Check whether the merged group would be in the write-stop state,
            // first by L0 sub-level count, then by total L0 file count and L0 size.
            let l0_sub_level_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit group {} next group {}, will trigger write stop after merge",
                    group.group_id, next_group.group_id
                )));
            }

            // Unlike the sub-level count above, this counts every L0 file of both groups.
            let l0_file_count_after_merge = group_levels
                .l0
                .sub_levels
                .iter()
                .map(|sub_level| sub_level.table_infos.len())
                .sum::<usize>()
                + next_group_levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|sub_level| sub_level.table_infos.len())
                    .sum::<usize>();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            let l0_size_after_merge =
                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;

            if GroupStateValidator::write_stop_l0_size(
                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as u64,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            // Check whether the merged group would be in the emergency state.
            if GroupStateValidator::emergency_l0_file_count(
                (l0_file_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Not Merge emergency group {} next group {}, will trigger emergency after merge",
                    group.group_id, next_group.group_id
                )));
            }
        }

        Ok(())
    }
}