risingwave_meta/hummock/manager/
timer_task.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Reverse;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use futures::future::Either;
21use futures::stream::BoxStream;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use risingwave_hummock_sdk::CompactionGroupId;
25use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
26use risingwave_pb::hummock::compact_task::{self, TaskStatus};
27use risingwave_pb::hummock::level_handler::RunningCompactTask;
28use rw_futures_util::select_all;
29use thiserror_ext::AsReport;
30use tokio::sync::oneshot::Sender;
31use tokio::task::JoinHandle;
32use tokio_stream::wrappers::IntervalStream;
33use tracing::warn;
34
35use crate::backup_restore::BackupManagerRef;
36use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
37use crate::hummock::{HummockManager, TASK_NORMAL};
38
39impl HummockManager {
40    pub fn hummock_timer_task(
41        hummock_manager: Arc<Self>,
42        backup_manager: Option<BackupManagerRef>,
43    ) -> (JoinHandle<()>, Sender<()>) {
44        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
45        let join_handle = tokio::spawn(async move {
46            const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300;
47            const STAT_REPORT_PERIOD_SEC: u64 = 20;
48            const COMPACTION_HEARTBEAT_PERIOD_SEC: u64 = 1;
49
50            pub enum HummockTimerEvent {
51                GroupScheduleSplit,
52                CheckDeadTask,
53                Report,
54                CompactionHeartBeatExpiredCheck,
55
56                DynamicCompactionTrigger,
57                SpaceReclaimCompactionTrigger,
58                TtlCompactionTrigger,
59                TombstoneCompactionTrigger,
60
61                FullGc,
62
63                GroupScheduleMerge,
64            }
65            let mut check_compact_trigger_interval =
66                tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
67            check_compact_trigger_interval
68                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
69            check_compact_trigger_interval.reset();
70
71            let check_compact_trigger = IntervalStream::new(check_compact_trigger_interval)
72                .map(|_| HummockTimerEvent::CheckDeadTask);
73
74            let mut stat_report_interval =
75                tokio::time::interval(std::time::Duration::from_secs(STAT_REPORT_PERIOD_SEC));
76            stat_report_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
77            stat_report_interval.reset();
78            let stat_report_trigger =
79                IntervalStream::new(stat_report_interval).map(|_| HummockTimerEvent::Report);
80
81            let mut compaction_heartbeat_interval = tokio::time::interval(
82                std::time::Duration::from_secs(COMPACTION_HEARTBEAT_PERIOD_SEC),
83            );
84            compaction_heartbeat_interval
85                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
86            compaction_heartbeat_interval.reset();
87            let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
88                .map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);
89
90            let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
91                hummock_manager.env.opts.periodic_compaction_interval_sec,
92            ));
93            min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
94            min_trigger_interval.reset();
95            let dynamic_tick_trigger = IntervalStream::new(min_trigger_interval)
96                .map(|_| HummockTimerEvent::DynamicCompactionTrigger);
97
98            let mut min_space_reclaim_trigger_interval =
99                tokio::time::interval(Duration::from_secs(
100                    hummock_manager
101                        .env
102                        .opts
103                        .periodic_space_reclaim_compaction_interval_sec,
104                ));
105            min_space_reclaim_trigger_interval
106                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
107            min_space_reclaim_trigger_interval.reset();
108            let space_reclaim_trigger = IntervalStream::new(min_space_reclaim_trigger_interval)
109                .map(|_| HummockTimerEvent::SpaceReclaimCompactionTrigger);
110
111            let mut min_ttl_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs(
112                hummock_manager
113                    .env
114                    .opts
115                    .periodic_ttl_reclaim_compaction_interval_sec,
116            ));
117            min_ttl_reclaim_trigger_interval
118                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
119            min_ttl_reclaim_trigger_interval.reset();
120            let ttl_reclaim_trigger = IntervalStream::new(min_ttl_reclaim_trigger_interval)
121                .map(|_| HummockTimerEvent::TtlCompactionTrigger);
122
123            let mut full_gc_interval = tokio::time::interval(Duration::from_secs(
124                hummock_manager.env.opts.full_gc_interval_sec,
125            ));
126            full_gc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
127            full_gc_interval.reset();
128            let full_gc_trigger =
129                IntervalStream::new(full_gc_interval).map(|_| HummockTimerEvent::FullGc);
130
131            let mut tombstone_reclaim_trigger_interval =
132                tokio::time::interval(Duration::from_secs(
133                    hummock_manager
134                        .env
135                        .opts
136                        .periodic_tombstone_reclaim_compaction_interval_sec,
137                ));
138            tombstone_reclaim_trigger_interval
139                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
140            tombstone_reclaim_trigger_interval.reset();
141            let tombstone_reclaim_trigger = IntervalStream::new(tombstone_reclaim_trigger_interval)
142                .map(|_| HummockTimerEvent::TombstoneCompactionTrigger);
143
144            let mut triggers: Vec<BoxStream<'static, HummockTimerEvent>> = vec![
145                Box::pin(check_compact_trigger),
146                Box::pin(stat_report_trigger),
147                Box::pin(compaction_heartbeat_trigger),
148                Box::pin(dynamic_tick_trigger),
149                Box::pin(space_reclaim_trigger),
150                Box::pin(ttl_reclaim_trigger),
151                Box::pin(full_gc_trigger),
152                Box::pin(tombstone_reclaim_trigger),
153            ];
154
155            let periodic_scheduling_compaction_group_split_interval_sec = hummock_manager
156                .env
157                .opts
158                .periodic_scheduling_compaction_group_split_interval_sec;
159
160            if periodic_scheduling_compaction_group_split_interval_sec > 0 {
161                let mut scheduling_compaction_group_trigger_interval = tokio::time::interval(
162                    Duration::from_secs(periodic_scheduling_compaction_group_split_interval_sec),
163                );
164                scheduling_compaction_group_trigger_interval
165                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
166                scheduling_compaction_group_trigger_interval.reset();
167                let group_scheduling_split_trigger =
168                    IntervalStream::new(scheduling_compaction_group_trigger_interval)
169                        .map(|_| HummockTimerEvent::GroupScheduleSplit);
170                triggers.push(Box::pin(group_scheduling_split_trigger));
171            }
172
173            let periodic_scheduling_compaction_group_merge_interval_sec = hummock_manager
174                .env
175                .opts
176                .periodic_scheduling_compaction_group_merge_interval_sec;
177
178            if periodic_scheduling_compaction_group_merge_interval_sec > 0 {
179                let mut scheduling_compaction_group_merge_trigger_interval = tokio::time::interval(
180                    Duration::from_secs(periodic_scheduling_compaction_group_merge_interval_sec),
181                );
182                scheduling_compaction_group_merge_trigger_interval
183                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
184                scheduling_compaction_group_merge_trigger_interval.reset();
185                let group_scheduling_merge_trigger =
186                    IntervalStream::new(scheduling_compaction_group_merge_trigger_interval)
187                        .map(|_| HummockTimerEvent::GroupScheduleMerge);
188                triggers.push(Box::pin(group_scheduling_merge_trigger));
189            }
190
191            let event_stream = select_all(triggers);
192            use futures::pin_mut;
193            pin_mut!(event_stream);
194
195            let shutdown_rx_shared = shutdown_rx.shared();
196
197            tracing::info!(
198                "Hummock timer task [GroupSchedulingSplit interval {} sec] [GroupSchedulingMerge interval {} sec] [CheckDeadTask interval {} sec] [Report interval {} sec] [CompactionHeartBeat interval {} sec]",
199                periodic_scheduling_compaction_group_split_interval_sec,
200                periodic_scheduling_compaction_group_merge_interval_sec,
201                CHECK_PENDING_TASK_PERIOD_SEC,
202                STAT_REPORT_PERIOD_SEC,
203                COMPACTION_HEARTBEAT_PERIOD_SEC
204            );
205
206            loop {
207                let item =
208                    futures::future::select(event_stream.next(), shutdown_rx_shared.clone()).await;
209
210                match item {
211                    Either::Left((event, _)) => {
212                        if let Some(event) = event {
213                            match event {
214                                HummockTimerEvent::CheckDeadTask => {
215                                    if hummock_manager.env.opts.compaction_deterministic_test {
216                                        continue;
217                                    }
218
219                                    hummock_manager.check_dead_task().await;
220                                }
221
222                                HummockTimerEvent::GroupScheduleSplit => {
223                                    if hummock_manager.env.opts.compaction_deterministic_test {
224                                        continue;
225                                    }
226
227                                    hummock_manager.on_handle_schedule_group_split().await;
228                                }
229
230                                HummockTimerEvent::GroupScheduleMerge => {
231                                    if hummock_manager.env.opts.compaction_deterministic_test {
232                                        continue;
233                                    }
234
235                                    hummock_manager.on_handle_schedule_group_merge().await;
236                                }
237
238                                HummockTimerEvent::Report => {
239                                    let (current_version, id_to_config, version_stats) = {
240                                        let versioning_guard =
241                                            hummock_manager.versioning.read().await;
242
243                                        let configs =
244                                            hummock_manager.get_compaction_group_map().await;
245                                        let versioning_deref = versioning_guard;
246                                        (
247                                            versioning_deref.current_version.clone(),
248                                            configs,
249                                            versioning_deref.version_stats.clone(),
250                                        )
251                                    };
252
253                                    if let Some(mv_id_to_all_table_ids) = hummock_manager
254                                        .metadata_manager
255                                        .get_job_id_to_internal_table_ids_mapping()
256                                        .await
257                                    {
258                                        trigger_mv_stat(
259                                            &hummock_manager.metrics,
260                                            &version_stats,
261                                            mv_id_to_all_table_ids,
262                                        );
263                                    }
264
265                                    for compaction_group_id in
266                                        get_compaction_group_ids(&current_version)
267                                    {
268                                        let compaction_group_config =
269                                            &id_to_config[&compaction_group_id];
270
271                                        let group_levels = current_version
272                                            .get_compaction_group_levels(
273                                                compaction_group_config.group_id(),
274                                            );
275
276                                        trigger_lsm_stat(
277                                            &hummock_manager.metrics,
278                                            compaction_group_config.compaction_config(),
279                                            group_levels,
280                                            compaction_group_config.group_id(),
281                                        )
282                                    }
283
284                                    {
285                                        let group_infos = hummock_manager
286                                            .calculate_compaction_group_statistic()
287                                            .await;
288                                        let compaction_group_count = group_infos.len();
289                                        hummock_manager
290                                            .metrics
291                                            .compaction_group_count
292                                            .set(compaction_group_count as i64);
293
294                                        let table_write_throughput_statistic_manager =
295                                            hummock_manager
296                                                .table_write_throughput_statistic_manager
297                                                .read()
298                                                .clone();
299
300                                        let current_version_levels = &hummock_manager
301                                            .versioning
302                                            .read()
303                                            .await
304                                            .current_version
305                                            .levels;
306
307                                        for group_info in group_infos {
308                                            hummock_manager
309                                                .metrics
310                                                .compaction_group_size
311                                                .with_label_values(&[&group_info
312                                                    .group_id
313                                                    .to_string()])
314                                                .set(group_info.group_size as _);
315                                            // accumulate the throughput of all tables in the group
316                                            let mut avg_throuput = 0;
317                                            let max_statistic_expired_time = std::cmp::max(
318                                                hummock_manager
319                                                    .env
320                                                    .opts
321                                                    .table_stat_throuput_window_seconds_for_split,
322                                                hummock_manager
323                                                    .env
324                                                    .opts
325                                                    .table_stat_throuput_window_seconds_for_merge,
326                                            );
327                                            for table_id in group_info.table_statistic.keys() {
328                                                avg_throuput +=
329                                                    table_write_throughput_statistic_manager
330                                                        .avg_write_throughput(
331                                                            *table_id,
332                                                            max_statistic_expired_time as i64,
333                                                        )
334                                                        as u64;
335                                            }
336
337                                            hummock_manager
338                                                .metrics
339                                                .compaction_group_throughput
340                                                .with_label_values(&[&group_info
341                                                    .group_id
342                                                    .to_string()])
343                                                .set(avg_throuput as _);
344
345                                            if let Some(group_levels) =
346                                                current_version_levels.get(&group_info.group_id)
347                                            {
348                                                let file_count = group_levels.count_ssts();
349                                                hummock_manager
350                                                    .metrics
351                                                    .compaction_group_file_count
352                                                    .with_label_values(&[&group_info
353                                                        .group_id
354                                                        .to_string()])
355                                                    .set(file_count as _);
356                                            }
357                                        }
358                                    }
359                                }
360
361                                HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
362                                    let compactor_manager =
363                                        hummock_manager.compactor_manager.clone();
364
365                                    // TODO: add metrics to track expired tasks
366                                    // The cancel task has two paths
367                                    // 1. compactor heartbeat cancels the expired task based on task
368                                    // progress (meta + compactor)
369                                    // 2. meta periodically scans the task and performs a cancel on
370                                    // the meta side for tasks that are not updated by heartbeat
371                                    let expired_tasks: Vec<u64> = compactor_manager
372                                        .get_heartbeat_expired_tasks()
373                                        .into_iter()
374                                        .map(|task| task.task_id)
375                                        .collect();
376                                    if !expired_tasks.is_empty() {
377                                        tracing::info!(
378                                            expired_tasks = ?expired_tasks,
379                                            "Heartbeat expired compaction tasks detected. Attempting to cancel tasks.",
380                                        );
381                                        if let Err(e) = hummock_manager
382                                            .cancel_compact_tasks(
383                                                expired_tasks.clone(),
384                                                TaskStatus::HeartbeatCanceled,
385                                            )
386                                            .await
387                                        {
388                                            tracing::error!(
389                                                expired_tasks = ?expired_tasks,
390                                                error = %e.as_report(),
391                                                "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
392                                                until we can successfully report its status",
393                                            );
394                                        }
395                                    }
396                                }
397
398                                HummockTimerEvent::DynamicCompactionTrigger => {
399                                    // Disable periodic trigger for compaction_deterministic_test.
400                                    if hummock_manager.env.opts.compaction_deterministic_test {
401                                        continue;
402                                    }
403
404                                    hummock_manager
405                                        .on_handle_trigger_multi_group(
406                                            compact_task::TaskType::Dynamic,
407                                        )
408                                        .await;
409                                }
410
411                                HummockTimerEvent::SpaceReclaimCompactionTrigger => {
412                                    // Disable periodic trigger for compaction_deterministic_test.
413                                    if hummock_manager.env.opts.compaction_deterministic_test {
414                                        continue;
415                                    }
416
417                                    hummock_manager
418                                        .on_handle_trigger_multi_group(
419                                            compact_task::TaskType::SpaceReclaim,
420                                        )
421                                        .await;
422
423                                    // share the same trigger with SpaceReclaim
424                                    hummock_manager
425                                        .on_handle_trigger_multi_group(
426                                            compact_task::TaskType::VnodeWatermark,
427                                        )
428                                        .await;
429                                }
430
431                                HummockTimerEvent::TtlCompactionTrigger => {
432                                    // Disable periodic trigger for compaction_deterministic_test.
433                                    if hummock_manager.env.opts.compaction_deterministic_test {
434                                        continue;
435                                    }
436
437                                    hummock_manager
438                                        .on_handle_trigger_multi_group(compact_task::TaskType::Ttl)
439                                        .await;
440                                }
441
442                                HummockTimerEvent::TombstoneCompactionTrigger => {
443                                    // Disable periodic trigger for compaction_deterministic_test.
444                                    if hummock_manager.env.opts.compaction_deterministic_test {
445                                        continue;
446                                    }
447
448                                    hummock_manager
449                                        .on_handle_trigger_multi_group(
450                                            compact_task::TaskType::Tombstone,
451                                        )
452                                        .await;
453                                }
454
455                                HummockTimerEvent::FullGc => {
456                                    let retention_sec =
457                                        hummock_manager.env.opts.min_sst_retention_time_sec;
458                                    let backup_manager_2 = backup_manager.clone();
459                                    let hummock_manager_2 = hummock_manager.clone();
460                                    tokio::task::spawn(async move {
461                                        use thiserror_ext::AsReport;
462                                        let _ = hummock_manager_2
463                                            .start_full_gc(
464                                                Duration::from_secs(retention_sec),
465                                                None,
466                                                backup_manager_2,
467                                            )
468                                            .await
469                                            .inspect_err(|e| {
470                                                warn!(error = %e.as_report(), "Failed to start GC.")
471                                            });
472                                    });
473                                }
474                            }
475                        }
476                    }
477
478                    Either::Right((_, _shutdown)) => {
479                        tracing::info!("Hummock timer loop is stopped");
480                        break;
481                    }
482                }
483            }
484        });
485        (join_handle, shutdown_tx)
486    }
487}
488
489impl HummockManager {
490    async fn maybe_normalize_compaction_groups(&self, schedule_type: &'static str) {
491        if !self.env.opts.enable_compaction_group_normalize {
492            return;
493        }
494
495        match self
496            .normalize_overlapping_compaction_groups_with_limit(
497                self.env
498                    .opts
499                    .max_normalize_splits_per_round
500                    .try_into()
501                    .unwrap_or(usize::MAX),
502            )
503            .await
504        {
505            Ok(split_count) => {
506                if split_count > 0 {
507                    tracing::info!(
508                        "normalize compaction groups finished with {} split(s) before {} scheduling",
509                        split_count,
510                        schedule_type
511                    );
512                }
513            }
514            Err(e) => {
515                tracing::warn!(
516                    error = %e.as_report(),
517                    "failed to normalize compaction groups before {} scheduling",
518                    schedule_type
519                );
520            }
521        }
522    }
523
524    async fn check_dead_task(&self) {
525        const MAX_COMPACTION_L0_MULTIPLIER: u64 = 32;
526        const MAX_COMPACTION_DURATION_SEC: u64 = 20 * 60;
527        let (groups, configs) = {
528            let versioning_guard = self.versioning.read().await;
529            let g = versioning_guard
530                .current_version
531                .levels
532                .iter()
533                .map(|(id, group)| {
534                    (
535                        *id,
536                        group
537                            .l0
538                            .sub_levels
539                            .iter()
540                            .map(|level| level.total_file_size)
541                            .sum::<u64>(),
542                    )
543                })
544                .collect_vec();
545            let c = self.get_compaction_group_map().await;
546            (g, c)
547        };
548        let mut slowdown_groups: HashMap<CompactionGroupId, u64> = HashMap::default();
549        {
550            for (group_id, l0_file_size) in groups {
551                let group = &configs[&group_id];
552                if l0_file_size
553                    > MAX_COMPACTION_L0_MULTIPLIER
554                        * group.compaction_config.max_bytes_for_level_base
555                {
556                    slowdown_groups.insert(group_id, l0_file_size);
557                }
558            }
559        }
560        if slowdown_groups.is_empty() {
561            return;
562        }
563        let mut pending_tasks: HashMap<u64, (CompactionGroupId, usize, RunningCompactTask)> =
564            HashMap::default();
565        {
566            let compaction_guard = self.compaction.read().await;
567            for group_id in slowdown_groups.keys() {
568                if let Some(status) = compaction_guard.compaction_statuses.get(group_id) {
569                    for (idx, level_handler) in status.level_handlers.iter().enumerate() {
570                        let tasks = level_handler.pending_tasks().to_vec();
571                        if tasks.is_empty() {
572                            continue;
573                        }
574                        for task in tasks {
575                            pending_tasks.insert(task.task_id, (*group_id, idx, task));
576                        }
577                    }
578                }
579            }
580        }
581        let task_ids = pending_tasks.keys().cloned().collect_vec();
582        let task_infos = self
583            .compactor_manager
584            .check_tasks_status(&task_ids, Duration::from_secs(MAX_COMPACTION_DURATION_SEC));
585        for (task_id, (compact_time, status)) in task_infos {
586            if status == TASK_NORMAL {
587                continue;
588            }
589            if let Some((group_id, level_id, task)) = pending_tasks.get(&task_id) {
590                let group_size = *slowdown_groups.get(group_id).unwrap();
591                warn!(
592                    "COMPACTION SLOW: the task-{} of group-{}(size: {}MB) level-{} has not finished after {:?}, {}, it may cause pending sstable files({:?}) blocking other task.",
593                    task_id,
594                    group_id,
595                    group_size / 1024 / 1024,
596                    *level_id,
597                    compact_time,
598                    status,
599                    task.ssts
600                );
601            }
602        }
603    }
604
605    /// Try to schedule a compaction `split` for the given compaction groups.
606    /// The `split` will be triggered if the following conditions are met:
607    /// 1. `state table throughput`: If the table is in a high throughput state and it belongs to a multi table group, then an attempt will be made to split the table into separate compaction groups to increase its throughput and reduce the impact on write amplification.
608    /// 2. `group size`: If the group size has exceeded the set upper limit, e.g. `max_group_size` * `split_group_size_ratio`
609    async fn on_handle_schedule_group_split(&self) {
610        let table_write_throughput = self.table_write_throughput_statistic_manager.read().clone();
611        self.maybe_normalize_compaction_groups("split").await;
612
613        let mut group_infos = self.calculate_compaction_group_statistic().await;
614        group_infos.sort_by_key(|group| Reverse(group.group_size));
615
616        for group in group_infos {
617            if group.table_statistic.len() == 1 {
618                // no need to handle the separate compaciton group
619                continue;
620            }
621
622            self.try_split_compaction_group(&table_write_throughput, group)
623                .await;
624        }
625    }
626
627    #[cfg(test)]
628    pub async fn schedule_group_split_for_test(&self) {
629        self.on_handle_schedule_group_split().await;
630    }
631
632    #[cfg(test)]
633    pub async fn schedule_group_merge_for_test(&self) {
634        self.on_handle_schedule_group_merge().await;
635    }
636
637    async fn on_handle_trigger_multi_group(&self, task_type: compact_task::TaskType) {
638        for cg_id in self.compaction_group_ids().await {
639            self.compaction_state.try_sched_compaction(
640                cg_id,
641                task_type,
642                super::compaction::ScheduleTrigger::Periodic,
643            );
644        }
645    }
646
647    /// Try to schedule a compaction merge for the given compaction groups.
648    /// The merge will be triggered if the following conditions are met:
649    /// 1. The compaction group is not contains creating table.
650    /// 2. The compaction group is a small group.
651    /// 3. All tables in compaction group is in a low throughput state.
652    async fn on_handle_schedule_group_merge(&self) {
653        self.maybe_normalize_compaction_groups("merge").await;
654
655        let created_tables = match self.metadata_manager.get_created_table_ids().await {
656            Ok(created_tables) => HashSet::from_iter(created_tables),
657            Err(err) => {
658                tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
659                return;
660            }
661        };
662        let table_write_throughput_statistic_manager =
663            self.table_write_throughput_statistic_manager.read().clone();
664        let mut group_infos = self.calculate_compaction_group_statistic().await;
665        // sort by first table id for deterministic merge order
666        group_infos.sort_by_key(|group| group.table_statistic.keys().next().copied());
667
668        let group_count = group_infos.len();
669        if group_count < 2 {
670            return;
671        }
672
673        let mut base = 0;
674        let mut candidate = 1;
675
676        while candidate < group_count {
677            let group = &group_infos[base];
678            let next_group = &group_infos[candidate];
679            match self
680                .try_merge_compaction_group(
681                    &table_write_throughput_statistic_manager,
682                    group,
683                    next_group,
684                    &created_tables,
685                )
686                .await
687            {
688                Ok(_) => candidate += 1,
689                Err(e) => {
690                    tracing::debug!(
691                        error = %e.as_report(),
692                        "Failed to merge compaction group",
693                    );
694                    base = candidate;
695                    candidate = base + 1;
696                }
697            }
698        }
699    }
700}
701
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_meta_model::WorkerId;
    use risingwave_pb::common::worker_node::Property;
    use risingwave_pb::common::{HostAddress, WorkerType};

    use crate::controller::catalog::CatalogController;
    use crate::controller::cluster::{ClusterController, ClusterControllerRef};
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
    use crate::manager::{MetaOpts, MetaSrvEnv};

    /// Builds a test meta environment from `opts`, a `HummockManager` on top of it, and
    /// registers one compute node listening on `127.0.0.1:port`.
    ///
    /// Returns the environment, the hummock manager, the cluster controller and the id of the
    /// registered worker.
    async fn setup_compute_env_with_meta_opts(
        port: i32,
        opts: MetaOpts,
    ) -> (
        MetaSrvEnv,
        HummockManagerRef,
        ClusterControllerRef,
        WorkerId,
    ) {
        let env = MetaSrvEnv::for_test_opts(opts, |_| ()).await;

        let cluster_controller = ClusterController::new(env.clone(), Duration::from_secs(1))
            .await
            .unwrap();
        let cluster_ctl = Arc::new(cluster_controller);
        let catalog_ctl = Arc::new(CatalogController::new(env.clone()).await.unwrap());
        let compactor_manager = Arc::new(CompactorManager::for_test());

        // The receiver is bound to a named variable so it stays alive until this function returns.
        let (compactor_streams_change_tx, _compactor_streams_change_rx) =
            tokio::sync::mpsc::unbounded_channel();

        let compaction_config = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(1)
            .level0_max_compact_file_number(130)
            .level0_sub_level_compact_level_count(1)
            .level0_overlapping_sub_level_compact_level_count(1)
            .build();

        let hummock_manager = HummockManager::with_config(
            env.clone(),
            cluster_ctl.clone(),
            catalog_ctl,
            Arc::new(Default::default()),
            compactor_manager,
            compaction_config,
            compactor_streams_change_tx,
        )
        .await;

        let host_address = HostAddress {
            host: "127.0.0.1".to_owned(),
            port,
        };
        let worker_property = Property {
            is_streaming: true,
            is_serving: true,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_ctl
            .add_worker(
                WorkerType::ComputeNode,
                host_address,
                worker_property,
                Default::default(),
            )
            .await
            .unwrap();

        (env, hummock_manager, cluster_ctl, worker_id)
    }

    /// Looks up the compaction group that currently owns `table_id` in the manager's current
    /// version. Panics if the table is not present in the version's state table info.
    async fn get_compaction_group_id_by_table_id(
        hummock_manager: HummockManagerRef,
        table_id: u32,
    ) -> CompactionGroupId {
        let current_version = hummock_manager.get_current_version().await;
        let table_info = current_version
            .state_table_info
            .info()
            .get(&TableId::new(table_id))
            .unwrap();
        table_info.compaction_group_id
    }

    /// Returns the raw ids of the tables that `version` lists as members of `group_id`.
    fn member_table_ids(version: &HummockVersion, group_id: CompactionGroupId) -> Vec<u32> {
        let members = version
            .state_table_info
            .compaction_group_member_table_ids(group_id);
        members.iter().map(|id| id.as_raw_id()).collect_vec()
    }

    /// Asserts that the (first, last) member table id ranges of all non-empty groups in
    /// `version` are pairwise disjoint once ordered by their first table id.
    fn assert_no_group_overlap(version: &HummockVersion) {
        let mut ranges = version
            .levels
            .keys()
            .filter_map(|group_id| {
                let members = member_table_ids(version, *group_id);
                // Groups without member tables contribute no range.
                match (members.first(), members.last()) {
                    (Some(&first), Some(&last)) => Some((first, last)),
                    _ => None,
                }
            })
            .collect_vec();
        ranges.sort_by_key(|&(min_table_id, _)| min_table_id);

        let disjoint = ranges.windows(2).all(|pair| pair[0].1 < pair[1].0);
        assert!(disjoint);
    }

    /// With normalization enabled and the periodic split interval set to zero, a merge
    /// scheduling round alone must still normalize the interleaved groups.
    #[tokio::test]
    async fn test_merge_scheduling_normalizes_when_split_scheduling_is_disabled() {
        let mut opts = MetaOpts::test(false);
        opts.enable_compaction_group_normalize = true;
        opts.periodic_scheduling_compaction_group_split_interval_sec = 0;

        let (_env, manager, _, _worker_id) = setup_compute_env_with_meta_opts(80, opts).await;

        // Two groups whose table id ranges interleave: {64, 80} and {65, 81, 83}.
        manager
            .register_table_ids_for_test(&[(64, 2.into()), (80, 2.into())])
            .await
            .unwrap();
        manager
            .register_table_ids_for_test(&[(65, 3.into()), (81, 3.into()), (83, 3.into())])
            .await
            .unwrap();

        let cg_64 = get_compaction_group_id_by_table_id(manager.clone(), 64).await;
        let cg_65 = get_compaction_group_id_by_table_id(manager.clone(), 65).await;

        manager.on_handle_schedule_group_merge().await;

        let version = manager.get_current_version().await;
        assert_eq!(member_table_ids(&version, cg_64), vec![64]);
        assert_eq!(member_table_ids(&version, cg_65), vec![65]);
        assert_no_group_overlap(&version);
    }
}