use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::{Arc, LazyLock};
use std::time::{Instant, SystemTime};

use anyhow::Context;
use fail::fail_point;
use futures::future::Shared;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use parking_lot::Mutex;
use rand::rng as thread_rng;
use rand::seq::SliceRandom;
use risingwave_common::config::default::compaction_config;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::level::Levels;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{
    PbTableStatsMap, add_prost_table_stats_map, purge_prost_table_stats,
};
use risingwave_hummock_sdk::table_watermark::WatermarkSerdeType;
use risingwave_hummock_sdk::version::{GroupDelta, IntraLevelDelta};
use risingwave_hummock_sdk::{
    CompactionGroupId, HummockCompactionTaskId, HummockSstableObjectId, HummockVersionId,
    compact_task_to_string, statistics_compact_task,
};
use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType};
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    self, Event as RequestEvent, HeartBeat, PullTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::{
    Event as ResponseEvent, PullTaskAck,
};
use risingwave_pb::hummock::{
    CompactTaskAssignment, CompactionConfig, PbCompactStatus, PbCompactTaskAssignment,
    SubscribeCompactionEventRequest, TableOption, TableSchema, compact_task,
};
use rw_futures_util::pending_on_none;
use thiserror_ext::AsReport;
use tokio::sync::RwLockWriteGuard;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
use tokio::sync::oneshot::{Receiver as OneShotReceiver, Sender};
use tokio::task::JoinHandle;
use tonic::Streaming;
use tracing::warn;

use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::{
    DynamicLevelSelector, DynamicLevelSelectorCore, LocalSelectorStatistic, ManualCompactionOption,
    ManualCompactionSelector, SpaceReclaimCompactionSelector, TombstoneCompactionSelector,
    TtlCompactionSelector, VnodeWatermarkCompactionSelector,
};
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::transaction::{
    HummockVersionStatsTransaction, HummockVersionTransaction,
};
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::metrics_utils::{
    build_compact_task_level_type_metrics_label, trigger_compact_tasks_stat,
    trigger_local_table_stat,
};
use crate::hummock::model::CompactionGroup;
use crate::hummock::sequence::next_compaction_task_id;
use crate::hummock::{HummockManager, commit_multi_var, start_measure_real_process_timer};
use crate::manager::META_NODE_ID;
use crate::model::BTreeMapTransaction;

pub mod compaction_group_manager;
pub mod compaction_group_schedule;

const MAX_SKIP_TIMES: usize = 8;
const MAX_REPORT_COUNT: usize = 16;

static CANCEL_STATUS_SET: LazyLock<HashSet<TaskStatus>> = LazyLock::new(|| {
    [
        TaskStatus::ManualCanceled,
        TaskStatus::SendFailCanceled,
        TaskStatus::AssignFailCanceled,
        TaskStatus::HeartbeatCanceled,
        TaskStatus::InvalidGroupCanceled,
        TaskStatus::NoAvailMemoryResourceCanceled,
        TaskStatus::NoAvailCpuResourceCanceled,
        TaskStatus::HeartbeatProgressCanceled,
    ]
    .into_iter()
    .collect()
});

type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);

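/// Builds the default selector for each automatic compaction task type: dynamic,
/// space-reclaim, TTL, tombstone, and vnode-watermark compaction.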
fn init_selectors() -> HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> {
    let mut compaction_selectors: HashMap<compact_task::TaskType, Box<dyn CompactionSelector>> =
        HashMap::default();
    compaction_selectors.insert(
        compact_task::TaskType::Dynamic,
        Box::<DynamicLevelSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::SpaceReclaim,
        Box::<SpaceReclaimCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Ttl,
        Box::<TtlCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::Tombstone,
        Box::<TombstoneCompactionSelector>::default(),
    );
    compaction_selectors.insert(
        compact_task::TaskType::VnodeWatermark,
        Box::<VnodeWatermarkCompactionSelector>::default(),
    );
    compaction_selectors
}

impl HummockVersionTransaction<'_> {
    /// Pre-applies a compact task to this version transaction: one delta per input level removes
    /// the input SSTs, and a final delta inserts the sorted output SSTs into the target level.
    fn apply_compact_task(&mut self, compact_task: &CompactTask) {
        let mut version_delta = self.new_delta();
        let trivial_move = compact_task.is_trivial_move_task();
        version_delta.trivial_move = trivial_move;

        let group_deltas = &mut version_delta
            .group_deltas
            .entry(compact_task.compaction_group_id)
            .or_default()
            .group_deltas;
        let mut removed_table_ids_map: BTreeMap<u32, HashSet<u64>> = BTreeMap::default();

        for level in &compact_task.input_ssts {
            let level_idx = level.level_idx;

            removed_table_ids_map
                .entry(level_idx)
                .or_default()
                .extend(level.table_infos.iter().map(|sst| sst.sst_id));
        }

        for (level_idx, removed_table_ids) in removed_table_ids_map {
            let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
                level_idx,
                0, // sub-level id (default)
                removed_table_ids,
                vec![], // no SSTs are inserted by this removal delta
                0, // vnode partition count (default)
                compact_task.compaction_group_version_id,
            ));

            group_deltas.push(group_delta);
        }

        let group_delta = GroupDelta::IntraLevel(IntraLevelDelta::new(
            compact_task.target_level,
            compact_task.target_sub_level_id,
            HashSet::new(), // no SSTs are removed by this insertion delta
            compact_task.sorted_output_ssts.clone(),
            compact_task.split_weight_by_vnode,
            compact_task.compaction_group_version_id,
        ));

        group_deltas.push(group_delta);
        version_delta.pre_apply();
    }
}

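/// In-memory bookkeeping of compaction on the meta node: the per-group compact status plus
/// the tasks that have already been handed out to compactors.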
#[derive(Default)]
pub struct Compaction {
    /// Compaction tasks that have been handed out, keyed by task id.
    pub compact_task_assignment: BTreeMap<HummockCompactionTaskId, PbCompactTaskAssignment>,
    /// The `CompactStatus` of each compaction group.
    pub compaction_statuses: BTreeMap<CompactionGroupId, CompactStatus>,

    pub _deterministic_mode: bool,
}

impl HummockManager {
    pub async fn get_assigned_compact_task_num(&self) -> u64 {
        self.compaction.read().await.compact_task_assignment.len() as u64
    }

    pub async fn list_compaction_status(
        &self,
    ) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
        let compaction = self.compaction.read().await;
        (
            compaction.compaction_statuses.values().map_into().collect(),
            compaction
                .compact_task_assignment
                .values()
                .cloned()
                .collect(),
        )
    }

    pub async fn get_compaction_scores(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<PickerInfo> {
        let (status, levels, group) = {
            let compaction = self.compaction.read().await;
            let versioning = self.versioning.read().await;
            let config_manager = self.compaction_group_manager.read().await;
            match (
                compaction.compaction_statuses.get(&compaction_group_id),
                versioning.current_version.levels.get(&compaction_group_id),
                config_manager.try_get_compaction_group_config(compaction_group_id),
            ) {
                (Some(cs), Some(v), Some(cf)) => (cs.to_owned(), v.to_owned(), cf),
                _ => {
                    return vec![];
                }
            }
        };
        let dynamic_level_core = DynamicLevelSelectorCore::new(
            group.compaction_config,
            Arc::new(CompactionDeveloperConfig::default()),
        );
        let ctx = dynamic_level_core.get_priority_levels(&levels, &status.level_handlers);
        ctx.score_levels
    }
}

impl HummockManager {
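    /// Handles a `PullTask` request from a compactor: picks candidate groups and a task type,
    /// generates up to `pull_task_count` tasks with the matching selector, and streams them back.
    /// Tasks that cannot be sent are canceled and the compactor is dropped; a `PullTaskAck` is
    /// sent at the end either way.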
    async fn handle_pull_task_event(
        &self,
        context_id: u32,
        pull_task_count: usize,
        compaction_selectors: &mut HashMap<TaskType, Box<dyn CompactionSelector>>,
        max_get_task_probe_times: usize,
    ) {
        assert_ne!(0, pull_task_count);
        if let Some(compactor) = self.compactor_manager.get_compactor(context_id) {
            let (groups, task_type) = self.auto_pick_compaction_groups_and_type().await;
            if let TaskType::Ttl = task_type {
                match self
                    .metadata_manager
                    .get_all_table_options()
                    .await
                    .map_err(|err| Error::MetaStore(err.into()))
                {
                    Ok(table_options) => {
                        self.update_table_id_to_table_option(table_options);
                    }
                    Err(err) => {
                        warn!(error = %err.as_report(), "Failed to get table options");
                    }
                }
            }

            if !groups.is_empty() {
                let selector: &mut Box<dyn CompactionSelector> =
                    compaction_selectors.get_mut(&task_type).unwrap();

                let mut generated_task_count = 0;
                let mut existed_groups = groups.clone();
                let mut no_task_groups: HashSet<CompactionGroupId> = HashSet::default();
                let mut failed_tasks = vec![];
                let mut loop_times = 0;

                while generated_task_count < pull_task_count
                    && failed_tasks.is_empty()
                    && loop_times < max_get_task_probe_times
                {
                    loop_times += 1;
                    let compact_ret = self
                        .get_compact_tasks(
                            existed_groups.clone(),
                            pull_task_count - generated_task_count,
                            selector,
                        )
                        .await;

                    match compact_ret {
                        Ok((compact_tasks, unschedule_groups)) => {
                            no_task_groups.extend(unschedule_groups);
                            if compact_tasks.is_empty() {
                                break;
                            }
                            generated_task_count += compact_tasks.len();
                            for task in compact_tasks {
                                let task_id = task.task_id;
                                if let Err(e) =
                                    compactor.send_event(ResponseEvent::CompactTask(task.into()))
                                {
                                    tracing::warn!(
                                        error = %e.as_report(),
                                        "Failed to send task {} to {}",
                                        task_id,
                                        compactor.context_id(),
                                    );
                                    failed_tasks.push(task_id);
                                }
                            }
                            if !failed_tasks.is_empty() {
                                self.compactor_manager.remove_compactor(context_id);
                            }
                            existed_groups.retain(|group_id| !no_task_groups.contains(group_id));
                        }
                        Err(err) => {
                            tracing::warn!(error = %err.as_report(), "Failed to get compaction task");
                            break;
                        }
                    };
                }
                for group in no_task_groups {
                    self.compaction_state.unschedule(group, task_type);
                }
                if let Err(err) = self
                    .cancel_compact_tasks(failed_tasks, TaskStatus::SendFailCanceled)
                    .await
                {
                    tracing::warn!(error = %err.as_report(), "Failed to cancel compaction task");
                }
            }

            if let Err(e) = compactor.send_event(ResponseEvent::PullTaskAck(PullTaskAck {})) {
                tracing::warn!(
                    error = %e.as_report(),
                    "Failed to send ack to {}",
                    context_id,
                );
                self.compactor_manager.remove_compactor(context_id);
            }
        }
    }

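    /// Spawns the compaction event loops and returns their join handles with shutdown senders:
    /// one loop polls the per-compactor gRPC streams (handling heartbeats inline), the other is
    /// a dedicated handler for `PullTask`/`ReportTask` events forwarded over a channel.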
    pub fn compaction_event_loop(
        hummock_manager: Arc<Self>,
        mut compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut compactor_request_streams = FuturesUnordered::new();
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let (shutdown_tx_dedicated, shutdown_rx_dedicated) = tokio::sync::oneshot::channel();
        let shutdown_rx_shared = shutdown_rx.shared();
        let shutdown_rx_dedicated_shared = shutdown_rx_dedicated.shared();

        let (tx, rx) = unbounded_channel();

        let mut join_handle_vec = Vec::default();

        let hummock_manager_dedicated = hummock_manager.clone();
        let compact_task_event_handler_join_handle = tokio::spawn(async move {
            Self::compact_task_dedicated_event_handler(
                hummock_manager_dedicated,
                rx,
                shutdown_rx_dedicated_shared,
            )
            .await;
        });

        join_handle_vec.push((
            compact_task_event_handler_join_handle,
            shutdown_tx_dedicated,
        ));

        let join_handle = tokio::spawn(async move {
            let push_stream =
                |context_id: u32,
                 stream: Streaming<SubscribeCompactionEventRequest>,
                 compactor_request_streams: &mut FuturesUnordered<_>| {
                    let future = StreamExt::into_future(stream)
                        .map(move |stream_future| (context_id, stream_future));

                    compactor_request_streams.push(future);
                };

            let mut event_loop_iteration_now = Instant::now();

            loop {
                let shutdown_rx_shared = shutdown_rx_shared.clone();
                let hummock_manager = hummock_manager.clone();
                hummock_manager
                    .metrics
                    .compaction_event_loop_iteration_latency
                    .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                event_loop_iteration_now = Instant::now();

                tokio::select! {
                    _ = shutdown_rx_shared => { return; },

                    compactor_stream = compactor_streams_change_rx.recv() => {
                        if let Some((context_id, stream)) = compactor_stream {
                            tracing::info!("compactor {} enters the cluster", context_id);
                            push_stream(context_id, stream, &mut compactor_request_streams);
                        }
                    },

                    result = pending_on_none(compactor_request_streams.next()) => {
                        let mut compactor_alive = true;

                        let (context_id, compactor_stream_req): (_, (std::option::Option<std::result::Result<SubscribeCompactionEventRequest, _>>, _)) = result;
                        let (event, create_at, stream) = match compactor_stream_req {
                            (Some(Ok(req)), stream) => {
                                (req.event.unwrap(), req.create_at, stream)
                            }

                            (Some(Err(err)), _stream) => {
                                tracing::warn!(error = %err.as_report(), "compactor stream {} poll with err, recv stream may be destroyed", context_id);
                                continue
                            }

                            _ => {
                                tracing::warn!("compactor stream {} poll err, recv stream may be destroyed", context_id);
                                continue
                            },
                        };

                        {
                            let consumed_latency_ms = SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis()
                                as u64
                                - create_at;
                            hummock_manager.metrics
                                .compaction_event_consumed_latency
                                .observe(consumed_latency_ms as _);
                        }

                        match event {
                            RequestEvent::HeartBeat(HeartBeat {
                                progress,
                            }) => {
                                let compactor_manager = hummock_manager.compactor_manager.clone();
                                let cancel_tasks = compactor_manager.update_task_heartbeats(&progress).into_iter().map(|task| task.task_id).collect::<Vec<_>>();
                                if !cancel_tasks.is_empty() {
                                    tracing::info!(
                                        ?cancel_tasks,
                                        context_id,
                                        "Canceling tasks that have expired due to lack of visible progress",
                                    );

                                    if let Err(e) = hummock_manager
                                        .cancel_compact_tasks(cancel_tasks.clone(), TaskStatus::HeartbeatProgressCanceled)
                                        .await
                                    {
                                        tracing::error!(
                                            error = %e.as_report(),
                                            "Attempt to remove compaction task due to elapsed heartbeat failed. We will continue to track its heartbeat until we can successfully report its status."
                                        );
                                    }
                                }

                                match compactor_manager.get_compactor(context_id) {
                                    Some(compactor) => {
                                        if !cancel_tasks.is_empty() {
                                            // Also instruct the compactor to cancel the tasks
                                            // locally so they terminate early.
                                            let _ = compactor.cancel_tasks(&cancel_tasks);
                                            tracing::info!(
                                                ?cancel_tasks,
                                                context_id,
                                                "CancelTask operation has been sent to compactor node",
                                            );
                                        }
                                    }
                                    _ => {
                                        // The compactor has gone away; drop its stream below.
                                        compactor_alive = false;
                                    }
                                }
                            },

                            RequestEvent::Register(_) => {
                                unreachable!()
                            }

                            e @ (RequestEvent::PullTask(_) | RequestEvent::ReportTask(_)) => {
                                let _ = tx.send((context_id, e));
                            }
                        }

                        if compactor_alive {
                            push_stream(context_id, stream, &mut compactor_request_streams);
                        } else {
                            tracing::warn!(context_id, "compactor stream error, send stream may be destroyed");
                        }
                    },
                }
            }
        });

        join_handle_vec.push((join_handle, shutdown_tx));

        join_handle_vec
    }

    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

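    /// Randomly probes compaction groups and returns the first one that has a pending
    /// compaction, together with its task type.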
    pub async fn auto_pick_compaction_group_and_type(
        &self,
    ) -> Option<(CompactionGroupId, compact_task::TaskType)> {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                return Some((cg_id, pick_type));
            }
        }

        None
    }

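    /// Like `auto_pick_compaction_group_and_type`, but batched: collects every shuffled group
    /// with a pending dynamic compaction; if a non-dynamic task type is seen before any dynamic
    /// one, returns that single group and type instead.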
    async fn auto_pick_compaction_groups_and_type(
        &self,
    ) -> (Vec<CompactionGroupId>, compact_task::TaskType) {
        let mut compaction_group_ids = self.compaction_group_ids().await;
        compaction_group_ids.shuffle(&mut thread_rng());

        let mut normal_groups = vec![];
        for cg_id in compaction_group_ids {
            if let Some(pick_type) = self.compaction_state.auto_pick_type(cg_id) {
                if pick_type == TaskType::Dynamic {
                    normal_groups.push(cg_id);
                } else if normal_groups.is_empty() {
                    return (vec![cg_id], pick_type);
                }
            }
        }
        (normal_groups, TaskType::Dynamic)
    }

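    /// Dedicated event runtime for the CPU/IO bound events (`PullTask` and `ReportTask`).
    /// Report events are batched up to `MAX_REPORT_COUNT`, and at most `MAX_SKIP_TIMES` pull
    /// events are served in between before a pending batch is flushed.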
    async fn compact_task_dedicated_event_handler(
        hummock_manager: Arc<HummockManager>,
        mut rx: UnboundedReceiver<(u32, subscribe_compaction_event_request::Event)>,
        shutdown_rx_shared: Shared<OneShotReceiver<()>>,
    ) {
        let mut compaction_selectors = init_selectors();

        tokio::select! {
            _ = shutdown_rx_shared => {}

            _ = async {
                while let Some((context_id, event)) = rx.recv().await {
                    let mut report_events = vec![];
                    let mut skip_times = 0;
                    match event {
                        RequestEvent::PullTask(PullTask { pull_task_count }) => {
                            hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, hummock_manager.env.opts.max_get_task_probe_times).await;
                        }

                        RequestEvent::ReportTask(task) => {
                            report_events.push(task.into());
                        }

                        _ => unreachable!(),
                    }
                    while let Ok((context_id, event)) = rx.try_recv() {
                        match event {
                            RequestEvent::PullTask(PullTask { pull_task_count }) => {
                                hummock_manager.handle_pull_task_event(context_id, pull_task_count as usize, &mut compaction_selectors, hummock_manager.env.opts.max_get_task_probe_times).await;
                                if !report_events.is_empty() {
                                    if skip_times > MAX_SKIP_TIMES {
                                        break;
                                    }
                                    skip_times += 1;
                                }
                            }

                            RequestEvent::ReportTask(task) => {
                                report_events.push(task.into());
                                if report_events.len() >= MAX_REPORT_COUNT {
                                    break;
                                }
                            }
                            _ => unreachable!(),
                        }
                    }
                    if !report_events.is_empty() {
                        if let Err(e) = hummock_manager.report_compact_tasks(report_events).await
                        {
                            tracing::error!(error = %e.as_report(), "report compact_task failed")
                        }
                    }
                }
            } => {}
        }
    }
}

impl HummockManager {
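    /// Core task-generation path. For each candidate group, runs the selector until it yields a
    /// schedulable task, the group is exhausted, or `max_select_count` is reached. Trivial-move
    /// and trivial-reclaim tasks are applied to the version immediately; the rest are persisted
    /// in `compact_task_assignment`. Returns the picked tasks and the groups with nothing to do.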
    pub async fn get_compact_tasks_impl(
        &self,
        compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;

        let mut compaction_guard = self.compaction.write().await;
        let compaction: &mut Compaction = &mut compaction_guard;
        let mut versioning_guard = self.versioning.write().await;
        let versioning: &mut Versioning = &mut versioning_guard;

        let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl");

        let start_time = Instant::now();
        let mut compaction_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);

        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );
        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }
        let all_versioned_table_schemas = if self.env.opts.enable_dropped_column_reclaim {
            self.metadata_manager
                .catalog_controller
                .get_versioned_table_schemas()
                .await
                .map_err(|e| Error::Internal(e.into()))?
        } else {
            HashMap::default()
        };
        let mut unschedule_groups = vec![];
        let mut trivial_tasks = vec![];
        let mut pick_tasks = vec![];
        let developer_config = Arc::new(CompactionDeveloperConfig::new_from_meta_opts(
            &self.env.opts,
        ));
        'outside: for compaction_group_id in compaction_groups {
            if pick_tasks.len() >= max_select_count {
                break;
            }

            if !version
                .latest_version()
                .levels
                .contains_key(&compaction_group_id)
            {
                continue;
            }

            // The compaction group may be dropped concurrently; skip it if its config is gone.
            let group_config = {
                let config_manager = self.compaction_group_manager.read().await;

                match config_manager.try_get_compaction_group_config(compaction_group_id) {
                    Some(config) => config,
                    None => continue,
                }
            };

            let task_id = next_compaction_task_id(&self.env).await?;

            if !compaction_statuses.contains_key(&compaction_group_id) {
                // Initialize the compact status for this group lazily.
                compaction_statuses.insert(
                    compaction_group_id,
                    CompactStatus::new(
                        compaction_group_id,
                        group_config.compaction_config.max_level,
                    ),
                );
            }
            let mut compact_status = compaction_statuses.get_mut(compaction_group_id).unwrap();

            let can_trivial_move = matches!(selector.task_type(), TaskType::Dynamic)
                || matches!(selector.task_type(), TaskType::Emergency);

            let mut stats = LocalSelectorStatistic::default();
            let member_table_ids: Vec<_> = version
                .latest_version()
                .state_table_info
                .compaction_group_member_table_ids(compaction_group_id)
                .iter()
                .map(|table_id| table_id.table_id)
                .collect();

            let mut table_id_to_option: HashMap<u32, _> = HashMap::default();

            {
                let guard = self.table_id_to_table_option.read();
                for table_id in &member_table_ids {
                    if let Some(opts) = guard.get(table_id) {
                        table_id_to_option.insert(*table_id, *opts);
                    }
                }
            }

            while let Some(compact_task) = compact_status.get_compact_task(
                version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id),
                version
                    .latest_version()
                    .state_table_info
                    .compaction_group_member_table_ids(compaction_group_id),
                task_id as HummockCompactionTaskId,
                &group_config,
                &mut stats,
                selector,
                &table_id_to_option,
                developer_config.clone(),
                &version.latest_version().table_watermarks,
                &version.latest_version().state_table_info,
            ) {
                let target_level_id = compact_task.input.target_level as u32;
                let compaction_group_version_id = version
                    .latest_version()
                    .get_compaction_group_levels(compaction_group_id)
                    .compaction_group_version_id;
                let compression_algorithm = match compact_task.compression_algorithm.as_str() {
                    "Lz4" => 1,
                    "Zstd" => 2,
                    _ => 0,
                };
                let vnode_partition_count = compact_task.input.vnode_partition_count;
                let mut compact_task = CompactTask {
                    input_ssts: compact_task.input.input_levels,
                    splits: vec![KeyRange::inf()],
                    sorted_output_ssts: vec![],
                    task_id,
                    target_level: target_level_id,
                    // Only GC delete keys when compacting to the last level of the group.
                    gc_delete_keys: version
                        .latest_version()
                        .get_compaction_group_levels(compaction_group_id)
                        .is_last_level(target_level_id),
                    base_level: compact_task.base_level as u32,
                    task_status: TaskStatus::Pending,
                    compaction_group_id: group_config.group_id,
                    compaction_group_version_id,
                    existing_table_ids: member_table_ids.clone(),
                    compression_algorithm,
                    target_file_size: compact_task.target_file_size,
                    table_options: table_id_to_option
                        .iter()
                        .map(|(table_id, table_option)| {
                            (*table_id, TableOption::from(table_option))
                        })
                        .collect(),
                    current_epoch_time: Epoch::now().0,
                    compaction_filter_mask: group_config.compaction_config.compaction_filter_mask,
                    target_sub_level_id: compact_task.input.target_sub_level_id,
                    task_type: compact_task.compaction_task_type,
                    split_weight_by_vnode: vnode_partition_count,
                    max_sub_compaction: group_config.compaction_config.max_sub_compaction,
                    ..Default::default()
                };

                let is_trivial_reclaim = compact_task.is_trivial_reclaim();
                let is_trivial_move = compact_task.is_trivial_move_task();
                if is_trivial_reclaim || (is_trivial_move && can_trivial_move) {
                    let log_label = if is_trivial_reclaim {
                        "TrivialReclaim"
                    } else {
                        "TrivialMove"
                    };
                    let label = if is_trivial_reclaim {
                        "trivial-space-reclaim"
                    } else {
                        "trivial-move"
                    };

                    tracing::debug!(
                        "{} for compaction group {}: input: {:?}, cost time: {:?}",
                        log_label,
                        compact_task.compaction_group_id,
                        compact_task.input_ssts,
                        start_time.elapsed()
                    );
                    compact_task.task_status = TaskStatus::Success;
                    compact_status.report_compact_task(&compact_task);
                    if !is_trivial_reclaim {
                        compact_task
                            .sorted_output_ssts
                            .clone_from(&compact_task.input_ssts[0].table_infos);
                    }
                    update_table_stats_for_vnode_watermark_trivial_reclaim(
                        &mut version_stats.table_stats,
                        &compact_task,
                    );
                    self.metrics
                        .compact_frequency
                        .with_label_values(&[
                            label,
                            &compact_task.compaction_group_id.to_string(),
                            selector.task_type().as_str_name(),
                            "SUCCESS",
                        ])
                        .inc();

                    version.apply_compact_task(&compact_task);
                    trivial_tasks.push(compact_task);
                    if trivial_tasks.len() >= self.env.opts.max_trivial_move_task_count_per_loop {
                        break 'outside;
                    }
                } else {
                    self.calculate_vnode_partition(
                        &mut compact_task,
                        group_config.compaction_config.as_ref(),
                    );
                    let (pk_prefix_table_watermarks, non_pk_prefix_table_watermarks) = version
                        .latest_version()
                        .safe_epoch_table_watermarks(&compact_task.existing_table_ids)
                        .into_iter()
                        .partition(|(_table_id, table_watermark)| {
                            matches!(
                                table_watermark.watermark_type,
                                WatermarkSerdeType::PkPrefix
                            )
                        });

                    compact_task.pk_prefix_table_watermarks = pk_prefix_table_watermarks;
                    compact_task.non_pk_prefix_table_watermarks = non_pk_prefix_table_watermarks;

                    compact_task.table_schemas = compact_task
                        .existing_table_ids
                        .iter()
                        .filter_map(|table_id| {
                            let id = (*table_id).try_into().unwrap();
                            all_versioned_table_schemas.get(&id).map(|column_ids| {
                                (
                                    *table_id,
                                    TableSchema {
                                        column_ids: column_ids.clone(),
                                    },
                                )
                            })
                        })
                        .collect();

                    compact_task_assignment.insert(
                        compact_task.task_id,
                        CompactTaskAssignment {
                            compact_task: Some(compact_task.clone().into()),
                            context_id: META_NODE_ID,
                        },
                    );

                    pick_tasks.push(compact_task);
                    break;
                }

                stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
                stats = LocalSelectorStatistic::default();
            }
            if pick_tasks
                .last()
                .map(|task| task.compaction_group_id != compaction_group_id)
                .unwrap_or(true)
            {
                unschedule_groups.push(compaction_group_id);
            }
            stats.report_to_metrics(compaction_group_id, self.metrics.as_ref());
        }

        if !trivial_tasks.is_empty() {
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_trivial_move"])
                .observe(trivial_tasks.len() as f64);

            for trivial_task in &trivial_tasks {
                self.metrics
                    .compact_task_trivial_move_sst_count
                    .with_label_values(&[&trivial_task.compaction_group_id.to_string()])
                    .observe(trivial_task.input_ssts[0].table_infos.len() as _);
            }

            drop(versioning_guard);
        } else {
            // No version change to commit; release the version lock before writing meta store.
            drop(versioning_guard);
            commit_multi_var!(
                self.meta_store_ref(),
                compaction_statuses,
                compact_task_assignment
            )?;
        }
        drop(compaction_guard);
        if !pick_tasks.is_empty() {
            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_get_compact_task"])
                .observe(pick_tasks.len() as f64);
        }

        for compact_task in &mut pick_tasks {
            let compaction_group_id = compact_task.compaction_group_id;

            self.compactor_manager
                .initiate_task_heartbeat(compact_task.clone());

            compact_task.task_status = TaskStatus::Pending;
            let compact_task_statistics = statistics_compact_task(compact_task);

            let level_type_label = build_compact_task_level_type_metrics_label(
                compact_task.input_ssts[0].level_idx as usize,
                compact_task.input_ssts.last().unwrap().level_idx as usize,
            );

            let level_count = compact_task.input_ssts.len();
            if compact_task.input_ssts[0].level_idx == 0 {
                self.metrics
                    .l0_compact_level_count
                    .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                    .observe(level_count as _);
            }

            self.metrics
                .compact_task_size
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_size as _);

            self.metrics
                .compact_task_size
                .with_label_values(&[
                    &compaction_group_id.to_string(),
                    &format!("{} uncompressed", level_type_label),
                ])
                .observe(compact_task_statistics.total_uncompressed_file_size as _);

            self.metrics
                .compact_task_file_count
                .with_label_values(&[&compaction_group_id.to_string(), &level_type_label])
                .observe(compact_task_statistics.total_file_count as _);

            tracing::trace!(
                "For compaction group {}: pick up {} {} sub_level in level {} to compact to target {}. cost time: {:?} compact_task_statistics {:?}",
                compaction_group_id,
                level_count,
                compact_task.input_ssts[0].level_type.as_str_name(),
                compact_task.input_ssts[0].level_idx,
                compact_task.target_level,
                start_time.elapsed(),
                compact_task_statistics
            );
        }

        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        pick_tasks.extend(trivial_tasks);
        Ok((pick_tasks, unschedule_groups))
    }

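    /// Cancels a compaction task, whether or not it is currently assigned to a compactor.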
    pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
        fail_point!("fp_cancel_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore err")
        )));
        let ret = self
            .cancel_compact_task_impl(vec![task_id], task_status)
            .await?;
        Ok(ret[0])
    }

    pub async fn cancel_compact_tasks(
        &self,
        tasks: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        self.cancel_compact_task_impl(tasks, task_status).await
    }

    async fn cancel_compact_task_impl(
        &self,
        task_ids: Vec<u64>,
        task_status: TaskStatus,
    ) -> Result<Vec<bool>> {
        assert!(CANCEL_STATUS_SET.contains(&task_status));
        let tasks = task_ids
            .into_iter()
            .map(|task_id| ReportTask {
                task_id,
                task_status,
                sorted_output_ssts: vec![],
                table_stats_change: HashMap::default(),
                object_timestamps: HashMap::default(),
            })
            .collect_vec();
        let rets = self.report_compact_tasks(tasks).await?;
        #[cfg(test)]
        {
            self.check_state_consistency().await;
        }
        Ok(rets)
    }

    async fn get_compact_tasks(
        &self,
        mut compaction_groups: Vec<CompactionGroupId>,
        max_select_count: usize,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<(Vec<CompactTask>, Vec<CompactionGroupId>)> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));
        compaction_groups.shuffle(&mut thread_rng());
        let (mut tasks, groups) = self
            .get_compact_tasks_impl(compaction_groups, max_select_count, selector)
            .await?;
        tasks.retain(|task| {
            if task.task_status == TaskStatus::Success {
                debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
                false
            } else {
                true
            }
        });
        Ok((tasks, groups))
    }

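    /// Like `get_compact_tasks` for a single group: returns the first non-trivial task, if any
    /// (trivial tasks are already committed inside `get_compact_tasks_impl`).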
    pub async fn get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        selector: &mut Box<dyn CompactionSelector>,
    ) -> Result<Option<CompactTask>> {
        fail_point!("fp_get_compact_task", |_| Err(Error::MetaStore(
            anyhow::anyhow!("failpoint metastore error")
        )));

        let (normal_tasks, _) = self
            .get_compact_tasks_impl(vec![compaction_group_id], 1, selector)
            .await?;
        for task in normal_tasks {
            if task.task_status != TaskStatus::Success {
                return Ok(Some(task));
            }
            debug_assert!(task.is_trivial_reclaim() || task.is_trivial_move_task());
        }
        Ok(None)
    }

    pub async fn manual_get_compact_task(
        &self,
        compaction_group_id: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<Option<CompactTask>> {
        let mut selector: Box<dyn CompactionSelector> =
            Box::new(ManualCompactionSelector::new(manual_compaction_option));
        self.get_compact_task(compaction_group_id, &mut selector)
            .await
    }

    pub async fn report_compact_task(
        &self,
        task_id: u64,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
        object_timestamps: HashMap<HummockSstableObjectId, u64>,
    ) -> Result<bool> {
        let rets = self
            .report_compact_tasks(vec![ReportTask {
                task_id,
                task_status,
                sorted_output_ssts,
                table_stats_change: table_stats_change.unwrap_or_default(),
                object_timestamps,
            }])
            .await?;
        Ok(rets[0])
    }

    pub async fn report_compact_tasks(&self, report_tasks: Vec<ReportTask>) -> Result<Vec<bool>> {
        let compaction_guard = self.compaction.write().await;
        let versioning_guard = self.versioning.write().await;

        self.report_compact_tasks_impl(report_tasks, compaction_guard, versioning_guard)
            .await
    }

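    /// Finishes or cancels the reported tasks according to their `task_status` and commits the
    /// results to the meta store in one batch. The returned flags are `false` for tasks that are
    /// not found in `compact_task_assignment`, and `true` otherwise.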
    pub async fn report_compact_tasks_impl(
        &self,
        report_tasks: Vec<ReportTask>,
        mut compaction_guard: RwLockWriteGuard<'_, Compaction>,
        mut versioning_guard: RwLockWriteGuard<'_, Versioning>,
    ) -> Result<Vec<bool>> {
        let deterministic_mode = self.env.opts.compaction_deterministic_test;
        let compaction: &mut Compaction = &mut compaction_guard;
        let start_time = Instant::now();
        let original_keys = compaction.compaction_statuses.keys().cloned().collect_vec();
        let mut compact_statuses = BTreeMapTransaction::new(&mut compaction.compaction_statuses);
        let mut rets = vec![false; report_tasks.len()];
        let mut compact_task_assignment =
            BTreeMapTransaction::new(&mut compaction.compact_task_assignment);
        let versioning: &mut Versioning = &mut versioning_guard;
        let _timer = start_measure_real_process_timer!(self, "report_compact_tasks");

        // Purge the compact status of compaction groups that no longer exist.
        for group_id in original_keys {
            if !versioning.current_version.levels.contains_key(&group_id) {
                compact_statuses.remove(group_id);
            }
        }
        let mut tasks = vec![];

        let mut version = HummockVersionTransaction::new(
            &mut versioning.current_version,
            &mut versioning.hummock_version_deltas,
            self.env.notification_manager(),
            None,
            &self.metrics,
        );

        if deterministic_mode {
            version.disable_apply_to_txn();
        }

        let mut version_stats = HummockVersionStatsTransaction::new(
            &mut versioning.version_stats,
            self.env.notification_manager(),
        );
        let mut success_count = 0;
        for (idx, task) in report_tasks.into_iter().enumerate() {
            rets[idx] = true;
            let mut compact_task = match compact_task_assignment.remove(task.task_id) {
                Some(compact_task) => CompactTask::from(compact_task.compact_task.unwrap()),
                None => {
                    tracing::warn!("compact task {} not found", task.task_id);
                    rets[idx] = false;
                    continue;
                }
            };

            {
                compact_task.task_status = task.task_status;
                compact_task.sorted_output_ssts = task.sorted_output_ssts;
            }

            match compact_statuses.get_mut(compact_task.compaction_group_id) {
                Some(mut compact_status) => {
                    compact_status.report_compact_task(&compact_task);
                }
                None => {
                    // The compaction group has been removed; the task is canceled as invalid.
                    compact_task.task_status = TaskStatus::InvalidGroupCanceled;
                }
            }

            let is_success = if let TaskStatus::Success = compact_task.task_status {
                match self
                    .report_compaction_sanity_check(&task.object_timestamps)
                    .await
                {
                    Err(e) => {
                        warn!(
                            "failed to commit compaction task {} {}",
                            compact_task.task_id,
                            e.as_report()
                        );
                        compact_task.task_status = TaskStatus::RetentionTimeRejected;
                        false
                    }
                    _ => {
                        let group = version
                            .latest_version()
                            .levels
                            .get(&compact_task.compaction_group_id)
                            .unwrap();
                        let is_expired = compact_task.is_expired(group.compaction_group_version_id);
                        if is_expired {
                            compact_task.task_status = TaskStatus::InputOutdatedCanceled;
                            warn!(
                                "The task may be expired because of group split, task:\n {:?}",
                                compact_task_to_string(&compact_task)
                            );
                        }
                        !is_expired
                    }
                }
            } else {
                false
            };
            if is_success {
                success_count += 1;
                version.apply_compact_task(&compact_task);
                if purge_prost_table_stats(&mut version_stats.table_stats, version.latest_version())
                {
                    self.metrics.version_stats.reset();
                    versioning.local_metrics.clear();
                }
                add_prost_table_stats_map(&mut version_stats.table_stats, &task.table_stats_change);
                trigger_local_table_stat(
                    &self.metrics,
                    &mut versioning.local_metrics,
                    &version_stats,
                    &task.table_stats_change,
                );
            }
            tasks.push(compact_task);
        }
        if success_count > 0 {
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment,
                version,
                version_stats
            )?;

            self.metrics
                .compact_task_batch_count
                .with_label_values(&["batch_report_task"])
                .observe(success_count as f64);
        } else {
            // No successful task: only the compact status and assignments need to be committed.
            commit_multi_var!(
                self.meta_store_ref(),
                compact_statuses,
                compact_task_assignment
            )?;
        }

        let mut success_groups = vec![];
        for compact_task in &tasks {
            self.compactor_manager
                .remove_task_heartbeat(compact_task.task_id);
            tracing::trace!(
                "Reported compaction task. {}. cost time: {:?}",
                compact_task_to_string(compact_task),
                start_time.elapsed(),
            );

            if !deterministic_mode
                && (matches!(compact_task.task_type, compact_task::TaskType::Dynamic)
                    || matches!(compact_task.task_type, compact_task::TaskType::Emergency))
            {
                // Try to schedule a follow-up dynamic compaction for the group right away.
                self.try_send_compaction_request(
                    compact_task.compaction_group_id,
                    compact_task::TaskType::Dynamic,
                );
            }

            if compact_task.task_status == TaskStatus::Success {
                success_groups.push(compact_task.compaction_group_id);
            }
        }

        trigger_compact_tasks_stat(
            &self.metrics,
            &tasks,
            &compaction.compaction_statuses,
            &versioning_guard.current_version,
        );
        drop(versioning_guard);
        if !success_groups.is_empty() {
            self.try_update_write_limits(&success_groups).await;
        }
        Ok(rets)
    }

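    /// Triggers dynamic compaction for the given groups; only used by the deterministic
    /// compaction test (`_base_version_id` is currently unused).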
    pub async fn trigger_compaction_deterministic(
        &self,
        _base_version_id: HummockVersionId,
        compaction_groups: Vec<CompactionGroupId>,
    ) -> Result<()> {
        self.on_current_version(|old_version| {
            tracing::info!(
                "Trigger compaction for version {}, groups {:?}",
                old_version.id,
                compaction_groups
            );
        })
        .await;

        if compaction_groups.is_empty() {
            return Ok(());
        }
        for compaction_group in compaction_groups {
            self.try_send_compaction_request(compaction_group, compact_task::TaskType::Dynamic);
        }
        Ok(())
    }

    pub async fn trigger_manual_compaction(
        &self,
        compaction_group: CompactionGroupId,
        manual_compaction_option: ManualCompactionOption,
    ) -> Result<()> {
        let start_time = Instant::now();

        let compactor = match self.compactor_manager.next_compactor() {
            Some(compactor) => compactor,
            None => {
                tracing::warn!("trigger_manual_compaction: no compactor is available.");
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction: no compactor is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
        };

        let compact_task = self
            .manual_get_compact_task(compaction_group, manual_compaction_option)
            .await;
        let compact_task = match compact_task {
            Ok(Some(compact_task)) => compact_task,
            Ok(None) => {
                return Err(anyhow::anyhow!(
                    "trigger_manual_compaction: no compaction task is available. compaction_group {}",
                    compaction_group
                )
                .into());
            }
            Err(err) => {
                tracing::warn!(error = %err.as_report(), "Failed to get compaction task");

                return Err(anyhow::anyhow!(err)
                    .context(format!(
                        "Failed to get compaction task for compaction_group {}",
                        compaction_group,
                    ))
                    .into());
            }
        };

        let compact_task_string = compact_task_to_string(&compact_task);
        compactor
            .send_event(ResponseEvent::CompactTask(compact_task.into()))
            .with_context(|| {
                format!(
                    "Failed to trigger compaction task for compaction_group {}",
                    compaction_group,
                )
            })?;

        tracing::info!(
            "Trigger manual compaction task. {}. cost time: {:?}",
            &compact_task_string,
            start_time.elapsed(),
        );

        Ok(())
    }

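    /// Enqueues a compaction request for the group via `CompactionState`. Returns `false` only
    /// if scheduling fails; an already-pending duplicate still counts as success.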
    pub fn try_send_compaction_request(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) -> bool {
        match self
            .compaction_state
            .try_sched_compaction(compaction_group, task_type)
        {
            Ok(_) => true,
            Err(e) => {
                tracing::error!(
                    error = %e.as_report(),
                    "failed to send compaction request for compaction group {}",
                    compaction_group,
                );
                false
            }
        }
    }

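    /// Decides per-table vnode partitioning for a task targeting the base level: either the
    /// configured `split_weight_by_vnode`, or in hybrid mode a partition count derived from each
    /// table's share of the input size and its recent write throughput.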
    pub(crate) fn calculate_vnode_partition(
        &self,
        compact_task: &mut CompactTask,
        compaction_config: &CompactionConfig,
    ) {
        // Only split SSTs by vnode partition when compacting to the base level or below: higher
        // levels contain larger, mostly single-table files, so splitting them brings no benefit.
        if compact_task.target_level > compact_task.base_level {
            return;
        }
        if compaction_config.split_weight_by_vnode > 0 {
            for table_id in &compact_task.existing_table_ids {
                compact_task
                    .table_vnode_partition
                    .insert(*table_id, compact_task.split_weight_by_vnode);
            }
        } else {
            let mut table_size_info: HashMap<u32, u64> = HashMap::default();
            let mut existing_table_ids: HashSet<u32> = HashSet::default();
            for input_ssts in &compact_task.input_ssts {
                for sst in &input_ssts.table_infos {
                    existing_table_ids.extend(sst.table_ids.iter());
                    for table_id in &sst.table_ids {
                        *table_size_info.entry(*table_id).or_default() +=
                            sst.sst_size / (sst.table_ids.len() as u64);
                    }
                }
            }
            compact_task
                .existing_table_ids
                .retain(|table_id| existing_table_ids.contains(table_id));

            let hybrid_vnode_count = self.env.opts.hybrid_partition_node_count;
            let default_partition_count = self.env.opts.partition_vnode_count;
            let compact_task_table_size_partition_threshold_low = self
                .env
                .opts
                .compact_task_table_size_partition_threshold_low;
            let compact_task_table_size_partition_threshold_high = self
                .env
                .opts
                .compact_task_table_size_partition_threshold_high;
            let table_write_throughput_statistic_manager =
                self.table_write_throughput_statistic_manager.read();
            let timestamp = chrono::Utc::now().timestamp();
            for (table_id, compact_table_size) in table_size_info {
                let write_throughput = table_write_throughput_statistic_manager
                    .get_table_throughput_descending(table_id, timestamp)
                    .peekable()
                    .peek()
                    .map(|item| item.throughput)
                    .unwrap_or(0);
                if compact_table_size > compact_task_table_size_partition_threshold_high
                    && default_partition_count > 0
                {
                    compact_task
                        .table_vnode_partition
                        .insert(table_id, default_partition_count);
                } else if (compact_table_size > compact_task_table_size_partition_threshold_low
                    || (write_throughput > self.env.opts.table_high_write_throughput_threshold
                        && compact_table_size > compaction_config.target_file_size_base))
                    && hybrid_vnode_count > 0
                {
                    // Large or write-heavy table: use the hybrid vnode partition count.
                    compact_task
                        .table_vnode_partition
                        .insert(table_id, hybrid_vnode_count);
                } else if compact_table_size > compaction_config.target_file_size_base {
                    compact_task.table_vnode_partition.insert(table_id, 1);
                }
            }
            compact_task
                .table_vnode_partition
                .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id));
        }
    }
}

#[cfg(any(test, feature = "test"))]
impl HummockManager {
    pub fn compactor_manager_ref_for_test(&self) -> crate::hummock::CompactorManagerRef {
        self.compactor_manager.clone()
    }

    pub async fn compaction_task_from_assignment_for_test(
        &self,
        task_id: u64,
    ) -> Option<CompactTaskAssignment> {
        let compaction_guard = self.compaction.read().await;
        let assignment_ref = &compaction_guard.compact_task_assignment;
        assignment_ref.get(&task_id).cloned()
    }

    pub async fn report_compact_task_for_test(
        &self,
        task_id: u64,
        compact_task: Option<CompactTask>,
        task_status: TaskStatus,
        sorted_output_ssts: Vec<SstableInfo>,
        table_stats_change: Option<PbTableStatsMap>,
    ) -> Result<()> {
        if let Some(task) = compact_task {
            let mut guard = self.compaction.write().await;
            guard.compact_task_assignment.insert(
                task_id,
                CompactTaskAssignment {
                    compact_task: Some(task.into()),
                    context_id: 0,
                },
            );
        }

        self.report_compact_tasks(vec![ReportTask {
            task_id,
            task_status,
            sorted_output_ssts,
            table_stats_change: table_stats_change.unwrap_or_default(),
            object_timestamps: HashMap::default(),
        }])
        .await?;
        Ok(())
    }
}

#[derive(Debug, Default)]
pub struct CompactionState {
    scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
}

impl CompactionState {
    pub fn new() -> Self {
        Self {
            scheduled: Default::default(),
        }
    }

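    /// Marks the `(group, task type)` pair as scheduled, returning `Ok(false)` if it is already
    /// pending so that callers can deduplicate requests. A minimal usage sketch (the group id `2`
    /// is hypothetical):
    ///
    /// ```ignore
    /// let state = CompactionState::new();
    /// assert!(state.try_sched_compaction(2, TaskType::Dynamic).unwrap());
    /// // A second request for the same pair is deduplicated.
    /// assert!(!state.try_sched_compaction(2, TaskType::Dynamic).unwrap());
    /// ```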
    pub fn try_sched_compaction(
        &self,
        compaction_group: CompactionGroupId,
        task_type: TaskType,
    ) -> std::result::Result<bool, SendError<CompactionRequestChannelItem>> {
        let mut guard = self.scheduled.lock();
        let key = (compaction_group, task_type);
        if guard.contains(&key) {
            return Ok(false);
        }
        guard.insert(key);
        Ok(true)
    }

    pub fn unschedule(
        &self,
        compaction_group: CompactionGroupId,
        task_type: compact_task::TaskType,
    ) {
        self.scheduled.lock().remove(&(compaction_group, task_type));
    }

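    /// Returns the highest-priority pending task type for the group, checked in the fixed order
    /// dynamic, space-reclaim, TTL, tombstone, then vnode-watermark.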
    pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
        let guard = self.scheduled.lock();
        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
            Some(compact_task::TaskType::Dynamic)
        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
            Some(compact_task::TaskType::SpaceReclaim)
        } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
            Some(compact_task::TaskType::Ttl)
        } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
            Some(compact_task::TaskType::Tombstone)
        } else if guard.contains(&(group, compact_task::TaskType::VnodeWatermark)) {
            Some(compact_task::TaskType::VnodeWatermark)
        } else {
            None
        }
    }
}

impl Compaction {
    pub fn get_compact_task_assignments_by_group_id(
        &self,
        compaction_group_id: CompactionGroupId,
    ) -> Vec<CompactTaskAssignment> {
        self.compact_task_assignment
            .iter()
            .filter_map(|(_, assignment)| {
                if assignment
                    .compact_task
                    .as_ref()
                    .is_some_and(|task| task.compaction_group_id == compaction_group_id)
                {
                    Some(CompactTaskAssignment {
                        compact_task: assignment.compact_task.clone(),
                        context_id: assignment.context_id,
                    })
                } else {
                    None
                }
            })
            .collect()
    }
}

#[derive(Clone, Default)]
pub struct CompactionGroupStatistic {
    pub group_id: CompactionGroupId,
    pub group_size: u64,
    pub table_statistic: BTreeMap<StateTableId, u64>,
    pub compaction_group_config: CompactionGroup,
}

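/// Adjusts table stats after a vnode-watermark trivial reclaim: reclaimed keys are subtracted
/// from `total_key_count`, and key/value sizes are scaled down proportionally. Each input SST
/// is expected to contain exactly one table.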
fn update_table_stats_for_vnode_watermark_trivial_reclaim(
    table_stats: &mut PbTableStatsMap,
    task: &CompactTask,
) {
    if task.task_type != TaskType::VnodeWatermark {
        return;
    }
    let mut deleted_table_keys: HashMap<u32, u64> = HashMap::default();
    for s in task.input_ssts.iter().flat_map(|l| l.table_infos.iter()) {
        assert_eq!(s.table_ids.len(), 1);
        let e = deleted_table_keys.entry(s.table_ids[0]).or_insert(0);
        *e += s.total_key_count;
    }
    for (table_id, delete_count) in deleted_table_keys {
        let Some(stats) = table_stats.get_mut(&table_id) else {
            continue;
        };
        if stats.total_key_count == 0 {
            continue;
        }
        let new_total_key_count = stats.total_key_count.saturating_sub(delete_count as i64);
        let ratio = new_total_key_count as f64 / stats.total_key_count as f64;
        stats.total_key_count = new_total_key_count;
        // Scale key size and value size proportionally to the remaining key count.
        stats.total_key_size = (stats.total_key_size as f64 * ratio).ceil() as i64;
        stats.total_value_size = (stats.total_value_size as f64 * ratio).ceil() as i64;
    }
}

#[derive(Debug, Clone)]
pub enum GroupState {
    Normal,
    /// Emergency state, with the reason recorded.
    Emergency(String),
    /// Write-stop state, with the reason recorded.
    WriteStop(String),
}

impl GroupState {
    pub fn is_write_stop(&self) -> bool {
        matches!(self, Self::WriteStop(_))
    }

    pub fn is_emergency(&self) -> bool {
        matches!(self, Self::Emergency(_))
    }

    pub fn reason(&self) -> Option<&str> {
        match self {
            Self::Emergency(reason) | Self::WriteStop(reason) => Some(reason),
            _ => None,
        }
    }
}

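/// Stateless checks that classify a compaction group as normal, emergency, or write-stop based
/// on its L0 shape (sub-level count, SST count, total size) against the thresholds in its
/// `CompactionConfig`.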
#[derive(Clone, Default)]
pub struct GroupStateValidator;

impl GroupStateValidator {
    pub fn write_stop_sub_level_count(
        level_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        let threshold = compaction_config.level0_stop_write_threshold_sub_level_number as usize;
        level_count > threshold
    }

    pub fn write_stop_l0_size(l0_size: u64, compaction_config: &CompactionConfig) -> bool {
        l0_size
            > compaction_config
                .level0_stop_write_threshold_max_size
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
    }

    pub fn write_stop_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .level0_stop_write_threshold_max_sst_count
                .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
                as usize
    }

    pub fn emergency_l0_file_count(
        l0_file_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        l0_file_count
            > compaction_config
                .emergency_level0_sst_file_count
                .unwrap_or(compaction_config::emergency_level0_sst_file_count())
                as usize
    }

    pub fn emergency_l0_partition_count(
        last_l0_sub_level_partition_count: usize,
        compaction_config: &CompactionConfig,
    ) -> bool {
        last_l0_sub_level_partition_count
            > compaction_config
                .emergency_level0_sub_level_partition
                .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
                as usize
    }

    pub fn check_single_group_write_stop(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::write_stop_sub_level_count(levels.l0.sub_levels.len(), compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_level_count: {}, threshold: {}) too many L0 sub levels",
                levels.l0.sub_levels.len(),
                compaction_config.level0_stop_write_threshold_sub_level_number
            ));
        }

        if Self::write_stop_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .level0_stop_write_threshold_max_sst_count
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_sst_count())
            ));
        }

        if Self::write_stop_l0_size(levels.l0.total_file_size, compaction_config) {
            return GroupState::WriteStop(format!(
                "WriteStop(l0_size: {}, threshold: {}) L0 size too large",
                levels.l0.total_file_size,
                compaction_config
                    .level0_stop_write_threshold_max_size
                    .unwrap_or(compaction_config::level0_stop_write_threshold_max_size())
            ));
        }

        GroupState::Normal
    }

    pub fn check_single_group_emergency(
        levels: &Levels,
        compaction_config: &CompactionConfig,
    ) -> GroupState {
        if Self::emergency_l0_file_count(
            levels
                .l0
                .sub_levels
                .iter()
                .map(|l| l.table_infos.len())
                .sum(),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_sst_count: {}, threshold: {}) too many L0 sst files",
                levels
                    .l0
                    .sub_levels
                    .iter()
                    .map(|l| l.table_infos.len())
                    .sum::<usize>(),
                compaction_config
                    .emergency_level0_sst_file_count
                    .unwrap_or(compaction_config::emergency_level0_sst_file_count())
            ));
        }

        if Self::emergency_l0_partition_count(
            levels
                .l0
                .sub_levels
                .first()
                .map(|l| l.table_infos.len())
                .unwrap_or(0),
            compaction_config,
        ) {
            return GroupState::Emergency(format!(
                "Emergency(l0_partition_count: {}, threshold: {}) too many L0 partitions",
                levels
                    .l0
                    .sub_levels
                    .first()
                    .map(|l| l.table_infos.len())
                    .unwrap_or(0),
                compaction_config
                    .emergency_level0_sub_level_partition
                    .unwrap_or(compaction_config::emergency_level0_sub_level_partition())
            ));
        }

        GroupState::Normal
    }

    pub fn group_state(levels: &Levels, compaction_config: &CompactionConfig) -> GroupState {
        let state = Self::check_single_group_write_stop(levels, compaction_config);
        if state.is_write_stop() {
            return state;
        }

        Self::check_single_group_emergency(levels, compaction_config)
    }
}