risingwave_storage/hummock/compactor/
mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23    Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24    ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27    SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28    subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::hash_map::Entry;
43use std::collections::{HashMap, VecDeque};
44use std::marker::PhantomData;
45use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, SystemTime};
48
49use await_tree::{InstrumentAwait, SpanExt};
50pub use compaction_executor::CompactionExecutor;
51pub use compaction_filter::{
52    CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, TtlCompactionFilter,
53};
54pub use context::{
55    CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58// Import iceberg compactor runner types from the local `iceberg_compaction` module.
59use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
60use iceberg_compaction::{
61    IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
62    ReportSendResult, build_iceberg_task_report, create_task_execution,
63    flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
64};
65pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
66use more_asserts::assert_ge;
67use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
68use risingwave_hummock_sdk::{
69    HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
70};
71use risingwave_pb::hummock::compact_task::TaskStatus;
72use risingwave_pb::hummock::subscribe_compaction_event_request::{
73    Event as RequestEvent, HeartBeat, PullTask, ReportTask,
74};
75use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
76use risingwave_pb::hummock::{
77    CompactTaskProgress, PbSstableFilterType, ReportCompactionTaskRequest,
78    SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
79};
80use risingwave_rpc_client::HummockMetaClient;
81pub use shared_buffer_compact::compact;
82use tokio::sync::oneshot::Sender;
83use tokio::task::JoinHandle;
84use tokio::time::Instant;
85
86pub use self::compaction_utils::{
87    CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
88    check_flush_result,
89};
90pub use self::task_progress::TaskProgress;
91use super::multi_builder::CapacitySplitTableBuilder;
92use super::{
93    GetObjectId, HummockErrorInner, HummockResult, ObjectIdManager, SstableBuilderOptions,
94    Xor8FilterBuilder, Xor16FilterBuilder,
95};
96use crate::compaction_catalog_manager::{
97    CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
98};
99use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
100use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
101use crate::hummock::compactor::iceberg_compaction::TaskKey;
102use crate::hummock::iterator::{Forward, HummockIterator};
103use crate::hummock::{
104    BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, FilterBuilder,
105    SharedComapctorObjectIdManager, SstableWriterFactory, UnifiedSstableWriterFactory,
106    validate_ssts,
107};
108use crate::monitor::CompactorMetrics;
109
/// Heartbeat logging interval for compaction tasks.
///
/// Passed to `LogThrottler::new` as the heartbeat interval: once this much time has elapsed
/// since the last recorded log, the state is logged again even if it has not changed.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);
112
/// Snapshot of the compaction task state, kept for logging purposes only.
///
/// Two snapshots compare equal when every field matches.
// NOTE(review): presumably used as the state type of a `LogThrottler`; the code that fills
// these fields is not in this part of the file, so the field notes below follow the names.
#[derive(Clone, Debug, Eq, PartialEq)]
struct CompactionLogState {
    /// Parallelism counted as running when the snapshot was taken.
    running_parallelism: u32,
    /// Pull-task acknowledgement flag at snapshot time.
    pull_task_ack: bool,
    /// Pending pull-task count at snapshot time.
    pending_pull_task_count: u32,
}
120
/// Snapshot of the iceberg compaction task state, kept for logging purposes only.
///
/// Used as the state type of the `LogThrottler` created in `start_iceberg_compactor`, so two
/// snapshots that compare equal are treated as "nothing changed".
// NOTE(review): the code that fills these fields is not in this part of the file; the field
// notes below follow the names and should be confirmed against the populating code.
#[derive(Clone, Debug, Eq, PartialEq)]
struct IcebergCompactionLogState {
    /// Parallelism counted as running when the snapshot was taken.
    running_parallelism: u32,
    /// Parallelism counted as waiting when the snapshot was taken.
    waiting_parallelism: u32,
    /// Parallelism counted as available when the snapshot was taken.
    available_parallelism: u32,
    /// Pull-task acknowledgement flag at snapshot time.
    pull_task_ack: bool,
    /// Pending pull-task count at snapshot time.
    pending_pull_task_count: u32,
}
130
/// Decides when a periodically observed state is worth logging.
///
/// A state should be logged when it differs from the last recorded one, or when the
/// heartbeat interval has elapsed since the last recorded log.
struct LogThrottler<T: PartialEq> {
    /// State passed to the most recent `update`; `None` until the first `update`.
    last_logged_state: Option<T>,
    /// Time of construction or of the most recent `update`.
    last_heartbeat: Instant,
    /// Elapsed time after which logging is due even for an unchanged state.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Creates a throttler with no recorded state whose heartbeat clock starts now.
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            heartbeat_interval,
            last_heartbeat: Instant::now(),
            last_logged_state: None,
        }
    }

    /// Returns true if logging should occur (state changed or heartbeat interval elapsed).
    fn should_log(&self, current_state: &T) -> bool {
        let state_unchanged =
            matches!(&self.last_logged_state, Some(previous) if previous == current_state);
        if !state_unchanged {
            // Nothing recorded yet, or the state differs: log regardless of the clock.
            return true;
        }
        self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Records `current_state` as logged and restarts the heartbeat clock.
    fn update(&mut self, current_state: T) {
        *self = Self {
            last_logged_state: Some(current_state),
            last_heartbeat: Instant::now(),
            heartbeat_interval: self.heartbeat_interval,
        };
    }
}
159
/// Implementation of Hummock compaction.
pub struct Compactor {
    /// The context of the compactor.
    context: CompactorContext,
    /// Object id source handed to the SST builder factory for each compaction run.
    object_id_getter: Arc<dyn GetObjectId>,
    /// Per-task settings: selects the SST filter builder and supplies the cache policy and
    /// table vnode partitioning used while building SSTs.
    task_config: TaskConfig,
    /// SST builder options, cloned into the builder factory for each compaction run.
    options: SstableBuilderOptions,
    /// Counter shared with the builder factory as its `remote_rpc_cost`; its value is divided
    /// by 1e6 and observed in `get_table_id_total_time_duration` after each run.
    get_id_time: Arc<AtomicU64>,
}
169
/// Output of one compaction unit: a `usize`, the SSTs it produced and its statistics.
// NOTE(review): the `usize` is presumably the index of the split that produced the output;
// confirm against the code that builds this tuple.
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
171
172impl Compactor {
173    /// Create a new compactor.
174    pub fn new(
175        context: CompactorContext,
176        options: SstableBuilderOptions,
177        task_config: TaskConfig,
178        object_id_getter: Arc<dyn GetObjectId>,
179    ) -> Self {
180        Self {
181            context,
182            options,
183            task_config,
184            get_id_time: Arc::new(AtomicU64::new(0)),
185            object_id_getter,
186        }
187    }
188
189    /// Compact the given key range and merge iterator.
190    /// Upon a successful return, the built SSTs are already uploaded to object store.
191    ///
192    /// `task_progress` is only used for tasks on the compactor.
193    async fn compact_key_range(
194        &self,
195        iter: impl HummockIterator<Direction = Forward>,
196        compaction_filter: impl CompactionFilter,
197        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
198        task_progress: Option<Arc<TaskProgress>>,
199        task_id: Option<HummockCompactionTaskId>,
200        split_index: Option<usize>,
201    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
202        // Monitor time cost building shared buffer to SSTs.
203        let compact_timer = if self.context.is_share_buffer_compact {
204            self.context
205                .compactor_metrics
206                .write_build_l0_sst_duration
207                .start_timer()
208        } else {
209            self.context
210                .compactor_metrics
211                .compact_sst_duration
212                .start_timer()
213        };
214
215        let (split_table_outputs, table_stats_map) = {
216            let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
217            match (
218                self.task_config.sstable_filter_kind,
219                self.task_config.use_block_based_filter,
220            ) {
221                (PbSstableFilterType::SstableFilterXor8, true) => {
222                    self.compact_key_range_impl::<_, BlockedXor8FilterBuilder>(
223                        factory,
224                        iter,
225                        compaction_filter,
226                        compaction_catalog_agent_ref,
227                        task_progress.clone(),
228                        self.object_id_getter.clone(),
229                    )
230                    .instrument_await("compact".verbose())
231                    .await?
232                }
233                (PbSstableFilterType::SstableFilterXor8, false) => {
234                    self.compact_key_range_impl::<_, Xor8FilterBuilder>(
235                        factory,
236                        iter,
237                        compaction_filter,
238                        compaction_catalog_agent_ref,
239                        task_progress.clone(),
240                        self.object_id_getter.clone(),
241                    )
242                    .instrument_await("compact".verbose())
243                    .await?
244                }
245                (PbSstableFilterType::SstableFilterXor16, true) => {
246                    self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
247                        factory,
248                        iter,
249                        compaction_filter,
250                        compaction_catalog_agent_ref,
251                        task_progress.clone(),
252                        self.object_id_getter.clone(),
253                    )
254                    .instrument_await("compact".verbose())
255                    .await?
256                }
257                (PbSstableFilterType::SstableFilterXor16, false) => {
258                    self.compact_key_range_impl::<_, Xor16FilterBuilder>(
259                        factory,
260                        iter,
261                        compaction_filter,
262                        compaction_catalog_agent_ref,
263                        task_progress.clone(),
264                        self.object_id_getter.clone(),
265                    )
266                    .instrument_await("compact".verbose())
267                    .await?
268                }
269                (kind, _) => unreachable!("unsupported sstable filter kind in compactor: {kind:?}"),
270            }
271        };
272
273        compact_timer.observe_duration();
274
275        Self::report_progress(
276            self.context.compactor_metrics.clone(),
277            task_progress,
278            &split_table_outputs,
279            self.context.is_share_buffer_compact,
280        );
281
282        self.context
283            .compactor_metrics
284            .get_table_id_total_time_duration
285            .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
286
287        debug_assert!(
288            split_table_outputs
289                .iter()
290                .all(|table_info| table_info.sst_info.table_ids.is_sorted())
291        );
292
293        if task_id.is_some() {
294            // skip shared buffer compaction
295            tracing::info!(
296                "Finish Task {:?} split_index {:?} sst count {}",
297                task_id,
298                split_index,
299                split_table_outputs.len()
300            );
301        }
302        Ok((split_table_outputs, table_stats_map))
303    }
304
305    pub fn report_progress(
306        metrics: Arc<CompactorMetrics>,
307        task_progress: Option<Arc<TaskProgress>>,
308        ssts: &Vec<LocalSstableInfo>,
309        is_share_buffer_compact: bool,
310    ) {
311        for sst_info in ssts {
312            let sst_size = sst_info.file_size();
313            if let Some(tracker) = &task_progress {
314                tracker.inc_ssts_uploaded();
315                tracker.dec_num_pending_write_io();
316            }
317            if is_share_buffer_compact {
318                metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
319            } else {
320                metrics.compaction_upload_sst_counts.inc();
321            }
322        }
323    }
324
325    async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
326        &self,
327        writer_factory: F,
328        iter: impl HummockIterator<Direction = Forward>,
329        compaction_filter: impl CompactionFilter,
330        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
331        task_progress: Option<Arc<TaskProgress>>,
332        object_id_getter: Arc<dyn GetObjectId>,
333    ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
334        let builder_factory = RemoteBuilderFactory::<F, B> {
335            object_id_getter,
336            limiter: self.context.memory_limiter.clone(),
337            options: self.options.clone(),
338            policy: self.task_config.cache_policy,
339            remote_rpc_cost: self.get_id_time.clone(),
340            compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
341            sstable_writer_factory: writer_factory,
342            _phantom: PhantomData,
343        };
344
345        let mut sst_builder = CapacitySplitTableBuilder::new(
346            builder_factory,
347            self.context.compactor_metrics.clone(),
348            task_progress.clone(),
349            self.task_config.table_vnode_partition.clone(),
350            self.context
351                .storage_opts
352                .compactor_concurrent_uploading_sst_count,
353            compaction_catalog_agent_ref,
354        );
355        let compaction_statistics = compact_and_build_sst(
356            &mut sst_builder,
357            &self.task_config,
358            self.context.compactor_metrics.clone(),
359            iter,
360            compaction_filter,
361        )
362        .instrument_await("compact_and_build_sst".verbose())
363        .await?;
364
365        let ssts = sst_builder
366            .finish()
367            .instrument_await("builder_finish".verbose())
368            .await?;
369
370        Ok((ssts, compaction_statistics))
371    }
372}
373
/// Spawns the background task that subscribes to iceberg compaction events through the
/// hummock meta client and runs the iceberg compaction tasks it receives.
376#[must_use]
377pub fn start_iceberg_compactor(
378    compactor_context: CompactorContext,
379    hummock_meta_client: Arc<dyn HummockMetaClient>,
380) -> (JoinHandle<()>, Sender<()>) {
381    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
382    let stream_retry_interval = Duration::from_secs(30);
383    let periodic_event_update_interval = Duration::from_millis(
384        compactor_context
385            .storage_opts
386            .iceberg_compaction_pull_interval_ms,
387    );
388    let worker_num = compactor_context.compaction_executor.worker_num();
389
390    let max_task_parallelism: u32 = (worker_num as f32
391        * compactor_context.storage_opts.compactor_max_task_multiplier)
392        .ceil() as u32;
393
394    const MAX_PULL_TASK_COUNT: u32 = 4;
395    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
396
397    assert_ge!(
398        compactor_context.storage_opts.compactor_max_task_multiplier,
399        0.0
400    );
401
402    let join_handle = tokio::spawn(async move {
403        // Initialize task queue with event-driven scheduling
404        let pending_parallelism_budget = (max_task_parallelism as f32
405            * compactor_context
406                .storage_opts
407                .iceberg_compaction_pending_parallelism_budget_multiplier)
408            .ceil() as u32;
409        let mut task_queue =
410            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);
411
412        // Shutdown tracking for running tasks (task_key -> shutdown_sender)
413        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));
414
415        // Channel for task completion notifications
416        let (task_completion_tx, mut task_completion_rx) =
417            tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
418        let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
419        // Buffers task reports that failed to send on the current stream.
420        // The queue is flushed in FIFO order after the stream reconnects.
421        let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();
422
423        let mut min_interval = tokio::time::interval(stream_retry_interval);
424        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
425
426        // Track last logged state to avoid duplicate logs
427        let mut log_throttler =
428            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
429
430        // This outer loop is to recreate stream.
431        'start_stream: loop {
432            // reset state
433            // pull_task_ack.store(true, Ordering::SeqCst);
434            let mut pull_task_ack = true;
435            tokio::select! {
436                // Wait for interval.
437                _ = min_interval.tick() => {},
438                // Shutdown compactor.
439                _ = &mut shutdown_rx => {
440                    tracing::info!("Compactor is shutting down");
441                    return;
442                }
443            }
444
445            let (request_sender, response_event_stream) = match hummock_meta_client
446                .subscribe_iceberg_compaction_event()
447                .await
448            {
449                Ok((request_sender, response_event_stream)) => {
450                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
451                    (request_sender, response_event_stream)
452                }
453
454                Err(e) => {
455                    tracing::warn!(
456                        error = %e.as_report(),
457                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
458                    );
459                    continue 'start_stream;
460                }
461            };
462
463            if matches!(
464                flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
465                ReportSendResult::RestartStream
466            ) {
467                continue 'start_stream;
468            }
469
470            pin_mut!(response_event_stream);
471
472            let _executor = compactor_context.compaction_executor.clone();
473
474            // This inner loop is to consume stream or report task progress.
475            let mut event_loop_iteration_now = Instant::now();
476            'consume_stream: loop {
477                {
478                    // report
479                    compactor_context
480                        .compactor_metrics
481                        .compaction_event_loop_iteration_latency
482                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
483                    event_loop_iteration_now = Instant::now();
484                }
485
486                let request_sender = request_sender.clone();
487                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
488                    // Handle task completion notifications
489                    Some(plan_completion) = task_completion_rx.recv() => {
490                        let task_key = plan_completion.task_key;
491                        let error_message = plan_completion.error_message;
492                        tracing::debug!(
493                            task_id = task_key.0,
494                            plan_index = task_key.1,
495                            success = error_message.is_none(),
496                            "Plan completed, updating queue state"
497                        );
498                        task_queue.finish_running(task_key);
499
500                        let completed_task_id = task_key.0;
501                        let Entry::Occupied(mut tracker_entry) =
502                            task_trackers.entry(completed_task_id)
503                        else {
504                            continue 'consume_stream;
505                        };
506                        tracker_entry.get_mut().record_completion(error_message);
507                        if !tracker_entry.get().is_finished() {
508                            continue 'consume_stream;
509                        }
510
511                        let report = tracker_entry.remove().into_report(completed_task_id);
512                        if matches!(
513                            send_or_buffer_iceberg_task_report(
514                                &request_sender,
515                                &mut pending_task_reports,
516                                report,
517                            ),
518                            ReportSendResult::RestartStream
519                        ) {
520                            continue 'start_stream;
521                        }
522                        continue 'consume_stream;
523                    }
524
525                    // Event-driven task scheduling - wait for tasks to become schedulable
526                    _ = task_queue.wait_schedulable() => {
527                        schedule_queued_tasks(
528                            &mut task_queue,
529                            &compactor_context,
530                            &shutdown_map,
531                            &task_completion_tx,
532                        );
533                        continue 'consume_stream;
534                    }
535
536                    _ = periodic_event_interval.tick() => {
537                        // Only handle meta task pulling in periodic tick
538                        let should_restart_stream = handle_meta_task_pulling(
539                            &mut pull_task_ack,
540                            &task_queue,
541                            max_task_parallelism,
542                            max_pull_task_count,
543                            &request_sender,
544                            &mut log_throttler,
545                        );
546
547                        if should_restart_stream {
548                            continue 'start_stream;
549                        }
550                        continue;
551                    }
552                    event = response_event_stream.next() => {
553                        event
554                    }
555
556                    _ = &mut shutdown_rx => {
557                        tracing::info!("Iceberg Compactor is shutting down");
558                        return
559                    }
560                };
561
562                match event {
563                    Some(Ok(SubscribeIcebergCompactionEventResponse {
564                        event,
565                        create_at: _create_at,
566                    })) => {
567                        let event = match event {
568                            Some(event) => event,
569                            None => continue 'consume_stream,
570                        };
571
572                        match event {
573                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
574                                let task_id = iceberg_compaction_task.task_id;
575                                let sink_id = iceberg_compaction_task.sink_id;
576                                // Note: write_parquet_properties is now built from sink config (IcebergConfig) in create_task_execution
577                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
578                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
579                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
580                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
581                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
582                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
583                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
584                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
585                                    .enable_prefetch(compactor_context.storage_opts.iceberg_compaction_enable_prefetch)
586                                    .target_binpack_group_size_mb(
587                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
588                                    )
589                                    .min_group_size_mb(
590                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
591                                    )
592                                    .min_group_file_count(
593                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
594                                    )
595                                    .build() {
596                                    Ok(config) => config,
597                                    Err(e) => {
598                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
599                                        let report = build_iceberg_task_report(
600                                            task_id,
601                                            sink_id,
602                                            Some(format!(
603                                                "Failed to build iceberg compactor runner config: {}",
604                                                e.as_report()
605                                            )),
606                                        );
607                                        if matches!(
608                                            send_or_buffer_iceberg_task_report(
609                                                &request_sender,
610                                                &mut pending_task_reports,
611                                                report,
612                                            ),
613                                            ReportSendResult::RestartStream
614                                        ) {
615                                            continue 'start_stream;
616                                        }
617                                        continue 'consume_stream;
618                                    }
619                                };
620
621                                // Create task execution context and plan runners from the task
622                                let task_execution = match create_task_execution(
623                                    iceberg_compaction_task,
624                                    compactor_runner_config,
625                                    compactor_context.compactor_metrics.clone(),
626                                ).await {
627                                    Ok(task_execution) => task_execution,
628                                    Err(e) => {
629                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
630                                        let report = build_iceberg_task_report(
631                                            task_id,
632                                            sink_id,
633                                            Some(format!(
634                                                "Failed to create iceberg compaction task execution: {}",
635                                                e.as_report()
636                                            )),
637                                        );
638                                        if matches!(
639                                            send_or_buffer_iceberg_task_report(
640                                                &request_sender,
641                                                &mut pending_task_reports,
642                                                report,
643                                            ),
644                                            ReportSendResult::RestartStream
645                                        ) {
646                                            continue 'start_stream;
647                                        }
648                                        continue 'consume_stream;
649                                    }
650                                };
651
652                                let sink_id = task_execution.sink_id;
653                                let plan_runners = task_execution.plan_runners;
654
655                                if plan_runners.is_empty() {
656                                    tracing::info!(task_id, sink_id, "No plans to execute");
657                                    let report = build_iceberg_task_report(task_id, sink_id, None);
658                                    if matches!(
659                                        send_or_buffer_iceberg_task_report(
660                                            &request_sender,
661                                            &mut pending_task_reports,
662                                            report,
663                                        ),
664                                        ReportSendResult::RestartStream
665                                    ) {
666                                        continue 'start_stream;
667                                    }
668                                    continue 'consume_stream;
669                                }
670
671                                // Enqueue each plan runner independently
672                                let total_plans = plan_runners.len();
673                                let mut enqueued_count = 0;
674
675                                for runner in plan_runners {
676                                    let meta = runner.to_meta();
677                                    let plan_index = meta.plan_index;
678                                    let required_parallelism = runner.required_parallelism();
679                                    let push_result = task_queue.push(meta, Some(runner));
680
681                                    match push_result {
682                                        PushResult::Added => {
683                                            enqueued_count += 1;
684                                            tracing::debug!(
685                                                task_id = task_id,
686                                                plan_index = plan_index,
687                                                required_parallelism = required_parallelism,
688                                                "Iceberg plan runner added to queue"
689                                            );
690                                        },
691                                        PushResult::RejectedCapacity => {
692                                            tracing::warn!(
693                                                task_id = task_id,
694                                                required_parallelism = required_parallelism,
695                                                pending_budget = pending_parallelism_budget,
696                                                enqueued_count = enqueued_count,
697                                                total_plans = total_plans,
698                                                "Iceberg plan runner rejected - queue capacity exceeded"
699                                            );
700                                            // Stop enqueuing remaining plans
701                                            break;
702                                        },
703                                        PushResult::RejectedTooLarge => {
704                                            tracing::error!(
705                                                task_id = task_id,
706                                                required_parallelism = required_parallelism,
707                                                max_parallelism = max_task_parallelism,
708                                                "Iceberg plan runner rejected - parallelism exceeds max"
709                                            );
710                                        },
711                                        PushResult::RejectedInvalidParallelism => {
712                                            tracing::error!(
713                                                task_id = task_id,
714                                                required_parallelism = required_parallelism,
715                                                "Iceberg plan runner rejected - invalid parallelism"
716                                            );
717                                        },
718                                        PushResult::RejectedDuplicate => {
719                                            tracing::error!(
720                                                task_id = task_id,
721                                                plan_index = plan_index,
722                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
723                                            );
724                                        }
725                                    }
726                                }
727
728                                if enqueued_count == 0 {
729                                    let report = build_iceberg_task_report(
730                                        task_id,
731                                        sink_id,
732                                        Some("Failed to enqueue all iceberg compaction plans".to_owned()),
733                                    );
734                                    if matches!(
735                                        send_or_buffer_iceberg_task_report(
736                                            &request_sender,
737                                            &mut pending_task_reports,
738                                            report,
739                                        ),
740                                        ReportSendResult::RestartStream
741                                    ) {
742                                        continue 'start_stream;
743                                    }
744                                } else {
745                                    task_trackers.insert(
746                                        task_id,
747                                        IcebergTaskTracker::new(sink_id, enqueued_count),
748                                    );
749                                }
750
751                                tracing::info!(
752                                    task_id = task_id,
753                                    sink_id = sink_id,
754                                    total_plans = total_plans,
755                                    enqueued_count = enqueued_count,
756                                    "Enqueued {} of {} Iceberg plan runners",
757                                    enqueued_count,
758                                    total_plans
759                                );
760                            },
761                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
762                                // set flag
763                                pull_task_ack = true;
764                            },
765                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CancelCompactTask(cancel_compact_task) => {
766                                cancel_iceberg_task(
767                                    cancel_compact_task.task_id,
768                                    &mut task_queue,
769                                    &shutdown_map,
770                                    &mut task_trackers,
771                                );
772                            },
773                        }
774                    }
775                    Some(Err(e)) => {
776                        tracing::warn!("Failed to consume stream. {}", e.message());
777                        continue 'start_stream;
778                    }
779                    _ => {
780                        // The stream is exhausted
781                        continue 'start_stream;
782                    }
783                }
784            }
785        }
786    });
787
788    (join_handle, shutdown_tx)
789}
790
791/// The background compaction thread that receives compaction tasks from hummock compaction
792/// manager and runs compaction tasks.
793#[must_use]
794pub fn start_compactor(
795    compactor_context: CompactorContext,
796    hummock_meta_client: Arc<dyn HummockMetaClient>,
797    object_id_manager: Arc<ObjectIdManager>,
798    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
799) -> (JoinHandle<()>, Sender<()>) {
800    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
801    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
802    let stream_retry_interval = Duration::from_secs(30);
803    let task_progress = compactor_context.task_progress_manager.clone();
804    let periodic_event_update_interval = Duration::from_millis(1000);
805
806    let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
807        * compactor_context.storage_opts.compactor_max_task_multiplier)
808        .ceil() as u32;
809    let running_task_parallelism = Arc::new(AtomicU32::new(0));
810
811    const MAX_PULL_TASK_COUNT: u32 = 4;
812    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
813
814    assert_ge!(
815        compactor_context.storage_opts.compactor_max_task_multiplier,
816        0.0
817    );
818
819    let join_handle = tokio::spawn(async move {
820        let shutdown_map = CompactionShutdownMap::default();
821        let mut min_interval = tokio::time::interval(stream_retry_interval);
822        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
823
824        // Track last logged state to avoid duplicate logs
825        let mut log_throttler =
826            LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
827
828        // This outer loop is to recreate stream.
829        'start_stream: loop {
830            // reset state
831            // pull_task_ack.store(true, Ordering::SeqCst);
832            let mut pull_task_ack = true;
833            tokio::select! {
834                // Wait for interval.
835                _ = min_interval.tick() => {},
836                // Shutdown compactor.
837                _ = &mut shutdown_rx => {
838                    tracing::info!("Compactor is shutting down");
839                    return;
840                }
841            }
842
843            let (request_sender, response_event_stream) =
844                match hummock_meta_client.subscribe_compaction_event().await {
845                    Ok((request_sender, response_event_stream)) => {
846                        tracing::debug!("Succeeded subscribe_compaction_event.");
847                        (request_sender, response_event_stream)
848                    }
849
850                    Err(e) => {
851                        tracing::warn!(
852                            error = %e.as_report(),
853                            "Subscribing to compaction tasks failed with error. Will retry.",
854                        );
855                        continue 'start_stream;
856                    }
857                };
858
859            pin_mut!(response_event_stream);
860
861            let executor = compactor_context.compaction_executor.clone();
862            let object_id_manager = object_id_manager.clone();
863
864            // This inner loop is to consume stream or report task progress.
865            let mut event_loop_iteration_now = Instant::now();
866            'consume_stream: loop {
867                {
868                    // report
869                    compactor_context
870                        .compactor_metrics
871                        .compaction_event_loop_iteration_latency
872                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
873                    event_loop_iteration_now = Instant::now();
874                }
875
876                let running_task_parallelism = running_task_parallelism.clone();
877                let request_sender = request_sender.clone();
878                let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
879                    _ = periodic_event_interval.tick() => {
880                        let progress_list = get_task_progress(task_progress.clone());
881
882                        if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
883                            event: Some(RequestEvent::HeartBeat(
884                                HeartBeat {
885                                    progress: progress_list
886                                }
887                            )),
888                            create_at: SystemTime::now()
889                                .duration_since(std::time::UNIX_EPOCH)
890                                .expect("Clock may have gone backwards")
891                                .as_millis() as u64,
892                        }) {
893                            tracing::warn!(error = %e.as_report(), "Failed to report task progress");
894                            // re subscribe stream
895                            continue 'start_stream;
896                        }
897
898
899                        let mut pending_pull_task_count = 0;
900                        if pull_task_ack {
901                            // TODO: Compute parallelism on meta side
902                            pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
903
904                            if pending_pull_task_count > 0 {
905                                if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
906                                    event: Some(RequestEvent::PullTask(
907                                        PullTask {
908                                            pull_task_count: pending_pull_task_count,
909                                        }
910                                    )),
911                                    create_at: SystemTime::now()
912                                        .duration_since(std::time::UNIX_EPOCH)
913                                        .expect("Clock may have gone backwards")
914                                        .as_millis() as u64,
915                                }) {
916                                    tracing::warn!(error = %e.as_report(), "Failed to pull task");
917
918                                    // re subscribe stream
919                                    continue 'start_stream;
920                                } else {
921                                    pull_task_ack = false;
922                                }
923                            }
924                        }
925
926                        let running_count = running_task_parallelism.load(Ordering::SeqCst);
927                        let current_state = CompactionLogState {
928                            running_parallelism: running_count,
929                            pull_task_ack,
930                            pending_pull_task_count,
931                        };
932
933                        // Log only when state changes or periodically as heartbeat
934                        if log_throttler.should_log(&current_state) {
935                            tracing::info!(
936                                running_parallelism_count = %current_state.running_parallelism,
937                                pull_task_ack = %current_state.pull_task_ack,
938                                pending_pull_task_count = %current_state.pending_pull_task_count
939                            );
940                            log_throttler.update(current_state);
941                        }
942
943                        continue;
944                    }
945                    event = response_event_stream.next() => {
946                        event
947                    }
948
949                    _ = &mut shutdown_rx => {
950                        tracing::info!("Compactor is shutting down");
951                        return
952                    }
953                };
954
955                fn send_report_task_event(
956                    compact_task: &CompactTask,
957                    table_stats: TableStatsMap,
958                    object_timestamps: HashMap<HummockSstableObjectId, u64>,
959                    request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
960                ) {
961                    if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
962                        event: Some(RequestEvent::ReportTask(ReportTask {
963                            task_id: compact_task.task_id,
964                            task_status: compact_task.task_status.into(),
965                            sorted_output_ssts: compact_task
966                                .sorted_output_ssts
967                                .iter()
968                                .map(|sst| sst.into())
969                                .collect(),
970                            table_stats_change: to_prost_table_stats_map(table_stats),
971                            object_timestamps,
972                        })),
973                        create_at: SystemTime::now()
974                            .duration_since(std::time::UNIX_EPOCH)
975                            .expect("Clock may have gone backwards")
976                            .as_millis() as u64,
977                    }) {
978                        let task_id = compact_task.task_id;
979                        tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
980                    }
981                }
982
983                match event {
984                    Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
985                        let event = match event {
986                            Some(event) => event,
987                            None => continue 'consume_stream,
988                        };
989                        let shutdown = shutdown_map.clone();
990                        let context = compactor_context.clone();
991                        let consumed_latency_ms = SystemTime::now()
992                            .duration_since(std::time::UNIX_EPOCH)
993                            .expect("Clock may have gone backwards")
994                            .as_millis() as u64
995                            - create_at;
996                        context
997                            .compactor_metrics
998                            .compaction_event_consumed_latency
999                            .observe(consumed_latency_ms as _);
1000
1001                        let object_id_manager = object_id_manager.clone();
1002                        let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
1003
1004                        match event {
1005                            ResponseEvent::CompactTask(compact_task) => {
1006                                let compact_task = CompactTask::from(compact_task);
1007                                let parallelism =
1008                                    calculate_task_parallelism(&compact_task, &context);
1009
1010                                assert_ne!(parallelism, 0, "splits cannot be empty");
1011
1012                                if (max_task_parallelism
1013                                    - running_task_parallelism.load(Ordering::SeqCst))
1014                                    < parallelism as u32
1015                                {
1016                                    tracing::warn!(
1017                                        "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
1018                                        compact_task.task_id,
1019                                        parallelism,
1020                                        max_task_parallelism,
1021                                        running_task_parallelism.load(Ordering::Relaxed),
1022                                    );
1023                                    let (compact_task, table_stats, object_timestamps) =
1024                                        compact_done(
1025                                            compact_task,
1026                                            context.clone(),
1027                                            vec![],
1028                                            TaskStatus::NoAvailCpuResourceCanceled,
1029                                        );
1030
1031                                    send_report_task_event(
1032                                        &compact_task,
1033                                        table_stats,
1034                                        object_timestamps,
1035                                        &request_sender,
1036                                    );
1037
1038                                    continue 'consume_stream;
1039                                }
1040
1041                                running_task_parallelism
1042                                    .fetch_add(parallelism as u32, Ordering::SeqCst);
1043                                executor.spawn(async move {
1044                                    let (tx, rx) = tokio::sync::oneshot::channel();
1045                                    let task_id = compact_task.task_id;
1046                                    shutdown.lock().unwrap().insert(task_id, tx);
1047
1048                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
1049                                        context.clone(),
1050                                        compact_task,
1051                                        rx,
1052                                        object_id_manager.clone(),
1053                                        compaction_catalog_manager_ref.clone(),
1054                                    )
1055                                    .await;
1056
1057                                    shutdown.lock().unwrap().remove(&task_id);
1058                                    running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
1059
1060                                    send_report_task_event(
1061                                        &compact_task,
1062                                        table_stats,
1063                                        object_timestamps,
1064                                        &request_sender,
1065                                    );
1066
1067                                    let enable_check_compaction_result =
1068                                    context.storage_opts.check_compaction_result;
1069                                    let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1070
1071                                    if enable_check_compaction_result && need_check_task {
1072                                        let read_table_ids = compact_task
1073                                            .get_table_ids_from_input_ssts()
1074                                            .collect::<Vec<_>>();
1075                                        match compaction_catalog_manager_ref.acquire(read_table_ids.into_iter().collect()).await {
1076                                            Ok(compaction_catalog_agent_ref) =>  {
1077                                                match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1078                                                {
1079                                                    Err(e) => {
1080                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
1081                                                    }
1082                                                    Ok(true) => (),
1083                                                    Ok(false) => {
1084                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1085                                                    }
1086                                                }
1087                                            },
1088                                            Err(e) => {
1089                                                tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1090                                            }
1091                                        }
1092                                    }
1093                                });
1094                            }
1095                            #[expect(deprecated)]
1096                            ResponseEvent::VacuumTask(_) => {
1097                                unreachable!("unexpected vacuum task");
1098                            }
1099                            #[expect(deprecated)]
1100                            ResponseEvent::FullScanTask(_) => {
1101                                unreachable!("unexpected scan task");
1102                            }
1103                            #[expect(deprecated)]
1104                            ResponseEvent::ValidationTask(validation_task) => {
1105                                let validation_task = ValidationTask::from(validation_task);
1106                                executor.spawn(async move {
1107                                    validate_ssts(validation_task, context.sstable_store.clone())
1108                                        .await;
1109                                });
1110                            }
1111                            ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
1112                                .lock()
1113                                .unwrap()
1114                                .remove(&cancel_compact_task.task_id)
1115                            {
1116                                Some(tx) => {
1117                                    if tx.send(()).is_err() {
1118                                        tracing::warn!(
1119                                            "Cancellation of compaction task failed. task_id: {}",
1120                                            cancel_compact_task.task_id
1121                                        );
1122                                    }
1123                                }
1124                                _ => {
1125                                    tracing::warn!(
1126                                        "Attempting to cancel non-existent compaction task. task_id: {}",
1127                                        cancel_compact_task.task_id
1128                                    );
1129                                }
1130                            },
1131
1132                            ResponseEvent::PullTaskAck(_pull_task_ack) => {
1133                                // set flag
1134                                pull_task_ack = true;
1135                            }
1136                        }
1137                    }
1138                    Some(Err(e)) => {
1139                        tracing::warn!("Failed to consume stream. {}", e.message());
1140                        continue 'start_stream;
1141                    }
1142                    _ => {
1143                        // The stream is exhausted
1144                        continue 'start_stream;
1145                    }
1146                }
1147            }
1148        }
1149    });
1150
1151    (join_handle, shutdown_tx)
1152}
1153
1154/// The background compaction thread that receives compaction tasks from hummock compaction
1155/// manager and runs compaction tasks.
1156#[must_use]
1157pub fn start_shared_compactor(
1158    grpc_proxy_client: GrpcCompactorProxyClient,
1159    mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1160    context: CompactorContext,
1161) -> (JoinHandle<()>, Sender<()>) {
1162    type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1163    let task_progress = context.task_progress_manager.clone();
1164    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1165    let periodic_event_update_interval = Duration::from_millis(1000);
1166
1167    let join_handle = tokio::spawn(async move {
1168        let shutdown_map = CompactionShutdownMap::default();
1169
1170        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1171        let executor = context.compaction_executor.clone();
1172        let report_heartbeat_client = grpc_proxy_client.clone();
1173        'consume_stream: loop {
1174            let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1175                _ = periodic_event_interval.tick() => {
1176                    let progress_list = get_task_progress(task_progress.clone());
1177                    let report_compaction_task_request = ReportCompactionTaskRequest{
1178                        event: Some(ReportCompactionTaskEvent::HeartBeat(
1179                            SharedHeartBeat {
1180                                progress: progress_list
1181                            }
1182                        )),
1183                     };
1184                    if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1185                        tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1186                    }
1187                    continue
1188                }
1189
1190
1191                _ = &mut shutdown_rx => {
1192                    tracing::info!("Compactor is shutting down");
1193                    return
1194                }
1195
1196                request = receiver.recv() => {
1197                    request
1198                }
1199
1200            };
1201            match request {
1202                Some(request) => {
1203                    let context = context.clone();
1204                    let shutdown = shutdown_map.clone();
1205
1206                    let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1207                    executor.spawn(async move {
1208                        let DispatchCompactionTaskRequest {
1209                            tables,
1210                            output_object_ids,
1211                            task: dispatch_task,
1212                        } = request.into_inner();
1213                        let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1214                            acc.insert(table.id, table);
1215                            acc
1216                        });
1217
1218                        let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1219                        output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1220                        let shared_compactor_object_id_manager =
1221                            SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1222                            match dispatch_task.unwrap() {
1223                                dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1224                                    let compact_task = CompactTask::from(&compact_task);
1225                                    let (tx, rx) = tokio::sync::oneshot::channel();
1226                                    let task_id = compact_task.task_id;
1227                                    shutdown.lock().unwrap().insert(task_id, tx);
1228
1229                                    let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1230                                    let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1231                                        context.clone(),
1232                                        compact_task,
1233                                        rx,
1234                                        shared_compactor_object_id_manager,
1235                                        compaction_catalog_agent_ref.clone(),
1236                                    )
1237                                    .await;
1238                                    shutdown.lock().unwrap().remove(&task_id);
1239                                    let report_compaction_task_request = ReportCompactionTaskRequest {
1240                                        event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1241                                            compact_task: Some(PbCompactTask::from(&compact_task)),
1242                                            table_stats_change: to_prost_table_stats_map(table_stats),
1243                                            object_timestamps,
1244                                    })),
1245                                    };
1246
1247                                    match cloned_grpc_proxy_client
1248                                        .report_compaction_task(report_compaction_task_request)
1249                                        .await
1250                                    {
1251                                        Ok(_) => {
1252                                            // TODO: remove this method after we have running risingwave cluster with fast compact algorithm stably for a long time.
1253                                            let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1254                                            let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1255                                            if enable_check_compaction_result && need_check_task {
1256                                                match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1257                                                    Err(e) => {
1258                                                        tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1259                                                    },
1260                                                    Ok(true) => (),
1261                                                    Ok(false) => {
1262                                                        panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1263                                                    }
1264                                                }
1265                                            }
1266                                        }
1267                                        Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1268                                    }
1269
1270                                }
1271                                dispatch_compaction_task_request::Task::VacuumTask(_) => {
1272                                    unreachable!("unexpected vacuum task");
1273                                }
1274                                dispatch_compaction_task_request::Task::FullScanTask(_) => {
1275                                    unreachable!("unexpected scan task");
1276                                }
1277                                dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1278                                    let validation_task = ValidationTask::from(validation_task);
1279                                    validate_ssts(validation_task, context.sstable_store.clone()).await;
1280                                }
1281                                dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1282                                    match shutdown
1283                                        .lock()
1284                                        .unwrap()
1285                                        .remove(&cancel_compact_task.task_id)
1286                                    { Some(tx) => {
1287                                        if tx.send(()).is_err() {
1288                                            tracing::warn!(
1289                                                "Cancellation of compaction task failed. task_id: {}",
1290                                                cancel_compact_task.task_id
1291                                            );
1292                                        }
1293                                    } _ => {
1294                                        tracing::warn!(
1295                                            "Attempting to cancel non-existent compaction task. task_id: {}",
1296                                            cancel_compact_task.task_id
1297                                        );
1298                                    }}
1299                                }
1300                            }
1301                    });
1302                }
1303                None => continue 'consume_stream,
1304            }
1305        }
1306    });
1307    (join_handle, shutdown_tx)
1308}
1309
1310fn get_task_progress(
1311    task_progress: Arc<
1312        parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1313    >,
1314) -> Vec<CompactTaskProgress> {
1315    let mut progress_list = Vec::new();
1316    for (&task_id, progress) in &*task_progress.lock() {
1317        progress_list.push(progress.snapshot(task_id));
1318    }
1319    progress_list
1320}
1321
1322/// Schedule queued tasks if we have capacity
1323fn schedule_queued_tasks(
1324    task_queue: &mut IcebergTaskQueue,
1325    compactor_context: &CompactorContext,
1326    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1327    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
1328) {
1329    while let Some(popped_task) = task_queue.pop() {
1330        let task_id = popped_task.meta.task_id;
1331        let plan_index = popped_task.meta.plan_index;
1332        let task_key = (task_id, plan_index);
1333
1334        // Get unique_ident before moving runner
1335        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1336
1337        let Some(runner) = popped_task.runner else {
1338            tracing::error!(
1339                task_id = task_id,
1340                plan_index = plan_index,
1341                "Popped task missing runner - this should not happen"
1342            );
1343            task_queue.finish_running(task_key);
1344            continue;
1345        };
1346
1347        let executor = compactor_context.compaction_executor.clone();
1348        let shutdown_map_clone = shutdown_map.clone();
1349        let completion_tx_clone = task_completion_tx.clone();
1350        let (tx, rx) = tokio::sync::oneshot::channel();
1351
1352        {
1353            let mut shutdown_guard = shutdown_map.lock().unwrap();
1354            shutdown_guard.insert(task_key, tx);
1355        }
1356
1357        tracing::info!(
1358            task_id = task_id,
1359            plan_index = plan_index,
1360            unique_ident = ?unique_ident,
1361            required_parallelism = popped_task.meta.required_parallelism,
1362            "Starting iceberg compaction task from queue"
1363        );
1364
1365        executor.spawn(async move {
1366            let _cleanup_guard = scopeguard::guard(shutdown_map_clone, move |shutdown_map| {
1367                let mut shutdown_guard = shutdown_map.lock().unwrap();
1368                shutdown_guard.remove(&task_key);
1369            });
1370
1371            let result = Box::pin(runner.compact(rx)).await;
1372
1373            let completion = match result {
1374                Ok(_) => IcebergPlanCompletion {
1375                    task_key,
1376                    error_message: None,
1377                },
1378                Err(e) => {
1379                    if is_cancelled_iceberg_compaction_error(&e) {
1380                        tracing::info!(
1381                            task_id = task_key.0,
1382                            plan_index = task_key.1,
1383                            "Iceberg compaction plan cancelled"
1384                        );
1385                    } else {
1386                        tracing::warn!(
1387                            error = %e.as_report(),
1388                            task_id = task_key.0,
1389                            plan_index = task_key.1,
1390                            "Failed to compact iceberg runner"
1391                        );
1392                    }
1393                    IcebergPlanCompletion {
1394                        task_key,
1395                        error_message: Some(e.to_report_string()),
1396                    }
1397                }
1398            };
1399
1400            if completion_tx_clone.send(completion).is_err() {
1401                tracing::warn!(
1402                    task_id = task_key.0,
1403                    plan_index = task_key.1,
1404                    "Failed to notify task completion - main loop may have shut down"
1405                );
1406            }
1407        });
1408    }
1409}
1410
1411fn is_cancelled_iceberg_compaction_error(error: &crate::hummock::HummockError) -> bool {
1412    matches!(
1413        error.inner(),
1414        HummockErrorInner::CompactionExecutor(message) if message == "Plan cancelled"
1415    )
1416}
1417
1418fn cancel_iceberg_task(
1419    task_id: u64,
1420    task_queue: &mut IcebergTaskQueue,
1421    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1422    task_trackers: &mut HashMap<u64, IcebergTaskTracker>,
1423) {
1424    // Meta assigns one task id to an Iceberg compact task, but the compactor
1425    // splits it into multiple plan runners tracked by `(task_id, plan_index)`.
1426    // A cancel event only carries `task_id`, so it must cancel all waiting and
1427    // running plan runners that belong to that task.
1428    let cancelled_waiting = task_queue.cancel_waiting_task(task_id);
1429    let removed_tracker = task_trackers.remove(&task_id).is_some();
1430
1431    let cancelled_running = {
1432        let mut shutdown_guard = shutdown_map.lock().unwrap();
1433        let task_keys: Vec<_> = shutdown_guard
1434            .keys()
1435            .filter(|(running_task_id, _)| *running_task_id == task_id)
1436            .copied()
1437            .collect();
1438
1439        for task_key in &task_keys {
1440            if let Some(tx) = shutdown_guard.remove(task_key)
1441                && tx.send(()).is_err()
1442            {
1443                tracing::debug!(
1444                    task_id = task_key.0,
1445                    plan_index = task_key.1,
1446                    "Iceberg compaction plan shutdown receiver already closed during cancellation"
1447                );
1448            }
1449        }
1450
1451        task_keys.len()
1452    };
1453
1454    if cancelled_waiting == 0 && cancelled_running == 0 && !removed_tracker {
1455        tracing::warn!(
1456            task_id = task_id,
1457            "Attempting to cancel non-existent iceberg compaction task"
1458        );
1459    } else {
1460        tracing::info!(
1461            task_id = task_id,
1462            cancelled_waiting = cancelled_waiting,
1463            cancelled_running = cancelled_running,
1464            removed_tracker = removed_tracker,
1465            "Cancelled iceberg compaction task"
1466        );
1467    }
1468}
1469
1470/// Handle pulling new tasks from meta service
1471/// Returns true if the stream should be restarted
1472fn handle_meta_task_pulling(
1473    pull_task_ack: &mut bool,
1474    task_queue: &IcebergTaskQueue,
1475    max_task_parallelism: u32,
1476    max_pull_task_count: u32,
1477    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1478    log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1479) -> bool {
1480    let mut pending_pull_task_count = 0;
1481    if *pull_task_ack {
1482        // Use queue's running parallelism for pull decision
1483        let current_running_parallelism = task_queue.running_parallelism_sum();
1484        pending_pull_task_count =
1485            (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1486
1487        if pending_pull_task_count > 0 {
1488            if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1489                event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1490                    subscribe_iceberg_compaction_event_request::PullTask {
1491                        pull_task_count: pending_pull_task_count,
1492                    },
1493                )),
1494                create_at: SystemTime::now()
1495                    .duration_since(std::time::UNIX_EPOCH)
1496                    .expect("Clock may have gone backwards")
1497                    .as_millis() as u64,
1498            }) {
1499                tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1500                return true; // Signal to restart stream
1501            } else {
1502                *pull_task_ack = false;
1503            }
1504        }
1505    }
1506
1507    let running_count = task_queue.running_parallelism_sum();
1508    let waiting_count = task_queue.waiting_parallelism_sum();
1509    let available_count = max_task_parallelism.saturating_sub(running_count);
1510    let current_state = IcebergCompactionLogState {
1511        running_parallelism: running_count,
1512        waiting_parallelism: waiting_count,
1513        available_parallelism: available_count,
1514        pull_task_ack: *pull_task_ack,
1515        pending_pull_task_count,
1516    };
1517
1518    // Log only when state changes or periodically as heartbeat
1519    if log_throttler.should_log(&current_state) {
1520        tracing::info!(
1521            running_parallelism_count = %current_state.running_parallelism,
1522            waiting_parallelism_count = %current_state.waiting_parallelism,
1523            available_parallelism = %current_state.available_parallelism,
1524            pull_task_ack = %current_state.pull_task_ack,
1525            pending_pull_task_count = %current_state.pending_pull_task_count
1526        );
1527        log_throttler.update(current_state);
1528    }
1529
1530    false // No need to restart stream
1531}
1532
#[cfg(test)]
mod tests {
    use super::*;

