mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
mod iceberg_compaction;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
use iceberg_compaction::iceberg_compactor_runner::{
    IcebergCompactorRunner, IcebergCompactorRunnerConfigBuilder,
};
use iceberg_compaction::{IcebergTaskQueue, PushResult};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

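/// Holds the per-task state needed to build SSTs: the compactor context, the object id getter
/// used to allocate SST object ids, the task configuration and SSTable builder options, and a
/// counter accumulating the time spent fetching object ids.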
pub struct Compactor {
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

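/// Output of compacting one split: the split index, the SSTs produced for that split, and the
/// statistics collected while building them.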
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

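    /// Updates progress tracking and metrics for a batch of uploaded SSTs: bumps the task
    /// progress tracker and records either the shared-buffer flush size or the compaction
    /// upload counter, depending on the compaction type.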
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

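    /// Builds the SSTs for one key range using the given writer factory and filter builder:
    /// feeds the iterator through `compact_and_build_sst` into a `CapacitySplitTableBuilder`
    /// and finalizes the builders into `LocalSstableInfo`s.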
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

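/// The background event loop for iceberg compaction: subscribes to iceberg compaction events
/// from the meta service, queues incoming tasks by their required parallelism, and schedules
/// them onto the compaction executor. Returns the loop's join handle and a shutdown sender.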
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
        let (mut task_queue, _schedule_notify) =
            IcebergTaskQueue::new_with_notify(max_task_parallelism, pending_parallelism_budget);

        let shutdown_map = Arc::new(Mutex::new(
            HashMap::<u64, tokio::sync::oneshot::Sender<()>>::new(),
        ));

        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<u64>();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Iceberg compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    Some(completed_task_id) = task_completion_rx.recv() => {
                        tracing::debug!(task_id = completed_task_id, "Task completed, updating queue state");
                        task_queue.finish_running(completed_task_id);
                        continue 'consume_stream;
                    }

                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(concat!(
                                        "risingwave version ",
                                        env!("CARGO_PKG_VERSION")
                                    )
                                    .to_owned())
                                    .set_max_row_group_size(
                                        compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                    )
                                    .set_compression(Compression::SNAPPY)
                                    .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .target_file_size_bytes(compactor_context.storage_opts.iceberg_compaction_target_file_size_mb as u64 * 1024 * 1024)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .small_file_threshold(compactor_context.storage_opts.iceberg_compaction_small_file_threshold_mb as u64 * 1024 * 1024)
                                    .max_task_total_size(
                                        compactor_context.storage_opts.iceberg_compaction_max_task_total_size_mb as u64 * 1024 * 1024,
                                    )
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
                                    .build()
                                {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let iceberg_runner = match IcebergCompactorRunner::new(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                )
                                .await
                                {
                                    Ok(runner) => runner,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to create iceberg compactor runner {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let meta = iceberg_runner.to_meta();
                                let push_result = task_queue.push(meta.clone(), Some(iceberg_runner));

                                match push_result {
                                    PushResult::Added => {
                                        tracing::info!(
                                            task_id = task_id,
                                            unique_ident = %meta.unique_ident,
                                            required_parallelism = meta.required_parallelism,
                                            "Iceberg compaction task added to queue"
                                        );
                                    },
                                    PushResult::Replaced { old_task_id } => {
                                        tracing::info!(
                                            task_id = task_id,
                                            old_task_id = old_task_id,
                                            unique_ident = %meta.unique_ident,
                                            required_parallelism = meta.required_parallelism,
                                            "Iceberg compaction task replaced in queue"
                                        );
                                    },
                                    PushResult::RejectedRunningDuplicate => {
                                        tracing::warn!(
                                            task_id = task_id,
                                            unique_ident = %meta.unique_ident,
                                            "Iceberg compaction task rejected - duplicate already running"
                                        );
                                    },
                                    PushResult::RejectedCapacity => {
                                        tracing::warn!(
                                            task_id = task_id,
                                            unique_ident = %meta.unique_ident,
                                            required_parallelism = meta.required_parallelism,
                                            pending_budget = pending_parallelism_budget,
                                            "Iceberg compaction task rejected - queue capacity exceeded"
                                        );
                                    },
                                    PushResult::RejectedTooLarge => {
                                        tracing::error!(
                                            task_id = task_id,
                                            unique_ident = %meta.unique_ident,
                                            required_parallelism = meta.required_parallelism,
                                            max_parallelism = max_task_parallelism,
                                            "Iceberg compaction task rejected - parallelism requirement exceeds max"
                                        );
                                    },
                                    PushResult::RejectedInvalidParallelism => {
                                        tracing::error!(
                                            task_id = task_id,
                                            unique_ident = %meta.unique_ident,
                                            required_parallelism = meta.required_parallelism,
                                            "Iceberg compaction task rejected - invalid parallelism (must be > 0)"
                                        );
                                    }
                                }
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

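/// The background event loop for hummock compaction: subscribes to compaction events from the
/// meta service, runs assigned compaction tasks on the compaction executor within the
/// parallelism budget, and reports progress and results back. Returns the loop's join handle
/// and a shutdown sender.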
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        tracing::info!(
                            running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst),
                            pull_task_ack = %pull_task_ack,
                            pending_pull_task_count = %pending_pull_task_count
                        );

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

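                /// Reports a finished task (status, output SSTs, table stats and object
                /// timestamps) back to the meta service over the event stream.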
                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

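/// The event loop of the shared compactor: receives `DispatchCompactionTaskRequest`s from the
/// gRPC proxy, runs them on the compaction executor, and reports results and heartbeats back
/// through the proxy client. Returns the loop's join handle and a shutdown sender.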
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }
            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    shared_compactor_object_id_manager,
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps: object_timestamps
                                            .into_iter()
                                            .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                            .collect(),
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                },
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }
                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                {
                                    Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
                                    _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

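/// Snapshots the progress of all tracked compaction tasks into `CompactTaskProgress` messages
/// for heartbeat reporting.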
fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}

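/// Pops every currently schedulable task from the iceberg task queue and spawns its runner on
/// the compaction executor, registering a shutdown sender and a completion notification so the
/// main loop can release the task's parallelism when it finishes.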
fn schedule_queued_tasks(
    task_queue: &mut IcebergTaskQueue,
    compactor_context: &CompactorContext,
    shutdown_map: &Arc<Mutex<HashMap<u64, tokio::sync::oneshot::Sender<()>>>>,
    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<u64>,
) {
    while let Some(popped_task) = task_queue.pop() {
        let task_id = popped_task.meta.task_id;
        let Some(runner) = popped_task.runner else {
            tracing::error!(
                task_id = task_id,
                "Popped task missing runner - this should not happen"
            );
            task_queue.finish_running(task_id);
            continue;
        };

        let executor = compactor_context.compaction_executor.clone();
        let shutdown_map_clone = shutdown_map.clone();
        let completion_tx_clone = task_completion_tx.clone();

        tracing::info!(
            task_id = task_id,
            unique_ident = %popped_task.meta.unique_ident,
            required_parallelism = popped_task.meta.required_parallelism,
            "Starting iceberg compaction task from queue"
        );

        executor.spawn(async move {
            let (tx, rx) = tokio::sync::oneshot::channel();
            {
                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
                shutdown_guard.insert(task_id, tx);
            }

            let _cleanup_guard = scopeguard::guard(
                (task_id, shutdown_map_clone, completion_tx_clone),
                move |(task_id, shutdown_map, completion_tx)| {
                    {
                        let mut shutdown_guard = shutdown_map.lock().unwrap();
                        shutdown_guard.remove(&task_id);
                    }
                    if completion_tx.send(task_id).is_err() {
                        tracing::warn!(task_id = task_id, "Failed to notify task completion - main loop may have shut down");
                    }
                },
            );

            if let Err(e) = runner.compact(rx).await {
                tracing::warn!(error = %e.as_report(), "Failed to compact iceberg runner {}", task_id);
            }
        });
    }
}

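/// Requests more iceberg compaction tasks from the meta service when the previous pull has
/// been acknowledged and there is spare parallelism. Returns `true` if sending the pull
/// request failed and the event stream should be restarted.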
fn handle_meta_task_pulling(
    pull_task_ack: &mut bool,
    task_queue: &IcebergTaskQueue,
    max_task_parallelism: u32,
    max_pull_task_count: u32,
    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
) -> bool {
    let mut pending_pull_task_count = 0;
    if *pull_task_ack {
        let current_running_parallelism = task_queue.running_parallelism_sum();
        pending_pull_task_count =
            (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);

        if pending_pull_task_count > 0 {
            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                    subscribe_iceberg_compaction_event_request::PullTask {
                        pull_task_count: pending_pull_task_count,
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            }) {
                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
                return true;
            } else {
                *pull_task_ack = false;
            }
        }
    }

    tracing::info!(
        running_parallelism_count = %task_queue.running_parallelism_sum(),
        waiting_parallelism_count = %task_queue.waiting_parallelism_sum(),
        available_parallelism = %(max_task_parallelism.saturating_sub(task_queue.running_parallelism_sum())),
        pull_task_ack = %*pull_task_ack,
        pending_pull_task_count = %pending_pull_task_count
    );

    false
}