1use std::cmp::Reverse;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use futures::future::Either;
21use futures::stream::BoxStream;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use risingwave_hummock_sdk::CompactionGroupId;
25use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
26use risingwave_pb::hummock::compact_task::{self, TaskStatus};
27use risingwave_pb::hummock::level_handler::RunningCompactTask;
28use rw_futures_util::select_all;
29use thiserror_ext::AsReport;
30use tokio::sync::oneshot::Sender;
31use tokio::task::JoinHandle;
32use tokio_stream::wrappers::IntervalStream;
33use tracing::warn;
34
35use crate::backup_restore::BackupManagerRef;
36use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
37use crate::hummock::{HummockManager, TASK_NORMAL};
38
impl HummockManager {
    /// Spawns the Hummock background timer task.
    ///
    /// The task multiplexes a set of periodic jobs over a single event loop:
    /// dead-task detection, metrics reporting, compaction-heartbeat expiry
    /// checks, the periodic compaction triggers (dynamic / space-reclaim /
    /// TTL / tombstone), full GC, and — only when their interval options are
    /// non-zero — compaction-group split and merge scheduling.
    ///
    /// Returns the spawned task's `JoinHandle` together with a oneshot
    /// `Sender`; completing (or dropping) the sender stops the loop.
    pub fn hummock_timer_task(
        hummock_manager: Arc<Self>,
        backup_manager: Option<BackupManagerRef>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300;
            const STAT_REPORT_PERIOD_SEC: u64 = 20;
            const COMPACTION_HEARTBEAT_PERIOD_SEC: u64 = 1;

            // Tag for the events emitted by the merged interval streams
            // below; one variant per periodic job.
            pub enum HummockTimerEvent {
                GroupScheduleSplit,
                CheckDeadTask,
                Report,
                CompactionHeartBeatExpiredCheck,

                DynamicCompactionTrigger,
                SpaceReclaimCompactionTrigger,
                TtlCompactionTrigger,
                TombstoneCompactionTrigger,

                FullGc,

                GroupScheduleMerge,
            }
            // Dead-task check every CHECK_PENDING_TASK_PERIOD_SEC. For every
            // interval in this function: `Delay` avoids tick bursts after a
            // missed tick, and `reset()` skips tokio's immediate first tick
            // so the first event only fires after one full period.
            let mut check_compact_trigger_interval =
                tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
            check_compact_trigger_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            check_compact_trigger_interval.reset();

            let check_compact_trigger = IntervalStream::new(check_compact_trigger_interval)
                .map(|_| HummockTimerEvent::CheckDeadTask);

            // Periodic metrics report (MV / LSM / per-group gauges).
            let mut stat_report_interval =
                tokio::time::interval(std::time::Duration::from_secs(STAT_REPORT_PERIOD_SEC));
            stat_report_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            stat_report_interval.reset();
            let stat_report_trigger =
                IntervalStream::new(stat_report_interval).map(|_| HummockTimerEvent::Report);

            // Heartbeat-expiry check for running compaction tasks (every second).
            let mut compaction_heartbeat_interval = tokio::time::interval(
                std::time::Duration::from_secs(COMPACTION_HEARTBEAT_PERIOD_SEC),
            );
            compaction_heartbeat_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            compaction_heartbeat_interval.reset();
            let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
                .map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);

