risingwave_storage/hummock/compactor/mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use parquet::basic::Compression;
20use parquet::file::properties::WriterProperties;
21use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
22use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
23use risingwave_pb::hummock::PbCompactTask;
24use risingwave_pb::hummock::report_compaction_task_request::{
25    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
26    ReportTask as ReportSharedTask,
27};
28use risingwave_pb::iceberg_compaction::{
29    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
30    subscribe_iceberg_compaction_event_request,
31};
32use risingwave_rpc_client::GrpcCompactorProxyClient;
33use thiserror_ext::AsReport;
34use tokio::sync::mpsc;
35use tonic::Request;
36
37pub mod compactor_runner;
38mod context;
39pub mod fast_compactor_runner;
40mod iterator;
41mod shared_buffer_compact;
42pub(super) mod task_progress;
43
44use std::collections::{HashMap, VecDeque};
45use std::marker::PhantomData;
46use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
47use std::sync::{Arc, Mutex};
48use std::time::{Duration, SystemTime};
49
50use await_tree::{InstrumentAwait, SpanExt};
51pub use compaction_executor::CompactionExecutor;
52pub use compaction_filter::{
53    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
54    TtlCompactionFilter,
55};
56pub use context::{
57    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
58};
59use futures::{StreamExt, pin_mut};
60// Import iceberg compactor runner types from the local `iceberg_compaction` module.
61use iceberg_compaction::iceberg_compactor_runner::{
62    IcebergCompactorRunnerConfigBuilder, create_plan_runners,
63};
64use iceberg_compaction::{IcebergTaskQueue, PushResult};
65pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
66use more_asserts::assert_ge;
67use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
68use risingwave_hummock_sdk::{
69    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
70};
71use risingwave_pb::hummock::compact_task::TaskStatus;
72use risingwave_pb::hummock::subscribe_compaction_event_request::{
73    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
74};
75use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
76use risingwave_pb::hummock::{
77    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
78    SubscribeCompactionEventResponse,
79};
80use risingwave_rpc_client::HummockMetaClient;
81pub use shared_buffer_compact::{compact, merge_imms_in_memory};
82use tokio::sync::oneshot::Sender;
83use tokio::task::JoinHandle;
84use tokio::time::Instant;
85
86pub use self::compaction_utils::{
87    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
88    check_flush_result,
89};
90pub use self::task_progress::TaskProgress;
91use super::multi_builder::CapacitySplitTableBuilder;
92use super::{
93    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
94};
95use crate::compaction_catalog_manager::{
96    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
97};
98use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
99use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
100use crate::hummock::compactor::iceberg_compaction::TaskKey;
101use crate::hummock::iterator::{Forward, HummockIterator};
102use crate::hummock::{
103    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
104    UnifiedSstableWriterFactory, validate_ssts,
105};
106use crate::monitor::CompactorMetrics;
107
108/// Heartbeat logging interval for compaction tasks
109const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);
110
111/// Represents the compaction task state for logging purposes
112#[derive(Debug, Clone, PartialEq, Eq)]
113struct CompactionLogState {
114    running_parallelism: u32,
115    pull_task_ack: bool,
116    pending_pull_task_count: u32,
117}
118
119/// Represents the iceberg compaction task state for logging purposes
120#[derive(Debug, Clone, PartialEq, Eq)]
121struct IcebergCompactionLogState {
122    running_parallelism: u32,
123    waiting_parallelism: u32,
124    available_parallelism: u32,
125    pull_task_ack: bool,
126    pending_pull_task_count: u32,
127}
128
129/// Controls periodic logging with state change detection
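///
/// A minimal usage sketch (hypothetical caller code, not taken verbatim from this module):
/// ```ignore
/// let mut throttler = LogThrottler::new(Duration::from_secs(60));
/// let state = CompactionLogState {
///     running_parallelism: 0,
///     pull_task_ack: true,
///     pending_pull_task_count: 0,
/// };
/// if throttler.should_log(&state) {
///     tracing::info!(?state, "compactor status");
///     throttler.update(state);
/// }
/// ```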
130struct LogThrottler<T: PartialEq> {
131    last_logged_state: Option<T>,
132    last_heartbeat: Instant,
133    heartbeat_interval: Duration,
134}
135
136impl<T: PartialEq> LogThrottler<T> {
137    fn new(heartbeat_interval: Duration) -> Self {
138        Self {
139            last_logged_state: None,
140            last_heartbeat: Instant::now(),
141            heartbeat_interval,
142        }
143    }
144
145    /// Returns true if logging should occur (state changed or heartbeat interval elapsed)
146    fn should_log(&self, current_state: &T) -> bool {
147        self.last_logged_state.as_ref() != Some(current_state)
148            || self.last_heartbeat.elapsed() >= self.heartbeat_interval
149    }
150
151    /// Updates the state and heartbeat timestamp after logging
152    fn update(&mut self, current_state: T) {
153        self.last_logged_state = Some(current_state);
154        self.last_heartbeat = Instant::now();
155    }
156}
157
158/// Implementation of Hummock compaction.
159pub struct Compactor {
160    /// The context of the compactor.
161    context: CompactorContext,
162    object_id_getter: Arc<dyn GetObjectId>,
163    task_config: TaskConfig,
164    options: SstableBuilderOptions,
165    get_id_time: Arc<AtomicU64>,
166}
167
168pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
169
170impl Compactor {
171    /// Create a new compactor.
172    pub fn new(
173        context: CompactorContext,
174        options: SstableBuilderOptions,
175        task_config: TaskConfig,
176        object_id_getter: Arc<dyn GetObjectId>,
177    ) -> Self {
178        Self {
179            context,
180            options,
181            task_config,
182            get_id_time: Arc::new(AtomicU64::new(0)),
183            object_id_getter,
184        }
185    }
186
187    /// Compact the given key range and merge iterator.
188    /// Upon a successful return, the built SSTs are already uploaded to object store.
189    ///
190    /// `task_progress` is only used for tasks on the compactor.
191    async fn compact_key_range(
192        &self,
193        iter: impl HummockIterator<Direction = Forward>,
194        compaction_filter: impl CompactionFilter,
195        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
196        task_progress: Option<Arc<TaskProgress>>,
197        task_id: Option<HummockCompactionTaskId>,
198        split_index: Option<usize>,
199    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
200        // Monitor the time spent building SSTs (shared-buffer flush or compaction).
201        let compact_timer = if self.context.is_share_buffer_compact {
202            self.context
203                .compactor_metrics
204                .write_build_l0_sst_duration
205                .start_timer()
206        } else {
207            self.context
208                .compactor_metrics
209                .compact_sst_duration
210                .start_timer()
211        };
212
213        let (split_table_outputs, table_stats_map) = {
214            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
215            if self.task_config.use_block_based_filter {
216                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
217                    factory,
218                    iter,
219                    compaction_filter,
220                    compaction_catalog_agent_ref,
221                    task_progress.clone(),
222                    self.object_id_getter.clone(),
223                )
224                .instrument_await("compact".verbose())
225                .await?
226            } else {
227                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
228                    factory,
229                    iter,
230                    compaction_filter,
231                    compaction_catalog_agent_ref,
232                    task_progress.clone(),
233                    self.object_id_getter.clone(),
234                )
235                .instrument_await("compact".verbose())
236                .await?
237            }
238        };
239
240        compact_timer.observe_duration();
241
242        Self::report_progress(
243            self.context.compactor_metrics.clone(),
244            task_progress,
245            &split_table_outputs,
246            self.context.is_share_buffer_compact,
247        );
248
249        self.context
250            .compactor_metrics
251            .get_table_id_total_time_duration
252            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
253
254        debug_assert!(
255            split_table_outputs
256                .iter()
257                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
258        );
259
260        if task_id.is_some() {
261            // Shared buffer compaction carries no task_id, so it is not logged here.
262            tracing::info!(
263                "Finish Task {:?} split_index {:?} sst count {}",
264                task_id,
265                split_index,
266                split_table_outputs.len()
267            );
268        }
269        Ok((split_table_outputs, table_stats_map))
270    }
271
272    pub fn report_progress(
273        metrics: Arc<CompactorMetrics>,
274        task_progress: Option<Arc<TaskProgress>>,
275        ssts: &Vec<LocalSstableInfo>,
276        is_share_buffer_compact: bool,
277    ) {
278        for sst_info in ssts {
279            let sst_size = sst_info.file_size();
280            if let Some(tracker) = &task_progress {
281                tracker.inc_ssts_uploaded();
282                tracker.dec_num_pending_write_io();
283            }
284            if is_share_buffer_compact {
285                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
286            } else {
287                metrics.compaction_upload_sst_counts.inc();
288            }
289        }
290    }
291
292    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
293        &self,
294        writer_factory: F,
295        iter: impl HummockIterator<Direction = Forward>,
296        compaction_filter: impl CompactionFilter,
297        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
298        task_progress: Option<Arc<TaskProgress>>,
299        object_id_getter: Arc<dyn GetObjectId>,
300    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
301        let builder_factory = RemoteBuilderFactory::<F, B> {
302            object_id_getter,
303            limiter: self.context.memory_limiter.clone(),
304            options: self.options.clone(),
305            policy: self.task_config.cache_policy,
306            remote_rpc_cost: self.get_id_time.clone(),
307            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
308            sstable_writer_factory: writer_factory,
309            _phantom: PhantomData,
310        };
311
312        let mut sst_builder = CapacitySplitTableBuilder::new(
313            builder_factory,
314            self.context.compactor_metrics.clone(),
315            task_progress.clone(),
316            self.task_config.table_vnode_partition.clone(),
317            self.context
318                .storage_opts
319                .compactor_concurrent_uploading_sst_count,
320            compaction_catalog_agent_ref,
321        );
322        let compaction_statistics = compact_and_build_sst(
323            &mut sst_builder,
324            &self.task_config,
325            self.context.compactor_metrics.clone(),
326            iter,
327            compaction_filter,
328        )
329        .instrument_await("compact_and_build_sst".verbose())
330        .await?;
331
332        let ssts = sst_builder
333            .finish()
334            .instrument_await("builder_finish".verbose())
335            .await?;
336
337        Ok((ssts, compaction_statistics))
338    }
339}
340
341/// The background iceberg compaction thread that subscribes to iceberg compaction tasks from the
342/// meta service and runs them.
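///
/// A minimal wiring sketch (assumes an already-built `CompactorContext` and meta client; names are
/// illustrative, not prescriptive):
/// ```ignore
/// let (join_handle, shutdown_tx) = start_iceberg_compactor(compactor_context, hummock_meta_client);
/// // ... later, on node shutdown:
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```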
343#[must_use]
344pub fn start_iceberg_compactor(
345    compactor_context: CompactorContext,
346    hummock_meta_client: Arc<dyn HummockMetaClient>,
347) -> (JoinHandle<()>, Sender<()>) {
348    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
349    let stream_retry_interval = Duration::from_secs(30);
350    let periodic_event_update_interval = Duration::from_millis(1000);
351    let worker_num = compactor_context.compaction_executor.worker_num();
352
353    let max_task_parallelism: u32 = (worker_num as f32
354        * compactor_context.storage_opts.compactor_max_task_multiplier)
355        .ceil() as u32;
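    // For example (illustrative numbers only): 8 worker threads with a multiplier of 1.5 give
    // max_task_parallelism = ceil(8 * 1.5) = 12.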
356
357    const MAX_PULL_TASK_COUNT: u32 = 4;
358    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
359
360    assert_ge!(
361        compactor_context.storage_opts.compactor_max_task_multiplier,
362        0.0
363    );
364
365    let join_handle = tokio::spawn(async move {
366        // Initialize task queue with event-driven scheduling
367        let pending_parallelism_budget = (max_task_parallelism as f32
368            * compactor_context
369                .storage_opts
370                .iceberg_compaction_pending_parallelism_budget_multiplier)
371            .ceil() as u32;
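        // For example (illustrative numbers only): with max_task_parallelism = 12 and a budget
        // multiplier of 4.0, up to 48 units of parallelism may wait in the pending queue.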
372        let mut task_queue =
373            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);
374
375        // Shutdown tracking for running tasks (task_key -> shutdown_sender)
376        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));
377
378        // Channel for task completion notifications
379        let (task_completion_tx, mut task_completion_rx) =
380            tokio::sync::mpsc::unbounded_channel::<TaskKey>();
381
382        let mut min_interval = tokio::time::interval(stream_retry_interval);
383        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
384
385        // Track last logged state to avoid duplicate logs
386        let mut log_throttler =
387            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
388
389        // This outer loop is used to (re)create the subscription stream.
390        'start_stream: loop {
391            // reset state
392            // pull_task_ack.store(true, Ordering::SeqCst);
393            let mut pull_task_ack = true;
394            tokio::select! {
395                // Wait for interval.
396                _ = min_interval.tick() => {},
397                // Shutdown compactor.
398                _ = &mut shutdown_rx => {
399                    tracing::info!("Compactor is shutting down");
400                    return;
401                }
402            }
403
404            let (request_sender, response_event_stream) = match hummock_meta_client
405                .subscribe_iceberg_compaction_event()
406                .await
407            {
408                Ok((request_sender, response_event_stream)) => {
409                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
410                    (request_sender, response_event_stream)
411                }
412
413                Err(e) => {
414                    tracing::warn!(
415                        error = %e.as_report(),
416                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
417                    );
418                    continue 'start_stream;
419                }
420            };
421
422            pin_mut!(response_event_stream);
423
424            let _executor = compactor_context.compaction_executor.clone();
425
426            // This inner loop consumes the stream and reports task progress.
427            let mut event_loop_iteration_now = Instant::now();
428            'consume_stream: loop {
429                {
430                    // report
431                    compactor_context
432                        .compactor_metrics
433                        .compaction_event_loop_iteration_latency
434                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
435                    event_loop_iteration_now = Instant::now();
436                }
437
438                let request_sender = request_sender.clone();
439                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
440                    // Handle task completion notifications
441                    Some(completed_task_key) = task_completion_rx.recv() => {
442                        tracing::debug!(task_id = completed_task_key.0, plan_index = completed_task_key.1, "Task completed, updating queue state");
443                        task_queue.finish_running(completed_task_key);
444                        continue 'consume_stream;
445                    }
446
447                    // Event-driven task scheduling - wait for tasks to become schedulable
448                    _ = task_queue.wait_schedulable() => {
449                        schedule_queued_tasks(
450                            &mut task_queue,
451                            &compactor_context,
452                            &shutdown_map,
453                            &task_completion_tx,
454                        );
455                        continue 'consume_stream;
456                    }
457
458                    _ = periodic_event_interval.tick() => {
459                        // Only handle meta task pulling in periodic tick
460                        let should_restart_stream = handle_meta_task_pulling(
461                            &mut pull_task_ack,
462                            &task_queue,
463                            max_task_parallelism,
464                            max_pull_task_count,
465                            &request_sender,
466                            &mut log_throttler,
467                        );
468
469                        if should_restart_stream {
470                            continue 'start_stream;
471                        }
472                        continue;
473                    }
474                    event = response_event_stream.next() => {
475                        event
476                    }
477
478                    _ = &mut shutdown_rx => {
479                        tracing::info!("Iceberg Compactor is shutting down");
480                        return
481                    }
482                };
483
484                match event {
485                    Some(Ok(SubscribeIcebergCompactionEventResponse {
486                        event,
487                        create_at: _create_at,
488                    })) => {
489                        let event = match event {
490                            Some(event) => event,
491                            None => continue 'consume_stream,
492                        };
493
494                        match event {
495                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
496                                let task_id = iceberg_compaction_task.task_id;
497                                 let write_parquet_properties = WriterProperties::builder()
498                                        .set_created_by(concat!(
499                                            "risingwave version ",
500                                            env!("CARGO_PKG_VERSION")
501                                        )
502                                        .to_owned())
503                                        .set_max_row_group_size(
504                                            compactor_context.storage_opts.iceberg_compaction_write_parquet_max_row_group_rows
505                                        )
506                                        .set_compression(Compression::SNAPPY) // TODO: make it configurable
507                                        .build();
508
509                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
510                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
511                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
512                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
513                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
514                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
515                                    .write_parquet_properties(write_parquet_properties)
516                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
517                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
518                                    .enable_dynamic_size_estimation(compactor_context.storage_opts.iceberg_compaction_enable_dynamic_size_estimation)
519                                    .size_estimation_smoothing_factor(compactor_context.storage_opts.iceberg_compaction_size_estimation_smoothing_factor)
520                                    .target_binpack_group_size_mb(
521                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
522                                    )
523                                    .min_group_size_mb(
524                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
525                                    )
526                                    .min_group_file_count(
527                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
528                                    )
529                                    .build() {
530                                    Ok(config) => config,
531                                    Err(e) => {
532                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config for task {}", task_id);
533                                        continue 'consume_stream;
534                                    }
535                                };
536
537                                // Create multiple plan runners from the task
538                                let plan_runners = match create_plan_runners(
539                                    iceberg_compaction_task,
540                                    compactor_runner_config,
541                                    compactor_context.compactor_metrics.clone(),
542                                ).await {
543                                    Ok(runners) => runners,
544                                    Err(e) => {
545                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
546                                        continue 'consume_stream;
547                                    }
548                                };
549
550                                if plan_runners.is_empty() {
551                                    tracing::info!(task_id, "No plans to execute");
552                                    continue 'consume_stream;
553                                }
554
555                                // Enqueue each plan runner independently
556                                let total_plans = plan_runners.len();
557                                let mut enqueued_count = 0;
558
559                                for runner in plan_runners {
560                                    let meta = runner.to_meta();
561                                    let required_parallelism = runner.required_parallelism();
562                                    let push_result = task_queue.push(meta.clone(), Some(runner));
563
564                                    match push_result {
565                                        PushResult::Added => {
566                                            enqueued_count += 1;
567                                            tracing::debug!(
568                                                task_id = task_id,
569                                                plan_index = enqueued_count - 1,
570                                                required_parallelism = required_parallelism,
571                                                "Iceberg plan runner added to queue"
572                                            );
573                                        },
574                                        PushResult::RejectedCapacity => {
575                                            tracing::warn!(
576                                                task_id = task_id,
577                                                required_parallelism = required_parallelism,
578                                                pending_budget = pending_parallelism_budget,
579                                                enqueued_count = enqueued_count,
580                                                total_plans = total_plans,
581                                                "Iceberg plan runner rejected - queue capacity exceeded"
582                                            );
583                                            // Stop enqueuing remaining plans
584                                            break;
585                                        },
586                                        PushResult::RejectedTooLarge => {
587                                            tracing::error!(
588                                                task_id = task_id,
589                                                required_parallelism = required_parallelism,
590                                                max_parallelism = max_task_parallelism,
591                                                "Iceberg plan runner rejected - parallelism exceeds max"
592                                            );
593                                        },
594                                        PushResult::RejectedInvalidParallelism => {
595                                            tracing::error!(
596                                                task_id = task_id,
597                                                required_parallelism = required_parallelism,
598                                                "Iceberg plan runner rejected - invalid parallelism"
599                                            );
600                                        },
601                                        PushResult::RejectedDuplicate => {
602                                            tracing::error!(
603                                                task_id = task_id,
604                                                plan_index = meta.plan_index,
605                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
606                                            );
607                                        }
608                                    }
609                                }
610
611                                tracing::info!(
612                                    task_id = task_id,
613                                    total_plans = total_plans,
614                                    enqueued_count = enqueued_count,
615                                    "Enqueued {} of {} Iceberg plan runners",
616                                    enqueued_count,
617                                    total_plans
618                                );
619                            },
620                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
621                                // Allow the next periodic tick to pull more iceberg tasks.
622                                pull_task_ack = true;
623                            },
624                        }
625                    }
626                    Some(Err(e)) => {
627                        tracing::warn!("Failed to consume stream. {}", e.message());
628                        continue 'start_stream;
629                    }
630                    _ => {
631                        // The stream is exhausted
632                        continue 'start_stream;
633                    }
634                }
635            }
636        }
637    });
638
639    (join_handle, shutdown_tx)
640}
641
642/// The background compaction thread that receives compaction tasks from the hummock compaction
643/// manager (via the meta service) and runs them.
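///
/// A minimal wiring sketch (assumes the surrounding node setup; names are illustrative):
/// ```ignore
/// let (join_handle, shutdown_tx) = start_compactor(
///     compactor_context,
///     hummock_meta_client,
///     object_id_manager,
///     compaction_catalog_manager_ref,
/// );
/// // Signalling `shutdown_tx` stops the event loop; `join_handle` can then be awaited.
/// let _ = shutdown_tx.send(());
/// join_handle.await.unwrap();
/// ```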
644#[must_use]
645pub fn start_compactor(
646    compactor_context: CompactorContext,
647    hummock_meta_client: Arc<dyn HummockMetaClient>,
648    object_id_manager: Arc<ObjectIdManager>,
649    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
650) -> (JoinHandle<()>, Sender<()>) {
651    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
652    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
653    let stream_retry_interval = Duration::from_secs(30);
654    let task_progress = compactor_context.task_progress_manager.clone();
655    let periodic_event_update_interval = Duration::from_millis(1000);
656
657    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
658        * compactor_context.storage_opts.compactor_max_task_multiplier)
659        .ceil() as u32;
660    let running_task_parallelism = Arc::new(AtomicU32::new(0));
661
662    const MAX_PULL_TASK_COUNT: u32 = 4;
663    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
664
665    assert_ge!(
666        compactor_context.storage_opts.compactor_max_task_multiplier,
667        0.0
668    );
669
670    let join_handle = tokio::spawn(async move {
671        let shutdown_map = CompactionShutdownMap::default();
672        let mut min_interval = tokio::time::interval(stream_retry_interval);
673        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
674
675        // Track last logged state to avoid duplicate logs
676        let mut log_throttler =
677            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
678
679        // This outer loop is used to (re)create the subscription stream.
680        'start_stream: loop {
681            // reset state
682            // pull_task_ack.store(true, Ordering::SeqCst);
683            let mut pull_task_ack = true;
684            tokio::select! {
685                // Wait for interval.
686                _ = min_interval.tick() => {},
687                // Shutdown compactor.
688                _ = &mut shutdown_rx => {
689                    tracing::info!("Compactor is shutting down");
690                    return;
691                }
692            }
693
694            let (request_sender, response_event_stream) =
695                match hummock_meta_client.subscribe_compaction_event().await {
696                    Ok((request_sender, response_event_stream)) => {
697                        tracing::debug!("Succeeded subscribe_compaction_event.");
698                        (request_sender, response_event_stream)
699                    }
700
701                    Err(e) => {
702                        tracing::warn!(
703                            error = %e.as_report(),
704                            "Subscribing to compaction tasks failed with error. Will retry.",
705                        );
706                        continue 'start_stream;
707                    }
708                };
709
710            pin_mut!(response_event_stream);
711
712            let executor = compactor_context.compaction_executor.clone();
713            let object_id_manager = object_id_manager.clone();
714
715            // This inner loop consumes the stream and reports task progress.
716            let mut event_loop_iteration_now = Instant::now();
717            'consume_stream: loop {
718                {
719                    // report
720                    compactor_context
721                        .compactor_metrics
722                        .compaction_event_loop_iteration_latency
723                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
724                    event_loop_iteration_now = Instant::now();
725                }
726
727                let running_task_parallelism = running_task_parallelism.clone();
728                let request_sender = request_sender.clone();
729                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
730                    _ = periodic_event_interval.tick() => {
731                        let progress_list = get_task_progress(task_progress.clone());
732
733                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
734                            event: Some(RequestEvent::HeartBeat(
735                                HeartBeat {
736                                    progress: progress_list
737                                }
738                            )),
739                            create_at: SystemTime::now()
740                                .duration_since(std::time::UNIX_EPOCH)
741                                .expect("Clock may have gone backwards")
742                                .as_millis() as u64,
743                        }) {
744                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
745                            // re-subscribe to the stream
746                            continue 'start_stream;
747                        }
748
749
750                        let mut pending_pull_task_count = 0;
751                        if pull_task_ack {
752                            // TODO: Compute parallelism on meta side
753                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
754
755                            if pending_pull_task_count > 0 {
756                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
757                                    event: Some(RequestEvent::PullTask(
758                                        PullTask {
759                                            pull_task_count: pending_pull_task_count,
760                                        }
761                                    )),
762                                    create_at: SystemTime::now()
763                                        .duration_since(std::time::UNIX_EPOCH)
764                                        .expect("Clock may have gone backwards")
765                                        .as_millis() as u64,
766                                }) {
767                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");
768
769                                    // re-subscribe to the stream
770                                    continue 'start_stream;
771                                } else {
772                                    pull_task_ack = false;
773                                }
774                            }
775                        }
776
777                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
778                        let current_state = CompactionLogState {
779                            running_parallelism: running_count,
780                            pull_task_ack,
781                            pending_pull_task_count,
782                        };
783
784                        // Log only when state changes or periodically as heartbeat
785                        if log_throttler.should_log(&current_state) {
786                            tracing::info!(
787                                running_parallelism_count = %current_state.running_parallelism,
788                                pull_task_ack = %current_state.pull_task_ack,
789                                pending_pull_task_count = %current_state.pending_pull_task_count
790                            );
791                            log_throttler.update(current_state);
792                        }
793
794                        continue;
795                    }
796                    event = response_event_stream.next() => {
797                        event
798                    }
799
800                    _ = &mut shutdown_rx => {
801                        tracing::info!("Compactor is shutting down");
802                        return
803                    }
804                };
805
806                fn send_report_task_event(
807                    compact_task: &CompactTask,
808                    table_stats: TableStatsMap,
809                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
810                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
811                ) {
812                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
813                        event: Some(RequestEvent::ReportTask(ReportTask {
814                            task_id: compact_task.task_id,
815                            task_status: compact_task.task_status.into(),
816                            sorted_output_ssts: compact_task
817                                .sorted_output_ssts
818                                .iter()
819                                .map(|sst| sst.into())
820                                .collect(),
821                            table_stats_change: to_prost_table_stats_map(table_stats),
822                            object_timestamps,
823                        })),
824                        create_at: SystemTime::now()
825                            .duration_since(std::time::UNIX_EPOCH)
826                            .expect("Clock may have gone backwards")
827                            .as_millis() as u64,
828                    }) {
829                        let task_id = compact_task.task_id;
830                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
831                    }
832                }
833
834                match event {
835                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
836                        let event = match event {
837                            Some(event) => event,
838                            None => continue 'consume_stream,
839                        };
840                        let shutdown = shutdown_map.clone();
841                        let context = compactor_context.clone();
842                        let consumed_latency_ms = SystemTime::now()
843                            .duration_since(std::time::UNIX_EPOCH)
844                            .expect("Clock may have gone backwards")
845                            .as_millis() as u64
846                            - create_at;
847                        context
848                            .compactor_metrics
849                            .compaction_event_consumed_latency
850                            .observe(consumed_latency_ms as _);
851
852                        let object_id_manager = object_id_manager.clone();
853                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
854
855                        match event {
856                            ResponseEvent::CompactTask(compact_task) => {
857                                let compact_task = CompactTask::from(compact_task);
858                                let parallelism =
859                                    calculate_task_parallelism(&compact_task, &context);
860
861                                assert_ne!(parallelism, 0, "splits cannot be empty");
862
863                                if (max_task_parallelism
864                                    - running_task_parallelism.load(Ordering::SeqCst))
865                                    < parallelism as u32
866                                {
867                                    tracing::warn!(
868                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
869                                        compact_task.task_id,
870                                        parallelism,
871                                        running_task_parallelism.load(Ordering::Relaxed),
872                                        max_task_parallelism,
873                                    );
874                                    let (compact_task, table_stats, object_timestamps) =
875                                        compact_done(
876                                            compact_task,
877                                            context.clone(),
878                                            vec![],
879                                            TaskStatus::NoAvailCpuResourceCanceled,
880                                        );
881
882                                    send_report_task_event(
883                                        &compact_task,
884                                        table_stats,
885                                        object_timestamps,
886                                        &request_sender,
887                                    );
888
889                                    continue 'consume_stream;
890                                }
891
892                                running_task_parallelism
893                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
894                                executor.spawn(async move {
895                                    let (tx, rx) = tokio::sync::oneshot::channel();
896                                    let task_id = compact_task.task_id;
897                                    shutdown.lock().unwrap().insert(task_id, tx);
898
899                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
900                                        context.clone(),
901                                        compact_task,
902                                        rx,
903                                        object_id_manager.clone(),
904                                        compaction_catalog_manager_ref.clone(),
905                                    )
906                                    .await;
907
908                                    shutdown.lock().unwrap().remove(&task_id);
909                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
910
911                                    send_report_task_event(
912                                        &compact_task,
913                                        table_stats,
914                                        object_timestamps,
915                                        &request_sender,
916                                    );
917
918                                    let enable_check_compaction_result =
919                                    context.storage_opts.check_compaction_result;
920                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
921
922                                    if enable_check_compaction_result && need_check_task {
923                                        let compact_table_ids = compact_task.build_compact_table_ids();
924                                        match compaction_catalog_manager_ref.acquire(compact_table_ids.into_iter().collect()).await {
925                                            Ok(compaction_catalog_agent_ref) =>  {
926                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
927                                                {
928                                                    Err(e) => {
929                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
930                                                    }
931                                                    Ok(true) => (),
932                                                    Ok(false) => {
933                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
934                                                    }
935                                                }
936                                            },
937                                            Err(e) => {
938                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
939                                            }
940                                        }
941                                    }
942                                });
943                            }
944                            ResponseEvent::VacuumTask(_) => {
945                                unreachable!("unexpected vacuum task");
946                            }
947                            ResponseEvent::FullScanTask(_) => {
948                                unreachable!("unexpected scan task");
949                            }
950                            ResponseEvent::ValidationTask(validation_task) => {
951                                let validation_task = ValidationTask::from(validation_task);
952                                executor.spawn(async move {
953                                    validate_ssts(validation_task, context.sstable_store.clone())
954                                        .await;
955                                });
956                            }
957                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
958                                .lock()
959                                .unwrap()
960                                .remove(&cancel_compact_task.task_id)
961                            {
962                                Some(tx) => {
963                                    if tx.send(()).is_err() {
964                                        tracing::warn!(
965                                            "Cancellation of compaction task failed. task_id: {}",
966                                            cancel_compact_task.task_id
967                                        );
968                                    }
969                                }
970                                _ => {
971                                    tracing::warn!(
972                                        "Attempting to cancel non-existent compaction task. task_id: {}",
973                                        cancel_compact_task.task_id
974                                    );
975                                }
976                            },
977
978                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
979                                // Allow the next periodic tick to pull more tasks.
980                                pull_task_ack = true;
981                            }
982                        }
983                    }
984                    Some(Err(e)) => {
985                        tracing::warn!("Failed to consume stream. {}", e.message());
986                        continue 'start_stream;
987                    }
988                    _ => {
989                        // The stream is exhausted
990                        continue 'start_stream;
991                    }
992                }
993            }
994        }
995    });
996
997    (join_handle, shutdown_tx)
998}
999
1000/// The background shared compaction thread that receives compaction tasks dispatched through the
1001/// gRPC compactor proxy and runs them.
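///
/// A minimal wiring sketch (assumes a proxy client and a dispatch channel; names are illustrative):
/// ```ignore
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
/// let (join_handle, shutdown_tx) = start_shared_compactor(grpc_proxy_client, rx, context);
/// // The gRPC service pushes `Request<DispatchCompactionTaskRequest>`s into `tx`.
/// ```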
1002#[must_use]
1003pub fn start_shared_compactor(
1004    grpc_proxy_client: GrpcCompactorProxyClient,
1005    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1006    context: CompactorContext,
1007) -> (JoinHandle<()>, Sender<()>) {
1008    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1009    let task_progress = context.task_progress_manager.clone();
1010    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1011    let periodic_event_update_interval = Duration::from_millis(1000);
1012
1013    let join_handle = tokio::spawn(async move {
1014        let shutdown_map = CompactionShutdownMap::default();
1015
1016        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1017        let executor = context.compaction_executor.clone();
1018        let report_heartbeat_client = grpc_proxy_client.clone();
1019        'consume_stream: loop {
1020            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1021                _ = periodic_event_interval.tick() => {
1022                    let progress_list = get_task_progress(task_progress.clone());
1023                    let report_compaction_task_request = ReportCompactionTaskRequest{
1024                        event: Some(ReportCompactionTaskEvent::HeartBeat(
1025                            SharedHeartBeat {
1026                                progress: progress_list
1027                            }
1028                        )),
1029                     };
1030                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1031                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1032                    }
1033                    continue
1034                }
1035
1036
1037                _ = &mut shutdown_rx => {
1038                    tracing::info!("Compactor is shutting down");
1039                    return
1040                }
1041
1042                request = receiver.recv() => {
1043                    request
1044                }
1045
1046            };
1047            match request {
1048                Some(request) => {
1049                    let context = context.clone();
1050                    let shutdown = shutdown_map.clone();
1051
1052                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1053                    executor.spawn(async move {
1054                        let DispatchCompactionTaskRequest {
1055                            tables,
1056                            output_object_ids,
1057                            task: dispatch_task,
1058                        } = request.into_inner();
1059                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1060                            acc.insert(table.id, table);
1061                            acc
1062                        });
1063
1064                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1065                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1066                        let shared_compactor_object_id_manager =
1067                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1068                            match dispatch_task.unwrap() {
1069                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1070                                    let compact_task = CompactTask::from(&compact_task);
1071                                    let (tx, rx) = tokio::sync::oneshot::channel();
1072                                    let task_id = compact_task.task_id;
1073                                    shutdown.lock().unwrap().insert(task_id, tx);
1074
1075                                    let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1076                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1077                                        context.clone(),
1078                                        compact_task,
1079                                        rx,
1080                                        shared_compactor_object_id_manager,
1081                                        compaction_catalog_agent_ref.clone(),
1082                                    )
1083                                    .await;
1084                                    shutdown.lock().unwrap().remove(&task_id);
1085                                    let report_compaction_task_request = ReportCompactionTaskRequest {
1086                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1087                                            compact_task: Some(PbCompactTask::from(&compact_task)),
1088                                            table_stats_change: to_prost_table_stats_map(table_stats),
1089                                            object_timestamps,
1090                                    })),
1091                                    };
1092
1093                                    match cloned_grpc_proxy_client
1094                                        .report_compaction_task(report_compaction_task_request)
1095                                        .await
1096                                    {
1097                                        Ok(_) => {
1098                                            // TODO: remove this check once the fast compaction algorithm has been running stably in production clusters for a long time.
1099                                            let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1100                                            let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1101                                            if enable_check_compaction_result && need_check_task {
1102                                                match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1103                                                    Err(e) => {
1104                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1105                                                    },
1106                                                    Ok(true) => (),
1107                                                    Ok(false) => {
1108                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1109                                                    }
1110                                                }
1111                                            }
1112                                        }
1113                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1114                                    }
1115
1116                                }
1117                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
1118                                    unreachable!("unexpected vacuum task");
1119                                }
1120                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
1121                                    unreachable!("unexpected full scan task");
1122                                }
1123                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1124                                    let validation_task = ValidationTask::from(validation_task);
1125                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
1126                                }
1127                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1128                                    if let Some(tx) = shutdown
1129                                        .lock()
1130                                        .unwrap()
1131                                        .remove(&cancel_compact_task.task_id)
1132                                    {
1133                                        if tx.send(()).is_err() {
1134                                            tracing::warn!(
1135                                                "Cancellation of compaction task failed. task_id: {}",
1136                                                cancel_compact_task.task_id
1137                                            );
1138                                        }
1139                                    } else {
1140                                        tracing::warn!(
1141                                            "Attempting to cancel non-existent compaction task. task_id: {}",
1142                                            cancel_compact_task.task_id
1143                                        );
1144                                    }
1145                                }
1146                            }
1147                    });
1148                }
1149                None => continue 'consume_stream,
1150            }
1151        }
1152    });
1153    (join_handle, shutdown_tx)
1154}
1155
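/// Snapshots the progress of every compaction task currently tracked in `task_progress`,
/// one `CompactTaskProgress` entry per task (e.g. for heartbeat reporting).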
1156fn get_task_progress(
1157    task_progress: Arc<
1158        parking_lot::Mutex<HashMap<u64, Arc<TaskProgress>>>,
1159    >,
1160) -> Vec<CompactTaskProgress> {
1161    let mut progress_list = Vec::new();
1162    for (&task_id, progress) in &*task_progress.lock() {
1163        progress_list.push(progress.snapshot(task_id));
1164    }
1165    progress_list
1166}
1167
1168/// Schedule queued tasks if we have capacity
1169fn schedule_queued_tasks(
1170    task_queue: &mut IcebergTaskQueue,
1171    compactor_context: &CompactorContext,
1172    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1173    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<TaskKey>,
1174) {
1175    while let Some(popped_task) = task_queue.pop() {
1176        let task_id = popped_task.meta.task_id;
1177        let plan_index = popped_task.meta.plan_index;
1178        let task_key = (task_id, plan_index);
1179
1180        // Get unique_ident before moving runner
1181        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1182
1183        let Some(runner) = popped_task.runner else {
1184            tracing::error!(
1185                task_id = task_id,
1186                plan_index = plan_index,
1187                "Popped task missing runner - this should not happen"
1188            );
1189            task_queue.finish_running(task_key);
1190            continue;
1191        };
1192
1193        let executor = compactor_context.compaction_executor.clone();
1194        let shutdown_map_clone = shutdown_map.clone();
1195        let completion_tx_clone = task_completion_tx.clone();
1196
1197        tracing::info!(
1198            task_id = task_id,
1199            plan_index = plan_index,
1200            unique_ident = ?unique_ident,
1201            required_parallelism = popped_task.meta.required_parallelism,
1202            "Starting iceberg compaction task from queue"
1203        );
1204
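        // Run the compaction on the compaction executor. The oneshot sender registered in
        // `shutdown_map` lets the task be cancelled; the scopeguard below removes that entry
        // and sends the completion notification even if compaction fails.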
1205        executor.spawn(async move {
1206            let (tx, rx) = tokio::sync::oneshot::channel();
1207            {
1208                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
1209                shutdown_guard.insert(task_key, tx);
1210            }
1211
1212            let _cleanup_guard = scopeguard::guard(
1213                (task_key, shutdown_map_clone, completion_tx_clone),
1214                move |(task_key, shutdown_map, completion_tx)| {
1215                    {
1216                        let mut shutdown_guard = shutdown_map.lock().unwrap();
1217                        shutdown_guard.remove(&task_key);
1218                    }
1219                    // Notify main loop that task is completed
1220                    // Multiple tasks can send completion notifications concurrently via mpsc
1221                    if completion_tx.send(task_key).is_err() {
1222                        tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
1223                    }
1224                },
1225            );
1226
1227            if let Err(e) = runner.compact(rx).await {
1228                tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
1229            }
1230        });
1231    }
1232}
1233
1234/// Handle pulling new tasks from meta service
1235/// Returns true if the stream should be restarted
1236fn handle_meta_task_pulling(
1237    pull_task_ack: &mut bool,
1238    task_queue: &IcebergTaskQueue,
1239    max_task_parallelism: u32,
1240    max_pull_task_count: u32,
1241    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1242    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1243) -> bool {
1244    let mut pending_pull_task_count = 0;
1245    if *pull_task_ack {
1246        // Use queue's running parallelism for pull decision
1247        let current_running_parallelism = task_queue.running_parallelism_sum();
1248        pending_pull_task_count = max_task_parallelism
1249            .saturating_sub(current_running_parallelism).min(max_pull_task_count);
1250
1251        if pending_pull_task_count > 0 {
1252            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1253                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1254                    subscribe_iceberg_compaction_event_request::PullTask {
1255                        pull_task_count: pending_pull_task_count,
1256                    },
1257                )),
1258                create_at: SystemTime::now()
1259                    .duration_since(std::time::UNIX_EPOCH)
1260                    .expect("Clock may have gone backwards")
1261                    .as_millis() as u64,
1262            }) {
1263                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1264                return true; // Signal to restart stream
1265            } else {
1266                *pull_task_ack = false;
1267            }
1268        }
1269    }
1270
1271    let running_count = task_queue.running_parallelism_sum();
1272    let waiting_count = task_queue.waiting_parallelism_sum();
1273    let available_count = max_task_parallelism.saturating_sub(running_count);
1274    let current_state = IcebergCompactionLogState {
1275        running_parallelism: running_count,
1276        waiting_parallelism: waiting_count,
1277        available_parallelism: available_count,
1278        pull_task_ack: *pull_task_ack,
1279        pending_pull_task_count,
1280    };
1281
1282    // Log only when state changes or periodically as heartbeat
1283    if log_throttler.should_log(&current_state) {
1284        tracing::info!(
1285            running_parallelism_count = %current_state.running_parallelism,
1286            waiting_parallelism_count = %current_state.waiting_parallelism,
1287            available_parallelism = %current_state.available_parallelism,
1288            pull_task_ack = %current_state.pull_task_ack,
1289            pending_pull_task_count = %current_state.pending_pull_task_count
1290        );
1291        log_throttler.update(current_state);
1292    }
1293
1294    false // No need to restart stream
1295}
1296
1297#[cfg(test)]
1298mod tests {
1299    use super::*;
1300
1301    #[test]
1302    fn test_log_state_equality() {
1303        // Test CompactionLogState
1304        let state1 = CompactionLogState {
1305            running_parallelism: 10,
1306            pull_task_ack: true,
1307            pending_pull_task_count: 2,
1308        };
1309        let state2 = CompactionLogState {
1310            running_parallelism: 10,
1311            pull_task_ack: true,
1312            pending_pull_task_count: 2,
1313        };
1314        let state3 = CompactionLogState {
1315            running_parallelism: 11,
1316            pull_task_ack: true,
1317            pending_pull_task_count: 2,
1318        };
1319        assert_eq!(state1, state2);
1320        assert_ne!(state1, state3);
1321
1322        // Test IcebergCompactionLogState
1323        let ice_state1 = IcebergCompactionLogState {
1324            running_parallelism: 10,
1325            waiting_parallelism: 5,
1326            available_parallelism: 15,
1327            pull_task_ack: true,
1328            pending_pull_task_count: 2,
1329        };
1330        let ice_state2 = IcebergCompactionLogState {
1331            running_parallelism: 10,
1332            waiting_parallelism: 6,
1333            available_parallelism: 15,
1334            pull_task_ack: true,
1335            pending_pull_task_count: 2,
1336        };
1337        assert_ne!(ice_state1, ice_state2);
1338    }
1339
1340    #[test]
1341    fn test_log_throttler_state_change_detection() {
1342        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
1343        let state1 = CompactionLogState {
1344            running_parallelism: 10,
1345            pull_task_ack: true,
1346            pending_pull_task_count: 2,
1347        };
1348        let state2 = CompactionLogState {
1349            running_parallelism: 11,
1350            pull_task_ack: true,
1351            pending_pull_task_count: 2,
1352        };
1353
1354        // First call should always log
1355        assert!(throttler.should_log(&state1));
1356        throttler.update(state1.clone());
1357
1358        // Same state should not log
1359        assert!(!throttler.should_log(&state1));
1360
1361        // Changed state should log
1362        assert!(throttler.should_log(&state2));
1363        throttler.update(state2.clone());
1364
1365        // Same state again should not log
1366        assert!(!throttler.should_log(&state2));
1367    }
1368
1369    #[test]
1370    fn test_log_throttler_heartbeat() {
1371        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
1372        let state = CompactionLogState {
1373            running_parallelism: 10,
1374            pull_task_ack: true,
1375            pending_pull_task_count: 2,
1376        };
1377
1378        // First call should log
1379        assert!(throttler.should_log(&state));
1380        throttler.update(state.clone());
1381
1382        // Same state immediately should not log
1383        assert!(!throttler.should_log(&state));
1384
1385        // Wait for heartbeat interval to pass
1386        std::thread::sleep(Duration::from_millis(15));
1387
1388        // Same state after interval should log (heartbeat)
1389        assert!(throttler.should_log(&state));
1390    }
1391}