risingwave_meta/manager/iceberg_compaction/schedule.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use itertools::Itertools;
20use parking_lot::RwLock;
21use risingwave_connector::connector_common::{
22    IcebergCommittedSnapshot, IcebergSinkCompactionUpdate,
23};
24use risingwave_connector::sink::SinkParam;
25use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
26use risingwave_connector::sink::iceberg::{
27    CompactionType, IcebergConfig, should_enable_iceberg_cow,
28};
29use risingwave_hummock_sdk::HummockContextId;
30use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
31use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
32use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::ReportTask as IcebergReportTask;
33use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::report_task::Status as IcebergReportTaskStatus;
34use thiserror_ext::AsReport;
35use tokio::sync::oneshot;
36
37use super::*;
38
/// Compaction track states using type-safe state machine pattern.
///
/// Transitions driven by `CompactionTrack`:
/// - `Idle` -> `PendingDispatch` via `start_processing`
/// - `PendingDispatch` -> `InFlight` via `mark_dispatched`
/// - `PendingDispatch` -> `Idle` via `revert_pre_dispatch_failure`
/// - `InFlight` -> `Idle` via `finish_success` / `finish_failed`
#[derive(Debug, Clone)]
enum CompactionTrackState {
    /// Ready to accept commits and check for trigger conditions.
    ///
    /// `Idle` is not an active task state. A manual request may leave a
    /// one-shot task type override here for the next scheduler selection.
    Idle {
        /// Time at which the interval-based trigger becomes ready.
        next_compaction_time: Instant,
        /// `None` uses the configured track task type; `Some` is consumed when
        /// the next task enters `PendingDispatch`.
        next_task_type_override: Option<TaskType>,
    },
    /// Task has been selected locally but not yet accepted by a compactor.
    PendingDispatch {
        /// Task type chosen at selection (override or configured type).
        task_type: TaskType,
        /// The `Idle` `next_compaction_time` to restore if dispatch fails.
        next_compaction_time_on_failure: Instant,
        /// `pending_commit_count` captured at selection; carried into `InFlight`.
        pending_commit_count_at_dispatch: usize,
        /// Copy of the track's latest observed snapshot taken at selection.
        gc_watermark_snapshot: Option<IcebergCommittedSnapshot>,
    },
    /// Compaction task is in-flight. `report_deadline` acts as a lease; if it
    /// expires before a report arrives, the task becomes retryable.
    InFlight {
        /// Id of the dispatched task.
        task_id: u64,
        /// Context id of the compactor the task was sent to.
        compactor_context_id: HummockContextId,
        task_type: TaskType,
        /// Subtracted from `pending_commit_count` when the task succeeds.
        pending_commit_count_at_dispatch: usize,
        /// Dispatch time plus the track's `report_timeout`.
        report_deadline: Instant,
        /// Snapshot moved over from `PendingDispatch` on dispatch.
        gc_watermark_snapshot: Option<IcebergCommittedSnapshot>,
    },
}
70
/// Identity of an in-flight compaction task.
///
/// Produced by `CompactionTrack::scheduled_task` only while the track is
/// `InFlight`.
#[derive(Debug, Clone, Copy)]
struct ScheduledCompactionTask {
    /// Id of the dispatched task.
    task_id: u64,
    /// Context id of the compactor that accepted the task.
    compactor_context_id: HummockContextId,
}
76
/// What to do with a sink's track once its selected task finishes.
///
/// Returned by `finish_success`, `finish_failed` and
/// `revert_pre_dispatch_failure`, and executed by
/// `IcebergCompactionManager::apply_track_finish_action`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionTrackFinishAction {
    /// Leave the track in `sink_schedules`.
    KeepTrack,
    /// Remove the track from `sink_schedules`.
    RemoveTrack,
}
82
/// Per-sink compaction scheduling record.
///
/// Holds the schedule values resolved from the sink's iceberg config, the
/// commit bookkeeping consulted by `should_trigger`, and the current
/// `CompactionTrackState`.
#[derive(Debug, Clone)]
pub(super) struct CompactionTrack {
    /// Configured task type; an `Idle` override may replace it for one task.
    task_type: TaskType,
    /// Interval-based trigger period, in seconds.
    trigger_interval_sec: u64,
    /// Minimum pending commit threshold to trigger compaction early.
    /// Compaction triggers when `pending_commit_count` >= this threshold, even before interval expires.
    trigger_snapshot_count: usize,
    /// Lease length for an `InFlight` task (`report_deadline = dispatch time + report_timeout`).
    report_timeout: Duration,
    /// When the schedule config was last refreshed (initially: track creation time).
    last_config_refresh_at: Instant,
    /// Commits not yet covered by a successful compaction (saturating).
    /// Force requests raise it to at least 1.
    pending_commit_count: usize,
    /// Most recent snapshot recorded from the sink update path; copied into
    /// the task state when a task is selected.
    latest_observed_snapshot: Option<IcebergCommittedSnapshot>,
    /// Track lifecycle policy after a selected task finishes. Manual compaction
    /// uses `RemoveTrack` only while automatic compaction is disabled; a later
    /// config refresh can turn the temporary track into a normal schedule track.
    finish_action: CompactionTrackFinishAction,
    /// Current lifecycle state.
    state: CompactionTrackState,
}
100
101impl CompactionTrack {
102    fn new(
103        task_type: TaskType,
104        trigger_interval_sec: u64,
105        trigger_snapshot_count: usize,
106        report_timeout: Duration,
107        now: Instant,
108    ) -> Self {
109        Self {
110            task_type,
111            trigger_interval_sec,
112            trigger_snapshot_count,
113            report_timeout,
114            last_config_refresh_at: now,
115            pending_commit_count: 0,
116            latest_observed_snapshot: None,
117            finish_action: CompactionTrackFinishAction::KeepTrack,
118            state: CompactionTrackState::Idle {
119                next_compaction_time: now + Duration::from_secs(trigger_interval_sec),
120                next_task_type_override: None,
121            },
122        }
123    }
124
125    /// Determines if compaction should be triggered.
126    ///
127    /// Trigger conditions (OR logic):
128    /// 1. `commit_ready` - Pending commit count >= threshold (early trigger)
129    /// 2. `time_ready && has_commits` - Interval expired and there's at least 1 pending commit
130    ///
131    /// This ensures:
132    /// - `trigger_snapshot_count` is an early trigger threshold
133    /// - `compaction_interval_sec` is the maximum wait time (as long as there are new snapshots)
134    /// - Force compaction works by setting `next_compaction_time` to now
135    /// - No empty compaction runs (requires at least 1 snapshot for time-based trigger)
136    fn should_trigger(&self, now: Instant) -> bool {
137        let next_compaction_time = match &self.state {
138            CompactionTrackState::Idle {
139                next_compaction_time,
140                ..
141            } => *next_compaction_time,
142            CompactionTrackState::PendingDispatch { .. }
143            | CompactionTrackState::InFlight { .. } => return false,
144        };
145
146        let time_ready = now >= next_compaction_time;
147        let commit_ready = self.pending_commit_count >= self.trigger_snapshot_count;
148        let has_commits = self.pending_commit_count > 0;
149
150        commit_ready || (time_ready && has_commits)
151    }
152
153    fn record_observed_snapshot(&mut self, observed_snapshot: IcebergCommittedSnapshot) {
154        self.latest_observed_snapshot = Some(observed_snapshot);
155    }
156
157    fn record_commit(&mut self) {
158        self.pending_commit_count = self.pending_commit_count.saturating_add(1);
159    }
160
161    fn record_force_compaction(&mut self, now: Instant, forced_task_type: Option<TaskType>) {
162        if let CompactionTrackState::Idle {
163            next_compaction_time,
164            next_task_type_override,
165        } = &mut self.state
166        {
167            if let Some(task_type) = forced_task_type {
168                *next_task_type_override = Some(task_type);
169            }
170            *next_compaction_time = now;
171            self.pending_commit_count = self.pending_commit_count.max(1);
172        }
173    }
174
175    fn needs_config_refresh(&self, now: Instant, refresh_interval: Duration) -> bool {
176        now.saturating_duration_since(self.last_config_refresh_at) >= refresh_interval
177    }
178
179    fn should_refresh_config(&self, now: Instant, refresh_interval: Duration) -> bool {
180        matches!(self.state, CompactionTrackState::Idle { .. })
181            && self.needs_config_refresh(now, refresh_interval)
182    }
183
184    fn mark_config_refreshed(&mut self, now: Instant) {
185        self.last_config_refresh_at = now;
186    }
187
188    fn current_task_type(&self) -> TaskType {
189        match &self.state {
190            CompactionTrackState::Idle {
191                next_task_type_override,
192                ..
193            } => next_task_type_override.unwrap_or(self.task_type),
194            CompactionTrackState::PendingDispatch { task_type, .. }
195            | CompactionTrackState::InFlight { task_type, .. } => *task_type,
196        }
197    }
198
199    fn start_processing(&mut self) -> TaskType {
200        match &mut self.state {
201            CompactionTrackState::Idle {
202                next_compaction_time,
203                next_task_type_override,
204            } => {
205                let task_type = next_task_type_override.take().unwrap_or(self.task_type);
206                self.state = CompactionTrackState::PendingDispatch {
207                    task_type,
208                    next_compaction_time_on_failure: *next_compaction_time,
209                    pending_commit_count_at_dispatch: self.pending_commit_count,
210                    gc_watermark_snapshot: self.latest_observed_snapshot.clone(),
211                };
212                task_type
213            }
214            CompactionTrackState::PendingDispatch { .. }
215            | CompactionTrackState::InFlight { .. } => {
216                unreachable!("Cannot start processing when already processing")
217            }
218        }
219    }
220
221    fn mark_dispatched(
222        &mut self,
223        task_id: u64,
224        compactor_context_id: HummockContextId,
225        now: Instant,
226    ) {
227        let (task_type, pending_commit_count_at_dispatch, gc_watermark_snapshot) = match &mut self
228            .state
229        {
230            CompactionTrackState::PendingDispatch {
231                task_type,
232                pending_commit_count_at_dispatch,
233                gc_watermark_snapshot,
234                ..
235            } => {
236                let gc_watermark_snapshot = gc_watermark_snapshot.take();
237                (
238                    *task_type,
239                    *pending_commit_count_at_dispatch,
240                    gc_watermark_snapshot,
241                )
242            }
243            CompactionTrackState::Idle { .. } => unreachable!("Cannot mark dispatched when idle"),
244            CompactionTrackState::InFlight { .. } => {
245                unreachable!("Cannot mark dispatched when already in flight")
246            }
247        };
248        self.state = CompactionTrackState::InFlight {
249            task_id,
250            compactor_context_id,
251            task_type,
252            pending_commit_count_at_dispatch,
253            report_deadline: now + self.report_timeout,
254            gc_watermark_snapshot,
255        };
256    }
257
258    pub(super) fn processing_gc_watermark_snapshot(
259        &self,
260    ) -> Option<Option<&IcebergCommittedSnapshot>> {
261        match &self.state {
262            CompactionTrackState::PendingDispatch {
263                gc_watermark_snapshot,
264                ..
265            }
266            | CompactionTrackState::InFlight {
267                gc_watermark_snapshot,
268                ..
269            } => Some(gc_watermark_snapshot.as_ref()),
270            CompactionTrackState::Idle { .. } => None,
271        }
272    }
273
274    fn is_pending_dispatch(&self) -> bool {
275        matches!(self.state, CompactionTrackState::PendingDispatch { .. })
276    }
277
278    fn removes_track_after_finish(&self) -> bool {
279        self.finish_action == CompactionTrackFinishAction::RemoveTrack
280    }
281
282    pub(super) fn is_processing_task(&self, task_id: u64) -> bool {
283        matches!(
284            &self.state,
285            CompactionTrackState::InFlight {
286                task_id: current_task_id,
287                ..
288            } if *current_task_id == task_id
289        )
290    }
291
292    fn scheduled_task(&self) -> Option<ScheduledCompactionTask> {
293        match &self.state {
294            CompactionTrackState::InFlight {
295                task_id,
296                compactor_context_id,
297                ..
298            } => Some(ScheduledCompactionTask {
299                task_id: *task_id,
300                compactor_context_id: *compactor_context_id,
301            }),
302            CompactionTrackState::Idle { .. } | CompactionTrackState::PendingDispatch { .. } => {
303                None
304            }
305        }
306    }
307
308    fn is_report_timed_out(&self, now: Instant) -> bool {
309        matches!(
310            &self.state,
311            CompactionTrackState::InFlight {
312                report_deadline,
313                ..
314            } if now >= *report_deadline
315        )
316    }
317
318    fn finish_success(&mut self, now: Instant) -> CompactionTrackFinishAction {
319        match &self.state {
320            CompactionTrackState::InFlight {
321                pending_commit_count_at_dispatch,
322                ..
323            } => {
324                self.pending_commit_count = self
325                    .pending_commit_count
326                    .saturating_sub(*pending_commit_count_at_dispatch);
327                self.state = CompactionTrackState::Idle {
328                    next_compaction_time: now + Duration::from_secs(self.trigger_interval_sec),
329                    next_task_type_override: None,
330                };
331                self.finish_action
332            }
333            CompactionTrackState::Idle { .. } => unreachable!("Cannot finish success when idle"),
334            CompactionTrackState::PendingDispatch { .. } => {
335                unreachable!("Cannot finish success before task dispatch")
336            }
337        }
338    }
339
340    fn finish_failed(&mut self, now: Instant) -> CompactionTrackFinishAction {
341        match &self.state {
342            CompactionTrackState::InFlight { .. } => {
343                self.state = CompactionTrackState::Idle {
344                    next_compaction_time: now,
345                    next_task_type_override: None,
346                };
347                self.finish_action
348            }
349            CompactionTrackState::Idle { .. } => unreachable!("Cannot finish failed when idle"),
350            CompactionTrackState::PendingDispatch { .. } => {
351                unreachable!("Cannot finish failed before task dispatch")
352            }
353        }
354    }
355
356    /// Restore the idle scheduling state after a pre-dispatch failure.
357    ///
358    /// `pending_commit_count` is intentionally preserved so commits that arrive
359    /// while the track is pending dispatch are not lost if task dispatch fails
360    /// before the compactor accepts the task.
361    fn revert_pre_dispatch_failure(&mut self) -> CompactionTrackFinishAction {
362        match &self.state {
363            CompactionTrackState::PendingDispatch {
364                next_compaction_time_on_failure,
365                ..
366            } => {
367                self.state = CompactionTrackState::Idle {
368                    next_compaction_time: *next_compaction_time_on_failure,
369                    next_task_type_override: None,
370                };
371                self.finish_action
372            }
373            CompactionTrackState::Idle { .. } => {
374                unreachable!("Cannot revert a pre-dispatch failure when idle")
375            }
376            CompactionTrackState::InFlight { .. } => {
377                unreachable!("Cannot revert a pre-dispatch failure after dispatch")
378            }
379        }
380    }
381
382    fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
383        if self.trigger_interval_sec == new_interval_sec {
384            return;
385        }
386
387        self.trigger_interval_sec = new_interval_sec;
388
389        match &mut self.state {
390            CompactionTrackState::Idle {
391                next_compaction_time,
392                ..
393            } => {
394                *next_compaction_time = now + Duration::from_secs(new_interval_sec);
395            }
396            CompactionTrackState::PendingDispatch { .. }
397            | CompactionTrackState::InFlight { .. } => {}
398        }
399    }
400}
401
/// Handle for a compaction task that has been selected for a sink but not yet
/// dispatched.
///
/// `send_compact_task` sends the task to a compactor and, if the sink's track
/// is still `PendingDispatch`, marks it `InFlight`. If that never happens,
/// dropping the handle reverts the track (see the `Drop` impl).
pub(crate) struct IcebergCompactionHandle {
    /// Sink whose track this handle belongs to.
    sink_id: SinkId,
    /// Task type placed in the `IcebergCompactionTask` sent to the compactor.
    task_type: TaskType,
    /// Shared scheduler state; write-locked to update the sink's track.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    /// Used to load the sink catalog when building the task.
    metadata_manager: MetadataManager,
    /// Set once the track was marked dispatched; suppresses the revert on drop.
    handle_success: bool,
}
409
410impl IcebergCompactionHandle {
411    fn new(
412        sink_id: SinkId,
413        task_type: TaskType,
414        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
415        metadata_manager: MetadataManager,
416    ) -> Self {
417        Self {
418            sink_id,
419            task_type,
420            inner,
421            metadata_manager,
422            handle_success: false,
423        }
424    }
425
426    pub async fn send_compact_task(
427        mut self,
428        compactor: Arc<crate::hummock::IcebergCompactor>,
429        task_id: u64,
430    ) -> MetaResult<()> {
431        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
432
433        let Some(prost_sink_catalog) = self
434            .metadata_manager
435            .catalog_controller
436            .get_sink_by_id(self.sink_id)
437            .await?
438        else {
439            tracing::warn!("Sink not found: {}", self.sink_id);
440            return Ok(());
441        };
442        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
443        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
444
445        let result =
446            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
447                task_id,
448                sink_id: self.sink_id.as_raw_id(),
449                props: param.properties,
450                task_type: self.task_type as i32,
451            }));
452
453        if result.is_ok() {
454            let mut should_cancel_sent_task = false;
455            let mut guard = self.inner.write();
456            let mut dispatched = false;
457            if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id)
458                && track.is_pending_dispatch()
459            {
460                track.mark_dispatched(task_id, compactor.context_id(), Instant::now());
461                dispatched = true;
462            }
463            self.handle_success = dispatched;
464            if !dispatched {
465                should_cancel_sent_task = true;
466                tracing::warn!(
467                    sink_id = %self.sink_id,
468                    task_id,
469                    "Iceberg compaction task send succeeded but track was no longer pending dispatch"
470                );
471            }
472            drop(guard);
473
474            if should_cancel_sent_task {
475                self.cancel_sent_task(&compactor, task_id);
476            }
477        }
478
479        result
480    }
481
482    fn cancel_sent_task(&self, compactor: &crate::hummock::IcebergCompactor, task_id: u64) {
483        if let Err(e) = compactor.cancel_task(task_id) {
484            tracing::warn!(
485                error = %e.as_report(),
486                sink_id = %self.sink_id,
487                task_id,
488                "Failed to cancel iceberg compaction task after schedule removal",
489            );
490        }
491    }
492}
493
494impl Drop for IcebergCompactionHandle {
495    fn drop(&mut self) {
496        let waiter = {
497            let mut guard = self.inner.write();
498            if !self.handle_success
499                && let Some(track) = guard.sink_schedules.get_mut(&self.sink_id)
500                && track.is_pending_dispatch()
501            {
502                let finish_action = track.revert_pre_dispatch_failure();
503                let waiter = guard.manual_compaction_waiters.remove(&self.sink_id);
504                IcebergCompactionManager::apply_track_finish_action(
505                    &mut guard,
506                    self.sink_id,
507                    finish_action,
508                );
509                waiter
510            } else {
511                None
512            }
513        };
514
515        if let Some(waiter) = waiter {
516            let _ = waiter.send(Err(anyhow!(
517                "Iceberg compaction task failed before dispatch for sink {}",
518                self.sink_id
519            )
520            .into()));
521        }
522    }
523}
524
/// Kind of update arriving for a sink's compaction track.
#[derive(Debug, Clone)]
enum SinkUpdateKind {
    /// A normal sink commit. It increases the pending snapshot count and
    /// records `observed_snapshot` as the latest observed snapshot.
    Commit {
        observed_snapshot: IcebergCommittedSnapshot,
    },
    /// A force signal from the sink update path. It triggers the configured
    /// compaction type and still follows the automatic-compaction config gate.
    /// `observed_snapshot` is recorded only while the track is `Idle`.
    ForceCompaction {
        observed_snapshot: IcebergCommittedSnapshot,
    },
    /// A user-triggered manual request. It can bypass disabled automatic
    /// compaction and supplies the task type selected for this request.
    /// It carries no snapshot.
    ManualForceCompaction { task_type: TaskType },
}
540
541impl SinkUpdateKind {
542    fn apply_to_track(self, track: &mut CompactionTrack, now: Instant) {
543        match self {
544            SinkUpdateKind::Commit { observed_snapshot } => {
545                track.record_observed_snapshot(observed_snapshot);
546                track.record_commit();
547            }
548            SinkUpdateKind::ForceCompaction { observed_snapshot } => {
549                if matches!(track.state, CompactionTrackState::Idle { .. }) {
550                    track.record_observed_snapshot(observed_snapshot);
551                }
552                track.record_force_compaction(now, None);
553            }
554            SinkUpdateKind::ManualForceCompaction { task_type } => {
555                track.record_force_compaction(now, Some(task_type))
556            }
557        }
558    }
559
560    fn allows_disabled_compaction(&self) -> bool {
561        matches!(self, SinkUpdateKind::ManualForceCompaction { .. })
562    }
563}
564
/// Result of the read-only preparation step before applying a sink update.
///
/// This bundles the original update intent together with the metadata loaded
/// across the async gap, so the apply step can consume a single object.
///
/// `allow_track_initialization` stays `true` only when the sink had no track
/// before the async config load. This lets the apply step initialize a new
/// track for first-time updates, while preventing a stale update from
/// resurrecting a track that disappeared during the async gap.
struct PreparedSinkUpdate {
    sink_id: SinkId,
    /// The update to apply.
    kind: SinkUpdateKind,
    /// Timestamp taken by the caller before the async config load.
    now: Instant,
    allow_track_initialization: bool,
    /// Freshly loaded config. `None` when no refresh was needed or when
    /// loading failed (the failure is logged during preparation).
    loaded_config: Option<IcebergConfig>,
}
581
/// Per-sink snapshot of compaction scheduling state.
///
/// NOTE(review): this struct is populated outside this section; the field
/// notes below are inferred from names and from the matching `CompactionTrack`
/// fields — confirm against the code that builds it.
#[derive(Debug, Clone)]
pub struct IcebergCompactionScheduleStatus {
    pub sink_id: SinkId,
    /// Presumably a textual form of the track's task type.
    pub task_type: String,
    /// Presumably mirrors `CompactionTrack::trigger_interval_sec`.
    pub trigger_interval_sec: u64,
    /// Presumably mirrors `CompactionTrack::trigger_snapshot_count`.
    pub trigger_snapshot_count: usize,
    /// Presumably a textual form of the track's lifecycle state.
    pub schedule_state: String,
    /// Presumably seconds until the next interval trigger, when applicable.
    pub next_compaction_after_sec: Option<u64>,
    /// Presumably the track's pending commit count, when applicable.
    pub pending_snapshot_count: Option<usize>,
    /// Presumably whether the track would trigger right now.
    pub is_triggerable: bool,
}
593
594impl IcebergCompactionManager {
595    fn apply_track_finish_action(
596        guard: &mut IcebergCompactionManagerInner,
597        sink_id: SinkId,
598        finish_action: CompactionTrackFinishAction,
599    ) {
600        match finish_action {
601            CompactionTrackFinishAction::KeepTrack => {}
602            CompactionTrackFinishAction::RemoveTrack => {
603                guard.sink_schedules.remove(&sink_id);
604            }
605        }
606    }
607
608    pub(super) fn refresh_schedule_config(
609        &self,
610        track: &mut CompactionTrack,
611        iceberg_config: &IcebergConfig,
612        now: Instant,
613    ) {
614        let (task_type, trigger_interval_sec, trigger_snapshot_count) =
615            self.resolve_schedule_values(iceberg_config);
616        track.task_type = task_type;
617        track.trigger_snapshot_count = trigger_snapshot_count;
618        track.update_interval(trigger_interval_sec, now);
619        track.mark_config_refreshed(now);
620    }
621
622    pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
623        let IcebergSinkCompactionUpdate {
624            sink_id,
625            force_compaction,
626            observed_snapshot,
627        } = msg;
628        let kind = if force_compaction {
629            SinkUpdateKind::ForceCompaction { observed_snapshot }
630        } else {
631            SinkUpdateKind::Commit { observed_snapshot }
632        };
633        let prepared_update = self
634            .prepare_sink_update(sink_id, kind, Instant::now())
635            .await;
636
637        let mut guard = self.inner.write();
638        self.apply_sink_update(&mut guard, prepared_update);
639    }
640
641    async fn prepare_sink_update(
642        &self,
643        sink_id: SinkId,
644        kind: SinkUpdateKind,
645        now: Instant,
646    ) -> PreparedSinkUpdate {
647        let refresh_interval = self.config_refresh_interval();
648        let (allow_track_initialization, should_refresh_config) = {
649            let guard = self.inner.read();
650            match guard.sink_schedules.get(&sink_id) {
651                Some(track) => (false, track.should_refresh_config(now, refresh_interval)),
652                None => (true, true),
653            }
654        };
655
656        let loaded_config = if should_refresh_config {
657            match self.load_iceberg_config(sink_id).await {
658                Ok(config) => Some(config),
659                Err(e) => {
660                    tracing::warn!(
661                        error = ?e.as_report(),
662                        "Failed to load iceberg config for sink {}",
663                        sink_id
664                    );
665                    None
666                }
667            }
668        } else {
669            None
670        };
671
672        PreparedSinkUpdate {
673            sink_id,
674            kind,
675            now,
676            allow_track_initialization,
677            loaded_config,
678        }
679    }
680
681    fn apply_sink_update(
682        &self,
683        guard: &mut IcebergCompactionManagerInner,
684        prepared_update: PreparedSinkUpdate,
685    ) -> bool {
686        let PreparedSinkUpdate {
687            sink_id,
688            kind,
689            now,
690            allow_track_initialization,
691            loaded_config,
692        } = prepared_update;
693        let refresh_interval = self.config_refresh_interval();
694
695        if let Some(config) = loaded_config.as_ref() {
696            if config.enable_snapshot_expiration {
697                guard.snapshot_expiration_sink_ids.insert(sink_id);
698            } else {
699                guard.snapshot_expiration_sink_ids.remove(&sink_id);
700            }
701
702            if !config.enable_compaction && !kind.allows_disabled_compaction() {
703                if !guard.sink_schedules.get(&sink_id).is_some_and(|track| {
704                    matches!(
705                        &track.state,
706                        CompactionTrackState::PendingDispatch { .. }
707                            | CompactionTrackState::InFlight { .. }
708                    ) || track.removes_track_after_finish()
709                }) {
710                    guard.sink_schedules.remove(&sink_id);
711                }
712                return false;
713            }
714        }
715
716        match guard.sink_schedules.entry(sink_id) {
717            Entry::Occupied(entry) => {
718                let track = entry.into_mut();
719                if track.removes_track_after_finish()
720                    && !kind.allows_disabled_compaction()
721                    && !loaded_config
722                        .as_ref()
723                        .is_some_and(|config| config.enable_compaction)
724                {
725                    return false;
726                }
727                if track.should_refresh_config(now, refresh_interval)
728                    && let Some(config) = loaded_config.as_ref()
729                {
730                    self.refresh_schedule_config(track, config, now);
731                }
732                if let Some(config) = loaded_config.as_ref() {
733                    track.finish_action =
734                        if kind.allows_disabled_compaction() && !config.enable_compaction {
735                            CompactionTrackFinishAction::RemoveTrack
736                        } else {
737                            CompactionTrackFinishAction::KeepTrack
738                        };
739                }
740
741                kind.apply_to_track(track, now);
742                true
743            }
744            Entry::Vacant(entry) => {
745                if !allow_track_initialization {
746                    tracing::warn!(
747                        sink_id = %sink_id,
748                        "Ignoring iceberg compaction update because track disappeared before apply"
749                    );
750                    return false;
751                }
752
753                let Some(config) = loaded_config.as_ref() else {
754                    tracing::warn!(
755                        sink_id = %sink_id,
756                        "Ignoring iceberg compaction update because sink config is unavailable"
757                    );
758                    return false;
759                };
760
761                let track = entry.insert(self.create_compaction_track(config, now));
762                track.finish_action =
763                    if kind.allows_disabled_compaction() && !config.enable_compaction {
764                        CompactionTrackFinishAction::RemoveTrack
765                    } else {
766                        CompactionTrackFinishAction::KeepTrack
767                    };
768                kind.apply_to_track(track, now);
769                true
770            }
771        }
772    }
773
774    pub(super) fn create_compaction_track(
775        &self,
776        iceberg_config: &IcebergConfig,
777        now: Instant,
778    ) -> CompactionTrack {
779        let (task_type, trigger_interval_sec, trigger_snapshot_count) =
780            self.resolve_schedule_values(iceberg_config);
781
782        CompactionTrack::new(
783            task_type,
784            trigger_interval_sec,
785            trigger_snapshot_count,
786            self.report_timeout(),
787            now,
788        )
789    }
790
791    fn resolve_schedule_values(&self, iceberg_config: &IcebergConfig) -> (TaskType, u64, usize) {
792        (
793            if should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode)
794            {
795                TaskType::Full
796            } else {
797                match iceberg_config.compaction_type() {
798                    CompactionType::Full => TaskType::Full,
799                    CompactionType::SmallFiles => TaskType::SmallFiles,
800                    CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
801                }
802            },
803            iceberg_config.compaction_interval_sec(),
804            iceberg_config.trigger_snapshot_count(),
805        )
806    }
807
808    pub(super) async fn start_manual_compaction(
809        &self,
810        sink_id: SinkId,
811    ) -> MetaResult<oneshot::Receiver<MetaResult<u64>>> {
812        let prepared_update = self
813            .prepare_sink_update(
814                sink_id,
815                SinkUpdateKind::ManualForceCompaction {
816                    task_type: TaskType::Full,
817                },
818                Instant::now(),
819            )
820            .await;
821        let mut guard = self.inner.write();
822        let now = Instant::now();
823        if guard.manual_compaction_waiters.contains_key(&sink_id) {
824            return Err(anyhow!(
825                "manual iceberg compaction is already waiting for sink {}",
826                sink_id
827            )
828            .into());
829        }
830
831        if let Some(track) = guard.sink_schedules.get(&sink_id) {
832            match &track.state {
833                CompactionTrackState::PendingDispatch {
834                    pending_commit_count_at_dispatch,
835                    ..
836                } => {
837                    return Err(anyhow!(
838                        "iceberg compaction task is already running for sink {} \
839                         (state=pending_dispatch, pending_commit_count_at_dispatch={}, \
840                         pending_commit_count={})",
841                        sink_id,
842                        pending_commit_count_at_dispatch,
843                        track.pending_commit_count
844                    )
845                    .into());
846                }
847                CompactionTrackState::InFlight {
848                    task_id,
849                    pending_commit_count_at_dispatch,
850                    report_deadline,
851                    ..
852                } => {
853                    return Err(anyhow!(
854                        "iceberg compaction task is already running for sink {} \
855                         (state=in_flight, task_id={}, pending_commit_count_at_dispatch={}, \
856                         pending_commit_count={}, report_timeout_after_sec={})",
857                        sink_id,
858                        task_id,
859                        pending_commit_count_at_dispatch,
860                        track.pending_commit_count,
861                        report_deadline.saturating_duration_since(now).as_secs()
862                    )
863                    .into());
864                }
865                CompactionTrackState::Idle { .. } => {}
866            }
867        }
868
869        if self.apply_sink_update(&mut guard, prepared_update) {
870            let (tx, rx) = oneshot::channel();
871            guard.manual_compaction_waiters.insert(sink_id, tx);
872            Ok(rx)
873        } else {
874            Err(anyhow!(
875                "failed to trigger manual iceberg compaction for sink {}",
876                sink_id
877            )
878            .into())
879        }
880    }
881
882    pub(super) fn cancel_manual_compaction_waiter(&self, sink_id: SinkId) {
883        self.inner
884            .write()
885            .manual_compaction_waiters
886            .remove(&sink_id);
887    }
888
889    fn finish_timed_out_compaction_tasks(
890        guard: &mut IcebergCompactionManagerInner,
891        now: Instant,
892    ) -> Vec<(SinkId, ManualCompactionWaiter)> {
893        let mut timed_out_tasks = Vec::new();
894        for (&sink_id, track) in &mut guard.sink_schedules {
895            if track.is_report_timed_out(now) {
896                tracing::warn!(sink_id = %sink_id, "Iceberg compaction task report timed out");
897                timed_out_tasks.push((sink_id, track.finish_failed(now)));
898            }
899        }
900
901        let mut timed_out_waiters = Vec::new();
902        for (sink_id, finish_action) in timed_out_tasks {
903            if let Some(waiter) = guard.manual_compaction_waiters.remove(&sink_id) {
904                timed_out_waiters.push((sink_id, waiter));
905            }
906            Self::apply_track_finish_action(guard, sink_id, finish_action);
907        }
908        timed_out_waiters
909    }
910
911    pub(crate) fn get_top_n_iceberg_commit_sink_ids(
912        &self,
913        n: usize,
914    ) -> Vec<IcebergCompactionHandle> {
915        let now = Instant::now();
916        let (handles, timed_out_waiters) = {
917            let mut guard = self.inner.write();
918            let timed_out_waiters = Self::finish_timed_out_compaction_tasks(&mut guard, now);
919
920            let mut candidates = Vec::new();
921            for (sink_id, track) in &guard.sink_schedules {
922                if track.should_trigger(now)
923                    && let CompactionTrackState::Idle {
924                        next_compaction_time,
925                        ..
926                    } = &track.state
927                {
928                    candidates.push((*sink_id, *next_compaction_time));
929                }
930            }
931
932            candidates.sort_by(|a, b| a.1.cmp(&b.1));
933
934            let handles = candidates
935                .into_iter()
936                .take(n)
937                .filter_map(|(sink_id, _)| {
938                    let track = guard.sink_schedules.get_mut(&sink_id)?;
939                    let task_type = track.start_processing();
940
941                    Some(IcebergCompactionHandle::new(
942                        sink_id,
943                        task_type,
944                        self.inner.clone(),
945                        self.metadata_manager.clone(),
946                    ))
947                })
948                .collect();
949
950            (handles, timed_out_waiters)
951        };
952
953        for (sink_id, waiter) in timed_out_waiters {
954            let _ = waiter.send(Err(anyhow!(
955                "Iceberg compaction task report timed out for sink {}",
956                sink_id
957            )
958            .into()));
959        }
960
961        handles
962    }
963
964    pub fn clear_iceberg_maintenance_by_sink_id(&self, sink_id: SinkId) {
965        let (task_to_cancel, waiter) = {
966            let mut guard = self.inner.write();
967            let task_to_cancel = Self::remove_sink_schedule(&mut guard, sink_id);
968            guard.snapshot_expiration_sink_ids.remove(&sink_id);
969            let waiter = guard.manual_compaction_waiters.remove(&sink_id);
970            (task_to_cancel, waiter)
971        };
972        self.cancel_scheduled_task_if_any(sink_id, task_to_cancel);
973
974        if let Some(waiter) = waiter {
975            let _ = waiter.send(Err(anyhow!(
976                "Iceberg compaction maintenance was cleared for sink {}",
977                sink_id
978            )
979            .into()));
980        }
981    }
982
983    fn remove_sink_schedule(
984        guard: &mut IcebergCompactionManagerInner,
985        sink_id: SinkId,
986    ) -> Option<ScheduledCompactionTask> {
987        guard
988            .sink_schedules
989            .remove(&sink_id)
990            .and_then(|track| track.scheduled_task())
991    }
992
993    fn cancel_scheduled_task_if_any(&self, sink_id: SinkId, task: Option<ScheduledCompactionTask>) {
994        let Some(ScheduledCompactionTask {
995            task_id,
996            compactor_context_id,
997        }) = task
998        else {
999            return;
1000        };
1001
1002        let Some(compactor) = self
1003            .iceberg_compactor_manager
1004            .get_compactor(compactor_context_id)
1005        else {
1006            tracing::warn!(
1007                sink_id = %sink_id,
1008                task_id,
1009                compactor_context_id = %compactor_context_id,
1010                "Unable to cancel iceberg compaction task because compactor is no longer registered",
1011            );
1012            return;
1013        };
1014
1015        tracing::info!(
1016            sink_id = %sink_id,
1017            task_id,
1018            compactor_context_id = %compactor_context_id,
1019            "Cancelling iceberg compaction task for removed schedule",
1020        );
1021
1022        if let Err(e) = compactor.cancel_task(task_id) {
1023            tracing::warn!(
1024                error = %e.as_report(),
1025                sink_id = %sink_id,
1026                task_id,
1027                compactor_context_id = %compactor_context_id,
1028                "Failed to cancel iceberg compaction task for removed schedule",
1029            );
1030        }
1031    }
1032
1033    pub fn list_compaction_statuses(&self) -> Vec<IcebergCompactionScheduleStatus> {
1034        let now = Instant::now();
1035        let schedules = {
1036            let guard = self.inner.read();
1037            guard
1038                .sink_schedules
1039                .iter()
1040                .map(|(&sink_id, track)| (sink_id, track.clone()))
1041                .collect_vec()
1042        };
1043
1044        let mut statuses = schedules
1045            .into_iter()
1046            .map(|(sink_id, track)| {
1047                let next_compaction_after_sec = match &track.state {
1048                    CompactionTrackState::Idle {
1049                        next_compaction_time,
1050                        ..
1051                    } => Some(
1052                        next_compaction_time
1053                            .saturating_duration_since(now)
1054                            .as_secs(),
1055                    ),
1056                    CompactionTrackState::PendingDispatch { .. }
1057                    | CompactionTrackState::InFlight { .. } => None,
1058                };
1059                let is_triggerable = track.should_trigger(now);
1060
1061                IcebergCompactionScheduleStatus {
1062                    sink_id,
1063                    task_type: track.current_task_type().as_str_name().to_ascii_lowercase(),
1064                    trigger_interval_sec: track.trigger_interval_sec,
1065                    trigger_snapshot_count: track.trigger_snapshot_count,
1066                    schedule_state: match track.state {
1067                        CompactionTrackState::Idle { .. } => "idle".to_owned(),
1068                        CompactionTrackState::PendingDispatch { .. }
1069                        | CompactionTrackState::InFlight { .. } => "processing".to_owned(),
1070                    },
1071                    next_compaction_after_sec,
1072                    pending_snapshot_count: Some(track.pending_commit_count),
1073                    is_triggerable,
1074                }
1075            })
1076            .collect_vec();
1077
1078        statuses.sort_by_key(|status| status.sink_id);
1079        statuses
1080    }
1081
1082    pub fn handle_report_task(&self, report: IcebergReportTask) {
1083        let sink_id = SinkId::from(report.sink_id);
1084        let task_id = report.task_id;
1085        let status = IcebergReportTaskStatus::try_from(report.status)
1086            .unwrap_or(IcebergReportTaskStatus::Unspecified);
1087        let now = Instant::now();
1088
1089        let waiter = {
1090            let mut guard = self.inner.write();
1091            let mut waiter = None;
1092
1093            match guard.sink_schedules.get_mut(&sink_id) {
1094                Some(track) if track.is_processing_task(task_id) => {
1095                    let finish_action = match status {
1096                        IcebergReportTaskStatus::Success => track.finish_success(now),
1097                        IcebergReportTaskStatus::Failed | IcebergReportTaskStatus::Unspecified => {
1098                            tracing::warn!(
1099                                sink_id = %sink_id,
1100                                task_id,
1101                                error_message = report.error_message.clone().unwrap_or_default(),
1102                                "Iceberg compaction task reported failure"
1103                            );
1104                            track.finish_failed(now)
1105                        }
1106                    };
1107
1108                    Self::apply_track_finish_action(&mut guard, sink_id, finish_action);
1109                    waiter = guard.manual_compaction_waiters.remove(&sink_id);
1110                }
1111                Some(_) => {
1112                    tracing::warn!(sink_id = %sink_id, task_id, "Ignoring stale iceberg compaction report");
1113                }
1114                None => {
1115                    tracing::warn!(sink_id = %sink_id, task_id, "Received iceberg compaction report for unknown sink");
1116                }
1117            }
1118
1119            waiter
1120        };
1121
1122        if let Some(waiter) = waiter {
1123            Self::complete_manual_task_waiter(waiter, &report);
1124        }
1125    }
1126}
1127
1128#[cfg(test)]
1129mod tests;