use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use anyhow::Context;
use compaction_event_loop::{
    HummockCompactionEventDispatcher, HummockCompactionEventHandler, HummockCompactionEventLoop,
    HummockCompactorDedicatedEventLoop,
};
use fail::fail_point;
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rng as thread_rng;
use rand::seq::SliceRandom;
use risingwave_common::catalog::TableId;
use risingwave_common::config::meta::default::compaction_config;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::Levels;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockContextId, HummockSstableId,
    HummockSstableObjectId, HummockVersionId, compact_task_to_string, statistics_compact_task,
};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
};
use thiserror_ext::AsReport;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;
use tracing::warn;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::{
    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
};
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
    trigger_local_table_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
use crate::manager::META_NODE_ID;
use crate::model::BTreeMapTransaction;

pub mod compaction_event_loop;
pub mod compaction_group_manager;
pub mod compaction_group_schedule;

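/// Task statuses that represent a cancellation outcome. `cancel_compact_task_impl`
/// asserts that every cancellation request uses one of these statuses.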
static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [
        TaskStatus::ManualCanceled,
        TaskStatus::SendFailCanceled,
        TaskStatus::AssignFailCanceled,
        TaskStatus::HeartbeatCanceled,
        TaskStatus::InvalidGroupCanceled,
        TaskStatus::NoAvailMemoryResourceCanceled,
        TaskStatus::NoAvailCpuResourceCanceled,
        TaskStatus::HeartbeatProgressCanceled,
    ]
    .into_iter()
    .collect()
});

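/// Builds the default selector for each background compaction task type.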
fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
        HashMap::default();
    compaction_selectors.insert(
        compact_task::TaskType::Dynamic,
        Box::<DynamicLevelSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::SpaceReclaim,
        Box::<SpaceReclaimCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Ttl,
        Box::<TtlCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Tombstone,
        Box::<TombstoneCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::VnodeWatermark,
        Box::<VnodeWatermarkCompactionSelector>::default(),
    );
    compaction_selectors
}

impl HummockVersionTransaction<'_> {
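    /// Translates a compact task into a version delta: input SSTs are removed from
    /// their levels, output SSTs are inserted into the target (sub-)level, and the
    /// delta is pre-applied to this transaction.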
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<HummockSstableId>> =
            BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                0, // l0_sub_level_id
                removed_table_ids,
                vec![], // inserted_table_infos
                0,      // vnode_partition_count
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(), // removed_table_ids
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}

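/// In-memory state of ongoing compaction, persisted to the meta store via
/// `commit_multi_var!` transactions.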
#[derive(Default)]
pub struct Compaction {
    /// Compaction tasks that are already assigned to a compactor.
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// `CompactStatus` of each compaction group.
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    pub _deterministic_mode: bool,
}

impl HummockManager {
    pub async fn get_assigned_compact_task_num(&self) -> u64 {
        self.compaction.read().await.compact_task_assignment.len() as u64
    }

    pub async fn list_compaction_status(
        &self,
    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
        let compaction = self.compaction.read().await;
        (
            compaction.compaction_statuses.values().map_into().collect(),
            compaction
                .compact_task_assignment
                .values()
                .cloned()
                .collect(),
        )
    }

    pub async fn get_compaction_scores(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<PickerInfo> {
        let (status, levels, group) = {
            let compaction = self.compaction.read().await;
            let versioning = self.versioning.read().await;
            let config_manager = self.compaction_group_manager.read().await;
            match (
                compaction.compaction_statuses.get(&compaction_group_id),
                versioning.current_version.levels.get(&compaction_group_id),
                config_manager.try_get_compaction_group_config(compaction_group_id),
            ) {
                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
                _ => {
                    return vec![];
                }
            }
        };
        let dynamic_level_core = DynamicLevelSelectorCore::new(
            group.compaction_config,
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
        ctx.score_levels
    }
}

impl HummockManager {
    pub fn compaction_event_loop(
        hummock_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            HummockContextId,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let hummock_compaction_event_handler =
            HummockCompactionEventHandler::new(hummock_manager.clone());

        let dedicated_event_loop = HummockCompactorDedicatedEventLoop::new(
            hummock_manager.clone(),
            hummock_compaction_event_handler.clone(),
        );

        let (dedicated_event_loop_join_handle, event_tx, shutdown_tx) = dedicated_event_loop.run();
        join_handle_vec.push((dedicated_event_loop_join_handle, shutdown_tx));

        let hummock_compaction_event_dispatcher = HummockCompactionEventDispatcher::new(
            hummock_manager.env.opts.clone(),
            hummock_compaction_event_handler,
            Some(event_tx),
        );

        let event_loop = HummockCompactionEventLoop::new(
            hummock_compaction_event_dispatcher,
            hummock_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    pub fn add_compactor_stream(
        &self,
        context_id: HummockContextId,
        req_stream: Streaming<SubscribeCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }
}

impl HummockManager {
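    /// Core task-selection loop shared by the scheduler entry points. For each
    /// candidate group, repeatedly asks the selector for a task: trivial
    /// moves/reclaims are applied to the version immediately and committed in a
    /// batch, while at most one normal task per group is persisted as an
    /// assignment, up to `max_select_count` tasks in total. Returns the picked
    /// tasks together with the groups that got no task scheduled.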
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            let task_id = next_compaction_task_id(&self.env).await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let can_trivial_move =
                matches!(selector.task_type(), TaskType::Dynamic | TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .copied()
                .collect();

            let mut table_id_to_option: HashMap<TableId, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // Only GC delete keys in the last level, because older versions of a
                    // key may still exist in deeper levels.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    max_kv_count_for_xor16: group_config.compaction_config.max_kv_count_for_xor16,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                        version
                            .latest_version()
                            .get_compaction_group_levels(compaction_group_id),
                    )
                    .await?;

                    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();

                    let mut pk_prefix_table_watermarks = BTreeMap::default();
                    let mut non_pk_prefix_table_watermarks = BTreeMap::default();
                    let mut value_table_watermarks = BTreeMap::default();
                    for (table_id, watermark) in version
                        .latest_version()
                        .safe_epoch_table_watermarks(&table_ids_to_be_compacted)
                    {
                        match watermark.watermark_type {
                            WatermarkSerdeType::PkPrefix => {
                                pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::NonPkPrefix => {
                                non_pk_prefix_table_watermarks.insert(table_id, watermark);
                            }
                            WatermarkSerdeType::Value => {
                                value_table_watermarks.insert(table_id, watermark);
                            }
                        }
                    }
                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;
                    compact_task.value_table_watermarks = value_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            all_versioned_table_schemas.get(table_id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID,
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            drop(versioning_guard);
            // Only compaction status and assignments changed on this path; commit
            // them without touching the version.
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            // Initiate a heartbeat to track the progress of the picked task.
            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            // The task is handed over to a compactor for execution.
            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }

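    /// Cancels a single compact task by reporting it with the given cancellation
    /// status. Returns whether the task was found and cancelled.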
    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore err")
        )));
        let ret = self
            .cancel_compact_task_impl(vec![task_id], task_status)
            .await?;
        Ok(ret[0])
    }

    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }

    async fn cancel_compact_task_impl(
        &self,
        task_ids: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        assert!(CANCEL_STATUS_SET.contains(&task_status));
        let tasks = task_ids
            .into_iter()
            .map(|task_id| ReportTask {
                task_id,
                task_status,
                sorted_output_ssts: vec![],
                table_stats_change: HashMap::default(),
                object_timestamps: HashMap::default(),
            })
            .collect_vec();
        let rets = self.report_compact_tasks(tasks).await?;
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(rets)
    }

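    /// Shuffles the candidate groups, selects tasks, and filters out trivial
    /// tasks that were already committed as `Success` by `get_compact_tasks_impl`.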
    async fn get_compact_tasks(
        &self,
        mut compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        compaction_groups.shuffle(&mut thread_rng());
        let (mut tasks, groups) = self
            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
            .await?;
        tasks.retain(|task| {
            if task.task_status == TaskStatus::Success {
                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
                false
            } else {
                true
            }
        });
        Ok((tasks, groups))
    }

    pub async fn get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<Option<CompactTask>> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));

        let (normal_tasks, _) = self
            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
            .await?;
        for task in normal_tasks {
            if task.task_status != TaskStatus::Success {
                return Ok(Some(task));
            }
            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
        }
        Ok(None)
    }

    pub async fn manual_get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<Option<CompactTask>> {
        let mut selector: Box<dyn CompactionSelector> =
            Box::new(ManualCompactionSelector::new(manual_compaction_option));
        self.get_compact_task(compaction_group_id, &mut selector)
            .await
    }

    pub async fn report_compact_task(
        &self,
        task_id: u64,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
        object_timestamps: HashMap<HummockSstableObjectId, u64>,
    ) -> Result<bool> {
        let rets = self
            .report_compact_tasks(vec![ReportTask {
                task_id,
                task_status,
                sorted_output_ssts,
                table_stats_change: table_stats_change.unwrap_or_default(),
                object_timestamps,
            }])
            .await?;
        Ok(rets[0])
    }

    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }

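    /// Applies a batch of task reports while holding both locks: removes each
    /// task's assignment, updates the per-group compact status, sanity-checks
    /// successful tasks and applies them to the version, then commits everything
    /// in a single transaction. Returns one flag per report indicating whether
    /// the corresponding assignment was found.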
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // Purge stale compaction statuses of dropped compaction groups.
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("compact task {} not found", task.task_id);
                    rets[idx] = false;
                    continue;
                }
            };

            {
                // Apply the reported status and output SSTs to the stored task.
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // The compaction group has been dropped; cancel the task.
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                version.apply_compact_task(&compact_task);
                if purge_prost_table_stats(
                    &mut version_stats.table_stats,
                    version.latest_version(),
                    &HashSet::default(),
                ) {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            tasks.push(compact_task);
        }
        if success_count > 0 {
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // No task succeeded; persist only status and assignment changes.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && matches!(
                    compact_task.task_type,
                    compact_task::TaskType::Dynamic | compact_task::TaskType::Emergency
                )
            {
                // Try to immediately schedule a follow-up compaction for the group.
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }

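    /// Logs the current version and schedules a dynamic compaction for each given
    /// group; used by the deterministic compaction test mode.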
    pub async fn trigger_compaction_deterministic(
        &self,
        _base_version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        self.on_current_version(|old_version| {
            tracing::info!(
                "Trigger compaction for version {}, groups {:?}",
                old_version.id,
                compaction_groups
            );
        })
        .await;

        if compaction_groups.is_empty() {
            return Ok(());
        }
        for compaction_group in compaction_groups {
            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
        }
        Ok(())
    }

    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<()> {
        let start_time = Instant::now();

        // 1. Select a compactor.
        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction: no compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction: no compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        // 2. Get a manual compaction task.
        let compact_task = self
            .manual_get_compact_task(compaction_group, manual_compaction_option)
            .await;
        let compact_task = match compact_task {
            Ok(Some(compact_task)) => compact_task,
            Ok(None) => {
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction: no compact task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };

        // 3. Send the task to the selected compactor.
        let compact_task_string = compact_task_to_string(&compact_task);
        compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })?;

        tracing::info!(
            "Trigger manual compaction task. {}. cost time: {:?}",
            &compact_task_string,
            start_time.elapsed(),
        );

        Ok(())
    }

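    /// Registers a compaction request for the group as a `NewData` trigger.
    /// Returns whether the request was newly scheduled.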
    pub fn try_send_compaction_request(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) -> bool {
        self.compaction_state.try_sched_compaction(
            compaction_group,
            task_type,
            ScheduleTrigger::NewData,
        )
    }

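    /// Applies vnode-aligned partitioning when `vnode_aligned_level_size_threshold`
    /// is configured, the task targets a level at or below the base level, the
    /// task involves exactly one table, and the target level is large enough.
    /// On success the table's own vnode count is used as its partition count.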
    async fn try_apply_vnode_aligned_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        levels: &Levels,
    ) -> Result<bool> {
        let Some(threshold) = compaction_config.vnode_aligned_level_size_threshold else {
            return Ok(false);
        };

        if compact_task.target_level < compact_task.base_level
            || compact_task.existing_table_ids.len() != 1
        {
            return Ok(false);
        }

        let target_level_size = levels
            .get_level(compact_task.target_level as usize)
            .total_file_size;

        if target_level_size < threshold {
            return Ok(false);
        }

        let table_id = compact_task.existing_table_ids[0];

        let table = self
            .metadata_manager
            .get_table_catalog_by_ids(&[table_id])
            .await
            .with_context(|| {
                format!(
                    "Failed to get table catalog for table_id {} in compaction_group {}",
                    table_id, compact_task.compaction_group_id
                )
            })
            .map_err(Error::Internal)?
            .into_iter()
            .next()
            .ok_or_else(|| {
                Error::Internal(anyhow::anyhow!(
                    "Table catalog not found for table_id {} in compaction_group {}",
                    table_id,
                    compact_task.compaction_group_id
                ))
            })?;

        compact_task
            .table_vnode_partition
            .insert(table_id, table.vnode_count() as u32);

        Ok(true)
    }

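    /// Decides per-table vnode partitioning for a task. If the config pins
    /// `split_weight_by_vnode`, that count is applied to every table; otherwise
    /// the partition count is derived from each table's share of the input size
    /// and its recent write throughput.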
    fn apply_split_weight_by_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        if compaction_config.split_weight_by_vnode > 0 {
            // Pinned partitioning: every table uses the configured partition count.
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }

            return;
        }

        // Estimate the compacted size of each table from the input SSTs.
        let mut table_size_info: HashMap<TableId, u64> = HashMap::default();
        let mut existing_table_ids: HashSet<TableId> = HashSet::default();
        for input_ssts in &compact_task.input_ssts {
            for sst in &input_ssts.table_infos {
                existing_table_ids.extend(sst.table_ids.iter());
                for table_id in &sst.table_ids {
                    *table_size_info.entry(*table_id).or_default() +=
                        sst.sst_size / (sst.table_ids.len() as u64);
                }
            }
        }
        compact_task
            .existing_table_ids
            .retain(|table_id| existing_table_ids.contains(table_id));

        let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
        let default_partition_count = self.env.opts.partition_vnode_count;
        let compact_task_table_size_partition_threshold_low = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_low;
        let compact_task_table_size_partition_threshold_high = self
            .env
            .opts
            .compact_task_table_size_partition_threshold_high;

        let table_write_throughput_statistic_manager =
            self.table_write_throughput_statistic_manager.read();
        let timestamp = chrono::Utc::now().timestamp();

        for (table_id, compact_table_size) in table_size_info {
            let write_throughput = table_write_throughput_statistic_manager
                .get_table_throughput_descending(table_id, timestamp)
                .peekable()
                .peek()
                .map(|item| item.throughput)
                .unwrap_or(0);

            if compact_table_size > compact_task_table_size_partition_threshold_high
                && default_partition_count > 0
            {
                compact_task
                    .table_vnode_partition
                    .insert(table_id, default_partition_count);
            } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                    && compact_table_size > compaction_config.target_file_size_base))
                && hybrid_vnode_count > 0
            {
                // Medium-sized or high-throughput table: use the hybrid partition count.
                compact_task
                    .table_vnode_partition
                    .insert(table_id, hybrid_vnode_count);
            } else if compact_table_size > compaction_config.target_file_size_base {
                // Large enough to keep in its own file without further splitting.
                compact_task.table_vnode_partition.insert(table_id, 1);
            }
        }

        compact_task
            .table_vnode_partition
            .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
    }

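    /// Entry point for vnode partitioning: tries the vnode-aligned strategy
    /// first and otherwise, for tasks targeting L0 or the base level, falls back
    /// to split-weight partitioning.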
    pub(crate) async fn calculate_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
        levels: &Levels,
    ) -> Result<()> {
        if self
            .try_apply_vnode_aligned_partition(compact_task, compaction_config, levels)
            .await?
        {
            return Ok(());
        }

        // Split-weight partitioning only applies to tasks targeting L0 or the
        // base level; skip tasks compacting to deeper levels.
        if compact_task.target_level > compact_task.base_level {
            return Ok(());
        }

        self.apply_split_weight_by_vnode_partition(compact_task, compaction_config);

        Ok(())
    }

    pub fn compactor_manager_ref(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockManager {
    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0.into(),
                },
            );
        }

        // Report the task; the assignment inserted above (if any) is consumed.
        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}

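/// Why a compaction request is being scheduled.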
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleTrigger {
    /// Triggered because new data was committed to the group.
    NewData,
    /// Triggered by a periodic background check.
    Periodic,
}

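/// A point-in-time copy of the scheduled `(group, task type)` pairs, used to
/// pick the next batch of groups to compact without holding the state lock.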
pub struct CompactionScheduleSnapshot {
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    snapshot_time: Instant,
}

impl CompactionScheduleSnapshot {
    /// Task types in descending scheduling priority.
    const TASK_TYPE_PRIORITY: &[TaskType] = &[
        TaskType::Dynamic,
        TaskType::SpaceReclaim,
        TaskType::Ttl,
        TaskType::Tombstone,
        TaskType::VnodeWatermark,
    ];

    pub fn snapshot_time(&self) -> Instant {
        self.snapshot_time
    }

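    /// Picks the next groups to compact. Groups are visited in shuffled order:
    /// all groups whose highest-priority pending type is `Dynamic` are batched
    /// together, but if a group with a non-dynamic pending type is visited
    /// before any dynamic group, that single group is returned with its type.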
    pub fn pick_compaction_groups_and_type(&self) -> Option<(Vec<CompactionGroupId>, TaskType)> {
        let group_ids = self.group_ids_shuffled();
        let mut normal_groups = vec![];
        for cg_id in group_ids {
            if let Some(pick_type) = self.pick_type(cg_id) {
                if pick_type == TaskType::Dynamic {
                    normal_groups.push(cg_id);
                } else if normal_groups.is_empty() {
                    return Some((vec![cg_id], pick_type));
                }
            }
        }
        if normal_groups.is_empty() {
            None
        } else {
            Some((normal_groups, TaskType::Dynamic))
        }
    }

    fn group_ids_shuffled(&self) -> Vec<CompactionGroupId> {
        let mut group_ids: Vec<_> = self.scheduled.iter().map(|(g, _)| *g).unique().collect();
        group_ids.shuffle(&mut thread_rng());
        group_ids
    }

    /// Returns the highest-priority task type scheduled for `group`, if any.
    fn pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
        Self::TASK_TYPE_PRIORITY
            .iter()
            .find(|t| self.scheduled.contains(&(group, **t)))
            .copied()
    }
}

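/// Tracks which `(compaction group, task type)` pairs have a pending scheduling
/// request, plus per-group cooldown state for dynamic compaction.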
#[derive(Debug, Default)]
pub struct CompactionState {
    inner: Mutex<CompactionStateInner>,
}

#[derive(Debug, Default)]
struct CompactionStateInner {
    scheduled: HashSet<(CompactionGroupId, compact_task::TaskType)>,
    /// Groups whose periodic dynamic compaction is suppressed until new data arrives.
    dynamic_cooldown: HashSet<CompactionGroupId>,
    /// The last time new data was reported for each group.
    last_new_data_time: HashMap<CompactionGroupId, Instant>,
}

impl CompactionState {
    pub fn new() -> Self {
        Self {
            inner: Default::default(),
        }
    }

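    /// Registers a compaction request. Returns `false` if the pair is already
    /// scheduled, or if a `Periodic` dynamic trigger hits a group in cooldown.
    /// A `NewData` trigger clears the cooldown and records the arrival time.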
    pub fn try_sched_compaction(
        &self,
        compaction_group: CompactionGroupId,
        task_type: TaskType,
        trigger: ScheduleTrigger,
    ) -> bool {
        let mut guard = self.inner.lock();
        if task_type == TaskType::Dynamic {
            match trigger {
                ScheduleTrigger::NewData => {
                    guard.dynamic_cooldown.remove(&compaction_group);
                    guard
                        .last_new_data_time
                        .insert(compaction_group, Instant::now());
                }
                ScheduleTrigger::Periodic => {
                    if guard.dynamic_cooldown.contains(&compaction_group) {
                        return false;
                    }
                }
            }
        }
        guard.scheduled.insert((compaction_group, task_type))
    }

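    /// Removes the pair from the scheduled set. A dynamic task's group enters
    /// cooldown unless new data arrived after `snapshot_time`, so periodic
    /// triggers stop rescheduling idle groups.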
    pub fn unschedule(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
        snapshot_time: Instant,
    ) {
        let mut guard = self.inner.lock();
        guard.scheduled.remove(&(compaction_group, task_type));
        if task_type == TaskType::Dynamic {
            let has_new_data = guard
                .last_new_data_time
                .get(&compaction_group)
                .is_some_and(|t| *t > snapshot_time);
            if !has_new_data {
                guard.dynamic_cooldown.insert(compaction_group);
            }
        }
    }

    /// Takes a point-in-time snapshot of the scheduled pairs.
    pub fn snapshot(&self) -> CompactionScheduleSnapshot {
        let guard = self.inner.lock();
        let snapshot_time = Instant::now();
        CompactionScheduleSnapshot {
            scheduled: guard.scheduled.clone(),
            snapshot_time,
        }
    }

    /// Drops all scheduling state for a removed compaction group.
    pub fn remove_compaction_group(&self, compaction_group: CompactionGroupId) {
        let mut guard = self.inner.lock();
        guard
            .scheduled
            .retain(|(group, _)| *group != compaction_group);
        guard.dynamic_cooldown.remove(&compaction_group);
        guard.last_new_data_time.remove(&compaction_group);
    }
}

impl Compaction {
    pub fn get_compact_task_assignments_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<CompactTaskAssignment> {
        self.compact_task_assignment
            .iter()
            .filter_map(|(_, assignment)| {
                if assignment
                    .compact_task
                    .as_ref()
                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
                {
                    Some(CompactTaskAssignment {
                        compact_task: assignment.compact_task.clone(),
                        context_id: assignment.context_id,
                    })
                } else {
                    None
                }
            })
            .collect()
    }
}

#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    pub group_id: CompactionGroupId,
    pub group_size: u64,
    pub table_statistic: BTreeMap<StateTableId, u64>,
    pub compaction_group_config: CompactionGroup,
}

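/// Best-effort adjustment of table stats after a vnode-watermark trivial
/// reclaim: the reclaimed SSTs' key counts are subtracted and key/value sizes
/// are scaled down proportionally.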
fn update_table_stats_for_vnode_watermark_trivial_reclaim(
    table_stats: &mut PbTableStatsMap,
    task: &CompactTask,
) {
    if task.task_type != TaskType::VnodeWatermark {
        return;
    }
    let mut deleted_table_keys: HashMap<TableId, u64> = HashMap::default();
    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
        assert_eq!(s.table_ids.len(), 1);
        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
        *e += s.total_key_count;
    }
    for (table_id, delete_count) in deleted_table_keys {
        let Some(stats) = table_stats.get_mut(&table_id) else {
            continue;
        };
        if stats.total_key_count == 0 {
            continue;
        }
        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
        // Scale key and value sizes proportionally to the remaining key count.
        stats.total_key_count = new_total_key_count;
        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
    }
}

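/// Write availability of a compaction group, derived from its L0 shape by
/// `GroupStateValidator`.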
#[derive(Debug, Clone)]
pub enum GroupState {
    /// The group is in a normal state.
    Normal,
    /// The group is in an emergency state, with the reason attached.
    Emergency(String),
    /// Writes to the group are stopped, with the reason attached.
    WriteStop(String),
}

impl GroupState {
    pub fn is_write_stop(&self) -> bool {
        matches!(self, Self::WriteStop(_))
    }

    pub fn is_emergency(&self) -> bool {
        matches!(self, Self::Emergency(_))
    }

    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
            _ => None,
        }
    }
}

#[derive(Clone, Default)]
pub struct GroupStateValidator;

impl GroupStateValidator {
    pub fn write_stop_sub_level_count(
        level_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
        level_count > threshold
    }

    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
        l0_size
            > compaction_config
                .level0_stop_write_threshold_max_size
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
    }

    pub fn write_stop_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .level0_stop_write_threshold_max_sst_count
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
                as usize
    }

    pub fn emergency_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .emergency_level0_sst_file_count
                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
                as usize
    }

    pub fn emergency_l0_partition_count(
        last_l0_sub_level_partition_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        last_l0_sub_level_partition_count
            > compaction_config
                .emergency_level0_sub_level_partition
                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
                as usize
    }

    pub fn check_single_group_write_stop(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
                levels.l0.sub_levels.len(),
                compaction_config.level0_stop_write_threshold_sub_level_number
            ));
        }

        if Self::write_stop_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .level0_stop_write_threshold_max_sst_count
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
            ));
        }

        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_size: {}, threshold: {}) too large L0 size",
                levels.l0.total_file_size,
                compaction_config
                    .level0_stop_write_threshold_max_size
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
            ));
        }

        GroupState::Normal
    }

    pub fn check_single_group_emergency(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::emergency_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .emergency_level0_sst_file_count
                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
            ));
        }

        if Self::emergency_l0_partition_count(
            levels
                .l0
                .sub_levels
                .first()
                .map(|l| l.table_infos.len())
                .unwrap_or(0),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
                levels
                    .l0
                    .sub_levels
                    .first()
                    .map(|l| l.table_infos.len())
                    .unwrap_or(0),
                compaction_config
                    .emergency_level0_sub_level_partition
                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
            ));
        }

        GroupState::Normal
    }

    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
        let state = Self::check_single_group_write_stop(levels, compaction_config);
        if state.is_write_stop() {
            return state;
        }

        Self::check_single_group_emergency(levels, compaction_config)
    }
}

#[cfg(test)]
mod compaction_state_tests {
    use risingwave_pb::hummock::compact_task::TaskType;

    use super::*;

    #[test]
    fn test_basic_schedule_and_unschedule() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        // Scheduling the same pair again is a no-op.
        assert!(!state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));

        let snapshot = state.snapshot();
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot.scheduled.contains(&(group_id, TaskType::Ttl)));

        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        let snapshot2 = state.snapshot();
        assert!(!snapshot2.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(snapshot2.scheduled.contains(&(group_id, TaskType::Ttl)));
    }

    #[test]
    fn test_cooldown_blocks_periodic_trigger() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Unscheduling without new data puts the group into dynamic cooldown.
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        // A periodic trigger must not reschedule a group in cooldown.
        assert!(!state.try_sched_compaction(
            group_id,
            TaskType::Dynamic,
            ScheduleTrigger::Periodic
        ));
    }

    #[test]
    fn test_new_data_clears_cooldown() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(state.inner.lock().dynamic_cooldown.contains(&group_id));

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id));
    }

    #[test]
    fn test_cooldown_only_affects_dynamic_type() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();
        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());

        // Unscheduling a non-dynamic task type never causes a cooldown.
        let group_id_2: CompactionGroupId = 2.into();
        assert!(state.try_sched_compaction(group_id_2, TaskType::Ttl, ScheduleTrigger::Periodic));
        let snapshot2 = state.snapshot();
        state.unschedule(group_id_2, TaskType::Ttl, snapshot2.snapshot_time());
        assert!(!state.inner.lock().dynamic_cooldown.contains(&group_id_2));

        // Non-dynamic task types can still be scheduled while in dynamic cooldown.
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        assert!(state.try_sched_compaction(
            group_id,
            TaskType::SpaceReclaim,
            ScheduleTrigger::Periodic
        ));
    }

    #[test]
    fn test_race_condition_new_data_after_snapshot() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        let snapshot = state.snapshot();

        // Simulate new data arriving after the snapshot was taken.
        {
            let mut guard = state.inner.lock();
            guard.last_new_data_time.insert(group_id, Instant::now());
        }

        state.unschedule(group_id, TaskType::Dynamic, snapshot.snapshot_time());
        assert!(
            !state.inner.lock().dynamic_cooldown.contains(&group_id),
            "Should skip cooldown when new data arrived after snapshot"
        );
    }

    #[test]
    fn test_remove_compaction_group_cleans_all_state() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert!(state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData));
        assert!(state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic));
        state.inner.lock().dynamic_cooldown.insert(group_id);

        state.remove_compaction_group(group_id);

        let guard = state.inner.lock();
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Dynamic)));
        assert!(!guard.scheduled.contains(&(group_id, TaskType::Ttl)));
        assert!(!guard.dynamic_cooldown.contains(&group_id));
        assert!(!guard.last_new_data_time.contains_key(&group_id));
    }

    #[test]
    fn test_snapshot_pick_type_priority() {
        let state = CompactionState::new();
        let group_id: CompactionGroupId = 1.into();

        assert_eq!(state.snapshot().pick_type(group_id), None);

        // Schedule from lowest to highest priority and check that pick_type
        // always returns the highest-priority pending type.
        state.try_sched_compaction(
            group_id,
            TaskType::VnodeWatermark,
            ScheduleTrigger::Periodic,
        );
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::VnodeWatermark)
        );

        state.try_sched_compaction(group_id, TaskType::Tombstone, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Tombstone)
        );

        state.try_sched_compaction(group_id, TaskType::Ttl, ScheduleTrigger::Periodic);
        assert_eq!(state.snapshot().pick_type(group_id), Some(TaskType::Ttl));

        state.try_sched_compaction(group_id, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::SpaceReclaim)
        );

        state.try_sched_compaction(group_id, TaskType::Dynamic, ScheduleTrigger::NewData);
        assert_eq!(
            state.snapshot().pick_type(group_id),
            Some(TaskType::Dynamic)
        );
    }

    #[test]
    fn test_multiple_groups_independent_cooldown() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);
        let snapshot = state.snapshot();

        state.unschedule(g1, TaskType::Dynamic, snapshot.snapshot_time());

        let guard = state.inner.lock();
        assert!(guard.dynamic_cooldown.contains(&g1));
        assert!(!guard.dynamic_cooldown.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_empty() {
        let state = CompactionState::new();
        let snapshot = state.snapshot();
        assert!(snapshot.pick_compaction_groups_and_type().is_none());
    }

    #[test]
    fn test_pick_compaction_groups_mixed_types() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();
        let g3: CompactionGroupId = 3.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Ttl, ScheduleTrigger::Periodic);
        state.try_sched_compaction(g3, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();

        // Group order is shuffled: either the dynamic groups are batched
        // together, or the Ttl group is returned alone if it is visited first.
        if task_type == TaskType::Dynamic {
            assert!(groups.contains(&g1));
            assert!(groups.contains(&g3));
            assert!(!groups.contains(&g2));
        } else {
            assert_eq!(task_type, TaskType::Ttl);
            assert_eq!(groups, vec![g2]);
        }
    }

    #[test]
    fn test_pick_compaction_groups_all_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();
        let g2: CompactionGroupId = 2.into();

        state.try_sched_compaction(g1, TaskType::Dynamic, ScheduleTrigger::NewData);
        state.try_sched_compaction(g2, TaskType::Dynamic, ScheduleTrigger::NewData);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::Dynamic);
        assert!(groups.contains(&g1));
        assert!(groups.contains(&g2));
    }

    #[test]
    fn test_pick_compaction_groups_single_non_dynamic() {
        let state = CompactionState::new();
        let g1: CompactionGroupId = 1.into();

        state.try_sched_compaction(g1, TaskType::SpaceReclaim, ScheduleTrigger::Periodic);

        let snapshot = state.snapshot();
        let (groups, task_type) = snapshot.pick_compaction_groups_and_type().unwrap();
        assert_eq!(task_type, TaskType::SpaceReclaim);
        assert_eq!(groups, vec![g1]);
    }
}