1use std::cmp::Reverse;
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use futures::future::Either;
21use futures::stream::BoxStream;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use risingwave_hummock_sdk::CompactionGroupId;
25use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
26use risingwave_pb::hummock::compact_task::{self, TaskStatus};
27use risingwave_pb::hummock::level_handler::RunningCompactTask;
28use rw_futures_util::select_all;
29use thiserror_ext::AsReport;
30use tokio::sync::oneshot::Sender;
31use tokio::task::JoinHandle;
32use tokio_stream::wrappers::IntervalStream;
33use tracing::warn;
34
35use crate::backup_restore::BackupManagerRef;
36use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
37use crate::hummock::{HummockManager, TASK_NORMAL};
38
impl HummockManager {
    /// Spawns the periodic Hummock timer task.
    ///
    /// Returns the spawned task's `JoinHandle` together with a oneshot `Sender`
    /// that stops the timer loop when triggered (or dropped).
    ///
    /// The task merges several interval streams into a single event stream and
    /// dispatches each tick: dead-task detection, metrics reporting,
    /// compaction-heartbeat expiry checks, the periodic compaction triggers
    /// (dynamic / space-reclaim / ttl / tombstone), full GC, and — when enabled
    /// by config — compaction-group split/merge scheduling.
    pub fn hummock_timer_task(
        hummock_manager: Arc<Self>,
        backup_manager: Option<BackupManagerRef>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300;
            const STAT_REPORT_PERIOD_SEC: u64 = 20;
            const COMPACTION_HEARTBEAT_PERIOD_SEC: u64 = 1;

            // Events emitted by the interval streams below; each variant maps to
            // one periodic job handled by the dispatch loop.
            pub enum HummockTimerEvent {
                GroupScheduleSplit,
                CheckDeadTask,
                Report,
                CompactionHeartBeatExpiredCheck,

                DynamicCompactionTrigger,
                SpaceReclaimCompactionTrigger,
                TtlCompactionTrigger,
                TombstoneCompactionTrigger,

                FullGc,

                GroupScheduleMerge,
            }
            // Dead-task check interval. `MissedTickBehavior::Delay` avoids a
            // burst of catch-up ticks after a stall; `reset()` skips the
            // immediate first tick that `tokio::time::interval` would fire.
            let mut check_compact_trigger_interval =
                tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
            check_compact_trigger_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            check_compact_trigger_interval.reset();

            let check_compact_trigger = IntervalStream::new(check_compact_trigger_interval)
                .map(|_| HummockTimerEvent::CheckDeadTask);

            // Metrics-report interval.
            let mut stat_report_interval =
                tokio::time::interval(std::time::Duration::from_secs(STAT_REPORT_PERIOD_SEC));
            stat_report_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            stat_report_interval.reset();
            let stat_report_trigger =
                IntervalStream::new(stat_report_interval).map(|_| HummockTimerEvent::Report);

            // Compaction-task heartbeat expiry check interval.
            let mut compaction_heartbeat_interval = tokio::time::interval(
                std::time::Duration::from_secs(COMPACTION_HEARTBEAT_PERIOD_SEC),
            );
            compaction_heartbeat_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            compaction_heartbeat_interval.reset();
            let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
                .map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);

