risingwave_storage/hummock/compactor/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24    ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28    subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::{HashMap, VecDeque};
43use std::marker::PhantomData;
44use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
45use std::sync::{Arc, Mutex};
46use std::time::{Duration, SystemTime};
47
48use await_tree::{InstrumentAwait, SpanExt};
49pub use compaction_executor::CompactionExecutor;
50pub use compaction_filter::{
51    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
52    TtlCompactionFilter,
53};
54pub use context::{
55    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58// Import iceberg compactor runner types from the local `iceberg_compaction` module.
59use iceberg_compaction::iceberg_compactor_runner::{
60    IcebergCompactorRunnerConfigBuilder, create_plan_runners,
61};
62use iceberg_compaction::{IcebergTaskQueue, PushResult};
63pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
64use more_asserts::assert_ge;
65use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
66use risingwave_hummock_sdk::{
67    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
68};
69use risingwave_pb::hummock::compact_task::TaskStatus;
70use risingwave_pb::hummock::subscribe_compaction_event_request::{
71    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
72};
73use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
74use risingwave_pb::hummock::{
75    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
76    SubscribeCompactionEventResponse,
77};
78use risingwave_rpc_client::HummockMetaClient;
79pub use shared_buffer_compact::{compact, merge_imms_in_memory};
80use tokio::sync::oneshot::Sender;
81use tokio::task::JoinHandle;
82use tokio::time::Instant;
83
84pub use self::compaction_utils::{
85    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
86    check_flush_result,
87};
88pub use self::task_progress::TaskProgress;
89use super::multi_builder::CapacitySplitTableBuilder;
90use super::{
91    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
92};
93use crate::compaction_catalog_manager::{
94    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
95};
96use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
97use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
98use crate::hummock::compactor::iceberg_compaction::TaskKey;
99use crate::hummock::iterator::{Forward, HummockIterator};
100use crate::hummock::{
101    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
102    UnifiedSstableWriterFactory, validate_ssts,
103};
104use crate::monitor::CompactorMetrics;
105
/// Minimum interval between repeated heartbeat log lines emitted by the
/// compaction event loops; used as the `LogThrottler` heartbeat interval.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);

/// Represents the compaction task state for logging purposes.
///
/// Snapshots of this state are compared via `PartialEq` by a
/// `LogThrottler<CompactionLogState>` so that a status line is only logged
/// when something changed (or the heartbeat interval elapsed).
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    // Parallelism currently consumed by running compaction tasks.
    running_parallelism: u32,
    // Whether the last PullTask request has been acknowledged by meta.
    pull_task_ack: bool,
    // Number of tasks the compactor intends to pull next.
    pending_pull_task_count: u32,
}
116
/// Represents the iceberg compaction task state for logging purposes.
///
/// Snapshots of this state are compared via `PartialEq` by a
/// `LogThrottler<IcebergCompactionLogState>` in `start_iceberg_compactor`
/// so duplicate status lines are suppressed.
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    // Parallelism consumed by tasks currently running in the queue.
    running_parallelism: u32,
    // Parallelism requested by tasks still waiting in the queue.
    waiting_parallelism: u32,
    // Parallelism still available for scheduling.
    available_parallelism: u32,
    // Whether the last PullTask request has been acknowledged by meta.
    pull_task_ack: bool,
    // Number of tasks the compactor intends to pull next.
    pending_pull_task_count: u32,
}
126
/// Rate-limits status logging: a line should be emitted when the observed
/// state differs from the last logged one, or when `heartbeat_interval` has
/// elapsed since the previous emission.
struct LogThrottler<T: PartialEq> {
    /// State captured at the last `update` call; `None` before the first log.
    prev_state: Option<T>,
    /// Timestamp of the last `update` call.
    logged_at: Instant,
    /// Maximum quiet period before a heartbeat line is forced out.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Build a throttler that has not logged anything yet; the first
    /// `should_log` call therefore always returns `true`.
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            prev_state: None,
            logged_at: Instant::now(),
            heartbeat_interval,
        }
    }

    /// Whether a log line is due: the state changed since the last `update`,
    /// or the heartbeat interval has elapsed.
    fn should_log(&self, current_state: &T) -> bool {
        let heartbeat_due = self.logged_at.elapsed() >= self.heartbeat_interval;
        let state_changed = match &self.prev_state {
            Some(prev) => prev != current_state,
            None => true,
        };
        state_changed || heartbeat_due
    }

    /// Record that `current_state` was just logged and restart the heartbeat timer.
    fn update(&mut self, current_state: T) {
        self.prev_state = Some(current_state);
        self.logged_at = Instant::now();
    }
}
155
/// Implementation of Hummock compaction.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    /// Supplies object ids for the SSTs this compactor builds; also handed to
    /// `RemoteBuilderFactory` in `compact_key_range_impl`.
    object_id_getter: Arc<dyn GetObjectId>,
    /// Per-task configuration (cache policy, filter choice, vnode partition, ...).
    task_config: TaskConfig,
    /// Options used when building output SSTables.
    options: SstableBuilderOptions,
    /// Accumulated cost of object-id lookups, shared with `RemoteBuilderFactory`
    /// as `remote_rpc_cost`; divided by 1e6 (presumably micros -> seconds —
    /// TODO confirm unit) before being observed as a metric.
    get_id_time: Arc<AtomicU64>,
}

