// risingwave_meta/hummock/manager/compaction/compaction_group_schedule.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26    StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::{CompactionGroupStatistic, GroupStateValidator};
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::transaction::HummockVersionTransaction;
40use crate::hummock::manager::versioning::Versioning;
41use crate::hummock::manager::{HummockManager, commit_multi_var};
42use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
43use crate::hummock::sequence::{next_compaction_group_id, next_sstable_id};
44use crate::hummock::table_write_throughput_statistic::{
45    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
46};
47use crate::manager::MetaOpts;
48
49impl HummockManager {
50    pub async fn merge_compaction_group(
51        &self,
52        group_1: CompactionGroupId,
53        group_2: CompactionGroupId,
54    ) -> Result<()> {
55        self.merge_compaction_group_impl(group_1, group_2, None)
56            .await
57    }
58
59    pub async fn merge_compaction_group_for_test(
60        &self,
61        group_1: CompactionGroupId,
62        group_2: CompactionGroupId,
63        created_tables: HashSet<TableId>,
64    ) -> Result<()> {
65        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
66            .await
67    }
68
69    pub async fn merge_compaction_group_impl(
70        &self,
71        group_1: CompactionGroupId,
72        group_2: CompactionGroupId,
73        created_tables: Option<HashSet<TableId>>,
74    ) -> Result<()> {
75        let compaction_guard = self.compaction.write().await;
76        let mut versioning_guard = self.versioning.write().await;
77        let versioning = versioning_guard.deref_mut();
78        // Validate parameters.
79        if !versioning.current_version.levels.contains_key(&group_1) {
80            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
81        }
82
83        if !versioning.current_version.levels.contains_key(&group_2) {
84            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
85        }
86
87        let state_table_info = versioning.current_version.state_table_info.clone();
88        let mut member_table_ids_1 = state_table_info
89            .compaction_group_member_table_ids(group_1)
90            .iter()
91            .cloned()
92            .collect_vec();
93
94        if member_table_ids_1.is_empty() {
95            return Err(Error::CompactionGroup(format!(
96                "group_1 {} is empty",
97                group_1
98            )));
99        }
100
101        let mut member_table_ids_2 = state_table_info
102            .compaction_group_member_table_ids(group_2)
103            .iter()
104            .cloned()
105            .collect_vec();
106
107        if member_table_ids_2.is_empty() {
108            return Err(Error::CompactionGroup(format!(
109                "group_2 {} is empty",
110                group_2
111            )));
112        }
113
114        debug_assert!(!member_table_ids_1.is_empty());
115        debug_assert!(!member_table_ids_2.is_empty());
116        assert!(member_table_ids_1.is_sorted());
117        assert!(member_table_ids_2.is_sorted());
118
119        let created_tables = if let Some(created_tables) = created_tables {
120            // if the created_tables is provided, use it directly, most for test
121            #[expect(clippy::assertions_on_constants)]
122            {
123                assert!(cfg!(debug_assertions));
124            }
125            created_tables
126        } else {
127            match self.metadata_manager.get_created_table_ids().await {
128                Ok(created_tables) => HashSet::from_iter(created_tables),
129                Err(err) => {
130                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
131                    return Err(Error::CompactionGroup(format!(
132                        "merge group_1 {} group_2 {} failed to fetch created table ids",
133                        group_1, group_2
134                    )));
135                }
136            }
137        };
138
139        fn contains_creating_table(
140            table_ids: &Vec<TableId>,
141            created_tables: &HashSet<TableId>,
142        ) -> bool {
143            table_ids
144                .iter()
145                .any(|table_id| !created_tables.contains(table_id))
146        }
147
148        // do not merge the compaction group which is creating
149        if contains_creating_table(&member_table_ids_1, &created_tables)
150            || contains_creating_table(&member_table_ids_2, &created_tables)
151        {
152            return Err(Error::CompactionGroup(format!(
153                "Cannot merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
154                group_1, group_2, member_table_ids_1, member_table_ids_2
155            )));
156        }
157
158        // Make sure `member_table_ids_1` is smaller than `member_table_ids_2`
159        let (left_group_id, right_group_id) =
160            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
161                (group_1, group_2)
162            } else {
163                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
164                (group_2, group_1)
165            };
166
167        // We can only merge two groups with non-overlapping member table ids.
168        // After the swap above, member_table_ids_1 has the smaller first element.
169        // If the last element of member_table_ids_1 >= the first element of member_table_ids_2,
170        // the two groups' table id ranges overlap and cannot be merged.
171        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
172            return Err(Error::CompactionGroup(format!(
173                "invalid merge group_1 {} group_2 {}: table id ranges overlap",
174                left_group_id, right_group_id
175            )));
176        }
177
178        let combined_member_table_ids = member_table_ids_1
179            .iter()
180            .chain(member_table_ids_2.iter())
181            .collect_vec();
182        assert!(combined_member_table_ids.is_sorted());
183
184        // check duplicated sst_id
185        let mut sst_id_set = HashSet::new();
186        for sst_id in versioning
187            .current_version
188            .get_sst_ids_by_group_id(left_group_id)
189            .chain(
190                versioning
191                    .current_version
192                    .get_sst_ids_by_group_id(right_group_id),
193            )
194        {
195            if !sst_id_set.insert(sst_id) {
196                return Err(Error::CompactionGroup(format!(
197                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
198                    left_group_id, right_group_id, sst_id
199                )));
200            }
201        }
202
203        // check branched sst on non-overlap level
204        {
205            let left_levels = versioning
206                .current_version
207                .get_compaction_group_levels(group_1);
208
209            let right_levels = versioning
210                .current_version
211                .get_compaction_group_levels(group_2);
212
213            // we can not check the l0 sub level, because the sub level id will be rewritten when merge
214            // This check will ensure that other non-overlapping level ssts can be concat and that the key_range is correct.
215            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
216            for level_idx in 1..=max_level {
217                let left_level = left_levels.get_level(level_idx);
218                let right_level = right_levels.get_level(level_idx);
219                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
220                    continue;
221                }
222
223                let left_last_sst = left_level.table_infos.last().unwrap().clone();
224                let right_first_sst = right_level.table_infos.first().unwrap().clone();
225                let left_sst_id = left_last_sst.sst_id;
226                let right_sst_id = right_first_sst.sst_id;
227                let left_obj_id = left_last_sst.object_id;
228                let right_obj_id = right_first_sst.object_id;
229
230                // Since the sst key_range within a group is legal, we only need to check the ssts adjacent to the two groups.
231                if !can_concat(&[left_last_sst, right_first_sst]) {
232                    return Err(Error::CompactionGroup(format!(
233                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
234                        left_group_id,
235                        right_group_id,
236                        level_idx,
237                        left_sst_id,
238                        right_sst_id,
239                        left_obj_id,
240                        right_obj_id
241                    )));
242                }
243            }
244        }
245
246        let mut version = HummockVersionTransaction::new(
247            &mut versioning.current_version,
248            &mut versioning.hummock_version_deltas,
249            &mut versioning.table_change_log,
250            self.env.notification_manager(),
251            None,
252            &self.metrics,
253            &self.env.opts,
254        );
255        let mut new_version_delta = version.new_delta();
256
257        let target_compaction_group_id = {
258            // merge right_group_id to left_group_id and remove right_group_id
259            new_version_delta.group_deltas.insert(
260                left_group_id,
261                GroupDeltas {
262                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
263                        left_group_id,
264                        right_group_id,
265                    })],
266                },
267            );
268            left_group_id
269        };
270
271        // TODO: remove compaciton group_id from state_table_info
272        // rewrite compaction_group_id for all tables
273        new_version_delta.with_latest_version(|version, new_version_delta| {
274            for &table_id in combined_member_table_ids {
275                let info = version
276                    .state_table_info
277                    .info()
278                    .get(&table_id)
279                    .expect("have check exist previously");
280                assert!(
281                    new_version_delta
282                        .state_table_info_delta
283                        .insert(
284                            table_id,
285                            PbStateTableInfoDelta {
286                                committed_epoch: info.committed_epoch,
287                                compaction_group_id: target_compaction_group_id,
288                            }
289                        )
290                        .is_none()
291                );
292            }
293        });
294
295        {
296            let mut compaction_group_manager = self.compaction_group_manager.write().await;
297            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
298
299            // for metrics reclaim
300            {
301                let right_group_max_level = new_version_delta
302                    .latest_version()
303                    .get_compaction_group_levels(right_group_id)
304                    .levels
305                    .len();
306
307                remove_compaction_group_in_sst_stat(
308                    &self.metrics,
309                    right_group_id,
310                    right_group_max_level,
311                );
312            }
313
314            // clean up compaction schedule state for the merged group
315            self.compaction_state
316                .remove_compaction_group(right_group_id);
317
318            // clear `partition_vnode_count` for the hybrid group
319            {
320                if let Err(err) = compaction_groups_txn.update_compaction_config(
321                    &[left_group_id],
322                    &[MutableConfig::SplitWeightByVnode(0)], // default
323                ) {
324                    tracing::error!(
325                        error = %err.as_report(),
326                        "failed to update compaction config for group-{}",
327                        left_group_id
328                    );
329                }
330            }
331
332            new_version_delta.pre_apply();
333
334            // remove right_group_id
335            compaction_groups_txn.remove(right_group_id);
336            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
337        }
338
339        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
340        versioning.mark_next_time_travel_version_snapshot();
341
342        // cancel tasks
343        let mut canceled_tasks = vec![];
344        // after merge, all tasks in right_group_id should be canceled
345        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
346        let compact_task_assignments =
347            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
348        compact_task_assignments
349            .into_iter()
350            .for_each(|task_assignment| {
351                if let Some(task) = task_assignment.compact_task.as_ref() {
352                    assert_eq!(task.compaction_group_id, right_group_id);
353                    canceled_tasks.push(ReportTask {
354                        task_id: task.task_id,
355                        task_status: TaskStatus::ManualCanceled,
356                        table_stats_change: HashMap::default(),
357                        sorted_output_ssts: vec![],
358                        object_timestamps: HashMap::default(),
359                    });
360                }
361            });
362
363        if !canceled_tasks.is_empty() {
364            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
365                .await?;
366        }
367
368        self.metrics
369            .merge_compaction_group_count
370            .with_label_values(&[&left_group_id.to_string()])
371            .inc();
372
373        Ok(())
374    }
375}
376
377impl HummockManager {
    /// Split `table_ids` out to a dedicated compaction group (split by `table_id` and `vnode`).
    /// Returns the mapping of compaction group id to the table ids it holds after the split.
    ///
    /// The split follows these rules:
    /// 1. ssts with `key_range.left` greater than `split_key` will be split to the right group
    /// 2. the sst containing `split_key` will be split into two separate ssts and their
    ///    `key_range` will be changed: `sst_1`: [`sst.key_range.left`, `split_key`)
    ///    `sst_2`: [`split_key`, `sst.key_range.right`]
    /// 3. currently only `vnode` 0 and `vnode` max are supported. (Due to the above rule,
    ///    vnode max will be rewritten as `table_id` + 1, `vnode` 0)
    ///
    /// Parameters:
    ///   - `parent_group_id`: the `group_id` to split
    ///   - `split_table_ids`: the `table_ids` to split; multiple tables may still be split
    ///     to one group at once, passed here so each `split` operation can be validated
    ///   - `table_id_to_split`: the `table_id` defining the split point
    ///   - `vnode_to_split`: the `vnode` defining the split point
    ///   - `partition_vnode_count`: the partition count for the single table group if needed
    async fn split_compaction_group_impl(
        &self,
        parent_group_id: CompactionGroupId,
        split_table_ids: &[StateTableId],
        table_id_to_split: StateTableId,
        vnode_to_split: VirtualNode,
        partition_vnode_count: Option<u32>,
    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
        let mut result = vec![];
        // Hold both write locks for the whole split so the version, group
        // configs and task assignments stay mutually consistent.
        let compaction_guard = self.compaction.write().await;
        let mut versioning_guard = self.versioning.write().await;
        let versioning = versioning_guard.deref_mut();
        // Validate parameters.
        if !versioning
            .current_version
            .levels
            .contains_key(&parent_group_id)
        {
            return Err(Error::CompactionGroup(format!(
                "invalid group {}",
                parent_group_id
            )));
        }

        let member_table_ids = versioning
            .current_version
            .state_table_info
            .compaction_group_member_table_ids(parent_group_id)
            .iter()
            .copied()
            .collect::<BTreeSet<_>>();

        if !member_table_ids.contains(&table_id_to_split) {
            return Err(Error::CompactionGroup(format!(
                "table {} doesn't in group {}",
                table_id_to_split, parent_group_id
            )));
        }

        // The split boundary key, built once from (table_id, vnode) and reused
        // below both to partition the member table ids and to cut the ssts.
        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);

        // change to vec for partition
        let table_ids = member_table_ids.into_iter().collect_vec();
        // Splitting ALL member tables out would leave the parent group empty; reject.
        if table_ids == split_table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }
        // avoid decode split_key when caller is aware of the table_id and vnode
        let (table_ids_left, table_ids_right) =
            group_split::split_table_ids_with_table_id_and_vnode(
                &table_ids,
                split_full_key.user_key.table_id,
                split_full_key.user_key.get_vnode_id(),
            );
        if table_ids_left.is_empty() || table_ids_right.is_empty() {
            // No need to split the group when all tables fall on the same side
            // of the boundary; report the parent group unchanged.
            if !table_ids_left.is_empty() {
                result.push((parent_group_id, table_ids_left));
            }

            if !table_ids_right.is_empty() {
                result.push((parent_group_id, table_ids_right));
            }
            return Ok(result);
        }

        // Tables left of the boundary stay in the parent group.
        result.push((parent_group_id, table_ids_left));

        let split_key: Bytes = split_full_key.encode().into();

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            &mut versioning.table_change_log,
            self.env.notification_manager(),
            None,
            &self.metrics,
            &self.env.opts,
        );
        let mut new_version_delta = version.new_delta();

        // Pre-allocate ids for every sst the split will create.
        let split_sst_count = new_version_delta
            .latest_version()
            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());

        let new_sst_start_id = next_sstable_id(&self.env, split_sst_count).await?;
        let (new_compaction_group_id, config) = {
            // All NewCompactionGroup pairs are mapped to one new compaction group.
            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
            // Inherit config from parent group
            let config = self
                .compaction_group_manager
                .read()
                .await
                .try_get_compaction_group_config(parent_group_id)
                .ok_or_else(|| {
                    Error::CompactionGroup(format!(
                        "parent group {} config not found",
                        parent_group_id
                    ))
                })?
                .compaction_config()
                .as_ref()
                .clone();

            #[expect(deprecated)]
            // fill the deprecated field with default value
            new_version_delta.group_deltas.insert(
                new_compaction_group_id,
                GroupDeltas {
                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
                        group_config: Some(config.clone()),
                        group_id: new_compaction_group_id,
                        parent_group_id,
                        new_sst_start_id,
                        table_ids: vec![],
                        version: CompatibilityVersion::LATEST as _, // for compatibility
                        split_key: Some(split_key.into()),
                    }))],
                },
            );
            (new_compaction_group_id, config)
        };

        // Re-point every table right of the boundary to the new group.
        new_version_delta.with_latest_version(|version, new_version_delta| {
            for &table_id in &table_ids_right {
                let info = version
                    .state_table_info
                    .info()
                    .get(&table_id)
                    .expect("have check exist previously");
                assert!(
                    new_version_delta
                        .state_table_info_delta
                        .insert(
                            table_id,
                            PbStateTableInfoDelta {
                                committed_epoch: info.committed_epoch,
                                compaction_group_id: new_compaction_group_id,
                            }
                        )
                        .is_none()
                );
            }
        });

        result.push((new_compaction_group_id, table_ids_right));

        {
            let mut compaction_group_manager = self.compaction_group_manager.write().await;
            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
            compaction_groups_txn
                .create_compaction_groups(new_compaction_group_id, Arc::new(config));

            // check if need to update the compaction config for the single table group and guarantee the operation atomicity
            // `partition_vnode_count` only works inside a table, to avoid a lot of slicing sst, we only enable it in groups with high throughput and only one table.
            // The target `table_ids` might be split to an existing group, so we need to try to update its config
            for (cg_id, table_ids) in &result {
                // check the split_tables had been place to the dedicated compaction group
                if let Some(partition_vnode_count) = partition_vnode_count
                    && table_ids.len() == 1
                    && table_ids == split_table_ids
                    && let Err(err) = compaction_groups_txn.update_compaction_config(
                        &[*cg_id],
                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
                    )
                {
                    // Best-effort: a failed config update does not abort the split.
                    tracing::error!(
                        error = %err.as_report(),
                        "failed to update compaction config for group-{}",
                        cg_id
                    );
                }
            }

            // Apply the delta in memory, then persist version + config atomically.
            new_version_delta.pre_apply();
            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
        }
        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
        versioning.mark_next_time_travel_version_snapshot();

        // The expired compact tasks will be canceled.
        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
        let mut canceled_tasks = vec![];
        let compact_task_assignments =
            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
        let levels = versioning
            .current_version
            .get_compaction_group_levels(parent_group_id);
        compact_task_assignments
            .into_iter()
            .for_each(|task_assignment| {
                if let Some(task) = task_assignment.compact_task.as_ref() {
                    // A task is expired when the group's version id moved on
                    // since the task was issued (the split bumped it).
                    let is_expired = is_compaction_task_expired(
                        task.compaction_group_version_id,
                        levels.compaction_group_version_id,
                    );
                    if is_expired {
                        canceled_tasks.push(ReportTask {
                            task_id: task.task_id,
                            task_status: TaskStatus::ManualCanceled,
                            table_stats_change: HashMap::default(),
                            sorted_output_ssts: vec![],
                            object_timestamps: HashMap::default(),
                        });
                    }
                }
            });

        if !canceled_tasks.is_empty() {
            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
                .await?;
        }

        self.metrics
            .split_compaction_group_count
            .with_label_values(&[&parent_group_id.to_string()])
            .inc();

        Ok(result)
    }
