risingwave_meta/hummock/manager/compaction/
compaction_group_schedule.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::ops::{Deref, DerefMut};
17use std::sync::Arc;
18
19use bytes::Bytes;
20use itertools::Itertools;
21use risingwave_common::catalog::TableId;
22use risingwave_common::hash::VirtualNode;
23use risingwave_common::monitor::MonitoredRwLock;
24use risingwave_hummock_sdk::compact_task::{ReportTask, is_compaction_task_expired};
25use risingwave_hummock_sdk::compaction_group::{
26    StateTableId, StaticCompactionGroupId, group_split,
27};
28use risingwave_hummock_sdk::version::{GroupDelta, GroupDeltas};
29use risingwave_hummock_sdk::{CompactionGroupId, can_concat};
30use risingwave_pb::hummock::compact_task::TaskStatus;
31use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
32use risingwave_pb::hummock::{
33    CompatibilityVersion, PbGroupConstruct, PbGroupMerge, PbStateTableInfoDelta,
34};
35use thiserror_ext::AsReport;
36
37use super::{CompactionGroupStatistic, GroupStateValidator};
38use crate::hummock::error::{Error, Result};
39use crate::hummock::manager::transaction::HummockVersionTransaction;
40use crate::hummock::manager::versioning::Versioning;
41use crate::hummock::manager::{HummockManager, commit_multi_var};
42use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat;
43use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id};
44use crate::hummock::table_write_throughput_statistic::{
45    TableWriteThroughputStatistic, TableWriteThroughputStatisticManager,
46};
47use crate::manager::MetaOpts;
48
49impl HummockManager {
50    pub async fn merge_compaction_group(
51        &self,
52        group_1: CompactionGroupId,
53        group_2: CompactionGroupId,
54    ) -> Result<()> {
55        self.merge_compaction_group_impl(group_1, group_2, None)
56            .await
57    }
58
59    pub async fn merge_compaction_group_for_test(
60        &self,
61        group_1: CompactionGroupId,
62        group_2: CompactionGroupId,
63        created_tables: HashSet<u32>,
64    ) -> Result<()> {
65        self.merge_compaction_group_impl(group_1, group_2, Some(created_tables))
66            .await
67    }
68
69    pub async fn merge_compaction_group_impl(
70        &self,
71        group_1: CompactionGroupId,
72        group_2: CompactionGroupId,
73        created_tables: Option<HashSet<u32>>,
74    ) -> Result<()> {
75        let compaction_guard = self.compaction.write().await;
76        let mut versioning_guard = self.versioning.write().await;
77        let versioning = versioning_guard.deref_mut();
78        // Validate parameters.
79        if !versioning.current_version.levels.contains_key(&group_1) {
80            return Err(Error::CompactionGroup(format!("invalid group {}", group_1)));
81        }
82
83        if !versioning.current_version.levels.contains_key(&group_2) {
84            return Err(Error::CompactionGroup(format!("invalid group {}", group_2)));
85        }
86
87        let state_table_info = versioning.current_version.state_table_info.clone();
88        let mut member_table_ids_1 = state_table_info
89            .compaction_group_member_table_ids(group_1)
90            .iter()
91            .cloned()
92            .collect_vec();
93
94        if member_table_ids_1.is_empty() {
95            return Err(Error::CompactionGroup(format!(
96                "group_1 {} is empty",
97                group_1
98            )));
99        }
100
101        let mut member_table_ids_2 = state_table_info
102            .compaction_group_member_table_ids(group_2)
103            .iter()
104            .cloned()
105            .collect_vec();
106
107        if member_table_ids_2.is_empty() {
108            return Err(Error::CompactionGroup(format!(
109                "group_2 {} is empty",
110                group_2
111            )));
112        }
113
114        debug_assert!(!member_table_ids_1.is_empty());
115        debug_assert!(!member_table_ids_2.is_empty());
116        assert!(member_table_ids_1.is_sorted());
117        assert!(member_table_ids_2.is_sorted());
118
119        let created_tables = if let Some(created_tables) = created_tables {
120            // if the created_tables is provided, use it directly, most for test
121            assert!(cfg!(debug_assertions));
122            created_tables
123        } else {
124            match self.metadata_manager.get_created_table_ids().await {
125                Ok(created_tables) => HashSet::from_iter(created_tables),
126                Err(err) => {
127                    tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
128                    return Err(Error::CompactionGroup(format!(
129                        "merge group_1 {} group_2 {} failed to fetch created table ids",
130                        group_1, group_2
131                    )));
132                }
133            }
134        };
135
136        fn contains_creating_table(
137            table_ids: &Vec<TableId>,
138            created_tables: &HashSet<u32>,
139        ) -> bool {
140            table_ids
141                .iter()
142                .any(|table_id| !created_tables.contains(&table_id.table_id()))
143        }
144
145        // do not merge the compaction group which is creating
146        if contains_creating_table(&member_table_ids_1, &created_tables)
147            || contains_creating_table(&member_table_ids_2, &created_tables)
148        {
149            return Err(Error::CompactionGroup(format!(
150                "Not Merge creating group {} next_group {} member_table_ids_1 {:?} member_table_ids_2 {:?}",
151                group_1, group_2, member_table_ids_1, member_table_ids_2
152            )));
153        }
154
155        // Make sure `member_table_ids_1` is smaller than `member_table_ids_2`
156        let (left_group_id, right_group_id) =
157            if member_table_ids_1.first().unwrap() < member_table_ids_2.first().unwrap() {
158                (group_1, group_2)
159            } else {
160                std::mem::swap(&mut member_table_ids_1, &mut member_table_ids_2);
161                (group_2, group_1)
162            };
163
164        // We can only merge two groups with non-overlapping member table ids
165        if member_table_ids_1.last().unwrap() >= member_table_ids_2.first().unwrap() {
166            return Err(Error::CompactionGroup(format!(
167                "invalid merge group_1 {} group_2 {}",
168                left_group_id, right_group_id
169            )));
170        }
171
172        let combined_member_table_ids = member_table_ids_1
173            .iter()
174            .chain(member_table_ids_2.iter())
175            .collect_vec();
176        assert!(combined_member_table_ids.is_sorted());
177
178        // check duplicated sst_id
179        let mut sst_id_set = HashSet::new();
180        for sst_id in versioning
181            .current_version
182            .get_sst_ids_by_group_id(left_group_id)
183            .chain(
184                versioning
185                    .current_version
186                    .get_sst_ids_by_group_id(right_group_id),
187            )
188        {
189            if !sst_id_set.insert(sst_id) {
190                return Err(Error::CompactionGroup(format!(
191                    "invalid merge group_1 {} group_2 {} duplicated sst_id {}",
192                    left_group_id, right_group_id, sst_id
193                )));
194            }
195        }
196
197        // check branched sst on non-overlap level
198        {
199            let left_levels = versioning
200                .current_version
201                .get_compaction_group_levels(group_1);
202
203            let right_levels = versioning
204                .current_version
205                .get_compaction_group_levels(group_2);
206
207            // we can not check the l0 sub level, because the sub level id will be rewritten when merge
208            // This check will ensure that other non-overlapping level ssts can be concat and that the key_range is correct.
209            let max_level = std::cmp::max(left_levels.levels.len(), right_levels.levels.len());
210            for level_idx in 1..=max_level {
211                let left_level = left_levels.get_level(level_idx);
212                let right_level = right_levels.get_level(level_idx);
213                if left_level.table_infos.is_empty() || right_level.table_infos.is_empty() {
214                    continue;
215                }
216
217                let left_last_sst = left_level.table_infos.last().unwrap().clone();
218                let right_first_sst = right_level.table_infos.first().unwrap().clone();
219                let left_sst_id = left_last_sst.sst_id;
220                let right_sst_id = right_first_sst.sst_id;
221                let left_obj_id = left_last_sst.object_id;
222                let right_obj_id = right_first_sst.object_id;
223
224                // Since the sst key_range within a group is legal, we only need to check the ssts adjacent to the two groups.
225                if !can_concat(&[left_last_sst, right_first_sst]) {
226                    return Err(Error::CompactionGroup(format!(
227                        "invalid merge group_1 {} group_2 {} level_idx {} left_last_sst_id {} right_first_sst_id {} left_obj_id {} right_obj_id {}",
228                        left_group_id,
229                        right_group_id,
230                        level_idx,
231                        left_sst_id,
232                        right_sst_id,
233                        left_obj_id,
234                        right_obj_id
235                    )));
236                }
237            }
238        }
239
240        let mut version = HummockVersionTransaction::new(
241            &mut versioning.current_version,
242            &mut versioning.hummock_version_deltas,
243            self.env.notification_manager(),
244            None,
245            &self.metrics,
246        );
247        let mut new_version_delta = version.new_delta();
248
249        let target_compaction_group_id = {
250            // merge right_group_id to left_group_id and remove right_group_id
251            new_version_delta.group_deltas.insert(
252                left_group_id,
253                GroupDeltas {
254                    group_deltas: vec![GroupDelta::GroupMerge(PbGroupMerge {
255                        left_group_id,
256                        right_group_id,
257                    })],
258                },
259            );
260            left_group_id
261        };
262
263        // TODO: remove compaciton group_id from state_table_info
264        // rewrite compaction_group_id for all tables
265        new_version_delta.with_latest_version(|version, new_version_delta| {
266            for table_id in combined_member_table_ids {
267                let table_id = TableId::new(table_id.table_id());
268                let info = version
269                    .state_table_info
270                    .info()
271                    .get(&table_id)
272                    .expect("have check exist previously");
273                assert!(
274                    new_version_delta
275                        .state_table_info_delta
276                        .insert(
277                            table_id,
278                            PbStateTableInfoDelta {
279                                committed_epoch: info.committed_epoch,
280                                compaction_group_id: target_compaction_group_id,
281                            }
282                        )
283                        .is_none()
284                );
285            }
286        });
287
288        {
289            let mut compaction_group_manager = self.compaction_group_manager.write().await;
290            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
291
292            // for metrics reclaim
293            {
294                let right_group_max_level = new_version_delta
295                    .latest_version()
296                    .get_compaction_group_levels(right_group_id)
297                    .levels
298                    .len();
299
300                remove_compaction_group_in_sst_stat(
301                    &self.metrics,
302                    right_group_id,
303                    right_group_max_level,
304                );
305            }
306
307            // clear `partition_vnode_count` for the hybrid group
308            {
309                if let Err(err) = compaction_groups_txn.update_compaction_config(
310                    &[left_group_id],
311                    &[MutableConfig::SplitWeightByVnode(0)], // default
312                ) {
313                    tracing::error!(
314                        error = %err.as_report(),
315                        "failed to update compaction config for group-{}",
316                        left_group_id
317                    );
318                }
319            }
320
321            new_version_delta.pre_apply();
322
323            // remove right_group_id
324            compaction_groups_txn.remove(right_group_id);
325            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
326        }
327
328        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
329        versioning.mark_next_time_travel_version_snapshot();
330
331        // cancel tasks
332        let mut canceled_tasks = vec![];
333        // after merge, all tasks in right_group_id should be canceled
334        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
335        let compact_task_assignments =
336            compaction_guard.get_compact_task_assignments_by_group_id(right_group_id);
337        compact_task_assignments
338            .into_iter()
339            .for_each(|task_assignment| {
340                if let Some(task) = task_assignment.compact_task.as_ref() {
341                    assert_eq!(task.compaction_group_id, right_group_id);
342                    canceled_tasks.push(ReportTask {
343                        task_id: task.task_id,
344                        task_status: TaskStatus::ManualCanceled,
345                        table_stats_change: HashMap::default(),
346                        sorted_output_ssts: vec![],
347                        object_timestamps: HashMap::default(),
348                    });
349                }
350            });
351
352        if !canceled_tasks.is_empty() {
353            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
354                .await?;
355        }
356
357        self.metrics
358            .merge_compaction_group_count
359            .with_label_values(&[&left_group_id.to_string()])
360            .inc();
361
362        Ok(())
363    }
364}
365
366impl HummockManager {
367    /// Split `table_ids` to a dedicated compaction group.(will be split by the `table_id` and `vnode`.)
368    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
369    /// The split will follow the following rules
370    /// 1. ssts with `key_range.left` greater than `split_key` will be split to the right group
371    /// 2. the sst containing `split_key` will be split into two separate ssts and their `key_range` will be changed `sst_1`: [`sst.key_range.left`, `split_key`) `sst_2`: [`split_key`, `sst.key_range.right`]
372    /// 3. currently only `vnode` 0 and `vnode` max is supported. (Due to the above rule, vnode max will be rewritten as `table_id` + 1, `vnode` 0)
373    ///   - `parent_group_id`: the `group_id` to split
374    ///   - `split_table_ids`: the `table_ids` to split, now we still support to split multiple tables to one group at once, pass `split_table_ids` for per `split` operation for checking
375    ///   - `table_id_to_split`: the `table_id` to split
376    ///   - `vnode_to_split`: the `vnode` to split
377    ///   - `partition_vnode_count`: the partition count for the single table group if need
378    async fn split_compaction_group_impl(
379        &self,
380        parent_group_id: CompactionGroupId,
381        split_table_ids: &[StateTableId],
382        table_id_to_split: StateTableId,
383        vnode_to_split: VirtualNode,
384        partition_vnode_count: Option<u32>,
385    ) -> Result<Vec<(CompactionGroupId, Vec<StateTableId>)>> {
386        let mut result = vec![];
387        let compaction_guard = self.compaction.write().await;
388        let mut versioning_guard = self.versioning.write().await;
389        let versioning = versioning_guard.deref_mut();
390        // Validate parameters.
391        if !versioning
392            .current_version
393            .levels
394            .contains_key(&parent_group_id)
395        {
396            return Err(Error::CompactionGroup(format!(
397                "invalid group {}",
398                parent_group_id
399            )));
400        }
401
402        let member_table_ids = versioning
403            .current_version
404            .state_table_info
405            .compaction_group_member_table_ids(parent_group_id)
406            .iter()
407            .map(|table_id| table_id.table_id)
408            .collect::<BTreeSet<_>>();
409
410        if !member_table_ids.contains(&table_id_to_split) {
411            return Err(Error::CompactionGroup(format!(
412                "table {} doesn't in group {}",
413                table_id_to_split, parent_group_id
414            )));
415        }
416
417        let split_full_key = group_split::build_split_full_key(table_id_to_split, vnode_to_split);
418
419        // change to vec for partition
420        let table_ids = member_table_ids.into_iter().collect_vec();
421        if table_ids == split_table_ids {
422            return Err(Error::CompactionGroup(format!(
423                "invalid split attempt for group {}: all member tables are moved",
424                parent_group_id
425            )));
426        }
427        // avoid decode split_key when caller is aware of the table_id and vnode
428        let (table_ids_left, table_ids_right) =
429            group_split::split_table_ids_with_table_id_and_vnode(
430                &table_ids,
431                split_full_key.user_key.table_id.table_id(),
432                split_full_key.user_key.get_vnode_id(),
433            );
434        if table_ids_left.is_empty() || table_ids_right.is_empty() {
435            // not need to split group if all tables are in the same side
436            if !table_ids_left.is_empty() {
437                result.push((parent_group_id, table_ids_left));
438            }
439
440            if !table_ids_right.is_empty() {
441                result.push((parent_group_id, table_ids_right));
442            }
443            return Ok(result);
444        }
445
446        result.push((parent_group_id, table_ids_left));
447
448        let split_key: Bytes = split_full_key.encode().into();
449
450        let mut version = HummockVersionTransaction::new(
451            &mut versioning.current_version,
452            &mut versioning.hummock_version_deltas,
453            self.env.notification_manager(),
454            None,
455            &self.metrics,
456        );
457        let mut new_version_delta = version.new_delta();
458
459        let split_sst_count = new_version_delta
460            .latest_version()
461            .count_new_ssts_in_group_split(parent_group_id, split_key.clone());
462
463        let new_sst_start_id = next_sstable_object_id(&self.env, split_sst_count).await?;
464        let (new_compaction_group_id, config) = {
465            // All NewCompactionGroup pairs are mapped to one new compaction group.
466            let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
467            // The new config will be persisted later.
468            let config = self
469                .compaction_group_manager
470                .read()
471                .await
472                .default_compaction_config()
473                .as_ref()
474                .clone();
475
476            #[expect(deprecated)]
477            // fill the deprecated field with default value
478            new_version_delta.group_deltas.insert(
479                new_compaction_group_id,
480                GroupDeltas {
481                    group_deltas: vec![GroupDelta::GroupConstruct(Box::new(PbGroupConstruct {
482                        group_config: Some(config.clone()),
483                        group_id: new_compaction_group_id,
484                        parent_group_id,
485                        new_sst_start_id,
486                        table_ids: vec![],
487                        version: CompatibilityVersion::LATEST as _, // for compatibility
488                        split_key: Some(split_key.into()),
489                    }))],
490                },
491            );
492            (new_compaction_group_id, config)
493        };
494
495        new_version_delta.with_latest_version(|version, new_version_delta| {
496            for table_id in &table_ids_right {
497                let table_id = TableId::new(*table_id);
498                let info = version
499                    .state_table_info
500                    .info()
501                    .get(&table_id)
502                    .expect("have check exist previously");
503                assert!(
504                    new_version_delta
505                        .state_table_info_delta
506                        .insert(
507                            table_id,
508                            PbStateTableInfoDelta {
509                                committed_epoch: info.committed_epoch,
510                                compaction_group_id: new_compaction_group_id,
511                            }
512                        )
513                        .is_none()
514                );
515            }
516        });
517
518        result.push((new_compaction_group_id, table_ids_right));
519
520        {
521            let mut compaction_group_manager = self.compaction_group_manager.write().await;
522            let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn();
523            compaction_groups_txn
524                .create_compaction_groups(new_compaction_group_id, Arc::new(config));
525
526            // check if need to update the compaction config for the single table group and guarantee the operation atomicity
527            // `partition_vnode_count` only works inside a table, to avoid a lot of slicing sst, we only enable it in groups with high throughput and only one table.
528            // The target `table_ids` might be split to an existing group, so we need to try to update its config
529            for (cg_id, table_ids) in &result {
530                // check the split_tables had been place to the dedicated compaction group
531                if let Some(partition_vnode_count) = partition_vnode_count
532                    && table_ids.len() == 1
533                    && table_ids == split_table_ids
534                {
535                    if let Err(err) = compaction_groups_txn.update_compaction_config(
536                        &[*cg_id],
537                        &[MutableConfig::SplitWeightByVnode(partition_vnode_count)],
538                    ) {
539                        tracing::error!(
540                            error = %err.as_report(),
541                            "failed to update compaction config for group-{}",
542                            cg_id
543                        );
544                    }
545                }
546            }
547
548            new_version_delta.pre_apply();
549            commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?;
550        }
551        // Instead of handling DeltaType::GroupConstruct for time travel, simply enforce a version snapshot.
552        versioning.mark_next_time_travel_version_snapshot();
553
554        // The expired compact tasks will be canceled.
555        // Failure of cancel does not cause correctness problems, the report task will have better interception, and the operation here is designed to free up compactor compute resources more quickly.
556        let mut canceled_tasks = vec![];
557        let compact_task_assignments =
558            compaction_guard.get_compact_task_assignments_by_group_id(parent_group_id);
559        let levels = versioning
560            .current_version
561            .get_compaction_group_levels(parent_group_id);
562        compact_task_assignments
563            .into_iter()
564            .for_each(|task_assignment| {
565                if let Some(task) = task_assignment.compact_task.as_ref() {
566                    let is_expired = is_compaction_task_expired(
567                        task.compaction_group_version_id,
568                        levels.compaction_group_version_id,
569                    );
570                    if is_expired {
571                        canceled_tasks.push(ReportTask {
572                            task_id: task.task_id,
573                            task_status: TaskStatus::ManualCanceled,
574                            table_stats_change: HashMap::default(),
575                            sorted_output_ssts: vec![],
576                            object_timestamps: HashMap::default(),
577                        });
578                    }
579                }
580            });
581
582        if !canceled_tasks.is_empty() {
583            self.report_compact_tasks_impl(canceled_tasks, compaction_guard, versioning_guard)
584                .await?;
585        }
586
587        self.metrics
588            .split_compaction_group_count
589            .with_label_values(&[&parent_group_id.to_string()])
590            .inc();
591
592        Ok(result)
593    }
594
595    /// Split `table_ids` to a dedicated compaction group.
596    /// Returns the compaction group id containing the `table_ids` and the mapping of compaction group id to table ids.
597    pub async fn move_state_tables_to_dedicated_compaction_group(
598        &self,
599        parent_group_id: CompactionGroupId,
600        table_ids: &[StateTableId],
601        partition_vnode_count: Option<u32>,
602    ) -> Result<(
603        CompactionGroupId,
604        BTreeMap<CompactionGroupId, Vec<StateTableId>>,
605    )> {
606        if table_ids.is_empty() {
607            return Err(Error::CompactionGroup(
608                "table_ids must not be empty".to_owned(),
609            ));
610        }
611
612        if !table_ids.is_sorted() {
613            return Err(Error::CompactionGroup(
614                "table_ids must be sorted".to_owned(),
615            ));
616        }
617
618        let parent_table_ids = {
619            let versioning_guard = self.versioning.read().await;
620            versioning_guard
621                .current_version
622                .state_table_info
623                .compaction_group_member_table_ids(parent_group_id)
624                .iter()
625                .map(|table_id| table_id.table_id)
626                .collect_vec()
627        };
628
629        if parent_table_ids == table_ids {
630            return Err(Error::CompactionGroup(format!(
631                "invalid split attempt for group {}: all member tables are moved",
632                parent_group_id
633            )));
634        }
635
636        fn check_table_ids_valid(cg_id_to_table_ids: &BTreeMap<u64, Vec<u32>>) {
637            // 1. table_ids in different cg are sorted.
638            {
639                cg_id_to_table_ids
640                    .iter()
641                    .for_each(|(_cg_id, table_ids)| assert!(table_ids.is_sorted()));
642            }
643
644            // 2.table_ids in different cg are non-overlapping
645            {
646                let mut table_table_ids_vec = cg_id_to_table_ids.values().cloned().collect_vec();
647                table_table_ids_vec.sort_by(|a, b| a[0].cmp(&b[0]));
648                assert!(table_table_ids_vec.concat().is_sorted());
649            }
650
651            // 3.table_ids belong to one and only one cg.
652            {
653                let mut all_table_ids = HashSet::new();
654                for table_ids in cg_id_to_table_ids.values() {
655                    for table_id in table_ids {
656                        assert!(all_table_ids.insert(*table_id));
657                    }
658                }
659            }
660        }
661
662        // move [3,4,5,6]
663        // [1,2,3,4,5,6,7,8,9,10] -> [1,2] [3,4,5,6] [7,8,9,10]
664        // split key
665        // 1. table_id = 3, vnode = 0, epoch = MAX
666        // 2. table_id = 7, vnode = 0, epoch = MAX
667
668        // The new compaction group id is always generate on the right side
669        // Hence, we return the first compaction group id as the result
670        // split 1
671        let mut cg_id_to_table_ids: BTreeMap<u64, Vec<u32>> = BTreeMap::new();
672        let table_id_to_split = *table_ids.first().unwrap();
673        let mut target_compaction_group_id = 0;
674        let result_vec = self
675            .split_compaction_group_impl(
676                parent_group_id,
677                table_ids,
678                table_id_to_split,
679                VirtualNode::ZERO,
680                partition_vnode_count,
681            )
682            .await?;
683        assert!(result_vec.len() <= 2);
684
685        let mut finish_move = false;
686        for (cg_id, table_ids_after_split) in result_vec {
687            if table_ids_after_split.contains(&table_id_to_split) {
688                target_compaction_group_id = cg_id;
689            }
690
691            if table_ids_after_split == table_ids {
692                finish_move = true;
693            }
694
695            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
696        }
697        check_table_ids_valid(&cg_id_to_table_ids);
698
699        if finish_move {
700            return Ok((target_compaction_group_id, cg_id_to_table_ids));
701        }
702
703        // split 2
704        // See the example above and the split rule in `split_compaction_group_impl`.
705        let table_id_to_split = *table_ids.last().unwrap();
706        let result_vec = self
707            .split_compaction_group_impl(
708                target_compaction_group_id,
709                table_ids,
710                table_id_to_split,
711                VirtualNode::MAX_REPRESENTABLE,
712                partition_vnode_count,
713            )
714            .await?;
715        assert!(result_vec.len() <= 2);
716        for (cg_id, table_ids_after_split) in result_vec {
717            if table_ids_after_split.contains(&table_id_to_split) {
718                target_compaction_group_id = cg_id;
719            }
720            cg_id_to_table_ids.insert(cg_id, table_ids_after_split);
721        }
722        check_table_ids_valid(&cg_id_to_table_ids);
723
724        Ok((target_compaction_group_id, cg_id_to_table_ids))
725    }
726}
727
728impl HummockManager {
729    /// Split the compaction group if the group is too large or contains high throughput tables.
730    pub async fn try_split_compaction_group(
731        &self,
732        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
733        group: CompactionGroupStatistic,
734    ) {
735        if group
736            .compaction_group_config
737            .compaction_config
738            .disable_auto_group_scheduling
739            .unwrap_or(false)
740        {
741            return;
742        }
743        // split high throughput table to dedicated compaction group
744        for (table_id, table_size) in &group.table_statistic {
745            self.try_move_high_throughput_table_to_dedicated_cg(
746                table_write_throughput_statistic_manager,
747                *table_id,
748                table_size,
749                group.group_id,
750            )
751            .await;
752        }
753
754        // split the huge group to multiple groups
755        self.try_split_huge_compaction_group(group).await;
756    }
757
758    /// Try to move the high throughput table to a dedicated compaction group.
759    pub async fn try_move_high_throughput_table_to_dedicated_cg(
760        &self,
761        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
762        table_id: u32,
763        _table_size: &u64,
764        parent_group_id: u64,
765    ) {
766        let mut table_throughput = table_write_throughput_statistic_manager
767            .get_table_throughput_descending(
768                table_id,
769                self.env.opts.table_stat_throuput_window_seconds_for_split as i64,
770            )
771            .peekable();
772
773        if table_throughput.peek().is_none() {
774            return;
775        }
776
777        let is_high_write_throughput = GroupMergeValidator::is_table_high_write_throughput(
778            table_throughput,
779            self.env.opts.table_high_write_throughput_threshold,
780            self.env
781                .opts
782                .table_stat_high_write_throughput_ratio_for_split,
783        );
784
785        // do not split a table to dedicated compaction group if it is not high write throughput
786        if !is_high_write_throughput {
787            return;
788        }
789
790        let ret = self
791            .move_state_tables_to_dedicated_compaction_group(
792                parent_group_id,
793                &[table_id],
794                Some(self.env.opts.partition_vnode_count),
795            )
796            .await;
797        match ret {
798            Ok(split_result) => {
799                tracing::info!(
800                    "split state table [{}] from group-{} success table_vnode_partition_count {:?} split result {:?}",
801                    table_id,
802                    parent_group_id,
803                    self.env.opts.partition_vnode_count,
804                    split_result
805                );
806            }
807            Err(e) => {
808                tracing::info!(
809                    error = %e.as_report(),
810                    "failed to split state table [{}] from group-{}",
811                    table_id,
812                    parent_group_id,
813                )
814            }
815        }
816    }
817
818    pub async fn try_split_huge_compaction_group(&self, group: CompactionGroupStatistic) {
819        let group_max_size = (group.compaction_group_config.max_estimated_group_size() as f64
820            * self.env.opts.split_group_size_ratio) as u64;
821        let is_huge_hybrid_group =
822            group.group_size > group_max_size && group.table_statistic.len() > 1; // avoid split single table group
823        if is_huge_hybrid_group {
824            let mut accumulated_size = 0;
825            let mut table_ids = Vec::default();
826            for (table_id, table_size) in &group.table_statistic {
827                accumulated_size += table_size;
828                table_ids.push(*table_id);
829                // split if the accumulated size is greater than half of the group size
830                // avoid split a small table to dedicated compaction group and trigger multiple merge
831                assert!(table_ids.is_sorted());
832                if accumulated_size * 2 > group_max_size {
833                    let ret = self
834                        .move_state_tables_to_dedicated_compaction_group(
835                            group.group_id,
836                            &table_ids,
837                            None,
838                        )
839                        .await;
840                    match ret {
841                        Ok(split_result) => {
842                            tracing::info!(
843                                "split_huge_compaction_group success {:?}",
844                                split_result
845                            );
846                            self.metrics
847                                .split_compaction_group_count
848                                .with_label_values(&[&group.group_id.to_string()])
849                                .inc();
850                            return;
851                        }
852                        Err(e) => {
853                            tracing::error!(
854                                error = %e.as_report(),
855                                "failed to split_huge_compaction_group table {:?} from group-{}",
856                                table_ids,
857                                group.group_id
858                            );
859
860                            return;
861                        }
862                    }
863                }
864            }
865        }
866    }
867
868    pub async fn try_merge_compaction_group(
869        &self,
870        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
871        group: &CompactionGroupStatistic,
872        next_group: &CompactionGroupStatistic,
873        created_tables: &HashSet<u32>,
874    ) -> Result<()> {
875        GroupMergeValidator::validate_group_merge(
876            group,
877            next_group,
878            created_tables,
879            table_write_throughput_statistic_manager,
880            &self.env.opts,
881            &self.versioning,
882        )
883        .await?;
884
885        match self
886            .merge_compaction_group(group.group_id, next_group.group_id)
887            .await
888        {
889            Ok(()) => {
890                tracing::info!(
891                    "merge group-{} to group-{}",
892                    next_group.group_id,
893                    group.group_id,
894                );
895
896                self.metrics
897                    .merge_compaction_group_count
898                    .with_label_values(&[&group.group_id.to_string()])
899                    .inc();
900            }
901            Err(e) => {
902                tracing::info!(
903                    error = %e.as_report(),
904                    "failed to merge group-{} group-{}",
905                    next_group.group_id,
906                    group.group_id,
907                )
908            }
909        }
910
911        Ok(())
912    }
913}
914
/// Stateless namespace for the checks that decide whether two compaction groups may be merged
/// and for the table write throughput classification helpers.
#[derive(Default, Debug)]
struct GroupMergeValidator {}
917
918impl GroupMergeValidator {
919    /// Check if the table is high write throughput with the given threshold and ratio.
920    pub fn is_table_high_write_throughput(
921        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
922        threshold: u64,
923        high_write_throughput_ratio: f64,
924    ) -> bool {
925        let mut sample_size = 0;
926        let mut high_write_throughput_count = 0;
927        for statistic in table_throughput {
928            sample_size += 1;
929            if statistic.throughput > threshold {
930                high_write_throughput_count += 1;
931            }
932        }
933
934        high_write_throughput_count as f64 > sample_size as f64 * high_write_throughput_ratio
935    }
936
937    pub fn is_table_low_write_throughput(
938        table_throughput: impl Iterator<Item = &TableWriteThroughputStatistic>,
939        threshold: u64,
940        low_write_throughput_ratio: f64,
941    ) -> bool {
942        let mut sample_size = 0;
943        let mut low_write_throughput_count = 0;
944        for statistic in table_throughput {
945            sample_size += 1;
946            if statistic.throughput <= threshold {
947                low_write_throughput_count += 1;
948            }
949        }
950
951        low_write_throughput_count as f64 > sample_size as f64 * low_write_throughput_ratio
952    }
953
954    fn check_is_low_write_throughput_compaction_group(
955        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
956        group: &CompactionGroupStatistic,
957        opts: &Arc<MetaOpts>,
958    ) -> bool {
959        let mut table_with_statistic = Vec::with_capacity(group.table_statistic.len());
960        for table_id in group.table_statistic.keys() {
961            let mut table_throughput = table_write_throughput_statistic_manager
962                .get_table_throughput_descending(
963                    *table_id,
964                    opts.table_stat_throuput_window_seconds_for_merge as i64,
965                )
966                .peekable();
967            if table_throughput.peek().is_none() {
968                continue;
969            }
970
971            table_with_statistic.push(table_throughput);
972        }
973
974        // if all tables in the group do not have enough statistics, return true
975        if table_with_statistic.is_empty() {
976            return true;
977        }
978
979        // check if all tables in the group are low write throughput with enough statistics
980        table_with_statistic.into_iter().all(|table_throughput| {
981            Self::is_table_low_write_throughput(
982                table_throughput,
983                opts.table_low_write_throughput_threshold,
984                opts.table_stat_low_write_throughput_ratio_for_merge,
985            )
986        })
987    }
988
989    fn check_is_creating_compaction_group(
990        group: &CompactionGroupStatistic,
991        created_tables: &HashSet<u32>,
992    ) -> bool {
993        group
994            .table_statistic
995            .keys()
996            .any(|table_id| !created_tables.contains(table_id))
997    }
998
999    async fn validate_group_merge(
1000        group: &CompactionGroupStatistic,
1001        next_group: &CompactionGroupStatistic,
1002        created_tables: &HashSet<u32>,
1003        table_write_throughput_statistic_manager: &TableWriteThroughputStatisticManager,
1004        opts: &Arc<MetaOpts>,
1005        versioning: &MonitoredRwLock<Versioning>,
1006    ) -> Result<()> {
1007        // TODO: remove this check after refactor group id
1008        if (group.group_id == StaticCompactionGroupId::StateDefault as u64
1009            && next_group.group_id == StaticCompactionGroupId::MaterializedView as u64)
1010            || (group.group_id == StaticCompactionGroupId::MaterializedView as u64
1011                && next_group.group_id == StaticCompactionGroupId::StateDefault as u64)
1012        {
1013            return Err(Error::CompactionGroup(format!(
1014                "group-{} and group-{} are both StaticCompactionGroupId",
1015                group.group_id, next_group.group_id
1016            )));
1017        }
1018
1019        if group.table_statistic.is_empty() || next_group.table_statistic.is_empty() {
1020            return Err(Error::CompactionGroup(format!(
1021                "group-{} or group-{} is empty",
1022                group.group_id, next_group.group_id
1023            )));
1024        }
1025
1026        if group
1027            .compaction_group_config
1028            .compaction_config
1029            .disable_auto_group_scheduling
1030            .unwrap_or(false)
1031            || next_group
1032                .compaction_group_config
1033                .compaction_config
1034                .disable_auto_group_scheduling
1035                .unwrap_or(false)
1036        {
1037            return Err(Error::CompactionGroup(format!(
1038                "group-{} or group-{} disable_auto_group_scheduling",
1039                group.group_id, next_group.group_id
1040            )));
1041        }
1042
1043        // do not merge the compaction group which is creating
1044        if Self::check_is_creating_compaction_group(group, created_tables) {
1045            return Err(Error::CompactionGroup(format!(
1046                "Not Merge creating group {} next_group {}",
1047                group.group_id, next_group.group_id
1048            )));
1049        }
1050
1051        // do not merge high throughput group
1052        if !Self::check_is_low_write_throughput_compaction_group(
1053            table_write_throughput_statistic_manager,
1054            group,
1055            opts,
1056        ) {
1057            return Err(Error::CompactionGroup(format!(
1058                "Not Merge high throughput group {} next_group {}",
1059                group.group_id, next_group.group_id
1060            )));
1061        }
1062
1063        let size_limit = (group.compaction_group_config.max_estimated_group_size() as f64
1064            * opts.split_group_size_ratio) as u64;
1065
1066        if (group.group_size + next_group.group_size) > size_limit {
1067            return Err(Error::CompactionGroup(format!(
1068                "Not Merge huge group {} group_size {} next_group {} next_group_size {} size_limit {}",
1069                group.group_id,
1070                group.group_size,
1071                next_group.group_id,
1072                next_group.group_size,
1073                size_limit
1074            )));
1075        }
1076
1077        if Self::check_is_creating_compaction_group(next_group, created_tables) {
1078            return Err(Error::CompactionGroup(format!(
1079                "Not Merge creating group {} next group {}",
1080                group.group_id, next_group.group_id
1081            )));
1082        }
1083
1084        if !Self::check_is_low_write_throughput_compaction_group(
1085            table_write_throughput_statistic_manager,
1086            next_group,
1087            opts,
1088        ) {
1089            return Err(Error::CompactionGroup(format!(
1090                "Not Merge high throughput group {} next group {}",
1091                group.group_id, next_group.group_id
1092            )));
1093        }
1094
1095        {
1096            // Avoid merge when the group is in emergency state
1097            let versioning_guard = versioning.read().await;
1098            let levels = &versioning_guard.current_version.levels;
1099            if !levels.contains_key(&group.group_id) {
1100                return Err(Error::CompactionGroup(format!(
1101                    "Not Merge group {} not exist",
1102                    group.group_id
1103                )));
1104            }
1105
1106            if !levels.contains_key(&next_group.group_id) {
1107                return Err(Error::CompactionGroup(format!(
1108                    "Not Merge next group {} not exist",
1109                    next_group.group_id
1110                )));
1111            }
1112
1113            let group_levels = versioning_guard
1114                .current_version
1115                .get_compaction_group_levels(group.group_id);
1116
1117            let next_group_levels = versioning_guard
1118                .current_version
1119                .get_compaction_group_levels(next_group.group_id);
1120
1121            let group_state = GroupStateValidator::group_state(
1122                group_levels,
1123                group.compaction_group_config.compaction_config().deref(),
1124            );
1125
1126            if group_state.is_write_stop() || group_state.is_emergency() {
1127                return Err(Error::CompactionGroup(format!(
1128                    "Not Merge write limit group {} next group {}",
1129                    group.group_id, next_group.group_id
1130                )));
1131            }
1132
1133            let next_group_state = GroupStateValidator::group_state(
1134                next_group_levels,
1135                next_group
1136                    .compaction_group_config
1137                    .compaction_config()
1138                    .deref(),
1139            );
1140
1141            if next_group_state.is_write_stop() || next_group_state.is_emergency() {
1142                return Err(Error::CompactionGroup(format!(
1143                    "Not Merge write limit next group {} group {}",
1144                    next_group.group_id, group.group_id
1145                )));
1146            }
1147
1148            // check whether the group is in the write stop state after merge
1149            let l0_sub_level_count_after_merge =
1150                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1151            if GroupStateValidator::write_stop_l0_file_count(
1152                l0_sub_level_count_after_merge,
1153                group.compaction_group_config.compaction_config().deref(),
1154            ) {
1155                return Err(Error::CompactionGroup(format!(
1156                    "Not Merge write limit group {} next group {}, will trigger write stop after merge",
1157                    group.group_id, next_group.group_id
1158                )));
1159            }
1160
1161            let l0_file_count_after_merge =
1162                group_levels.l0.sub_levels.len() + next_group_levels.l0.sub_levels.len();
1163            if GroupStateValidator::write_stop_l0_file_count(
1164                l0_file_count_after_merge,
1165                group.compaction_group_config.compaction_config().deref(),
1166            ) {
1167                return Err(Error::CompactionGroup(format!(
1168                    "Not Merge write limit next group {} group {}, will trigger write stop after merge",
1169                    next_group.group_id, group.group_id
1170                )));
1171            }
1172
1173            let l0_size_after_merge =
1174                group_levels.l0.total_file_size + next_group_levels.l0.total_file_size;
1175
1176            if GroupStateValidator::write_stop_l0_size(
1177                l0_size_after_merge,
1178                group.compaction_group_config.compaction_config().deref(),
1179            ) {
1180                return Err(Error::CompactionGroup(format!(
1181                    "Not Merge write limit next group {} group {}, will trigger write stop after merge",
1182                    next_group.group_id, group.group_id
1183                )));
1184            }
1185
1186            // check whether the group is in the emergency state after merge
1187            if GroupStateValidator::emergency_l0_file_count(
1188                l0_sub_level_count_after_merge,
1189                group.compaction_group_config.compaction_config().deref(),
1190            ) {
1191                return Err(Error::CompactionGroup(format!(
1192                    "Not Merge emergency group {} next group {}, will trigger emergency after merge",
1193                    group.group_id, next_group.group_id
1194                )));
1195            }
1196        }
1197
1198        Ok(())
1199    }
1200}