            // Periodic dynamic-compaction trigger (interval from config).
            let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
                hummock_manager.env.opts.periodic_compaction_interval_sec,
            ));
            min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            min_trigger_interval.reset();
            let dynamic_tick_trigger = IntervalStream::new(min_trigger_interval)
                .map(|_| HummockTimerEvent::DynamicCompactionTrigger);

            // Periodic space-reclaim compaction trigger.
            let mut min_space_reclaim_trigger_interval =
                tokio::time::interval(Duration::from_secs(
                    hummock_manager
                        .env
                        .opts
                        .periodic_space_reclaim_compaction_interval_sec,
                ));
            min_space_reclaim_trigger_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            min_space_reclaim_trigger_interval.reset();
            let space_reclaim_trigger = IntervalStream::new(min_space_reclaim_trigger_interval)
                .map(|_| HummockTimerEvent::SpaceReclaimCompactionTrigger);

            // Periodic TTL-reclaim compaction trigger.
            let mut min_ttl_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs(
                hummock_manager
                    .env
                    .opts
                    .periodic_ttl_reclaim_compaction_interval_sec,
            ));
            min_ttl_reclaim_trigger_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            min_ttl_reclaim_trigger_interval.reset();
            let ttl_reclaim_trigger = IntervalStream::new(min_ttl_reclaim_trigger_interval)
                .map(|_| HummockTimerEvent::TtlCompactionTrigger);

            // Periodic full-GC trigger.
            let mut full_gc_interval = tokio::time::interval(Duration::from_secs(
                hummock_manager.env.opts.full_gc_interval_sec,
            ));
            full_gc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            full_gc_interval.reset();
            let full_gc_trigger =
                IntervalStream::new(full_gc_interval).map(|_| HummockTimerEvent::FullGc);

            // Periodic tombstone-reclaim compaction trigger.
            let mut tombstone_reclaim_trigger_interval =
                tokio::time::interval(Duration::from_secs(
                    hummock_manager
                        .env
                        .opts
                        .periodic_tombstone_reclaim_compaction_interval_sec,
                ));
            tombstone_reclaim_trigger_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            tombstone_reclaim_trigger_interval.reset();
            let tombstone_reclaim_trigger = IntervalStream::new(tombstone_reclaim_trigger_interval)
                .map(|_| HummockTimerEvent::TombstoneCompactionTrigger);

            let mut triggers: Vec<BoxStream<'static, HummockTimerEvent>> = vec![
                Box::pin(check_compact_trigger),
                Box::pin(stat_report_trigger),
                Box::pin(compaction_heartbeat_trigger),
                Box::pin(dynamic_tick_trigger),
                Box::pin(space_reclaim_trigger),
                Box::pin(ttl_reclaim_trigger),
                Box::pin(full_gc_trigger),
                Box::pin(tombstone_reclaim_trigger),
            ];

            // Group-split scheduling is optional: an interval of 0 disables it.
            let periodic_scheduling_compaction_group_split_interval_sec = hummock_manager
                .env
                .opts
                .periodic_scheduling_compaction_group_split_interval_sec;

            if periodic_scheduling_compaction_group_split_interval_sec > 0 {
                let mut scheduling_compaction_group_trigger_interval = tokio::time::interval(
                    Duration::from_secs(periodic_scheduling_compaction_group_split_interval_sec),
                );
                scheduling_compaction_group_trigger_interval
                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
                scheduling_compaction_group_trigger_interval.reset();
                let group_scheduling_split_trigger =
                    IntervalStream::new(scheduling_compaction_group_trigger_interval)
                        .map(|_| HummockTimerEvent::GroupScheduleSplit);
                triggers.push(Box::pin(group_scheduling_split_trigger));
            }

            // Group-merge scheduling is optional as well (0 disables it).
            let periodic_scheduling_compaction_group_merge_interval_sec = hummock_manager
                .env
                .opts
                .periodic_scheduling_compaction_group_merge_interval_sec;

            if periodic_scheduling_compaction_group_merge_interval_sec > 0 {
                let mut scheduling_compaction_group_merge_trigger_interval = tokio::time::interval(
                    Duration::from_secs(periodic_scheduling_compaction_group_merge_interval_sec),
                );
                scheduling_compaction_group_merge_trigger_interval
                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
                scheduling_compaction_group_merge_trigger_interval.reset();
                let group_scheduling_merge_trigger =
                    IntervalStream::new(scheduling_compaction_group_merge_trigger_interval)
                        .map(|_| HummockTimerEvent::GroupScheduleMerge);
                triggers.push(Box::pin(group_scheduling_merge_trigger));
            }

            // Merge every trigger stream into one event stream.
            let event_stream = select_all(triggers);
            use futures::pin_mut;
            pin_mut!(event_stream);

            // `shared()` lets the oneshot receiver be polled on every loop turn.
            let shutdown_rx_shared = shutdown_rx.shared();

            tracing::info!(
                "Hummock timer task [GroupSchedulingSplit interval {} sec] [GroupSchedulingMerge interval {} sec] [CheckDeadTask interval {} sec] [Report interval {} sec] [CompactionHeartBeat interval {} sec]",
                periodic_scheduling_compaction_group_split_interval_sec,
                periodic_scheduling_compaction_group_merge_interval_sec,
                CHECK_PENDING_TASK_PERIOD_SEC,
                STAT_REPORT_PERIOD_SEC,
                COMPACTION_HEARTBEAT_PERIOD_SEC
            );

            // Dispatch loop: race the next timer event against shutdown.
            loop {
                let item =
                    futures::future::select(event_stream.next(), shutdown_rx_shared.clone()).await;

                match item {
                    Either::Left((event, _)) => {
                        if let Some(event) = event {
                            match event {
                                HummockTimerEvent::CheckDeadTask => {
                                    // Periodic jobs are suppressed under
                                    // deterministic compaction tests.
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager.check_dead_task().await;
                                }

                                HummockTimerEvent::GroupScheduleSplit => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager.on_handle_schedule_group_split().await;
                                }

                                HummockTimerEvent::GroupScheduleMerge => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager.on_handle_schedule_group_merge().await;
                                }

                                HummockTimerEvent::Report => {
                                    // Snapshot the current version, group configs
                                    // and version stats under the versioning
                                    // read lock, then drop the lock.
                                    let (current_version, id_to_config, version_stats) = {
                                        let versioning_guard =
                                            hummock_manager.versioning.read().await;

                                        let configs =
                                            hummock_manager.get_compaction_group_map().await;
                                        let versioning_deref = versioning_guard;
                                        (
                                            versioning_deref.current_version.clone(),
                                            configs,
                                            versioning_deref.version_stats.clone(),
                                        )
                                    };

                                    if let Some(mv_id_to_all_table_ids) = hummock_manager
                                        .metadata_manager
                                        .get_job_id_to_internal_table_ids_mapping()
                                        .await
                                    {
                                        trigger_mv_stat(
                                            &hummock_manager.metrics,
                                            &version_stats,
                                            mv_id_to_all_table_ids,
                                        );
                                    }

                                    // Per-group LSM statistics.
                                    for compaction_group_id in
                                        get_compaction_group_ids(&current_version)
                                    {
                                        let compaction_group_config =
                                            &id_to_config[&compaction_group_id];

                                        let group_levels = current_version
                                            .get_compaction_group_levels(
                                                compaction_group_config.group_id(),
                                            );

                                        trigger_lsm_stat(
                                            &hummock_manager.metrics,
                                            compaction_group_config.compaction_config(),
                                            group_levels,
                                            compaction_group_config.group_id(),
                                        )
                                    }

                                    // Per-group size / throughput / file-count gauges.
                                    {
                                        let group_infos = hummock_manager
                                            .calculate_compaction_group_statistic()
                                            .await;
                                        let compaction_group_count = group_infos.len();
                                        hummock_manager
                                            .metrics
                                            .compaction_group_count
                                            .set(compaction_group_count as i64);

                                        let table_write_throughput_statistic_manager =
                                            hummock_manager
                                                .table_write_throughput_statistic_manager
                                                .read()
                                                .clone();

                                        let current_version_levels = &hummock_manager
                                            .versioning
                                            .read()
                                            .await
                                            .current_version
                                            .levels;

                                        for group_info in group_infos {
                                            hummock_manager
                                                .metrics
                                                .compaction_group_size
                                                .with_label_values(&[&group_info
                                                    .group_id
                                                    .to_string()])
                                                .set(group_info.group_size as _);
                                            // Sum the average write throughput of all
                                            // tables in the group over the larger of
                                            // the split/merge statistic windows.
                                            let mut avg_throuput = 0;
                                            let max_statistic_expired_time = std::cmp::max(
                                                hummock_manager
                                                    .env
                                                    .opts
                                                    .table_stat_throuput_window_seconds_for_split,
                                                hummock_manager
                                                    .env
                                                    .opts
                                                    .table_stat_throuput_window_seconds_for_merge,
                                            );
                                            for table_id in group_info.table_statistic.keys() {
                                                avg_throuput +=
                                                    table_write_throughput_statistic_manager
                                                        .avg_write_throughput(
                                                            *table_id,
                                                            max_statistic_expired_time as i64,
                                                        )
                                                        as u64;
                                            }

                                            hummock_manager
                                                .metrics
                                                .compaction_group_throughput
                                                .with_label_values(&[&group_info
                                                    .group_id
                                                    .to_string()])
                                                .set(avg_throuput as _);

                                            if let Some(group_levels) =
                                                current_version_levels.get(&group_info.group_id)
                                            {
                                                let file_count = group_levels.count_ssts();
                                                hummock_manager
                                                    .metrics
                                                    .compaction_group_file_count
                                                    .with_label_values(&[&group_info
                                                        .group_id
                                                        .to_string()])
                                                    .set(file_count as _);
                                            }
                                        }
                                    }
                                }

                                HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
                                    let compactor_manager =
                                        hummock_manager.compactor_manager.clone();

                                    // Cancel tasks whose compactor heartbeat expired.
                                    let expired_tasks: Vec<u64> = compactor_manager
                                        .get_heartbeat_expired_tasks()
                                        .into_iter()
                                        .map(|task| task.task_id)
                                        .collect();
                                    if !expired_tasks.is_empty() {
                                        tracing::info!(
                                            expired_tasks = ?expired_tasks,
                                            "Heartbeat expired compaction tasks detected. Attempting to cancel tasks.",
                                        );
                                        if let Err(e) = hummock_manager
                                            .cancel_compact_tasks(
                                                expired_tasks.clone(),
                                                TaskStatus::HeartbeatCanceled,
                                            )
                                            .await
                                        {
                                            tracing::error!(
                                                expired_tasks = ?expired_tasks,
                                                error = %e.as_report(),
                                                "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
                                                until we can successfully report its status",
                                            );
                                        }
                                    }
                                }

                                HummockTimerEvent::DynamicCompactionTrigger => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager
                                        .on_handle_trigger_multi_group(
                                            compact_task::TaskType::Dynamic,
                                        )
                                        .await;
                                }

                                HummockTimerEvent::SpaceReclaimCompactionTrigger => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager
                                        .on_handle_trigger_multi_group(
                                            compact_task::TaskType::SpaceReclaim,
                                        )
                                        .await;

                                    // Vnode-watermark reclaim piggybacks on the
                                    // space-reclaim tick.
                                    hummock_manager
                                        .on_handle_trigger_multi_group(
                                            compact_task::TaskType::VnodeWatermark,
                                        )
                                        .await;
                                }

                                HummockTimerEvent::TtlCompactionTrigger => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager
                                        .on_handle_trigger_multi_group(compact_task::TaskType::Ttl)
                                        .await;
                                }

                                HummockTimerEvent::TombstoneCompactionTrigger => {
                                    if hummock_manager.env.opts.compaction_deterministic_test {
                                        continue;
                                    }

                                    hummock_manager
                                        .on_handle_trigger_multi_group(
                                            compact_task::TaskType::Tombstone,
                                        )
                                        .await;
                                }

                                HummockTimerEvent::FullGc => {
                                    let retention_sec =
                                        hummock_manager.env.opts.min_sst_retention_time_sec;
                                    let backup_manager_2 = backup_manager.clone();
                                    let hummock_manager_2 = hummock_manager.clone();
                                    // Full GC may take a while; run it on its own
                                    // task so the timer loop is not blocked.
                                    tokio::task::spawn(async move {
                                        use thiserror_ext::AsReport;
                                        let _ = hummock_manager_2
                                            .start_full_gc(
                                                Duration::from_secs(retention_sec),
                                                None,
                                                backup_manager_2,
                                            )
                                            .await
                                            .inspect_err(|e| {
                                                warn!(error = %e.as_report(), "Failed to start GC.")
                                            });
                                    });
                                }
                            }
                        }
                    }

                    Either::Right((_, _shutdown)) => {
                        tracing::info!("Hummock timer loop is stopped");
                        break;
                    }
                }
            }
        });
        (join_handle, shutdown_tx)
    }
}
488
489impl HummockManager {
490 async fn maybe_normalize_compaction_groups(&self, schedule_type: &'static str) {
491 if !self.env.opts.enable_compaction_group_normalize {
492 return;
493 }
494
495 match self
496 .normalize_overlapping_compaction_groups_with_limit(
497 self.env
498 .opts
499 .max_normalize_splits_per_round
500 .try_into()
501 .unwrap_or(usize::MAX),
502 )
503 .await
504 {
505 Ok(split_count) => {
506 if split_count > 0 {
507 tracing::info!(
508 "normalize compaction groups finished with {} split(s) before {} scheduling",
509 split_count,
510 schedule_type
511 );
512 }
513 }
514 Err(e) => {
515 tracing::warn!(
516 error = %e.as_report(),
517 "failed to normalize compaction groups before {} scheduling",
518 schedule_type
519 );
520 }
521 }
522 }
523
524 async fn check_dead_task(&self) {
525 const MAX_COMPACTION_L0_MULTIPLIER: u64 = 32;
526 const MAX_COMPACTION_DURATION_SEC: u64 = 20 * 60;
527 let (groups, configs) = {
528 let versioning_guard = self.versioning.read().await;
529 let g = versioning_guard
530 .current_version
531 .levels
532 .iter()
533 .map(|(id, group)| {
534 (
535 *id,
536 group
537 .l0
538 .sub_levels
539 .iter()
540 .map(|level| level.total_file_size)
541 .sum::<u64>(),
542 )
543 })
544 .collect_vec();
545 let c = self.get_compaction_group_map().await;
546 (g, c)
547 };
548 let mut slowdown_groups: HashMap<CompactionGroupId, u64> = HashMap::default();
549 {
550 for (group_id, l0_file_size) in groups {
551 let group = &configs[&group_id];
552 if l0_file_size
553 > MAX_COMPACTION_L0_MULTIPLIER
554 * group.compaction_config.max_bytes_for_level_base
555 {
556 slowdown_groups.insert(group_id, l0_file_size);
557 }
558 }
559 }
560 if slowdown_groups.is_empty() {
561 return;
562 }
563 let mut pending_tasks: HashMap<u64, (CompactionGroupId, usize, RunningCompactTask)> =
564 HashMap::default();
565 {
566 let compaction_guard = self.compaction.read().await;
567 for group_id in slowdown_groups.keys() {
568 if let Some(status) = compaction_guard.compaction_statuses.get(group_id) {
569 for (idx, level_handler) in status.level_handlers.iter().enumerate() {
570 let tasks = level_handler.pending_tasks().to_vec();
571 if tasks.is_empty() {
572 continue;
573 }
574 for task in tasks {
575 pending_tasks.insert(task.task_id, (*group_id, idx, task));
576 }
577 }
578 }
579 }
580 }
581 let task_ids = pending_tasks.keys().cloned().collect_vec();
582 let task_infos = self
583 .compactor_manager
584 .check_tasks_status(&task_ids, Duration::from_secs(MAX_COMPACTION_DURATION_SEC));
585 for (task_id, (compact_time, status)) in task_infos {
586 if status == TASK_NORMAL {
587 continue;
588 }
589 if let Some((group_id, level_id, task)) = pending_tasks.get(&task_id) {
590 let group_size = *slowdown_groups.get(group_id).unwrap();
591 warn!(
592 "COMPACTION SLOW: the task-{} of group-{}(size: {}MB) level-{} has not finished after {:?}, {}, it may cause pending sstable files({:?}) blocking other task.",
593 task_id,
594 group_id,
595 group_size / 1024 / 1024,
596 *level_id,
597 compact_time,
598 status,
599 task.ssts
600 );
601 }
602 }
603 }
604
605 async fn on_handle_schedule_group_split(&self) {
610 let table_write_throughput = self.table_write_throughput_statistic_manager.read().clone();
611 self.maybe_normalize_compaction_groups("split").await;
612
613 let mut group_infos = self.calculate_compaction_group_statistic().await;
614 group_infos.sort_by_key(|group| Reverse(group.group_size));
615
616 for group in group_infos {
617 if group.table_statistic.len() == 1 {
618 continue;
620 }
621
622 self.try_split_compaction_group(&table_write_throughput, group)
623 .await;
624 }
625 }
626
627 #[cfg(test)]
628 pub async fn schedule_group_split_for_test(&self) {
629 self.on_handle_schedule_group_split().await;
630 }
631
632 #[cfg(test)]
633 pub async fn schedule_group_merge_for_test(&self) {
634 self.on_handle_schedule_group_merge().await;
635 }
636
637 async fn on_handle_trigger_multi_group(&self, task_type: compact_task::TaskType) {
638 for cg_id in self.compaction_group_ids().await {
639 self.compaction_state.try_sched_compaction(
640 cg_id,
641 task_type,
642 super::compaction::ScheduleTrigger::Periodic,
643 );
644 }
645 }
646
647 async fn on_handle_schedule_group_merge(&self) {
653 self.maybe_normalize_compaction_groups("merge").await;
654
655 let created_tables = match self.metadata_manager.get_created_table_ids().await {
656 Ok(created_tables) => HashSet::from_iter(created_tables),
657 Err(err) => {
658 tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
659 return;
660 }
661 };
662 let table_write_throughput_statistic_manager =
663 self.table_write_throughput_statistic_manager.read().clone();
664 let mut group_infos = self.calculate_compaction_group_statistic().await;
665 group_infos.sort_by_key(|group| group.table_statistic.keys().next().copied());
667
668 let group_count = group_infos.len();
669 if group_count < 2 {
670 return;
671 }
672
673 let mut base = 0;
674 let mut candidate = 1;
675
676 while candidate < group_count {
677 let group = &group_infos[base];
678 let next_group = &group_infos[candidate];
679 match self
680 .try_merge_compaction_group(
681 &table_write_throughput_statistic_manager,
682 group,
683 next_group,
684 &created_tables,
685 )
686 .await
687 {
688 Ok(_) => candidate += 1,
689 Err(e) => {
690 tracing::debug!(
691 error = %e.as_report(),
692 "Failed to merge compaction group",
693 );
694 base = candidate;
695 candidate = base + 1;
696 }
697 }
698 }
699 }
700}
701
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::CompactionGroupId;
    use risingwave_hummock_sdk::version::HummockVersion;
    use risingwave_meta_model::WorkerId;
    use risingwave_pb::common::worker_node::Property;
    use risingwave_pb::common::{HostAddress, WorkerType};

    use crate::controller::catalog::CatalogController;
    use crate::controller::cluster::{ClusterController, ClusterControllerRef};
    use crate::hummock::compaction::compaction_config::CompactionConfigBuilder;
    use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef};
    use crate::manager::{MetaOpts, MetaSrvEnv};

    /// Builds a test meta environment with the given opts, a hummock manager
    /// using an aggressive compaction config, and one registered compute node.
    async fn setup_compute_env_with_meta_opts(
        port: i32,
        opts: MetaOpts,
    ) -> (
        MetaSrvEnv,
        HummockManagerRef,
        ClusterControllerRef,
        WorkerId,
    ) {
        let env = MetaSrvEnv::for_test_opts(opts, |_| ()).await;
        let cluster_controller = Arc::new(
            ClusterController::new(env.clone(), Duration::from_secs(1))
                .await
                .unwrap(),
        );
        let catalog_controller = Arc::new(CatalogController::new(env.clone()).await.unwrap());
        let compactor_mgr = Arc::new(CompactorManager::for_test());
        let (streams_change_tx, _streams_change_rx) = tokio::sync::mpsc::unbounded_channel();
        let compaction_config = CompactionConfigBuilder::new()
            .level0_tier_compact_file_number(1)
            .level0_max_compact_file_number(130)
            .level0_sub_level_compact_level_count(1)
            .level0_overlapping_sub_level_compact_level_count(1)
            .build();
        let hummock_manager = HummockManager::with_config(
            env.clone(),
            cluster_controller.clone(),
            catalog_controller,
            Arc::new(Default::default()),
            compactor_mgr,
            compaction_config,
            streams_change_tx,
        )
        .await;

        let host = HostAddress {
            host: "127.0.0.1".to_owned(),
            port,
        };
        let property = Property {
            is_streaming: true,
            is_serving: true,
            parallelism: 4,
            ..Default::default()
        };
        let worker_id = cluster_controller
            .add_worker(WorkerType::ComputeNode, host, property, Default::default())
            .await
            .unwrap();
        (env, hummock_manager, cluster_controller, worker_id)
    }

    /// Returns the compaction group currently holding `table_id`.
    async fn get_compaction_group_id_by_table_id(
        hummock_manager: HummockManagerRef,
        table_id: u32,
    ) -> CompactionGroupId {
        let version = hummock_manager.get_current_version().await;
        version
            .state_table_info
            .info()
            .get(&TableId::new(table_id))
            .unwrap()
            .compaction_group_id
    }

    /// Raw member table ids of `group_id` in `version`.
    fn member_table_ids(version: &HummockVersion, group_id: CompactionGroupId) -> Vec<u32> {
        let members = version
            .state_table_info
            .compaction_group_member_table_ids(group_id);
        members
            .iter()
            .map(|table_id| table_id.as_raw_id())
            .collect_vec()
    }

    /// Asserts that the [min, max] table-id ranges of all non-empty groups in
    /// `version` are pairwise disjoint.
    fn assert_no_group_overlap(version: &HummockVersion) {
        let mut ranges = Vec::new();
        for group_id in version.levels.keys() {
            let members = member_table_ids(version, *group_id);
            if let (Some(first), Some(last)) = (members.first(), members.last()) {
                ranges.push((*first, *last));
            }
        }
        ranges.sort_by_key(|(min_table_id, _)| *min_table_id);
        assert!(ranges.windows(2).all(|pair| pair[0].1 < pair[1].0));
    }

    #[tokio::test]
    async fn test_merge_scheduling_normalizes_when_split_scheduling_is_disabled() {
        let mut meta_opts = MetaOpts::test(false);
        meta_opts.enable_compaction_group_normalize = true;
        meta_opts.periodic_scheduling_compaction_group_split_interval_sec = 0;

        let (_env, hummock_manager, _, _worker_id) =
            setup_compute_env_with_meta_opts(80, meta_opts).await;
        hummock_manager
            .register_table_ids_for_test(&[(64, 2.into()), (80, 2.into())])
            .await
            .unwrap();
        hummock_manager
            .register_table_ids_for_test(&[(65, 3.into()), (81, 3.into()), (83, 3.into())])
            .await
            .unwrap();

        let cg_64 = get_compaction_group_id_by_table_id(hummock_manager.clone(), 64).await;
        let cg_65 = get_compaction_group_id_by_table_id(hummock_manager.clone(), 65).await;

        hummock_manager.on_handle_schedule_group_merge().await;

        let version = hummock_manager.get_current_version().await;
        assert_eq!(member_table_ids(&version, cg_64), vec![64]);
        assert_eq!(member_table_ids(&version, cg_65), vec![65]);
        assert_no_group_overlap(&version);
    }
}