mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
mod iceberg_compaction;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
use iceberg_compaction::iceberg_compactor_runner::{
    IcebergCompactorRunnerConfigBuilder, create_plan_runners,
};
use iceberg_compaction::{IcebergTaskQueue, PushResult};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::compactor::iceberg_compaction::TaskKey;
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);

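/// Snapshot of the Hummock compactor's scheduling state; compared across periodic ticks
/// so the status log is only emitted when something changed (see [`LogThrottler`]).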
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    running_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

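/// Scheduling-state snapshot for the Iceberg compactor event loop, throttled the same
/// way as [`CompactionLogState`].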
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    running_parallelism: u32,
    waiting_parallelism: u32,
    available_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

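/// Suppresses repetitive status logging: `should_log` returns `true` only when the state
/// snapshot differs from the last one logged, or when `heartbeat_interval` has elapsed
/// since the last log.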
struct LogThrottler<T: PartialEq> {
    last_logged_state: Option<T>,
    last_heartbeat: Instant,
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            last_logged_state: None,
            last_heartbeat: Instant::now(),
            heartbeat_interval,
        }
    }

    fn should_log(&self, current_state: &T) -> bool {
        self.last_logged_state.as_ref() != Some(current_state)
            || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    fn update(&mut self, current_state: T) {
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = Instant::now();
    }
}

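/// Executes one Hummock compaction: holds the shared [`CompactorContext`] plus the
/// task-specific configuration and SST builder options used to produce output SSTs.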
pub struct Compactor {
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

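/// Output of compacting one split: `(split index, output SSTs, compaction statistics)`.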
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

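    /// Compacts the given key-range iterator into SSTs, applying `compaction_filter` and
    /// reporting progress through `task_progress`; returns the output SSTs together with
    /// the per-table statistics collected during compaction.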
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

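    /// Updates upload progress and metrics for the SSTs produced by a compaction, covering
    /// both the shared-buffer flush path and the remote compaction path.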
    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

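    /// Shared implementation of `compact_key_range`, parameterized over the SSTable writer
    /// factory and the filter builder in use.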
    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

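/// The background event loop of the Iceberg compactor: subscribes to Iceberg compaction
/// events from the meta service, queues the resulting plan runners, and schedules them on
/// the compaction executor. Returns the join handle of the loop and a shutdown sender.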
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
        let mut task_queue =
            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);

        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));

        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<TaskKey>();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    Some(completed_task_key) = task_completion_rx.recv() => {
                        tracing::debug!(task_id = completed_task_key.0, plan_index = completed_task_key.1, "Task completed, updating queue state");
                        task_queue.finish_running(completed_task_key);
                        continue 'consume_stream;
                    }

                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(concat!(
                                        "risingwave version ",
                                        env!("CARGO_PKG_VERSION")
                                    )
                                    .to_owned())
                                    .set_max_row_group_size(
                                        compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                    )
                                    .set_compression(Compression::SNAPPY)
                                    .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
                                    .target_binpack_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
                                    )
                                    .min_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
                                    )
                                    .min_group_file_count(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
                                    )
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                let plan_runners = match create_plan_runners(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runners) => runners,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        continue 'consume_stream;
                                    }
                                };

                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, "No plans to execute");
                                    continue 'consume_stream;
                                }

                                let total_plans = plan_runners.len();
                                let mut enqueued_count = 0;

                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta.clone(), Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = enqueued_count - 1,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        },
                                        PushResult::RejectedDuplicate => {
                                            tracing::error!(
                                                task_id = task_id,
                                                plan_index = meta.plan_index,
                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
                                            );
                                        }
                                    }
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

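/// The background compaction thread of the compactor node: subscribes to compaction events
/// from the meta service, runs Hummock compaction tasks on the compaction executor, and
/// reports the results back. Returns the join handle of the event loop and a shutdown sender.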
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        let mut log_throttler =
            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        'start_stream: loop {
            let mut pull_task_ack = true;
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
                        let current_state = CompactionLogState {
                            running_parallelism: running_count,
                            pull_task_ack,
                            pending_pull_task_count,
                        };

                        if log_throttler.should_log(&current_state) {
                            tracing::info!(
                                running_parallelism_count = %current_state.running_parallelism,
                                pull_task_ack = %current_state.pull_task_ack,
                                pending_pull_task_count = %current_state.pending_pull_task_count
                            );
                            log_throttler.update(current_state);
                        }

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps,
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids.into_iter().collect()).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

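/// The background thread of the shared compactor: receives dispatched compaction tasks
/// from `receiver`, executes them, and reports results and heartbeats back through the
/// gRPC proxy client.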
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }

            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                        match dispatch_task.unwrap() {
                            dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(&compact_task);
                                let (tx, rx) = tokio::sync::oneshot::channel();
                                let task_id = compact_task.task_id;
                                shutdown.lock().unwrap().insert(task_id, tx);

                                let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                    context.clone(),
                                    compact_task,
                                    rx,
                                    shared_compactor_object_id_manager,
                                    compaction_catalog_agent_ref.clone(),
                                )
                                .await;
                                shutdown.lock().unwrap().remove(&task_id);
                                let report_compaction_task_request = ReportCompactionTaskRequest {
                                    event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                        compact_task: Some(PbCompactTask::from(&compact_task)),
                                        table_stats_change: to_prost_table_stats_map(table_stats),
                                        object_timestamps,
                                    })),
                                };

                                match cloned_grpc_proxy_client
                                    .report_compaction_task(report_compaction_task_request)
                                    .await
                                {
                                    Ok(_) => {
                                        let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                        let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                        if enable_check_compaction_result && need_check_task {
                                            match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
                                                Err(e) => {
                                                    tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                },
                                                Ok(true) => (),
                                                Ok(false) => {
                                                    panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                }
                                            }
                                        }
                                    }
                                    Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                }
                            }
                            dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                validate_ssts(validation_task, context.sstable_store.clone()).await;
                            }
                            dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                match shutdown
                                    .lock()
                                    .unwrap()
                                    .remove(&cancel_compact_task.task_id)
                                {
                                    Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
                                    _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                            }
                        }
                    });
                }
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}

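/// Collects a progress snapshot for every in-flight compaction task.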
fn get_task_progress(
    task_progress: Arc<
        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
    >,
) -> Vec<CompactTaskProgress> {
    let mut progress_list = Vec::new();
    for (&task_id, progress) in &*task_progress.lock() {
        progress_list.push(progress.snapshot(task_id));
    }
    progress_list
}

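/// Pops schedulable Iceberg plan runners from the queue and spawns each of them on the
/// compaction executor; completion is signalled back to the main loop via
/// `task_completion_tx` so the queue can release the reserved parallelism.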
fn schedule_queued_tasks(
    task_queue: &mut IcebergTaskQueue,
    compactor_context: &CompactorContext,
    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<TaskKey>,
) {
    while let Some(popped_task) = task_queue.pop() {
        let task_id = popped_task.meta.task_id;
        let plan_index = popped_task.meta.plan_index;
        let task_key = (task_id, plan_index);

        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());

        let Some(runner) = popped_task.runner else {
            tracing::error!(
                task_id = task_id,
                plan_index = plan_index,
                "Popped task missing runner - this should not happen"
            );
            task_queue.finish_running(task_key);
            continue;
        };

        let executor = compactor_context.compaction_executor.clone();
        let shutdown_map_clone = shutdown_map.clone();
        let completion_tx_clone = task_completion_tx.clone();

        tracing::info!(
            task_id = task_id,
            plan_index = plan_index,
            unique_ident = ?unique_ident,
            required_parallelism = popped_task.meta.required_parallelism,
            "Starting iceberg compaction task from queue"
        );

        executor.spawn(async move {
            let (tx, rx) = tokio::sync::oneshot::channel();
            {
                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
                shutdown_guard.insert(task_key, tx);
            }

            let _cleanup_guard = scopeguard::guard(
                (task_key, shutdown_map_clone, completion_tx_clone),
                move |(task_key, shutdown_map, completion_tx)| {
                    {
                        let mut shutdown_guard = shutdown_map.lock().unwrap();
                        shutdown_guard.remove(&task_key);
                    }
                    if completion_tx.send(task_key).is_err() {
                        tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
                    }
                },
            );

            if let Err(e) = runner.compact(rx).await {
                tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
            }
        });
    }
}

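/// Periodic tick handler for the Iceberg event loop: requests more tasks from meta when
/// parallelism is available and emits a throttled status log. Returns `true` when the
/// subscription stream should be restarted (i.e. sending the pull request failed).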
fn handle_meta_task_pulling(
    pull_task_ack: &mut bool,
    task_queue: &IcebergTaskQueue,
    max_task_parallelism: u32,
    max_pull_task_count: u32,
    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
) -> bool {
    let mut pending_pull_task_count = 0;
    if *pull_task_ack {
        let current_running_parallelism = task_queue.running_parallelism_sum();
        pending_pull_task_count =
            (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);

        if pending_pull_task_count > 0 {
            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
                    subscribe_iceberg_compaction_event_request::PullTask {
                        pull_task_count: pending_pull_task_count,
                    },
                )),
                create_at: SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .expect("Clock may have gone backwards")
                    .as_millis() as u64,
            }) {
                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
                return true;
            } else {
                *pull_task_ack = false;
            }
        }
    }

    let running_count = task_queue.running_parallelism_sum();
    let waiting_count = task_queue.waiting_parallelism_sum();
    let available_count = max_task_parallelism.saturating_sub(running_count);
    let current_state = IcebergCompactionLogState {
        running_parallelism: running_count,
        waiting_parallelism: waiting_count,
        available_parallelism: available_count,
        pull_task_ack: *pull_task_ack,
        pending_pull_task_count,
    };

    if log_throttler.should_log(&current_state) {
        tracing::info!(
            running_parallelism_count = %current_state.running_parallelism,
            waiting_parallelism_count = %current_state.waiting_parallelism,
            available_parallelism = %current_state.available_parallelism,
            pull_task_ack = %current_state.pull_task_ack,
            pending_pull_task_count = %current_state.pending_pull_task_count
        );
        log_throttler.update(current_state);
    }

    false
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_log_state_equality() {
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state3 = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_eq!(state1, state2);
        assert_ne!(state1, state3);

        let ice_state1 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 5,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_state2 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 6,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_ne!(ice_state1, ice_state2);
    }

    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttler.should_log(&state1));
        throttler.update(state1.clone());

        assert!(!throttler.should_log(&state1));

        assert!(throttler.should_log(&state2));
        throttler.update(state2.clone());

        assert!(!throttler.should_log(&state2));
    }
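
    // A minimal additional sketch: `LogThrottler` is generic over the state type, so the
    // same change-detection behavior is expected for the Iceberg state snapshot as well.
    #[test]
    fn test_log_throttler_iceberg_state_change_detection() {
        let mut throttler =
            LogThrottler::<IcebergCompactionLogState>::new(Duration::from_secs(60));
        let state = IcebergCompactionLogState {
            running_parallelism: 4,
            waiting_parallelism: 2,
            available_parallelism: 12,
            pull_task_ack: true,
            pending_pull_task_count: 1,
        };

        // The first snapshot is always logged.
        assert!(throttler.should_log(&state));
        throttler.update(state.clone());

        // An unchanged snapshot within the heartbeat interval is suppressed.
        assert!(!throttler.should_log(&state));

        // Any field change makes the snapshot loggable again.
        let changed = IcebergCompactionLogState {
            pending_pull_task_count: 0,
            ..state
        };
        assert!(throttler.should_log(&changed));
    }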

    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let state = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttler.should_log(&state));
        throttler.update(state.clone());

        assert!(!throttler.should_log(&state));

        std::thread::sleep(Duration::from_millis(15));

        assert!(throttler.should_log(&state));
    }
}
1391}