            // Periodic dynamic-compaction trigger, interval from meta opts.
            let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
                hummock_manager.env.opts.periodic_compaction_interval_sec,
            ));
            min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            min_trigger_interval.reset();
            let dynamic_tick_trigger = IntervalStream::new(min_trigger_interval)
                .map(|_| HummockTimerEvent::DynamicCompactionTrigger);

            // Periodic space-reclaim compaction trigger.
            let mut min_space_reclaim_trigger_interval =
                tokio::time::interval(Duration::from_secs(
                    hummock_manager
                        .env
                        .opts
                        .periodic_space_reclaim_compaction_interval_sec,
                ));
            min_space_reclaim_trigger_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            min_space_reclaim_trigger_interval.reset();
            let space_reclaim_trigger = IntervalStream::new(min_space_reclaim_trigger_interval)
                .map(|_| HummockTimerEvent::SpaceReclaimCompactionTrigger);

            // Periodic TTL-reclaim compaction trigger.
            let mut min_ttl_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs(
                hummock_manager
                    .env
                    .opts
                    .periodic_ttl_reclaim_compaction_interval_sec,
            ));
            min_ttl_reclaim_trigger_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            min_ttl_reclaim_trigger_interval.reset();
            let ttl_reclaim_trigger = IntervalStream::new(min_ttl_reclaim_trigger_interval)
                .map(|_| HummockTimerEvent::TtlCompactionTrigger);

            // Periodic full GC trigger.
            let mut full_gc_interval = tokio::time::interval(Duration::from_secs(
                hummock_manager.env.opts.full_gc_interval_sec,
            ));
            full_gc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            full_gc_interval.reset();
            let full_gc_trigger =
                IntervalStream::new(full_gc_interval).map(|_| HummockTimerEvent::FullGc);

            // Periodic tombstone-reclaim compaction trigger.
            let mut tombstone_reclaim_trigger_interval =
                tokio::time::interval(Duration::from_secs(
                    hummock_manager
                        .env
                        .opts
                        .periodic_tombstone_reclaim_compaction_interval_sec,
                ));
            tombstone_reclaim_trigger_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            tombstone_reclaim_trigger_interval.reset();
            let tombstone_reclaim_trigger = IntervalStream::new(tombstone_reclaim_trigger_interval)
                .map(|_| HummockTimerEvent::TombstoneCompactionTrigger);

            // Always-on periodic jobs; split/merge scheduling streams are
            // appended conditionally below.
            let mut triggers: Vec<BoxStream<'static, HummockTimerEvent>> = vec![
                Box::pin(check_compact_trigger),
                Box::pin(stat_report_trigger),
                Box::pin(compaction_heartbeat_trigger),
                Box::pin(dynamic_tick_trigger),
                Box::pin(space_reclaim_trigger),
                Box::pin(ttl_reclaim_trigger),
                Box::pin(full_gc_trigger),
                Box::pin(tombstone_reclaim_trigger),
            ];

            let periodic_scheduling_compaction_group_split_interval_sec = hummock_manager
                .env
                .opts
                .periodic_scheduling_compaction_group_split_interval_sec;

            // A zero interval disables group-split scheduling entirely.
            if periodic_scheduling_compaction_group_split_interval_sec > 0 {
                let mut scheduling_compaction_group_trigger_interval = tokio::time::interval(
                    Duration::from_secs(periodic_scheduling_compaction_group_split_interval_sec),
                );
                scheduling_compaction_group_trigger_interval
                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
                scheduling_compaction_group_trigger_interval.reset();
                let group_scheduling_split_trigger =
                    IntervalStream::new(scheduling_compaction_group_trigger_interval)
                        .map(|_| HummockTimerEvent::GroupScheduleSplit);
                triggers.push(Box::pin(group_scheduling_split_trigger));
            }

            let periodic_scheduling_compaction_group_merge_interval_sec = hummock_manager
                .env
                .opts
                .periodic_scheduling_compaction_group_merge_interval_sec;

            // A zero interval disables group-merge scheduling entirely.
            if periodic_scheduling_compaction_group_merge_interval_sec > 0 {
                let mut scheduling_compaction_group_merge_trigger_interval = tokio::time::interval(
                    Duration::from_secs(periodic_scheduling_compaction_group_merge_interval_sec),
                );
                scheduling_compaction_group_merge_trigger_interval
                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
                scheduling_compaction_group_merge_trigger_interval.reset();
                let group_scheduling_merge_trigger =
                    IntervalStream::new(scheduling_compaction_group_merge_trigger_interval)
                        .map(|_| HummockTimerEvent::GroupScheduleMerge);
                triggers.push(Box::pin(group_scheduling_merge_trigger));
            }

            // Merge every interval stream into a single event stream.
            let event_stream = select_all(triggers);
            use futures::pin_mut;
            pin_mut!(event_stream);

            // `shared()` lets the oneshot receiver be raced against the event
            // stream on every loop iteration.
            let shutdown_rx_shared = shutdown_rx.shared();

            tracing::info!(
                "Hummock timer task [GroupSchedulingSplit interval {} sec] [GroupSchedulingMerge interval {} sec] [CheckDeadTask interval {} sec] [Report interval {} sec] [CompactionHeartBeat interval {} sec]",
                periodic_scheduling_compaction_group_split_interval_sec,
                periodic_scheduling_compaction_group_merge_interval_sec,
                CHECK_PENDING_TASK_PERIOD_SEC,
                STAT_REPORT_PERIOD_SEC,
                COMPACTION_HEARTBEAT_PERIOD_SEC
            );

            // Main loop: race the next timer event against shutdown.
            loop {
                let item =
                    futures::future::select(event_stream.next(), shutdown_rx_shared.clone()).await;

                match item {
                    Either::Left((event, _)) => {
                        if let Some(event) = event {
                            match event {
                                HummockTimerEvent::CheckDeadTask => {
                                    // Deterministic compaction tests drive compaction
                                    // manually, so every periodic job (here and in the
                                    // arms below) is skipped in that mode.
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager.check_dead_task().await;
                                }

                                HummockTimerEvent::GroupScheduleSplit => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager.on_handle_schedule_group_split().await;
                                }

                                HummockTimerEvent::GroupScheduleMerge => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager.on_handle_schedule_group_merge().await;
                                }

                                HummockTimerEvent::Report => {
                                    // Snapshot the current version, group configs and
                                    // version stats; clones are taken so the versioning
                                    // read lock is released before reporting.
                                    let (current_version, id_to_config, version_stats) = {
                                        let versioning_guard =
                                            hummock_manager.versioning.read().await;

                                        let configs =
                                            hummock_manager.get_compaction_group_map().await;
                                        let versioning_deref = versioning_guard;
                                        (
                                            versioning_deref.current_version.clone(),
                                            configs,
                                            versioning_deref.version_stats.clone(),
                                        )
                                    };

                                    // Per-materialized-view stats, when the job-id ->
                                    // internal-table mapping is available.
                                    if let Some(mv_id_to_all_table_ids) = hummock_manager
                                        .metadata_manager
                                        .get_job_id_to_internal_table_ids_mapping()
                                        .await
                                    {
                                        trigger_mv_stat(
                                            &hummock_manager.metrics,
                                            &version_stats,
                                            mv_id_to_all_table_ids,
                                        );
                                    }

                                    // Per-group LSM stats.
                                    for compaction_group_id in
                                        get_compaction_group_ids(&current_version)
                                    {
                                        let compaction_group_config =
                                            &id_to_config[&compaction_group_id];

                                        let group_levels = current_version
                                            .get_compaction_group_levels(
                                                compaction_group_config.group_id(),
                                            );

                                        trigger_lsm_stat(
                                            &hummock_manager.metrics,
                                            compaction_group_config.compaction_config(),
                                            group_levels,
                                            compaction_group_config.group_id(),
                                        )
                                    }

                                    // Per-group size / throughput / file-count gauges.
                                    {
                                        let group_infos = hummock_manager
                                            .calculate_compaction_group_statistic()
                                            .await;
                                        let compaction_group_count = group_infos.len();
                                        hummock_manager
                                            .metrics
                                            .compaction_group_count
                                            .set(compaction_group_count as i64);

                                        let table_write_throughput_statistic_manager =
                                            hummock_manager
                                                .table_write_throughput_statistic_manager
                                                .read()
                                                .clone();

                                        let current_version_levels = &hummock_manager
                                            .versioning
                                            .read()
                                            .await
                                            .current_version
                                            .levels;

                                        for group_info in group_infos {
                                            hummock_manager
                                                .metrics
                                                .compaction_group_size
                                                .with_label_values(&[&group_info
                                                    .group_id
                                                    .to_string()])
                                                .set(group_info.group_size as _);
                                            // Sum the average write throughput of all
                                            // member tables, measured over the larger
                                            // of the split/merge statistic windows.
                                            let mut avg_throuput = 0;
                                            let max_statistic_expired_time = std::cmp::max(
                                                hummock_manager
                                                    .env
                                                    .opts
                                                    .table_stat_throuput_window_seconds_for_split,
                                                hummock_manager
                                                    .env
                                                    .opts
                                                    .table_stat_throuput_window_seconds_for_merge,
                                            );
                                            for table_id in group_info.table_statistic.keys() {
                                                avg_throuput +=
                                                    table_write_throughput_statistic_manager
                                                        .avg_write_throughput(
                                                            *table_id,
                                                            max_statistic_expired_time as i64,
                                                        )
                                                        as u64;
                                            }

                                            hummock_manager
                                                .metrics
                                                .compaction_group_throughput
                                                .with_label_values(&[&group_info
                                                    .group_id
                                                    .to_string()])
                                                .set(avg_throuput as _);

                                            if let Some(group_levels) =
                                                current_version_levels.get(&group_info.group_id)
                                            {
                                                let file_count = group_levels.count_ssts();
                                                hummock_manager
                                                    .metrics
                                                    .compaction_group_file_count
                                                    .with_label_values(&[&group_info
                                                        .group_id
                                                        .to_string()])
                                                    .set(file_count as _);
                                            }
                                        }
                                    }
                                }

                                HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
                                    let compactor_manager =
                                        hummock_manager.compactor_manager.clone();

                                    // Cancel tasks whose compactor heartbeat has
                                    // expired. On cancellation failure the tasks stay
                                    // tracked and are retried on a later tick.
                                    let expired_tasks: Vec<u64> = compactor_manager
                                        .get_heartbeat_expired_tasks()
                                        .into_iter()
                                        .map(|task| task.task_id)
                                        .collect();
                                    if !expired_tasks.is_empty() {
                                        tracing::info!(
                                            expired_tasks = ?expired_tasks,
                                            "Heartbeat expired compaction tasks detected. Attempting to cancel tasks.",
                                        );
                                        if let Err(e) = hummock_manager
                                            .cancel_compact_tasks(
                                                expired_tasks.clone(),
                                                TaskStatus::HeartbeatCanceled,
                                            )
                                            .await
                                        {
                                            tracing::error!(
                                                expired_tasks = ?expired_tasks,
                                                error = %e.as_report(),
                                                "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
                                                until we can successfully report its status",
                                            );
                                        }
                                    }
                                }

                                HummockTimerEvent::DynamicCompactionTrigger => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager
                                        .on_handle_trigger_multi_group(
                                            compact_task::TaskType::Dynamic,
                                        )
                                        .await;
                                }

                                HummockTimerEvent::SpaceReclaimCompactionTrigger => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager
                                        .on_handle_trigger_multi_group(
                                            compact_task::TaskType::SpaceReclaim,
                                        )
                                        .await;

                                    // Vnode-watermark reclaim piggybacks on the
                                    // space-reclaim schedule.
                                    hummock_manager
                                        .on_handle_trigger_multi_group(
                                            compact_task::TaskType::VnodeWatermark,
                                        )
                                        .await;
                                }

                                HummockTimerEvent::TtlCompactionTrigger => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager
                                        .on_handle_trigger_multi_group(compact_task::TaskType::Ttl)
                                        .await;
                                }

                                HummockTimerEvent::TombstoneCompactionTrigger => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager
                                        .on_handle_trigger_multi_group(
                                            compact_task::TaskType::Tombstone,
                                        )
                                        .await;
                                }

                                HummockTimerEvent::FullGc => {
                                    let retention_sec =
                                        hummock_manager.env.opts.min_sst_retention_time_sec;
                                    let backup_manager_2 = backup_manager.clone();
                                    let hummock_manager_2 = hummock_manager.clone();
                                    // GC runs in its own task so a slow start can't
                                    // stall the timer loop; a failure to start is only
                                    // logged.
                                    tokio::task::spawn(async move {
                                        use thiserror_ext::AsReport;
                                        let _ = hummock_manager_2
                                            .start_full_gc(
                                                Duration::from_secs(retention_sec),
                                                None,
                                                backup_manager_2,
                                            )
                                            .await
                                            .inspect_err(|e| {
                                                warn!(error = %e.as_report(), "Failed to start GC.")
                                            });
                                    });
                                }
                            }
                        }
                    }

                    Either::Right((_, _shutdown)) => {
                        tracing::info!("Hummock timer loop is stopped");
                        break;
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }
}
488
impl HummockManager {
    /// Pre-pass for merge scheduling: when `enable_compaction_group_normalize`
    /// is set, splits overlapping compaction groups ("normalization"), capped
    /// at `max_normalize_splits_per_round` per invocation. Best-effort —
    /// errors are logged and swallowed.
    async fn maybe_normalize_compaction_groups_before_merge(&self) {
        if !self.env.opts.enable_compaction_group_normalize {
            return;
        }

        match self
            .normalize_overlapping_compaction_groups_with_limit(
                self.env
                    .opts
                    .max_normalize_splits_per_round
                    .try_into()
                    .unwrap_or(usize::MAX),
            )
            .await
        {
            Ok(split_count) => {
                if split_count > 0 {
                    tracing::info!(
                        "normalize compaction groups finished with {} split(s) before merge scheduling",
                        split_count
                    );
                }
            }
            Err(e) => {
                tracing::warn!(
                    error = %e.as_report(),
                    "failed to normalize compaction groups before merge scheduling"
                );
            }
        }
    }

