risingwave_meta/hummock/manager/
timer_task.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Reverse;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use futures::future::Either;
21use futures::stream::BoxStream;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use risingwave_hummock_sdk::CompactionGroupId;
25use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
26use risingwave_pb::hummock::compact_task::{self, TaskStatus};
27use risingwave_pb::hummock::level_handler::RunningCompactTask;
28use rw_futures_util::select_all;
29use thiserror_ext::AsReport;
30use tokio::sync::oneshot::Sender;
31use tokio::task::JoinHandle;
32use tokio_stream::wrappers::IntervalStream;
33use tracing::warn;
34
35use crate::backup_restore::BackupManagerRef;
36use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
37use crate::hummock::{HummockManager, TASK_NORMAL};
38
39impl HummockManager {
40    pub fn hummock_timer_task(
41        hummock_manager: Arc<Self>,
42        backup_manager: Option<BackupManagerRef>,
43    ) -> (JoinHandle<()>, Sender<()>) {
44        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
45        let join_handle = tokio::spawn(async move {
46            const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300;
47            const STAT_REPORT_PERIOD_SEC: u64 = 20;
48            const COMPACTION_HEARTBEAT_PERIOD_SEC: u64 = 1;
49
50            pub enum HummockTimerEvent {
51                GroupScheduleSplit,
52                CheckDeadTask,
53                Report,
54                CompactionHeartBeatExpiredCheck,
55
56                DynamicCompactionTrigger,
57                SpaceReclaimCompactionTrigger,
58                TtlCompactionTrigger,
59                TombstoneCompactionTrigger,
60
61                FullGc,
62
63                GroupScheduleMerge,
64            }
65            let mut check_compact_trigger_interval =
66                tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
67            check_compact_trigger_interval
68                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
69            check_compact_trigger_interval.reset();
70
71            let check_compact_trigger = IntervalStream::new(check_compact_trigger_interval)
72                .map(|_| HummockTimerEvent::CheckDeadTask);
73
74            let mut stat_report_interval =
75                tokio::time::interval(std::time::Duration::from_secs(STAT_REPORT_PERIOD_SEC));
76            stat_report_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
77            stat_report_interval.reset();
78            let stat_report_trigger =
79                IntervalStream::new(stat_report_interval).map(|_| HummockTimerEvent::Report);
80
81            let mut compaction_heartbeat_interval = tokio::time::interval(
82                std::time::Duration::from_secs(COMPACTION_HEARTBEAT_PERIOD_SEC),
83            );
84            compaction_heartbeat_interval
85                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
86            compaction_heartbeat_interval.reset();
87            let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
88                .map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);
89
90            let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
91                hummock_manager.env.opts.periodic_compaction_interval_sec,
92            ));
93            min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
94            min_trigger_interval.reset();
95            let dynamic_tick_trigger = IntervalStream::new(min_trigger_interval)
96                .map(|_| HummockTimerEvent::DynamicCompactionTrigger);
97
98            let mut min_space_reclaim_trigger_interval =
99                tokio::time::interval(Duration::from_secs(
100                    hummock_manager
101                        .env
102                        .opts
103                        .periodic_space_reclaim_compaction_interval_sec,
104                ));
105            min_space_reclaim_trigger_interval
106                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
107            min_space_reclaim_trigger_interval.reset();
108            let space_reclaim_trigger = IntervalStream::new(min_space_reclaim_trigger_interval)
109                .map(|_| HummockTimerEvent::SpaceReclaimCompactionTrigger);
110
111            let mut min_ttl_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs(
112                hummock_manager
113                    .env
114                    .opts
115                    .periodic_ttl_reclaim_compaction_interval_sec,
116            ));
117            min_ttl_reclaim_trigger_interval
118                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
119            min_ttl_reclaim_trigger_interval.reset();
120            let ttl_reclaim_trigger = IntervalStream::new(min_ttl_reclaim_trigger_interval)
121                .map(|_| HummockTimerEvent::TtlCompactionTrigger);
122
123            let mut full_gc_interval = tokio::time::interval(Duration::from_secs(
124                hummock_manager.env.opts.full_gc_interval_sec,
125            ));
126            full_gc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
127            full_gc_interval.reset();
128            let full_gc_trigger =
129                IntervalStream::new(full_gc_interval).map(|_| HummockTimerEvent::FullGc);
130
131            let mut tombstone_reclaim_trigger_interval =
132                tokio::time::interval(Duration::from_secs(
133                    hummock_manager
134                        .env
135                        .opts
136                        .periodic_tombstone_reclaim_compaction_interval_sec,
137                ));
138            tombstone_reclaim_trigger_interval
139                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
140            tombstone_reclaim_trigger_interval.reset();
141            let tombstone_reclaim_trigger = IntervalStream::new(tombstone_reclaim_trigger_interval)
142                .map(|_| HummockTimerEvent::TombstoneCompactionTrigger);
143
144            let mut triggers: Vec<BoxStream<'static, HummockTimerEvent>> = vec![
145                Box::pin(check_compact_trigger),
146                Box::pin(stat_report_trigger),
147                Box::pin(compaction_heartbeat_trigger),
148                Box::pin(dynamic_tick_trigger),
149                Box::pin(space_reclaim_trigger),
150                Box::pin(ttl_reclaim_trigger),
151                Box::pin(full_gc_trigger),
152                Box::pin(tombstone_reclaim_trigger),
153            ];
154
155            let periodic_scheduling_compaction_group_split_interval_sec = hummock_manager
156                .env
157                .opts
158                .periodic_scheduling_compaction_group_split_interval_sec;
159
160            if periodic_scheduling_compaction_group_split_interval_sec > 0 {
161                let mut scheduling_compaction_group_trigger_interval = tokio::time::interval(
162                    Duration::from_secs(periodic_scheduling_compaction_group_split_interval_sec),
163                );
164                scheduling_compaction_group_trigger_interval
165                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
166                scheduling_compaction_group_trigger_interval.reset();
167                let group_scheduling_split_trigger =
168                    IntervalStream::new(scheduling_compaction_group_trigger_interval)
169                        .map(|_| HummockTimerEvent::GroupScheduleSplit);
170                triggers.push(Box::pin(group_scheduling_split_trigger));
171            }
172
173            let periodic_scheduling_compaction_group_merge_interval_sec = hummock_manager
174                .env
175                .opts
176                .periodic_scheduling_compaction_group_merge_interval_sec;
177
178            if periodic_scheduling_compaction_group_merge_interval_sec > 0 {
179                let mut scheduling_compaction_group_merge_trigger_interval = tokio::time::interval(
180                    Duration::from_secs(periodic_scheduling_compaction_group_merge_interval_sec),
181                );
182                scheduling_compaction_group_merge_trigger_interval
183                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
184                scheduling_compaction_group_merge_trigger_interval.reset();
185                let group_scheduling_merge_trigger =
186                    IntervalStream::new(scheduling_compaction_group_merge_trigger_interval)
187                        .map(|_| HummockTimerEvent::GroupScheduleMerge);
188                triggers.push(Box::pin(group_scheduling_merge_trigger));
189            }
190
191            let event_stream = select_all(triggers);
192            use futures::pin_mut;
193            pin_mut!(event_stream);
194
195            let shutdown_rx_shared = shutdown_rx.shared();
196
197            tracing::info!(
198                "Hummock timer task [GroupSchedulingSplit interval {} sec] [GroupSchedulingMerge interval {} sec] [CheckDeadTask interval {} sec] [Report interval {} sec] [CompactionHeartBeat interval {} sec]",
199                periodic_scheduling_compaction_group_split_interval_sec,
200                periodic_scheduling_compaction_group_merge_interval_sec,
201                CHECK_PENDING_TASK_PERIOD_SEC,
202                STAT_REPORT_PERIOD_SEC,
203                COMPACTION_HEARTBEAT_PERIOD_SEC
204            );
205
206            loop {
207                let item =
208                    futures::future::select(event_stream.next(), shutdown_rx_shared.clone()).await;
209
210                match item {
211                    Either::Left((event, _)) => {
212                        if let Some(event) = event {
213                            match event {
214                                HummockTimerEvent::CheckDeadTask => {
215                                    if hummock_manager.env.opts.compaction_deterministic_test {
216                                        continue;
217                                    }
218
219                                    hummock_manager.check_dead_task().await;
220                                }
221
222                                HummockTimerEvent::GroupScheduleSplit => {
223                                    if hummock_manager.env.opts.compaction_deterministic_test {
224                                        continue;
225                                    }
226
227                                    hummock_manager.on_handle_schedule_group_split().await;
228                                }
229
230                                HummockTimerEvent::GroupScheduleMerge => {
231                                    if hummock_manager.env.opts.compaction_deterministic_test {
232                                        continue;
233                                    }
234
235                                    hummock_manager.on_handle_schedule_group_merge().await;
236                                }
237
238                                HummockTimerEvent::Report => {
239                                    let (current_version, id_to_config, version_stats) = {
240                                        let versioning_guard =
241                                            hummock_manager.versioning.read().await;
242
243                                        let configs =
244                                            hummock_manager.get_compaction_group_map().await;
245                                        let versioning_deref = versioning_guard;
246                                        (
247                                            versioning_deref.current_version.clone(),
248                                            configs,
249                                            versioning_deref.version_stats.clone(),
250                                        )
251                                    };
252
253                                    if let Some(mv_id_to_all_table_ids) = hummock_manager
254                                        .metadata_manager
255                                        .get_job_id_to_internal_table_ids_mapping()
256                                        .await
257                                    {
258                                        trigger_mv_stat(
259                                            &hummock_manager.metrics,
260                                            &version_stats,
261                                            mv_id_to_all_table_ids,
262                                        );
263                                    }
264
265                                    for compaction_group_id in
266                                        get_compaction_group_ids(&current_version)
267                                    {
268                                        let compaction_group_config =
269                                            &id_to_config[&compaction_group_id];
270
271                                        let group_levels = current_version
272                                            .get_compaction_group_levels(
273                                                compaction_group_config.group_id(),
274                                            );
275
276                                        trigger_lsm_stat(
277                                            &hummock_manager.metrics,
278                                            compaction_group_config.compaction_config(),
279                                            group_levels,
280                                            compaction_group_config.group_id(),
281                                        )
282                                    }
283
284                                    {
285                                        let group_infos = hummock_manager
286                                            .calculate_compaction_group_statistic()
287                                            .await;
288                                        let compaction_group_count = group_infos.len();
289                                        hummock_manager
290                                            .metrics
291                                            .compaction_group_count
292                                            .set(compaction_group_count as i64);
293
294                                        let table_write_throughput_statistic_manager =
295                                            hummock_manager
296                                                .table_write_throughput_statistic_manager
297                                                .read()
298                                                .clone();
299
300                                        let current_version_levels = &hummock_manager
301                                            .versioning
302                                            .read()
303                                            .await
304                                            .current_version
305                                            .levels;
306
307                                        for group_info in group_infos {
308                                            hummock_manager
309                                                .metrics
310                                                .compaction_group_size
311                                                .with_label_values(&[&group_info
312                                                    .group_id
313                                                    .to_string()])
314                                                .set(group_info.group_size as _);
315                                            // accumulate the throughput of all tables in the group
316                                            let mut avg_throuput = 0;
317                                            let max_statistic_expired_time = std::cmp::max(
318                                                hummock_manager
319                                                    .env
320                                                    .opts
321                                                    .table_stat_throuput_window_seconds_for_split,
322                                                hummock_manager
323                                                    .env
324                                                    .opts
325                                                    .table_stat_throuput_window_seconds_for_merge,
326                                            );
327                                            for table_id in group_info.table_statistic.keys() {
328                                                avg_throuput +=
329                                                    table_write_throughput_statistic_manager
330                                                        .avg_write_throughput(
331                                                            *table_id,
332                                                            max_statistic_expired_time as i64,
333                                                        )
334                                                        as u64;
335                                            }
336
337                                            hummock_manager
338                                                .metrics
339                                                .compaction_group_throughput
340                                                .with_label_values(&[&group_info
341                                                    .group_id
342                                                    .to_string()])
343                                                .set(avg_throuput as _);
344
345                                            if let Some(group_levels) =
346                                                current_version_levels.get(&group_info.group_id)
347                                            {
348                                                let file_count = group_levels.count_ssts();
349                                                hummock_manager
350                                                    .metrics
351                                                    .compaction_group_file_count
352                                                    .with_label_values(&[&group_info
353                                                        .group_id
354                                                        .to_string()])
355                                                    .set(file_count as _);
356                                            }
357                                        }
358                                    }
359                                }
360
361                                HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
362                                    let compactor_manager =
363                                        hummock_manager.compactor_manager.clone();
364
365                                    // TODO: add metrics to track expired tasks
366                                    // The cancel task has two paths
367                                    // 1. compactor heartbeat cancels the expired task based on task
368                                    // progress (meta + compactor)
369                                    // 2. meta periodically scans the task and performs a cancel on
370                                    // the meta side for tasks that are not updated by heartbeat
371                                    let expired_tasks: Vec<u64> = compactor_manager
372                                        .get_heartbeat_expired_tasks()
373                                        .into_iter()
374                                        .map(|task| task.task_id)
375                                        .collect();
376                                    if !expired_tasks.is_empty() {
377                                        tracing::info!(
378                                            expired_tasks = ?expired_tasks,
379                                            "Heartbeat expired compaction tasks detected. Attempting to cancel tasks.",
380                                        );
381                                        if let Err(e) = hummock_manager
382                                            .cancel_compact_tasks(
383                                                expired_tasks.clone(),
384                                                TaskStatus::HeartbeatCanceled,
385                                            )
386                                            .await
387                                        {
388                                            tracing::error!(
389                                                expired_tasks = ?expired_tasks,
390                                                error = %e.as_report(),
391                                                "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
392                                                until we can successfully report its status",
393                                            );
394                                        }
395                                    }
396                                }
397
398                                HummockTimerEvent::DynamicCompactionTrigger => {
399                                    // Disable periodic trigger for compaction_deterministic_test.
400                                    if hummock_manager.env.opts.compaction_deterministic_test {
401                                        continue;
402                                    }
403
404                                    hummock_manager
405                                        .on_handle_trigger_multi_group(
406                                            compact_task::TaskType::Dynamic,
407                                        )
408                                        .await;
409                                }
410
411                                HummockTimerEvent::SpaceReclaimCompactionTrigger => {
412                                    // Disable periodic trigger for compaction_deterministic_test.
413                                    if hummock_manager.env.opts.compaction_deterministic_test {
414                                        continue;
415                                    }
416
417                                    hummock_manager
418                                        .on_handle_trigger_multi_group(
419                                            compact_task::TaskType::SpaceReclaim,
420                                        )
421                                        .await;
422
423                                    // share the same trigger with SpaceReclaim
424                                    hummock_manager
425                                        .on_handle_trigger_multi_group(
426                                            compact_task::TaskType::VnodeWatermark,
427                                        )
428                                        .await;
429                                }
430
431                                HummockTimerEvent::TtlCompactionTrigger => {
432                                    // Disable periodic trigger for compaction_deterministic_test.
433                                    if hummock_manager.env.opts.compaction_deterministic_test {
434                                        continue;
435                                    }
436
437                                    hummock_manager
438                                        .on_handle_trigger_multi_group(compact_task::TaskType::Ttl)
439                                        .await;
440                                }
441
442                                HummockTimerEvent::TombstoneCompactionTrigger => {
443                                    // Disable periodic trigger for compaction_deterministic_test.
444                                    if hummock_manager.env.opts.compaction_deterministic_test {
445                                        continue;
446                                    }
447
448                                    hummock_manager
449                                        .on_handle_trigger_multi_group(
450                                            compact_task::TaskType::Tombstone,
451                                        )
452                                        .await;
453                                }
454
455                                HummockTimerEvent::FullGc => {
456                                    let retention_sec =
457                                        hummock_manager.env.opts.min_sst_retention_time_sec;
458                                    let backup_manager_2 = backup_manager.clone();
459                                    let hummock_manager_2 = hummock_manager.clone();
460                                    tokio::task::spawn(async move {
461                                        use thiserror_ext::AsReport;
462                                        let _ = hummock_manager_2
463                                            .start_full_gc(
464                                                Duration::from_secs(retention_sec),
465                                                None,
466                                                backup_manager_2,
467                                            )
468                                            .await
469                                            .inspect_err(|e| {
470                                                warn!(error = %e.as_report(), "Failed to start GC.")
471                                            });
472                                    });
473                                }
474                            }
475                        }
476                    }
477
478                    Either::Right((_, _shutdown)) => {
479                        tracing::info!("Hummock timer loop is stopped");
480                        break;
481                    }
482                }
483            }
484        });
485        (join_handle, shutdown_tx)
486    }
487}
488
489impl HummockManager {
490    async fn maybe_normalize_compaction_groups_before_merge(&self) {
491        if !self.env.opts.enable_compaction_group_normalize {
492            return;
493        }
494
495        match self
496            .normalize_overlapping_compaction_groups_with_limit(
497                self.env
498                    .opts
499                    .max_normalize_splits_per_round
500                    .try_into()
501                    .unwrap_or(usize::MAX),
502            )
503            .await
504        {
505            Ok(split_count) => {
506                if split_count > 0 {
507                    tracing::info!(
508                        "normalize compaction groups finished with {} split(s) before merge scheduling",
509                        split_count
510                    );
511                }
512            }
513            Err(e) => {
514                tracing::warn!(
515                    error = %e.as_report(),
516                    "failed to normalize compaction groups before merge scheduling"
517                );
518            }
519        }
520    }
521
522    async fn check_dead_task(&self) {
523        const MAX_COMPACTION_L0_MULTIPLIER: u64 = 32;
524        const MAX_COMPACTION_DURATION_SEC: u64 = 20 * 60;
525        let (groups, configs) = {
526            let versioning_guard = self.versioning.read().await;
527            let g = versioning_guard
528                .current_version
529                .levels
530                .iter()
531                .map(|(id, group)| {
532                    (
533                        *id,
534                        group
535                            .l0
536                            .sub_levels
537                            .iter()
538                            .map(|level| level.total_file_size)
539                            .sum::<u64>(),
540                    )
541                })
542                .collect_vec();
543            let c = self.get_compaction_group_map().await;
544            (g, c)
545        };
546        let mut slowdown_groups: HashMap<CompactionGroupId, u64> = HashMap::default();
547        {
548            for (group_id, l0_file_size) in groups {
549                let group = &configs[&group_id];
550                if l0_file_size
551                    > MAX_COMPACTION_L0_MULTIPLIER
552                        * group.compaction_config.max_bytes_for_level_base
553                {
554                    slowdown_groups.insert(group_id, l0_file_size);
555                }
556            }
557        }
558        if slowdown_groups.is_empty() {
559            return;
560        }
561        let mut pending_tasks: HashMap<u64, (CompactionGroupId, usize, RunningCompactTask)> =
562            HashMap::default();
563        {
564            let compaction_guard = self.compaction.read().await;
565            for group_id in slowdown_groups.keys() {
566                if let Some(status) = compaction_guard.compaction_statuses.get(group_id) {
567                    for (idx, level_handler) in status.level_handlers.iter().enumerate() {
568                        let tasks = level_handler.pending_tasks().to_vec();
569                        if tasks.is_empty() {
570                            continue;
571                        }
572                        for task in tasks {
573                            pending_tasks.insert(task.task_id, (*group_id, idx, task));
574                        }
575                    }
576                }
577            }
578        }
579        let task_ids = pending_tasks.keys().cloned().collect_vec();
580        let task_infos = self
581            .compactor_manager
582            .check_tasks_status(&task_ids, Duration::from_secs(MAX_COMPACTION_DURATION_SEC));
583        for (task_id, (compact_time, status)) in task_infos {
584            if status == TASK_NORMAL {
585                continue;
586            }
587            if let Some((group_id, level_id, task)) = pending_tasks.get(&task_id) {
588                let group_size = *slowdown_groups.get(group_id).unwrap();
589                warn!(
590                    "COMPACTION SLOW: the task-{} of group-{}(size: {}MB) level-{} has not finished after {:?}, {}, it may cause pending sstable files({:?}) blocking other task.",
591                    task_id,
592                    group_id,
593                    group_size / 1024 / 1024,
594                    *level_id,
595                    compact_time,
596                    status,
597                    task.ssts
598                );
599            }
600        }
601    }
602
603    /// Try to schedule a compaction `split` for the given compaction groups.
604    /// The `split` will be triggered if the following conditions are met:
605    /// 1. `state table throughput`: If the table is in a high throughput state and it belongs to a multi table group, then an attempt will be made to split the table into separate compaction groups to increase its throughput and reduce the impact on write amplification.
606    /// 2. `group size`: If the group size has exceeded the set upper limit, e.g. `max_group_size` * `split_group_size_ratio`
607    async fn on_handle_schedule_group_split(&self) {
608        let table_write_throughput = self.table_write_throughput_statistic_manager.read().clone();
609
610        let mut group_infos = self.calculate_compaction_group_statistic().await;
611        group_infos.sort_by_key(|group| Reverse(group.group_size));
612
613        for group in group_infos {
614            if group.table_statistic.len() == 1 {
615                // no need to handle the separate compaciton group
616                continue;
617            }
618
619            self.try_split_compaction_group(&table_write_throughput, group)
620                .await;
621        }
622    }
623
624    #[cfg(test)]
625    pub async fn schedule_group_split_for_test(&self) {
626        self.on_handle_schedule_group_split().await;
627    }
628
629    #[cfg(test)]
630    pub async fn schedule_group_merge_for_test(&self) {
631        self.on_handle_schedule_group_merge().await;
632    }
633
634    async fn on_handle_trigger_multi_group(&self, task_type: compact_task::TaskType) {
635        for cg_id in self.compaction_group_ids().await {
636            self.compaction_state.try_sched_compaction(
637                cg_id,
638                task_type,
639                super::compaction::ScheduleTrigger::Periodic,
640            );
641        }
642    }
643
644    /// Try to schedule a compaction merge for the given compaction groups.
645    /// The merge will be triggered if the following conditions are met:
646    /// 1. The compaction group is not contains creating table.
647    /// 2. The compaction group is a small group.
648    /// 3. All tables in compaction group is in a low throughput state.
649    async fn on_handle_schedule_group_merge(&self) {
650        self.maybe_normalize_compaction_groups_before_merge().await;
651
652        let created_tables = match self.metadata_manager.get_created_table_ids().await {
653            Ok(created_tables) => HashSet::from_iter(created_tables),
654            Err(err) => {
655                tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
656                return;
657            }
658        };
659        let table_write_throughput_statistic_manager =
660            self.table_write_throughput_statistic_manager.read().clone();
661        let mut group_infos = self.calculate_compaction_group_statistic().await;
662        // sort by first table id for deterministic merge order
663        group_infos.sort_by_key(|group| group.table_statistic.keys().next().copied());
664
665        let group_count = group_infos.len();
666        if group_count < 2 {
667            return;
668        }
669
670        let mut base = 0;
671        let mut candidate = 1;
672
673        while candidate < group_count {
674            let group = &group_infos[base];
675            let next_group = &group_infos[candidate];
676            match self
677                .try_merge_compaction_group(
678                    &table_write_throughput_statistic_manager,
679                    group,
680                    next_group,
681                    &created_tables,
682                )
683                .await
684            {
685                Ok(_) => candidate += 1,
686                Err(e) => {
687                    tracing::debug!(
688                        error = %e.as_report(),
689                        "Failed to merge compaction group",
690                    );
691                    base = candidate;
692                    candidate = base + 1;
693                }
694            }
695        }
696    }
697}
698
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_meta_model::WorkerId;
    use risingwave_pb::common::worker_node::Property;
    use risingwave_pb::common::{HostAddress, WorkerType};

    use crate::controller::catalog::CatalogController;
    use crate::controller::cluster::{ClusterController, ClusterControllerRef};
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
    use crate::manager::{MetaOpts, MetaSrvEnv};

    /// Builds a test meta environment from `opts`, a `HummockManager` on top of it, and
    /// registers one compute node listening on `127.0.0.1:port`.
    ///
    /// Returns the environment, the hummock manager, the cluster controller and the id of
    /// the registered worker.
    async fn setup_compute_env_with_meta_opts(
        port: i32,
        opts: MetaOpts,
    ) -> (
        MetaSrvEnv,
        HummockManagerRef,
        ClusterControllerRef,
        WorkerId,
    ) {
        let env = MetaSrvEnv::for_test_opts(opts, |_| ()).await;

        let cluster_controller = ClusterController::new(env.clone(), Duration::from_secs(1))
            .await
            .unwrap();
        let cluster_ctl = Arc::new(cluster_controller);
        let catalog_controller = CatalogController::new(env.clone()).await.unwrap();
        let catalog_ctl = Arc::new(catalog_controller);
        let compactor_manager = Arc::new(CompactorManager::for_test());
        // The receiver is bound (not `_`) so it stays alive until this function returns.
        let (streams_change_tx, _streams_change_rx) = tokio::sync::mpsc::unbounded_channel();
        let config = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(1)
            .level0_max_compact_file_number(130)
            .level0_sub_level_compact_level_count(1)
            .level0_overlapping_sub_level_compact_level_count(1)
            .build();

        let hummock_manager = HummockManager::with_config(
            env.clone(),
            cluster_ctl.clone(),
            catalog_ctl,
            Arc::new(Default::default()),
            compactor_manager,
            config,
            streams_change_tx,
        )
        .await;

        let host_address = HostAddress {
            host: "127.0.0.1".to_owned(),
            port,
        };
        let property = Property {
            is_streaming: true,
            is_serving: true,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                WorkerType::ComputeNode,
                host_address,
                property,
                Default::default(),
            )
            .await
            .unwrap();

        (env, hummock_manager, cluster_ctl, worker_id)
    }

    /// Looks up the compaction group that `table_id` belongs to in the current version.
    ///
    /// Panics if the table is not present in the version's state table info.
    async fn get_compaction_group_id_by_table_id(
        hummock_manager: HummockManagerRef,
        table_id: u32,
    ) -> CompactionGroupId {
        let version = hummock_manager.get_current_version().await;
        let table_info = version
            .state_table_info
            .info()
            .get(&TableId::new(table_id))
            .unwrap();
        table_info.compaction_group_id
    }

    /// Returns the raw ids of the tables that `version` records as members of `group_id`.
    fn member_table_ids(version: &HummockVersion, group_id: CompactionGroupId) -> Vec<u32> {
        let members = version
            .state_table_info
            .compaction_group_member_table_ids(group_id);
        members
            .iter()
            .map(|table_id| table_id.as_raw_id())
            .collect_vec()
    }

    /// Asserts that the `(first, last)` member table id ranges of the non-empty groups in
    /// `version` do not overlap once ordered by their first table id.
    fn assert_no_group_overlap(version: &HummockVersion) {
        let mut ranges = Vec::new();
        for group_id in version.levels.keys() {
            let members = member_table_ids(version, *group_id);
            // Groups without member tables contribute no range.
            if let (Some(first), Some(last)) = (members.first(), members.last()) {
                ranges.push((*first, *last));
            }
        }
        ranges.sort_by_key(|(min_table_id, _)| *min_table_id);
        assert!(ranges.windows(2).all(|window| window[0].1 < window[1].0));
    }

    /// With normalization enabled and the periodic split interval set to `0`, one round of
    /// merge scheduling must leave tables 64 and 65 alone in their original groups and must
    /// not leave any overlapping group ranges behind.
    #[tokio::test]
    async fn test_merge_scheduling_normalizes_when_split_scheduling_is_disabled() {
        let mut opts = MetaOpts::test(false);
        opts.enable_compaction_group_normalize = true;
        opts.periodic_scheduling_compaction_group_split_interval_sec = 0;

        let (_env, hummock_manager, _, _worker_id) =
            setup_compute_env_with_meta_opts(80, opts).await;

        // Two groups whose table id ranges interleave: {64, 80} and {65, 81, 83}.
        hummock_manager
            .register_table_ids_for_test(&[(64, 2.into()), (80, 2.into())])
            .await
            .unwrap();
        hummock_manager
            .register_table_ids_for_test(&[(65, 3.into()), (81, 3.into()), (83, 3.into())])
            .await
            .unwrap();

        // Capture the group ids before scheduling so the assertions target the original groups.
        let cg_64 = get_compaction_group_id_by_table_id(hummock_manager.clone(), 64).await;
        let cg_65 = get_compaction_group_id_by_table_id(hummock_manager.clone(), 65).await;

        hummock_manager.on_handle_schedule_group_merge().await;

        let version_after = hummock_manager.get_current_version().await;
        assert_eq!(member_table_ids(&version_after, cg_64), vec![64]);
        assert_eq!(member_table_ids(&version_after, cg_65), vec![65]);
        assert_no_group_overlap(&version_after);
    }
}