/// Result of one compaction split: (split index, built SSTs, statistics).
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
167
168impl Compactor {
169    /// Create a new compactor.
170    pub fn new(
171        context: CompactorContext,
172        options: SstableBuilderOptions,
173        task_config: TaskConfig,
174        object_id_getter: Arc<dyn GetObjectId>,
175    ) -> Self {
176        Self {
177            context,
178            options,
179            task_config,
180            get_id_time: Arc::new(AtomicU64::new(0)),
181            object_id_getter,
182        }
183    }
184
185    /// Compact the given key range and merge iterator.
186    /// Upon a successful return, the built SSTs are already uploaded to object store.
187    ///
188    /// `task_progress` is only used for tasks on the compactor.
189    async fn compact_key_range(
190        &self,
191        iter: impl HummockIterator<Direction = Forward>,
192        compaction_filter: impl CompactionFilter,
193        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
194        task_progress: Option<Arc<TaskProgress>>,
195        task_id: Option<HummockCompactionTaskId>,
196        split_index: Option<usize>,
197    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
198        // Monitor time cost building shared buffer to SSTs.
199        let compact_timer = if self.context.is_share_buffer_compact {
200            self.context
201                .compactor_metrics
202                .write_build_l0_sst_duration
203                .start_timer()
204        } else {
205            self.context
206                .compactor_metrics
207                .compact_sst_duration
208                .start_timer()
209        };
210
211        let (split_table_outputs, table_stats_map) = {
212            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
213            if self.task_config.use_block_based_filter {
214                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
215                    factory,
216                    iter,
217                    compaction_filter,
218                    compaction_catalog_agent_ref,
219                    task_progress.clone(),
220                    self.object_id_getter.clone(),
221                )
222                .instrument_await("compact".verbose())
223                .await?
224            } else {
225                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
226                    factory,
227                    iter,
228                    compaction_filter,
229                    compaction_catalog_agent_ref,
230                    task_progress.clone(),
231                    self.object_id_getter.clone(),
232                )
233                .instrument_await("compact".verbose())
234                .await?
235            }
236        };
237
238        compact_timer.observe_duration();
239
240        Self::report_progress(
241            self.context.compactor_metrics.clone(),
242            task_progress,
243            &split_table_outputs,
244            self.context.is_share_buffer_compact,
245        );
246
247        self.context
248            .compactor_metrics
249            .get_table_id_total_time_duration
250            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
251
252        debug_assert!(
253            split_table_outputs
254                .iter()
255                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
256        );
257
258        if task_id.is_some() {
259            // skip shared buffer compaction
260            tracing::info!(
261                "Finish Task {:?} split_index {:?} sst count {}",
262                task_id,
263                split_index,
264                split_table_outputs.len()
265            );
266        }
267        Ok((split_table_outputs, table_stats_map))
268    }
269
270    pub fn report_progress(
271        metrics: Arc<CompactorMetrics>,
272        task_progress: Option<Arc<TaskProgress>>,
273        ssts: &Vec<LocalSstableInfo>,
274        is_share_buffer_compact: bool,
275    ) {
276        for sst_info in ssts {
277            let sst_size = sst_info.file_size();
278            if let Some(tracker) = &task_progress {
279                tracker.inc_ssts_uploaded();
280                tracker.dec_num_pending_write_io();
281            }
282            if is_share_buffer_compact {
283                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
284            } else {
285                metrics.compaction_upload_sst_counts.inc();
286            }
287        }
288    }
289
290    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
291        &self,
292        writer_factory: F,
293        iter: impl HummockIterator<Direction = Forward>,
294        compaction_filter: impl CompactionFilter,
295        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
296        task_progress: Option<Arc<TaskProgress>>,
297        object_id_getter: Arc<dyn GetObjectId>,
298    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
299        let builder_factory = RemoteBuilderFactory::<F, B> {
300            object_id_getter,
301            limiter: self.context.memory_limiter.clone(),
302            options: self.options.clone(),
303            policy: self.task_config.cache_policy,
304            remote_rpc_cost: self.get_id_time.clone(),
305            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
306            sstable_writer_factory: writer_factory,
307            _phantom: PhantomData,
308        };
309
310        let mut sst_builder = CapacitySplitTableBuilder::new(
311            builder_factory,
312            self.context.compactor_metrics.clone(),
313            task_progress.clone(),
314            self.task_config.table_vnode_partition.clone(),
315            self.context
316                .storage_opts
317                .compactor_concurrent_uploading_sst_count,
318            compaction_catalog_agent_ref,
319        );
320        let compaction_statistics = compact_and_build_sst(
321            &mut sst_builder,
322            &self.task_config,
323            self.context.compactor_metrics.clone(),
324            iter,
325            compaction_filter,
326        )
327        .instrument_await("compact_and_build_sst".verbose())
328        .await?;
329
330        let ssts = sst_builder
331            .finish()
332            .instrument_await("builder_finish".verbose())
333            .await?;
334
335        Ok((ssts, compaction_statistics))
336    }
337}
338
/// The background compaction thread that receives iceberg compaction tasks from
/// the hummock compaction manager and runs them through a local task queue.
///
/// Returns the spawned event-loop `JoinHandle` and a oneshot `Sender` that
/// triggers shutdown when used (hence `#[must_use]`).
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(1000);
    let worker_num = compactor_context.compaction_executor.worker_num();

    // Total parallelism budget: worker threads scaled by the configured multiplier.
    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    // Never ask meta for more tasks than we could run, capped at 4 per pull.
    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // Initialize task queue with event-driven scheduling; pending work may
        // exceed the running budget by the configured multiplier.
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
        let mut task_queue =
            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);

        // Shutdown tracking for running tasks (task_key -> shutdown_sender)
        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));

        // Channel for task completion notifications
        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<TaskKey>();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Track last logged state to avoid duplicate logs
        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // Reset per-stream pull state; a fresh stream starts with the pull
            // treated as acknowledged.
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            pin_mut!(response_event_stream);

            // NOTE(review): cloned but never used in this loop — confirm
            // whether this clone can be removed.
            let _executor = compactor_context.compaction_executor.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // Record how long the previous loop iteration took.
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    // Handle task completion notifications: release the task's
                    // parallelism back to the queue.
                    Some(completed_task_key) = task_completion_rx.recv() => {
                        tracing::debug!(task_id = completed_task_key.0, plan_index = completed_task_key.1, "Task completed, updating queue state");
                        task_queue.finish_running(completed_task_key);
                        continue 'consume_stream;
                    }

                    // Event-driven task scheduling - wait for tasks to become schedulable
                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        // Only handle meta task pulling in periodic tick
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    // A new event arrived from meta; handled by the match below.
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                // Note: write_parquet_properties is now built from sink config (IcebergConfig) in create_plan_runners
                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
                                    .target_binpack_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
                                    )
                                    .min_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
                                    )
                                    .min_group_file_count(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
                                    )
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        // Config build failure drops only this task; the stream stays up.
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        continue 'consume_stream;
                                    }
                                };

                                // Create multiple plan runners from the task
                                let plan_runners = match create_plan_runners(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(runners) => runners,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        continue 'consume_stream;
                                    }
                                };

                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, "No plans to execute");
                                    continue 'consume_stream;
                                }

                                // Enqueue each plan runner independently
                                let total_plans = plan_runners.len();
                                let mut enqueued_count = 0;

                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta.clone(), Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = enqueued_count - 1,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            // Stop enqueuing remaining plans
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        },
                                        PushResult::RejectedDuplicate => {
                                            tracing::error!(
                                                task_id = task_id,
                                                plan_index = meta.plan_index,
                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
                                            );
                                        }
                                    }
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // Meta acknowledged our PullTask; allow the next pull.
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}
627
628/// The background compaction thread that receives compaction tasks from hummock compaction
629/// manager and runs compaction tasks.
630#[must_use]
631pub fn start_compactor(
632    compactor_context: CompactorContext,
633    hummock_meta_client: Arc<dyn HummockMetaClient>,
634    object_id_manager: Arc<ObjectIdManager>,
635    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
636) -> (JoinHandle<()>, Sender<()>) {
637    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
638    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
639    let stream_retry_interval = Duration::from_secs(30);
640    let task_progress = compactor_context.task_progress_manager.clone();
641    let periodic_event_update_interval = Duration::from_millis(1000);
642
643    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
644        * compactor_context.storage_opts.compactor_max_task_multiplier)
645        .ceil() as u32;
646    let running_task_parallelism = Arc::new(AtomicU32::new(0));
647
648    const MAX_PULL_TASK_COUNT: u32 = 4;
649    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
650
651    assert_ge!(
652        compactor_context.storage_opts.compactor_max_task_multiplier,
653        0.0
654    );
655
656    let join_handle = tokio::spawn(async move {
657        let shutdown_map = CompactionShutdownMap::default();
658        let mut min_interval = tokio::time::interval(stream_retry_interval);
659        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
660
661        // Track last logged state to avoid duplicate logs
662        let mut log_throttler =
663            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
664
665        // This outer loop is to recreate stream.
666        'start_stream: loop {
667            // reset state
668            // pull_task_ack.store(true, Ordering::SeqCst);
669            let mut pull_task_ack = true;
670            tokio::select! {
671                // Wait for interval.
672                _ = min_interval.tick() => {},
673                // Shutdown compactor.
674                _ = &mut shutdown_rx => {
675                    tracing::info!("Compactor is shutting down");
676                    return;
677                }
678            }
679
680            let (request_sender, response_event_stream) =
681                match hummock_meta_client.subscribe_compaction_event().await {
682                    Ok((request_sender, response_event_stream)) => {
683                        tracing::debug!("Succeeded subscribe_compaction_event.");
684                        (request_sender, response_event_stream)
685                    }
686
687                    Err(e) => {
688                        tracing::warn!(
689                            error = %e.as_report(),
690                            "Subscribing to compaction tasks failed with error. Will retry.",
691                        );
692                        continue 'start_stream;
693                    }
694                };
695
696            pin_mut!(response_event_stream);
697
698            let executor = compactor_context.compaction_executor.clone();
699            let object_id_manager = object_id_manager.clone();
700
701            // This inner loop is to consume stream or report task progress.
702            let mut event_loop_iteration_now = Instant::now();
703            'consume_stream: loop {
704                {
705                    // report
706                    compactor_context
707                        .compactor_metrics
708                        .compaction_event_loop_iteration_latency
709                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
710                    event_loop_iteration_now = Instant::now();
711                }
712
713                let running_task_parallelism = running_task_parallelism.clone();
714                let request_sender = request_sender.clone();
715                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
716                    _ = periodic_event_interval.tick() => {
717                        let progress_list = get_task_progress(task_progress.clone());
718
719                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
720                            event: Some(RequestEvent::HeartBeat(
721                                HeartBeat {
722                                    progress: progress_list
723                                }
724                            )),
725                            create_at: SystemTime::now()
726                                .duration_since(std::time::UNIX_EPOCH)
727                                .expect("Clock may have gone backwards")
728                                .as_millis() as u64,
729                        }) {
730                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
731                            // re subscribe stream
732                            continue 'start_stream;
733                        }
734
735
736                        let mut pending_pull_task_count = 0;
737                        if pull_task_ack {
738                            // TODO: Compute parallelism on meta side
739                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
740
741                            if pending_pull_task_count > 0 {
742                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
743                                    event: Some(RequestEvent::PullTask(
744                                        PullTask {
745                                            pull_task_count: pending_pull_task_count,
746                                        }
747                                    )),
748                                    create_at: SystemTime::now()
749                                        .duration_since(std::time::UNIX_EPOCH)
750                                        .expect("Clock may have gone backwards")
751                                        .as_millis() as u64,
752                                }) {
753                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");
754
755                                    // re subscribe stream
756                                    continue 'start_stream;
757                                } else {
758                                    pull_task_ack = false;
759                                }
760                            }
761                        }
762
763                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
764                        let current_state = CompactionLogState {
765                            running_parallelism: running_count,
766                            pull_task_ack,
767                            pending_pull_task_count,
768                        };
769
770                        // Log only when state changes or periodically as heartbeat
771                        if log_throttler.should_log(&current_state) {
772                            tracing::info!(
773                                running_parallelism_count = %current_state.running_parallelism,
774                                pull_task_ack = %current_state.pull_task_ack,
775                                pending_pull_task_count = %current_state.pending_pull_task_count
776                            );
777                            log_throttler.update(current_state);
778                        }
779
780                        continue;
781                    }
782                    event = response_event_stream.next() => {
783                        event
784                    }
785
786                    _ = &mut shutdown_rx => {
787                        tracing::info!("Compactor is shutting down");
788                        return
789                    }
790                };
791
792                fn send_report_task_event(
793                    compact_task: &CompactTask,
794                    table_stats: TableStatsMap,
795                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
796                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
797                ) {
798                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
799                        event: Some(RequestEvent::ReportTask(ReportTask {
800                            task_id: compact_task.task_id,
801                            task_status: compact_task.task_status.into(),
802                            sorted_output_ssts: compact_task
803                                .sorted_output_ssts
804                                .iter()
805                                .map(|sst| sst.into())
806                                .collect(),
807                            table_stats_change: to_prost_table_stats_map(table_stats),
808                            object_timestamps,
809                        })),
810                        create_at: SystemTime::now()
811                            .duration_since(std::time::UNIX_EPOCH)
812                            .expect("Clock may have gone backwards")
813                            .as_millis() as u64,
814                    }) {
815                        let task_id = compact_task.task_id;
816                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
817                    }
818                }
819
820                match event {
821                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
822                        let event = match event {
823                            Some(event) => event,
824                            None => continue 'consume_stream,
825                        };
826                        let shutdown = shutdown_map.clone();
827                        let context = compactor_context.clone();
828                        let consumed_latency_ms = SystemTime::now()
829                            .duration_since(std::time::UNIX_EPOCH)
830                            .expect("Clock may have gone backwards")
831                            .as_millis() as u64
832                            - create_at;
833                        context
834                            .compactor_metrics
835                            .compaction_event_consumed_latency
836                            .observe(consumed_latency_ms as _);
837
838                        let object_id_manager = object_id_manager.clone();
839                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
840
841                        match event {
842                            ResponseEvent::CompactTask(compact_task) => {
843                                let compact_task = CompactTask::from(compact_task);
844                                let parallelism =
845                                    calculate_task_parallelism(&compact_task, &context);
846
847                                assert_ne!(parallelism, 0, "splits cannot be empty");
848
849                                if (max_task_parallelism
850                                    - running_task_parallelism.load(Ordering::SeqCst))
851                                    < parallelism as u32
852                                {
853                                    tracing::warn!(
854                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
855                                        compact_task.task_id,
856                                        parallelism,
857                                        max_task_parallelism,
858                                        running_task_parallelism.load(Ordering::Relaxed),
859                                    );
860                                    let (compact_task, table_stats, object_timestamps) =
861                                        compact_done(
862                                            compact_task,
863                                            context.clone(),
864                                            vec![],
865                                            TaskStatus::NoAvailCpuResourceCanceled,
866                                        );
867
868                                    send_report_task_event(
869                                        &compact_task,
870                                        table_stats,
871                                        object_timestamps,
872                                        &request_sender,
873                                    );
874
875                                    continue 'consume_stream;
876                                }
877
878                                running_task_parallelism
879                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
880                                executor.spawn(async move {
881                                    let (tx, rx) = tokio::sync::oneshot::channel();
882                                    let task_id = compact_task.task_id;
883                                    shutdown.lock().unwrap().insert(task_id, tx);
884
885                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
886                                        context.clone(),
887                                        compact_task,
888                                        rx,
889                                        object_id_manager.clone(),
890                                        compaction_catalog_manager_ref.clone(),
891                                    )
892                                    .await;
893
894                                    shutdown.lock().unwrap().remove(&task_id);
895                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
896
897                                    send_report_task_event(
898                                        &compact_task,
899                                        table_stats,
900                                        object_timestamps,
901                                        &request_sender,
902                                    );
903
904                                    let enable_check_compaction_result =
905                                    context.storage_opts.check_compaction_result;
906                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
907
908                                    if enable_check_compaction_result && need_check_task {
909                                        let compact_table_ids = compact_task.build_compact_table_ids();
910                                        match compaction_catalog_manager_ref.acquire(compact_table_ids.into_iter().collect()).await {
911                                            Ok(compaction_catalog_agent_ref) =>  {
912                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
913                                                {
914                                                    Err(e) => {
915                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
916                                                    }
917                                                    Ok(true) => (),
918                                                    Ok(false) => {
919                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
920                                                    }
921                                                }
922                                            },
923                                            Err(e) => {
924                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
925                                            }
926                                        }
927                                    }
928                                });
929                            }
930                            #[expect(deprecated)]
931                            ResponseEvent::VacuumTask(_) => {
932                                unreachable!("unexpected vacuum task");
933                            }
934                            #[expect(deprecated)]
935                            ResponseEvent::FullScanTask(_) => {
936                                unreachable!("unexpected scan task");
937                            }
938                            #[expect(deprecated)]
939                            ResponseEvent::ValidationTask(validation_task) => {
940                                let validation_task = ValidationTask::from(validation_task);
941                                executor.spawn(async move {
942                                    validate_ssts(validation_task, context.sstable_store.clone())
943                                        .await;
944                                });
945                            }
946                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
947                                .lock()
948                                .unwrap()
949                                .remove(&cancel_compact_task.task_id)
950                            {
951                                Some(tx) => {
952                                    if tx.send(()).is_err() {
953                                        tracing::warn!(
954                                            "Cancellation of compaction task failed. task_id: {}",
955                                            cancel_compact_task.task_id
956                                        );
957                                    }
958                                }
959                                _ => {
960                                    tracing::warn!(
961                                        "Attempting to cancel non-existent compaction task. task_id: {}",
962                                        cancel_compact_task.task_id
963                                    );
964                                }
965                            },
966
967                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
968                                // set flag
969                                pull_task_ack = true;
970                            }
971                        }
972                    }
973                    Some(Err(e)) => {
974                        tracing::warn!("Failed to consume stream. {}", e.message());
975                        continue 'start_stream;
976                    }
977                    _ => {
978                        // The stream is exhausted
979                        continue 'start_stream;
980                    }
981                }
982            }
983        }
984    });
985
986    (join_handle, shutdown_tx)
987}
988
/// The background compaction thread that receives compaction tasks from hummock compaction
/// manager and runs compaction tasks.
///
/// Unlike the streaming variant, this "shared" compactor receives tasks through a local
/// `mpsc` channel of pre-dispatched `DispatchCompactionTaskRequest`s and reports results
/// back through a gRPC proxy client.
///
/// Returns the `JoinHandle` of the spawned event loop plus a oneshot `Sender` that
/// triggers shutdown when dropped or fired.
#[must_use]
pub fn start_shared_compactor(
    grpc_proxy_client: GrpcCompactorProxyClient,
    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
    context: CompactorContext,
) -> (JoinHandle<()>, Sender<()>) {
    // Maps running task_id -> cancellation sender, shared with every spawned task.
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let task_progress = context.task_progress_manager.clone();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();

        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
        let executor = context.compaction_executor.clone();
        let report_heartbeat_client = grpc_proxy_client.clone();
        'consume_stream: loop {
            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
                // Periodic heartbeat: snapshot progress of all running tasks and report it.
                _ = periodic_event_interval.tick() => {
                    let progress_list = get_task_progress(task_progress.clone());
                    let report_compaction_task_request = ReportCompactionTaskRequest{
                        event: Some(ReportCompactionTaskEvent::HeartBeat(
                            SharedHeartBeat {
                                progress: progress_list
                            }
                        )),
                     };
                    // Heartbeat failure is non-fatal; just log and keep looping.
                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
                    }
                    continue
                }


                // Graceful shutdown requested by the returned `shutdown_tx`.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return
                }

                // A new dispatched compaction request arrived.
                request = receiver.recv() => {
                    request
                }

            };
            match request {
                Some(request) => {
                    let context = context.clone();
                    let shutdown = shutdown_map.clone();

                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
                    // Each dispatched request is handled on the compaction executor so the
                    // event loop stays responsive to heartbeats and shutdown.
                    executor.spawn(async move {
                        let DispatchCompactionTaskRequest {
                            tables,
                            output_object_ids,
                            task: dispatch_task,
                        } = request.into_inner();
                        // Index the dispatched table catalogs by table id for the catalog agent.
                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
                            acc.insert(table.id, table);
                            acc
                        });

                        // Pre-allocated output object ids; the manager falls back to remote
                        // fetching (via the proxy client) once the deque is exhausted.
                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
                        let shared_compactor_object_id_manager =
                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
                            match dispatch_task.unwrap() {
                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
                                    let compact_task = CompactTask::from(&compact_task);
                                    // Register a cancellation channel so CancelCompactTask can abort us.
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        shared_compactor_object_id_manager,
                                        compaction_catalog_agent_ref.clone(),
                                    )
                                    .await;
                                    // Task finished (success or failure): deregister the cancel handle.
                                    shutdown.lock().unwrap().remove(&task_id);
                                    let report_compaction_task_request = ReportCompactionTaskRequest {
                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
                                            compact_task: Some(PbCompactTask::from(&compact_task)),
                                            table_stats_change: to_prost_table_stats_map(table_stats),
                                            object_timestamps,
                                    })),
                                    };

                                    match cloned_grpc_proxy_client
                                        .report_compaction_task(report_compaction_task_request)
                                        .await
                                    {
                                        // Optional post-hoc consistency check of successful output.
                                        Ok(_) => {
                                            // TODO: remove this method after we have running risingwave cluster with fast compact algorithm stably for a long time.
                                            let enable_check_compaction_result = context.storage_opts.check_compaction_result;
                                            let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
                                            if enable_check_compaction_result && need_check_task {
                                                match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
                                                    },
                                                    Ok(true) => (),
                                                    // A failed consistency check means corrupted output: abort loudly.
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            }
                                        }
                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
                                    }

                                }
                                // Vacuum/full-scan tasks are deprecated and must not be dispatched here.
                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
                                    unreachable!("unexpected vacuum task");
                                }
                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
                                    unreachable!("unexpected scan task");
                                }
                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
                                    let validation_task = ValidationTask::from(validation_task);
                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
                                }
                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
                                    // Fire the oneshot registered by the running task, if any.
                                    match shutdown
                                        .lock()
                                        .unwrap()
                                        .remove(&cancel_compact_task.task_id)
                                    { Some(tx) => {
                                        if tx.send(()).is_err() {
                                            tracing::warn!(
                                                "Cancellation of compaction task failed. task_id: {}",
                                                cancel_compact_task.task_id
                                            );
                                        }
                                    } _ => {
                                        tracing::warn!(
                                            "Attempting to cancel non-existent compaction task. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }}
                                }
                            }
                    });
                }
                // Sender side closed with no request; keep looping (shutdown_rx ends the loop).
                None => continue 'consume_stream,
            }
        }
    });
    (join_handle, shutdown_tx)
}
1144
1145fn get_task_progress(
1146    task_progress: Arc<
1147        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1148    >,
1149) -> Vec<CompactTaskProgress> {
1150    let mut progress_list = Vec::new();
1151    for (&task_id, progress) in &*task_progress.lock() {
1152        progress_list.push(progress.snapshot(task_id));
1153    }
1154    progress_list
1155}
1156
/// Schedule queued tasks if we have capacity
///
/// Drains runnable iceberg compaction tasks from `task_queue` (as long as `pop()`
/// yields one) and spawns each on the compaction executor. Each spawned task:
/// 1. registers a cancellation oneshot in `shutdown_map` keyed by `(task_id, plan_index)`,
/// 2. installs a scopeguard so that deregistration and completion notification happen
///    even if the runner panics or is cancelled,
/// 3. runs the compaction via `runner.compact(rx)`.
fn schedule_queued_tasks(
    task_queue: &mut IcebergTaskQueue,
    compactor_context: &CompactorContext,
    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<TaskKey>,
) {
    while let Some(popped_task) = task_queue.pop() {
        let task_id = popped_task.meta.task_id;
        let plan_index = popped_task.meta.plan_index;
        let task_key = (task_id, plan_index);

        // Get unique_ident before moving runner
        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());

        let Some(runner) = popped_task.runner else {
            // A runnerless task would leak queue capacity, so mark it finished and skip.
            tracing::error!(
                task_id = task_id,
                plan_index = plan_index,
                "Popped task missing runner - this should not happen"
            );
            task_queue.finish_running(task_key);
            continue;
        };

        // Clone the handles the spawned future needs to own ('static).
        let executor = compactor_context.compaction_executor.clone();
        let shutdown_map_clone = shutdown_map.clone();
        let completion_tx_clone = task_completion_tx.clone();

        tracing::info!(
            task_id = task_id,
            plan_index = plan_index,
            unique_ident = ?unique_ident,
            required_parallelism = popped_task.meta.required_parallelism,
            "Starting iceberg compaction task from queue"
        );

        executor.spawn(async move {
            // Register the cancellation channel before starting the work so the
            // task is cancellable from the moment it runs.
            let (tx, rx) = tokio::sync::oneshot::channel();
            {
                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
                shutdown_guard.insert(task_key, tx);
            }

            // Cleanup runs on every exit path (normal return, error, cancellation):
            // deregister the cancel handle, then tell the main loop we are done.
            let _cleanup_guard = scopeguard::guard(
                (task_key, shutdown_map_clone, completion_tx_clone),
                move |(task_key, shutdown_map, completion_tx)| {
                    {
                        // Inner scope keeps the mutex guard from outliving the removal.
                        let mut shutdown_guard = shutdown_map.lock().unwrap();
                        shutdown_guard.remove(&task_key);
                    }
                    // Notify main loop that task is completed
                    // Multiple tasks can send completion notifications concurrently via mpsc
                    if completion_tx.send(task_key).is_err() {
                        tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
                    }
                },
            );

            // Box::pin: the compact future is large; pinning it on the heap keeps
            // this spawned future's size small.
            if let Err(e) = Box::pin(runner.compact(rx)).await {
                tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
            }
        });
    }
}
1222
1223/// Handle pulling new tasks from meta service
1224/// Returns true if the stream should be restarted
1225fn handle_meta_task_pulling(
1226    pull_task_ack: &mut bool,
1227    task_queue: &IcebergTaskQueue,
1228    max_task_parallelism: u32,
1229    max_pull_task_count: u32,
1230    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1231    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1232) -> bool {
1233    let mut pending_pull_task_count = 0;
1234    if *pull_task_ack {
1235        // Use queue's running parallelism for pull decision
1236        let current_running_parallelism = task_queue.running_parallelism_sum();
1237        pending_pull_task_count =
1238            (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1239
1240        if pending_pull_task_count > 0 {
1241            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1242                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1243                    subscribe_iceberg_compaction_event_request::PullTask {
1244                        pull_task_count: pending_pull_task_count,
1245                    },
1246                )),
1247                create_at: SystemTime::now()
1248                    .duration_since(std::time::UNIX_EPOCH)
1249                    .expect("Clock may have gone backwards")
1250                    .as_millis() as u64,
1251            }) {
1252                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1253                return true; // Signal to restart stream
1254            } else {
1255                *pull_task_ack = false;
1256            }
1257        }
1258    }
1259
1260    let running_count = task_queue.running_parallelism_sum();
1261    let waiting_count = task_queue.waiting_parallelism_sum();
1262    let available_count = max_task_parallelism.saturating_sub(running_count);
1263    let current_state = IcebergCompactionLogState {
1264        running_parallelism: running_count,
1265        waiting_parallelism: waiting_count,
1266        available_parallelism: available_count,
1267        pull_task_ack: *pull_task_ack,
1268        pending_pull_task_count,
1269    };
1270
1271    // Log only when state changes or periodically as heartbeat
1272    if log_throttler.should_log(&current_state) {
1273        tracing::info!(
1274            running_parallelism_count = %current_state.running_parallelism,
1275            waiting_parallelism_count = %current_state.waiting_parallelism,
1276            available_parallelism = %current_state.available_parallelism,
1277            pull_task_ack = %current_state.pull_task_ack,
1278            pending_pull_task_count = %current_state.pending_pull_task_count
1279        );
1280        log_throttler.update(current_state);
1281    }
1282
1283    false // No need to restart stream
1284}
1285
#[cfg(test)]
mod tests {
    use super::*;