    /// Diagnoses compaction tasks that look stuck. For every group whose
    /// total L0 size exceeds `MAX_COMPACTION_L0_MULTIPLIER *
    /// max_bytes_for_level_base`, each pending task whose status (per
    /// `check_tasks_status` with a `MAX_COMPACTION_DURATION_SEC` deadline) is
    /// not `TASK_NORMAL` is logged with a "COMPACTION SLOW" warning.
    /// Purely diagnostic — nothing is cancelled here.
    async fn check_dead_task(&self) {
        const MAX_COMPACTION_L0_MULTIPLIER: u64 = 32;
        const MAX_COMPACTION_DURATION_SEC: u64 = 20 * 60;
        // Snapshot (group_id, total L0 file size) pairs and the group config
        // map, releasing the versioning read lock before further work.
        let (groups, configs) = {
            let versioning_guard = self.versioning.read().await;
            let g = versioning_guard
                .current_version
                .levels
                .iter()
                .map(|(id, group)| {
                    (
                        *id,
                        group
                            .l0
                            .sub_levels
                            .iter()
                            .map(|level| level.total_file_size)
                            .sum::<u64>(),
                    )
                })
                .collect_vec();
            let c = self.get_compaction_group_map().await;
            (g, c)
        };
        // Groups whose L0 has grown far beyond the configured base-level size.
        let mut slowdown_groups: HashMap<CompactionGroupId, u64> = HashMap::default();
        {
            for (group_id, l0_file_size) in groups {
                let group = &configs[&group_id];
                if l0_file_size
                    > MAX_COMPACTION_L0_MULTIPLIER
                        * group.compaction_config.max_bytes_for_level_base
                {
                    slowdown_groups.insert(group_id, l0_file_size);
                }
            }
        }
        if slowdown_groups.is_empty() {
            return;
        }
        // task_id -> (group, level-handler index, task) for every pending
        // task in a slowed-down group.
        let mut pending_tasks: HashMap<u64, (CompactionGroupId, usize, RunningCompactTask)> =
            HashMap::default();
        {
            let compaction_guard = self.compaction.read().await;
            for group_id in slowdown_groups.keys() {
                if let Some(status) = compaction_guard.compaction_statuses.get(group_id) {
                    for (idx, level_handler) in status.level_handlers.iter().enumerate() {
                        let tasks = level_handler.pending_tasks().to_vec();
                        if tasks.is_empty() {
                            continue;
                        }
                        for task in tasks {
                            pending_tasks.insert(task.task_id, (*group_id, idx, task));
                        }
                    }
                }
            }
        }
        let task_ids = pending_tasks.keys().cloned().collect_vec();
        let task_infos = self
            .compactor_manager
            .check_tasks_status(&task_ids, Duration::from_secs(MAX_COMPACTION_DURATION_SEC));
        for (task_id, (compact_time, status)) in task_infos {
            if status == TASK_NORMAL {
                continue;
            }
            if let Some((group_id, level_id, task)) = pending_tasks.get(&task_id) {
                let group_size = *slowdown_groups.get(group_id).unwrap();
                warn!(
                    "COMPACTION SLOW: the task-{} of group-{}(size: {}MB) level-{} has not finished after {:?}, {}, it may cause pending sstable files({:?}) blocking other task.",
                    task_id,
                    group_id,
                    group_size / 1024 / 1024,
                    *level_id,
                    compact_time,
                    status,
                    task.ssts
                );
            }
        }
    }