612
    /// Split `table_ids` out of `parent_group_id` into a dedicated compaction group.
    ///
    /// Returns the compaction group id that contains `table_ids` after the move,
    /// together with the mapping of every affected compaction group id to its
    /// member table ids.
    ///
    /// # Errors
    /// Returns an error when `table_ids` is empty, not sorted, or equals the
    /// full member set of the parent group, or when an underlying split fails.
    pub async fn move_state_tables_to_dedicated_compaction_group(
        &self,
        parent_group_id: CompactionGroupId,
        table_ids: &[StateTableId],
        partition_vnode_count: Option<u32>,
    ) -> Result<(
        CompactionGroupId,
        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
    )> {
        if table_ids.is_empty() {
            return Err(Error::CompactionGroup(
                "table_ids must not be empty".to_owned(),
            ));
        }

        if !table_ids.is_sorted() {
            return Err(Error::CompactionGroup(
                "table_ids must be sorted".to_owned(),
            ));
        }

        let parent_table_ids = {
            let versioning_guard = self.versioning.read().await;
            versioning_guard
                .current_version
                .state_table_info
                .compaction_group_member_table_ids(parent_group_id)
                .iter()
                .copied()
                .collect_vec()
        };

        // Moving every member table is equivalent to renaming the group; reject.
        if parent_table_ids == table_ids {
            return Err(Error::CompactionGroup(format!(
                "invalid split attempt for group {}: all member tables are moved",
                parent_group_id
            )));
        }

        // Sanity-checks the post-split mapping; panics on a violated invariant.
        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<CompactionGroupId, Vec<TableId>>) {
            // 1. table_ids in each cg are sorted.
            {
                cg_id_to_table_ids
                    .iter()
                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
            }

            // 2. table_ids in different cgs are non-overlapping.
            {
                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
                assert!(table_table_ids_vec.concat().is_sorted());
            }

            // 3. each table_id belongs to one and only one cg.
            {
                let mut all_table_ids = HashSet::new();
                for table_ids in cg_id_to_table_ids.values() {
                    for table_id in table_ids {
                        assert!(all_table_ids.insert(*table_id));
                    }
                }
            }
        }

        // Example: move [3,4,5,6]
        // [1,2,3,4,5,6,7,8,9,10] -> [1,2] [3,4,5,6] [7,8,9,10]
        // split keys:
        // 1. table_id = 3, vnode = 0, epoch = MAX
        // 2. table_id = 7, vnode = 0, epoch = MAX

        // The new compaction group id is always generated on the right side.
        // Hence, we return the first compaction group id as the result.
        // split 1: cut at the first table id to move, so everything before it
        // stays in the parent and [first..] lands in the new (right) group.
        let mut cg_id_to_table_ids: BTreeMap<CompactionGroupId, Vec<TableId>> = BTreeMap::new();
        let table_id_to_split = *table_ids.first().unwrap();
        let mut target_compaction_group_id: CompactionGroupId = 0.into();
        let result_vec = self
            .split_compaction_group_impl(
                parent_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::ZERO,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);

        let mut finish_move = false;
        for (cg_id, table_ids_after_split) in result_vec {
            // The group now holding the first moved table is the target so far.
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }

            // If some group holds exactly the requested tables, no second split is needed.
            if table_ids_after_split == table_ids {
                finish_move = true;
            }

            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        if finish_move {
            return Ok((target_compaction_group_id, cg_id_to_table_ids));
        }

        // split 2: cut right after the last table id to move (vnode max is
        // rewritten as table_id + 1, vnode 0 — see `split_compaction_group_impl`),
        // so the trailing tables are split off to the right.
        let table_id_to_split = *table_ids.last().unwrap();
        let result_vec = self
            .split_compaction_group_impl(
                target_compaction_group_id,
                table_ids,
                table_id_to_split,
                VirtualNode::MAX_REPRESENTABLE,
                partition_vnode_count,
            )
            .await?;
        assert!(result_vec.len() <= 2);
        for (cg_id, table_ids_after_split) in result_vec {
            if table_ids_after_split.contains(&table_id_to_split) {
                target_compaction_group_id = cg_id;
            }
            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
        }
        check_table_ids_valid(&cg_id_to_table_ids);

        Ok((target_compaction_group_id, cg_id_to_table_ids))
    }
