mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::iceberg_compactor_runner::{
    IcebergCompactorRunner, IcebergCompactorRunnerConfigBuilder, RunnerContext,
};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

pub struct Compactor {
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compacts the given iterator over a key range into SSTs, uploading them via the sstable
    /// store, and returns the produced SSTs together with compaction statistics.
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

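        // Build and upload the SSTs. The two branches run the same implementation and differ only
        // in the `FilterBuilder` type parameter (block-based XOR16 filter vs. per-SST XOR16
        // filter), selected by `use_block_based_filter` in the task config.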
        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

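    /// Updates per-SST metrics and, if present, the task progress tracker for every SST that has
    /// been uploaded.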
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

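    /// Drives the actual build: wraps the writer factory in a `RemoteBuilderFactory`, feeds the
    /// iterator through `compact_and_build_sst` into a `CapacitySplitTableBuilder`, and returns
    /// the finished SSTs together with their compaction statistics.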
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

/// Spawns the background worker that subscribes to iceberg compaction events from the meta
/// service and runs iceberg compaction tasks. Returns the join handle and a shutdown sender.
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
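    // For example, with 8 executor workers and a multiplier of 2.5 (hypothetical values), up to
    // ceil(8 * 2.5) = 20 units of task parallelism may run at once.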
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

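    // The spawned worker runs two nested loops: the outer `'start_stream` loop (re)establishes
    // the event subscription to the meta node, throttled by `stream_retry_interval`, while the
    // inner `'consume_stream` loop periodically requests new tasks (bounded by the parallelism
    // budget above) and dispatches incoming iceberg compaction events.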
    let join_handle = tokio::spawn(async move {
        let iceberg_compaction_running_task_tracker =
            Arc::new(Mutex::new((HashMap::new(), HashSet::new())));
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Iceberg Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                                    event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                                        subscribe_iceberg_compaction_event_request::PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        let iceberg_running_task_tracker =
                            iceberg_compaction_running_task_tracker.clone();
                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(concat!(
                                        "risingwave version ",
                                        env!("CARGO_PKG_VERSION")
                                    )
                                    .to_owned())
                                    .set_max_row_group_size(
                                        compactor_context
                                            .storage_opts
                                            .iceberg_compaction_write_parquet_max_row_group_rows,
                                    )
                                    .set_compression(Compression::SNAPPY)
                                    .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism(worker_num as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .target_file_size_bytes(compactor_context.storage_opts.iceberg_compaction_target_file_size_mb as u64 * 1024 * 1024)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .build()
                                {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let iceberg_runner = match IcebergCompactorRunner::new(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                )
                                .await
                                {
                                    Ok(runner) => runner,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to create iceberg compactor runner {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let task_unique_ident = format!(
                                    "{}-{:?}",
                                    iceberg_runner.iceberg_config.catalog_name(),
                                    iceberg_runner.table_ident
                                );
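                                // Deduplicate by catalog name plus table identity so that at most
                                // one compaction task per Iceberg table runs on this node at a time.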

                                {
                                    let running_task_tracker_guard = iceberg_compaction_running_task_tracker
                                        .lock()
                                        .unwrap();

                                    if running_task_tracker_guard.1.contains(&task_unique_ident) {
                                        tracing::warn!(
                                            task_id = %task_id,
                                            task_unique_ident = %task_unique_ident,
                                            "Iceberg compaction task already running, skip",
                                        );
                                        continue 'consume_stream;
                                    }
                                }

                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    {
                                        let mut running_task_tracker_guard =
                                            iceberg_running_task_tracker.lock().unwrap();
                                        running_task_tracker_guard.0.insert(task_id, tx);
                                        running_task_tracker_guard.1.insert(task_unique_ident.clone());
                                    }

                                    let _release_guard = scopeguard::guard(
                                        iceberg_running_task_tracker.clone(),
                                        move |tracker| {
                                            let mut running_task_tracker_guard = tracker.lock().unwrap();
                                            running_task_tracker_guard.0.remove(&task_id);
                                            running_task_tracker_guard.1.remove(&task_unique_ident);
                                        },
                                    );

                                    if let Err(e) = iceberg_runner
                                        .compact(
                                            RunnerContext::new(
                                                max_task_parallelism,
                                                running_task_parallelism.clone(),
                                            ),
                                            rx,
                                        )
                                        .await
                                    {
                                        tracing::warn!(error = %e.as_report(), "Failed to compact iceberg runner {}", task_id);
                                    }
                                });
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// Spawns the background worker that subscribes to compaction events from the hummock manager,
/// sends heartbeats, pulls compaction tasks, and executes them. Returns the join handle and a
/// shutdown sender.
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
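    // Maps a running task id to the oneshot sender used to cancel it when the meta service sends
    // a `CancelCompactTask` event.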
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

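                // Reports a finished (or cancelled) task's status, output SSTs, table-stats
                // changes, and object timestamps back to the meta service over the event stream.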
                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// Spawns the background worker for the shared compactor: it receives dispatched compaction
/// tasks over `receiver` and reports results through the gRPC compactor proxy client.
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }
            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(
                            output_object_ids
                                .into_iter()
                                .map(Into::<HummockSstableObjectId>::into),
                        );
                        let shared_compactor_object_id_manager = SharedComapctorObjectIdManager::new(
                            output_object_ids_deque,
                            cloned_grpc_proxy_client.clone(),
                            context.storage_opts.sstable_id_remote_fetch_number,
                        );
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref =
                                    CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    shared_compactor_object_id_manager,
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps: object_timestamps
                                            .into_iter()
                                            .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                            .collect(),
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                },
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }
                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                {
                                    Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
                                    _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

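/// Takes a snapshot of the progress of all currently running compaction tasks, used for heartbeat
/// reporting.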
fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}
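
// Illustrative sketch (not part of the production flow): the pull-task budgeting arithmetic used
// by `start_compactor` and `start_iceberg_compactor` above. Per heartbeat tick, at most
// `min(max_task_parallelism - running_task_parallelism, MAX_PULL_TASK_COUNT)` new tasks are
// requested from the meta service. All concrete numbers below are hypothetical.
#[cfg(test)]
mod pull_task_budget_example {
    #[test]
    fn budget_is_capped_by_max_pull_task_count() {
        const MAX_PULL_TASK_COUNT: u32 = 4;

        // Hypothetical deployment: 8 executor workers and a `compactor_max_task_multiplier` of 2.5.
        let worker_num: usize = 8;
        let compactor_max_task_multiplier: f32 = 2.5;
        let max_task_parallelism: u32 =
            (worker_num as f32 * compactor_max_task_multiplier).ceil() as u32;
        assert_eq!(max_task_parallelism, 20);

        // With 18 units of parallelism already running, only 2 more tasks may be pulled even
        // though the per-tick cap is 4.
        let running_task_parallelism: u32 = 18;
        let pending_pull_task_count =
            (max_task_parallelism - running_task_parallelism).min(MAX_PULL_TASK_COUNT);
        assert_eq!(pending_pull_task_count, 2);

        // With nothing running, the per-tick cap applies.
        let pending_pull_task_count = max_task_parallelism.min(MAX_PULL_TASK_COUNT);
        assert_eq!(pending_pull_task_count, 4);
    }
}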