1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24 ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28 subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::{HashMap, VecDeque};
43use std::marker::PhantomData;
44use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
45use std::sync::{Arc, Mutex};
46use std::time::{Duration, SystemTime};
47
48use await_tree::{InstrumentAwait, SpanExt};
49pub use compaction_executor::CompactionExecutor;
50pub use compaction_filter::{
51 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
52 TtlCompactionFilter,
53};
54pub use context::{
55 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58use iceberg_compaction::iceberg_compactor_runner::{
60 IcebergCompactorRunnerConfigBuilder, create_plan_runners,
61};
62use iceberg_compaction::{IcebergTaskQueue, PushResult};
63pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
64use more_asserts::assert_ge;
65use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
66use risingwave_hummock_sdk::{
67 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
68};
69use risingwave_pb::hummock::compact_task::TaskStatus;
70use risingwave_pb::hummock::subscribe_compaction_event_request::{
71 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
72};
73use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
74use risingwave_pb::hummock::{
75 CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
76 SubscribeCompactionEventResponse,
77};
78use risingwave_rpc_client::HummockMetaClient;
79pub use shared_buffer_compact::{compact, merge_imms_in_memory};
80use tokio::sync::oneshot::Sender;
81use tokio::task::JoinHandle;
82use tokio::time::Instant;
83
84pub use self::compaction_utils::{
85 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
86 check_flush_result,
87};
88pub use self::task_progress::TaskProgress;
89use super::multi_builder::CapacitySplitTableBuilder;
90use super::{
91 GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
92};
93use crate::compaction_catalog_manager::{
94 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
95};
96use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
97use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
98use crate::hummock::compactor::iceberg_compaction::TaskKey;
99use crate::hummock::iterator::{Forward, HummockIterator};
100use crate::hummock::{
101 BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
102 UnifiedSstableWriterFactory, validate_ssts,
103};
104use crate::monitor::CompactorMetrics;
105
/// Minimum interval between periodic "heartbeat" status log lines emitted by
/// the compactor event loops; used via `LogThrottler` so an unchanged state is
/// still reported at least this often.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);
108
/// Snapshot of the Hummock compactor's scheduling state, compared by
/// `LogThrottler` to decide whether the periodic status line in
/// `start_compactor` should be emitted (logged on change or on heartbeat).
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    // Parallelism currently consumed by running compaction tasks.
    running_parallelism: u32,
    // Whether the last PullTask request has been acknowledged by meta.
    pull_task_ack: bool,
    // Number of tasks requested from meta in the latest PullTask event.
    pending_pull_task_count: u32,
}
116
/// Snapshot of the iceberg compactor's scheduling state, compared by
/// `LogThrottler` in the iceberg event loop to throttle repetitive logging.
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    // Parallelism consumed by currently running iceberg plan runners.
    running_parallelism: u32,
    // Parallelism requested by plans still waiting in the queue.
    waiting_parallelism: u32,
    // Parallelism still available for new plans.
    available_parallelism: u32,
    // Whether the last PullTask request has been acknowledged by meta.
    pull_task_ack: bool,
    // Number of tasks requested from meta in the latest PullTask event.
    pending_pull_task_count: u32,
}
126
/// Suppresses repetitive log output.
///
/// A state snapshot is considered worth logging when it differs from the last
/// logged snapshot, or when `heartbeat_interval` has elapsed since the last
/// log line — so a long-stable state is still reported periodically.
struct LogThrottler<T: PartialEq> {
    // The most recently logged state, `None` until the first `update`.
    last_logged_state: Option<T>,
    // Timestamp of the most recent log emission.
    last_heartbeat: Instant,
    // Maximum silence tolerated for an unchanged state.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Builds a throttler that forces a log line at least once per
    /// `heartbeat_interval`, even when the state never changes.
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            heartbeat_interval,
            last_logged_state: None,
            last_heartbeat: Instant::now(),
        }
    }

    /// Returns `true` when `current_state` should be logged: either it differs
    /// from the last logged state, or the heartbeat interval has elapsed.
    fn should_log(&self, current_state: &T) -> bool {
        let state_changed = self.last_logged_state.as_ref() != Some(current_state);
        let heartbeat_due = self.last_heartbeat.elapsed() >= self.heartbeat_interval;
        state_changed || heartbeat_due
    }

    /// Records that `current_state` was just logged, resetting the heartbeat.
    fn update(&mut self, current_state: T) {
        self.last_heartbeat = Instant::now();
        self.last_logged_state = Some(current_state);
    }
}
155
/// Executes one compaction: consumes a key-range iterator and builds sorted
/// output sstables according to `task_config` and `options`.
pub struct Compactor {
    // Shared compactor runtime context (metrics, sstable store, storage opts, ...).
    context: CompactorContext,
    // Source of object ids assigned to newly built sstables.
    object_id_getter: Arc<dyn GetObjectId>,
    // Per-task configuration (cache policy, filter choice, vnode partitioning, ...).
    task_config: TaskConfig,
    // Options applied when building output sstables.
    options: SstableBuilderOptions,
    // Cumulative time spent fetching object ids, reported to metrics
    // (assumed microseconds — `compact_key_range` divides by 1e6; TODO confirm).
    get_id_time: Arc<AtomicU64>,
}
165
/// Result of compacting one split: (split index, output sstables, statistics).
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
167
168impl Compactor {
169 pub fn new(
171 context: CompactorContext,
172 options: SstableBuilderOptions,
173 task_config: TaskConfig,
174 object_id_getter: Arc<dyn GetObjectId>,
175 ) -> Self {
176 Self {
177 context,
178 options,
179 task_config,
180 get_id_time: Arc::new(AtomicU64::new(0)),
181 object_id_getter,
182 }
183 }
184
185 async fn compact_key_range(
190 &self,
191 iter: impl HummockIterator<Direction = Forward>,
192 compaction_filter: impl CompactionFilter,
193 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
194 task_progress: Option<Arc<TaskProgress>>,
195 task_id: Option<HummockCompactionTaskId>,
196 split_index: Option<usize>,
197 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
198 let compact_timer = if self.context.is_share_buffer_compact {
200 self.context
201 .compactor_metrics
202 .write_build_l0_sst_duration
203 .start_timer()
204 } else {
205 self.context
206 .compactor_metrics
207 .compact_sst_duration
208 .start_timer()
209 };
210
211 let (split_table_outputs, table_stats_map) = {
212 let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
213 if self.task_config.use_block_based_filter {
214 self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
215 factory,
216 iter,
217 compaction_filter,
218 compaction_catalog_agent_ref,
219 task_progress.clone(),
220 self.object_id_getter.clone(),
221 )
222 .instrument_await("compact".verbose())
223 .await?
224 } else {
225 self.compact_key_range_impl::<_, Xor16FilterBuilder>(
226 factory,
227 iter,
228 compaction_filter,
229 compaction_catalog_agent_ref,
230 task_progress.clone(),
231 self.object_id_getter.clone(),
232 )
233 .instrument_await("compact".verbose())
234 .await?
235 }
236 };
237
238 compact_timer.observe_duration();
239
240 Self::report_progress(
241 self.context.compactor_metrics.clone(),
242 task_progress,
243 &split_table_outputs,
244 self.context.is_share_buffer_compact,
245 );
246
247 self.context
248 .compactor_metrics
249 .get_table_id_total_time_duration
250 .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
251
252 debug_assert!(
253 split_table_outputs
254 .iter()
255 .all(|table_info| table_info.sst_info.table_ids.is_sorted())
256 );
257
258 if task_id.is_some() {
259 tracing::info!(
261 "Finish Task {:?} split_index {:?} sst count {}",
262 task_id,
263 split_index,
264 split_table_outputs.len()
265 );
266 }
267 Ok((split_table_outputs, table_stats_map))
268 }
269
270 pub fn report_progress(
271 metrics: Arc<CompactorMetrics>,
272 task_progress: Option<Arc<TaskProgress>>,
273 ssts: &Vec<LocalSstableInfo>,
274 is_share_buffer_compact: bool,
275 ) {
276 for sst_info in ssts {
277 let sst_size = sst_info.file_size();
278 if let Some(tracker) = &task_progress {
279 tracker.inc_ssts_uploaded();
280 tracker.dec_num_pending_write_io();
281 }
282 if is_share_buffer_compact {
283 metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
284 } else {
285 metrics.compaction_upload_sst_counts.inc();
286 }
287 }
288 }
289
290 async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
291 &self,
292 writer_factory: F,
293 iter: impl HummockIterator<Direction = Forward>,
294 compaction_filter: impl CompactionFilter,
295 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
296 task_progress: Option<Arc<TaskProgress>>,
297 object_id_getter: Arc<dyn GetObjectId>,
298 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
299 let builder_factory = RemoteBuilderFactory::<F, B> {
300 object_id_getter,
301 limiter: self.context.memory_limiter.clone(),
302 options: self.options.clone(),
303 policy: self.task_config.cache_policy,
304 remote_rpc_cost: self.get_id_time.clone(),
305 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
306 sstable_writer_factory: writer_factory,
307 _phantom: PhantomData,
308 };
309
310 let mut sst_builder = CapacitySplitTableBuilder::new(
311 builder_factory,
312 self.context.compactor_metrics.clone(),
313 task_progress.clone(),
314 self.task_config.table_vnode_partition.clone(),
315 self.context
316 .storage_opts
317 .compactor_concurrent_uploading_sst_count,
318 compaction_catalog_agent_ref,
319 );
320 let compaction_statistics = compact_and_build_sst(
321 &mut sst_builder,
322 &self.task_config,
323 self.context.compactor_metrics.clone(),
324 iter,
325 compaction_filter,
326 )
327 .instrument_await("compact_and_build_sst".verbose())
328 .await?;
329
330 let ssts = sst_builder
331 .finish()
332 .instrument_await("builder_finish".verbose())
333 .await?;
334
335 Ok((ssts, compaction_statistics))
336 }
337}
338
/// Spawns the long-running event loop that subscribes to iceberg compaction
/// tasks from the meta service, queues them, and schedules plan runners on the
/// local worker pool.
///
/// Returns the loop's `JoinHandle` plus a one-shot `Sender` used to trigger a
/// graceful shutdown.
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    // Overall parallelism cap, derived from worker threads and the configured
    // multiplier.
    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // Extra parallelism budget for plans waiting in the queue, on top of
        // the running-task cap.
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
        let mut task_queue =
            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);

        // (task_id, plan_index) -> shutdown sender for each spawned runner.
        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));

        // Runners report completion here so the queue can release parallelism.
        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<TaskKey>();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // Outer loop: (re)establish the subscription stream after failures.
        'start_stream: loop {
            // Assume a fresh stream may deliver tasks until told otherwise.
            let mut pull_task_ack = true;
            tokio::select! {
                // Throttle reconnection attempts.
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            // Inner loop: consume events from the established stream.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // Measure time per event-loop iteration.
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    // A runner finished: free its parallelism in the queue.
                    Some(completed_task_key) = task_completion_rx.recv() => {
                        tracing::debug!(task_id = completed_task_key.0, plan_index = completed_task_key.1, "Task completed, updating queue state");
                        task_queue.finish_running(completed_task_key);
                        continue 'consume_stream;
                    }

                    // Queue has capacity: dispatch queued plan runners.
                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    // Periodic tick: possibly pull more tasks from meta.
                    _ = periodic_event_interval.tick() => {
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                // Build the runner config from storage options;
                                // a malformed config only skips this task.
                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .target_binpack_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
                                    )
                                    .min_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
                                    )
                                    .min_group_file_count(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
                                    )
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                // One task may expand into several plan runners.
                                let plan_runners = match create_plan_runners(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runners) => runners,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        continue 'consume_stream;
                                    }
                                };

                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, "No plans to execute");
                                    continue 'consume_stream;
                                }

                                let total_plans = plan_runners.len();
                                let mut enqueued_count = 0;

                                // Enqueue each runner; stop at the first
                                // capacity rejection (later plans won't fit
                                // either).
                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta.clone(), Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = enqueued_count - 1,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        },
                                        PushResult::RejectedDuplicate => {
                                            tracing::error!(
                                                task_id = task_id,
                                                plan_index = meta.plan_index,
                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
                                            );
                                        }
                                    }
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // Meta acknowledged our PullTask; allow pulling again.
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // Stream ended: re-subscribe.
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}
625
626#[must_use]
629pub fn start_compactor(
630 compactor_context: CompactorContext,
631 hummock_meta_client: Arc<dyn HummockMetaClient>,
632 object_id_manager: Arc<ObjectIdManager>,
633 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
634) -> (JoinHandle<()>, Sender<()>) {
635 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
636 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
637 let stream_retry_interval = Duration::from_secs(30);
638 let task_progress = compactor_context.task_progress_manager.clone();
639 let periodic_event_update_interval = Duration::from_millis(1000);
640
641 let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
642 * compactor_context.storage_opts.compactor_max_task_multiplier)
643 .ceil() as u32;
644 let running_task_parallelism = Arc::new(AtomicU32::new(0));
645
646 const MAX_PULL_TASK_COUNT: u32 = 4;
647 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
648
649 assert_ge!(
650 compactor_context.storage_opts.compactor_max_task_multiplier,
651 0.0
652 );
653
654 let join_handle = tokio::spawn(async move {
655 let shutdown_map = CompactionShutdownMap::default();
656 let mut min_interval = tokio::time::interval(stream_retry_interval);
657 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
658
659 let mut log_throttler =
661 LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
662
663 'start_stream: loop {
665 let mut pull_task_ack = true;
668 tokio::select! {
669 _ = min_interval.tick() => {},
671 _ = &mut shutdown_rx => {
673 tracing::info!("Compactor is shutting down");
674 return;
675 }
676 }
677
678 let (request_sender, response_event_stream) =
679 match hummock_meta_client.subscribe_compaction_event().await {
680 Ok((request_sender, response_event_stream)) => {
681 tracing::debug!("Succeeded subscribe_compaction_event.");
682 (request_sender, response_event_stream)
683 }
684
685 Err(e) => {
686 tracing::warn!(
687 error = %e.as_report(),
688 "Subscribing to compaction tasks failed with error. Will retry.",
689 );
690 continue 'start_stream;
691 }
692 };
693
694 pin_mut!(response_event_stream);
695
696 let executor = compactor_context.compaction_executor.clone();
697 let object_id_manager = object_id_manager.clone();
698
699 let mut event_loop_iteration_now = Instant::now();
701 'consume_stream: loop {
702 {
703 compactor_context
705 .compactor_metrics
706 .compaction_event_loop_iteration_latency
707 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
708 event_loop_iteration_now = Instant::now();
709 }
710
711 let running_task_parallelism = running_task_parallelism.clone();
712 let request_sender = request_sender.clone();
713 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
714 _ = periodic_event_interval.tick() => {
715 let progress_list = get_task_progress(task_progress.clone());
716
717 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
718 event: Some(RequestEvent::HeartBeat(
719 HeartBeat {
720 progress: progress_list
721 }
722 )),
723 create_at: SystemTime::now()
724 .duration_since(std::time::UNIX_EPOCH)
725 .expect("Clock may have gone backwards")
726 .as_millis() as u64,
727 }) {
728 tracing::warn!(error = %e.as_report(), "Failed to report task progress");
729 continue 'start_stream;
731 }
732
733
734 let mut pending_pull_task_count = 0;
735 if pull_task_ack {
736 pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
738
739 if pending_pull_task_count > 0 {
740 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
741 event: Some(RequestEvent::PullTask(
742 PullTask {
743 pull_task_count: pending_pull_task_count,
744 }
745 )),
746 create_at: SystemTime::now()
747 .duration_since(std::time::UNIX_EPOCH)
748 .expect("Clock may have gone backwards")
749 .as_millis() as u64,
750 }) {
751 tracing::warn!(error = %e.as_report(), "Failed to pull task");
752
753 continue 'start_stream;
755 } else {
756 pull_task_ack = false;
757 }
758 }
759 }
760
761 let running_count = running_task_parallelism.load(Ordering::SeqCst);
762 let current_state = CompactionLogState {
763 running_parallelism: running_count,
764 pull_task_ack,
765 pending_pull_task_count,
766 };
767
768 if log_throttler.should_log(¤t_state) {
770 tracing::info!(
771 running_parallelism_count = %current_state.running_parallelism,
772 pull_task_ack = %current_state.pull_task_ack,
773 pending_pull_task_count = %current_state.pending_pull_task_count
774 );
775 log_throttler.update(current_state);
776 }
777
778 continue;
779 }
780 event = response_event_stream.next() => {
781 event
782 }
783
784 _ = &mut shutdown_rx => {
785 tracing::info!("Compactor is shutting down");
786 return
787 }
788 };
789
790 fn send_report_task_event(
791 compact_task: &CompactTask,
792 table_stats: TableStatsMap,
793 object_timestamps: HashMap<HummockSstableObjectId, u64>,
794 request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
795 ) {
796 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
797 event: Some(RequestEvent::ReportTask(ReportTask {
798 task_id: compact_task.task_id,
799 task_status: compact_task.task_status.into(),
800 sorted_output_ssts: compact_task
801 .sorted_output_ssts
802 .iter()
803 .map(|sst| sst.into())
804 .collect(),
805 table_stats_change: to_prost_table_stats_map(table_stats),
806 object_timestamps,
807 })),
808 create_at: SystemTime::now()
809 .duration_since(std::time::UNIX_EPOCH)
810 .expect("Clock may have gone backwards")
811 .as_millis() as u64,
812 }) {
813 let task_id = compact_task.task_id;
814 tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
815 }
816 }
817
818 match event {
819 Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
820 let event = match event {
821 Some(event) => event,
822 None => continue 'consume_stream,
823 };
824 let shutdown = shutdown_map.clone();
825 let context = compactor_context.clone();
826 let consumed_latency_ms = SystemTime::now()
827 .duration_since(std::time::UNIX_EPOCH)
828 .expect("Clock may have gone backwards")
829 .as_millis() as u64
830 - create_at;
831 context
832 .compactor_metrics
833 .compaction_event_consumed_latency
834 .observe(consumed_latency_ms as _);
835
836 let object_id_manager = object_id_manager.clone();
837 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
838
839 match event {
840 ResponseEvent::CompactTask(compact_task) => {
841 let compact_task = CompactTask::from(compact_task);
842 let parallelism =
843 calculate_task_parallelism(&compact_task, &context);
844
845 assert_ne!(parallelism, 0, "splits cannot be empty");
846
847 if (max_task_parallelism
848 - running_task_parallelism.load(Ordering::SeqCst))
849 < parallelism as u32
850 {
851 tracing::warn!(
852 "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
853 compact_task.task_id,
854 parallelism,
855 max_task_parallelism,
856 running_task_parallelism.load(Ordering::Relaxed),
857 );
858 let (compact_task, table_stats, object_timestamps) =
859 compact_done(
860 compact_task,
861 context.clone(),
862 vec![],
863 TaskStatus::NoAvailCpuResourceCanceled,
864 );
865
866 send_report_task_event(
867 &compact_task,
868 table_stats,
869 object_timestamps,
870 &request_sender,
871 );
872
873 continue 'consume_stream;
874 }
875
876 running_task_parallelism
877 .fetch_add(parallelism as u32, Ordering::SeqCst);
878 executor.spawn(async move {
879 let (tx, rx) = tokio::sync::oneshot::channel();
880 let task_id = compact_task.task_id;
881 shutdown.lock().unwrap().insert(task_id, tx);
882
883 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
884 context.clone(),
885 compact_task,
886 rx,
887 object_id_manager.clone(),
888 compaction_catalog_manager_ref.clone(),
889 )
890 .await;
891
892 shutdown.lock().unwrap().remove(&task_id);
893 running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
894
895 send_report_task_event(
896 &compact_task,
897 table_stats,
898 object_timestamps,
899 &request_sender,
900 );
901
902 let enable_check_compaction_result =
903 context.storage_opts.check_compaction_result;
904 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
905
906 if enable_check_compaction_result && need_check_task {
907 let compact_table_ids = compact_task.build_compact_table_ids();
908 match compaction_catalog_manager_ref.acquire(compact_table_ids.into_iter().collect()).await {
909 Ok(compaction_catalog_agent_ref) => {
910 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
911 {
912 Err(e) => {
913 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
914 }
915 Ok(true) => (),
916 Ok(false) => {
917 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
918 }
919 }
920 },
921 Err(e) => {
922 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
923 }
924 }
925 }
926 });
927 }
928 #[expect(deprecated)]
929 ResponseEvent::VacuumTask(_) => {
930 unreachable!("unexpected vacuum task");
931 }
932 #[expect(deprecated)]
933 ResponseEvent::FullScanTask(_) => {
934 unreachable!("unexpected scan task");
935 }
936 #[expect(deprecated)]
937 ResponseEvent::ValidationTask(validation_task) => {
938 let validation_task = ValidationTask::from(validation_task);
939 executor.spawn(async move {
940 validate_ssts(validation_task, context.sstable_store.clone())
941 .await;
942 });
943 }
944 ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
945 .lock()
946 .unwrap()
947 .remove(&cancel_compact_task.task_id)
948 {
949 Some(tx) => {
950 if tx.send(()).is_err() {
951 tracing::warn!(
952 "Cancellation of compaction task failed. task_id: {}",
953 cancel_compact_task.task_id
954 );
955 }
956 }
957 _ => {
958 tracing::warn!(
959 "Attempting to cancel non-existent compaction task. task_id: {}",
960 cancel_compact_task.task_id
961 );
962 }
963 },
964
965 ResponseEvent::PullTaskAck(_pull_task_ack) => {
966 pull_task_ack = true;
968 }
969 }
970 }
971 Some(Err(e)) => {
972 tracing::warn!("Failed to consume stream. {}", e.message());
973 continue 'start_stream;
974 }
975 _ => {
976 continue 'start_stream;
978 }
979 }
980 }
981 }
982 });
983
984 (join_handle, shutdown_tx)
985}
986
987#[must_use]
990pub fn start_shared_compactor(
991 grpc_proxy_client: GrpcCompactorProxyClient,
992 mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
993 context: CompactorContext,
994) -> (JoinHandle<()>, Sender<()>) {
995 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
996 let task_progress = context.task_progress_manager.clone();
997 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
998 let periodic_event_update_interval = Duration::from_millis(1000);
999
1000 let join_handle = tokio::spawn(async move {
1001 let shutdown_map = CompactionShutdownMap::default();
1002
1003 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1004 let executor = context.compaction_executor.clone();
1005 let report_heartbeat_client = grpc_proxy_client.clone();
1006 'consume_stream: loop {
1007 let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1008 _ = periodic_event_interval.tick() => {
1009 let progress_list = get_task_progress(task_progress.clone());
1010 let report_compaction_task_request = ReportCompactionTaskRequest{
1011 event: Some(ReportCompactionTaskEvent::HeartBeat(
1012 SharedHeartBeat {
1013 progress: progress_list
1014 }
1015 )),
1016 };
1017 if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1018 tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1019 }
1020 continue
1021 }
1022
1023
1024 _ = &mut shutdown_rx => {
1025 tracing::info!("Compactor is shutting down");
1026 return
1027 }
1028
1029 request = receiver.recv() => {
1030 request
1031 }
1032
1033 };
1034 match request {
1035 Some(request) => {
1036 let context = context.clone();
1037 let shutdown = shutdown_map.clone();
1038
1039 let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1040 executor.spawn(async move {
1041 let DispatchCompactionTaskRequest {
1042 tables,
1043 output_object_ids,
1044 task: dispatch_task,
1045 } = request.into_inner();
1046 let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1047 acc.insert(table.id, table);
1048 acc
1049 });
1050
1051 let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1052 output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1053 let shared_compactor_object_id_manager =
1054 SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1055 match dispatch_task.unwrap() {
1056 dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1057 let compact_task = CompactTask::from(&compact_task);
1058 let (tx, rx) = tokio::sync::oneshot::channel();
1059 let task_id = compact_task.task_id;
1060 shutdown.lock().unwrap().insert(task_id, tx);
1061
1062 let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1063 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1064 context.clone(),
1065 compact_task,
1066 rx,
1067 shared_compactor_object_id_manager,
1068 compaction_catalog_agent_ref.clone(),
1069 )
1070 .await;
1071 shutdown.lock().unwrap().remove(&task_id);
1072 let report_compaction_task_request = ReportCompactionTaskRequest {
1073 event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1074 compact_task: Some(PbCompactTask::from(&compact_task)),
1075 table_stats_change: to_prost_table_stats_map(table_stats),
1076 object_timestamps,
1077 })),
1078 };
1079
1080 match cloned_grpc_proxy_client
1081 .report_compaction_task(report_compaction_task_request)
1082 .await
1083 {
1084 Ok(_) => {
1085 let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1087 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1088 if enable_check_compaction_result && need_check_task {
1089 match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1090 Err(e) => {
1091 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1092 },
1093 Ok(true) => (),
1094 Ok(false) => {
1095 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1096 }
1097 }
1098 }
1099 }
1100 Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1101 }
1102
1103 }
1104 dispatch_compaction_task_request::Task::VacuumTask(_) => {
1105 unreachable!("unexpected vacuum task");
1106 }
1107 dispatch_compaction_task_request::Task::FullScanTask(_) => {
1108 unreachable!("unexpected scan task");
1109 }
1110 dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1111 let validation_task = ValidationTask::from(validation_task);
1112 validate_ssts(validation_task, context.sstable_store.clone()).await;
1113 }
1114 dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1115 match shutdown
1116 .lock()
1117 .unwrap()
1118 .remove(&cancel_compact_task.task_id)
1119 { Some(tx) => {
1120 if tx.send(()).is_err() {
1121 tracing::warn!(
1122 "Cancellation of compaction task failed. task_id: {}",
1123 cancel_compact_task.task_id
1124 );
1125 }
1126 } _ => {
1127 tracing::warn!(
1128 "Attempting to cancel non-existent compaction task. task_id: {}",
1129 cancel_compact_task.task_id
1130 );
1131 }}
1132 }
1133 }
1134 });
1135 }
1136 None => continue 'consume_stream,
1137 }
1138 }
1139 });
1140 (join_handle, shutdown_tx)
1141}
1142
1143fn get_task_progress(
1144 task_progress: Arc<
1145 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1146 >,
1147) -> Vec<CompactTaskProgress> {
1148 let mut progress_list = Vec::new();
1149 for (&task_id, progress) in &*task_progress.lock() {
1150 progress_list.push(progress.snapshot(task_id));
1151 }
1152 progress_list
1153}
1154
1155fn schedule_queued_tasks(
1157 task_queue: &mut IcebergTaskQueue,
1158 compactor_context: &CompactorContext,
1159 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1160 task_completion_tx: &tokio::sync::mpsc::UnboundedSender<TaskKey>,
1161) {
1162 while let Some(popped_task) = task_queue.pop() {
1163 let task_id = popped_task.meta.task_id;
1164 let plan_index = popped_task.meta.plan_index;
1165 let task_key = (task_id, plan_index);
1166
1167 let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1169
1170 let Some(runner) = popped_task.runner else {
1171 tracing::error!(
1172 task_id = task_id,
1173 plan_index = plan_index,
1174 "Popped task missing runner - this should not happen"
1175 );
1176 task_queue.finish_running(task_key);
1177 continue;
1178 };
1179
1180 let executor = compactor_context.compaction_executor.clone();
1181 let shutdown_map_clone = shutdown_map.clone();
1182 let completion_tx_clone = task_completion_tx.clone();
1183
1184 tracing::info!(
1185 task_id = task_id,
1186 plan_index = plan_index,
1187 unique_ident = ?unique_ident,
1188 required_parallelism = popped_task.meta.required_parallelism,
1189 "Starting iceberg compaction task from queue"
1190 );
1191
1192 executor.spawn(async move {
1193 let (tx, rx) = tokio::sync::oneshot::channel();
1194 {
1195 let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
1196 shutdown_guard.insert(task_key, tx);
1197 }
1198
1199 let _cleanup_guard = scopeguard::guard(
1200 (task_key, shutdown_map_clone, completion_tx_clone),
1201 move |(task_key, shutdown_map, completion_tx)| {
1202 {
1203 let mut shutdown_guard = shutdown_map.lock().unwrap();
1204 shutdown_guard.remove(&task_key);
1205 }
1206 if completion_tx.send(task_key).is_err() {
1209 tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
1210 }
1211 },
1212 );
1213
1214 if let Err(e) = Box::pin(runner.compact(rx)).await {
1215 tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
1216 }
1217 });
1218 }
1219}
1220
1221fn handle_meta_task_pulling(
1224 pull_task_ack: &mut bool,
1225 task_queue: &IcebergTaskQueue,
1226 max_task_parallelism: u32,
1227 max_pull_task_count: u32,
1228 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1229 log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1230) -> bool {
1231 let mut pending_pull_task_count = 0;
1232 if *pull_task_ack {
1233 let current_running_parallelism = task_queue.running_parallelism_sum();
1235 pending_pull_task_count =
1236 (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1237
1238 if pending_pull_task_count > 0 {
1239 if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1240 event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1241 subscribe_iceberg_compaction_event_request::PullTask {
1242 pull_task_count: pending_pull_task_count,
1243 },
1244 )),
1245 create_at: SystemTime::now()
1246 .duration_since(std::time::UNIX_EPOCH)
1247 .expect("Clock may have gone backwards")
1248 .as_millis() as u64,
1249 }) {
1250 tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1251 return true; } else {
1253 *pull_task_ack = false;
1254 }
1255 }
1256 }
1257
1258 let running_count = task_queue.running_parallelism_sum();
1259 let waiting_count = task_queue.waiting_parallelism_sum();
1260 let available_count = max_task_parallelism.saturating_sub(running_count);
1261 let current_state = IcebergCompactionLogState {
1262 running_parallelism: running_count,
1263 waiting_parallelism: waiting_count,
1264 available_parallelism: available_count,
1265 pull_task_ack: *pull_task_ack,
1266 pending_pull_task_count,
1267 };
1268
1269 if log_throttler.should_log(¤t_state) {
1271 tracing::info!(
1272 running_parallelism_count = %current_state.running_parallelism,
1273 waiting_parallelism_count = %current_state.waiting_parallelism,
1274 available_parallelism = %current_state.available_parallelism,
1275 pull_task_ack = %current_state.pull_task_ack,
1276 pending_pull_task_count = %current_state.pending_pull_task_count
1277 );
1278 log_throttler.update(current_state);
1279 }
1280
1281 false }
1283
#[cfg(test)]
mod tests {
    use super::*;