    /// States with identical fields must compare equal; any single differing
    /// field must make them unequal.
    #[test]
    fn test_log_state_equality() {
        // CompactionLogState: build via a helper so only the varying field
        // differs between instances.
        let make_state = |running| CompactionLogState {
            running_parallelism: running,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_eq!(make_state(10), make_state(10));
        assert_ne!(make_state(10), make_state(11));

        // IcebergCompactionLogState: a change in `waiting_parallelism` alone
        // is enough to break equality.
        let make_iceberg_state = |waiting| IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: waiting,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_ne!(make_iceberg_state(5), make_iceberg_state(6));
    }

    /// The throttler logs on the first observation and on every state change,
    /// but suppresses repeats of the last-logged state.
    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let initial = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let changed = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        // Nothing logged yet, so the first observation must log.
        assert!(throttler.should_log(&initial));
        throttler.update(initial.clone());

        // Repeating the last-logged state is suppressed.
        assert!(!throttler.should_log(&initial));

        // A different state logs again...
        assert!(throttler.should_log(&changed));
        throttler.update(changed.clone());

        // ...and is then suppressed in turn.
        assert!(!throttler.should_log(&changed));
    }

    /// Even an unchanged state must be re-logged once the heartbeat interval
    /// has elapsed.
    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let steady = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        // First observation always logs.
        assert!(throttler.should_log(&steady));
        throttler.update(steady.clone());

        // Immediately repeating the same state is suppressed.
        assert!(!throttler.should_log(&steady));

        // After the heartbeat interval, the unchanged state logs again.
        std::thread::sleep(Duration::from_millis(15));
        assert!(throttler.should_log(&steady));
    }
}