risingwave_storage/hummock/compactor/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
mod iceberg_compaction;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
// Import iceberg compactor runner types from the local `iceberg_compaction` module.
use iceberg_compaction::iceberg_compactor_runner::{
    IcebergCompactorRunnerConfigBuilder, create_plan_runners,
};
use iceberg_compaction::{IcebergTaskQueue, PushResult};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

/// Heartbeat logging interval for compaction tasks
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);

/// Represents the compaction task state for logging purposes
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    running_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

/// Represents the iceberg compaction task state for logging purposes
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    running_parallelism: u32,
    waiting_parallelism: u32,
    available_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

/// Controls periodic logging with state change detection
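///
/// # Example
///
/// A minimal usage sketch (illustrative only; the state values are placeholders,
/// and any of the log-state structs above can be used as `T`):
///
/// ```ignore
/// let mut throttler = LogThrottler::new(Duration::from_secs(60));
/// let state = CompactionLogState {
///     running_parallelism: 2,
///     pull_task_ack: true,
///     pending_pull_task_count: 0,
/// };
/// if throttler.should_log(&state) {
///     tracing::info!(running_parallelism = state.running_parallelism, "compactor status");
///     throttler.update(state);
/// }
/// ```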
struct LogThrottler<T: PartialEq> {
    last_logged_state: Option<T>,
    last_heartbeat: Instant,
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            last_logged_state: None,
            last_heartbeat: Instant::now(),
            heartbeat_interval,
        }
    }

    /// Returns true if logging should occur (state changed or heartbeat interval elapsed)
    fn should_log(&self, current_state: &T) -> bool {
        self.last_logged_state.as_ref() != Some(current_state)
            || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Updates the state and heartbeat timestamp after logging
    fn update(&mut self, current_state: T) {
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = Instant::now();
    }
}

/// Implementation of Hummock compaction.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
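    /// Accumulated time spent acquiring new object ids, reported via the
    /// `get_table_id_total_time_duration` metric.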
    get_id_time: Arc<AtomicU64>,
}

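/// Output of compacting one task split: the split index, the SSTs built for that split,
/// and the collected compaction statistics.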
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    /// Create a new compactor.
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compact the given key range and merge iterator.
    /// Upon a successful return, the built SSTs are already uploaded to object store.
    ///
    /// `task_progress` is only used for tasks on the compactor.
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        // Measure the time spent building SSTs, using separate timers for shared-buffer flushes and compaction.
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            // Shared buffer compaction has no task id, so this log only covers compaction tasks.
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

/// The background iceberg compaction thread that subscribes to iceberg compaction events from the
/// meta service, queues the resulting plan runners, and executes them as parallelism allows.
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

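    // Cap task parallelism at a multiple of the executor's worker count.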
    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // Initialize task queue with event-driven scheduling using Notify
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
        let (mut task_queue, _schedule_notify) =
            IcebergTaskQueue::new_with_notify(max_task_parallelism, pending_parallelism_budget);

        // Shutdown tracking for running tasks (task_id -> shutdown_sender)
        let shutdown_map = Arc::new(Mutex::new(
            HashMap::<u64, tokio::sync::oneshot::Sender<()>>::new(),
        ));

        // Channel for task completion notifications
        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<u64>();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Track last logged state to avoid duplicate logs
        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    // Handle task completion notifications
                    Some(completed_task_id) = task_completion_rx.recv() => {
                        tracing::debug!(task_id = completed_task_id, "Task completed, updating queue state");
                        task_queue.finish_running(completed_task_id);
                        continue 'consume_stream;
                    }

                    // Event-driven task scheduling - wait for tasks to become schedulable
                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        // Only handle meta task pulling in periodic tick
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let write_parquet_properties = WriterProperties::builder()
                                        .set_created_by(concat!(
                                            "risingwave version ",
                                            env!("CARGO_PKG_VERSION")
                                        )
                                        .to_owned())
                                        .set_max_row_group_size(
                                            compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
                                        )
                                        .set_compression(Compression::SNAPPY) // TODO: make it configurable
                                        .build();

                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .target_file_size_bytes(compactor_context.storage_opts.iceberg_compaction_target_file_size_mb as u64 * 1024 * 1024)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .write_parquet_properties(write_parquet_properties)
                                    .small_file_threshold(compactor_context.storage_opts.iceberg_compaction_small_file_threshold_mb as u64 * 1024 * 1024)
                                    .max_task_total_size(
                                        compactor_context.storage_opts.iceberg_compaction_max_task_total_size_mb as u64 * 1024 * 1024,
                                    )
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config for task {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                // Create multiple plan runners from the task
                                let plan_runners = match create_plan_runners(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runners) => runners,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        continue 'consume_stream;
                                    }
                                };

                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, "No plans to execute");
                                    continue 'consume_stream;
                                }

                                // Enqueue each plan runner independently
                                let total_plans = plan_runners.len();
                                let mut enqueued_count = 0;

                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta.clone(), Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = enqueued_count - 1,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            // Stop enqueuing remaining plans
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        }
                                    }
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // set flag
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread that receives compaction tasks from the hummock compaction
/// manager and runs them.
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
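    // Tracks the parallelism currently consumed by running compaction tasks.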
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Track last logged state to avoid duplicate logs
        let mut log_throttler =
            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            // re subscribe stream
                            continue 'start_stream;
                        }

                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // re subscribe stream
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
                        let current_state = CompactionLogState {
                            running_parallelism: running_count,
                            pull_task_ack,
                            pending_pull_task_count,
                        };

                        // Log only when state changes or periodically as heartbeat
                        if log_throttler.should_log(&current_state) {
                            tracing::info!(
                                running_parallelism_count = %current_state.running_parallelism,
                                pull_task_ack = %current_state.pull_task_ack,
                                pending_pull_task_count = %current_state.pending_pull_task_count
                            );
                            log_throttler.update(current_state);
                        }

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

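                // Reports the final task status, table statistics, and object timestamps back to the meta service over the event stream.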
                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps: object_timestamps
                                .into_iter()
                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                .collect(),
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "Failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                // set flag
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background thread for the shared compactor. It receives dispatched compaction tasks from the
/// compactor proxy over a channel, runs them, and reports results back through the proxy client.
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest {
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                    };
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }

                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                request = receiver.recv() => {
                    request
                }

            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

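                        // Seed the object id manager with the object ids pre-allocated for this task
                        // (additional ids can be fetched through the proxy client if they run out).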
                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                            match dispatch_task.unwrap() {
                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                    let compact_task = CompactTask::from(&compact_task);
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        shared_compactor_object_id_manager,
                                        compaction_catalog_agent_ref.clone(),
                                    )
                                    .await;
                                    shutdown.lock().unwrap().remove(&task_id);
                                    let report_compaction_task_request = ReportCompactionTaskRequest {
                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                            compact_task: Some(PbCompactTask::from(&compact_task)),
                                            table_stats_change: to_prost_table_stats_map(table_stats),
                                            object_timestamps: object_timestamps
                                                .into_iter()
                                                .map(|(object_id, timestamp)| (object_id.inner(), timestamp))
                                                .collect(),
                                        })),
                                    };

                                    match cloned_grpc_proxy_client
                                        .report_compaction_task(report_compaction_task_request)
                                        .await
                                    {
                                        Ok(_) => {
                                            // TODO: remove this check once the fast compaction algorithm has run stably in production clusters for a long time.
1095                                            let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1096                                            let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1097                                            if enable_check_compaction_result && need_check_task {
1098                                                match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1099                                                    Err(e) => {
1100                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1101                                                    },
1102                                                    Ok(true) => (),
1103                                                    Ok(false) => {
1104                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1105                                                    }
1106                                                }
1107                                            }
1108                                        }
1109                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1110                                    }
1111
1112                                }
1113                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
1114                                    unreachable!("unexpected vacuum task");
1115                                }
1116                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
1117                                    unreachable!("unexpected scan task");
1118                                }
1119                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1120                                    let validation_task = ValidationTask::from(validation_task);
1121                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
1122                                }
1123                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1124                                    if let Some(tx) = shutdown
1125                                        .lock()
1126                                        .unwrap()
1127                                        .remove(&cancel_compact_task.task_id)
1128                                    {
1129                                        if tx.send(()).is_err() {
1130                                            tracing::warn!(
1131                                                "Cancellation of compaction task failed. task_id: {}",
1132                                                cancel_compact_task.task_id
1133                                            );
1134                                        }
1135                                    } else {
1136                                        tracing::warn!(
1137                                            "Attempting to cancel a non-existent compaction task. task_id: {}",
1138                                            cancel_compact_task.task_id
1139                                        );
1140                                    }
1141                                }
1142                            }
1143                    });
1144                }
1145                None => continue 'consume_stream,
1146            }
1147        }
1148    });
1149    (join_handle, shutdown_tx)
1150}
1151
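/// Collect a progress snapshot of every tracked compaction task, typically for
/// inclusion in periodic heartbeat reports.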
1152fn get_task_progress(
1153    task_progress: Arc<
1154        parking_lot::Mutex<HashMap<u64, Arc<TaskProgress>>>,
1155    >,
1156) -> Vec<CompactTaskProgress> {
1157    let mut progress_list = Vec::new();
1158    for (&task_id, progress) in &*task_progress.lock() {
1159        progress_list.push(progress.snapshot(task_id));
1160    }
1161    progress_list
1162}
1163
1164/// Schedule queued tasks if we have capacity
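///
/// Pops tasks from the queue while it still yields them, spawning each runner on the
/// compaction executor. Every spawned task registers a cancellation sender in `shutdown_map`
/// and, via a scopeguard, always removes it again and reports completion on
/// `task_completion_tx`, even if the runner fails.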
1165fn schedule_queued_tasks(
1166    task_queue: &mut IcebergTaskQueue,
1167    compactor_context: &CompactorContext,
1168    shutdown_map: &Arc<Mutex<HashMap<u64, tokio::sync::oneshot::Sender<()>>>>,
1169    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<u64>,
1170) {
1171    while let Some(popped_task) = task_queue.pop() {
1172        let task_id = popped_task.meta.task_id;
1173
1174        // Get unique_ident before moving runner
1175        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1176
1177        let Some(runner) = popped_task.runner else {
1178            tracing::error!(
1179                task_id = task_id,
1180                "Popped task missing runner - this should not happen"
1181            );
1182            task_queue.finish_running(task_id);
1183            continue;
1184        };
1185
1186        let executor = compactor_context.compaction_executor.clone();
1187        let shutdown_map_clone = shutdown_map.clone();
1188        let completion_tx_clone = task_completion_tx.clone();
1189
1190        tracing::info!(
1191            task_id = task_id,
1192            unique_ident = ?unique_ident,
1193            required_parallelism = popped_task.meta.required_parallelism,
1194            "Starting iceberg compaction task from queue"
1195        );
1196
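        // Hand the runner to the compaction executor; the oneshot sender registered below
        // lets the task be cancelled externally through `shutdown_map`.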
1197        executor.spawn(async move {
1198            let (tx, rx) = tokio::sync::oneshot::channel();
1199            {
1200                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
1201                shutdown_guard.insert(task_id, tx);
1202            }
1203
1204            let _cleanup_guard = scopeguard::guard(
1205                (task_id, shutdown_map_clone, completion_tx_clone),
1206                move |(task_id, shutdown_map, completion_tx)| {
1207                    {
1208                        let mut shutdown_guard = shutdown_map.lock().unwrap();
1209                        shutdown_guard.remove(&task_id);
1210                    }
1211                    // Notify main loop that task is completed
1212                    // Multiple tasks can send completion notifications concurrently via mpsc
1213                    if completion_tx.send(task_id).is_err() {
1214                        tracing::warn!(task_id = task_id, "Failed to notify task completion - main loop may have shut down");
1215                    }
1216                },
1217            );
1218
1219            if let Err(e) = runner.compact(rx).await {
1220                tracing::warn!(error = %e.as_report(), "Iceberg compaction runner failed for task {}", task_id);
1221            }
1222        });
1223    }
1224}
1225
1226/// Handle pulling new tasks from the meta service.
1227/// Returns `true` if the stream should be restarted.
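///
/// While the previous pull has been acknowledged (`pull_task_ack`), this requests up to
/// `max_pull_task_count` additional tasks, bounded by the parallelism the queue still has
/// available. Queue state is logged through `log_throttler`, i.e. only when it changes or
/// as a periodic heartbeat.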
1228fn handle_meta_task_pulling(
1229    pull_task_ack: &mut bool,
1230    task_queue: &IcebergTaskQueue,
1231    max_task_parallelism: u32,
1232    max_pull_task_count: u32,
1233    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1234    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1235) -> bool {
1236    let mut pending_pull_task_count = 0;
1237    if *pull_task_ack {
1238        // Use queue's running parallelism for pull decision
1239        let current_running_parallelism = task_queue.running_parallelism_sum();
1240        pending_pull_task_count = max_task_parallelism
1241            .saturating_sub(current_running_parallelism).min(max_pull_task_count);
1242
1243        if pending_pull_task_count > 0 {
1244            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1245                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1246                    subscribe_iceberg_compaction_event_request::PullTask {
1247                        pull_task_count: pending_pull_task_count,
1248                    },
1249                )),
1250                create_at: SystemTime::now()
1251                    .duration_since(std::time::UNIX_EPOCH)
1252                    .expect("Clock may have gone backwards")
1253                    .as_millis() as u64,
1254            }) {
1255                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1256                return true; // Signal to restart stream
1257            } else {
1258                *pull_task_ack = false;
1259            }
1260        }
1261    }
1262
1263    let running_count = task_queue.running_parallelism_sum();
1264    let waiting_count = task_queue.waiting_parallelism_sum();
1265    let available_count = max_task_parallelism.saturating_sub(running_count);
1266    let current_state = IcebergCompactionLogState {
1267        running_parallelism: running_count,
1268        waiting_parallelism: waiting_count,
1269        available_parallelism: available_count,
1270        pull_task_ack: *pull_task_ack,
1271        pending_pull_task_count,
1272    };
1273
1274    // Log only when state changes or periodically as heartbeat
1275    if log_throttler.should_log(&current_state) {
1276        tracing::info!(
1277            running_parallelism_count = %current_state.running_parallelism,
1278            waiting_parallelism_count = %current_state.waiting_parallelism,
1279            available_parallelism = %current_state.available_parallelism,
1280            pull_task_ack = %current_state.pull_task_ack,
1281            pending_pull_task_count = %current_state.pending_pull_task_count
1282        );
1283        log_throttler.update(current_state);
1284    }
1285
1286    false // No need to restart stream
1287}
1288
1289#[cfg(test)]
1290mod tests {
1291    use super::*;
1292
1293    #[test]
1294    fn test_log_state_equality() {
1295        // Test CompactionLogState
1296        let state1 = CompactionLogState {
1297            running_parallelism: 10,
1298            pull_task_ack: true,
1299            pending_pull_task_count: 2,
1300        };
1301        let state2 = CompactionLogState {
1302            running_parallelism: 10,
1303            pull_task_ack: true,
1304            pending_pull_task_count: 2,
1305        };
1306        let state3 = CompactionLogState {
1307            running_parallelism: 11,
1308            pull_task_ack: true,
1309            pending_pull_task_count: 2,
1310        };
1311        assert_eq!(state1, state2);
1312        assert_ne!(state1, state3);
1313
1314        // Test IcebergCompactionLogState
1315        let ice_state1 = IcebergCompactionLogState {
1316            running_parallelism: 10,
1317            waiting_parallelism: 5,
1318            available_parallelism: 15,
1319            pull_task_ack: true,
1320            pending_pull_task_count: 2,
1321        };
1322        let ice_state2 = IcebergCompactionLogState {
1323            running_parallelism: 10,
1324            waiting_parallelism: 6,
1325            available_parallelism: 15,
1326            pull_task_ack: true,
1327            pending_pull_task_count: 2,
1328        };
1329        assert_ne!(ice_state1, ice_state2);
1330    }
1331
1332    #[test]
1333    fn test_log_throttler_state_change_detection() {
1334        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
1335        let state1 = CompactionLogState {
1336            running_parallelism: 10,
1337            pull_task_ack: true,
1338            pending_pull_task_count: 2,
1339        };
1340        let state2 = CompactionLogState {
1341            running_parallelism: 11,
1342            pull_task_ack: true,
1343            pending_pull_task_count: 2,
1344        };
1345
1346        // First call should always log
1347        assert!(throttler.should_log(&state1));
1348        throttler.update(state1.clone());
1349
1350        // Same state should not log
1351        assert!(!throttler.should_log(&state1));
1352
1353        // Changed state should log
1354        assert!(throttler.should_log(&state2));
1355        throttler.update(state2.clone());
1356
1357        // Same state again should not log
1358        assert!(!throttler.should_log(&state2));
1359    }
1360
1361    #[test]
1362    fn test_log_throttler_heartbeat() {
1363        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
1364        let state = CompactionLogState {
1365            running_parallelism: 10,
1366            pull_task_ack: true,
1367            pending_pull_task_count: 2,
1368        };
1369
1370        // First call should log
1371        assert!(throttler.should_log(&state));
1372        throttler.update(state.clone());
1373
1374        // Same state immediately should not log
1375        assert!(!throttler.should_log(&state));
1376
1377        // Wait for heartbeat interval to pass
1378        std::thread::sleep(Duration::from_millis(15));
1379
1380        // Same state after interval should log (heartbeat)
1381        assert!(throttler.should_log(&state));
1382    }
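
    // A small companion sketch, mirroring `test_log_throttler_state_change_detection` above
    // for the iceberg-specific state type; it only uses APIs already exercised in this module.
    #[test]
    fn test_log_throttler_iceberg_state_change_detection() {
        let mut throttler =
            LogThrottler::<IcebergCompactionLogState>::new(Duration::from_secs(60));
        // `make_state` is a local helper for this sketch, not part of the module's API.
        let make_state = |running, available| IcebergCompactionLogState {
            running_parallelism: running,
            waiting_parallelism: 2,
            available_parallelism: available,
            pull_task_ack: true,
            pending_pull_task_count: 1,
        };

        // The first observation always logs.
        assert!(throttler.should_log(&make_state(4, 12)));
        throttler.update(make_state(4, 12));

        // An unchanged state is suppressed until the heartbeat interval elapses.
        assert!(!throttler.should_log(&make_state(4, 12)));

        // Any field change (here a task finishing and freeing a slot) logs immediately.
        assert!(throttler.should_log(&make_state(3, 13)));
    }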
1383}