    /// Periodic group-split scheduling: walks compaction groups from largest
    /// to smallest and offers each multi-table group to
    /// `try_split_compaction_group`. Groups with a single member table
    /// cannot be split and are skipped.
    async fn on_handle_schedule_group_split(&self) {
        let table_write_throughput = self.table_write_throughput_statistic_manager.read().clone();

        let mut group_infos = self.calculate_compaction_group_statistic().await;
        // Largest groups first.
        group_infos.sort_by_key(|group| Reverse(group.group_size));

        for group in group_infos {
            if group.table_statistic.len() == 1 {
                // A single-table group cannot be split further.
                continue;
            }

            self.try_split_compaction_group(&table_write_throughput, group)
                .await;
        }
    }

    // Test-only entry point for the split scheduler.
    #[cfg(test)]
    pub async fn schedule_group_split_for_test(&self) {
        self.on_handle_schedule_group_split().await;
    }

    // Test-only entry point for the merge scheduler.
    #[cfg(test)]
    pub async fn schedule_group_merge_for_test(&self) {
        self.on_handle_schedule_group_merge().await;
    }

    /// Enqueues a periodic compaction request of `task_type` for every
    /// compaction group.
    async fn on_handle_trigger_multi_group(&self, task_type: compact_task::TaskType) {
        for cg_id in self.compaction_group_ids().await {
            self.compaction_state.try_sched_compaction(
                cg_id,
                task_type,
                super::compaction::ScheduleTrigger::Periodic,
            );
        }
    }

