risingwave_meta/hummock/manager/
timer_task.rs1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use futures::future::Either;
20use futures::stream::BoxStream;
21use futures::{FutureExt, StreamExt};
22use itertools::Itertools;
23use risingwave_hummock_sdk::CompactionGroupId;
24use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
25use risingwave_pb::hummock::compact_task::{self, TaskStatus};
26use risingwave_pb::hummock::level_handler::RunningCompactTask;
27use rw_futures_util::select_all;
28use thiserror_ext::AsReport;
29use tokio::sync::oneshot::Sender;
30use tokio::task::JoinHandle;
31use tokio_stream::wrappers::IntervalStream;
32use tracing::warn;
33
34use crate::backup_restore::BackupManagerRef;
35use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
36use crate::hummock::{HummockManager, TASK_NORMAL};
37
38impl HummockManager {
39 pub fn hummock_timer_task(
40 hummock_manager: Arc<Self>,
41 backup_manager: Option<BackupManagerRef>,
42 ) -> (JoinHandle<()>, Sender<()>) {
43 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
44 let join_handle = tokio::spawn(async move {
45 const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300;
46 const STAT_REPORT_PERIOD_SEC: u64 = 20;
47 const COMPACTION_HEARTBEAT_PERIOD_SEC: u64 = 1;
48
49 pub enum HummockTimerEvent {
50 GroupScheduleSplit,
51 CheckDeadTask,
52 Report,
53 CompactionHeartBeatExpiredCheck,
54
55 DynamicCompactionTrigger,
56 SpaceReclaimCompactionTrigger,
57 TtlCompactionTrigger,
58 TombstoneCompactionTrigger,
59
60 FullGc,
61
62 GroupScheduleMerge,
63 }
64 let mut check_compact_trigger_interval =
65 tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
66 check_compact_trigger_interval
67 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
68 check_compact_trigger_interval.reset();
69
70 let check_compact_trigger = IntervalStream::new(check_compact_trigger_interval)
71 .map(|_| HummockTimerEvent::CheckDeadTask);
72
73 let mut stat_report_interval =
74 tokio::time::interval(std::time::Duration::from_secs(STAT_REPORT_PERIOD_SEC));
75 stat_report_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
76 stat_report_interval.reset();
77 let stat_report_trigger =
78 IntervalStream::new(stat_report_interval).map(|_| HummockTimerEvent::Report);
79
80 let mut compaction_heartbeat_interval = tokio::time::interval(
81 std::time::Duration::from_secs(COMPACTION_HEARTBEAT_PERIOD_SEC),
82 );
83 compaction_heartbeat_interval
84 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
85 compaction_heartbeat_interval.reset();
86 let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
87 .map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);
88
89 let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
90 hummock_manager.env.opts.periodic_compaction_interval_sec,
91 ));
92 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
93 min_trigger_interval.reset();
94 let dynamic_tick_trigger = IntervalStream::new(min_trigger_interval)
95 .map(|_| HummockTimerEvent::DynamicCompactionTrigger);
96
97 let mut min_space_reclaim_trigger_interval =
98 tokio::time::interval(Duration::from_secs(
99 hummock_manager
100 .env
101 .opts
102 .periodic_space_reclaim_compaction_interval_sec,
103 ));
104 min_space_reclaim_trigger_interval
105 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
106 min_space_reclaim_trigger_interval.reset();
107 let space_reclaim_trigger = IntervalStream::new(min_space_reclaim_trigger_interval)
108 .map(|_| HummockTimerEvent::SpaceReclaimCompactionTrigger);
109
110 let mut min_ttl_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs(
111 hummock_manager
112 .env
113 .opts
114 .periodic_ttl_reclaim_compaction_interval_sec,
115 ));
116 min_ttl_reclaim_trigger_interval
117 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
118 min_ttl_reclaim_trigger_interval.reset();
119 let ttl_reclaim_trigger = IntervalStream::new(min_ttl_reclaim_trigger_interval)
120 .map(|_| HummockTimerEvent::TtlCompactionTrigger);
121
122 let mut full_gc_interval = tokio::time::interval(Duration::from_secs(
123 hummock_manager.env.opts.full_gc_interval_sec,
124 ));
125 full_gc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
126 full_gc_interval.reset();
127 let full_gc_trigger =
128 IntervalStream::new(full_gc_interval).map(|_| HummockTimerEvent::FullGc);
129
130 let mut tombstone_reclaim_trigger_interval =
131 tokio::time::interval(Duration::from_secs(
132 hummock_manager
133 .env
134 .opts
135 .periodic_tombstone_reclaim_compaction_interval_sec,
136 ));
137 tombstone_reclaim_trigger_interval
138 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
139 tombstone_reclaim_trigger_interval.reset();
140 let tombstone_reclaim_trigger = IntervalStream::new(tombstone_reclaim_trigger_interval)
141 .map(|_| HummockTimerEvent::TombstoneCompactionTrigger);
142
143 let mut triggers: Vec<BoxStream<'static, HummockTimerEvent>> = vec![
144 Box::pin(check_compact_trigger),
145 Box::pin(stat_report_trigger),
146 Box::pin(compaction_heartbeat_trigger),
147 Box::pin(dynamic_tick_trigger),
148 Box::pin(space_reclaim_trigger),
149 Box::pin(ttl_reclaim_trigger),
150 Box::pin(full_gc_trigger),
151 Box::pin(tombstone_reclaim_trigger),
152 ];
153
154 let periodic_scheduling_compaction_group_split_interval_sec = hummock_manager
155 .env
156 .opts
157 .periodic_scheduling_compaction_group_split_interval_sec;
158
159 if periodic_scheduling_compaction_group_split_interval_sec > 0 {
160 let mut scheduling_compaction_group_trigger_interval = tokio::time::interval(
161 Duration::from_secs(periodic_scheduling_compaction_group_split_interval_sec),
162 );
163 scheduling_compaction_group_trigger_interval
164 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
165 scheduling_compaction_group_trigger_interval.reset();
166 let group_scheduling_split_trigger =
167 IntervalStream::new(scheduling_compaction_group_trigger_interval)
168 .map(|_| HummockTimerEvent::GroupScheduleSplit);
169 triggers.push(Box::pin(group_scheduling_split_trigger));
170 }
171
172 let periodic_scheduling_compaction_group_merge_interval_sec = hummock_manager
173 .env
174 .opts
175 .periodic_scheduling_compaction_group_merge_interval_sec;
176
177 if periodic_scheduling_compaction_group_merge_interval_sec > 0 {
178 let mut scheduling_compaction_group_merge_trigger_interval = tokio::time::interval(
179 Duration::from_secs(periodic_scheduling_compaction_group_merge_interval_sec),
180 );
181 scheduling_compaction_group_merge_trigger_interval
182 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
183 scheduling_compaction_group_merge_trigger_interval.reset();
184 let group_scheduling_merge_trigger =
185 IntervalStream::new(scheduling_compaction_group_merge_trigger_interval)
186 .map(|_| HummockTimerEvent::GroupScheduleMerge);
187 triggers.push(Box::pin(group_scheduling_merge_trigger));
188 }
189
190 let event_stream = select_all(triggers);
191 use futures::pin_mut;
192 pin_mut!(event_stream);
193
194 let shutdown_rx_shared = shutdown_rx.shared();
195
196 tracing::info!(
197 "Hummock timer task [GroupSchedulingSplit interval {} sec] [GroupSchedulingMerge interval {} sec] [CheckDeadTask interval {} sec] [Report interval {} sec] [CompactionHeartBeat interval {} sec]",
198 periodic_scheduling_compaction_group_split_interval_sec,
199 periodic_scheduling_compaction_group_merge_interval_sec,
200 CHECK_PENDING_TASK_PERIOD_SEC,
201 STAT_REPORT_PERIOD_SEC,
202 COMPACTION_HEARTBEAT_PERIOD_SEC
203 );
204
205 loop {
206 let item =
207 futures::future::select(event_stream.next(), shutdown_rx_shared.clone()).await;
208
209 match item {
210 Either::Left((event, _)) => {
211 if let Some(event) = event {
212 match event {
213 HummockTimerEvent::CheckDeadTask => {
214 if hummock_manager.env.opts.compaction_deterministic_test {
215 continue;
216 }
217
218 hummock_manager.check_dead_task().await;
219 }
220
221 HummockTimerEvent::GroupScheduleSplit => {
222 if hummock_manager.env.opts.compaction_deterministic_test {
223 continue;
224 }
225
226 hummock_manager.on_handle_schedule_group_split().await;
227 }
228
229 HummockTimerEvent::GroupScheduleMerge => {
230 if hummock_manager.env.opts.compaction_deterministic_test {
231 continue;
232 }
233
234 hummock_manager.on_handle_schedule_group_merge().await;
235 }
236
237 HummockTimerEvent::Report => {
238 let (current_version, id_to_config, version_stats) = {
239 let versioning_guard =
240 hummock_manager.versioning.read().await;
241
242 let configs =
243 hummock_manager.get_compaction_group_map().await;
244 let versioning_deref = versioning_guard;
245 (
246 versioning_deref.current_version.clone(),
247 configs,
248 versioning_deref.version_stats.clone(),
249 )
250 };
251
252 if let Some(mv_id_to_all_table_ids) = hummock_manager
253 .metadata_manager
254 .get_job_id_to_internal_table_ids_mapping()
255 .await
256 {
257 trigger_mv_stat(
258 &hummock_manager.metrics,
259 &version_stats,
260 mv_id_to_all_table_ids,
261 );
262 }
263
264 for compaction_group_id in
265 get_compaction_group_ids(¤t_version)
266 {
267 let compaction_group_config =
268 &id_to_config[&compaction_group_id];
269
270 let group_levels = current_version
271 .get_compaction_group_levels(
272 compaction_group_config.group_id(),
273 );
274
275 trigger_lsm_stat(
276 &hummock_manager.metrics,
277 compaction_group_config.compaction_config(),
278 group_levels,
279 compaction_group_config.group_id(),
280 )
281 }
282
283 {
284 let group_infos = hummock_manager
285 .calculate_compaction_group_statistic()
286 .await;
287 let compaction_group_count = group_infos.len();
288 hummock_manager
289 .metrics
290 .compaction_group_count
291 .set(compaction_group_count as i64);
292
293 let table_write_throughput_statistic_manager =
294 hummock_manager
295 .table_write_throughput_statistic_manager
296 .read()
297 .clone();
298
299 let current_version_levels = &hummock_manager
300 .versioning
301 .read()
302 .await
303 .current_version
304 .levels;
305
306 for group_info in group_infos {
307 hummock_manager
308 .metrics
309 .compaction_group_size
310 .with_label_values(&[&group_info
311 .group_id
312 .to_string()])
313 .set(group_info.group_size as _);
314 let mut avg_throuput = 0;
316 let max_statistic_expired_time = std::cmp::max(
317 hummock_manager
318 .env
319 .opts
320 .table_stat_throuput_window_seconds_for_split,
321 hummock_manager
322 .env
323 .opts
324 .table_stat_throuput_window_seconds_for_merge,
325 );
326 for table_id in group_info.table_statistic.keys() {
327 avg_throuput +=
328 table_write_throughput_statistic_manager
329 .avg_write_throughput(
330 *table_id,
331 max_statistic_expired_time as i64,
332 )
333 as u64;
334 }
335
336 hummock_manager
337 .metrics
338 .compaction_group_throughput
339 .with_label_values(&[&group_info
340 .group_id
341 .to_string()])
342 .set(avg_throuput as _);
343
344 if let Some(group_levels) =
345 current_version_levels.get(&group_info.group_id)
346 {
347 let file_count = group_levels.count_ssts();
348 hummock_manager
349 .metrics
350 .compaction_group_file_count
351 .with_label_values(&[&group_info
352 .group_id
353 .to_string()])
354 .set(file_count as _);
355 }
356 }
357 }
358 }
359
360 HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
361 let compactor_manager =
362 hummock_manager.compactor_manager.clone();
363
364 let expired_tasks: Vec<u64> = compactor_manager
371 .get_heartbeat_expired_tasks()
372 .into_iter()
373 .map(|task| task.task_id)
374 .collect();
375 if !expired_tasks.is_empty() {
376 tracing::info!(
377 expired_tasks = ?expired_tasks,
378 "Heartbeat expired compaction tasks detected. Attempting to cancel tasks.",
379 );
380 if let Err(e) = hummock_manager
381 .cancel_compact_tasks(
382 expired_tasks.clone(),
383 TaskStatus::HeartbeatCanceled,
384 )
385 .await
386 {
387 tracing::error!(
388 expired_tasks = ?expired_tasks,
389 error = %e.as_report(),
390 "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
391 until we can successfully report its status",
392 );
393 }
394 }
395 }
396
397 HummockTimerEvent::DynamicCompactionTrigger => {
398 if hummock_manager.env.opts.compaction_deterministic_test {
400 continue;
401 }
402
403 hummock_manager
404 .on_handle_trigger_multi_group(
405 compact_task::TaskType::Dynamic,
406 )
407 .await;
408 }
409
410 HummockTimerEvent::SpaceReclaimCompactionTrigger => {
411 if hummock_manager.env.opts.compaction_deterministic_test {
413 continue;
414 }
415
416 hummock_manager
417 .on_handle_trigger_multi_group(
418 compact_task::TaskType::SpaceReclaim,
419 )
420 .await;
421
422 hummock_manager
424 .on_handle_trigger_multi_group(
425 compact_task::TaskType::VnodeWatermark,
426 )
427 .await;
428 }
429
430 HummockTimerEvent::TtlCompactionTrigger => {
431 if hummock_manager.env.opts.compaction_deterministic_test {
433 continue;
434 }
435
436 hummock_manager
437 .on_handle_trigger_multi_group(compact_task::TaskType::Ttl)
438 .await;
439 }
440
441 HummockTimerEvent::TombstoneCompactionTrigger => {
442 if hummock_manager.env.opts.compaction_deterministic_test {
444 continue;
445 }
446
447 hummock_manager
448 .on_handle_trigger_multi_group(
449 compact_task::TaskType::Tombstone,
450 )
451 .await;
452 }
453
454 HummockTimerEvent::FullGc => {
455 let retention_sec =
456 hummock_manager.env.opts.min_sst_retention_time_sec;
457 let backup_manager_2 = backup_manager.clone();
458 let hummock_manager_2 = hummock_manager.clone();
459 tokio::task::spawn(async move {
460 use thiserror_ext::AsReport;
461 let _ = hummock_manager_2
462 .start_full_gc(
463 Duration::from_secs(retention_sec),
464 None,
465 backup_manager_2,
466 )
467 .await
468 .inspect_err(|e| {
469 warn!(error = %e.as_report(), "Failed to start GC.")
470 });
471 });
472 }
473 }
474 }
475 }
476
477 Either::Right((_, _shutdown)) => {
478 tracing::info!("Hummock timer loop is stopped");
479 break;
480 }
481 }
482 }
483 });
484 (join_handle, shutdown_tx)
485 }
486}
487
488impl HummockManager {
489 async fn check_dead_task(&self) {
490 const MAX_COMPACTION_L0_MULTIPLIER: u64 = 32;
491 const MAX_COMPACTION_DURATION_SEC: u64 = 20 * 60;
492 let (groups, configs) = {
493 let versioning_guard = self.versioning.read().await;
494 let g = versioning_guard
495 .current_version
496 .levels
497 .iter()
498 .map(|(id, group)| {
499 (
500 *id,
501 group
502 .l0
503 .sub_levels
504 .iter()
505 .map(|level| level.total_file_size)
506 .sum::<u64>(),
507 )
508 })
509 .collect_vec();
510 let c = self.get_compaction_group_map().await;
511 (g, c)
512 };
513 let mut slowdown_groups: HashMap<CompactionGroupId, u64> = HashMap::default();
514 {
515 for (group_id, l0_file_size) in groups {
516 let group = &configs[&group_id];
517 if l0_file_size
518 > MAX_COMPACTION_L0_MULTIPLIER
519 * group.compaction_config.max_bytes_for_level_base
520 {
521 slowdown_groups.insert(group_id, l0_file_size);
522 }
523 }
524 }
525 if slowdown_groups.is_empty() {
526 return;
527 }
528 let mut pending_tasks: HashMap<u64, (CompactionGroupId, usize, RunningCompactTask)> =
529 HashMap::default();
530 {
531 let compaction_guard = self.compaction.read().await;
532 for group_id in slowdown_groups.keys() {
533 if let Some(status) = compaction_guard.compaction_statuses.get(group_id) {
534 for (idx, level_handler) in status.level_handlers.iter().enumerate() {
535 let tasks = level_handler.pending_tasks().to_vec();
536 if tasks.is_empty() {
537 continue;
538 }
539 for task in tasks {
540 pending_tasks.insert(task.task_id, (*group_id, idx, task));
541 }
542 }
543 }
544 }
545 }
546 let task_ids = pending_tasks.keys().cloned().collect_vec();
547 let task_infos = self
548 .compactor_manager
549 .check_tasks_status(&task_ids, Duration::from_secs(MAX_COMPACTION_DURATION_SEC));
550 for (task_id, (compact_time, status)) in task_infos {
551 if status == TASK_NORMAL {
552 continue;
553 }
554 if let Some((group_id, level_id, task)) = pending_tasks.get(&task_id) {
555 let group_size = *slowdown_groups.get(group_id).unwrap();
556 warn!(
557 "COMPACTION SLOW: the task-{} of group-{}(size: {}MB) level-{} has not finished after {:?}, {}, it may cause pending sstable files({:?}) blocking other task.",
558 task_id,
559 group_id,
560 group_size / 1024 / 1024,
561 *level_id,
562 compact_time,
563 status,
564 task.ssts
565 );
566 }
567 }
568 }
569
570 async fn on_handle_schedule_group_split(&self) {
575 let table_write_throughput = self.table_write_throughput_statistic_manager.read().clone();
576 let mut group_infos = self.calculate_compaction_group_statistic().await;
577 group_infos.sort_by_key(|group| group.group_size);
578 group_infos.reverse();
579
580 for group in group_infos {
581 if group.table_statistic.len() == 1 {
582 continue;
584 }
585
586 self.try_split_compaction_group(&table_write_throughput, group)
587 .await;
588 }
589 }
590
591 async fn on_handle_trigger_multi_group(&self, task_type: compact_task::TaskType) {
592 for cg_id in self.compaction_group_ids().await {
593 self.compaction_state.try_sched_compaction(
594 cg_id,
595 task_type,
596 super::compaction::ScheduleTrigger::Periodic,
597 );
598 }
599 }
600
601 async fn on_handle_schedule_group_merge(&self) {
607 let created_tables = match self.metadata_manager.get_created_table_ids().await {
608 Ok(created_tables) => HashSet::from_iter(created_tables),
609 Err(err) => {
610 tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
611 return;
612 }
613 };
614 let table_write_throughput_statistic_manager =
615 self.table_write_throughput_statistic_manager.read().clone();
616 let mut group_infos = self.calculate_compaction_group_statistic().await;
617 group_infos.sort_by_key(|group| {
619 let table_ids = group
620 .table_statistic
621 .keys()
622 .cloned()
623 .collect::<BTreeSet<_>>();
624 table_ids.iter().next().cloned()
625 });
626
627 let group_count = group_infos.len();
628 if group_count < 2 {
629 return;
630 }
631
632 let mut base = 0;
633 let mut candidate = 1;
634
635 while candidate < group_count {
636 let group = &group_infos[base];
637 let next_group = &group_infos[candidate];
638 match self
639 .try_merge_compaction_group(
640 &table_write_throughput_statistic_manager,
641 group,
642 next_group,
643 &created_tables,
644 )
645 .await
646 {
647 Ok(_) => candidate += 1,
648 Err(e) => {
649 tracing::debug!(
650 error = %e.as_report(),
651 "Failed to merge compaction group",
652 );
653 base = candidate;
654 candidate = base + 1;
655 }
656 }
657 }
658 }
659}