risingwave_meta/hummock/manager/compaction/
compaction_group_schedule.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26    StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::{CompactionGroupStatistic, GroupStateValidator};
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::transaction::HummockVersionTransaction;
40use crate::hummock::manager::versioning::Versioning;
41use crate::hummock::manager::{HummockManager, commit_multi_var};
42use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
43use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
44use crate::hummock::table_write_throughput_statistic::{
45    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
46};
47use crate::manager::MetaOpts;
48
49impl HummockManager {
50    pub async fn merge_compaction_group(
51        &self,
52        group_1: CompactionGroupId,
53        group_2: CompactionGroupId,
54    ) -> Result<()> {
55        self.merge_compaction_group_impl(group_1, group_2, None)
56            .await
57    }
58
59    pub async fn merge_compaction_group_for_test(
60        &self,
61        group_1: CompactionGroupId,
62        group_2: CompactionGroupId,
63        created_tables: HashSet<TableId>,
64    ) -> Result<()> {
65        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
66            .await
67    }
68
69    pub async fn merge_compaction_group_impl(
70        &self,
71        group_1: CompactionGroupId,
72        group_2: CompactionGroupId,
73        created_tables: Option<HashSet<TableId>>,
74    ) -> Result<()> {
75        let compaction_guard = self.compaction.write().await;
76        let mut versioning_guard = self.versioning.write().await;
77        let versioning = versioning_guard.deref_mut();
78        // Validate parameters.
79        if !versioning.current_version.levels.contains_key(&group_1) {
80            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
81        }
82
83        if !versioning.current_version.levels.contains_key(&group_2) {
84            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
85        }
86
87        let state_table_info = versioning.current_version.state_table_info.clone();
88        let mut member_table_ids_1 = state_table_info
89            .compaction_group_member_table_ids(group_1)
90            .iter()
91            .cloned()
92            .collect_vec();
93
94        if member_table_ids_1.is_empty() {
95            return Err(Error::CompactionGroup(format!(
96                "group_1 {} is empty",
97                group_1
98            )));
99        }
100
101        let mut member_table_ids_2 = state_table_info
102            .compaction_group_member_table_ids(group_2)
103            .iter()
104            .cloned()
105            .collect_vec();
106
107        if member_table_ids_2.is_empty() {
108            return Err(Error::CompactionGroup(format!(
109                "group_2 {} is empty",
110                group_2
111            )));
112        }
113
114        debug_assert!(!member_table_ids_1.is_empty());
115        debug_assert!(!member_table_ids_2.is_empty());
116        assert!(member_table_ids_1.is_sorted());
117        assert!(member_table_ids_2.is_sorted());
118
119        let created_tables = if let Some(created_tables) = created_tables {
120            // if the created_tables is provided, use it directly, most for test
121            #[expect(clippy::assertions_on_constants)]
122            {
123                assert!(cfg!(debug_assertions));
124            }
125            created_tables
126        } else {
127            match self.metadata_manager.get_created_table_ids().await {
128                Ok(created_tables) => HashSet::from_iter(created_tables),
129                Err(err) => {
130                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
131                    return Err(Error::CompactionGroup(format!(
132                        "merge group_1 {} group_2 {} failed to fetch created table ids",
133                        group_1, group_2
134                    )));
135                }
136            }
137        };
138
139        fn contains_creating_table(
140            table_ids: &Vec<TableId>,
141            created_tables: &HashSet<TableId>,
142        ) -> bool {
143            table_ids
144                .iter()
145                .any(|table_id| !created_tables.contains(table_id))
146        }
147
148        // do not merge the compaction group which is creating
149        if contains_creating_table(&member_table_ids_1, &created_tables)
150            || contains_creating_table(&member_table_ids_2, &created_tables)
151        {
152            return Err(Error::CompactionGroup(format!(
153                "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
154                group_1, group_2, member_table_ids_1, member_table_ids_2
155            )));
156        }
157
158        // Make sure `member_table_ids_1` is smaller than `member_table_ids_2`
159        let (left_group_id, right_group_id) =
160            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
161                (group_1, group_2)
162            } else {
163                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
164                (group_2, group_1)
165            };
166
167        // We can only merge two groups with non-overlapping member table ids.
168        // After the swap above, member_table_ids_1 has the smaller first element.
169        // If the last element of member_table_ids_1 >= the first element of member_table_ids_2,
170        // the two groups' table id ranges overlap and cannot be merged.
171        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
172            return Err(Error::CompactionGroup(format!(
173                "invalid merge group_1 {} group_2 {}: table id ranges overlap",
174                left_group_id, right_group_id
175            )));
176        }
177
178        let combined_member_table_ids = member_table_ids_1
179            .iter()
180            .chain(member_table_ids_2.iter())
181            .collect_vec();
182        assert!(combined_member_table_ids.is_sorted());
183
184        // check duplicated sst_id
185        let mut sst_id_set = HashSet::new();
186        for sst_id in versioning
187            .current_version
188            .get_sst_ids_by_group_id(left_group_id)
189            .chain(
190                versioning
191                    .current_version
192                    .get_sst_ids_by_group_id(right_group_id),
193            )
194        {
195            if !sst_id_set.insert(sst_id) {
196                return Err(Error::CompactionGroup(format!(
197                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
198                    left_group_id, right_group_id, sst_id
199                )));
200            }
201        }
202
203        // check branched sst on non-overlap level
204        {
205            let left_levels = versioning
206                .current_version
207                .get_compaction_group_levels(group_1);
208
209            let right_levels = versioning
210                .current_version
211                .get_compaction_group_levels(group_2);
212
213            // we can not check the l0 sub level, because the sub level id will be rewritten when merge
214            // This check will ensure that other non-overlapping level ssts can be concat and that the key_range is correct.
215            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
216            for level_idx in 1..=max_level {
217                let left_level = left_levels.get_level(level_idx);
218                let right_level = right_levels.get_level(level_idx);
219                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
220                    continue;
221                }
222
223                let left_last_sst = left_level.table_infos.last().unwrap().clone();
224                let right_first_sst = right_level.table_infos.first().unwrap().clone();
225                let left_sst_id = left_last_sst.sst_id;
226                let right_sst_id = right_first_sst.sst_id;
227                let left_obj_id = left_last_sst.object_id;
228                let right_obj_id = right_first_sst.object_id;
229
230                // Since the sst key_range within a group is legal, we only need to check the ssts adjacent to the two groups.
231                if !can_concat(&[left_last_sst, right_first_sst]) {
232                    return Err(Error::CompactionGroup(format!(
233                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
234                        left_group_id,
235                        right_group_id,
236                        level_idx,
237                        left_sst_id,
238                        right_sst_id,
239                        left_obj_id,
240                        right_obj_id
241                    )));
242                }
243            }
244        }
245
246        let mut version = HummockVersionTransaction::new(
247            &mut versioning.current_version,
248            &mut versioning.hummock_version_deltas,
249            self.env.notification_manager(),
250            None,
251            &self.metrics,
252        );
253        let mut new_version_delta = version.new_delta();
254
255        let target_compaction_group_id = {
256            // merge right_group_id to left_group_id and remove right_group_id
257            new_version_delta.group_deltas.insert(
258                left_group_id,
259                GroupDeltas {
260                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
261                        left_group_id,
262                        right_group_id,
263                    })],
264                },
265            );
266            left_group_id
267        };
268
269        // TODO: remove compaciton group_id from state_table_info
270        // rewrite compaction_group_id for all tables
271        new_version_delta.with_latest_version(|version, new_version_delta| {
272            for &table_id in combined_member_table_ids {
273                let info = version
274                    .state_table_info
275                    .info()
276                    .get(&table_id)
277                    .expect("have check exist previously");
278                assert!(
279                    new_version_delta
280                        .state_table_info_delta
281                        .insert(
282                            table_id,
283                            PbStateTableInfoDelta {
284                                committed_epoch: info.committed_epoch,
285                                compaction_group_id: target_compaction_group_id,
286                            }
287                        )
288                        .is_none()
289                );
290            }
291        });
292
293        {
294            let mut compaction_group_manager = self.compaction_group_manager.write().await;
295            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
296
297            // for metrics reclaim
298            {
299                let right_group_max_level = new_version_delta
300                    .latest_version()
301                    .get_compaction_group_levels(right_group_id)
302                    .levels
303                    .len();
304
305                remove_compaction_group_in_sst_stat(
306                    &self.metrics,
307                    right_group_id,
308                    right_group_max_level,
309                );
310            }
311
312            // clean up compaction schedule state for the merged group
313            self.compaction_state
314                .remove_compaction_group(right_group_id);
315
316            // clear `partition_vnode_count` for the hybrid group
317            {
318                if let Err(err) = compaction_groups_txn.update_compaction_config(
319                    &[left_group_id],
320                    &[MutableConfig::SplitWeightByVnode(0)], // default
321                ) {
322                    tracing::error!(
323                        error = %err.as_report(),
324                        "failed to update compaction config for group-{}",
325                        left_group_id
326                    );
327                }
328            }
329
330            new_version_delta.pre_apply();
331
332            // remove right_group_id
333            compaction_groups_txn.remove(right_group_id);
334            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
335        }
336
337        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
338        versioning.mark_next_time_travel_version_snapshot();
339
340        // cancel tasks
341        let mut canceled_tasks = vec![];
342        // after merge, all tasks in right_group_id should be canceled
343        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
344        let compact_task_assignments =
345            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
346        compact_task_assignments
347            .into_iter()
348            .for_each(|task_assignment| {
349                if let Some(task) = task_assignment.compact_task.as_ref() {
350                    assert_eq!(task.compaction_group_id, right_group_id);
351                    canceled_tasks.push(ReportTask {
352                        task_id: task.task_id,
353                        task_status: TaskStatus::ManualCanceled,
354                        table_stats_change: HashMap::default(),
355                        sorted_output_ssts: vec![],
356                        object_timestamps: HashMap::default(),
357                    });
358                }
359            });
360
361        if !canceled_tasks.is_empty() {
362            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
363                .await?;
364        }
365
366        self.metrics
367            .merge_compaction_group_count
368            .with_label_values(&[&left_group_id.to_string()])
369            .inc();
370
371        Ok(())
372    }
373}
374
impl HummockManager {
    /// Split `table_ids` to a dedicated compaction group (the cut position is described by the
    /// `table_id` and `vnode`).
    /// Returns the compaction group id containing the `table_ids` and the mapping of
    /// compaction group id to table ids.
    ///
    /// The split follows these rules:
    /// 1. ssts with `key_range.left` greater than `split_key` will be split to the right group
    /// 2. the sst containing `split_key` will be split into two separate ssts and their
    ///    `key_range` will be changed: `sst_1`: [`sst.key_range.left`, `split_key`)
    ///    `sst_2`: [`split_key`, `sst.key_range.right`]
    /// 3. currently only `vnode` 0 and `vnode` max is supported. (Due to the above rule,
    ///    vnode max will be rewritten as `table_id` + 1, `vnode` 0)
    ///
    /// Parameters:
    ///   - `parent_group_id`: the `group_id` to split
    ///   - `split_table_ids`: the `table_ids` to split; we still support splitting multiple
    ///     tables to one group at once, pass `split_table_ids` per `split` operation for checking
    ///   - `table_id_to_split`: the `table_id` to split
    ///   - `vnode_to_split`: the `vnode` to split
    ///   - `partition_vnode_count`: the partition count for the single table group if needed
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        // Hold both the compaction and versioning write locks for the whole split,
        // mirroring the locking order used by the merge path.
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        // BTreeSet keeps the member table ids sorted, which the split below relies on.
        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .copied()
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} doesn't in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        // change to vec for partition
        let table_ids = member_table_ids.into_iter().collect_vec();
        // Splitting *all* member tables out would leave the parent group empty; reject it.
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        // avoid decode split_key when caller is aware of the table_id and vnode
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id,
                split_full_key.user_key.get_vnode_id(),
            );
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            // no need to split the group if all tables fall on the same side;
            // return the (unchanged) parent group with its members.
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        // The left side stays in the parent group; the right side moves to a new group below.
        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut new_version_delta = version.new_delta();

        // Pre-allocate sst ids for the ssts that will be cut in two by the split key.
        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            // All NewCompactionGroup pairs are mapped to one new compaction group.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            // Inherit config from parent group
            let config = self
                .compaction_group_manager
                .read()
                .await
                .try_get_compaction_group_config(parent_group_id)
                .ok_or_else(|| {
                    Error::CompactionGroup(format!(
                        "parent group {} config not found",
                        parent_group_id
                    ))
                })?
                .compaction_config()
                .as_ref()
                .clone();

            #[expect(deprecated)]
            // fill the deprecated field with default value
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id,
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _, // for compatibility
                        split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

        // Re-point every right-side table's state_table_info at the new compaction group.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in &table_ids_right {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // check if need to update the compaction config for the single table group and guarantee the operation atomicity
            // `partition_vnode_count` only works inside a table, to avoid a lot of slicing sst, we only enable it in groups with high throughput and only one table.
            // The target `table_ids` might be split to an existing group, so we need to try to update its config
            for (cg_id, table_ids) in &result {
                // check the split_tables had been placed in the dedicated compaction group
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    )
                {
                    // Best-effort: a failed config update does not abort the split.
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // The expired compact tasks will be canceled.
        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    // A task is expired when the group's version id advanced past the one
                    // the task was planned against (the split bumped it).
                    let is_expired = is_compaction_task_expired(
                        task.compaction_group_version_id,
                        levels.compaction_group_version_id,
                    );
                    if is_expired {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }

    /// Split `table_ids` to a dedicated compaction group.
    /// Returns the compaction group id containing the `table_ids` and the mapping of
    /// compaction group id to table ids.
    ///
    /// Implemented as up to two cuts via `split_compaction_group_impl`: one before the
    /// first table id and (if needed) one after the last, so `table_ids` end up alone
    /// in the middle group. Requires `table_ids` to be non-empty and sorted.
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        // Read-only peek at the parent group's members; the write locks are taken
        // inside each split_compaction_group_impl call.
        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .copied()
                .collect_vec()
        };

        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        // Debug-style invariant checks on the split result; panics on violation.
        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<CompactionGroupId, Vec<TableId>>) {
            // 1. table_ids in different cg are sorted.
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            // 2. table_ids in different cg are non-overlapping
            {
                // NOTE(review): `a[0]` assumes every group's table id list is non-empty —
                // holds for split results, which never produce empty member lists.
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            // 3. table_ids belong to one and only one cg.
            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        // move [3,4,5,6]
        // [1,2,3,4,5,6,7,8,9,10] -> [1,2] [3,4,5,6] [7,8,9,10]
        // split key
        // 1. table_id = 3, vnode = 0, epoch = MAX
        // 2. table_id = 7, vnode = 0, epoch = MAX

        // The new compaction group id is always generated on the right side.
        // Hence, we return the first compaction group id as the result.
        // split 1: cut immediately before the first table id to move.
        let mut cg_id_to_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id: CompactionGroupId = 0.into();
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        // Track which group now contains the tables to move, and whether the first
        // cut already isolated exactly `table_ids` (then the second cut is unnecessary).
        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // split 2: cut right after the last table id to move.
        // See the example above and the split rule in `split_compaction_group_impl`.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
}
741
742impl HummockManager {
743    /// Split the compaction group if the group is too large or contains high throughput tables.
744    pub async fn try_split_compaction_group(
745        &self,
746        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
747        group: CompactionGroupStatistic,
748    ) {
749        if group
750            .compaction_group_config
751            .compaction_config
752            .disable_auto_group_scheduling
753            .unwrap_or(false)
754        {
755            return;
756        }
757        // split high throughput table to dedicated compaction group
758        for (table_id, table_size) in &group.table_statistic {
759            self.try_move_high_throughput_table_to_dedicated_cg(
760                table_write_throughput_statistic_manager,
761                *table_id,
762                table_size,
763                group.group_id,
764            )
765            .await;
766        }
767
768        // split the huge group to multiple groups
769        self.try_split_huge_compaction_group(group).await;
770    }
771
772    /// Try to move the high throughput table to a dedicated compaction group.
773    pub async fn try_move_high_throughput_table_to_dedicated_cg(
774        &self,
775        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
776        table_id: TableId,
777        _table_size: &u64,
778        parent_group_id: CompactionGroupId,
779    ) {
780        let mut table_throughput = table_write_throughput_statistic_manager
781            .get_table_throughput_descending(
782                table_id,
783                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
784            )
785            .peekable();
786
787        if table_throughput.peek().is_none() {
788            return;
789        }
790
791        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
792            table_throughput,
793            self.env.opts.table_high_write_throughput_threshold,
794            self.env
795                .opts
796                .table_stat_high_write_throughput_ratio_for_split,
797        );
798
799        // do not split a table to dedicated compaction group if it is not high write throughput
800        if !is_high_write_throughput {
801            return;
802        }
803
804        let ret = self
805            .move_state_tables_to_dedicated_compaction_group(
806                parent_group_id,
807                &[table_id],
808                Some(self.env.opts.partition_vnode_count),
809            )
810            .await;
811        match ret {
812            Ok(split_result) => {
813                tracing::info!(
814                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
815                    table_id,
816                    parent_group_id,
817                    self.env.opts.partition_vnode_count,
818                    split_result
819                );
820            }
821            Err(e) => {
822                tracing::info!(
823                    error = %e.as_report(),
824                    "failed to split state table [{}] from group-{}",
825                    table_id,
826                    parent_group_id,
827                )
828            }
829        }
830    }
831
832    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
833        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
834            * self.env.opts.split_group_size_ratio) as u64;
835        let is_huge_hybrid_group =
836            group.group_size > group_max_size && group.table_statistic.len() > 1; // avoid split single table group
837        if is_huge_hybrid_group {
838            let mut accumulated_size = 0;
839            let mut table_ids = Vec::default();
840            for (table_id, table_size) in &group.table_statistic {
841                accumulated_size += table_size;
842                table_ids.push(*table_id);
843                // split if the accumulated size is greater than half of the group size
844                // avoid split a small table to dedicated compaction group and trigger multiple merge
845                assert!(table_ids.is_sorted());
846                let remaining_size = group.group_size.saturating_sub(accumulated_size);
847                if accumulated_size > group_max_size / 2
848                    && remaining_size > 0
849                    && table_ids.len() < group.table_statistic.len()
850                {
851                    let ret = self
852                        .move_state_tables_to_dedicated_compaction_group(
853                            group.group_id,
854                            &table_ids,
855                            None,
856                        )
857                        .await;
858                    match ret {
859                        Ok(split_result) => {
860                            tracing::info!(
861                                "split_huge_compaction_group success {:?}",
862                                split_result
863                            );
864                            self.metrics
865                                .split_compaction_group_count
866                                .with_label_values(&[&group.group_id.to_string()])
867                                .inc();
868                            return;
869                        }
870                        Err(e) => {
871                            tracing::error!(
872                                error = %e.as_report(),
873                                "failed to split_huge_compaction_group table {:?} from group-{}",
874                                table_ids,
875                                group.group_id
876                            );
877
878                            return;
879                        }
880                    }
881                }
882            }
883        }
884    }
885
886    pub async fn try_merge_compaction_group(
887        &self,
888        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
889        group: &CompactionGroupStatistic,
890        next_group: &CompactionGroupStatistic,
891        created_tables: &HashSet<TableId>,
892    ) -> Result<()> {
893        GroupMergeValidator::validate_group_merge(
894            group,
895            next_group,
896            created_tables,
897            table_write_throughput_statistic_manager,
898            &self.env.opts,
899            &self.versioning,
900        )
901        .await?;
902
903        let result = self
904            .merge_compaction_group(group.group_id, next_group.group_id)
905            .await;
906
907        match &result {
908            Ok(()) => {
909                tracing::info!(
910                    "merge group-{} to group-{}",
911                    next_group.group_id,
912                    group.group_id,
913                );
914
915                self.metrics
916                    .merge_compaction_group_count
917                    .with_label_values(&[&group.group_id.to_string()])
918                    .inc();
919            }
920            Err(e) => {
921                tracing::info!(
922                    error = %e.as_report(),
923                    "failed to merge group-{} group-{}",
924                    next_group.group_id,
925                    group.group_id,
926                );
927            }
928        }
929
930        result
931    }
932}
933
/// Stateless namespace for the checks that decide whether two compaction
/// groups may be merged; all functionality lives in associated functions.
#[derive(Debug, Default)]
struct GroupMergeValidator {}
936
937impl GroupMergeValidator {
938    /// Check if two groups have compatible compaction configs for merging.
939    /// Ignores `split_weight_by_vnode` since it's per-table and will be reset after merge.
940    fn is_merge_compatible_by_semantics(
941        group: &CompactionGroupStatistic,
942        next_group: &CompactionGroupStatistic,
943    ) -> bool {
944        let (mut left, mut right) = (
945            group
946                .compaction_group_config
947                .compaction_config
948                .as_ref()
949                .clone(),
950            next_group
951                .compaction_group_config
952                .compaction_config
953                .as_ref()
954                .clone(),
955        );
956        left.split_weight_by_vnode = 0;
957        right.split_weight_by_vnode = 0;
958        left == right
959    }
960
961    /// Check if the table is high write throughput with the given threshold and ratio.
962    pub fn is_table_high_write_throughput(
963        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
964        threshold: u64,
965        high_write_throughput_ratio: f64,
966    ) -> bool {
967        let mut sample_size = 0;
968        let mut high_write_throughput_count = 0;
969        for statistic in table_throughput {
970            sample_size += 1;
971            if statistic.throughput > threshold {
972                high_write_throughput_count += 1;
973            }
974        }
975
976        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
977    }
978
979    pub fn is_table_low_write_throughput(
980        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
981        threshold: u64,
982        low_write_throughput_ratio: f64,
983    ) -> bool {
984        let mut sample_size = 0;
985        let mut low_write_throughput_count = 0;
986        for statistic in table_throughput {
987            sample_size += 1;
988            if statistic.throughput <= threshold {
989                low_write_throughput_count += 1;
990            }
991        }
992
993        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
994    }
995
996    fn check_is_low_write_throughput_compaction_group(
997        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
998        group: &CompactionGroupStatistic,
999        opts: &Arc<MetaOpts>,
1000    ) -> bool {
1001        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
1002        for table_id in group.table_statistic.keys() {
1003            let mut table_throughput = table_write_throughput_statistic_manager
1004                .get_table_throughput_descending(
1005                    *table_id,
1006                    opts.table_stat_throuput_window_seconds_for_merge as i64,
1007                )
1008                .peekable();
1009            if table_throughput.peek().is_none() {
1010                continue;
1011            }
1012
1013            table_with_statistic.push(table_throughput);
1014        }
1015
1016        // if all tables in the group do not have enough statistics, return true
1017        if table_with_statistic.is_empty() {
1018            return true;
1019        }
1020
1021        // check if all tables in the group are low write throughput with enough statistics
1022        table_with_statistic.into_iter().all(|table_throughput| {
1023            Self::is_table_low_write_throughput(
1024                table_throughput,
1025                opts.table_low_write_throughput_threshold,
1026                opts.table_stat_low_write_throughput_ratio_for_merge,
1027            )
1028        })
1029    }
1030
1031    fn check_is_creating_compaction_group(
1032        group: &CompactionGroupStatistic,
1033        created_tables: &HashSet<TableId>,
1034    ) -> bool {
1035        group
1036            .table_statistic
1037            .keys()
1038            .any(|table_id| !created_tables.contains(table_id))
1039    }
1040
    /// Run every precondition for merging `next_group` into `group`.
    ///
    /// Checks are ordered roughly cheapest-first: static-group pairing, empty
    /// groups, table-id range overlap, config flags/compatibility, creating
    /// state, write throughput, size limit, and finally (under the versioning
    /// read lock) group existence plus current and projected write-stop /
    /// emergency state. The first failing check returns an
    /// `Error::CompactionGroup` describing the reason; `Ok(())` means the pair
    /// is considered mergeable at this moment.
    async fn validate_group_merge(
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        opts: &Arc<MetaOpts>,
        versioning: &MonitoredRwLock<Versioning>,
    ) -> Result<()> {
        // TODO: remove this check after refactor group id
        // The two static groups (StateDefault / MaterializedView) must never be
        // merged with each other, in either order.
        if (group.group_id == StaticCompactionGroupId::StateDefault
            && next_group.group_id == StaticCompactionGroupId::MaterializedView)
            || (group.group_id == StaticCompactionGroupId::MaterializedView
                && next_group.group_id == StaticCompactionGroupId::StateDefault)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} and group-{} are both StaticCompactionGroupId",
                group.group_id, next_group.group_id
            )));
        }

        // Empty groups carry no tables to merge.
        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} is empty",
                group.group_id, next_group.group_id
            )));
        }

        // Check non-overlapping table ids early to avoid acquiring heavyweight write locks
        // in merge_compaction_group_impl only to fail at the overlap check.
        // Sort both sides and ensure table_ids_1 has the smaller first element,
        // then reject if table_ids_1's last element >= table_ids_2's first element (overlap).
        {
            let mut table_ids_1: Vec<TableId> = group.table_statistic.keys().cloned().collect_vec();
            let mut table_ids_2: Vec<TableId> =
                next_group.table_statistic.keys().cloned().collect_vec();
            table_ids_1.sort();
            table_ids_2.sort();
            if table_ids_1.first().unwrap() > table_ids_2.first().unwrap() {
                std::mem::swap(&mut table_ids_1, &mut table_ids_2);
            }
            if table_ids_1.last().unwrap() >= table_ids_2.first().unwrap() {
                return Err(Error::CompactionGroup(format!(
                    "group-{} and group-{} have overlapping table id ranges, not mergeable",
                    group.group_id, next_group.group_id
                )));
            }
        }

        // Respect the per-group opt-out flag on either side.
        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
            || next_group
                .compaction_group_config
                .compaction_config
                .disable_auto_group_scheduling
                .unwrap_or(false)
        {
            return Err(Error::CompactionGroup(format!(
                "group-{} or group-{} disable_auto_group_scheduling",
                group.group_id, next_group.group_id
            )));
        }

        // Keep merge compatibility as a feature, but ignore split_weight_by_vnode, because it is
        // only used for per-table split behavior and will be reset after merge.
        if !Self::is_merge_compatible_by_semantics(group, next_group) {
            let left_config = group.compaction_group_config.compaction_config.as_ref();
            let right_config = next_group
                .compaction_group_config
                .compaction_config
                .as_ref();

            tracing::warn!(
                group_id = %group.group_id,
                next_group_id = %next_group.group_id,
                left_config = ?left_config,
                right_config = ?right_config,
                "compaction config semantic mismatch detected while merging compaction groups"
            );

            return Err(Error::CompactionGroup(format!(
                "Cannot merge group {} and next_group {} with different compaction config (split_weight_by_vnode is excluded from comparison). left_config: {:?}, right_config: {:?}",
                group.group_id, next_group.group_id, left_config, right_config
            )));
        }

        // do not merge the compaction group which is creating
        if Self::check_is_creating_compaction_group(group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        // do not merge high throughput group
        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge high throughput group {} next_group {}",
                group.group_id, next_group.group_id
            )));
        }

        // The merged group must stay under the split threshold, otherwise the
        // split scheduler would immediately undo the merge.
        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
            * opts.split_group_size_ratio) as u64;

        if (group.group_size + next_group.group_size) > size_limit {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
                group.group_id,
                group.group_size,
                next_group.group_id,
                next_group.group_size,
                size_limit
            )));
        }

        // Same creating / throughput checks, applied to the other side.
        if Self::check_is_creating_compaction_group(next_group, created_tables) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge creating group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        if !Self::check_is_low_write_throughput_compaction_group(
            table_write_throughput_statistic_manager,
            next_group,
            opts,
        ) {
            return Err(Error::CompactionGroup(format!(
                "Cannot merge high throughput group {} next group {}",
                group.group_id, next_group.group_id
            )));
        }

        {
            // Avoid merge when the group is in emergency state
            let versioning_guard = versioning.read().await;
            let levels = &versioning_guard.current_version.levels;
            if !levels.contains_key(&group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge group {} not exist",
                    group.group_id
                )));
            }

            if !levels.contains_key(&next_group.group_id) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge next group {} not exist",
                    next_group.group_id
                )));
            }

            let group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(group.group_id);

            let next_group_levels = versioning_guard
                .current_version
                .get_compaction_group_levels(next_group.group_id);

            // Neither side may currently be write-stopped or in emergency.
            let group_state = GroupStateValidator::group_state(
                group_levels,
                group.compaction_group_config.compaction_config().deref(),
            );

            if group_state.is_write_stop() || group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit group {} next group {}",
                    group.group_id, next_group.group_id
                )));
            }

            let next_group_state = GroupStateValidator::group_state(
                next_group_levels,
                next_group
                    .compaction_group_config
                    .compaction_config()
                    .deref(),
            );

            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}",
                    next_group.group_id, group.group_id
                )));
            }

            // check whether the group is in the write stop state after merge
            // (projected metrics are scaled by compaction_group_merge_dimension_threshold)
            let l0_sub_level_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
                    group.group_id, next_group.group_id
                )));
            }

            // NOTE(review): this recomputes the *sub-level* count and repeats the
            // `write_stop_l0_file_count` check above verbatim; the variable name
            // suggests it was meant to use the combined L0 *file* count instead —
            // TODO confirm against GroupStateValidator / the L0 level struct.
            let l0_file_count_after_merge =
                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
            if GroupStateValidator::write_stop_l0_file_count(
                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            // Projected combined L0 size must not trigger a write stop either.
            let l0_size_after_merge =
                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;

            if GroupStateValidator::write_stop_l0_size(
                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
                    as u64,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
                    next_group.group_id, group.group_id
                )));
            }

            // check whether the group is in the emergency state after merge
            if GroupStateValidator::emergency_l0_file_count(
                (l0_sub_level_count_after_merge as f64
                    * opts.compaction_group_merge_dimension_threshold) as usize,
                group.compaction_group_config.compaction_config().deref(),
            ) {
                return Err(Error::CompactionGroup(format!(
                    "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
                    group.group_id, next_group.group_id
                )));
            }
        }

        Ok(())
    }
1290}