1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24 ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28 subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::{HashMap, VecDeque};
43use std::marker::PhantomData;
44use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
45use std::sync::{Arc, Mutex};
46use std::time::{Duration, SystemTime};
47
48use await_tree::{InstrumentAwait, SpanExt};
49pub use compaction_executor::CompactionExecutor;
50pub use compaction_filter::{
51 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
52 TtlCompactionFilter,
53};
54pub use context::{
55 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58use iceberg_compaction::iceberg_compactor_runner::{
60 IcebergCompactorRunnerConfigBuilder, create_plan_runners,
61};
62use iceberg_compaction::{IcebergTaskQueue, PushResult};
63pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
64use more_asserts::assert_ge;
65use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
66use risingwave_hummock_sdk::{
67 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
68};
69use risingwave_pb::hummock::compact_task::TaskStatus;
70use risingwave_pb::hummock::subscribe_compaction_event_request::{
71 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
72};
73use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
74use risingwave_pb::hummock::{
75 CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
76 SubscribeCompactionEventResponse,
77};
78use risingwave_rpc_client::HummockMetaClient;
79pub use shared_buffer_compact::{compact, merge_imms_in_memory};
80use tokio::sync::oneshot::Sender;
81use tokio::task::JoinHandle;
82use tokio::time::Instant;
83
84pub use self::compaction_utils::{
85 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
86 check_flush_result,
87};
88pub use self::task_progress::TaskProgress;
89use super::multi_builder::CapacitySplitTableBuilder;
90use super::{
91 GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
92};
93use crate::compaction_catalog_manager::{
94 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
95};
96use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
97use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
98use crate::hummock::compactor::iceberg_compaction::TaskKey;
99use crate::hummock::iterator::{Forward, HummockIterator};
100use crate::hummock::{
101 BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
102 UnifiedSstableWriterFactory, validate_ssts,
103};
104use crate::monitor::CompactorMetrics;
105
// Minimum interval between two identical "heartbeat" status log lines emitted by
// the compactor event loops; used as the `LogThrottler` heartbeat interval.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);
108
/// Snapshot of the hummock compactor event loop's scheduling state.
///
/// Compared via `PartialEq` by `LogThrottler` so the periodic status line is
/// only re-logged when something actually changed (or a heartbeat is due).
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    // Parallelism currently consumed by running compaction tasks.
    running_parallelism: u32,
    // Whether the last `PullTask` request has been acknowledged by meta.
    pull_task_ack: bool,
    // Task count requested in the most recent `PullTask` request (0 if none sent).
    pending_pull_task_count: u32,
}
116
/// Snapshot of the iceberg compactor event loop's scheduling state, throttled
/// by `LogThrottler` the same way as `CompactionLogState`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    // Parallelism consumed by currently running iceberg plan runners.
    running_parallelism: u32,
    // Parallelism requested by queued (not yet running) plan runners —
    // presumably derived from `IcebergTaskQueue`; populated in
    // `handle_meta_task_pulling` (not visible here), TODO confirm.
    waiting_parallelism: u32,
    // Parallelism still available for scheduling new runners.
    available_parallelism: u32,
    // Whether the last `PullTask` request has been acknowledged by meta.
    pull_task_ack: bool,
    // Task count requested in the most recent `PullTask` request.
    pending_pull_task_count: u32,
}
126
/// Throttles repetitive log output: a state is worth logging only when it
/// differs from the last state that was logged, or when the heartbeat
/// interval has elapsed since the last log.
struct LogThrottler<T: PartialEq> {
    // State recorded by the most recent `update`; `None` until first log.
    last_logged_state: Option<T>,
    // When the most recent log was recorded (initialized at construction).
    last_heartbeat: Instant,
    // Maximum silence between two identical status lines.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Creates a throttler that has not logged anything yet.
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            last_logged_state: None,
            last_heartbeat: Instant::now(),
            heartbeat_interval,
        }
    }

    /// Returns `true` when `state` should be logged now: either it changed
    /// since the last `update` (or nothing was logged yet), or the heartbeat
    /// interval has expired.
    fn should_log(&self, state: &T) -> bool {
        let heartbeat_due = self.last_heartbeat.elapsed() >= self.heartbeat_interval;
        let state_changed = match &self.last_logged_state {
            Some(last) => last != state,
            None => true,
        };
        state_changed || heartbeat_due
    }

    /// Records `state` as the last logged state and resets the heartbeat timer.
    fn update(&mut self, state: T) {
        self.last_heartbeat = Instant::now();
        self.last_logged_state = Some(state);
    }
}
155
/// Implementation of hummock compaction for a single key range: drives an input
/// iterator through a compaction filter and SST builders, producing output SSTs.
pub struct Compactor {
    // Shared compactor runtime context (metrics, sstable store, storage opts, ...).
    context: CompactorContext,
    // Source of object ids for newly built SSTs.
    object_id_getter: Arc<dyn GetObjectId>,
    // Per-task configuration (cache policy, filter type, vnode partitioning, ...).
    task_config: TaskConfig,
    // Options used when building output SSTable files.
    options: SstableBuilderOptions,
    // Accumulated time spent fetching object ids; observed into
    // `get_table_id_total_time_duration` after dividing by 1e6 — stored unit
    // assumed to be microseconds, TODO confirm against RemoteBuilderFactory.
    get_id_time: Arc<AtomicU64>,
}
165
/// Output of compacting one split: (split index, output SSTs, statistics).
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
167
impl Compactor {
    /// Creates a compactor for one task from the shared `context` and per-task
    /// configuration. The `get_id_time` counter starts at zero.
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compacts the key range produced by `iter` into output SSTs.
    ///
    /// Dispatches to `compact_key_range_impl` with either a block-based or a
    /// whole-SST xor16 filter builder depending on
    /// `task_config.use_block_based_filter`, records build-duration metrics,
    /// and reports per-SST progress. `task_id` / `split_index` are only used
    /// for the final log line.
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        // Shared-buffer flushes and ordinary compactions are timed under
        // different histograms.
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        // get_id_time is divided by 1e6 before observation — stored unit
        // assumed to be microseconds, TODO confirm.
        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        // Each output SST must carry its covered table ids in sorted order.
        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

