risingwave_meta/hummock/manager/
timer_task.rs1use std::collections::{BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17use std::time::Duration;
18
19use futures::future::Either;
20use futures::stream::BoxStream;
21use futures::{FutureExt, StreamExt};
22use itertools::Itertools;
23use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids;
24use risingwave_pb::hummock::compact_task::{self, TaskStatus};
25use risingwave_pb::hummock::level_handler::RunningCompactTask;
26use rw_futures_util::select_all;
27use thiserror_ext::AsReport;
28use tokio::sync::oneshot::Sender;
29use tokio::task::JoinHandle;
30use tokio_stream::wrappers::IntervalStream;
31use tracing::warn;
32
33use crate::backup_restore::BackupManagerRef;
34use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat};
35use crate::hummock::{HummockManager, TASK_NORMAL};
36
37impl HummockManager {
38 pub fn hummock_timer_task(
39 hummock_manager: Arc<Self>,
40 backup_manager: Option<BackupManagerRef>,
41 ) -> (JoinHandle<()>, Sender<()>) {
42 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
43 let join_handle = tokio::spawn(async move {
44 const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300;
45 const STAT_REPORT_PERIOD_SEC: u64 = 20;
46 const COMPACTION_HEARTBEAT_PERIOD_SEC: u64 = 1;
47
48 pub enum HummockTimerEvent {
49 GroupScheduleSplit,
50 CheckDeadTask,
51 Report,
52 CompactionHeartBeatExpiredCheck,
53
54 DynamicCompactionTrigger,
55 SpaceReclaimCompactionTrigger,
56 TtlCompactionTrigger,
57 TombstoneCompactionTrigger,
58
59 FullGc,
60
61 GroupScheduleMerge,
62 }
63 let mut check_compact_trigger_interval =
64 tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
65 check_compact_trigger_interval
66 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
67 check_compact_trigger_interval.reset();
68
69 let check_compact_trigger = IntervalStream::new(check_compact_trigger_interval)
70 .map(|_| HummockTimerEvent::CheckDeadTask);
71
72 let mut stat_report_interval =
73 tokio::time::interval(std::time::Duration::from_secs(STAT_REPORT_PERIOD_SEC));
74 stat_report_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
75 stat_report_interval.reset();
76 let stat_report_trigger =
77 IntervalStream::new(stat_report_interval).map(|_| HummockTimerEvent::Report);
78
79 let mut compaction_heartbeat_interval = tokio::time::interval(
80 std::time::Duration::from_secs(COMPACTION_HEARTBEAT_PERIOD_SEC),
81 );
82 compaction_heartbeat_interval
83 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
84 compaction_heartbeat_interval.reset();
85 let compaction_heartbeat_trigger = IntervalStream::new(compaction_heartbeat_interval)
86 .map(|_| HummockTimerEvent::CompactionHeartBeatExpiredCheck);
87
88 let mut min_trigger_interval = tokio::time::interval(Duration::from_secs(
89 hummock_manager.env.opts.periodic_compaction_interval_sec,
90 ));
91 min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
92 min_trigger_interval.reset();
93 let dynamic_tick_trigger = IntervalStream::new(min_trigger_interval)
94 .map(|_| HummockTimerEvent::DynamicCompactionTrigger);
95
96 let mut min_space_reclaim_trigger_interval =
97 tokio::time::interval(Duration::from_secs(
98 hummock_manager
99 .env
100 .opts
101 .periodic_space_reclaim_compaction_interval_sec,
102 ));
103 min_space_reclaim_trigger_interval
104 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
105 min_space_reclaim_trigger_interval.reset();
106 let space_reclaim_trigger = IntervalStream::new(min_space_reclaim_trigger_interval)
107 .map(|_| HummockTimerEvent::SpaceReclaimCompactionTrigger);
108
109 let mut min_ttl_reclaim_trigger_interval = tokio::time::interval(Duration::from_secs(
110 hummock_manager
111 .env
112 .opts
113 .periodic_ttl_reclaim_compaction_interval_sec,
114 ));
115 min_ttl_reclaim_trigger_interval
116 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
117 min_ttl_reclaim_trigger_interval.reset();
118 let ttl_reclaim_trigger = IntervalStream::new(min_ttl_reclaim_trigger_interval)
119 .map(|_| HummockTimerEvent::TtlCompactionTrigger);
120
121 let mut full_gc_interval = tokio::time::interval(Duration::from_secs(
122 hummock_manager.env.opts.full_gc_interval_sec,
123 ));
124 full_gc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
125 full_gc_interval.reset();
126 let full_gc_trigger =
127 IntervalStream::new(full_gc_interval).map(|_| HummockTimerEvent::FullGc);
128
129 let mut tombstone_reclaim_trigger_interval =
130 tokio::time::interval(Duration::from_secs(
131 hummock_manager
132 .env
133 .opts
134 .periodic_tombstone_reclaim_compaction_interval_sec,
135 ));
136 tombstone_reclaim_trigger_interval
137 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
138 tombstone_reclaim_trigger_interval.reset();
139 let tombstone_reclaim_trigger = IntervalStream::new(tombstone_reclaim_trigger_interval)
140 .map(|_| HummockTimerEvent::TombstoneCompactionTrigger);
141
142 let mut triggers: Vec<BoxStream<'static, HummockTimerEvent>> = vec![
143 Box::pin(check_compact_trigger),
144 Box::pin(stat_report_trigger),
145 Box::pin(compaction_heartbeat_trigger),
146 Box::pin(dynamic_tick_trigger),
147 Box::pin(space_reclaim_trigger),
148 Box::pin(ttl_reclaim_trigger),
149 Box::pin(full_gc_trigger),
150 Box::pin(tombstone_reclaim_trigger),
151 ];
152
153 let periodic_scheduling_compaction_group_split_interval_sec = hummock_manager
154 .env
155 .opts
156 .periodic_scheduling_compaction_group_split_interval_sec;
157
158 if periodic_scheduling_compaction_group_split_interval_sec > 0 {
159 let mut scheduling_compaction_group_trigger_interval = tokio::time::interval(
160 Duration::from_secs(periodic_scheduling_compaction_group_split_interval_sec),
161 );
162 scheduling_compaction_group_trigger_interval
163 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
164 scheduling_compaction_group_trigger_interval.reset();
165 let group_scheduling_split_trigger =
166 IntervalStream::new(scheduling_compaction_group_trigger_interval)
167 .map(|_| HummockTimerEvent::GroupScheduleSplit);
168 triggers.push(Box::pin(group_scheduling_split_trigger));
169 }
170
171 let periodic_scheduling_compaction_group_merge_interval_sec = hummock_manager
172 .env
173 .opts
174 .periodic_scheduling_compaction_group_merge_interval_sec;
175
176 if periodic_scheduling_compaction_group_merge_interval_sec > 0 {
177 let mut scheduling_compaction_group_merge_trigger_interval = tokio::time::interval(
178 Duration::from_secs(periodic_scheduling_compaction_group_merge_interval_sec),
179 );
180 scheduling_compaction_group_merge_trigger_interval
181 .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
182 scheduling_compaction_group_merge_trigger_interval.reset();
183 let group_scheduling_merge_trigger =
184 IntervalStream::new(scheduling_compaction_group_merge_trigger_interval)
185 .map(|_| HummockTimerEvent::GroupScheduleMerge);
186 triggers.push(Box::pin(group_scheduling_merge_trigger));
187 }
188
189 let event_stream = select_all(triggers);
190 use futures::pin_mut;
191 pin_mut!(event_stream);
192
193 let shutdown_rx_shared = shutdown_rx.shared();
194
195 tracing::info!(
196 "Hummock timer task [GroupSchedulingSplit interval {} sec] [GroupSchedulingMerge interval {} sec] [CheckDeadTask interval {} sec] [Report interval {} sec] [CompactionHeartBeat interval {} sec]",
197 periodic_scheduling_compaction_group_split_interval_sec,
198 periodic_scheduling_compaction_group_merge_interval_sec,
199 CHECK_PENDING_TASK_PERIOD_SEC,
200 STAT_REPORT_PERIOD_SEC,
201 COMPACTION_HEARTBEAT_PERIOD_SEC
202 );
203
204 loop {
205 let item =
206 futures::future::select(event_stream.next(), shutdown_rx_shared.clone()).await;
207
208 match item {
209 Either::Left((event, _)) => {
210 if let Some(event) = event {
211 match event {
212 HummockTimerEvent::CheckDeadTask => {
213 if hummock_manager.env.opts.compaction_deterministic_test {
214 continue;
215 }
216
217 hummock_manager.check_dead_task().await;
218 }
219
220 HummockTimerEvent::GroupScheduleSplit => {
221 if hummock_manager.env.opts.compaction_deterministic_test {
222 continue;
223 }
224
225 hummock_manager.on_handle_schedule_group_split().await;
226 }
227
228 HummockTimerEvent::GroupScheduleMerge => {
229 if hummock_manager.env.opts.compaction_deterministic_test {
230 continue;
231 }
232
233 hummock_manager.on_handle_schedule_group_merge().await;
234 }
235
236 HummockTimerEvent::Report => {
237 let (current_version, id_to_config, version_stats) = {
238 let versioning_guard =
239 hummock_manager.versioning.read().await;
240
241 let configs =
242 hummock_manager.get_compaction_group_map().await;
243 let versioning_deref = versioning_guard;
244 (
245 versioning_deref.current_version.clone(),
246 configs,
247 versioning_deref.version_stats.clone(),
248 )
249 };
250
251 if let Some(mv_id_to_all_table_ids) = hummock_manager
252 .metadata_manager
253 .get_job_id_to_internal_table_ids_mapping()
254 .await
255 {
256 trigger_mv_stat(
257 &hummock_manager.metrics,
258 &version_stats,
259 mv_id_to_all_table_ids,
260 );
261 }
262
263 for compaction_group_id in
264 get_compaction_group_ids(¤t_version)
265 {
266 let compaction_group_config =
267 &id_to_config[&compaction_group_id];
268
269 let group_levels = current_version
270 .get_compaction_group_levels(
271 compaction_group_config.group_id(),
272 );
273
274 trigger_lsm_stat(
275 &hummock_manager.metrics,
276 compaction_group_config.compaction_config(),
277 group_levels,
278 compaction_group_config.group_id(),
279 )
280 }
281
282 {
283 let group_infos = hummock_manager
284 .calculate_compaction_group_statistic()
285 .await;
286 let compaction_group_count = group_infos.len();
287 hummock_manager
288 .metrics
289 .compaction_group_count
290 .set(compaction_group_count as i64);
291
292 let table_write_throughput_statistic_manager =
293 hummock_manager
294 .table_write_throughput_statistic_manager
295 .read()
296 .clone();
297
298 let current_version_levels = &hummock_manager
299 .versioning
300 .read()
301 .await
302 .current_version
303 .levels;
304
305 for group_info in group_infos {
306 hummock_manager
307 .metrics
308 .compaction_group_size
309 .with_label_values(&[&group_info
310 .group_id
311 .to_string()])
312 .set(group_info.group_size as _);
313 let mut avg_throuput = 0;
315 let max_statistic_expired_time = std::cmp::max(
316 hummock_manager
317 .env
318 .opts
319 .table_stat_throuput_window_seconds_for_split,
320 hummock_manager
321 .env
322 .opts
323 .table_stat_throuput_window_seconds_for_merge,
324 );
325 for table_id in group_info.table_statistic.keys() {
326 avg_throuput +=
327 table_write_throughput_statistic_manager
328 .avg_write_throughput(
329 *table_id,
330 max_statistic_expired_time as i64,
331 )
332 as u64;
333 }
334
335 hummock_manager
336 .metrics
337 .compaction_group_throughput
338 .with_label_values(&[&group_info
339 .group_id
340 .to_string()])
341 .set(avg_throuput as _);
342
343 if let Some(group_levels) =
344 current_version_levels.get(&group_info.group_id)
345 {
346 let file_count = group_levels.count_ssts();
347 hummock_manager
348 .metrics
349 .compaction_group_file_count
350 .with_label_values(&[&group_info
351 .group_id
352 .to_string()])
353 .set(file_count as _);
354 }
355 }
356 }
357 }
358
359 HummockTimerEvent::CompactionHeartBeatExpiredCheck => {
360 let compactor_manager =
361 hummock_manager.compactor_manager.clone();
362
363 let expired_tasks: Vec<u64> = compactor_manager
370 .get_heartbeat_expired_tasks()
371 .into_iter()
372 .map(|task| task.task_id)
373 .collect();
374 if !expired_tasks.is_empty() {
375 tracing::info!(
376 expired_tasks = ?expired_tasks,
377 "Heartbeat expired compaction tasks detected. Attempting to cancel tasks.",
378 );
379 if let Err(e) = hummock_manager
380 .cancel_compact_tasks(
381 expired_tasks.clone(),
382 TaskStatus::HeartbeatCanceled,
383 )
384 .await
385 {
386 tracing::error!(
387 expired_tasks = ?expired_tasks,
388 error = %e.as_report(),
389 "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat
390 until we can successfully report its status",
391 );
392 }
393 }
394 }
395
396 HummockTimerEvent::DynamicCompactionTrigger => {
397 if hummock_manager.env.opts.compaction_deterministic_test {
399 continue;
400 }
401
402 hummock_manager
403 .on_handle_trigger_multi_group(
404 compact_task::TaskType::Dynamic,
405 )
406 .await;
407 }
408
409 HummockTimerEvent::SpaceReclaimCompactionTrigger => {
410 if hummock_manager.env.opts.compaction_deterministic_test {
412 continue;
413 }
414
415 hummock_manager
416 .on_handle_trigger_multi_group(
417 compact_task::TaskType::SpaceReclaim,
418 )
419 .await;
420
421 hummock_manager
423 .on_handle_trigger_multi_group(
424 compact_task::TaskType::VnodeWatermark,
425 )
426 .await;
427 }
428
429 HummockTimerEvent::TtlCompactionTrigger => {
430 if hummock_manager.env.opts.compaction_deterministic_test {
432 continue;
433 }
434
435 hummock_manager
436 .on_handle_trigger_multi_group(compact_task::TaskType::Ttl)
437 .await;
438 }
439
440 HummockTimerEvent::TombstoneCompactionTrigger => {
441 if hummock_manager.env.opts.compaction_deterministic_test {
443 continue;
444 }
445
446 hummock_manager
447 .on_handle_trigger_multi_group(
448 compact_task::TaskType::Tombstone,
449 )
450 .await;
451 }
452
453 HummockTimerEvent::FullGc => {
454 let retention_sec =
455 hummock_manager.env.opts.min_sst_retention_time_sec;
456 let backup_manager_2 = backup_manager.clone();
457 let hummock_manager_2 = hummock_manager.clone();
458 tokio::task::spawn(async move {
459 use thiserror_ext::AsReport;
460 let _ = hummock_manager_2
461 .start_full_gc(
462 Duration::from_secs(retention_sec),
463 None,
464 backup_manager_2,
465 )
466 .await
467 .inspect_err(|e| {
468 warn!(error = %e.as_report(), "Failed to start GC.")
469 });
470 });
471 }
472 }
473 }
474 }
475
476 Either::Right((_, _shutdown)) => {
477 tracing::info!("Hummock timer loop is stopped");
478 break;
479 }
480 }
481 }
482 });
483 (join_handle, shutdown_tx)
484 }
485}
486
487impl HummockManager {
488 async fn check_dead_task(&self) {
489 const MAX_COMPACTION_L0_MULTIPLIER: u64 = 32;
490 const MAX_COMPACTION_DURATION_SEC: u64 = 20 * 60;
491 let (groups, configs) = {
492 let versioning_guard = self.versioning.read().await;
493 let g = versioning_guard
494 .current_version
495 .levels
496 .iter()
497 .map(|(id, group)| {
498 (
499 *id,
500 group
501 .l0
502 .sub_levels
503 .iter()
504 .map(|level| level.total_file_size)
505 .sum::<u64>(),
506 )
507 })
508 .collect_vec();
509 let c = self.get_compaction_group_map().await;
510 (g, c)
511 };
512 let mut slowdown_groups: HashMap<u64, u64> = HashMap::default();
513 {
514 for (group_id, l0_file_size) in groups {
515 let group = &configs[&group_id];
516 if l0_file_size
517 > MAX_COMPACTION_L0_MULTIPLIER
518 * group.compaction_config.max_bytes_for_level_base
519 {
520 slowdown_groups.insert(group_id, l0_file_size);
521 }
522 }
523 }
524 if slowdown_groups.is_empty() {
525 return;
526 }
527 let mut pending_tasks: HashMap<u64, (u64, usize, RunningCompactTask)> = HashMap::default();
528 {
529 let compaction_guard = self.compaction.read().await;
530 for group_id in slowdown_groups.keys() {
531 if let Some(status) = compaction_guard.compaction_statuses.get(group_id) {
532 for (idx, level_handler) in status.level_handlers.iter().enumerate() {
533 let tasks = level_handler.pending_tasks().to_vec();
534 if tasks.is_empty() {
535 continue;
536 }
537 for task in tasks {
538 pending_tasks.insert(task.task_id, (*group_id, idx, task));
539 }
540 }
541 }
542 }
543 }
544 let task_ids = pending_tasks.keys().cloned().collect_vec();
545 let task_infos = self
546 .compactor_manager
547 .check_tasks_status(&task_ids, Duration::from_secs(MAX_COMPACTION_DURATION_SEC));
548 for (task_id, (compact_time, status)) in task_infos {
549 if status == TASK_NORMAL {
550 continue;
551 }
552 if let Some((group_id, level_id, task)) = pending_tasks.get(&task_id) {
553 let group_size = *slowdown_groups.get(group_id).unwrap();
554 warn!(
555 "COMPACTION SLOW: the task-{} of group-{}(size: {}MB) level-{} has not finished after {:?}, {}, it may cause pending sstable files({:?}) blocking other task.",
556 task_id,
557 *group_id,
558 group_size / 1024 / 1024,
559 *level_id,
560 compact_time,
561 status,
562 task.ssts
563 );
564 }
565 }
566 }
567
568 async fn on_handle_schedule_group_split(&self) {
573 let table_write_throughput = self.table_write_throughput_statistic_manager.read().clone();
574 let mut group_infos = self.calculate_compaction_group_statistic().await;
575 group_infos.sort_by_key(|group| group.group_size);
576 group_infos.reverse();
577
578 for group in group_infos {
579 if group.table_statistic.len() == 1 {
580 continue;
582 }
583
584 self.try_split_compaction_group(&table_write_throughput, group)
585 .await;
586 }
587 }
588
589 async fn on_handle_trigger_multi_group(&self, task_type: compact_task::TaskType) {
590 for cg_id in self.compaction_group_ids().await {
591 if let Err(e) = self.compaction_state.try_sched_compaction(cg_id, task_type) {
592 tracing::error!(
593 error = %e.as_report(),
594 "Failed to schedule {:?} compaction for compaction group {}",
595 task_type,
596 cg_id,
597 );
598 }
599 }
600 }
601
602 async fn on_handle_schedule_group_merge(&self) {
608 let created_tables = match self.metadata_manager.get_created_table_ids().await {
609 Ok(created_tables) => HashSet::from_iter(created_tables),
610 Err(err) => {
611 tracing::warn!(error = %err.as_report(), "failed to fetch created table ids");
612 return;
613 }
614 };
615 let table_write_throughput_statistic_manager =
616 self.table_write_throughput_statistic_manager.read().clone();
617 let mut group_infos = self.calculate_compaction_group_statistic().await;
618 group_infos.sort_by_key(|group| {
620 let table_ids = group
621 .table_statistic
622 .keys()
623 .cloned()
624 .collect::<BTreeSet<_>>();
625 table_ids.iter().next().cloned()
626 });
627
628 let group_count = group_infos.len();
629 if group_count < 2 {
630 return;
631 }
632
633 let mut left = 0;
634 let mut right = left + 1;
635
636 while left < right && right < group_count {
637 let group = &group_infos[left];
638 let next_group = &group_infos[right];
639 match self
640 .try_merge_compaction_group(
641 &table_write_throughput_statistic_manager,
642 group,
643 next_group,
644 &created_tables,
645 )
646 .await
647 {
648 Ok(_) => right += 1,
649 Err(e) => {
650 tracing::debug!(
651 error = %e.as_report(),
652 "Failed to merge compaction group",
653 );
654 left = right;
655 right = left + 1;
656 }
657 }
658 }
659 }
660}