1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18use parquet::basic::Compression;
19use parquet::file::properties::WriterProperties;
20use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
21use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
22use risingwave_pb::hummock::PbCompactTask;
23use risingwave_pb::hummock::report_compaction_task_request::{
24 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
25 ReportTask as ReportSharedTask,
26};
27use risingwave_pb::iceberg_compaction::{
28 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
29 subscribe_iceberg_compaction_event_request,
30};
31use risingwave_rpc_client::GrpcCompactorProxyClient;
32use thiserror_ext::AsReport;
33use tokio::sync::mpsc;
34use tonic::Request;
35
36pub mod compactor_runner;
37mod context;
38pub mod fast_compactor_runner;
39mod iterator;
40mod shared_buffer_compact;
41pub(super) mod task_progress;
42
43use std::collections::{HashMap, HashSet, VecDeque};
44use std::marker::PhantomData;
45use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, SystemTime};
48
49use await_tree::{InstrumentAwait, SpanExt};
50pub use compaction_executor::CompactionExecutor;
51pub use compaction_filter::{
52 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
53 TtlCompactionFilter,
54};
55pub use context::{
56 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
57};
58use futures::{StreamExt, pin_mut};
59pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
60use more_asserts::assert_ge;
61use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
62use risingwave_hummock_sdk::{
63 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
64};
65use risingwave_pb::hummock::compact_task::TaskStatus;
66use risingwave_pb::hummock::subscribe_compaction_event_request::{
67 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
68};
69use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
70use risingwave_pb::hummock::{
71 CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
72 SubscribeCompactionEventResponse,
73};
74use risingwave_rpc_client::HummockMetaClient;
75pub use shared_buffer_compact::{compact, merge_imms_in_memory};
76use tokio::sync::oneshot::Sender;
77use tokio::task::JoinHandle;
78use tokio::time::Instant;
79
80pub use self::compaction_utils::{
81 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
82 check_flush_result,
83};
84pub use self::task_progress::TaskProgress;
85use super::multi_builder::CapacitySplitTableBuilder;
86use super::{
87 GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
88};
89use crate::compaction_catalog_manager::{
90 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
91};
92use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
93use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
94use crate::hummock::iceberg_compactor_runner::{
95 IcebergCompactorRunner, IcebergCompactorRunnerConfigBuilder, RunnerContext,
96};
97use crate::hummock::iterator::{Forward, HummockIterator};
98use crate::hummock::{
99 BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
100 UnifiedSstableWriterFactory, validate_ssts,
101};
102use crate::monitor::CompactorMetrics;
103
/// Executes a single compaction: drives a merged iterator over input key
/// ranges and writes the surviving entries out as new SSTs.
pub struct Compactor {
    /// Shared compactor environment (metrics, sstable store, storage options,
    /// memory limiter).
    context: CompactorContext,
    /// Source of object ids for newly created SSTs.
    object_id_getter: Arc<dyn GetObjectId>,
    /// Per-task configuration (cache policy, filter kind, vnode partition).
    task_config: TaskConfig,
    /// Options used when building output SSTs.
    options: SstableBuilderOptions,
    /// Accumulated time spent fetching object ids, fed into the
    /// `get_table_id_total_time_duration` metric; the `/1000/1000` conversion
    /// there suggests the unit is microseconds — TODO confirm.
    get_id_time: Arc<AtomicU64>,
}
113
/// Result of compacting one split: (presumably the split index — confirm at
/// call sites, produced SSTs, statistics gathered during the build).
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
115
116impl Compactor {
117 pub fn new(
119 context: CompactorContext,
120 options: SstableBuilderOptions,
121 task_config: TaskConfig,
122 object_id_getter: Arc<dyn GetObjectId>,
123 ) -> Self {
124 Self {
125 context,
126 options,
127 task_config,
128 get_id_time: Arc::new(AtomicU64::new(0)),
129 object_id_getter,
130 }
131 }
132
133 async fn compact_key_range(
138 &self,
139 iter: impl HummockIterator<Direction = Forward>,
140 compaction_filter: impl CompactionFilter,
141 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
142 task_progress: Option<Arc<TaskProgress>>,
143 task_id: Option<HummockCompactionTaskId>,
144 split_index: Option<usize>,
145 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
146 let compact_timer = if self.context.is_share_buffer_compact {
148 self.context
149 .compactor_metrics
150 .write_build_l0_sst_duration
151 .start_timer()
152 } else {
153 self.context
154 .compactor_metrics
155 .compact_sst_duration
156 .start_timer()
157 };
158
159 let (split_table_outputs, table_stats_map) = {
160 let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
161 if self.task_config.use_block_based_filter {
162 self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
163 factory,
164 iter,
165 compaction_filter,
166 compaction_catalog_agent_ref,
167 task_progress.clone(),
168 self.object_id_getter.clone(),
169 )
170 .instrument_await("compact".verbose())
171 .await?
172 } else {
173 self.compact_key_range_impl::<_, Xor16FilterBuilder>(
174 factory,
175 iter,
176 compaction_filter,
177 compaction_catalog_agent_ref,
178 task_progress.clone(),
179 self.object_id_getter.clone(),
180 )
181 .instrument_await("compact".verbose())
182 .await?
183 }
184 };
185
186 compact_timer.observe_duration();
187
188 Self::report_progress(
189 self.context.compactor_metrics.clone(),
190 task_progress,
191 &split_table_outputs,
192 self.context.is_share_buffer_compact,
193 );
194
195 self.context
196 .compactor_metrics
197 .get_table_id_total_time_duration
198 .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
199
200 debug_assert!(
201 split_table_outputs
202 .iter()
203 .all(|table_info| table_info.sst_info.table_ids.is_sorted())
204 );
205
206 if task_id.is_some() {
207 tracing::info!(
209 "Finish Task {:?} split_index {:?} sst count {}",
210 task_id,
211 split_index,
212 split_table_outputs.len()
213 );
214 }
215 Ok((split_table_outputs, table_stats_map))
216 }
217
218 pub fn report_progress(
219 metrics: Arc<CompactorMetrics>,
220 task_progress: Option<Arc<TaskProgress>>,
221 ssts: &Vec<LocalSstableInfo>,
222 is_share_buffer_compact: bool,
223 ) {
224 for sst_info in ssts {
225 let sst_size = sst_info.file_size();
226 if let Some(tracker) = &task_progress {
227 tracker.inc_ssts_uploaded();
228 tracker.dec_num_pending_write_io();
229 }
230 if is_share_buffer_compact {
231 metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
232 } else {
233 metrics.compaction_upload_sst_counts.inc();
234 }
235 }
236 }
237
238 async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
239 &self,
240 writer_factory: F,
241 iter: impl HummockIterator<Direction = Forward>,
242 compaction_filter: impl CompactionFilter,
243 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
244 task_progress: Option<Arc<TaskProgress>>,
245 object_id_getter: Arc<dyn GetObjectId>,
246 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
247 let builder_factory = RemoteBuilderFactory::<F, B> {
248 object_id_getter,
249 limiter: self.context.memory_limiter.clone(),
250 options: self.options.clone(),
251 policy: self.task_config.cache_policy,
252 remote_rpc_cost: self.get_id_time.clone(),
253 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
254 sstable_writer_factory: writer_factory,
255 _phantom: PhantomData,
256 };
257
258 let mut sst_builder = CapacitySplitTableBuilder::new(
259 builder_factory,
260 self.context.compactor_metrics.clone(),
261 task_progress.clone(),
262 self.task_config.table_vnode_partition.clone(),
263 self.context
264 .storage_opts
265 .compactor_concurrent_uploading_sst_count,
266 compaction_catalog_agent_ref,
267 );
268 let compaction_statistics = compact_and_build_sst(
269 &mut sst_builder,
270 &self.task_config,
271 self.context.compactor_metrics.clone(),
272 iter,
273 compaction_filter,
274 )
275 .instrument_await("compact_and_build_sst".verbose())
276 .await?;
277
278 let ssts = sst_builder
279 .finish()
280 .instrument_await("builder_finish".verbose())
281 .await?;
282
283 Ok((ssts, compaction_statistics))
284 }
285}
286
/// Spawns the iceberg compaction event loop.
///
/// The spawned task subscribes to iceberg compaction events from the meta
/// node, pulls new tasks whenever it has spare parallelism, and runs each
/// task on the shared compaction executor. Returns the task's `JoinHandle`
/// and a oneshot `Sender` used to trigger shutdown.
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    // Upper bound on the summed parallelism of concurrently running tasks,
    // derived from the executor's worker count and a configurable multiplier.
    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    // Never request more than this many tasks in a single pull.
    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // Tracker shared with spawned tasks:
        //   .0: task_id -> cancellation sender
        //   .1: set of "catalog-table" identifiers, used to skip a task for a
        //       table that is already being compacted.
        let iceberg_compaction_running_task_tracker =
            Arc::new(Mutex::new((HashMap::new(), HashSet::new())));
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Outer loop: (re-)establish the event subscription stream.
        'start_stream: loop {
            // Reset per-connection state: a fresh stream owes us no ack.
            let mut pull_task_ack = true;
            tokio::select! {
                // Throttle reconnect attempts to `stream_retry_interval`.
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();

            // Inner loop: consume events from the established stream.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // Record how long the previous iteration took before
                    // blocking on the next event.
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        // Periodic tick: pull more tasks if we have spare
                        // parallelism and the previous pull was acknowledged.
                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                                    event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                                        subscribe_iceberg_compaction_event_request::PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // Sender failure means the stream is dead;
                                    // re-subscribe.
                                    continue 'start_stream;
                                } else {
                                    // Don't pull again until meta acks this one.
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        let iceberg_running_task_tracker =
                            iceberg_compaction_running_task_tracker.clone();
                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                // Writer properties for the output parquet files.
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(concat!(
                                        "risingwave version ",
                                        env!("CARGO_PKG_VERSION")
                                    )
                                    .to_owned())
                                    .set_max_row_group_size(
                                        compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                    )
                                    .set_compression(Compression::SNAPPY)
                                    .build();

                                // All knobs come from storage_opts; *_mb values
                                // are converted to bytes here.
                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .target_file_size_bytes(compactor_context.storage_opts.iceberg_compaction_target_file_size_mb as u64 * 1024 * 1024)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .small_file_threshold(compactor_context.storage_opts.iceberg_compaction_small_file_threshold_mb as u64 * 1024 * 1024)
                                    .max_task_total_size(
                                        compactor_context.storage_opts.iceberg_compaction_max_task_total_size_mb as u64 * 1024 * 1024,
                                    )
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };


                                let iceberg_runner = match IcebergCompactorRunner::new(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runner) => runner,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to create iceberg compactor runner {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                // Catalog + table identity used to deduplicate
                                // concurrent tasks targeting the same table.
                                let task_unique_ident = format!(
                                    "{}-{:?}",
                                    iceberg_runner.iceberg_config.catalog_name(),
                                    iceberg_runner.table_ident
                                );

                                {
                                    let running_task_tracker_guard = iceberg_compaction_running_task_tracker
                                        .lock()
                                        .unwrap();

                                    if running_task_tracker_guard.1.contains(&task_unique_ident) {
                                        tracing::warn!(
                                            task_id = %task_id,
                                            task_unique_ident = %task_unique_ident,
                                            "Iceberg compaction task already running, skip",
                                        );
                                        continue 'consume_stream;
                                    }
                                }
                                // NOTE(review): the duplicate check above and the
                                // insertion inside the spawned task are separate
                                // lock scopes. Events appear to be handled
                                // sequentially on this loop, but confirm two
                                // tasks for the same table can't interleave
                                // between check and insert.

                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    {
                                        // Register for cancellation and mark the
                                        // table as busy.
                                        let mut running_task_tracker_guard =
                                            iceberg_running_task_tracker.lock().unwrap();
                                        running_task_tracker_guard.0.insert(task_id, tx);
                                        running_task_tracker_guard.1.insert(task_unique_ident.clone());
                                    }

                                    // Deregister on every exit path (success,
                                    // error, cancellation).
                                    let _release_guard = scopeguard::guard(
                                        iceberg_running_task_tracker.clone(),
                                        move |tracker| {
                                            let mut running_task_tracker_guard = tracker.lock().unwrap();
                                            running_task_tracker_guard.0.remove(&task_id);
                                            running_task_tracker_guard.1.remove(&task_unique_ident);
                                        },
                                    );

                                    if let Err(e) = iceberg_runner.compact(
                                        RunnerContext::new(
                                            max_task_parallelism,
                                            running_task_parallelism.clone(),
                                        ),
                                        rx,
                                    )
                                    .await {
                                        tracing::warn!(error = %e.as_report(), "Failed to compact iceberg runner {}", task_id);
                                    }
                                });
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // Meta acknowledged our pull; allow the next one.
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // Stream ended (None); re-subscribe.
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}
553
/// Spawns the hummock compaction event loop.
///
/// The spawned task subscribes to compaction events from the meta node,
/// sends periodic heartbeats carrying per-task progress, pulls new tasks
/// when parallelism is available, and dispatches each task to the shared
/// compaction executor. Returns the task's `JoinHandle` and a oneshot
/// `Sender` used to trigger shutdown.
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    // task_id -> cancellation sender for tasks currently running.
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    // Upper bound on the summed parallelism of concurrently running tasks.
    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    // Never request more than this many tasks in a single pull.
    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Outer loop: (re-)establish the event subscription stream.
        'start_stream: loop {
            // Reset per-connection state: a fresh stream owes us no ack.
            let mut pull_task_ack = true;
            tokio::select! {
                // Throttle reconnect attempts to `stream_retry_interval`.
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            // Inner loop: consume events from the established stream.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // Record how long the previous iteration took before
                    // blocking on the next event.
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        // Heartbeat: report progress of all running tasks.
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            // Sender failure means the stream is dead;
                            // re-subscribe.
                            continue 'start_stream;
                        }


                        // Pull more tasks if we have spare parallelism and the
                        // previous pull was acknowledged.
                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    continue 'start_stream;
                                } else {
                                    // Don't pull again until meta acks this one.
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

                // Report a task's final state (including cancellations) back
                // to the meta node; a send failure is only logged.
                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        // Latency between the meta node creating the event and
                        // this compactor consuming it.
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                // Reject the task (reported as cancelled) if
                                // accepting it would exceed the parallelism
                                // budget.
                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        max_task_parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                // Reserve parallelism before spawning; released
                                // by fetch_sub when the task completes.
                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    // Register the cancel sender so the task can
                                    // be aborted via CancelCompactTask.
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    // Optional post-check: re-read the output and
                                    // verify it is consistent with the input;
                                    // panics on a failed check.
                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                // Meta acknowledged our pull; allow the next one.
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // Stream ended (None); re-subscribe.
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}
899
/// Spawns the shared compactor loop.
///
/// Unlike [`start_compactor`], tasks arrive as dispatched gRPC requests on
/// `receiver` rather than via a meta-node subscription; heartbeats and task
/// results are reported back through `grpc_proxy_client`. Returns the task's
/// `JoinHandle` and a oneshot `Sender` used to trigger shutdown.
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    // task_id -> cancellation sender for tasks currently running.
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    // Heartbeat: report progress of all running tasks through
                    // the proxy client; failures are only logged.
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest{
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }


                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }

            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        // Index the dispatched table catalogs by table id.
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        // Pre-allocated output object ids; presumably more are
                        // fetched through the proxy when exhausted (see
                        // `sstable_id_remote_fetch_number`) — confirm in
                        // `SharedComapctorObjectIdManager`.
                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                // Register the cancel sender so the task can be
                                // aborted via CancelCompactTask.
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    shared_compactor_object_id_manager,
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                // Report the finished task's result back through
                                // the proxy.
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps: object_timestamps
                                            .into_iter()
                                            .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                            .collect(),
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        // Optional post-check: verify the output
                                        // is consistent with the input; panics on
                                        // a failed check.
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                },
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }

                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                { Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                } _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }}
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}
1058
1059fn get_task_progress(
1060 task_progress: Arc<
1061 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1062 >,
1063) -> Vec<CompactTaskProgress> {
1064 let mut progress_list = Vec::new();
1065 for (&task_id, progress) in &*task_progress.lock() {
1066 progress_list.push(progress.snapshot(task_id));
1067 }
1068 progress_list
1069}