744}
745
impl HummockManager {
    /// Split the compaction group if the group is too large or contains high throughput tables.
    ///
    /// Two passes: first, each table in the group is considered for promotion
    /// to a dedicated group; second, the group as a whole is considered for a
    /// size-based split. Both passes are skipped entirely when the group's
    /// `disable_auto_group_scheduling` config flag is set.
    pub async fn try_split_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: CompactionGroupStatistic,
    ) {
        // Per-group opt-out for automatic scheduling; absent flag means enabled.
        if group
            .compaction_group_config
            .compaction_config
            .disable_auto_group_scheduling
            .unwrap_or(false)
        {
            return;
        }
        // split high throughput table to dedicated compaction group
        for (table_id, table_size) in &group.table_statistic {
            self.try_move_high_throughput_table_to_dedicated_cg(
                table_write_throughput_statistic_manager,
                *table_id,
                table_size,
                group.group_id,
            )
            .await;
        }

        // split the huge group to multiple groups
        self.try_split_huge_compaction_group(group).await;
    }

    /// Try to move the high throughput table to a dedicated compaction group.
    ///
    /// Best-effort: both success and failure are only logged, never propagated.
    /// `_table_size` is currently unused and kept for signature stability.
    pub async fn try_move_high_throughput_table_to_dedicated_cg(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        table_id: TableId,
        _table_size: &u64,
        parent_group_id: CompactionGroupId,
    ) {
        let mut table_throughput = table_write_throughput_statistic_manager
            .get_table_throughput_descending(
                table_id,
                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
            )
            .peekable();

        // No samples inside the statistics window -> nothing to decide on.
        if table_throughput.peek().is_none() {
            return;
        }

        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
            table_throughput,
            self.env.opts.table_high_write_throughput_threshold,
            self.env
                .opts
                .table_stat_high_write_throughput_ratio_for_split,
        );

        // do not split a table to dedicated compaction group if it is not high write throughput
        if !is_high_write_throughput {
            return;
        }

        let ret = self
            .move_state_tables_to_dedicated_compaction_group(
                parent_group_id,
                &[table_id],
                Some(self.env.opts.partition_vnode_count),
            )
            .await;
        match ret {
            Ok(split_result) => {
                tracing::info!(
                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
                    table_id,
                    parent_group_id,
                    self.env.opts.partition_vnode_count,
                    split_result
                );
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to split state table [{}] from group-{}",
                    table_id,
                    parent_group_id,
                )
            }
        }
    }

    /// Split an oversized multi-table group roughly in half.
    ///
    /// Walks the group's tables in ascending table-id order (BTreeMap key
    /// order), accumulating sizes until more than half of the size limit is
    /// covered, then moves that prefix of tables out. At most one split is
    /// attempted per call; single-table groups are never split here.
    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
        // Size limit above which the group counts as "huge".
        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
            * self.env.opts.split_group_size_ratio) as u64;
        let is_huge_hybrid_group =
            group.group_size > group_max_size && group.table_statistic.len() > 1; // avoid split single table group
        if is_huge_hybrid_group {
            let mut accumulated_size = 0;
            let mut table_ids = Vec::default();
            for (table_id, table_size) in &group.table_statistic {
                accumulated_size += table_size;
                table_ids.push(*table_id);
                // split if the accumulated size is greater than half of the group size
                // avoid split a small table to dedicated compaction group and trigger multiple merge
                // table_statistic is a BTreeMap, so the collected prefix stays sorted.
                assert!(table_ids.is_sorted());
                let remaining_size = group.group_size.saturating_sub(accumulated_size);
                if accumulated_size > group_max_size / 2
                    && remaining_size > 0
                    && table_ids.len() < group.table_statistic.len()
                {
                    let ret = self
                        .move_state_tables_to_dedicated_compaction_group(
                            group.group_id,
                            &table_ids,
                            None,
                        )
                        .await;
                    match ret {
                        Ok(split_result) => {
                            tracing::info!(
                                "split_huge_compaction_group success {:?}",
                                split_result
                            );
                            self.metrics
                                .split_compaction_group_count
                                .with_label_values(&[&group.group_id.to_string()])
                                .inc();
                            return;
                        }
                        Err(e) => {
                            tracing::error!(
                                error = %e.as_report(),
                                "failed to split_huge_compaction_group table {:?} from group-{}",
                                table_ids,
                                group.group_id
                            );

                            return;
                        }
                    }
                }
            }
        }
    }

    /// Merge `next_group` into `group` after validating the merge is safe
    /// (see `GroupMergeValidator::validate_group_merge`).
    ///
    /// On success the `merge_compaction_group_count` metric is bumped for
    /// `group`; validation and merge errors are logged and returned.
    pub async fn try_merge_compaction_group(
        &self,
        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
        group: &CompactionGroupStatistic,
        next_group: &CompactionGroupStatistic,
        created_tables: &HashSet<TableId>,
    ) -> Result<()> {
        // All precondition checks happen here; bail out before mutating anything.
        GroupMergeValidator::validate_group_merge(
            group,
            next_group,
            created_tables,
            table_write_throughput_statistic_manager,
            &self.env.opts,
            &self.versioning,
        )
        .await?;

        let result = self
            .merge_compaction_group(group.group_id, next_group.group_id)
            .await;

        match &result {
            Ok(()) => {
                tracing::info!(
                    "merge group-{} to group-{}",
                    next_group.group_id,
                    group.group_id,
                );

                self.metrics
                    .merge_compaction_group_count
                    .with_label_values(&[&group.group_id.to_string()])
                    .inc();
            }
            Err(e) => {
                tracing::info!(
                    error = %e.as_report(),
                    "failed to merge group-{} group-{}",
                    next_group.group_id,
                    group.group_id,
                );
            }
        }

        result
    }
}
937
/// Stateless namespace for the checks that gate automatic compaction-group
/// merging; see `validate_group_merge` for the full rule set.
#[derive(Debug, Default)]
struct GroupMergeValidator {}
940
941impl GroupMergeValidator {
942    /// Check if two groups have compatible compaction configs for merging.
943    /// Ignores `split_weight_by_vnode` since it's per-table and will be reset after merge.
944    fn is_merge_compatible_by_semantics(
945        group: &CompactionGroupStatistic,
946        next_group: &CompactionGroupStatistic,
947    ) -> bool {
948        let (mut left, mut right) = (
949            group
950                .compaction_group_config
951                .compaction_config
952                .as_ref()
953                .clone(),
954            next_group
955                .compaction_group_config
956                .compaction_config
957                .as_ref()
958                .clone(),
959        );
960        left.split_weight_by_vnode = 0;
961        right.split_weight_by_vnode = 0;
962        left == right
963    }
964
965    /// Check if the table is high write throughput with the given threshold and ratio.
966    pub fn is_table_high_write_throughput(
967        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
968        threshold: u64,
969        high_write_throughput_ratio: f64,
970    ) -> bool {
971        let mut sample_size = 0;
972        let mut high_write_throughput_count = 0;
973        for statistic in table_throughput {
974            sample_size += 1;
975            if statistic.throughput > threshold {
976                high_write_throughput_count += 1;
977            }
978        }
979
980        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
981    }
982
983    pub fn is_table_low_write_throughput(
984        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
985        threshold: u64,
986        low_write_throughput_ratio: f64,
987    ) -> bool {
988        let mut sample_size = 0;
989        let mut low_write_throughput_count = 0;
990        for statistic in table_throughput {
991            sample_size += 1;
992            if statistic.throughput <= threshold {
993                low_write_throughput_count += 1;
994            }
995        }
996
997        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
998    }
999
1000    fn check_is_low_write_throughput_compaction_group(
1001        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1002        group: &CompactionGroupStatistic,
1003        opts: &Arc<MetaOpts>,
1004    ) -> bool {
1005        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
1006        for table_id in group.table_statistic.keys() {
1007            let mut table_throughput = table_write_throughput_statistic_manager
1008                .get_table_throughput_descending(
1009                    *table_id,
1010                    opts.table_stat_throuput_window_seconds_for_merge as i64,
1011                )
1012                .peekable();
1013            if table_throughput.peek().is_none() {
1014                continue;
1015            }
1016
1017            table_with_statistic.push(table_throughput);
1018        }
1019
1020        // if all tables in the group do not have enough statistics, return true
1021        if table_with_statistic.is_empty() {
1022            return true;
1023        }
1024
1025        // check if all tables in the group are low write throughput with enough statistics
1026        table_with_statistic.into_iter().all(|table_throughput| {
1027            Self::is_table_low_write_throughput(
1028                table_throughput,
1029                opts.table_low_write_throughput_threshold,
1030                opts.table_stat_low_write_throughput_ratio_for_merge,
1031            )
1032        })
1033    }
1034
1035    fn check_is_creating_compaction_group(
1036        group: &CompactionGroupStatistic,
1037        created_tables: &HashSet<TableId>,
1038    ) -> bool {
1039        group
1040            .table_statistic
1041            .keys()
1042            .any(|table_id| !created_tables.contains(table_id))
1043    }
1044
1045    async fn validate_group_merge(
1046        group: &CompactionGroupStatistic,
1047        next_group: &CompactionGroupStatistic,
1048        created_tables: &HashSet<TableId>,
1049        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1050        opts: &Arc<MetaOpts>,
1051        versioning: &MonitoredRwLock<Versioning>,
1052    ) -> Result<()> {
1053        // TODO: remove this check after refactor group id
1054        if (group.group_id == StaticCompactionGroupId::StateDefault
1055            && next_group.group_id == StaticCompactionGroupId::MaterializedView)
1056            || (group.group_id == StaticCompactionGroupId::MaterializedView
1057                && next_group.group_id == StaticCompactionGroupId::StateDefault)
1058        {
1059            return Err(Error::CompactionGroup(format!(
1060                "group-{} and group-{} are both StaticCompactionGroupId",
1061                group.group_id, next_group.group_id
1062            )));
1063        }
1064
1065        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1066            return Err(Error::CompactionGroup(format!(
1067                "group-{} or group-{} is empty",
1068                group.group_id, next_group.group_id
1069            )));
1070        }
1071
1072        // Check non-overlapping table ids early to avoid acquiring heavyweight write locks
1073        // in merge_compaction_group_impl only to fail at the overlap check.
1074        // Sort both sides and ensure table_ids_1 has the smaller first element,
1075        // then reject if table_ids_1's last element >= table_ids_2's first element (overlap).
1076        {
1077            let mut table_ids_1: Vec<TableId> = group.table_statistic.keys().cloned().collect_vec();
1078            let mut table_ids_2: Vec<TableId> =
1079                next_group.table_statistic.keys().cloned().collect_vec();
1080            table_ids_1.sort();
1081            table_ids_2.sort();
1082            if table_ids_1.first().unwrap() > table_ids_2.first().unwrap() {
1083                std::mem::swap(&mut table_ids_1, &mut table_ids_2);
1084            }
1085            if table_ids_1.last().unwrap() >= table_ids_2.first().unwrap() {
1086                return Err(Error::CompactionGroup(format!(
1087                    "group-{} and group-{} have overlapping table id ranges, not mergeable",
1088                    group.group_id, next_group.group_id
1089                )));
1090            }
1091        }
1092
1093        if group
1094            .compaction_group_config
1095            .compaction_config
1096            .disable_auto_group_scheduling
1097            .unwrap_or(false)
1098            || next_group
1099                .compaction_group_config
1100                .compaction_config
1101                .disable_auto_group_scheduling
1102                .unwrap_or(false)
1103        {
1104            return Err(Error::CompactionGroup(format!(
1105                "group-{} or group-{} disable_auto_group_scheduling",
1106                group.group_id, next_group.group_id
1107            )));
1108        }
1109
1110        // Keep merge compatibility as a feature, but ignore split_weight_by_vnode, because it is
1111        // only used for per-table split behavior and will be reset after merge.
1112        if !Self::is_merge_compatible_by_semantics(group, next_group) {
1113            let left_config = group.compaction_group_config.compaction_config.as_ref();
1114            let right_config = next_group
1115                .compaction_group_config
1116                .compaction_config
1117                .as_ref();
1118
1119            tracing::warn!(
1120                group_id = %group.group_id,
1121                next_group_id = %next_group.group_id,
1122                left_config = ?left_config,
1123                right_config = ?right_config,
1124                "compaction config semantic mismatch detected while merging compaction groups"
1125            );
1126
1127            return Err(Error::CompactionGroup(format!(
1128                "Cannot merge group {} and next_group {} with different compaction config (split_weight_by_vnode is excluded from comparison). left_config: {:?}, right_config: {:?}",
1129                group.group_id, next_group.group_id, left_config, right_config
1130            )));
1131        }
1132
1133        // do not merge the compaction group which is creating
1134        if Self::check_is_creating_compaction_group(group, created_tables) {
1135            return Err(Error::CompactionGroup(format!(
1136                "Cannot merge creating group {} next_group {}",
1137                group.group_id, next_group.group_id
1138            )));
1139        }
1140
1141        // do not merge high throughput group
1142        if !Self::check_is_low_write_throughput_compaction_group(
1143            table_write_throughput_statistic_manager,
1144            group,
1145            opts,
1146        ) {
1147            return Err(Error::CompactionGroup(format!(
1148                "Cannot merge high throughput group {} next_group {}",
1149                group.group_id, next_group.group_id
1150            )));
1151        }
1152
1153        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1154            * opts.split_group_size_ratio) as u64;
1155
1156        if (group.group_size + next_group.group_size) > size_limit {
1157            return Err(Error::CompactionGroup(format!(
1158                "Cannot merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1159                group.group_id,
1160                group.group_size,
1161                next_group.group_id,
1162                next_group.group_size,
1163                size_limit
1164            )));
1165        }
1166
1167        if Self::check_is_creating_compaction_group(next_group, created_tables) {
1168            return Err(Error::CompactionGroup(format!(
1169                "Cannot merge creating group {} next group {}",
1170                group.group_id, next_group.group_id
1171            )));
1172        }
1173
1174        if !Self::check_is_low_write_throughput_compaction_group(
1175            table_write_throughput_statistic_manager,
1176            next_group,
1177            opts,
1178        ) {
1179            return Err(Error::CompactionGroup(format!(
1180                "Cannot merge high throughput group {} next group {}",
1181                group.group_id, next_group.group_id
1182            )));
1183        }
1184
1185        {
1186            // Avoid merge when the group is in emergency state
1187            let versioning_guard = versioning.read().await;
1188            let levels = &versioning_guard.current_version.levels;
1189            if !levels.contains_key(&group.group_id) {
1190                return Err(Error::CompactionGroup(format!(
1191                    "Cannot merge group {} not exist",
1192                    group.group_id
1193                )));
1194            }
1195
1196            if !levels.contains_key(&next_group.group_id) {
1197                return Err(Error::CompactionGroup(format!(
1198                    "Cannot merge next group {} not exist",
1199                    next_group.group_id
1200                )));
1201            }
1202
1203            let group_levels = versioning_guard
1204                .current_version
1205                .get_compaction_group_levels(group.group_id);
1206
1207            let next_group_levels = versioning_guard
1208                .current_version
1209                .get_compaction_group_levels(next_group.group_id);
1210
1211            let group_state = GroupStateValidator::group_state(
1212                group_levels,
1213                group.compaction_group_config.compaction_config().deref(),
1214            );
1215
1216            if group_state.is_write_stop() || group_state.is_emergency() {
1217                return Err(Error::CompactionGroup(format!(
1218                    "Cannot merge write limit group {} next group {}",
1219                    group.group_id, next_group.group_id
1220                )));
1221            }
1222
1223            let next_group_state = GroupStateValidator::group_state(
1224                next_group_levels,
1225                next_group
1226                    .compaction_group_config
1227                    .compaction_config()
1228                    .deref(),
1229            );
1230
1231            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1232                return Err(Error::CompactionGroup(format!(
1233                    "Cannot merge write limit next group {} group {}",
1234                    next_group.group_id, group.group_id
1235                )));
1236            }
1237
1238            // check whether the group is in the write stop state after merge
1239            let l0_sub_level_count_after_merge =
1240                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1241            if GroupStateValidator::write_stop_l0_file_count(
1242                (l0_sub_level_count_after_merge as f64
1243                    * opts.compaction_group_merge_dimension_threshold) as usize,
1244                group.compaction_group_config.compaction_config().deref(),
1245            ) {
1246                return Err(Error::CompactionGroup(format!(
1247                    "Cannot merge write limit group {} next group {}, will trigger write stop after merge",
1248                    group.group_id, next_group.group_id
1249                )));
1250            }
1251
1252            let l0_file_count_after_merge =
1253                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1254            if GroupStateValidator::write_stop_l0_file_count(
1255                (l0_file_count_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1256                    as usize,
1257                group.compaction_group_config.compaction_config().deref(),
1258            ) {
1259                return Err(Error::CompactionGroup(format!(
1260                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1261                    next_group.group_id, group.group_id
1262                )));
1263            }
1264
1265            let l0_size_after_merge =
1266                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1267
1268            if GroupStateValidator::write_stop_l0_size(
1269                (l0_size_after_merge as f64 * opts.compaction_group_merge_dimension_threshold)
1270                    as u64,
1271                group.compaction_group_config.compaction_config().deref(),
1272            ) {
1273                return Err(Error::CompactionGroup(format!(
1274                    "Cannot merge write limit next group {} group {}, will trigger write stop after merge",
1275                    next_group.group_id, group.group_id
1276                )));
1277            }
1278
1279            // check whether the group is in the emergency state after merge
1280            if GroupStateValidator::emergency_l0_file_count(
1281                (l0_sub_level_count_after_merge as f64
1282                    * opts.compaction_group_merge_dimension_threshold) as usize,
1283                group.compaction_group_config.compaction_config().deref(),
1284            ) {
1285                return Err(Error::CompactionGroup(format!(
1286                    "Cannot merge emergency group {} next group {}, will trigger emergency after merge",
1287                    group.group_id, next_group.group_id
1288                )));
1289            }
1290        }
1291
1292        Ok(())
1293    }
1294}