    /// Cancelling a task drops all of its waiting plans and its tracker.
    #[test]
    fn test_cancel_iceberg_task_removes_waiting_plans_and_tracker() {
        let task_id = 42;
        let mut task_queue = IcebergTaskQueue::new(10, 30);

        // Two waiting plans of the same task, requiring 3 + 4 = 7 parallelism.
        for (plan_index, required_parallelism) in [(0, 3), (1, 4)] {
            let meta = iceberg_compaction::IcebergTaskMeta {
                task_id,
                plan_index,
                required_parallelism,
            };
            assert_eq!(task_queue.push(meta, None), PushResult::Added);
        }
        assert_eq!(task_queue.waiting_parallelism_sum(), 7);

        let shutdown_map = Arc::new(Mutex::new(HashMap::new()));
        let mut task_trackers = HashMap::new();
        task_trackers.insert(task_id, IcebergTaskTracker::new(10, 2));

        cancel_iceberg_task(task_id, &mut task_queue, &shutdown_map, &mut task_trackers);

        assert_eq!(task_queue.waiting_parallelism_sum(), 0);
        assert!(!task_trackers.contains_key(&task_id));
    }

    /// Cancelling a task fires the shutdown channel of its running plans and
    /// drops its tracker.
    #[test]
    fn test_cancel_iceberg_task_shuts_down_running_plans_and_tracker() {
        let task_id = 43;
        let task_key = (task_id, 0);
        let mut task_queue = IcebergTaskQueue::new(10, 30);

        // Pretend plan 0 of the task is running by registering its shutdown sender.
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let shutdown_map = Arc::new(Mutex::new(HashMap::new()));
        shutdown_map.lock().unwrap().insert(task_key, shutdown_tx);

        let mut task_trackers = HashMap::new();
        task_trackers.insert(task_id, IcebergTaskTracker::new(10, 1));

        cancel_iceberg_task(task_id, &mut task_queue, &shutdown_map, &mut task_trackers);

        assert!(shutdown_rx.try_recv().is_ok());
        assert!(shutdown_map.lock().unwrap().is_empty());
        assert!(!task_trackers.contains_key(&task_id));
    }

