risingwave_storage/hummock/compactor/
mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod compaction_executor;
mod compaction_filter;
pub mod compaction_utils;
mod iceberg_compaction;
use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
use risingwave_pb::hummock::PbCompactTask;
use risingwave_pb::hummock::report_compaction_task_request::{
    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
    ReportTask as ReportSharedTask,
};
use risingwave_pb::iceberg_compaction::{
    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
    subscribe_iceberg_compaction_event_request,
};
use risingwave_rpc_client::GrpcCompactorProxyClient;
use thiserror_ext::AsReport;
use tokio::sync::mpsc;
use tonic::Request;

pub mod compactor_runner;
mod context;
pub mod fast_compactor_runner;
mod iterator;
mod shared_buffer_compact;
pub(super) mod task_progress;

use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};

use await_tree::{InstrumentAwait, SpanExt};
pub use compaction_executor::CompactionExecutor;
pub use compaction_filter::{
    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
    TtlCompactionFilter,
};
pub use context::{
    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
};
use futures::{StreamExt, pin_mut};
// Import iceberg compactor runner types from the local `iceberg_compaction` module.
use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
use iceberg_compaction::{
    IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
    ReportSendResult, build_iceberg_task_report, create_task_execution,
    flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
};
pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
use more_asserts::assert_ge;
use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
use risingwave_hummock_sdk::{
    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
};
use risingwave_pb::hummock::compact_task::TaskStatus;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
    CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
    SubscribeCompactionEventResponse,
};
use risingwave_rpc_client::HummockMetaClient;
pub use shared_buffer_compact::{compact, merge_imms_in_memory};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;

pub use self::compaction_utils::{
    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
    check_flush_result,
};
pub use self::task_progress::TaskProgress;
use super::multi_builder::CapacitySplitTableBuilder;
use super::{
    GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
};
use crate::compaction_catalog_manager::{
    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
};
use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
use crate::hummock::compactor::iceberg_compaction::TaskKey;
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::{
    BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
    UnifiedSstableWriterFactory, validate_ssts,
};
use crate::monitor::CompactorMetrics;

/// Heartbeat logging interval for compaction tasks
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);

/// Represents the compaction task state for logging purposes
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    running_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

/// Represents the iceberg compaction task state for logging purposes
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    running_parallelism: u32,
    waiting_parallelism: u32,
    available_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}

/// Controls periodic logging with state change detection
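///
/// Illustrative usage (a sketch only; the state values below are hypothetical and not
/// taken from a specific call site in this module):
///
/// ```ignore
/// let mut throttler = LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
/// let state = CompactionLogState {
///     running_parallelism: 2,
///     pull_task_ack: true,
///     pending_pull_task_count: 2,
/// };
/// if throttler.should_log(&state) {
///     tracing::info!(running_parallelism_count = state.running_parallelism, "compactor status");
///     throttler.update(state);
/// }
/// ```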
struct LogThrottler<T: PartialEq> {
    last_logged_state: Option<T>,
    last_heartbeat: Instant,
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            last_logged_state: None,
            last_heartbeat: Instant::now(),
            heartbeat_interval,
        }
    }

    /// Returns true if logging should occur (state changed or heartbeat interval elapsed)
    fn should_log(&self, current_state: &T) -> bool {
        self.last_logged_state.as_ref() != Some(current_state)
            || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Updates the state and heartbeat timestamp after logging
    fn update(&mut self, current_state: T) {
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = Instant::now();
    }
}

/// Implementation of Hummock compaction.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    object_id_getter: Arc<dyn GetObjectId>,
    task_config: TaskConfig,
    options: SstableBuilderOptions,
    get_id_time: Arc<AtomicU64>,
}

pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);

impl Compactor {
    /// Create a new compactor.
    pub fn new(
        context: CompactorContext,
        options: SstableBuilderOptions,
        task_config: TaskConfig,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        Self {
            context,
            options,
            task_config,
            get_id_time: Arc::new(AtomicU64::new(0)),
            object_id_getter,
        }
    }

    /// Compact the given key range and merge iterator.
    /// Upon a successful return, the built SSTs are already uploaded to object store.
    ///
    /// `task_progress` is only used for tasks on the compactor.
    async fn compact_key_range(
        &self,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        task_id: Option<HummockCompactionTaskId>,
        split_index: Option<usize>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        // Measure the time spent building SSTs; shared-buffer flush and compaction are tracked by separate metrics.
        let compact_timer = if self.context.is_share_buffer_compact {
            self.context
                .compactor_metrics
                .write_build_l0_sst_duration
                .start_timer()
        } else {
            self.context
                .compactor_metrics
                .compact_sst_duration
                .start_timer()
        };

        let (split_table_outputs, table_stats_map) = {
            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
            if self.task_config.use_block_based_filter {
                self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            } else {
                self.compact_key_range_impl::<_, Xor16FilterBuilder>(
                    factory,
                    iter,
                    compaction_filter,
                    compaction_catalog_agent_ref,
                    task_progress.clone(),
                    self.object_id_getter.clone(),
                )
                .instrument_await("compact".verbose())
                .await?
            }
        };

        compact_timer.observe_duration();

        Self::report_progress(
            self.context.compactor_metrics.clone(),
            task_progress,
            &split_table_outputs,
            self.context.is_share_buffer_compact,
        );

        self.context
            .compactor_metrics
            .get_table_id_total_time_duration
            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);

        debug_assert!(
            split_table_outputs
                .iter()
                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
        );

        if task_id.is_some() {
            // Shared-buffer compaction has no task id, so this log only covers compactor tasks.
            tracing::info!(
                "Finish Task {:?} split_index {:?} sst count {}",
                task_id,
                split_index,
                split_table_outputs.len()
            );
        }
        Ok((split_table_outputs, table_stats_map))
    }

    pub fn report_progress(
        metrics: Arc<CompactorMetrics>,
        task_progress: Option<Arc<TaskProgress>>,
        ssts: &Vec<LocalSstableInfo>,
        is_share_buffer_compact: bool,
    ) {
        for sst_info in ssts {
            let sst_size = sst_info.file_size();
            if let Some(tracker) = &task_progress {
                tracker.inc_ssts_uploaded();
                tracker.dec_num_pending_write_io();
            }
            if is_share_buffer_compact {
                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
            } else {
                metrics.compaction_upload_sst_counts.inc();
            }
        }
    }

    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
        &self,
        writer_factory: F,
        iter: impl HummockIterator<Direction = Forward>,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Option<Arc<TaskProgress>>,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
        let builder_factory = RemoteBuilderFactory::<F, B> {
            object_id_getter,
            limiter: self.context.memory_limiter.clone(),
            options: self.options.clone(),
            policy: self.task_config.cache_policy,
            remote_rpc_cost: self.get_id_time.clone(),
            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
            sstable_writer_factory: writer_factory,
            _phantom: PhantomData,
        };

        let mut sst_builder = CapacitySplitTableBuilder::new(
            builder_factory,
            self.context.compactor_metrics.clone(),
            task_progress.clone(),
            self.task_config.table_vnode_partition.clone(),
            self.context
                .storage_opts
                .compactor_concurrent_uploading_sst_count,
            compaction_catalog_agent_ref,
        );
        let compaction_statistics = compact_and_build_sst(
            &mut sst_builder,
            &self.task_config,
            self.context.compactor_metrics.clone(),
            iter,
            compaction_filter,
        )
        .instrument_await("compact_and_build_sst".verbose())
        .await?;

        let ssts = sst_builder
            .finish()
            .instrument_await("builder_finish".verbose())
            .await?;

        Ok((ssts, compaction_statistics))
    }
}

/// The background iceberg compaction thread that subscribes to iceberg compaction tasks
/// from the meta node and runs them.
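///
/// A minimal wiring sketch (the `compactor_context` and `hummock_meta_client` values are
/// assumed to be built by the caller, e.g. during compactor node startup):
///
/// ```ignore
/// let (join_handle, shutdown_tx) =
///     start_iceberg_compactor(compactor_context, hummock_meta_client);
/// // ... later, on node shutdown:
/// let _ = shutdown_tx.send(());
/// let _ = join_handle.await;
/// ```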
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(
        compactor_context
            .storage_opts
            .iceberg_compaction_pull_interval_ms,
    );
    let worker_num = compactor_context.compaction_executor.worker_num();

    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // Initialize task queue with event-driven scheduling
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
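        // Illustrative sizing with hypothetical settings: 8 executor workers and
        // `compactor_max_task_multiplier` = 1.5 give `max_task_parallelism` = ceil(8 * 1.5) = 12;
        // a pending budget multiplier of 2.0 then yields `pending_parallelism_budget` =
        // ceil(12 * 2.0) = 24, while at most min(12, MAX_PULL_TASK_COUNT) = 4 tasks are
        // pulled from meta per request.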
        let mut task_queue =
            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);

        // Shutdown tracking for running tasks (task_key -> shutdown_sender)
        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));

        // Channel for task completion notifications
        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
        let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
        // Buffers task reports that failed to send on the current stream.
        // The queue is flushed in FIFO order after the stream reconnects.
        let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Track last logged state to avoid duplicate logs
        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            if matches!(
                flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
                ReportSendResult::RestartStream
            ) {
                continue 'start_stream;
            }

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    // Handle task completion notifications
                    Some(plan_completion) = task_completion_rx.recv() => {
                        let task_key = plan_completion.task_key;
                        let error_message = plan_completion.error_message;
                        tracing::debug!(
                            task_id = task_key.0,
                            plan_index = task_key.1,
                            success = error_message.is_none(),
                            "Plan completed, updating queue state"
                        );
                        task_queue.finish_running(task_key);

                        let completed_task_id = task_key.0;
                        let Entry::Occupied(mut tracker_entry) =
                            task_trackers.entry(completed_task_id)
                        else {
                            continue 'consume_stream;
                        };
                        tracker_entry.get_mut().record_completion(error_message);
                        if !tracker_entry.get().is_finished() {
                            continue 'consume_stream;
                        }

                        let report = tracker_entry.remove().into_report(completed_task_id);
                        if matches!(
                            send_or_buffer_iceberg_task_report(
                                &request_sender,
                                &mut pending_task_reports,
                                report,
                            ),
                            ReportSendResult::RestartStream
                        ) {
                            continue 'start_stream;
                        }
                        continue 'consume_stream;
                    }

                    // Event-driven task scheduling - wait for tasks to become schedulable
                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        // Only handle meta task pulling in periodic tick
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let sink_id = iceberg_compaction_task.sink_id;
                                // Note: write_parquet_properties is now built from sink config (IcebergConfig) in create_task_execution
                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .target_binpack_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
                                    )
                                    .min_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
                                    )
                                    .min_group_file_count(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
                                    )
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        let report = build_iceberg_task_report(
                                            task_id,
                                            sink_id,
                                            Some(format!(
                                                "Failed to build iceberg compactor runner config: {}",
                                                e.as_report()
                                            )),
                                        );
                                        if matches!(
                                            send_or_buffer_iceberg_task_report(
                                                &request_sender,
                                                &mut pending_task_reports,
                                                report,
                                            ),
                                            ReportSendResult::RestartStream
                                        ) {
                                            continue 'start_stream;
                                        }
                                        continue 'consume_stream;
                                    }
                                };

                                // Create task execution context and plan runners from the task
                                let task_execution = match create_task_execution(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(task_execution) => task_execution,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        let report = build_iceberg_task_report(
                                            task_id,
                                            sink_id,
                                            Some(format!(
                                                "Failed to create iceberg compaction task execution: {}",
                                                e.as_report()
                                            )),
                                        );
                                        if matches!(
                                            send_or_buffer_iceberg_task_report(
                                                &request_sender,
                                                &mut pending_task_reports,
                                                report,
                                            ),
                                            ReportSendResult::RestartStream
                                        ) {
                                            continue 'start_stream;
                                        }
                                        continue 'consume_stream;
                                    }
                                };

                                let sink_id = task_execution.sink_id;
                                let plan_runners = task_execution.plan_runners;

                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, sink_id, "No plans to execute");
                                    let report = build_iceberg_task_report(task_id, sink_id, None);
                                    if matches!(
                                        send_or_buffer_iceberg_task_report(
                                            &request_sender,
                                            &mut pending_task_reports,
                                            report,
                                        ),
                                        ReportSendResult::RestartStream
                                    ) {
                                        continue 'start_stream;
                                    }
                                    continue 'consume_stream;
                                }

                                // Enqueue each plan runner independently
                                let total_plans = plan_runners.len();
                                let mut enqueued_count = 0;

                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let plan_index = meta.plan_index;
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta, Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = plan_index,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            // Stop enqueuing remaining plans
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        },
                                        PushResult::RejectedDuplicate => {
                                            tracing::error!(
                                                task_id = task_id,
                                                plan_index = plan_index,
                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
                                            );
                                        }
                                    }
                                }

                                if enqueued_count == 0 {
                                    let report = build_iceberg_task_report(
                                        task_id,
                                        sink_id,
                                        Some("Failed to enqueue all iceberg compaction plans".to_owned()),
                                    );
                                    if matches!(
                                        send_or_buffer_iceberg_task_report(
                                            &request_sender,
                                            &mut pending_task_reports,
                                            report,
                                        ),
                                        ReportSendResult::RestartStream
                                    ) {
                                        continue 'start_stream;
                                    }
                                } else {
                                    task_trackers.insert(
                                        task_id,
                                        IcebergTaskTracker::new(sink_id, enqueued_count),
                                    );
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    sink_id = sink_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // set flag
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread that receives compaction tasks from hummock compaction
/// manager and runs compaction tasks.
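///
/// A minimal wiring sketch (the surrounding values are assumed to be built by the caller,
/// e.g. during compactor node startup):
///
/// ```ignore
/// let (join_handle, shutdown_tx) = start_compactor(
///     compactor_context,
///     hummock_meta_client,
///     object_id_manager,
///     compaction_catalog_manager_ref,
/// );
/// // ... later, on node shutdown:
/// let _ = shutdown_tx.send(());
/// let _ = join_handle.await;
/// ```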
#[must_use]
pub fn start_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
    object_id_manager: Arc<ObjectIdManager>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (JoinHandle<()>, Sender<()>) {
    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    let stream_retry_interval = Duration::from_secs(30);
    let task_progress = compactor_context.task_progress_manager.clone();
    let periodic_event_update_interval = Duration::from_millis(1000);

    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;
    let running_task_parallelism = Arc::new(AtomicU32::new(0));

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        let shutdown_map = CompactionShutdownMap::default();
        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        // Track last logged state to avoid duplicate logs
        let mut log_throttler =
            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // This outer loop is to recreate stream.
        'start_stream: loop {
            // reset state
            // pull_task_ack.store(true, Ordering::SeqCst);
            let mut pull_task_ack = true;
            tokio::select! {
                // Wait for interval.
                _ = min_interval.tick() => {},
                // Shutdown compactor.
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) =
                match hummock_meta_client.subscribe_compaction_event().await {
                    Ok((request_sender, response_event_stream)) => {
                        tracing::debug!("Succeeded subscribe_compaction_event.");
                        (request_sender, response_event_stream)
                    }

                    Err(e) => {
                        tracing::warn!(
                            error = %e.as_report(),
                            "Subscribing to compaction tasks failed with error. Will retry.",
                        );
                        continue 'start_stream;
                    }
                };

            pin_mut!(response_event_stream);

            let executor = compactor_context.compaction_executor.clone();
            let object_id_manager = object_id_manager.clone();

            // This inner loop is to consume stream or report task progress.
            let mut event_loop_iteration_now = Instant::now();
            'consume_stream: loop {
                {
                    // report
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let running_task_parallelism = running_task_parallelism.clone();
                let request_sender = request_sender.clone();
                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
                    _ = periodic_event_interval.tick() => {
                        let progress_list = get_task_progress(task_progress.clone());

                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                            event: Some(RequestEvent::HeartBeat(
                                HeartBeat {
                                    progress: progress_list
                                }
                            )),
                            create_at: SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .expect("Clock may have gone backwards")
                                .as_millis() as u64,
                        }) {
                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
                            // Re-subscribe to the stream.
                            continue 'start_stream;
                        }


                        let mut pending_pull_task_count = 0;
                        if pull_task_ack {
                            // TODO: Compute parallelism on meta side
                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);

                            if pending_pull_task_count > 0 {
                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                                    event: Some(RequestEvent::PullTask(
                                        PullTask {
                                            pull_task_count: pending_pull_task_count,
                                        }
                                    )),
                                    create_at: SystemTime::now()
                                        .duration_since(std::time::UNIX_EPOCH)
                                        .expect("Clock may have gone backwards")
                                        .as_millis() as u64,
                                }) {
                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");

                                    // Re-subscribe to the stream.
                                    continue 'start_stream;
                                } else {
                                    pull_task_ack = false;
                                }
                            }
                        }

                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
                        let current_state = CompactionLogState {
                            running_parallelism: running_count,
                            pull_task_ack,
                            pending_pull_task_count,
                        };

                        // Log only when state changes or periodically as heartbeat
                        if log_throttler.should_log(&current_state) {
                            tracing::info!(
                                running_parallelism_count = %current_state.running_parallelism,
                                pull_task_ack = %current_state.pull_task_ack,
                                pending_pull_task_count = %current_state.pending_pull_task_count
                            );
                            log_throttler.update(current_state);
                        }

                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Compactor is shutting down");
                        return
                    }
                };

                fn send_report_task_event(
                    compact_task: &CompactTask,
                    table_stats: TableStatsMap,
                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
                ) {
                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
                        event: Some(RequestEvent::ReportTask(ReportTask {
                            task_id: compact_task.task_id,
                            task_status: compact_task.task_status.into(),
                            sorted_output_ssts: compact_task
                                .sorted_output_ssts
                                .iter()
                                .map(|sst| sst.into())
                                .collect(),
                            table_stats_change: to_prost_table_stats_map(table_stats),
                            object_timestamps,
                        })),
                        create_at: SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64,
                    }) {
                        let task_id = compact_task.task_id;
                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
                    }
                }

                match event {
                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };
                        let shutdown = shutdown_map.clone();
                        let context = compactor_context.clone();
                        let consumed_latency_ms = SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .expect("Clock may have gone backwards")
                            .as_millis() as u64
                            - create_at;
                        context
                            .compactor_metrics
                            .compaction_event_consumed_latency
                            .observe(consumed_latency_ms as _);

                        let object_id_manager = object_id_manager.clone();
                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();

                        match event {
                            ResponseEvent::CompactTask(compact_task) => {
                                let compact_task = CompactTask::from(compact_task);
                                let parallelism =
                                    calculate_task_parallelism(&compact_task, &context);

                                assert_ne!(parallelism, 0, "splits cannot be empty");

                                if (max_task_parallelism
                                    - running_task_parallelism.load(Ordering::SeqCst))
                                    < parallelism as u32
                                {
                                    tracing::warn!(
                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
                                        compact_task.task_id,
                                        parallelism,
                                        running_task_parallelism.load(Ordering::Relaxed),
                                        max_task_parallelism,
                                    );
                                    let (compact_task, table_stats, object_timestamps) =
                                        compact_done(
                                            compact_task,
                                            context.clone(),
                                            vec![],
                                            TaskStatus::NoAvailCpuResourceCanceled,
                                        );

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    continue 'consume_stream;
                                }

                                running_task_parallelism
                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
                                executor.spawn(async move {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    let task_id = compact_task.task_id;
                                    shutdown.lock().unwrap().insert(task_id, tx);

                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact(
                                        context.clone(),
                                        compact_task,
                                        rx,
                                        object_id_manager.clone(),
                                        compaction_catalog_manager_ref.clone(),
                                    )
                                    .await;

                                    shutdown.lock().unwrap().remove(&task_id);
                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);

                                    send_report_task_event(
                                        &compact_task,
                                        table_stats,
                                        object_timestamps,
                                        &request_sender,
                                    );

                                    let enable_check_compaction_result =
                                        context.storage_opts.check_compaction_result;
                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;

                                    if enable_check_compaction_result && need_check_task {
                                        let compact_table_ids = compact_task.build_compact_table_ids();
                                        match compaction_catalog_manager_ref.acquire(compact_table_ids.into_iter().collect()).await {
                                            Ok(compaction_catalog_agent_ref) => {
                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
                                                {
                                                    Err(e) => {
                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", compact_task.task_id);
                                                    }
                                                    Ok(true) => (),
                                                    Ok(false) => {
                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
                                                    }
                                                }
                                            },
                                            Err(e) => {
                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
                                            }
                                        }
                                    }
                                });
                            }
                            #[expect(deprecated)]
                            ResponseEvent::VacuumTask(_) => {
                                unreachable!("unexpected vacuum task");
                            }
                            #[expect(deprecated)]
                            ResponseEvent::FullScanTask(_) => {
                                unreachable!("unexpected scan task");
                            }
                            #[expect(deprecated)]
                            ResponseEvent::ValidationTask(validation_task) => {
                                let validation_task = ValidationTask::from(validation_task);
                                executor.spawn(async move {
                                    validate_ssts(validation_task, context.sstable_store.clone())
                                        .await;
                                });
                            }
                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
                                .lock()
                                .unwrap()
                                .remove(&cancel_compact_task.task_id)
                            {
                                Some(tx) => {
                                    if tx.send(()).is_err() {
                                        tracing::warn!(
                                            "Cancellation of compaction task failed. task_id: {}",
                                            cancel_compact_task.task_id
                                        );
                                    }
                                }
                                _ => {
                                    tracing::warn!(
                                        "Attempting to cancel non-existent compaction task. task_id: {}",
                                        cancel_compact_task.task_id
                                    );
                                }
                            },

                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
                                // set flag
                                pull_task_ack = true;
                            }
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    _ => {
                        // The stream is exhausted
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}

/// The background compaction thread for the shared compactor. It receives dispatched
/// compaction tasks through `receiver` and reports progress and results via the gRPC proxy client.
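///
/// A minimal wiring sketch (the `grpc_proxy_client` and `compactor_context` values, and the
/// `dispatched_request` fed into the channel, are assumed to be built by the caller):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
/// let (join_handle, shutdown_tx) =
///     start_shared_compactor(grpc_proxy_client, rx, compactor_context);
/// // Forward a `DispatchCompactionTaskRequest` received from the proxy:
/// tx.send(Request::new(dispatched_request)).unwrap();
/// ```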
1113#[must_use]
1114pub fn start_shared_compactor(
1115    grpc_proxy_client: GrpcCompactorProxyClient,
1116    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1117    context: CompactorContext,
1118) -> (JoinHandle<()>, Sender<()>) {
1119    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1120    let task_progress = context.task_progress_manager.clone();
1121    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1122    let periodic_event_update_interval = Duration::from_millis(1000);
1123
1124    let join_handle = tokio::spawn(async move {
1125        let shutdown_map = CompactionShutdownMap::default();
1126
1127        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1128        let executor = context.compaction_executor.clone();
1129        let report_heartbeat_client = grpc_proxy_client.clone();
1130        'consume_stream: loop {
1131            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
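                // Periodic branch: snapshot the progress of every running task and report it as a
                // heartbeat through the proxy client.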
1132                _ = periodic_event_interval.tick() => {
1133                    let progress_list = get_task_progress(task_progress.clone());
1134                    let report_compaction_task_request = ReportCompactionTaskRequest{
1135                        event: Some(ReportCompactionTaskEvent::HeartBeat(
1136                            SharedHeartBeat {
1137                                progress: progress_list
1138                            }
1139                        )),
1140                    };
1141                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await {
1142                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1143                    }
1144                    continue
1145                }
1146
1147
1148                _ = &mut shutdown_rx => {
1149                    tracing::info!("Compactor is shutting down");
1150                    return
1151                }
1152
1153                request = receiver.recv() => {
1154                    request
1155                }
1156
1157            };
1158            match request {
1159                Some(request) => {
1160                    let context = context.clone();
1161                    let shutdown = shutdown_map.clone();
1162
1163                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1164                    executor.spawn(async move {
1165                        let DispatchCompactionTaskRequest {
1166                            tables,
1167                            output_object_ids,
1168                            task: dispatch_task,
1169                        } = request.into_inner();
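                        // Index the dispatched table catalogs by table id so the compaction catalog
                        // agent can resolve them during compaction.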
1170                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1171                            acc.insert(table.id, table);
1172                            acc
1173                        });
1174
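                        // Seed the object id manager with the ids pre-allocated by the meta node;
                        // additional ids can be fetched through the proxy client
                        // (`sstable_id_remote_fetch_number` at a time).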
1175                        let output_object_ids_deque: VecDeque<HummockSstableObjectId> =
1176                            output_object_ids.into_iter().map(Into::into).collect();
1177                        let shared_compactor_object_id_manager =
1178                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1179                            match dispatch_task.unwrap() {
1180                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1181                                    let compact_task = CompactTask::from(&compact_task);
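                                    // Register a shutdown sender so a later `CancelCompactTask`
                                    // dispatch can abort this task.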
1182                                    let (tx, rx) = tokio::sync::oneshot::channel();
1183                                    let task_id = compact_task.task_id;
1184                                    shutdown.lock().unwrap().insert(task_id, tx);
1185
1186                                    let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1187                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker) = compactor_runner::compact_with_agent(
1188                                        context.clone(),
1189                                        compact_task,
1190                                        rx,
1191                                        shared_compactor_object_id_manager,
1192                                        compaction_catalog_agent_ref.clone(),
1193                                    )
1194                                    .await;
1195                                    shutdown.lock().unwrap().remove(&task_id);
1196                                    let report_compaction_task_request = ReportCompactionTaskRequest {
1197                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1198                                            compact_task: Some(PbCompactTask::from(&compact_task)),
1199                                            table_stats_change: to_prost_table_stats_map(table_stats),
1200                                            object_timestamps,
1201                                    })),
1202                                    };
1203
1204                                    match cloned_grpc_proxy_client
1205                                        .report_compaction_task(report_compaction_task_request)
1206                                        .await
1207                                    {
1208                                        Ok(_) => {
1209                                            // TODO: remove this check after the fast compaction algorithm has been running stably in production clusters for a long time.
1210                                            let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1211                                            let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1212                                            if enable_check_compaction_result && need_check_task {
1213                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await {
1214                                                    Err(e) => {
1215                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1216                                                    },
1217                                                    Ok(true) => (),
1218                                                    Ok(false) => {
1219                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1220                                                    }
1221                                                }
1222                                            }
1223                                        }
1224                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1225                                    }
1226
1227                                }
1228                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
1229                                    unreachable!("unexpected vacuum task");
1230                                }
1231                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
1232                                    unreachable!("unexpected scan task");
1233                                }
1234                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1235                                    let validation_task = ValidationTask::from(validation_task);
1236                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
1237                                }
1238                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1239                                    match shutdown.lock().unwrap().remove(&cancel_compact_task.task_id)
1240                                    {
1241                                        Some(tx) => {
1242                                            if tx.send(()).is_err() {
1243                                                tracing::warn!(
1244                                                    "Cancellation of compaction task failed. task_id: {}",
1245                                                    cancel_compact_task.task_id
1246                                                );
1247                                            }
1248                                        }
1249                                        _ => {
1250                                            tracing::warn!(
1251                                                "Attempting to cancel non-existent compaction task. task_id: {}",
1252                                                cancel_compact_task.task_id
1253                                            );
1254                                        }
1255                                    }
1256                                }
1257                            }
1258                    });
1259                }
1260                None => continue 'consume_stream,
1261            }
1262        }
1263    });
1264    (join_handle, shutdown_tx)
1265}
1266
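/// Snapshots the progress of every tracked compaction task; used to build heartbeat reports.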
1267fn get_task_progress(
1268    task_progress: Arc<
1269        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1270    >,
1271) -> Vec<CompactTaskProgress> {
1272    let mut progress_list = Vec::new();
1273    for (&task_id, progress) in &*task_progress.lock() {
1274        progress_list.push(progress.snapshot(task_id));
1275    }
1276    progress_list
1277}
1278
1279/// Schedules queued iceberg compaction plans while the task queue still has capacity to run them.
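///
/// Each popped plan is spawned on the compaction executor: a oneshot shutdown sender is registered
/// under the plan's `(task_id, plan_index)` key so it can be cancelled, and the outcome is sent
/// back to the main loop through `task_completion_tx`.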
1280fn schedule_queued_tasks(
1281    task_queue: &mut IcebergTaskQueue,
1282    compactor_context: &CompactorContext,
1283    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1284    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
1285) {
1286    while let Some(popped_task) = task_queue.pop() {
1287        let task_id = popped_task.meta.task_id;
1288        let plan_index = popped_task.meta.plan_index;
1289        let task_key = (task_id, plan_index);
1290
1291        // Get unique_ident before moving runner
1292        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1293
1294        let Some(runner) = popped_task.runner else {
1295            tracing::error!(
1296                task_id = task_id,
1297                plan_index = plan_index,
1298                "Popped task missing runner - this should not happen"
1299            );
1300            task_queue.finish_running(task_key);
1301            continue;
1302        };
1303
1304        let executor = compactor_context.compaction_executor.clone();
1305        let shutdown_map_clone = shutdown_map.clone();
1306        let completion_tx_clone = task_completion_tx.clone();
1307
1308        tracing::info!(
1309            task_id = task_id,
1310            plan_index = plan_index,
1311            unique_ident = ?unique_ident,
1312            required_parallelism = popped_task.meta.required_parallelism,
1313            "Starting iceberg compaction task from queue"
1314        );
1315
1316        executor.spawn(async move {
1317            let (tx, rx) = tokio::sync::oneshot::channel();
1318            {
1319                let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
1320                shutdown_guard.insert(task_key, tx);
1321            }
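            // Remove the shutdown entry once the task finishes, whether it succeeded or failed.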
1322            let _cleanup_guard = scopeguard::guard(shutdown_map_clone.clone(), move |shutdown_map| {
1323                let mut shutdown_guard = shutdown_map.lock().unwrap();
1324                shutdown_guard.remove(&task_key);
1325            });
1326
1327            let result = Box::pin(runner.compact(rx)).await;
1328
1329            let completion = match result {
1330                Ok(_) => IcebergPlanCompletion {
1331                    task_key,
1332                    error_message: None,
1333                },
1334                Err(e) => {
1335                    tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
1336                    IcebergPlanCompletion {
1337                        task_key,
1338                        error_message: Some(e.to_report_string()),
1339                    }
1340                }
1341            };
1342
1343            if completion_tx_clone.send(completion).is_err() {
1344                tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
1345            }
1346        });
1347    }
1348}
1349
1350/// Handles pulling new iceberg compaction tasks from the meta service.
1351/// Returns `true` if the event stream should be restarted.
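///
/// The number of tasks requested is bounded by the spare parallelism: for example, with
/// `max_task_parallelism = 8`, a running parallelism sum of 5, and `max_pull_task_count = 4`,
/// at most `min(8 - 5, 4) = 3` tasks are pulled.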
1352fn handle_meta_task_pulling(
1353    pull_task_ack: &mut bool,
1354    task_queue: &IcebergTaskQueue,
1355    max_task_parallelism: u32,
1356    max_pull_task_count: u32,
1357    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1358    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1359) -> bool {
1360    let mut pending_pull_task_count = 0;
1361    if *pull_task_ack {
1362        // Use queue's running parallelism for pull decision
1363        let current_running_parallelism = task_queue.running_parallelism_sum();
1364        pending_pull_task_count =
1365            max_task_parallelism.saturating_sub(current_running_parallelism).min(max_pull_task_count);
1366
1367        if pending_pull_task_count > 0 {
1368            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1369                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1370                    subscribe_iceberg_compaction_event_request::PullTask {
1371                        pull_task_count: pending_pull_task_count,
1372                    },
1373                )),
1374                create_at: SystemTime::now()
1375                    .duration_since(std::time::UNIX_EPOCH)
1376                    .expect("Clock may have gone backwards")
1377                    .as_millis() as u64,
1378            }) {
1379                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1380                return true; // Signal to restart stream
1381            } else {
1382                *pull_task_ack = false;
1383            }
1384        }
1385    }
1386
1387    let running_count = task_queue.running_parallelism_sum();
1388    let waiting_count = task_queue.waiting_parallelism_sum();
1389    let available_count = max_task_parallelism.saturating_sub(running_count);
1390    let current_state = IcebergCompactionLogState {
1391        running_parallelism: running_count,
1392        waiting_parallelism: waiting_count,
1393        available_parallelism: available_count,
1394        pull_task_ack: *pull_task_ack,
1395        pending_pull_task_count,
1396    };
1397
1398    // Log only when state changes or periodically as heartbeat
1399    if log_throttler.should_log(&current_state) {
1400        tracing::info!(
1401            running_parallelism_count = %current_state.running_parallelism,
1402            waiting_parallelism_count = %current_state.waiting_parallelism,
1403            available_parallelism = %current_state.available_parallelism,
1404            pull_task_ack = %current_state.pull_task_ack,
1405            pending_pull_task_count = %current_state.pending_pull_task_count
1406        );
1407        log_throttler.update(current_state);
1408    }
1409
1410    false // No need to restart stream
1411}
1412
1413#[cfg(test)]
1414mod tests {
1415    use super::*;
1416
1417    #[test]
1418    fn test_log_state_equality() {
1419        // Test CompactionLogState
1420        let state1 = CompactionLogState {
1421            running_parallelism: 10,
1422            pull_task_ack: true,
1423            pending_pull_task_count: 2,
1424        };
1425        let state2 = CompactionLogState {
1426            running_parallelism: 10,
1427            pull_task_ack: true,
1428            pending_pull_task_count: 2,
1429        };
1430        let state3 = CompactionLogState {
1431            running_parallelism: 11,
1432            pull_task_ack: true,
1433            pending_pull_task_count: 2,
1434        };
1435        assert_eq!(state1, state2);
1436        assert_ne!(state1, state3);
1437
1438        // Test IcebergCompactionLogState
1439        let ice_state1 = IcebergCompactionLogState {
1440            running_parallelism: 10,
1441            waiting_parallelism: 5,
1442            available_parallelism: 15,
1443            pull_task_ack: true,
1444            pending_pull_task_count: 2,
1445        };
1446        let ice_state2 = IcebergCompactionLogState {
1447            running_parallelism: 10,
1448            waiting_parallelism: 6,
1449            available_parallelism: 15,
1450            pull_task_ack: true,
1451            pending_pull_task_count: 2,
1452        };
1453        assert_ne!(ice_state1, ice_state2);
1454    }
1455
1456    #[test]
1457    fn test_log_throttler_state_change_detection() {
1458        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
1459        let state1 = CompactionLogState {
1460            running_parallelism: 10,
1461            pull_task_ack: true,
1462            pending_pull_task_count: 2,
1463        };
1464        let state2 = CompactionLogState {
1465            running_parallelism: 11,
1466            pull_task_ack: true,
1467            pending_pull_task_count: 2,
1468        };
1469
1470        // First call should always log
1471        assert!(throttler.should_log(&state1));
1472        throttler.update(state1.clone());
1473
1474        // Same state should not log
1475        assert!(!throttler.should_log(&state1));
1476
1477        // Changed state should log
1478        assert!(throttler.should_log(&state2));
1479        throttler.update(state2.clone());
1480
1481        // Same state again should not log
1482        assert!(!throttler.should_log(&state2));
1483    }
1484
1485    #[test]
1486    fn test_log_throttler_heartbeat() {
1487        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
1488        let state = CompactionLogState {
1489            running_parallelism: 10,
1490            pull_task_ack: true,
1491            pending_pull_task_count: 2,
1492        };
1493
1494        // First call should log
1495        assert!(throttler.should_log(&state));
1496        throttler.update(state.clone());
1497
1498        // Same state immediately should not log
1499        assert!(!throttler.should_log(&state));
1500
1501        // Wait for heartbeat interval to pass
1502        std::thread::sleep(Duration::from_millis(15));
1503
1504        // Same state after interval should log (heartbeat)
1505        assert!(throttler.should_log(&state));
1506    }
1507}