mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

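/// Executes the compaction of one key range (either a shared-buffer flush or one split of a
/// compaction task) into new SSTables, as configured by [`TaskConfig`] and
/// [`SstableBuilderOptions`].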
pub struct Compactor {
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

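/// Result of compacting one split of a task: `(split index, output SSTs, compaction statistics)`.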
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

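    /// Compacts the given forward iterator into a set of SSTs, applying `compaction_filter` and
    /// using a block-based or per-SST XOR filter builder depending on the task config, then
    /// reports progress and metrics and returns the produced SSTs with their statistics.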
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

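    /// Marks one uploaded SST per entry of `ssts` on the progress tracker and updates the
    /// shared-buffer or compaction upload metrics.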
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

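    /// Runs the actual build: wires a [`RemoteBuilderFactory`] into a
    /// [`CapacitySplitTableBuilder`] and drives `compact_and_build_sst` over the iterator with
    /// the given writer factory `F` and filter builder `B`.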
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

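/// The background compaction worker: subscribes to the meta node's compaction event stream,
/// sends periodic heartbeats, pulls tasks up to the configured parallelism, and spawns each
/// received compaction task onto the compaction executor. Returns the join handle of the
/// spawned worker and a sender used to trigger shutdown.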
#[cfg_attr(coverage, coverage(off))]
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

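        // Outer loop: (re)subscribe to the compaction event stream, retrying on failure until shutdown.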
        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            let mut event_loop_iteration_now = Instant::now();
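            // Inner loop: consume responses from the event stream; on each periodic tick, report
            // progress and pull new tasks while capacity is available.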
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

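                // Sends the final status and outputs of a compaction task back to the meta node
                // over the subscription stream.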
                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), *timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

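/// The shared compactor worker: consumes dispatched compaction task requests from `receiver`,
/// runs them on the compaction executor, and reports heartbeats and task results through the
/// gRPC proxy client. Returns the join handle of the spawned worker and a shutdown sender.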
#[cfg_attr(coverage, coverage(off))]
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }
            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

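                        // Seed the object id manager with the output object ids pre-assigned in the
                        // dispatch request; further ids can be fetched through the proxy client
                        // (see `sstable_id_remote_fetch_number`) if these run out.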
                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    shared_compactor_object_id_manager,
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps: object_timestamps
                                            .into_iter()
                                            .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                            .collect(),
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                },
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }
                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                {
                                    Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
                                    _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

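/// Takes a snapshot of the progress of all currently running compaction tasks, used for
/// heartbeat reporting.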
fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}