    /// Log states compare equal only when every field matches.
    #[test]
    fn test_log_state_equality() {
        // Test CompactionLogState
        let compaction_state = |running_parallelism| CompactionLogState {
            running_parallelism,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state1 = compaction_state(10);
        let state2 = compaction_state(10);
        let state3 = compaction_state(11);
        assert_eq!(state1, state2);
        assert_ne!(state1, state3);

        // Test IcebergCompactionLogState
        let iceberg_state = |waiting_parallelism| IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_state1 = iceberg_state(5);
        let ice_state2 = iceberg_state(6);
        assert_ne!(ice_state1, ice_state2);
    }

    /// The throttler logs on the first call and on every state change, but not
    /// for a repeated state within the heartbeat interval.
    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let state_with = |running_parallelism| CompactionLogState {
            running_parallelism,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state1 = state_with(10);
        let state2 = state_with(11);

        // First call should always log
        assert!(throttler.should_log(&state1));
        throttler.update(state1.clone());

        // Same state should not log
        assert!(!throttler.should_log(&state1));

        // Changed state should log
        assert!(throttler.should_log(&state2));
        throttler.update(state2.clone());

        // Same state again should not log
        assert!(!throttler.should_log(&state2));
    }

    /// An unchanged state is logged again once the heartbeat interval elapsed.
    #[test]
    fn test_log_throttler_heartbeat() {
        let heartbeat_interval = Duration::from_millis(10);
        let mut throttler = LogThrottler::<CompactionLogState>::new(heartbeat_interval);
        let state = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        // First call should log
        assert!(throttler.should_log(&state));
        throttler.update(state.clone());

        // Same state immediately should not log
        assert!(!throttler.should_log(&state));

        // Wait for heartbeat interval to pass
        std::thread::sleep(Duration::from_millis(15));

        // Same state after interval should log (heartbeat)
        assert!(throttler.should_log(&state));
    }
}