    /// States are equal iff every field matches; a single differing field
    /// makes them unequal — for both log-state types.
    #[test]
    fn test_log_state_equality() {
        let base = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let same_as_base = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        // Identical fields except `running_parallelism` bumped by one.
        let mut bumped = base.clone();
        bumped.running_parallelism = 11;
        assert_eq!(base, same_as_base);
        assert_ne!(base, bumped);

        let ice_a = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 5,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_b = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 6,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_ne!(ice_a, ice_b);
    }

    /// A state change forces a log even before the heartbeat interval passes;
    /// repeating the last-seen state is suppressed.
    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttle = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let first = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let second = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        // Never logged before -> must log; same state again -> suppressed.
        assert!(throttle.should_log(&first));
        throttle.update(first.clone());
        assert!(!throttle.should_log(&first));

        // Different state -> must log; then suppressed again.
        assert!(throttle.should_log(&second));
        throttle.update(second.clone());
        assert!(!throttle.should_log(&second));
    }

    /// An unchanged state is logged again once the heartbeat interval elapses.
    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttle = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let steady = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttle.should_log(&steady));
        throttle.update(steady.clone());
        assert!(!throttle.should_log(&steady));

        // Let the 10ms heartbeat interval expire.
        std::thread::sleep(Duration::from_millis(15));
        assert!(throttle.should_log(&steady));
    }
}