    /// Reports upload progress and size/count metrics for the finished output
    /// SSTs: bumps the progress tracker per SST (if any), and observes either
    /// the shared-buffer size histogram or the compaction upload counter.
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

    /// Shared implementation of [`Self::compact_key_range`], parameterized by
    /// the SST writer factory `F` and the bloom-filter builder `B`.
    ///
    /// Feeds `iter` through `compact_and_build_sst` into a
    /// `CapacitySplitTableBuilder`, then finalizes all pending builders and
    /// returns the resulting SSTs plus compaction statistics.
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}
338
339#[must_use]
342pub fn start_iceberg_compactor(
343 compactor_context: CompactorContext,
344 hummock_meta_client: Arc<dyn HummockMetaClient>,
345) -> (JoinHandle<()>, Sender<()>) {
346 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
347 let stream_retry_interval = Duration::from_secs(30);
348 let periodic_event_update_interval = Duration::from_millis(1000);
349 let worker_num = compactor_context.compaction_executor.worker_num();
350
351 let max_task_parallelism: u32 = (worker_num as f32
352 * compactor_context.storage_opts.compactor_max_task_multiplier)
353 .ceil() as u32;
354
355 const MAX_PULL_TASK_COUNT: u32 = 4;
356 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
357
358 assert_ge!(
359 compactor_context.storage_opts.compactor_max_task_multiplier,
360 0.0
361 );
362
363 let join_handle = tokio::spawn(async move {
364 let pending_parallelism_budget = (max_task_parallelism as f32
366 * compactor_context
367 .storage_opts
368 .iceberg_compaction_pending_parallelism_budget_multiplier)
369 .ceil() as u32;
370 let mut task_queue =
371 IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);
372
373 let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));
375
376 let (task_completion_tx, mut task_completion_rx) =
378 tokio::sync::mpsc::unbounded_channel::<TaskKey>();
379
380 let mut min_interval = tokio::time::interval(stream_retry_interval);
381 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
382
383 let mut log_throttler =
385 LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
386
387 'start_stream: loop {
389 let mut pull_task_ack = true;
392 tokio::select! {
393 _ = min_interval.tick() => {},
395 _ = &mut shutdown_rx => {
397 tracing::info!("Compactor is shutting down");
398 return;
399 }
400 }
401
402 let (request_sender, response_event_stream) = match hummock_meta_client
403 .subscribe_iceberg_compaction_event()
404 .await
405 {
406 Ok((request_sender, response_event_stream)) => {
407 tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
408 (request_sender, response_event_stream)
409 }
410
411 Err(e) => {
412 tracing::warn!(
413 error = %e.as_report(),
414 "Subscribing to iceberg compaction tasks failed with error. Will retry.",
415 );
416 continue 'start_stream;
417 }
418 };
419
420 pin_mut!(response_event_stream);
421
422 let _executor = compactor_context.compaction_executor.clone();
423
424 let mut event_loop_iteration_now = Instant::now();
426 'consume_stream: loop {
427 {
428 compactor_context
430 .compactor_metrics
431 .compaction_event_loop_iteration_latency
432 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
433 event_loop_iteration_now = Instant::now();
434 }
435
436 let request_sender = request_sender.clone();
437 let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
438 Some(completed_task_key) = task_completion_rx.recv() => {
440 tracing::debug!(task_id = completed_task_key.0, plan_index = completed_task_key.1, "Task completed, updating queue state");
441 task_queue.finish_running(completed_task_key);
442 continue 'consume_stream;
443 }
444
445 _ = task_queue.wait_schedulable() => {
447 schedule_queued_tasks(
448 &mut task_queue,
449 &compactor_context,
450 &shutdown_map,
451 &task_completion_tx,
452 );
453 continue 'consume_stream;
454 }
455
456 _ = periodic_event_interval.tick() => {
457 let should_restart_stream = handle_meta_task_pulling(
459 &mut pull_task_ack,
460 &task_queue,
461 max_task_parallelism,
462 max_pull_task_count,
463 &request_sender,
464 &mut log_throttler,
465 );
466
467 if should_restart_stream {
468 continue 'start_stream;
469 }
470 continue;
471 }
472 event = response_event_stream.next() => {
473 event
474 }
475
476 _ = &mut shutdown_rx => {
477 tracing::info!("Iceberg Compactor is shutting down");
478 return
479 }
480 };
481
482 match event {
483 Some(Ok(SubscribeIcebergCompactionEventResponse {
484 event,
485 create_at: _create_at,
486 })) => {
487 let event = match event {
488 Some(event) => event,
489 None => continue 'consume_stream,
490 };
491
492 match event {
493 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
494 let task_id = iceberg_compaction_task.task_id;
495 let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
497 .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
498 .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
499 .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
500 .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
501 .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
502 .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
503 .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
504 .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
505 .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
506 .target_binpack_group_size_mb(
507 compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
508 )
509 .min_group_size_mb(
510 compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
511 )
512 .min_group_file_count(
513 compactor_context.storage_opts.iceberg_compaction_min_group_file_count
514 )
515 .build() {
516 Ok(config) => config,
517 Err(e) => {
518 tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
519 continue 'consume_stream;
520 }
521 };
522
523 let plan_runners = match create_plan_runners(
525 iceberg_compaction_task,
526 compactor_runner_config,
527 compactor_context.compactor_metrics.clone(),
528 ).await {
529 Ok(runners) => runners,
530 Err(e) => {
531 tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
532 continue 'consume_stream;
533 }
534 };
535
536 if plan_runners.is_empty() {
537 tracing::info!(task_id, "No plans to execute");
538 continue 'consume_stream;
539 }
540
541 let total_plans = plan_runners.len();
543 let mut enqueued_count = 0;
544
545 for runner in plan_runners {
546 let meta = runner.to_meta();
547 let required_parallelism = runner.required_parallelism();
548 let push_result = task_queue.push(meta.clone(), Some(runner));
549
550 match push_result {
551 PushResult::Added => {
552 enqueued_count += 1;
553 tracing::debug!(
554 task_id = task_id,
555 plan_index = enqueued_count - 1,
556 required_parallelism = required_parallelism,
557 "Iceberg plan runner added to queue"
558 );
559 },
560 PushResult::RejectedCapacity => {
561 tracing::warn!(
562 task_id = task_id,
563 required_parallelism = required_parallelism,
564 pending_budget = pending_parallelism_budget,
565 enqueued_count = enqueued_count,
566 total_plans = total_plans,
567 "Iceberg plan runner rejected - queue capacity exceeded"
568 );
569 break;
571 },
572 PushResult::RejectedTooLarge => {
573 tracing::error!(
574 task_id = task_id,
575 required_parallelism = required_parallelism,
576 max_parallelism = max_task_parallelism,
577 "Iceberg plan runner rejected - parallelism exceeds max"
578 );
579 },
580 PushResult::RejectedInvalidParallelism => {
581 tracing::error!(
582 task_id = task_id,
583 required_parallelism = required_parallelism,
584 "Iceberg plan runner rejected - invalid parallelism"
585 );
586 },
587 PushResult::RejectedDuplicate => {
588 tracing::error!(
589 task_id = task_id,
590 plan_index = meta.plan_index,
591 "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
592 );
593 }
594 }
595 }
596
597 tracing::info!(
598 task_id = task_id,
599 total_plans = total_plans,
600 enqueued_count = enqueued_count,
601 "Enqueued {} of {} Iceberg plan runners",
602 enqueued_count,
603 total_plans
604 );
605 },
606 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
607 pull_task_ack = true;
609 },
610 }
611 }
612 Some(Err(e)) => {
613 tracing::warn!("Failed to consume stream. {}", e.message());
614 continue 'start_stream;
615 }
616 _ => {
617 continue 'start_stream;
619 }
620 }
621 }
622 }
623 });
624
625 (join_handle, shutdown_tx)
626}
627
628#[must_use]
631pub fn start_compactor(
632 compactor_context: CompactorContext,
633 hummock_meta_client: Arc<dyn HummockMetaClient>,
634 object_id_manager: Arc<ObjectIdManager>,
635 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
636) -> (JoinHandle<()>, Sender<()>) {
637 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
638 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
639 let stream_retry_interval = Duration::from_secs(30);
640 let task_progress = compactor_context.task_progress_manager.clone();
641 let periodic_event_update_interval = Duration::from_millis(1000);
642
643 let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
644 * compactor_context.storage_opts.compactor_max_task_multiplier)
645 .ceil() as u32;
646 let running_task_parallelism = Arc::new(AtomicU32::new(0));
647
648 const MAX_PULL_TASK_COUNT: u32 = 4;
649 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
650
651 assert_ge!(
652 compactor_context.storage_opts.compactor_max_task_multiplier,
653 0.0
654 );
655
656 let join_handle = tokio::spawn(async move {
657 let shutdown_map = CompactionShutdownMap::default();
658 let mut min_interval = tokio::time::interval(stream_retry_interval);
659 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
660
661 let mut log_throttler =
663 LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
664
665 'start_stream: loop {
667 let mut pull_task_ack = true;
670 tokio::select! {
671 _ = min_interval.tick() => {},
673 _ = &mut shutdown_rx => {
675 tracing::info!("Compactor is shutting down");
676 return;
677 }
678 }
679
680 let (request_sender, response_event_stream) =
681 match hummock_meta_client.subscribe_compaction_event().await {
682 Ok((request_sender, response_event_stream)) => {
683 tracing::debug!("Succeeded subscribe_compaction_event.");
684 (request_sender, response_event_stream)
685 }
686
687 Err(e) => {
688 tracing::warn!(
689 error = %e.as_report(),
690 "Subscribing to compaction tasks failed with error. Will retry.",
691 );
692 continue 'start_stream;
693 }
694 };
695
696 pin_mut!(response_event_stream);
697
698 let executor = compactor_context.compaction_executor.clone();
699 let object_id_manager = object_id_manager.clone();
700
701 let mut event_loop_iteration_now = Instant::now();
703 'consume_stream: loop {
704 {
705 compactor_context
707 .compactor_metrics
708 .compaction_event_loop_iteration_latency
709 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
710 event_loop_iteration_now = Instant::now();
711 }
712
713 let running_task_parallelism = running_task_parallelism.clone();
714 let request_sender = request_sender.clone();
715 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
716 _ = periodic_event_interval.tick() => {
717 let progress_list = get_task_progress(task_progress.clone());
718
719 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
720 event: Some(RequestEvent::HeartBeat(
721 HeartBeat {
722 progress: progress_list
723 }
724 )),
725 create_at: SystemTime::now()
726 .duration_since(std::time::UNIX_EPOCH)
727 .expect("Clock may have gone backwards")
728 .as_millis() as u64,
729 }) {
730 tracing::warn!(error = %e.as_report(), "Failed to report task progress");
731 continue 'start_stream;
733 }
734
735
736 let mut pending_pull_task_count = 0;
737 if pull_task_ack {
738 pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
740
741 if pending_pull_task_count > 0 {
742 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
743 event: Some(RequestEvent::PullTask(
744 PullTask {
745 pull_task_count: pending_pull_task_count,
746 }
747 )),
748 create_at: SystemTime::now()
749 .duration_since(std::time::UNIX_EPOCH)
750 .expect("Clock may have gone backwards")
751 .as_millis() as u64,
752 }) {
753 tracing::warn!(error = %e.as_report(), "Failed to pull task");
754
755 continue 'start_stream;
757 } else {
758 pull_task_ack = false;
759 }
760 }
761 }
762
763 let running_count = running_task_parallelism.load(Ordering::SeqCst);
764 let current_state = CompactionLogState {
765 running_parallelism: running_count,
766 pull_task_ack,
767 pending_pull_task_count,
768 };
769
770 if log_throttler.should_log(¤t_state) {
772 tracing::info!(
773 running_parallelism_count = %current_state.running_parallelism,
774 pull_task_ack = %current_state.pull_task_ack,
775 pending_pull_task_count = %current_state.pending_pull_task_count
776 );
777 log_throttler.update(current_state);
778 }
779
780 continue;
781 }
782 event = response_event_stream.next() => {
783 event
784 }
785
786 _ = &mut shutdown_rx => {
787 tracing::info!("Compactor is shutting down");
788 return
789 }
790 };
791
792 fn send_report_task_event(
793 compact_task: &CompactTask,
794 table_stats: TableStatsMap,
795 object_timestamps: HashMap<HummockSstableObjectId, u64>,
796 request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
797 ) {
798 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
799 event: Some(RequestEvent::ReportTask(ReportTask {
800 task_id: compact_task.task_id,
801 task_status: compact_task.task_status.into(),
802 sorted_output_ssts: compact_task
803 .sorted_output_ssts
804 .iter()
805 .map(|sst| sst.into())
806 .collect(),
807 table_stats_change: to_prost_table_stats_map(table_stats),
808 object_timestamps,
809 })),
810 create_at: SystemTime::now()
811 .duration_since(std::time::UNIX_EPOCH)
812 .expect("Clock may have gone backwards")
813 .as_millis() as u64,
814 }) {
815 let task_id = compact_task.task_id;
816 tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
817 }
818 }
819
820 match event {
821 Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
822 let event = match event {
823 Some(event) => event,
824 None => continue 'consume_stream,
825 };
826 let shutdown = shutdown_map.clone();
827 let context = compactor_context.clone();
828 let consumed_latency_ms = SystemTime::now()
829 .duration_since(std::time::UNIX_EPOCH)
830 .expect("Clock may have gone backwards")
831 .as_millis() as u64
832 - create_at;
833 context
834 .compactor_metrics
835 .compaction_event_consumed_latency
836 .observe(consumed_latency_ms as _);
837
838 let object_id_manager = object_id_manager.clone();
839 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
840
841 match event {
842 ResponseEvent::CompactTask(compact_task) => {
843 let compact_task = CompactTask::from(compact_task);
844 let parallelism =
845 calculate_task_parallelism(&compact_task, &context);
846
847 assert_ne!(parallelism, 0, "splits cannot be empty");
848
849 if (max_task_parallelism
850 - running_task_parallelism.load(Ordering::SeqCst))
851 < parallelism as u32
852 {
853 tracing::warn!(
854 "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
855 compact_task.task_id,
856 parallelism,
857 max_task_parallelism,
858 running_task_parallelism.load(Ordering::Relaxed),
859 );
860 let (compact_task, table_stats, object_timestamps) =
861 compact_done(
862 compact_task,
863 context.clone(),
864 vec![],
865 TaskStatus::NoAvailCpuResourceCanceled,
866 );
867
868 send_report_task_event(
869 &compact_task,
870 table_stats,
871 object_timestamps,
872 &request_sender,
873 );
874
875 continue 'consume_stream;
876 }
877
878 running_task_parallelism
879 .fetch_add(parallelism as u32, Ordering::SeqCst);
880 executor.spawn(async move {
881 let (tx, rx) = tokio::sync::oneshot::channel();
882 let task_id = compact_task.task_id;
883 shutdown.lock().unwrap().insert(task_id, tx);
884
885 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
886 context.clone(),
887 compact_task,
888 rx,
889 object_id_manager.clone(),
890 compaction_catalog_manager_ref.clone(),
891 )
892 .await;
893
894 shutdown.lock().unwrap().remove(&task_id);
895 running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
896
897 send_report_task_event(
898 &compact_task,
899 table_stats,
900 object_timestamps,
901 &request_sender,
902 );
903
904 let enable_check_compaction_result =
905 context.storage_opts.check_compaction_result;
906 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
907
908 if enable_check_compaction_result && need_check_task {
909 let compact_table_ids = compact_task.build_compact_table_ids();
910 match compaction_catalog_manager_ref.acquire(compact_table_ids.into_iter().collect()).await {
911 Ok(compaction_catalog_agent_ref) => {
912 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
913 {
914 Err(e) => {
915 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
916 }
917 Ok(true) => (),
918 Ok(false) => {
919 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
920 }
921 }
922 },
923 Err(e) => {
924 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
925 }
926 }
927 }
928 });
929 }
930 #[expect(deprecated)]
931 ResponseEvent::VacuumTask(_) => {
932 unreachable!("unexpected vacuum task");
933 }
934 #[expect(deprecated)]
935 ResponseEvent::FullScanTask(_) => {
936 unreachable!("unexpected scan task");
937 }
938 #[expect(deprecated)]
939 ResponseEvent::ValidationTask(validation_task) => {
940 let validation_task = ValidationTask::from(validation_task);
941 executor.spawn(async move {
942 validate_ssts(validation_task, context.sstable_store.clone())
943 .await;
944 });
945 }
946 ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
947 .lock()
948 .unwrap()
949 .remove(&cancel_compact_task.task_id)
950 {
951 Some(tx) => {
952 if tx.send(()).is_err() {
953 tracing::warn!(
954 "Cancellation of compaction task failed. task_id: {}",
955 cancel_compact_task.task_id
956 );
957 }
958 }
959 _ => {
960 tracing::warn!(
961 "Attempting to cancel non-existent compaction task. task_id: {}",
962 cancel_compact_task.task_id
963 );
964 }
965 },
966
967 ResponseEvent::PullTaskAck(_pull_task_ack) => {
968 pull_task_ack = true;
970 }
971 }
972 }
973 Some(Err(e)) => {
974 tracing::warn!("Failed to consume stream. {}", e.message());
975 continue 'start_stream;
976 }
977 _ => {
978 continue 'start_stream;
980 }
981 }
982 }
983 }
984 });
985
986 (join_handle, shutdown_tx)
987}
988
989#[must_use]
992pub fn start_shared_compactor(
993 grpc_proxy_client: GrpcCompactorProxyClient,
994 mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
995 context: CompactorContext,
996) -> (JoinHandle<()>, Sender<()>) {
997 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
998 let task_progress = context.task_progress_manager.clone();
999 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1000 let periodic_event_update_interval = Duration::from_millis(1000);
1001
1002 let join_handle = tokio::spawn(async move {
1003 let shutdown_map = CompactionShutdownMap::default();
1004
1005 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1006 let executor = context.compaction_executor.clone();
1007 let report_heartbeat_client = grpc_proxy_client.clone();
1008 'consume_stream: loop {
1009 let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1010 _ = periodic_event_interval.tick() => {
1011 let progress_list = get_task_progress(task_progress.clone());
1012 let report_compaction_task_request = ReportCompactionTaskRequest{
1013 event: Some(ReportCompactionTaskEvent::HeartBeat(
1014 SharedHeartBeat {
1015 progress: progress_list
1016 }
1017 )),
1018 };
1019 if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1020 tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1021 }
1022 continue
1023 }
1024
1025
1026 _ = &mut shutdown_rx => {
1027 tracing::info!("Compactor is shutting down");
1028 return
1029 }
1030
1031 request = receiver.recv() => {
1032 request
1033 }
1034
1035 };
1036 match request {
1037 Some(request) => {
1038 let context = context.clone();
1039 let shutdown = shutdown_map.clone();
1040
1041 let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1042 executor.spawn(async move {
1043 let DispatchCompactionTaskRequest {
1044 tables,
1045 output_object_ids,
1046 task: dispatch_task,
1047 } = request.into_inner();
1048 let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1049 acc.insert(table.id, table);
1050 acc
1051 });
1052
1053 let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1054 output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1055 let shared_compactor_object_id_manager =
1056 SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1057 match dispatch_task.unwrap() {
1058 dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1059 let compact_task = CompactTask::from(&compact_task);
1060 let (tx, rx) = tokio::sync::oneshot::channel();
1061 let task_id = compact_task.task_id;
1062 shutdown.lock().unwrap().insert(task_id, tx);
1063
1064 let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1065 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1066 context.clone(),
1067 compact_task,
1068 rx,
1069 shared_compactor_object_id_manager,
1070 compaction_catalog_agent_ref.clone(),
1071 )
1072 .await;
1073 shutdown.lock().unwrap().remove(&task_id);
1074 let report_compaction_task_request = ReportCompactionTaskRequest {
1075 event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1076 compact_task: Some(PbCompactTask::from(&compact_task)),
1077 table_stats_change: to_prost_table_stats_map(table_stats),
1078 object_timestamps,
1079 })),
1080 };
1081
1082 match cloned_grpc_proxy_client
1083 .report_compaction_task(report_compaction_task_request)
1084 .await
1085 {
1086 Ok(_) => {
1087 let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1089 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1090 if enable_check_compaction_result && need_check_task {
1091 match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1092 Err(e) => {
1093 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1094 },
1095 Ok(true) => (),
1096 Ok(false) => {
1097 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1098 }
1099 }
1100 }
1101 }
1102 Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1103 }
1104
1105 }
1106 dispatch_compaction_task_request::Task::VacuumTask(_) => {
1107 unreachable!("unexpected vacuum task");
1108 }
1109 dispatch_compaction_task_request::Task::FullScanTask(_) => {
1110 unreachable!("unexpected scan task");
1111 }
1112 dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1113 let validation_task = ValidationTask::from(validation_task);
1114 validate_ssts(validation_task, context.sstable_store.clone()).await;
1115 }
1116 dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1117 match shutdown
1118 .lock()
1119 .unwrap()
1120 .remove(&cancel_compact_task.task_id)
1121 { Some(tx) => {
1122 if tx.send(()).is_err() {
1123 tracing::warn!(
1124 "Cancellation of compaction task failed. task_id: {}",
1125 cancel_compact_task.task_id
1126 );
1127 }
1128 } _ => {
1129 tracing::warn!(
1130 "Attempting to cancel non-existent compaction task. task_id: {}",
1131 cancel_compact_task.task_id
1132 );
1133 }}
1134 }
1135 }
1136 });
1137 }
1138 None => continue 'consume_stream,
1139 }
1140 }
1141 });
1142 (join_handle, shutdown_tx)
1143}
1144
1145fn get_task_progress(
1146 task_progress: Arc<
1147 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1148 >,
1149) -> Vec<CompactTaskProgress> {
1150 let mut progress_list = Vec::new();
1151 for (&task_id, progress) in &*task_progress.lock() {
1152 progress_list.push(progress.snapshot(task_id));
1153 }
1154 progress_list
1155}
1156
/// Drains the iceberg compaction task queue, spawning one async job per
/// popped task on the context's compaction executor.
///
/// For each task:
/// - a `tokio::sync::oneshot` sender is registered in `shutdown_map` under the
///   task's `(task_id, plan_index)` key so the task can be cancelled externally;
/// - a `scopeguard` ensures that, whenever the spawned future finishes or is
///   dropped, the shutdown entry is removed and `task_completion_tx` is
///   notified so the main loop can release the task's queue slot.
///
/// A popped task without a runner is logged as an error and immediately
/// marked finished so its parallelism budget is reclaimed.
fn schedule_queued_tasks(
    task_queue: &mut IcebergTaskQueue,
    compactor_context: &CompactorContext,
    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<TaskKey>,
) {
    while let Some(popped_task) = task_queue.pop() {
        let task_id = popped_task.meta.task_id;
        let plan_index = popped_task.meta.plan_index;
        let task_key = (task_id, plan_index);

        // Capture the ident for logging before `popped_task.runner` is moved out below.
        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());

        let Some(runner) = popped_task.runner else {
            // Defensive: queue invariant says a popped task carries its runner.
            // Reclaim the slot so the queue does not leak parallelism.
            tracing::error!(
                task_id = task_id,
                plan_index = plan_index,
                "Popped task missing runner - this should not happen"
            );
            task_queue.finish_running(task_key);
            continue;
        };

        let executor = compactor_context.compaction_executor.clone();
        let shutdown_map_clone = shutdown_map.clone();
        let completion_tx_clone = task_completion_tx.clone();

        tracing::info!(
            task_id = task_id,
            plan_index = plan_index,
            unique_ident = ?unique_ident,
            required_parallelism = popped_task.meta.required_parallelism,
            "Starting iceberg compaction task from queue"
        );

        executor.spawn(async move {
            // Register the cancellation channel BEFORE starting the compaction
            // so an external cancel request can always find the task.
            let (tx, rx) = tokio::sync::oneshot::channel();
            {
                // Narrow scope: drop the std Mutex guard before any `.await`.
                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
                shutdown_guard.insert(task_key, tx);
            }

            // Drop guard: runs when the future completes OR is dropped mid-way,
            // guaranteeing cleanup and completion notification in both cases.
            let _cleanup_guard = scopeguard::guard(
                (task_key, shutdown_map_clone, completion_tx_clone),
                move |(task_key, shutdown_map, completion_tx)| {
                    {
                        let mut shutdown_guard = shutdown_map.lock().unwrap();
                        shutdown_guard.remove(&task_key);
                    }
                    // Best-effort notification; the receiver may already be gone
                    // during shutdown, which is only worth a warning.
                    if completion_tx.send(task_key).is_err() {
                        tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
                    }
                },
            );

            // Box::pin: the runner future is large; keep the spawned task small.
            if let Err(e) = Box::pin(runner.compact(rx)).await {
                tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
            }
        });
    }
}
1222
1223fn handle_meta_task_pulling(
1226 pull_task_ack: &mut bool,
1227 task_queue: &IcebergTaskQueue,
1228 max_task_parallelism: u32,
1229 max_pull_task_count: u32,
1230 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1231 log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1232) -> bool {
1233 let mut pending_pull_task_count = 0;
1234 if *pull_task_ack {
1235 let current_running_parallelism = task_queue.running_parallelism_sum();
1237 pending_pull_task_count =
1238 (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1239
1240 if pending_pull_task_count > 0 {
1241 if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1242 event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1243 subscribe_iceberg_compaction_event_request::PullTask {
1244 pull_task_count: pending_pull_task_count,
1245 },
1246 )),
1247 create_at: SystemTime::now()
1248 .duration_since(std::time::UNIX_EPOCH)
1249 .expect("Clock may have gone backwards")
1250 .as_millis() as u64,
1251 }) {
1252 tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1253 return true; } else {
1255 *pull_task_ack = false;
1256 }
1257 }
1258 }
1259
1260 let running_count = task_queue.running_parallelism_sum();
1261 let waiting_count = task_queue.waiting_parallelism_sum();
1262 let available_count = max_task_parallelism.saturating_sub(running_count);
1263 let current_state = IcebergCompactionLogState {
1264 running_parallelism: running_count,
1265 waiting_parallelism: waiting_count,
1266 available_parallelism: available_count,
1267 pull_task_ack: *pull_task_ack,
1268 pending_pull_task_count,
1269 };
1270
1271 if log_throttler.should_log(¤t_state) {
1273 tracing::info!(
1274 running_parallelism_count = %current_state.running_parallelism,
1275 waiting_parallelism_count = %current_state.waiting_parallelism,
1276 available_parallelism = %current_state.available_parallelism,
1277 pull_task_ack = %current_state.pull_task_ack,
1278 pending_pull_task_count = %current_state.pending_pull_task_count
1279 );
1280 log_throttler.update(current_state);
1281 }
1282
1283 false }
1285
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_log_state_equality() {
        // Identical field values compare equal; a single differing field does not.
        let base = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_eq!(base, base.clone());

        let bumped = CompactionLogState {
            running_parallelism: 11,
            ..base.clone()
        };
        assert_ne!(base, bumped);

        // Same property for the iceberg variant of the log state.
        let ice_base = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 5,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_changed = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 6,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_ne!(ice_base, ice_changed);
    }

    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let initial = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let changed = CompactionLogState {
            running_parallelism: 11,
            ..initial.clone()
        };

        // First observation always logs; repeating the same state does not.
        assert!(throttler.should_log(&initial));
        throttler.update(initial.clone());
        assert!(!throttler.should_log(&initial));

        // A changed state triggers logging exactly once more.
        assert!(throttler.should_log(&changed));
        throttler.update(changed.clone());
        assert!(!throttler.should_log(&changed));
    }

    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let state = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        // Logged once, then suppressed while unchanged…
        assert!(throttler.should_log(&state));
        throttler.update(state.clone());
        assert!(!throttler.should_log(&state));

        // …until the heartbeat interval elapses, after which the same
        // state should be logged again.
        std::thread::sleep(Duration::from_millis(15));
        assert!(throttler.should_log(&state));
    }
}