mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
mod iceberg_compaction;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
use iceberg_compaction::iceberg_compactor_runner::{
    IcebergCompactorRunnerConfigBuilder, create_plan_runners,
};
use iceberg_compaction::{IcebergTaskQueue, PushResult};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::compactor::iceberg_compaction::TaskKey;
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

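/// How often a periodic status log line is emitted even if the throttled state has not changed.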
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);

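/// Scheduling state of the hummock compactor event loop, compared across ticks to throttle the
/// periodic status log.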
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    running_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

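/// Scheduling state of the iceberg compactor event loop (including queue occupancy), compared
/// across ticks to throttle the periodic status log.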
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    running_parallelism: u32,
    waiting_parallelism: u32,
    available_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

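/// Suppresses repetitive status logs: a new line is emitted only when the observed state changes
/// or when the heartbeat interval has elapsed since the last emission.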
struct LogThrottler<T: PartialEq> {
    last_logged_state: Option<T>,
    last_heartbeat: Instant,
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            last_logged_state: None,
            last_heartbeat: Instant::now(),
            heartbeat_interval,
        }
    }

    fn should_log(&self, current_state: &T) -> bool {
        self.last_logged_state.as_ref() != Some(current_state)
            || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    fn update(&mut self, current_state: T) {
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = Instant::now();
    }
}

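/// Executes a single compaction job: turns an input key-range iterator into SSTs according to
/// `task_config` and `options`, recording metrics along the way.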
pub struct Compactor {
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

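    /// Runs the actual compaction over `iter`, splitting the output into SSTs, then reports
    /// progress and metrics. Returns the produced SSTs together with compaction statistics.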
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

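    /// Records per-SST upload progress and metrics, distinguishing shared-buffer flushes from
    /// regular compactions.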
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

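    /// Monomorphized body of `compact_key_range`: builds SSTs through a
    /// `CapacitySplitTableBuilder` parameterized by the writer factory `F` and filter builder `B`.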
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

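/// Starts the iceberg compactor event loop: it subscribes to iceberg compaction events from the
/// meta node, enqueues plan runners into an `IcebergTaskQueue`, and schedules them onto the
/// compaction executor. Returns the loop's join handle and a shutdown sender.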
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
        let mut task_queue =
            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);

        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));

        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<TaskKey>();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

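        // The outer loop (re)establishes the subscription stream to meta; the inner
        // 'consume_stream loop multiplexes task completions, queue scheduling, periodic task
        // pulling, and incoming meta events.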
        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    Some(completed_task_key) = task_completion_rx.recv() => {
                        tracing::debug!(task_id = completed_task_key.0, plan_index = completed_task_key.1, "Task completed, updating queue state");
                        task_queue.finish_running(completed_task_key);
                        continue 'consume_stream;
                    }

                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(concat!(
                                        "risingwave version ",
                                        env!("CARGO_PKG_VERSION")
                                    )
                                    .to_owned())
                                    .set_max_row_group_size(
                                        compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                    )
                                    .set_compression(Compression::SNAPPY)
                                    .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
                                    .target_binpack_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
                                    )
                                    .min_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
                                    )
                                    .min_group_file_count(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
                                    )
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let plan_runners = match create_plan_runners(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runners) => runners,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        continue 'consume_stream;
                                    }
                                };

                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, "No plans to execute");
                                    continue 'consume_stream;
                                }

                                let total_plans = plan_runners.len();
                                let mut enqueued_count = 0;

                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta.clone(), Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = enqueued_count - 1,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        },
                                        PushResult::RejectedDuplicate => {
                                            tracing::error!(
                                                task_id = task_id,
                                                plan_index = meta.plan_index,
                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
                                            );
                                        }
                                    }
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

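/// Starts the hummock compactor event loop: it subscribes to the compaction event stream from
/// the meta node, periodically reports progress and pulls new tasks, and spawns each received
/// compact task onto the compaction executor. Returns the loop's join handle and a shutdown
/// sender.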
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        let mut log_throttler =
            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

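        // The outer loop (re)subscribes to the compaction event stream; the inner 'consume_stream
        // loop alternates between periodic heartbeat/pull ticks and incoming meta events.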
        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
                        let current_state = CompactionLogState {
                            running_parallelism: running_count,
                            pull_task_ack,
                            pending_pull_task_count,
                        };

                        if log_throttler.should_log(&current_state) {
                            tracing::info!(
                                running_parallelism_count = %current_state.running_parallelism,
                                pull_task_ack = %current_state.pull_task_ack,
                                pending_pull_task_count = %current_state.pending_pull_task_count
                            );
                            log_throttler.update(current_state);
                        }

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

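                // Reports a compact task's final status and stats back to meta over the event
                // stream.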
                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids.into_iter().collect()).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

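/// Starts the shared compactor loop: compaction tasks are dispatched over gRPC through
/// `receiver`, executed on the compaction executor, and their results and heartbeats are
/// reported back via the proxy client.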
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }
            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    shared_compactor_object_id_manager,
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps: object_timestamps
                                            .into_iter()
                                            .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                            .collect(),
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                },
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }
                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                {
                                    Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
                                    _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

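/// Snapshots the progress of all tracked compaction tasks for heartbeat reporting.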
fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}

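/// Pops schedulable iceberg plan runners off the queue and spawns them on the compaction
/// executor. Each spawned task registers a shutdown sender and, via a scope guard, notifies
/// `task_completion_tx` on exit so the queue can reclaim its parallelism.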
fn schedule_queued_tasks(
    task_queue: &mut IcebergTaskQueue,
    compactor_context: &CompactorContext,
    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<TaskKey>,
) {
    while let Some(popped_task) = task_queue.pop() {
        let task_id = popped_task.meta.task_id;
        let plan_index = popped_task.meta.plan_index;
        let task_key = (task_id, plan_index);

        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());

        let Some(runner) = popped_task.runner else {
            tracing::error!(
                task_id = task_id,
                plan_index = plan_index,
                "Popped task missing runner - this should not happen"
            );
            task_queue.finish_running(task_key);
            continue;
        };

        let executor = compactor_context.compaction_executor.clone();
        let shutdown_map_clone = shutdown_map.clone();
        let completion_tx_clone = task_completion_tx.clone();

        tracing::info!(
            task_id = task_id,
            plan_index = plan_index,
            unique_ident = ?unique_ident,
            required_parallelism = popped_task.meta.required_parallelism,
            "Starting iceberg compaction task from queue"
        );

        executor.spawn(async move {
            let (tx, rx) = tokio::sync::oneshot::channel();
            {
                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
                shutdown_guard.insert(task_key, tx);
            }

            let _cleanup_guard = scopeguard::guard(
                (task_key, shutdown_map_clone, completion_tx_clone),
                move |(task_key, shutdown_map, completion_tx)| {
                    {
                        let mut shutdown_guard = shutdown_map.lock().unwrap();
                        shutdown_guard.remove(&task_key);
                    }
                    if completion_tx.send(task_key).is_err() {
                        tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
                    }
                },
            );

            if let Err(e) = runner.compact(rx).await {
                tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
            }
        });
    }
}

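/// Periodic-tick handler for the iceberg compactor: pulls more tasks from meta when capacity is
/// available and emits a throttled status log. Returns `true` if the event stream should be
/// restarted (e.g. because the request sender is broken).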
fn handle_meta_task_pulling(
    pull_task_ack: &mut bool,
    task_queue: &IcebergTaskQueue,
    max_task_parallelism: u32,
    max_pull_task_count: u32,
    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
) -> bool {
    let mut pending_pull_task_count = 0;
    if *pull_task_ack {
        let current_running_parallelism = task_queue.running_parallelism_sum();
        pending_pull_task_count =
            (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);

        if pending_pull_task_count > 0 {
            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                    subscribe_iceberg_compaction_event_request::PullTask {
                        pull_task_count: pending_pull_task_count,
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            }) {
                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
                return true;
            } else {
                *pull_task_ack = false;
            }
        }
    }

    let running_count = task_queue.running_parallelism_sum();
    let waiting_count = task_queue.waiting_parallelism_sum();
    let available_count = max_task_parallelism.saturating_sub(running_count);
    let current_state = IcebergCompactionLogState {
        running_parallelism: running_count,
        waiting_parallelism: waiting_count,
        available_parallelism: available_count,
        pull_task_ack: *pull_task_ack,
        pending_pull_task_count,
    };

    if log_throttler.should_log(&current_state) {
        tracing::info!(
            running_parallelism_count = %current_state.running_parallelism,
            waiting_parallelism_count = %current_state.waiting_parallelism,
            available_parallelism = %current_state.available_parallelism,
            pull_task_ack = %current_state.pull_task_ack,
            pending_pull_task_count = %current_state.pending_pull_task_count
        );
        log_throttler.update(current_state);
    }

    false
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_log_state_equality() {
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state3 = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_eq!(state1, state2);
        assert_ne!(state1, state3);

        let ice_state1 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 5,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_state2 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 6,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_ne!(ice_state1, ice_state2);
    }

    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttler.should_log(&state1));
        throttler.update(state1.clone());

        assert!(!throttler.should_log(&state1));

        assert!(throttler.should_log(&state2));
        throttler.update(state2.clone());

        assert!(!throttler.should_log(&state2));
    }

    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let state = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttler.should_log(&state));
        throttler.update(state.clone());

        assert!(!throttler.should_log(&state));

        std::thread::sleep(Duration::from_millis(15));

        assert!(throttler.should_log(&state));
    }
}