risingwave_storage/hummock/compactor/mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
mod iceberg_compaction;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
// Import iceberg compactor runner types from the local `iceberg_compaction` module.
use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
use iceberg_compaction::{
    IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
    ReportSendResult, build_iceberg_task_report, create_task_execution,
    flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::compactor::iceberg_compaction::TaskKey;
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

/// Heartbeat logging interval for compaction tasks
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);

/// Represents the compaction task state for logging purposes
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    running_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

/// Represents the iceberg compaction task state for logging purposes
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    running_parallelism: u32,
    waiting_parallelism: u32,
    available_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

/// Controls periodic logging with state change detection
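///
/// A minimal usage sketch (illustrative only; the types involved are internal to this module):
///
/// ```ignore
/// let mut throttler = LogThrottler::new(Duration::from_secs(60));
/// let state = CompactionLogState {
///     running_parallelism: 0,
///     pull_task_ack: true,
///     pending_pull_task_count: 0,
/// };
/// if throttler.should_log(&state) {
///     tracing::info!(?state, "compactor status");
///     throttler.update(state);
/// }
/// ```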
struct LogThrottler<T: PartialEq> {
    last_logged_state: Option<T>,
    last_heartbeat: Instant,
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            last_logged_state: None,
            last_heartbeat: Instant::now(),
            heartbeat_interval,
        }
    }

    /// Returns true if logging should occur (state changed or heartbeat interval elapsed)
    fn should_log(&self, current_state: &T) -> bool {
        self.last_logged_state.as_ref() != Some(current_state)
            || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Updates the state and heartbeat timestamp after logging
    fn update(&mut self, current_state: T) {
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = Instant::now();
    }
}

/// Implementation of Hummock compaction.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

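/// Output of a single compaction split: the split index, the SSTs produced for that split, and the
/// compaction statistics collected while building them.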
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    /// Create a new compactor.
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compact the given key range and merge iterator.
    /// Upon a successful return, the built SSTs are already uploaded to object store.
    ///
    /// `task_progress` is only used for tasks on the compactor.
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        // Monitor time cost building shared buffer to SSTs.
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
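            // Both branches perform the same compaction and differ only in the filter builder used
            // for the output SSTs: per-block Xor16 filters vs. a single per-SST Xor16 filter.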
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            // `task_id` is `None` for shared-buffer compaction, so this log only covers compactor tasks.
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

/// The background iceberg compaction thread that subscribes to iceberg compaction tasks via the
/// meta client and runs them.
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(
        compactor_context
            .storage_opts
            .iceberg_compaction_pull_interval_ms,
    );
    let worker_num = compactor_context.compaction_executor.worker_num();

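    // Cap concurrent task parallelism at `worker_num * compactor_max_task_multiplier`;
    // e.g. 8 workers with a multiplier of 1.5 allow up to ceil(8.0 * 1.5) = 12 units of parallelism.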
    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // Initialize task queue with event-driven scheduling
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
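        // The pending budget bounds how much queued-but-not-yet-running work the compactor accepts;
        // pushes beyond it fail with `PushResult::RejectedCapacity`.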
        let mut task_queue =
            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);

        // Shutdown tracking for running tasks (task_key -> shutdown_sender)
        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));

        // Channel for task completion notifications
        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
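        // Tracks per-task plan completions so that a single report is sent to meta only after every
        // enqueued plan of the task has finished (successfully or with an error).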
        let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
        // Buffers task reports that failed to send on the current stream.
        // The queue is flushed in FIFO order after the stream reconnects.
        let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Track last logged state to avoid duplicate logs
        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            if matches!(
                flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
                ReportSendResult::RestartStream
            ) {
                continue 'start_stream;
            }

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    // Handle task completion notifications
                    Some(plan_completion) = task_completion_rx.recv() => {
                        let task_key = plan_completion.task_key;
                        let error_message = plan_completion.error_message;
                        tracing::debug!(
                            task_id = task_key.0,
                            plan_index = task_key.1,
                            success = error_message.is_none(),
                            "Plan completed, updating queue state"
                        );
                        task_queue.finish_running(task_key);

                        let completed_task_id = task_key.0;
                        let Entry::Occupied(mut tracker_entry) =
                            task_trackers.entry(completed_task_id)
                        else {
                            continue 'consume_stream;
                        };
                        tracker_entry.get_mut().record_completion(error_message);
                        if !tracker_entry.get().is_finished() {
                            continue 'consume_stream;
                        }

                        let report = tracker_entry.remove().into_report(completed_task_id);
                        if matches!(
                            send_or_buffer_iceberg_task_report(
                                &request_sender,
                                &mut pending_task_reports,
                                report,
                            ),
                            ReportSendResult::RestartStream
                        ) {
                            continue 'start_stream;
                        }
                        continue 'consume_stream;
                    }

                    // Event-driven task scheduling - wait for tasks to become schedulable
                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        // Only handle meta task pulling in periodic tick
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let sink_id = iceberg_compaction_task.sink_id;
                                // Note: write_parquet_properties is now built from sink config (IcebergConfig) in create_task_execution
                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .target_binpack_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
                                    )
                                    .min_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
                                    )
                                    .min_group_file_count(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
                                    )
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        let report = build_iceberg_task_report(
                                            task_id,
                                            sink_id,
                                            Some(format!(
                                                "Failed to build iceberg compactor runner config: {}",
                                                e.as_report()
                                            )),
                                        );
                                        if matches!(
                                            send_or_buffer_iceberg_task_report(
                                                &request_sender,
                                                &mut pending_task_reports,
                                                report,
                                            ),
                                            ReportSendResult::RestartStream
                                        ) {
                                            continue 'start_stream;
                                        }
                                        continue 'consume_stream;
                                    }
                                };

                                // Create task execution context and plan runners from the task
                                let task_execution = match create_task_execution(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(task_execution) => task_execution,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        let report = build_iceberg_task_report(
                                            task_id,
                                            sink_id,
                                            Some(format!(
                                                "Failed to create iceberg compaction task execution: {}",
                                                e.as_report()
                                            )),
                                        );
                                        if matches!(
                                            send_or_buffer_iceberg_task_report(
                                                &request_sender,
                                                &mut pending_task_reports,
                                                report,
                                            ),
                                            ReportSendResult::RestartStream
                                        ) {
                                            continue 'start_stream;
                                        }
                                        continue 'consume_stream;
                                    }
                                };

                                let sink_id = task_execution.sink_id;
                                let plan_runners = task_execution.plan_runners;

                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, sink_id, "No plans to execute");
                                    let report = build_iceberg_task_report(task_id, sink_id, None);
                                    if matches!(
                                        send_or_buffer_iceberg_task_report(
                                            &request_sender,
                                            &mut pending_task_reports,
                                            report,
                                        ),
                                        ReportSendResult::RestartStream
                                    ) {
                                        continue 'start_stream;
                                    }
                                    continue 'consume_stream;
                                }

                                // Enqueue each plan runner independently
                                let total_plans = plan_runners.len();
                                let mut enqueued_count = 0;

                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let plan_index = meta.plan_index;
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta, Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = plan_index,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            // Stop enqueuing remaining plans
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        },
                                        PushResult::RejectedDuplicate => {
                                            tracing::error!(
                                                task_id = task_id,
                                                plan_index = plan_index,
                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
                                            );
                                        }
                                    }
                                }

                                if enqueued_count == 0 {
                                    let report = build_iceberg_task_report(
                                        task_id,
                                        sink_id,
                                        Some("Failed to enqueue all iceberg compaction plans".to_owned()),
                                    );
                                    if matches!(
                                        send_or_buffer_iceberg_task_report(
                                            &request_sender,
                                            &mut pending_task_reports,
                                            report,
                                        ),
                                        ReportSendResult::RestartStream
                                    ) {
                                        continue 'start_stream;
                                    }
                                } else {
                                    task_trackers.insert(
                                        task_id,
                                        IcebergTaskTracker::new(sink_id, enqueued_count),
                                    );
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    sink_id = sink_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // set flag
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread that receives compaction tasks from hummock compaction
/// manager and runs compaction tasks.
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Track last logged state to avoid duplicate logs
        let mut log_throttler =
            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            // re subscribe stream
                            continue 'start_stream;
                        }

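                        // `pull_task_ack` is cleared after a PullTask request is sent and set again on
                        // PullTaskAck, so at most one pull request is in flight at a time.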
                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // re subscribe stream
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
                        let current_state = CompactionLogState {
                            running_parallelism: running_count,
                            pull_task_ack,
                            pending_pull_task_count,
                        };

                        // Log only when state changes or periodically as heartbeat
                        if log_throttler.should_log(&current_state) {
                            tracing::info!(
                                running_parallelism_count = %current_state.running_parallelism,
                                pull_task_ack = %current_state.pull_task_ack,
                                pending_pull_task_count = %current_state.pending_pull_task_count
                            );
                            log_throttler.update(current_state);
                        }

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

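                // Reports the final task status and stats back to meta over the event stream.
                // A send failure is only logged; no retry is attempted here.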
                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps,
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let read_table_ids = compact_task
                                            .get_table_ids_from_input_ssts()
                                            .collect::<Vec<_>>();
                                        match compaction_catalog_manager_ref.acquire(read_table_ids.into_iter().collect()).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            #[expect(deprecated)]
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            #[expect(deprecated)]
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            #[expect(deprecated)]
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                // set flag
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread of the shared compactor: receives compaction tasks dispatched
/// through the gRPC proxy and runs them.
1114#[must_use]
1115pub fn start_shared_compactor(
1116    grpc_proxy_client: GrpcCompactorProxyClient,
1117    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1118    context: CompactorContext,
1119) -> (JoinHandle<()>, Sender<()>) {
1120    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1121    let task_progress = context.task_progress_manager.clone();
1122    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1123    let periodic_event_update_interval = Duration::from_millis(1000);
1124
1125    let join_handle = tokio::spawn(async move {
1126        let shutdown_map = CompactionShutdownMap::default();
1127
1128        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1129        let executor = context.compaction_executor.clone();
1130        let report_heartbeat_client = grpc_proxy_client.clone();
1131        'consume_stream: loop {
1132            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1133                _ = periodic_event_interval.tick() => {
1134                    let progress_list = get_task_progress(task_progress.clone());
1135                    let report_compaction_task_request = ReportCompactionTaskRequest {
1136                        event: Some(ReportCompactionTaskEvent::HeartBeat(
1137                            SharedHeartBeat {
1138                                progress: progress_list,
1139                            }
1140                        )),
1141                    };
1142                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
1143                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1144                    }
1145                    continue
1146                }
1147
1148
1149                _ = &mut shutdown_rx => {
1150                    tracing::info!("Compactor is shutting down");
1151                    return
1152                }
1153
1154                request = receiver.recv() => {
1155                    request
1156                }
1157
1158            };
1159            match request {
1160                Some(request) => {
1161                    let context = context.clone();
1162                    let shutdown = shutdown_map.clone();
1163
1164                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
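                        // Run each dispatched request on the compaction executor so this loop
                        // stays responsive to heartbeat ticks and shutdown.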
1165                    executor.spawn(async move {
1166                        let DispatchCompactionTaskRequest {
1167                            tables,
1168                            output_object_ids,
1169                            task: dispatch_task,
1170                        } = request.into_inner();
1171                        let table_id_to_catalog: HashMap<_, _> =
1172                            tables.into_iter().map(|table| (table.id, table)).collect();
1175
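                        // Output SST object IDs are pre-allocated by the meta node and shipped with the
                        // dispatch request; the ID manager can fetch more through the proxy client if the
                        // pre-allocated IDs run out.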
1176                        let output_object_ids_deque: VecDeque<HummockSstableObjectId> =
1177                            output_object_ids.into_iter().map(Into::into).collect();
1178                        let shared_compactor_object_id_manager =
1179                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1180                            match dispatch_task.unwrap() {
1181                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1182                                    let compact_task = CompactTask::from(&compact_task);
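                                        // Register a cancellation sender keyed by task id so a later
                                        // CancelCompactTask event can abort this run.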
1183                                    let (tx, rx) = tokio::sync::oneshot::channel();
1184                                    let task_id = compact_task.task_id;
1185                                    shutdown.lock().unwrap().insert(task_id, tx);
1186
1187                                    let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1188                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1189                                        context.clone(),
1190                                        compact_task,
1191                                        rx,
1192                                        shared_compactor_object_id_manager,
1193                                        compaction_catalog_agent_ref.clone(),
1194                                    )
1195                                    .await;
1196                                    shutdown.lock().unwrap().remove(&task_id);
1197                                    let report_compaction_task_request = ReportCompactionTaskRequest {
1198                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1199                                            compact_task: Some(PbCompactTask::from(&compact_task)),
1200                                            table_stats_change: to_prost_table_stats_map(table_stats),
1201                                            object_timestamps,
1202                                    })),
1203                                    };
1204
1205                                    match cloned_grpc_proxy_client
1206                                        .report_compaction_task(report_compaction_task_request)
1207                                        .await
1208                                    {
1209                                        Ok(_) => {
1210                                            // TODO: remove this check once clusters have been running the fast compaction algorithm stably for a long time.
1211                                            let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1212                                            let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1213                                            if enable_check_compaction_result && need_check_task {
1214                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
1215                                                    Err(e) => {
1216                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1217                                                    },
1218                                                    Ok(true) => (),
1219                                                    Ok(false) => {
1220                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1221                                                    }
1222                                                }
1223                                            }
1224                                        }
1225                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1226                                    }
1227
1228                                }
1229                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
1230                                    unreachable!("unexpected vacuum task");
1231                                }
1232                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
1233                                    unreachable!("unexpected scan task");
1234                                }
1235                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1236                                    let validation_task = ValidationTask::from(validation_task);
1237                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
1238                                }
1239                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1240                                    match shutdown.lock().unwrap().remove(&cancel_compact_task.task_id) {
1241                                        Some(tx) => {
1242                                            if tx.send(()).is_err() {
1243                                                tracing::warn!(
1244                                                    "Cancellation of compaction task failed. task_id: {}",
1245                                                    cancel_compact_task.task_id
1246                                                );
1247                                            }
1248                                        }
1249                                        _ => {
1250                                            tracing::warn!(
1251                                                "Attempting to cancel non-existent compaction task. task_id: {}",
1252                                                cancel_compact_task.task_id
1253                                            );
1254                                        }
1255                                    }
1257                                }
1258                            }
1259                    });
1260                }
1261                None => continue 'consume_stream,
1262            }
1263        }
1264    });
1265    (join_handle, shutdown_tx)
1266}
1267
1268fn get_task_progress(
1269    task_progress: Arc<parking_lot::Mutex<HashMap<u64, Arc<TaskProgress>>>>,
1270) -> Vec<CompactTaskProgress> {
1273    let mut progress_list = Vec::new();
1274    for (&task_id, progress) in &*task_progress.lock() {
1275        progress_list.push(progress.snapshot(task_id));
1276    }
1277    progress_list
1278}
1279
1280/// Schedule queued iceberg compaction tasks while there is available capacity.
1281fn schedule_queued_tasks(
1282    task_queue: &mut IcebergTaskQueue,
1283    compactor_context: &CompactorContext,
1284    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1285    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
1286) {
1287    while let Some(popped_task) = task_queue.pop() {
1288        let task_id = popped_task.meta.task_id;
1289        let plan_index = popped_task.meta.plan_index;
1290        let task_key = (task_id, plan_index);
1291
1292        // Get unique_ident before moving runner
1293        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1294
1295        let Some(runner) = popped_task.runner else {
1296            tracing::error!(
1297                task_id = task_id,
1298                plan_index = plan_index,
1299                "Popped task missing runner - this should not happen"
1300            );
1301            task_queue.finish_running(task_key);
1302            continue;
1303        };
1304
1305        let executor = compactor_context.compaction_executor.clone();
1306        let shutdown_map_clone = shutdown_map.clone();
1307        let completion_tx_clone = task_completion_tx.clone();
1308
1309        tracing::info!(
1310            task_id = task_id,
1311            plan_index = plan_index,
1312            unique_ident = ?unique_ident,
1313            required_parallelism = popped_task.meta.required_parallelism,
1314            "Starting iceberg compaction task from queue"
1315        );
1316
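            // Execute the popped plan on the compaction executor; success or failure is reported
            // back to the scheduling loop through the cloned completion sender.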
1317        executor.spawn(async move {
1318            let (tx, rx) = tokio::sync::oneshot::channel();
1319            {
1320                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
1321                shutdown_guard.insert(task_key, tx);
1322            }
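                // Remove the cancellation sender again when this future completes or is dropped.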
1323            let _cleanup_guard = scopeguard::guard(shutdown_map_clone.clone(), move |shutdown_map| {
1324                let mut shutdown_guard = shutdown_map.lock().unwrap();
1325                shutdown_guard.remove(&task_key);
1326            });
1327
1328            let result = Box::pin(runner.compact(rx)).await;
1329
1330            let completion = match result {
1331                Ok(_) => IcebergPlanCompletion {
1332                    task_key,
1333                    error_message: None,
1334                },
1335                Err(e) => {
1336                    tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
1337                    IcebergPlanCompletion {
1338                        task_key,
1339                        error_message: Some(e.to_report_string()),
1340                    }
1341                }
1342            };
1343
1344            if completion_tx_clone.send(completion).is_err() {
1345                tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
1346            }
1347        });
1348    }
1349}
1350
1351/// Handle pulling new iceberg compaction tasks from the meta service.
1352/// Returns `true` if the event stream should be restarted.
1353fn handle_meta_task_pulling(
1354    pull_task_ack: &mut bool,
1355    task_queue: &IcebergTaskQueue,
1356    max_task_parallelism: u32,
1357    max_pull_task_count: u32,
1358    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1359    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1360) -> bool {
1361    let mut pending_pull_task_count = 0;
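        // Only request more tasks after the previous PullTask request has been acknowledged.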
1362    if *pull_task_ack {
1363        // Use queue's running parallelism for pull decision
1364        let current_running_parallelism = task_queue.running_parallelism_sum();
1365        pending_pull_task_count = max_task_parallelism
1366            .saturating_sub(current_running_parallelism).min(max_pull_task_count);
1367
1368        if pending_pull_task_count > 0 {
1369            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1370                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1371                    subscribe_iceberg_compaction_event_request::PullTask {
1372                        pull_task_count: pending_pull_task_count,
1373                    },
1374                )),
1375                create_at: SystemTime::now()
1376                    .duration_since(std::time::UNIX_EPOCH)
1377                    .expect("Clock may have gone backwards")
1378                    .as_millis() as u64,
1379            }) {
1380                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1381                return true; // Signal to restart stream
1382            } else {
1383                *pull_task_ack = false;
1384            }
1385        }
1386    }
1387
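        // Collect the current queue state for throttled logging below.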
1388    let running_count = task_queue.running_parallelism_sum();
1389    let waiting_count = task_queue.waiting_parallelism_sum();
1390    let available_count = max_task_parallelism.saturating_sub(running_count);
1391    let current_state = IcebergCompactionLogState {
1392        running_parallelism: running_count,
1393        waiting_parallelism: waiting_count,
1394        available_parallelism: available_count,
1395        pull_task_ack: *pull_task_ack,
1396        pending_pull_task_count,
1397    };
1398
1399    // Log only when state changes or periodically as heartbeat
1400    if log_throttler.should_log(&current_state) {
1401        tracing::info!(
1402            running_parallelism_count = %current_state.running_parallelism,
1403            waiting_parallelism_count = %current_state.waiting_parallelism,
1404            available_parallelism = %current_state.available_parallelism,
1405            pull_task_ack = %current_state.pull_task_ack,
1406            pending_pull_task_count = %current_state.pending_pull_task_count
1407        );
1408        log_throttler.update(current_state);
1409    }
1410
1411    false // No need to restart stream
1412}
1413
1414#[cfg(test)]
1415mod tests {
1416    use super::*;
1417
1418    #[test]
1419    fn test_log_state_equality() {
1420        // Test CompactionLogState
1421        let state1 = CompactionLogState {
1422            running_parallelism: 10,
1423            pull_task_ack: true,
1424            pending_pull_task_count: 2,
1425        };
1426        let state2 = CompactionLogState {
1427            running_parallelism: 10,
1428            pull_task_ack: true,
1429            pending_pull_task_count: 2,
1430        };
1431        let state3 = CompactionLogState {
1432            running_parallelism: 11,
1433            pull_task_ack: true,
1434            pending_pull_task_count: 2,
1435        };
1436        assert_eq!(state1, state2);
1437        assert_ne!(state1, state3);
1438
1439        // Test IcebergCompactionLogState
1440        let ice_state1 = IcebergCompactionLogState {
1441            running_parallelism: 10,
1442            waiting_parallelism: 5,
1443            available_parallelism: 15,
1444            pull_task_ack: true,
1445            pending_pull_task_count: 2,
1446        };
1447        let ice_state2 = IcebergCompactionLogState {
1448            running_parallelism: 10,
1449            waiting_parallelism: 6,
1450            available_parallelism: 15,
1451            pull_task_ack: true,
1452            pending_pull_task_count: 2,
1453        };
1454        assert_ne!(ice_state1, ice_state2);
1455    }
1456
1457    #[test]
1458    fn test_log_throttler_state_change_detection() {
1459        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
1460        let state1 = CompactionLogState {
1461            running_parallelism: 10,
1462            pull_task_ack: true,
1463            pending_pull_task_count: 2,
1464        };
1465        let state2 = CompactionLogState {
1466            running_parallelism: 11,
1467            pull_task_ack: true,
1468            pending_pull_task_count: 2,
1469        };
1470
1471        // First call should always log
1472        assert!(throttler.should_log(&state1));
1473        throttler.update(state1.clone());
1474
1475        // Same state should not log
1476        assert!(!throttler.should_log(&state1));
1477
1478        // Changed state should log
1479        assert!(throttler.should_log(&state2));
1480        throttler.update(state2.clone());
1481
1482        // Same state again should not log
1483        assert!(!throttler.should_log(&state2));
1484    }
1485
1486    #[test]
1487    fn test_log_throttler_heartbeat() {
1488        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
1489        let state = CompactionLogState {
1490            running_parallelism: 10,
1491            pull_task_ack: true,
1492            pending_pull_task_count: 2,
1493        };
1494
1495        // First call should log
1496        assert!(throttler.should_log(&state));
1497        throttler.update(state.clone());
1498
1499        // Same state immediately should not log
1500        assert!(!throttler.should_log(&state));
1501
1502        // Wait for heartbeat interval to pass
1503        std::thread::sleep(Duration::from_millis(15));
1504
1505        // Same state after interval should log (heartbeat)
1506        assert!(throttler.should_log(&state));
1507    }
1508}