    /// Periodic group-merge scheduling. After an optional normalization
    /// pre-pass, groups are sorted by their smallest member table id and
    /// scanned with two cursors: on a successful merge the candidate cursor
    /// advances (the next group is tried against the same `base` slot); on
    /// failure the base cursor jumps to the failed candidate.
    async fn on_handle_schedule_group_merge(&self) {
        self.maybe_normalize_compaction_groups_before_merge().await;

        let created_tables = match self.metadata_manager.get_created_table_ids().await {
            Ok(created_tables) => HashSet::from_iter(created_tables),
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
                return;
            }
        };
        let table_write_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.read().clone();
        let mut group_infos = self.calculate_compaction_group_statistic().await;
        // Sort by smallest member table id so adjacent entries are the
        // natural merge candidates.
        group_infos.sort_by_key(|group| group.table_statistic.keys().next().copied());

        let group_count = group_infos.len();
        if group_count < 2 {
            return;
        }

        let mut base = 0;
        let mut candidate = 1;

        while candidate < group_count {
            let group = &group_infos[base];
            let next_group = &group_infos[candidate];
            match self
                .try_merge_compaction_group(
                    &table_write_throughput_statistic_manager,
                    group,
                    next_group,
                    &created_tables,
                )
                .await
            {
                Ok(_) => candidate += 1,
                Err(e) => {
                    // A refused merge is expected and common, so log at
                    // debug only, then restart from the failed candidate.
                    tracing::debug!(
                        error = %e.as_report(),
                        "Failed to merge compaction group",
                    );
                    base = candidate;
                    candidate = base + 1;
                }
            }
        }
    }
}
698
699#[cfg(test)]
700mod tests {
701 use std::sync::Arc;
702 use std::time::Duration;
703
704 use itertools::Itertools;
705 use risingwave_common::catalog::TableId;
706 use risingwave_hummock_sdk::CompactionGroupId;
707 use risingwave_hummock_sdk::version::HummockVersion;
708 use risingwave_meta_model::WorkerId;
709 use risingwave_pb::common::worker_node::Property;
710 use risingwave_pb::common::{HostAddress, WorkerType};
711
712 use crate::controller::catalog::CatalogController;
713 use crate::controller::cluster::{ClusterController, ClusterControllerRef};
714 use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
715 use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
716 use crate::manager::{MetaOpts, MetaSrvEnv};
717
718 async fn setup_compute_env_with_meta_opts(
719 port: i32,
720 opts: MetaOpts,
721 ) -> (
722 MetaSrvEnv,
723 HummockManagerRef,
724 ClusterControllerRef,
725 WorkerId,
726 ) {
727 let env = MetaSrvEnv::for_test_opts(opts, |_| ()).await;
728 let cluster_ctl = Arc::new(
729 ClusterController::new(env.clone(), Duration::from_secs(1))
730 .await
731 .unwrap(),
732 );
733 let catalog_ctl = Arc::new(CatalogController::new(env.clone()).await.unwrap());
734 let compactor_manager = Arc::new(CompactorManager::for_test());
735 let (compactor_streams_change_tx, _compactor_streams_change_rx) =
736 tokio::sync::mpsc::unbounded_channel();
737 let config = CompactionConfigBuilder::new()
738 .level0_tier_compact_file_number(1)
739 .level0_max_compact_file_number(130)
740 .level0_sub_level_compact_level_count(1)
741 .level0_overlapping_sub_level_compact_level_count(1)
742 .build();
743 let hummock_manager = HummockManager::with_config(
744 env.clone(),
745 cluster_ctl.clone(),
746 catalog_ctl,
747 Arc::new(Default::default()),
748 compactor_manager,
749 config,
750 compactor_streams_change_tx,
751 )
752 .await;
753
754 let worker_id = cluster_ctl
755 .add_worker(
756 WorkerType::ComputeNode,
757 HostAddress {
758 host: "127.0.0.1".to_owned(),
759 port,
760 },
761 Property {
762 is_streaming: true,
763 is_serving: true,
764 parallelism: 4,
765 ..Default::default()
766 },
767 Default::default(),
768 )
769 .await
770 .unwrap();
771 (env, hummock_manager, cluster_ctl, worker_id)
772 }
773
774 async fn get_compaction_group_id_by_table_id(
775 hummock_manager: HummockManagerRef,
776 table_id: u32,
777 ) -> CompactionGroupId {
778 hummock_manager
779 .get_current_version()
780 .await
781 .state_table_info
782 .info()
783 .get(&TableId::new(table_id))
784 .unwrap()
785 .compaction_group_id
786 }
787
788 fn member_table_ids(version: &HummockVersion, group_id: CompactionGroupId) -> Vec<u32> {
789 version
790 .state_table_info
791 .compaction_group_member_table_ids(group_id)
792 .iter()
793 .map(|table_id| table_id.as_raw_id())
794 .collect_vec()
795 }
796
797 fn assert_no_group_overlap(version: &HummockVersion) {
798 let mut ranges = version
799 .levels
800 .keys()
801 .filter_map(|group_id| {
802 let members = member_table_ids(version, *group_id);
803 (!members.is_empty()).then(|| (*members.first().unwrap(), *members.last().unwrap()))
804 })
805 .collect_vec();
806 ranges.sort_by_key(|(min_table_id, _)| *min_table_id);
807 assert!(ranges.windows(2).all(|window| window[0].1 < window[1].0));
808 }
809
810 #[tokio::test]
811 async fn test_merge_scheduling_normalizes_when_split_scheduling_is_disabled() {
812 let mut opts = MetaOpts::test(false);
813 opts.enable_compaction_group_normalize = true;
814 opts.periodic_scheduling_compaction_group_split_interval_sec = 0;
815
816 let (_env, hummock_manager, _, _worker_id) =
817 setup_compute_env_with_meta_opts(80, opts).await;
818 hummock_manager
819 .register_table_ids_for_test(&[(64, 2.into()), (80, 2.into())])
820 .await
821 .unwrap();
822 hummock_manager
823 .register_table_ids_for_test(&[(65, 3.into()), (81, 3.into()), (83, 3.into())])
824 .await
825 .unwrap();
826
827 let cg_64 = get_compaction_group_id_by_table_id(hummock_manager.clone(), 64).await;
828 let cg_65 = get_compaction_group_id_by_table_id(hummock_manager.clone(), 65).await;
829
830 hummock_manager.on_handle_schedule_group_merge().await;
831
832 let version = hummock_manager.get_current_version().await;
833 assert_eq!(member_table_ids(&version, cg_64), vec![64]);
834 assert_eq!(member_table_ids(&version, cg_65), vec![65]);
835 assert_no_group_overlap(&version);
836 }
837}