mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
mod iceberg_compaction;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
use iceberg_compaction::iceberg_compactor_runner::{
    IcebergCompactorRunnerConfigBuilder, create_plan_runners,
};
use iceberg_compaction::{IcebergTaskQueue, PushResult};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);

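/// Snapshot of the compactor event loop's scheduling state, compared between iterations to
/// throttle periodic status logs.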
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    running_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

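/// Snapshot of the iceberg compactor's scheduling state, compared between iterations to
/// throttle periodic status logs.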
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    running_parallelism: u32,
    waiting_parallelism: u32,
    available_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

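/// Throttles repetitive status logging: `should_log` returns true only when the observed state
/// differs from the last logged one or when `heartbeat_interval` has elapsed since it was logged.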
struct LogThrottler<T: PartialEq> {
    last_logged_state: Option<T>,
    last_heartbeat: Instant,
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            last_logged_state: None,
            last_heartbeat: Instant::now(),
            heartbeat_interval,
        }
    }

    fn should_log(&self, current_state: &T) -> bool {
        self.last_logged_state.as_ref() != Some(current_state)
            || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    fn update(&mut self, current_state: T) {
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = Instant::now();
    }
}

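/// Implementation of Hummock compaction: consumes a key-range iterator and builds the output
/// SSTables according to the task configuration.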
pub struct Compactor {
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

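    /// Compacts the data yielded by `iter` into SSTables, applying `compaction_filter` along the
    /// way, and returns the built `LocalSstableInfo`s together with compaction statistics.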
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

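    /// Updates the task progress tracker and compactor metrics for each uploaded SST in `ssts`.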
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

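    /// Generic core of [`Self::compact_key_range`], parameterized over the SSTable writer
    /// factory `F` and the bloom-filter builder `B`.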
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

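/// Starts the iceberg compactor worker: subscribes to iceberg compaction events from the meta
/// service, queues the received plan runners in an [`IcebergTaskQueue`], and executes them with
/// bounded parallelism. Returns the worker's join handle and a shutdown sender.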
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
        let (mut task_queue, _schedule_notify) =
            IcebergTaskQueue::new_with_notify(max_task_parallelism, pending_parallelism_budget);

        let shutdown_map = Arc::new(Mutex::new(
            HashMap::<u64, tokio::sync::oneshot::Sender<()>>::new(),
        ));

        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<u64>();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    Some(completed_task_id) = task_completion_rx.recv() => {
                        tracing::debug!(task_id = completed_task_id, "Task completed, updating queue state");
                        task_queue.finish_running(completed_task_id);
                        continue 'consume_stream;
                    }

                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(concat!(
                                        "risingwave version ",
                                        env!("CARGO_PKG_VERSION")
                                    )
                                    .to_owned())
                                    .set_max_row_group_size(
                                        compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                    )
                                    .set_compression(Compression::SNAPPY)
                                    .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .target_file_size_bytes(compactor_context.storage_opts.iceberg_compaction_target_file_size_mb as u64 * 1024 * 1024)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .small_file_threshold(compactor_context.storage_opts.iceberg_compaction_small_file_threshold_mb as u64 * 1024 * 1024)
                                    .max_task_total_size(
                                        compactor_context.storage_opts.iceberg_compaction_max_task_total_size_mb as u64 * 1024 * 1024,
                                    )
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
                                    .build()
                                {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let plan_runners = match create_plan_runners(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runners) => runners,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        continue 'consume_stream;
                                    }
                                };

                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, "No plans to execute");
                                    continue 'consume_stream;
                                }

                                let total_plans = plan_runners.len();
                                let mut enqueued_count = 0;

                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta.clone(), Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = enqueued_count - 1,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        }
                                    }
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

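/// Starts the background compaction worker: subscribes to compaction events from the meta
/// service, executes received compaction tasks on the compaction executor, and reports results
/// back. Returns the worker's join handle and a shutdown sender.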
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        let mut log_throttler =
            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
                        let current_state = CompactionLogState {
                            running_parallelism: running_count,
                            pull_task_ack,
                            pending_pull_task_count,
                        };

                        if log_throttler.should_log(&current_state) {
                            tracing::info!(
                                running_parallelism_count = %current_state.running_parallelism,
                                pull_task_ack = %current_state.pull_task_ack,
                                pending_pull_task_count = %current_state.pending_pull_task_count
                            );
                            log_throttler.update(current_state);
                        }

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

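/// Starts the shared compactor worker: receives dispatched compaction tasks from `receiver`,
/// executes them, and reports results through `grpc_proxy_client`. Returns the worker's join
/// handle and a shutdown sender.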
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }
            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    shared_compactor_object_id_manager,
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps: object_timestamps
                                            .into_iter()
                                            .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                            .collect(),
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                },
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }
                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                {
                                    Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
                                    _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

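/// Takes a progress snapshot of every tracked compaction task.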
fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}

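/// Pops all currently schedulable tasks from the iceberg task queue and spawns their runners on
/// the compaction executor. Each spawned task registers a shutdown sender in `shutdown_map` and
/// reports completion through `task_completion_tx`.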
fn schedule_queued_tasks(
    task_queue: &mut IcebergTaskQueue,
    compactor_context: &CompactorContext,
    shutdown_map: &Arc<Mutex<HashMap<u64, tokio::sync::oneshot::Sender<()>>>>,
    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<u64>,
) {
    while let Some(popped_task) = task_queue.pop() {
        let task_id = popped_task.meta.task_id;

        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());

        let Some(runner) = popped_task.runner else {
            tracing::error!(
                task_id = task_id,
                "Popped task missing runner - this should not happen"
            );
            task_queue.finish_running(task_id);
            continue;
        };

        let executor = compactor_context.compaction_executor.clone();
        let shutdown_map_clone = shutdown_map.clone();
        let completion_tx_clone = task_completion_tx.clone();

        tracing::info!(
            task_id = task_id,
            unique_ident = ?unique_ident,
            required_parallelism = popped_task.meta.required_parallelism,
            "Starting iceberg compaction task from queue"
        );

        executor.spawn(async move {
            let (tx, rx) = tokio::sync::oneshot::channel();
            {
                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
                shutdown_guard.insert(task_id, tx);
            }

            let _cleanup_guard = scopeguard::guard(
                (task_id, shutdown_map_clone, completion_tx_clone),
                move |(task_id, shutdown_map, completion_tx)| {
                    {
                        let mut shutdown_guard = shutdown_map.lock().unwrap();
                        shutdown_guard.remove(&task_id);
                    }
                    if completion_tx.send(task_id).is_err() {
                        tracing::warn!(task_id = task_id, "Failed to notify task completion - main loop may have shut down");
                    }
                },
            );

            if let Err(e) = runner.compact(rx).await {
                tracing::warn!(error = %e.as_report(), "Failed to compact iceberg runner {}", task_id);
            }
        });
    }
}

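/// Requests more iceberg compaction tasks from the meta service when spare parallelism is
/// available and the previous pull has been acknowledged, and emits a throttled status log.
/// Returns `true` if the subscription stream should be restarted because the pull request could
/// not be sent.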
fn handle_meta_task_pulling(
    pull_task_ack: &mut bool,
    task_queue: &IcebergTaskQueue,
    max_task_parallelism: u32,
    max_pull_task_count: u32,
    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
) -> bool {
    let mut pending_pull_task_count = 0;
    if *pull_task_ack {
        let current_running_parallelism = task_queue.running_parallelism_sum();
        pending_pull_task_count =
            (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);

        if pending_pull_task_count > 0 {
            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                    subscribe_iceberg_compaction_event_request::PullTask {
                        pull_task_count: pending_pull_task_count,
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            }) {
                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
                return true;
            } else {
                *pull_task_ack = false;
            }
        }
    }

    let running_count = task_queue.running_parallelism_sum();
    let waiting_count = task_queue.waiting_parallelism_sum();
    let available_count = max_task_parallelism.saturating_sub(running_count);
    let current_state = IcebergCompactionLogState {
        running_parallelism: running_count,
        waiting_parallelism: waiting_count,
        available_parallelism: available_count,
        pull_task_ack: *pull_task_ack,
        pending_pull_task_count,
    };

    if log_throttler.should_log(&current_state) {
        tracing::info!(
            running_parallelism_count = %current_state.running_parallelism,
            waiting_parallelism_count = %current_state.waiting_parallelism,
            available_parallelism = %current_state.available_parallelism,
            pull_task_ack = %current_state.pull_task_ack,
            pending_pull_task_count = %current_state.pending_pull_task_count
        );
        log_throttler.update(current_state);
    }

    false
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_log_state_equality() {
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state3 = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_eq!(state1, state2);
        assert_ne!(state1, state3);

        let ice_state1 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 5,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_state2 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 6,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_ne!(ice_state1, ice_state2);
    }

    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttler.should_log(&state1));
        throttler.update(state1.clone());

        assert!(!throttler.should_log(&state1));

        assert!(throttler.should_log(&state2));
        throttler.update(state2.clone());

        assert!(!throttler.should_log(&state2));
    }

    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let state = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttler.should_log(&state));
        throttler.update(state.clone());

        assert!(!throttler.should_log(&state));

        std::thread::sleep(Duration::from_millis(15));

        assert!(throttler.should_log(&state));
    }
}