risingwave_storage/hummock/compactor/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
mod iceberg_compaction;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
// Import iceberg compactor runner types from the local `iceberg_compaction` module.
use iceberg_compaction::iceberg_compactor_runner::{
    IcebergCompactorRunnerConfigBuilder, create_plan_runners,
};
use iceberg_compaction::{IcebergTaskQueue, PushResult};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::compactor::iceberg_compaction::TaskKey;
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

/// Heartbeat logging interval for compaction tasks
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);

/// Represents the compaction task state for logging purposes
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    running_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

/// Represents the iceberg compaction task state for logging purposes
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    running_parallelism: u32,
    waiting_parallelism: u32,
    available_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

/// Controls periodic logging with state change detection
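///
/// # Example
///
/// A minimal usage sketch (illustrative only, mirroring how the event loops
/// below use it; not compiled as a doc-test):
///
/// ```ignore
/// let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
/// let state = CompactionLogState {
///     running_parallelism: 0,
///     pull_task_ack: true,
///     pending_pull_task_count: 0,
/// };
/// if throttler.should_log(&state) {
///     tracing::info!(running_parallelism_count = %state.running_parallelism);
///     throttler.update(state);
/// }
/// ```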
struct LogThrottler<T: PartialEq> {
    last_logged_state: Option<T>,
    last_heartbeat: Instant,
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            last_logged_state: None,
            last_heartbeat: Instant::now(),
            heartbeat_interval,
        }
    }

    /// Returns true if logging should occur (state changed or heartbeat interval elapsed)
    fn should_log(&self, current_state: &T) -> bool {
        self.last_logged_state.as_ref() != Some(current_state)
            || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Updates the state and heartbeat timestamp after logging
    fn update(&mut self, current_state: T) {
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = Instant::now();
    }
}

/// Implementation of Hummock compaction.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    /// Create a new compactor.
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compact the given key range and merge iterator.
    /// Upon a successful return, the built SSTs are already uploaded to object store.
    ///
    /// `task_progress` is only used for tasks on the compactor.
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        // Monitor time cost building shared buffer to SSTs.
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            // skip shared buffer compaction
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

/// The background compaction thread that subscribes to iceberg compaction events from the meta
/// service and runs the received iceberg compaction tasks.
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
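    // Illustrative numbers (not documented defaults): with 8 executor workers and a
    // `compactor_max_task_multiplier` of 2.5, `max_task_parallelism` is
    // ceil(8.0 * 2.5) = 20 and `max_pull_task_count` is min(20, 4) = 4.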

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // Initialize task queue with event-driven scheduling
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
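        // Illustrative (the multiplier value here is an assumption, not a documented default):
        // with `max_task_parallelism` = 20 and a pending-budget multiplier of 4.0, up to
        // ceil(20.0 * 4.0) = 80 units of parallelism may sit queued before further plans are
        // rejected with `PushResult::RejectedCapacity`.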
        let mut task_queue =
            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);

        // Shutdown tracking for running tasks (task_key -> shutdown_sender)
        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));

        // Channel for task completion notifications
        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<TaskKey>();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Track last logged state to avoid duplicate logs
        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    // Handle task completion notifications
                    Some(completed_task_key) = task_completion_rx.recv() => {
                        tracing::debug!(task_id = completed_task_key.0, plan_index = completed_task_key.1, "Task completed, updating queue state");
                        task_queue.finish_running(completed_task_key);
                        continue 'consume_stream;
                    }

                    // Event-driven task scheduling - wait for tasks to become schedulable
                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        // Only handle meta task pulling in periodic tick
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                    .set_created_by(concat!(
                                        "risingwave version ",
                                        env!("CARGO_PKG_VERSION")
                                    )
                                    .to_owned())
                                    .set_max_row_group_size(
                                        compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                    )
                                    .set_compression(Compression::SNAPPY) // TODO: make it configurable
                                    .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
                                    .target_binpack_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
                                    )
                                    .min_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
                                    )
                                    .min_group_file_count(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
                                    )
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                // Create multiple plan runners from the task
                                let plan_runners = match create_plan_runners(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runners) => runners,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        continue 'consume_stream;
                                    }
                                };

                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, "No plans to execute");
                                    continue 'consume_stream;
                                }

                                // Enqueue each plan runner independently
                                let total_plans = plan_runners.len();
                                let mut enqueued_count = 0;

                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta.clone(), Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = enqueued_count - 1,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            // Stop enqueuing remaining plans
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        },
                                        PushResult::RejectedDuplicate => {
                                            tracing::error!(
                                                task_id = task_id,
                                                plan_index = meta.plan_index,
                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
                                            );
                                        }
                                    }
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // set flag
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread that receives compaction tasks from the hummock compaction
/// manager and runs them.
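///
/// The caller is expected to keep both returned handles; a rough usage sketch
/// (illustrative only, not compiled as a doc-test):
///
/// ```ignore
/// let (join_handle, shutdown_tx) = start_compactor(
///     compactor_context,
///     hummock_meta_client,
///     object_id_manager,
///     compaction_catalog_manager_ref,
/// );
/// // ... later, during shutdown:
/// let _ = shutdown_tx.send(());
/// let _ = join_handle.await;
/// ```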
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Track last logged state to avoid duplicate logs
        let mut log_throttler =
            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            // re-subscribe the stream
                            continue 'start_stream;
                        }


                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // re-subscribe the stream
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
                        let current_state = CompactionLogState {
                            running_parallelism: running_count,
                            pull_task_ack,
                            pending_pull_task_count,
                        };

                        // Log only when state changes or periodically as heartbeat
                        if log_throttler.should_log(&current_state) {
                            tracing::info!(
                                running_parallelism_count = %current_state.running_parallelism,
                                pull_task_ack = %current_state.pull_task_ack,
                                pending_pull_task_count = %current_state.pending_pull_task_count
                            );
                            log_throttler.update(current_state);
                        }

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids.into_iter().collect()).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                // set flag
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread that receives compaction tasks dispatched through the
/// compactor gRPC proxy and runs them.
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }


                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }

            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                            match dispatch_task.unwrap() {
                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                    let compact_task = CompactTask::from(&compact_task);
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        shared_compactor_object_id_manager,
                                        compaction_catalog_agent_ref.clone(),
                                    )
                                    .await;
                                    shutdown.lock().unwrap().remove(&task_id);
                                    let report_compaction_task_request = ReportCompactionTaskRequest {
                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                            compact_task: Some(PbCompactTask::from(&compact_task)),
                                            table_stats_change: to_prost_table_stats_map(table_stats),
                                            object_timestamps: object_timestamps
                                                .into_iter()
                                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                                .collect(),
                                        })),
                                    };

                                    match cloned_grpc_proxy_client
                                        .report_compaction_task(report_compaction_task_request)
                                        .await
                                    {
                                        Ok(_) => {
                                            // TODO: remove this check after RisingWave clusters have been running the fast compact algorithm stably for a long time.
1105                                            let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1106                                            let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1107                                            if enable_check_compaction_result && need_check_task {
1108                                                match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1109                                                    Err(e) => {
1110                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1111                                                    },
1112                                                    Ok(true) => (),
1113                                                    Ok(false) => {
1114                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1115                                                    }
1116                                                }
1117                                            }
1118                                        }
1119                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1120                                    }
1121
1122                                }
1123                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
1124                                    unreachable!("unexpected vacuum task");
1125                                }
1126                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
1127                                    unreachable!("unexpected scan task");
1128                                }
1129                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1130                                    let validation_task = ValidationTask::from(validation_task);
1131                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
1132                                }
1133                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1134                                    match shutdown
1135                                        .lock()
1136                                        .unwrap()
1137                                        .remove(&cancel_compact_task.task_id)
                                    {
                                        Some(tx) => {
                                            if tx.send(()).is_err() {
                                                tracing::warn!(
                                                    "Cancellation of compaction task failed. task_id: {}",
                                                    cancel_compact_task.task_id
                                                );
                                            }
                                        }
                                        None => {
                                            tracing::warn!(
                                                "Attempting to cancel non-existent compaction task. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    }
1151                                }
1152                            }
1153                    });
1154                }
1155                None => continue 'consume_stream,
1156            }
1157        }
1158    });
1159    (join_handle, shutdown_tx)
1160}
1161
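/// Collect a progress snapshot for every in-flight compaction task.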
fn get_task_progress(
    task_progress: Arc<parking_lot::Mutex<HashMap<u64, Arc<TaskProgress>>>>,
) -> Vec<CompactTaskProgress> {
1167    let mut progress_list = Vec::new();
1168    for (&task_id, progress) in &*task_progress.lock() {
1169        progress_list.push(progress.snapshot(task_id));
1170    }
1171    progress_list
1172}
1173
1174/// Schedule queued iceberg compaction tasks while there is capacity to run them.
1175fn schedule_queued_tasks(
1176    task_queue: &mut IcebergTaskQueue,
1177    compactor_context: &CompactorContext,
1178    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1179    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<TaskKey>,
1180) {
1181    while let Some(popped_task) = task_queue.pop() {
1182        let task_id = popped_task.meta.task_id;
1183        let plan_index = popped_task.meta.plan_index;
1184        let task_key = (task_id, plan_index);
1185
1186        // Capture `unique_ident` before the runner is moved out of `popped_task`.
1187        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1188
1189        let Some(runner) = popped_task.runner else {
1190            tracing::error!(
1191                task_id = task_id,
1192                plan_index = plan_index,
1193                "Popped task missing runner - this should not happen"
1194            );
1195            task_queue.finish_running(task_key);
1196            continue;
1197        };
1198
1199        let executor = compactor_context.compaction_executor.clone();
1200        let shutdown_map_clone = shutdown_map.clone();
1201        let completion_tx_clone = task_completion_tx.clone();
1202
1203        tracing::info!(
1204            task_id = task_id,
1205            plan_index = plan_index,
1206            unique_ident = ?unique_ident,
1207            required_parallelism = popped_task.meta.required_parallelism,
1208            "Starting iceberg compaction task from queue"
1209        );
1210
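        // Spawn the runner on the compaction executor. The shutdown map allows the task to be
        // cancelled, and the completion channel notifies the main loop when the task finishes.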
1211        executor.spawn(async move {
1212            let (tx, rx) = tokio::sync::oneshot::channel();
1213            {
1214                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
1215                shutdown_guard.insert(task_key, tx);
1216            }
1217
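            // The scope guard runs on every exit path (success, error, or cancellation) and
            // performs the cleanup below.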
1218            let _cleanup_guard = scopeguard::guard(
1219                (task_key, shutdown_map_clone, completion_tx_clone),
1220                move |(task_key, shutdown_map, completion_tx)| {
1221                    {
1222                        let mut shutdown_guard = shutdown_map.lock().unwrap();
1223                        shutdown_guard.remove(&task_key);
1224                    }
1225                    // Notify the main loop that this task has completed.
1226                    // Multiple tasks can send completion notifications concurrently over the mpsc channel.
1227                    if completion_tx.send(task_key).is_err() {
1228                        tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
1229                    }
1230                },
1231            );
1232
1233            if let Err(e) = runner.compact(rx).await {
1234                tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
1235            }
1236        });
1237    }
1238}
1239
1240/// Handle pulling new tasks from the meta service.
1241/// Returns `true` if the event stream should be restarted.
1242fn handle_meta_task_pulling(
1243    pull_task_ack: &mut bool,
1244    task_queue: &IcebergTaskQueue,
1245    max_task_parallelism: u32,
1246    max_pull_task_count: u32,
1247    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1248    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1249) -> bool {
1250    let mut pending_pull_task_count = 0;
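    // Only ask the meta service for more tasks after the previous pull has been acknowledged
    // and while spare parallelism remains.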
1251    if *pull_task_ack {
1252        // Use queue's running parallelism for pull decision
1253        let current_running_parallelism = task_queue.running_parallelism_sum();
1254        pending_pull_task_count = max_task_parallelism
1255            .saturating_sub(current_running_parallelism).min(max_pull_task_count);
1256
1257        if pending_pull_task_count > 0 {
1258            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1259                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1260                    subscribe_iceberg_compaction_event_request::PullTask {
1261                        pull_task_count: pending_pull_task_count,
1262                    },
1263                )),
1264                create_at: SystemTime::now()
1265                    .duration_since(std::time::UNIX_EPOCH)
1266                    .expect("Clock may have gone backwards")
1267                    .as_millis() as u64,
1268            }) {
1269                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1270                return true; // Signal to restart stream
1271            } else {
1272                *pull_task_ack = false;
1273            }
1274        }
1275    }
1276
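    // Snapshot the current queue state so the log throttler can decide whether to emit a log line.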
1277    let running_count = task_queue.running_parallelism_sum();
1278    let waiting_count = task_queue.waiting_parallelism_sum();
1279    let available_count = max_task_parallelism.saturating_sub(running_count);
1280    let current_state = IcebergCompactionLogState {
1281        running_parallelism: running_count,
1282        waiting_parallelism: waiting_count,
1283        available_parallelism: available_count,
1284        pull_task_ack: *pull_task_ack,
1285        pending_pull_task_count,
1286    };
1287
1288    // Log only when the state changes, or periodically as a heartbeat.
1289    if log_throttler.should_log(&current_state) {
1290        tracing::info!(
1291            running_parallelism_count = %current_state.running_parallelism,
1292            waiting_parallelism_count = %current_state.waiting_parallelism,
1293            available_parallelism = %current_state.available_parallelism,
1294            pull_task_ack = %current_state.pull_task_ack,
1295            pending_pull_task_count = %current_state.pending_pull_task_count
1296        );
1297        log_throttler.update(current_state);
1298    }
1299
1300    false // No need to restart stream
1301}
1302
1303#[cfg(test)]
1304mod tests {
1305    use super::*;
1306
1307    #[test]
1308    fn test_log_state_equality() {
1309        // Test CompactionLogState
1310        let state1 = CompactionLogState {
1311            running_parallelism: 10,
1312            pull_task_ack: true,
1313            pending_pull_task_count: 2,
1314        };
1315        let state2 = CompactionLogState {
1316            running_parallelism: 10,
1317            pull_task_ack: true,
1318            pending_pull_task_count: 2,
1319        };
1320        let state3 = CompactionLogState {
1321            running_parallelism: 11,
1322            pull_task_ack: true,
1323            pending_pull_task_count: 2,
1324        };
1325        assert_eq!(state1, state2);
1326        assert_ne!(state1, state3);
1327
1328        // Test IcebergCompactionLogState
1329        let ice_state1 = IcebergCompactionLogState {
1330            running_parallelism: 10,
1331            waiting_parallelism: 5,
1332            available_parallelism: 15,
1333            pull_task_ack: true,
1334            pending_pull_task_count: 2,
1335        };
1336        let ice_state2 = IcebergCompactionLogState {
1337            running_parallelism: 10,
1338            waiting_parallelism: 6,
1339            available_parallelism: 15,
1340            pull_task_ack: true,
1341            pending_pull_task_count: 2,
1342        };
1343        assert_ne!(ice_state1, ice_state2);
1344    }
1345
1346    #[test]
1347    fn test_log_throttler_state_change_detection() {
1348        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
1349        let state1 = CompactionLogState {
1350            running_parallelism: 10,
1351            pull_task_ack: true,
1352            pending_pull_task_count: 2,
1353        };
1354        let state2 = CompactionLogState {
1355            running_parallelism: 11,
1356            pull_task_ack: true,
1357            pending_pull_task_count: 2,
1358        };
1359
1360        // First call should always log
1361        assert!(throttler.should_log(&state1));
1362        throttler.update(state1.clone());
1363
1364        // Same state should not log
1365        assert!(!throttler.should_log(&state1));
1366
1367        // Changed state should log
1368        assert!(throttler.should_log(&state2));
1369        throttler.update(state2.clone());
1370
1371        // Same state again should not log
1372        assert!(!throttler.should_log(&state2));
1373    }
1374
1375    #[test]
1376    fn test_log_throttler_heartbeat() {
1377        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
1378        let state = CompactionLogState {
1379            running_parallelism: 10,
1380            pull_task_ack: true,
1381            pending_pull_task_count: 2,
1382        };
1383
1384        // First call should log
1385        assert!(throttler.should_log(&state));
1386        throttler.update(state.clone());
1387
1388        // Same state immediately should not log
1389        assert!(!throttler.should_log(&state));
1390
1391        // Wait for heartbeat interval to pass
1392        std::thread::sleep(Duration::from_millis(15));
1393
1394        // Same state after interval should log (heartbeat)
1395        assert!(throttler.should_log(&state));
1396    }
1397}