1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
19use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
20use risingwave_pb::hummock::PbCompactTask;
21use risingwave_pb::hummock::report_compaction_task_request::{
22 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
23 ReportTask as ReportSharedTask,
24};
25use risingwave_rpc_client::GrpcCompactorProxyClient;
26use thiserror_ext::AsReport;
27use tokio::sync::mpsc;
28use tonic::Request;
29
30pub mod compactor_runner;
31mod context;
32pub mod fast_compactor_runner;
33mod iterator;
34mod shared_buffer_compact;
35pub(super) mod task_progress;
36
37use std::collections::{HashMap, VecDeque};
38use std::marker::PhantomData;
39use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
40use std::sync::{Arc, Mutex};
41use std::time::{Duration, SystemTime};
42
43use await_tree::{InstrumentAwait, SpanExt};
44pub use compaction_executor::CompactionExecutor;
45pub use compaction_filter::{
46 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
47 TtlCompactionFilter,
48};
49pub use context::{
50 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
51};
52use futures::{StreamExt, pin_mut};
53pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
54use more_asserts::assert_ge;
55use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
56use risingwave_hummock_sdk::{
57 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
58};
59use risingwave_pb::hummock::compact_task::TaskStatus;
60use risingwave_pb::hummock::subscribe_compaction_event_request::{
61 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
62};
63use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
64use risingwave_pb::hummock::{
65 CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
66 SubscribeCompactionEventResponse,
67};
68use risingwave_rpc_client::HummockMetaClient;
69pub use shared_buffer_compact::{compact, merge_imms_in_memory};
70use tokio::sync::oneshot::Sender;
71use tokio::task::JoinHandle;
72use tokio::time::Instant;
73
74pub use self::compaction_utils::{
75 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
76 check_flush_result,
77};
78pub use self::task_progress::TaskProgress;
79use super::multi_builder::CapacitySplitTableBuilder;
80use super::{
81 GetObjectId, HummockResult, SstableBuilderOptions, SstableObjectIdManager, Xor16FilterBuilder,
82};
83use crate::compaction_catalog_manager::{
84 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
85};
86use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
87use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
88use crate::hummock::iterator::{Forward, HummockIterator};
89use crate::hummock::{
90 BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
91 UnifiedSstableWriterFactory, validate_ssts,
92};
93use crate::monitor::CompactorMetrics;
94
/// A single compaction worker that turns a key-range iterator into sorted SST outputs.
pub struct Compactor {
    // Shared compactor environment: metrics, sstable store, memory limiter, storage opts.
    context: CompactorContext,
    // Source of new SST object ids for the builder factory.
    object_id_getter: Box<dyn GetObjectId>,
    // Per-task configuration (cache policy, filter kind, vnode partitioning, ...).
    task_config: TaskConfig,
    // Options used when building output SSTables.
    options: SstableBuilderOptions,
    // Accumulated time spent fetching object ids; observed into
    // `get_table_id_total_time_duration` after division by 1e6 (presumably micros -> seconds;
    // TODO confirm the unit against the metric definition).
    get_id_time: Arc<AtomicU64>,
}
104
/// Result of compacting one split: (split index — presumably; verify against callers,
/// produced SSTs, compaction statistics).
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
106
impl Compactor {
    /// Create a compactor for one task, with a zeroed object-id-time counter.
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Box<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compact a key range fed by `iter` into a set of output SSTs.
    ///
    /// Dispatches to [`Self::compact_key_range_impl`] with either a block-based or a
    /// whole-SST Xor16 filter builder depending on `task_config.use_block_based_filter`,
    /// records build timing / progress metrics, and logs completion when `task_id` is set.
    ///
    /// Returns the produced [`LocalSstableInfo`]s together with the table stats delta.
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        // Shared-buffer (L0 flush) compaction and LSM compaction are timed under
        // different histograms.
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        // get_id_time is an integer counter; divided by 1e6 before being observed
        // (presumably micros -> seconds; TODO confirm unit).
        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        // Output SSTs must carry sorted table-id lists.
        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

