mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::iceberg_compactor_runner::{
    IcebergCompactorRunner, IcebergCompactorRunnerConfigBuilder, RunnerContext,
};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

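/// Implementation of Hummock compaction: consumes an iterator over the input key range and
/// builds and uploads output SSTs according to the configured `TaskConfig`.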
pub struct Compactor {
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

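/// Output of one compaction split: (split index, produced SSTs, compaction statistics).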
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

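    /// Compact the given key range with the provided merge iterator and compaction filter.
    /// Upon a successful return, the built SSTs have already been uploaded to the object store.
    ///
    /// `task_progress` is only used for tasks driven by the compaction manager.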
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

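    /// Report per-SST upload progress and update the corresponding compactor metrics.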
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

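    /// Shared implementation of `compact_key_range`, parameterized by the SST writer factory
    /// and the filter builder type.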
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

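/// The background thread that subscribes to iceberg compaction events from the hummock meta
/// service, pulls iceberg compaction tasks, and runs them on the compaction executor.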
#[cfg_attr(coverage, coverage(off))]
#[must_use]
pub fn start_compactor_iceberg(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

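    // Event loop: (re)subscribe to the meta service, periodically pull tasks while spare
    // parallelism (up to `max_task_parallelism`) allows, and spawn each task onto the executor.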
    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
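                // Either pull more tasks on the periodic tick (when the previous pull was acked
                // and there is spare parallelism), consume the next stream event, or shut down.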
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                                    event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                                        subscribe_iceberg_compaction_event_request::PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(concat!(
                                        "risingwave version ",
                                        env!("CARGO_PKG_VERSION")
                                    )
                                    .to_owned())
                                    .set_max_row_group_size(
                                        compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                    )
                                    .set_compression(Compression::SNAPPY)
                                    .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism(worker_num as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .target_file_size_bytes(compactor_context.storage_opts.iceberg_compaction_target_file_size_mb as u64 * 1024 * 1024)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let iceberg_runner = match IcebergCompactorRunner::new(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runner) => runner,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to create iceberg compactor runner {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    if let Err(e) = iceberg_runner.compact(
                                        RunnerContext::new(
                                            max_task_parallelism,
                                            running_task_parallelism.clone(),
                                        ),
                                        rx,
                                    )
                                    .await {
                                        tracing::warn!(error = %e.as_report(), "Failed to compact iceberg runner {}", task_id);
                                    }

                                    shutdown.lock().unwrap().remove(&task_id);
                                });
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

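/// The background compaction thread that subscribes to compaction events from the hummock
/// manager, pulls compaction tasks, and runs them on the compaction executor.
///
/// Returns the join handle of the event loop and a one-shot sender used to request shutdown.
/// A rough usage sketch (variable names are illustrative):
///
/// ```ignore
/// let (join_handle, shutdown_tx) =
///     start_compactor(compactor_context, hummock_meta_client, object_id_manager, catalog_mgr);
/// // ... on shutdown:
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```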
#[cfg_attr(coverage, coverage(off))]
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

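    // Event loop: (re)subscribe to compaction events, send periodic heartbeats with task
    // progress, pull new tasks while parallelism allows, and dispatch them to the executor.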
    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
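                // On each tick, report heartbeat progress and (if the last pull was acked and
                // capacity permits) pull more tasks; otherwise consume the next stream event or
                // shut down.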
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

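                // Helper to report the final status of a compaction task back to the meta
                // service over the event stream.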
                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

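                // Dispatch on the received event: run compaction tasks, validate SSTs, cancel
                // running tasks, or re-arm task pulling on `PullTaskAck`.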
                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

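/// The background thread of the shared compactor: receives dispatched compaction tasks over the
/// gRPC proxy channel, runs them, and reports results and heartbeats back through the proxy.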
#[cfg_attr(coverage, coverage(off))]
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }
            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    shared_compactor_object_id_manager,
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps: object_timestamps
                                            .into_iter()
                                            .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                            .collect(),
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                },
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }
                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                {
                                    Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
                                    _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

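/// Collect a progress snapshot for every compaction task currently tracked by the progress
/// manager, used in heartbeat reports.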
fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}