    /// Account finished SST uploads in metrics and, if present, in the task progress
    /// tracker (one uploaded SST and one retired pending write IO per output SST).
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

    /// Core compaction pipeline, generic over the SST writer factory `F` and the
    /// bloom-filter builder `B`.
    ///
    /// Builds a capacity-splitting table builder on top of `writer_factory`, drives
    /// `iter` through `compact_and_build_sst` with `compaction_filter` applied, then
    /// finalizes all in-flight builders and returns the sealed SSTs plus statistics.
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Box<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        // Seal remaining builders and collect every output SST.
        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}
277
278#[cfg_attr(coverage, coverage(off))]
281#[must_use]
282pub fn start_compactor(
283 compactor_context: CompactorContext,
284 hummock_meta_client: Arc<dyn HummockMetaClient>,
285 sstable_object_id_manager: Arc<SstableObjectIdManager>,
286 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
287) -> (JoinHandle<()>, Sender<()>) {
288 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
289 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
290 let stream_retry_interval = Duration::from_secs(30);
291 let task_progress = compactor_context.task_progress_manager.clone();
292 let periodic_event_update_interval = Duration::from_millis(1000);
293
294 let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
295 * compactor_context.storage_opts.compactor_max_task_multiplier)
296 .ceil() as u32;
297 let running_task_parallelism = Arc::new(AtomicU32::new(0));
298
299 const MAX_PULL_TASK_COUNT: u32 = 4;
300 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
301
302 assert_ge!(
303 compactor_context.storage_opts.compactor_max_task_multiplier,
304 0.0
305 );
306
307 let join_handle = tokio::spawn(async move {
308 let shutdown_map = CompactionShutdownMap::default();
309 let mut min_interval = tokio::time::interval(stream_retry_interval);
310 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
311
312 'start_stream: loop {
314 let mut pull_task_ack = true;
317 tokio::select! {
318 _ = min_interval.tick() => {},
320 _ = &mut shutdown_rx => {
322 tracing::info!("Compactor is shutting down");
323 return;
324 }
325 }
326
327 let (request_sender, response_event_stream) =
328 match hummock_meta_client.subscribe_compaction_event().await {
329 Ok((request_sender, response_event_stream)) => {
330 tracing::debug!("Succeeded subscribe_compaction_event.");
331 (request_sender, response_event_stream)
332 }
333
334 Err(e) => {
335 tracing::warn!(
336 error = %e.as_report(),
337 "Subscribing to compaction tasks failed with error. Will retry.",
338 );
339 continue 'start_stream;
340 }
341 };
342
343 pin_mut!(response_event_stream);
344
345 let executor = compactor_context.compaction_executor.clone();
346 let sstable_object_id_manager = sstable_object_id_manager.clone();
347
348 let mut event_loop_iteration_now = Instant::now();
350 'consume_stream: loop {
351 {
352 compactor_context
354 .compactor_metrics
355 .compaction_event_loop_iteration_latency
356 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
357 event_loop_iteration_now = Instant::now();
358 }
359
360 let running_task_parallelism = running_task_parallelism.clone();
361 let request_sender = request_sender.clone();
362 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
363 _ = periodic_event_interval.tick() => {
364 let progress_list = get_task_progress(task_progress.clone());
365
366 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
367 event: Some(RequestEvent::HeartBeat(
368 HeartBeat {
369 progress: progress_list
370 }
371 )),
372 create_at: SystemTime::now()
373 .duration_since(std::time::UNIX_EPOCH)
374 .expect("Clock may have gone backwards")
375 .as_millis() as u64,
376 }) {
377 tracing::warn!(error = %e.as_report(), "Failed to report task progress");
378 continue 'start_stream;
380 }
381
382
383 let mut pending_pull_task_count = 0;
384 if pull_task_ack {
385 pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
387
388 if pending_pull_task_count > 0 {
389 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
390 event: Some(RequestEvent::PullTask(
391 PullTask {
392 pull_task_count: pending_pull_task_count,
393 }
394 )),
395 create_at: SystemTime::now()
396 .duration_since(std::time::UNIX_EPOCH)
397 .expect("Clock may have gone backwards")
398 .as_millis() as u64,
399 }) {
400 tracing::warn!(error = %e.as_report(), "Failed to pull task");
401
402 continue 'start_stream;
404 } else {
405 pull_task_ack = false;
406 }
407 }
408 }
409
410 tracing::info!(
411 running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
412 pull_task_ack = %pull_task_ack,
413 pending_pull_task_count = %pending_pull_task_count
414 );
415
416 continue;
417 }
418 event = response_event_stream.next() => {
419 event
420 }
421
422 _ = &mut shutdown_rx => {
423 tracing::info!("Compactor is shutting down");
424 return
425 }
426 };
427
428 fn send_report_task_event(
429 compact_task: &CompactTask,
430 table_stats: TableStatsMap,
431 object_timestamps: HashMap<HummockSstableObjectId, u64>,
432 request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
433 ) {
434 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
435 event: Some(RequestEvent::ReportTask(ReportTask {
436 task_id: compact_task.task_id,
437 task_status: compact_task.task_status.into(),
438 sorted_output_ssts: compact_task
439 .sorted_output_ssts
440 .iter()
441 .map(|sst| sst.into())
442 .collect(),
443 table_stats_change: to_prost_table_stats_map(table_stats),
444 object_timestamps,
445 })),
446 create_at: SystemTime::now()
447 .duration_since(std::time::UNIX_EPOCH)
448 .expect("Clock may have gone backwards")
449 .as_millis() as u64,
450 }) {
451 let task_id = compact_task.task_id;
452 tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
453 }
454 }
455
456 match event {
457 Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
458 let event = match event {
459 Some(event) => event,
460 None => continue 'consume_stream,
461 };
462 let shutdown = shutdown_map.clone();
463 let context = compactor_context.clone();
464 let consumed_latency_ms = SystemTime::now()
465 .duration_since(std::time::UNIX_EPOCH)
466 .expect("Clock may have gone backwards")
467 .as_millis() as u64
468 - create_at;
469 context
470 .compactor_metrics
471 .compaction_event_consumed_latency
472 .observe(consumed_latency_ms as _);
473
474 let sstable_object_id_manager = sstable_object_id_manager.clone();
475 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
476
477 match event {
478 ResponseEvent::CompactTask(compact_task) => {
479 let compact_task = CompactTask::from(compact_task);
480 let parallelism =
481 calculate_task_parallelism(&compact_task, &context);
482
483 assert_ne!(parallelism, 0, "splits cannot be empty");
484
485 if (max_task_parallelism
486 - running_task_parallelism.load(Ordering::SeqCst))
487 < parallelism as u32
488 {
489 tracing::warn!(
490 "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
491 compact_task.task_id,
492 parallelism,
493 max_task_parallelism,
494 running_task_parallelism.load(Ordering::Relaxed),
495 );
496 let (compact_task, table_stats, object_timestamps) =
497 compact_done(
498 compact_task,
499 context.clone(),
500 vec![],
501 TaskStatus::NoAvailCpuResourceCanceled,
502 );
503
504 send_report_task_event(
505 &compact_task,
506 table_stats,
507 object_timestamps,
508 &request_sender,
509 );
510
511 continue 'consume_stream;
512 }
513
514 running_task_parallelism
515 .fetch_add(parallelism as u32, Ordering::SeqCst);
516 executor.spawn(async move {
517 let (tx, rx) = tokio::sync::oneshot::channel();
518 let task_id = compact_task.task_id;
519 shutdown.lock().unwrap().insert(task_id, tx);
520
521 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
522 context.clone(),
523 compact_task,
524 rx,
525 Box::new(sstable_object_id_manager.clone()),
526 compaction_catalog_manager_ref.clone(),
527 )
528 .await;
529
530 shutdown.lock().unwrap().remove(&task_id);
531 running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
532
533 send_report_task_event(
534 &compact_task,
535 table_stats,
536 object_timestamps,
537 &request_sender,
538 );
539
540 let enable_check_compaction_result =
541 context.storage_opts.check_compaction_result;
542 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
543
544 if enable_check_compaction_result && need_check_task {
545 let compact_table_ids = compact_task.build_compact_table_ids();
546 match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
547 Ok(compaction_catalog_agent_ref) => {
548 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
549 {
550 Err(e) => {
551 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
552 }
553 Ok(true) => (),
554 Ok(false) => {
555 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
556 }
557 }
558 },
559 Err(e) => {
560 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
561 }
562 }
563 }
564 });
565 }
566 ResponseEvent::VacuumTask(_) => {
567 unreachable!("unexpected vacuum task");
568 }
569 ResponseEvent::FullScanTask(_) => {
570 unreachable!("unexpected scan task");
571 }
572 ResponseEvent::ValidationTask(validation_task) => {
573 let validation_task = ValidationTask::from(validation_task);
574 executor.spawn(async move {
575 validate_ssts(validation_task, context.sstable_store.clone())
576 .await;
577 });
578 }
579 ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
580 .lock()
581 .unwrap()
582 .remove(&cancel_compact_task.task_id)
583 {
584 Some(tx) => {
585 if tx.send(()).is_err() {
586 tracing::warn!(
587 "Cancellation of compaction task failed. task_id: {}",
588 cancel_compact_task.task_id
589 );
590 }
591 }
592 _ => {
593 tracing::warn!(
594 "Attempting to cancel non-existent compaction task. task_id: {}",
595 cancel_compact_task.task_id
596 );
597 }
598 },
599
600 ResponseEvent::PullTaskAck(_pull_task_ack) => {
601 pull_task_ack = true;
603 }
604 }
605 }
606 Some(Err(e)) => {
607 tracing::warn!("Failed to consume stream. {}", e.message());
608 continue 'start_stream;
609 }
610 _ => {
611 continue 'start_stream;
613 }
614 }
615 }
616 }
617 });
618
619 (join_handle, shutdown_tx)
620}
621
622#[cfg_attr(coverage, coverage(off))]
625#[must_use]
/// The main loop when starting a shared (serverless-style) compactor.
///
/// Unlike [`start_compactor`], tasks arrive pre-dispatched over `receiver` (a gRPC
/// proxy feeds it `DispatchCompactionTaskRequest`s) instead of being pulled from a
/// subscription stream; heartbeats and task reports go back through
/// `grpc_proxy_client`. Returns the event-loop `JoinHandle` and a shutdown `Sender`.
#[cfg_attr(coverage, coverage(off))]
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    // task_id -> cancellation sender for aborting running tasks on CancelCompactTask.
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    // Periodic heartbeat with per-task progress; failures are logged,
                    // not fatal (best-effort reporting).
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest{
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }


                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }

            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        // Index the dispatched table catalogs by table id for the agent below.
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        // Output object ids are pre-allocated by the dispatcher; extra ids are
                        // fetched remotely through the proxy client when the deque runs dry.
                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids);
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    Box::new(shared_compactor_object_id_manager),
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                // Report the finished task (status, output SSTs, stats) via the proxy.
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps,
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        // Optional post-hoc consistency check of the compaction output.
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                },
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }

                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                { Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                } _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }}
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}
778
779fn get_task_progress(
780 task_progress: Arc<
781 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
782 >,
783) -> Vec<CompactTaskProgress> {
784 let mut progress_list = Vec::new();
785 for (&task_id, progress) in &*task_progress.lock() {
786 progress_list.push(progress.snapshot(task_id));
787 }
788 progress_list
789}