1use std::collections::hash_map::Entry;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use itertools::Itertools;
20use parking_lot::RwLock;
21use risingwave_connector::connector_common::{
22 IcebergCommittedSnapshot, IcebergSinkCompactionUpdate,
23};
24use risingwave_connector::sink::SinkParam;
25use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
26use risingwave_connector::sink::iceberg::{
27 CompactionType, IcebergConfig, should_enable_iceberg_cow,
28};
29use risingwave_hummock_sdk::HummockContextId;
30use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
31use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
32use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::ReportTask as IcebergReportTask;
33use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::report_task::Status as IcebergReportTaskStatus;
34use thiserror_ext::AsReport;
35use tokio::sync::oneshot;
36
37use super::*;
38
/// Scheduling state of one sink's compaction track.
///
/// Transitions performed by `CompactionTrack` in this file:
/// `Idle` -> `PendingDispatch` (`start_processing`),
/// `PendingDispatch` -> `InFlight` (`mark_dispatched`) or back to `Idle`
/// (`revert_pre_dispatch_failure`), and `InFlight` -> `Idle`
/// (`finish_success` / `finish_failed`).
#[derive(Debug, Clone)]
enum CompactionTrackState {
    /// No task is being processed for the sink.
    Idle {
        /// Instant from which the time-based trigger is considered ready.
        next_compaction_time: Instant,
        /// One-shot task type used instead of the track's configured type for the next task.
        next_task_type_override: Option<TaskType>,
    },
    /// A task was selected for the sink but has not been marked as dispatched yet.
    PendingDispatch {
        /// Task type chosen when processing started.
        task_type: TaskType,
        /// Restored as `Idle::next_compaction_time` when the pre-dispatch phase fails.
        next_compaction_time_on_failure: Instant,
        /// Value of `pending_commit_count` captured when processing started.
        pending_commit_count_at_dispatch: usize,
        /// Clone of `latest_observed_snapshot` captured when processing started.
        gc_watermark_snapshot: Option<IcebergCommittedSnapshot>,
    },
    /// A task was handed to a compactor and its report is awaited.
    InFlight {
        /// Identifier of the dispatched task; reports are matched against it.
        task_id: u64,
        /// Context id of the compactor the task was sent to.
        compactor_context_id: HummockContextId,
        /// Task type carried over from `PendingDispatch`.
        task_type: TaskType,
        /// Carried over from `PendingDispatch`; subtracted from the pending count on success.
        pending_commit_count_at_dispatch: usize,
        /// Instant at or after which the task is treated as timed out.
        report_deadline: Instant,
        /// Carried over from `PendingDispatch`.
        gc_watermark_snapshot: Option<IcebergCommittedSnapshot>,
    },
}
70
/// Identifies an in-flight task together with the compactor it was dispatched to.
///
/// Produced by `CompactionTrack::scheduled_task` so that a removed schedule's task can be
/// cancelled on the right compactor.
#[derive(Debug, Clone, Copy)]
struct ScheduledCompactionTask {
    /// Identifier of the in-flight task.
    task_id: u64,
    /// Context id used to look up the compactor that received the task.
    compactor_context_id: HummockContextId,
}
76
/// What to do with a track once its current task attempt has ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionTrackFinishAction {
    /// Leave the track in `sink_schedules`.
    KeepTrack,
    /// Remove the track from `sink_schedules` (see `apply_track_finish_action`).
    RemoveTrack,
}
82
/// Per-sink compaction schedule: trigger thresholds, commit bookkeeping and the current
/// [`CompactionTrackState`].
#[derive(Debug, Clone)]
pub(super) struct CompactionTrack {
    /// Configured task type, used when no one-shot override is set.
    task_type: TaskType,
    /// Interval, in seconds, added to "now" to compute the next time-based trigger.
    trigger_interval_sec: u64,
    /// Pending-commit count at which the track triggers regardless of time.
    trigger_snapshot_count: usize,
    /// Added to the dispatch instant to compute `InFlight::report_deadline`.
    report_timeout: Duration,
    /// Instant of the last config refresh (initialized to the creation instant).
    last_config_refresh_at: Instant,
    /// Commits recorded and not yet accounted for by a successful task.
    pending_commit_count: usize,
    /// Most recent snapshot recorded via `record_observed_snapshot`.
    latest_observed_snapshot: Option<IcebergCommittedSnapshot>,
    /// Returned by the finish/revert methods to tell the caller whether to drop the track.
    finish_action: CompactionTrackFinishAction,
    /// Current scheduling state.
    state: CompactionTrackState,
}
100
101impl CompactionTrack {
102 fn new(
103 task_type: TaskType,
104 trigger_interval_sec: u64,
105 trigger_snapshot_count: usize,
106 report_timeout: Duration,
107 now: Instant,
108 ) -> Self {
109 Self {
110 task_type,
111 trigger_interval_sec,
112 trigger_snapshot_count,
113 report_timeout,
114 last_config_refresh_at: now,
115 pending_commit_count: 0,
116 latest_observed_snapshot: None,
117 finish_action: CompactionTrackFinishAction::KeepTrack,
118 state: CompactionTrackState::Idle {
119 next_compaction_time: now + Duration::from_secs(trigger_interval_sec),
120 next_task_type_override: None,
121 },
122 }
123 }
124
125 fn should_trigger(&self, now: Instant) -> bool {
137 let next_compaction_time = match &self.state {
138 CompactionTrackState::Idle {
139 next_compaction_time,
140 ..
141 } => *next_compaction_time,
142 CompactionTrackState::PendingDispatch { .. }
143 | CompactionTrackState::InFlight { .. } => return false,
144 };
145
146 let time_ready = now >= next_compaction_time;
147 let commit_ready = self.pending_commit_count >= self.trigger_snapshot_count;
148 let has_commits = self.pending_commit_count > 0;
149
150 commit_ready || (time_ready && has_commits)
151 }
152
153 fn record_observed_snapshot(&mut self, observed_snapshot: IcebergCommittedSnapshot) {
154 self.latest_observed_snapshot = Some(observed_snapshot);
155 }
156
157 fn record_commit(&mut self) {
158 self.pending_commit_count = self.pending_commit_count.saturating_add(1);
159 }
160
161 fn record_force_compaction(&mut self, now: Instant, forced_task_type: Option<TaskType>) {
162 if let CompactionTrackState::Idle {
163 next_compaction_time,
164 next_task_type_override,
165 } = &mut self.state
166 {
167 if let Some(task_type) = forced_task_type {
168 *next_task_type_override = Some(task_type);
169 }
170 *next_compaction_time = now;
171 self.pending_commit_count = self.pending_commit_count.max(1);
172 }
173 }
174
175 fn needs_config_refresh(&self, now: Instant, refresh_interval: Duration) -> bool {
176 now.saturating_duration_since(self.last_config_refresh_at) >= refresh_interval
177 }
178
179 fn should_refresh_config(&self, now: Instant, refresh_interval: Duration) -> bool {
180 matches!(self.state, CompactionTrackState::Idle { .. })
181 && self.needs_config_refresh(now, refresh_interval)
182 }
183
    /// Records `now` as the instant of the last config refresh.
    fn mark_config_refreshed(&mut self, now: Instant) {
        self.last_config_refresh_at = now;
    }
187
188 fn current_task_type(&self) -> TaskType {
189 match &self.state {
190 CompactionTrackState::Idle {
191 next_task_type_override,
192 ..
193 } => next_task_type_override.unwrap_or(self.task_type),
194 CompactionTrackState::PendingDispatch { task_type, .. }
195 | CompactionTrackState::InFlight { task_type, .. } => *task_type,
196 }
197 }
198
199 fn start_processing(&mut self) -> TaskType {
200 match &mut self.state {
201 CompactionTrackState::Idle {
202 next_compaction_time,
203 next_task_type_override,
204 } => {
205 let task_type = next_task_type_override.take().unwrap_or(self.task_type);
206 self.state = CompactionTrackState::PendingDispatch {
207 task_type,
208 next_compaction_time_on_failure: *next_compaction_time,
209 pending_commit_count_at_dispatch: self.pending_commit_count,
210 gc_watermark_snapshot: self.latest_observed_snapshot.clone(),
211 };
212 task_type
213 }
214 CompactionTrackState::PendingDispatch { .. }
215 | CompactionTrackState::InFlight { .. } => {
216 unreachable!("Cannot start processing when already processing")
217 }
218 }
219 }
220
221 fn mark_dispatched(
222 &mut self,
223 task_id: u64,
224 compactor_context_id: HummockContextId,
225 now: Instant,
226 ) {
227 let (task_type, pending_commit_count_at_dispatch, gc_watermark_snapshot) = match &mut self
228 .state
229 {
230 CompactionTrackState::PendingDispatch {
231 task_type,
232 pending_commit_count_at_dispatch,
233 gc_watermark_snapshot,
234 ..
235 } => {
236 let gc_watermark_snapshot = gc_watermark_snapshot.take();
237 (
238 *task_type,
239 *pending_commit_count_at_dispatch,
240 gc_watermark_snapshot,
241 )
242 }
243 CompactionTrackState::Idle { .. } => unreachable!("Cannot mark dispatched when idle"),
244 CompactionTrackState::InFlight { .. } => {
245 unreachable!("Cannot mark dispatched when already in flight")
246 }
247 };
248 self.state = CompactionTrackState::InFlight {
249 task_id,
250 compactor_context_id,
251 task_type,
252 pending_commit_count_at_dispatch,
253 report_deadline: now + self.report_timeout,
254 gc_watermark_snapshot,
255 };
256 }
257
258 pub(super) fn processing_gc_watermark_snapshot(
259 &self,
260 ) -> Option<Option<&IcebergCommittedSnapshot>> {
261 match &self.state {
262 CompactionTrackState::PendingDispatch {
263 gc_watermark_snapshot,
264 ..
265 }
266 | CompactionTrackState::InFlight {
267 gc_watermark_snapshot,
268 ..
269 } => Some(gc_watermark_snapshot.as_ref()),
270 CompactionTrackState::Idle { .. } => None,
271 }
272 }
273
274 fn is_pending_dispatch(&self) -> bool {
275 matches!(self.state, CompactionTrackState::PendingDispatch { .. })
276 }
277
278 fn removes_track_after_finish(&self) -> bool {
279 self.finish_action == CompactionTrackFinishAction::RemoveTrack
280 }
281
282 pub(super) fn is_processing_task(&self, task_id: u64) -> bool {
283 matches!(
284 &self.state,
285 CompactionTrackState::InFlight {
286 task_id: current_task_id,
287 ..
288 } if *current_task_id == task_id
289 )
290 }
291
292 fn scheduled_task(&self) -> Option<ScheduledCompactionTask> {
293 match &self.state {
294 CompactionTrackState::InFlight {
295 task_id,
296 compactor_context_id,
297 ..
298 } => Some(ScheduledCompactionTask {
299 task_id: *task_id,
300 compactor_context_id: *compactor_context_id,
301 }),
302 CompactionTrackState::Idle { .. } | CompactionTrackState::PendingDispatch { .. } => {
303 None
304 }
305 }
306 }
307
308 fn is_report_timed_out(&self, now: Instant) -> bool {
309 matches!(
310 &self.state,
311 CompactionTrackState::InFlight {
312 report_deadline,
313 ..
314 } if now >= *report_deadline
315 )
316 }
317
318 fn finish_success(&mut self, now: Instant) -> CompactionTrackFinishAction {
319 match &self.state {
320 CompactionTrackState::InFlight {
321 pending_commit_count_at_dispatch,
322 ..
323 } => {
324 self.pending_commit_count = self
325 .pending_commit_count
326 .saturating_sub(*pending_commit_count_at_dispatch);
327 self.state = CompactionTrackState::Idle {
328 next_compaction_time: now + Duration::from_secs(self.trigger_interval_sec),
329 next_task_type_override: None,
330 };
331 self.finish_action
332 }
333 CompactionTrackState::Idle { .. } => unreachable!("Cannot finish success when idle"),
334 CompactionTrackState::PendingDispatch { .. } => {
335 unreachable!("Cannot finish success before task dispatch")
336 }
337 }
338 }
339
340 fn finish_failed(&mut self, now: Instant) -> CompactionTrackFinishAction {
341 match &self.state {
342 CompactionTrackState::InFlight { .. } => {
343 self.state = CompactionTrackState::Idle {
344 next_compaction_time: now,
345 next_task_type_override: None,
346 };
347 self.finish_action
348 }
349 CompactionTrackState::Idle { .. } => unreachable!("Cannot finish failed when idle"),
350 CompactionTrackState::PendingDispatch { .. } => {
351 unreachable!("Cannot finish failed before task dispatch")
352 }
353 }
354 }
355
356 fn revert_pre_dispatch_failure(&mut self) -> CompactionTrackFinishAction {
362 match &self.state {
363 CompactionTrackState::PendingDispatch {
364 next_compaction_time_on_failure,
365 ..
366 } => {
367 self.state = CompactionTrackState::Idle {
368 next_compaction_time: *next_compaction_time_on_failure,
369 next_task_type_override: None,
370 };
371 self.finish_action
372 }
373 CompactionTrackState::Idle { .. } => {
374 unreachable!("Cannot revert a pre-dispatch failure when idle")
375 }
376 CompactionTrackState::InFlight { .. } => {
377 unreachable!("Cannot revert a pre-dispatch failure after dispatch")
378 }
379 }
380 }
381
382 fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
383 if self.trigger_interval_sec == new_interval_sec {
384 return;
385 }
386
387 self.trigger_interval_sec = new_interval_sec;
388
389 match &mut self.state {
390 CompactionTrackState::Idle {
391 next_compaction_time,
392 ..
393 } => {
394 *next_compaction_time = now + Duration::from_secs(new_interval_sec);
395 }
396 CompactionTrackState::PendingDispatch { .. }
397 | CompactionTrackState::InFlight { .. } => {}
398 }
399 }
400}
401
/// Handle for one sink that was selected for compaction and put into `PendingDispatch`.
///
/// `send_compact_task` consumes the handle to send the task to a compactor. When the
/// handle is dropped without a successful dispatch, its `Drop` impl reverts the track.
pub(crate) struct IcebergCompactionHandle {
    /// Sink the task is for.
    sink_id: SinkId,
    /// Task type chosen when processing started.
    task_type: TaskType,
    /// Shared manager state holding the sink's track and manual-compaction waiters.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    /// Used to look up the sink catalog when sending the task.
    metadata_manager: MetadataManager,
    /// `true` once the track was marked as dispatched; suppresses the revert on drop.
    handle_success: bool,
}
409
410impl IcebergCompactionHandle {
411 fn new(
412 sink_id: SinkId,
413 task_type: TaskType,
414 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
415 metadata_manager: MetadataManager,
416 ) -> Self {
417 Self {
418 sink_id,
419 task_type,
420 inner,
421 metadata_manager,
422 handle_success: false,
423 }
424 }
425
426 pub async fn send_compact_task(
427 mut self,
428 compactor: Arc<crate::hummock::IcebergCompactor>,
429 task_id: u64,
430 ) -> MetaResult<()> {
431 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
432
433 let Some(prost_sink_catalog) = self
434 .metadata_manager
435 .catalog_controller
436 .get_sink_by_id(self.sink_id)
437 .await?
438 else {
439 tracing::warn!("Sink not found: {}", self.sink_id);
440 return Ok(());
441 };
442 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
443 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
444
445 let result =
446 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
447 task_id,
448 sink_id: self.sink_id.as_raw_id(),
449 props: param.properties,
450 task_type: self.task_type as i32,
451 }));
452
453 if result.is_ok() {
454 let mut should_cancel_sent_task = false;
455 let mut guard = self.inner.write();
456 let mut dispatched = false;
457 if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id)
458 && track.is_pending_dispatch()
459 {
460 track.mark_dispatched(task_id, compactor.context_id(), Instant::now());
461 dispatched = true;
462 }
463 self.handle_success = dispatched;
464 if !dispatched {
465 should_cancel_sent_task = true;
466 tracing::warn!(
467 sink_id = %self.sink_id,
468 task_id,
469 "Iceberg compaction task send succeeded but track was no longer pending dispatch"
470 );
471 }
472 drop(guard);
473
474 if should_cancel_sent_task {
475 self.cancel_sent_task(&compactor, task_id);
476 }
477 }
478
479 result
480 }
481
482 fn cancel_sent_task(&self, compactor: &crate::hummock::IcebergCompactor, task_id: u64) {
483 if let Err(e) = compactor.cancel_task(task_id) {
484 tracing::warn!(
485 error = %e.as_report(),
486 sink_id = %self.sink_id,
487 task_id,
488 "Failed to cancel iceberg compaction task after schedule removal",
489 );
490 }
491 }
492}
493
494impl Drop for IcebergCompactionHandle {
495 fn drop(&mut self) {
496 let waiter = {
497 let mut guard = self.inner.write();
498 if !self.handle_success
499 && let Some(track) = guard.sink_schedules.get_mut(&self.sink_id)
500 && track.is_pending_dispatch()
501 {
502 let finish_action = track.revert_pre_dispatch_failure();
503 let waiter = guard.manual_compaction_waiters.remove(&self.sink_id);
504 IcebergCompactionManager::apply_track_finish_action(
505 &mut guard,
506 self.sink_id,
507 finish_action,
508 );
509 waiter
510 } else {
511 None
512 }
513 };
514
515 if let Some(waiter) = waiter {
516 let _ = waiter.send(Err(anyhow!(
517 "Iceberg compaction task failed before dispatch for sink {}",
518 self.sink_id
519 )
520 .into()));
521 }
522 }
523}
524
/// Kind of update applied to a sink's compaction track.
#[derive(Debug, Clone)]
enum SinkUpdateKind {
    /// A commit was reported: records the snapshot and counts one pending commit.
    Commit {
        /// Snapshot reported with the commit.
        observed_snapshot: IcebergCommittedSnapshot,
    },
    /// The sink requested a forced compaction with the track's configured task type.
    ForceCompaction {
        /// Snapshot reported with the request; recorded only when the track is idle.
        observed_snapshot: IcebergCommittedSnapshot,
    },
    /// A manual compaction request that forces the given task type. It is the only kind
    /// allowed when compaction is disabled (see `allows_disabled_compaction`).
    ManualForceCompaction { task_type: TaskType },
}
540
541impl SinkUpdateKind {
542 fn apply_to_track(self, track: &mut CompactionTrack, now: Instant) {
543 match self {
544 SinkUpdateKind::Commit { observed_snapshot } => {
545 track.record_observed_snapshot(observed_snapshot);
546 track.record_commit();
547 }
548 SinkUpdateKind::ForceCompaction { observed_snapshot } => {
549 if matches!(track.state, CompactionTrackState::Idle { .. }) {
550 track.record_observed_snapshot(observed_snapshot);
551 }
552 track.record_force_compaction(now, None);
553 }
554 SinkUpdateKind::ManualForceCompaction { task_type } => {
555 track.record_force_compaction(now, Some(task_type))
556 }
557 }
558 }
559
560 fn allows_disabled_compaction(&self) -> bool {
561 matches!(self, SinkUpdateKind::ManualForceCompaction { .. })
562 }
563}
564
/// A sink update together with everything gathered before the manager's write lock is
/// taken (built by `prepare_sink_update`, consumed by `apply_sink_update`).
struct PreparedSinkUpdate {
    /// Sink the update targets.
    sink_id: SinkId,
    /// What to apply to the track.
    kind: SinkUpdateKind,
    /// Instant used for all time computations while applying the update.
    now: Instant,
    /// `true` when no track existed for the sink at preparation time, which permits
    /// creating one while applying.
    allow_track_initialization: bool,
    /// Freshly loaded sink config; `None` when no reload was attempted or loading failed.
    loaded_config: Option<IcebergConfig>,
}
581
/// Point-in-time view of one sink's compaction schedule, as produced by
/// `list_compaction_statuses`.
#[derive(Debug, Clone)]
pub struct IcebergCompactionScheduleStatus {
    /// Sink the schedule belongs to.
    pub sink_id: SinkId,
    /// Lower-cased name of the task type that currently applies to the track.
    pub task_type: String,
    /// Configured trigger interval in seconds.
    pub trigger_interval_sec: u64,
    /// Configured pending-commit threshold.
    pub trigger_snapshot_count: usize,
    /// `"idle"` or `"processing"`.
    pub schedule_state: String,
    /// Whole seconds until the next time-based trigger; `None` while processing.
    pub next_compaction_after_sec: Option<u64>,
    /// Number of pending commits on the track.
    pub pending_snapshot_count: Option<usize>,
    /// Whether the track would trigger at the time the status was taken.
    pub is_triggerable: bool,
}
593
594impl IcebergCompactionManager {
595 fn apply_track_finish_action(
596 guard: &mut IcebergCompactionManagerInner,
597 sink_id: SinkId,
598 finish_action: CompactionTrackFinishAction,
599 ) {
600 match finish_action {
601 CompactionTrackFinishAction::KeepTrack => {}
602 CompactionTrackFinishAction::RemoveTrack => {
603 guard.sink_schedules.remove(&sink_id);
604 }
605 }
606 }
607
608 pub(super) fn refresh_schedule_config(
609 &self,
610 track: &mut CompactionTrack,
611 iceberg_config: &IcebergConfig,
612 now: Instant,
613 ) {
614 let (task_type, trigger_interval_sec, trigger_snapshot_count) =
615 self.resolve_schedule_values(iceberg_config);
616 track.task_type = task_type;
617 track.trigger_snapshot_count = trigger_snapshot_count;
618 track.update_interval(trigger_interval_sec, now);
619 track.mark_config_refreshed(now);
620 }
621
622 pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
623 let IcebergSinkCompactionUpdate {
624 sink_id,
625 force_compaction,
626 observed_snapshot,
627 } = msg;
628 let kind = if force_compaction {
629 SinkUpdateKind::ForceCompaction { observed_snapshot }
630 } else {
631 SinkUpdateKind::Commit { observed_snapshot }
632 };
633 let prepared_update = self
634 .prepare_sink_update(sink_id, kind, Instant::now())
635 .await;
636
637 let mut guard = self.inner.write();
638 self.apply_sink_update(&mut guard, prepared_update);
639 }
640
641 async fn prepare_sink_update(
642 &self,
643 sink_id: SinkId,
644 kind: SinkUpdateKind,
645 now: Instant,
646 ) -> PreparedSinkUpdate {
647 let refresh_interval = self.config_refresh_interval();
648 let (allow_track_initialization, should_refresh_config) = {
649 let guard = self.inner.read();
650 match guard.sink_schedules.get(&sink_id) {
651 Some(track) => (false, track.should_refresh_config(now, refresh_interval)),
652 None => (true, true),
653 }
654 };
655
656 let loaded_config = if should_refresh_config {
657 match self.load_iceberg_config(sink_id).await {
658 Ok(config) => Some(config),
659 Err(e) => {
660 tracing::warn!(
661 error = ?e.as_report(),
662 "Failed to load iceberg config for sink {}",
663 sink_id
664 );
665 None
666 }
667 }
668 } else {
669 None
670 };
671
672 PreparedSinkUpdate {
673 sink_id,
674 kind,
675 now,
676 allow_track_initialization,
677 loaded_config,
678 }
679 }
680
681 fn apply_sink_update(
682 &self,
683 guard: &mut IcebergCompactionManagerInner,
684 prepared_update: PreparedSinkUpdate,
685 ) -> bool {
686 let PreparedSinkUpdate {
687 sink_id,
688 kind,
689 now,
690 allow_track_initialization,
691 loaded_config,
692 } = prepared_update;
693 let refresh_interval = self.config_refresh_interval();
694
695 if let Some(config) = loaded_config.as_ref() {
696 if config.enable_snapshot_expiration {
697 guard.snapshot_expiration_sink_ids.insert(sink_id);
698 } else {
699 guard.snapshot_expiration_sink_ids.remove(&sink_id);
700 }
701
702 if !config.enable_compaction && !kind.allows_disabled_compaction() {
703 if !guard.sink_schedules.get(&sink_id).is_some_and(|track| {
704 matches!(
705 &track.state,
706 CompactionTrackState::PendingDispatch { .. }
707 | CompactionTrackState::InFlight { .. }
708 ) || track.removes_track_after_finish()
709 }) {
710 guard.sink_schedules.remove(&sink_id);
711 }
712 return false;
713 }
714 }
715
716 match guard.sink_schedules.entry(sink_id) {
717 Entry::Occupied(entry) => {
718 let track = entry.into_mut();
719 if track.removes_track_after_finish()
720 && !kind.allows_disabled_compaction()
721 && !loaded_config
722 .as_ref()
723 .is_some_and(|config| config.enable_compaction)
724 {
725 return false;
726 }
727 if track.should_refresh_config(now, refresh_interval)
728 && let Some(config) = loaded_config.as_ref()
729 {
730 self.refresh_schedule_config(track, config, now);
731 }
732 if let Some(config) = loaded_config.as_ref() {
733 track.finish_action =
734 if kind.allows_disabled_compaction() && !config.enable_compaction {
735 CompactionTrackFinishAction::RemoveTrack
736 } else {
737 CompactionTrackFinishAction::KeepTrack
738 };
739 }
740
741 kind.apply_to_track(track, now);
742 true
743 }
744 Entry::Vacant(entry) => {
745 if !allow_track_initialization {
746 tracing::warn!(
747 sink_id = %sink_id,
748 "Ignoring iceberg compaction update because track disappeared before apply"
749 );
750 return false;
751 }
752
753 let Some(config) = loaded_config.as_ref() else {
754 tracing::warn!(
755 sink_id = %sink_id,
756 "Ignoring iceberg compaction update because sink config is unavailable"
757 );
758 return false;
759 };
760
761 let track = entry.insert(self.create_compaction_track(config, now));
762 track.finish_action =
763 if kind.allows_disabled_compaction() && !config.enable_compaction {
764 CompactionTrackFinishAction::RemoveTrack
765 } else {
766 CompactionTrackFinishAction::KeepTrack
767 };
768 kind.apply_to_track(track, now);
769 true
770 }
771 }
772 }
773
774 pub(super) fn create_compaction_track(
775 &self,
776 iceberg_config: &IcebergConfig,
777 now: Instant,
778 ) -> CompactionTrack {
779 let (task_type, trigger_interval_sec, trigger_snapshot_count) =
780 self.resolve_schedule_values(iceberg_config);
781
782 CompactionTrack::new(
783 task_type,
784 trigger_interval_sec,
785 trigger_snapshot_count,
786 self.report_timeout(),
787 now,
788 )
789 }
790
791 fn resolve_schedule_values(&self, iceberg_config: &IcebergConfig) -> (TaskType, u64, usize) {
792 (
793 if should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode)
794 {
795 TaskType::Full
796 } else {
797 match iceberg_config.compaction_type() {
798 CompactionType::Full => TaskType::Full,
799 CompactionType::SmallFiles => TaskType::SmallFiles,
800 CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
801 }
802 },
803 iceberg_config.compaction_interval_sec(),
804 iceberg_config.trigger_snapshot_count(),
805 )
806 }
807
808 pub(super) async fn start_manual_compaction(
809 &self,
810 sink_id: SinkId,
811 ) -> MetaResult<oneshot::Receiver<MetaResult<u64>>> {
812 let prepared_update = self
813 .prepare_sink_update(
814 sink_id,
815 SinkUpdateKind::ManualForceCompaction {
816 task_type: TaskType::Full,
817 },
818 Instant::now(),
819 )
820 .await;
821 let mut guard = self.inner.write();
822 let now = Instant::now();
823 if guard.manual_compaction_waiters.contains_key(&sink_id) {
824 return Err(anyhow!(
825 "manual iceberg compaction is already waiting for sink {}",
826 sink_id
827 )
828 .into());
829 }
830
831 if let Some(track) = guard.sink_schedules.get(&sink_id) {
832 match &track.state {
833 CompactionTrackState::PendingDispatch {
834 pending_commit_count_at_dispatch,
835 ..
836 } => {
837 return Err(anyhow!(
838 "iceberg compaction task is already running for sink {} \
839 (state=pending_dispatch, pending_commit_count_at_dispatch={}, \
840 pending_commit_count={})",
841 sink_id,
842 pending_commit_count_at_dispatch,
843 track.pending_commit_count
844 )
845 .into());
846 }
847 CompactionTrackState::InFlight {
848 task_id,
849 pending_commit_count_at_dispatch,
850 report_deadline,
851 ..
852 } => {
853 return Err(anyhow!(
854 "iceberg compaction task is already running for sink {} \
855 (state=in_flight, task_id={}, pending_commit_count_at_dispatch={}, \
856 pending_commit_count={}, report_timeout_after_sec={})",
857 sink_id,
858 task_id,
859 pending_commit_count_at_dispatch,
860 track.pending_commit_count,
861 report_deadline.saturating_duration_since(now).as_secs()
862 )
863 .into());
864 }
865 CompactionTrackState::Idle { .. } => {}
866 }
867 }
868
869 if self.apply_sink_update(&mut guard, prepared_update) {
870 let (tx, rx) = oneshot::channel();
871 guard.manual_compaction_waiters.insert(sink_id, tx);
872 Ok(rx)
873 } else {
874 Err(anyhow!(
875 "failed to trigger manual iceberg compaction for sink {}",
876 sink_id
877 )
878 .into())
879 }
880 }
881
882 pub(super) fn cancel_manual_compaction_waiter(&self, sink_id: SinkId) {
883 self.inner
884 .write()
885 .manual_compaction_waiters
886 .remove(&sink_id);
887 }
888
889 fn finish_timed_out_compaction_tasks(
890 guard: &mut IcebergCompactionManagerInner,
891 now: Instant,
892 ) -> Vec<(SinkId, ManualCompactionWaiter)> {
893 let mut timed_out_tasks = Vec::new();
894 for (&sink_id, track) in &mut guard.sink_schedules {
895 if track.is_report_timed_out(now) {
896 tracing::warn!(sink_id = %sink_id, "Iceberg compaction task report timed out");
897 timed_out_tasks.push((sink_id, track.finish_failed(now)));
898 }
899 }
900
901 let mut timed_out_waiters = Vec::new();
902 for (sink_id, finish_action) in timed_out_tasks {
903 if let Some(waiter) = guard.manual_compaction_waiters.remove(&sink_id) {
904 timed_out_waiters.push((sink_id, waiter));
905 }
906 Self::apply_track_finish_action(guard, sink_id, finish_action);
907 }
908 timed_out_waiters
909 }
910
911 pub(crate) fn get_top_n_iceberg_commit_sink_ids(
912 &self,
913 n: usize,
914 ) -> Vec<IcebergCompactionHandle> {
915 let now = Instant::now();
916 let (handles, timed_out_waiters) = {
917 let mut guard = self.inner.write();
918 let timed_out_waiters = Self::finish_timed_out_compaction_tasks(&mut guard, now);
919
920 let mut candidates = Vec::new();
921 for (sink_id, track) in &guard.sink_schedules {
922 if track.should_trigger(now)
923 && let CompactionTrackState::Idle {
924 next_compaction_time,
925 ..
926 } = &track.state
927 {
928 candidates.push((*sink_id, *next_compaction_time));
929 }
930 }
931
932 candidates.sort_by(|a, b| a.1.cmp(&b.1));
933
934 let handles = candidates
935 .into_iter()
936 .take(n)
937 .filter_map(|(sink_id, _)| {
938 let track = guard.sink_schedules.get_mut(&sink_id)?;
939 let task_type = track.start_processing();
940
941 Some(IcebergCompactionHandle::new(
942 sink_id,
943 task_type,
944 self.inner.clone(),
945 self.metadata_manager.clone(),
946 ))
947 })
948 .collect();
949
950 (handles, timed_out_waiters)
951 };
952
953 for (sink_id, waiter) in timed_out_waiters {
954 let _ = waiter.send(Err(anyhow!(
955 "Iceberg compaction task report timed out for sink {}",
956 sink_id
957 )
958 .into()));
959 }
960
961 handles
962 }
963
964 pub fn clear_iceberg_maintenance_by_sink_id(&self, sink_id: SinkId) {
965 let (task_to_cancel, waiter) = {
966 let mut guard = self.inner.write();
967 let task_to_cancel = Self::remove_sink_schedule(&mut guard, sink_id);
968 guard.snapshot_expiration_sink_ids.remove(&sink_id);
969 let waiter = guard.manual_compaction_waiters.remove(&sink_id);
970 (task_to_cancel, waiter)
971 };
972 self.cancel_scheduled_task_if_any(sink_id, task_to_cancel);
973
974 if let Some(waiter) = waiter {
975 let _ = waiter.send(Err(anyhow!(
976 "Iceberg compaction maintenance was cleared for sink {}",
977 sink_id
978 )
979 .into()));
980 }
981 }
982
983 fn remove_sink_schedule(
984 guard: &mut IcebergCompactionManagerInner,
985 sink_id: SinkId,
986 ) -> Option<ScheduledCompactionTask> {
987 guard
988 .sink_schedules
989 .remove(&sink_id)
990 .and_then(|track| track.scheduled_task())
991 }
992
993 fn cancel_scheduled_task_if_any(&self, sink_id: SinkId, task: Option<ScheduledCompactionTask>) {
994 let Some(ScheduledCompactionTask {
995 task_id,
996 compactor_context_id,
997 }) = task
998 else {
999 return;
1000 };
1001
1002 let Some(compactor) = self
1003 .iceberg_compactor_manager
1004 .get_compactor(compactor_context_id)
1005 else {
1006 tracing::warn!(
1007 sink_id = %sink_id,
1008 task_id,
1009 compactor_context_id = %compactor_context_id,
1010 "Unable to cancel iceberg compaction task because compactor is no longer registered",
1011 );
1012 return;
1013 };
1014
1015 tracing::info!(
1016 sink_id = %sink_id,
1017 task_id,
1018 compactor_context_id = %compactor_context_id,
1019 "Cancelling iceberg compaction task for removed schedule",
1020 );
1021
1022 if let Err(e) = compactor.cancel_task(task_id) {
1023 tracing::warn!(
1024 error = %e.as_report(),
1025 sink_id = %sink_id,
1026 task_id,
1027 compactor_context_id = %compactor_context_id,
1028 "Failed to cancel iceberg compaction task for removed schedule",
1029 );
1030 }
1031 }
1032
1033 pub fn list_compaction_statuses(&self) -> Vec<IcebergCompactionScheduleStatus> {
1034 let now = Instant::now();
1035 let schedules = {
1036 let guard = self.inner.read();
1037 guard
1038 .sink_schedules
1039 .iter()
1040 .map(|(&sink_id, track)| (sink_id, track.clone()))
1041 .collect_vec()
1042 };
1043
1044 let mut statuses = schedules
1045 .into_iter()
1046 .map(|(sink_id, track)| {
1047 let next_compaction_after_sec = match &track.state {
1048 CompactionTrackState::Idle {
1049 next_compaction_time,
1050 ..
1051 } => Some(
1052 next_compaction_time
1053 .saturating_duration_since(now)
1054 .as_secs(),
1055 ),
1056 CompactionTrackState::PendingDispatch { .. }
1057 | CompactionTrackState::InFlight { .. } => None,
1058 };
1059 let is_triggerable = track.should_trigger(now);
1060
1061 IcebergCompactionScheduleStatus {
1062 sink_id,
1063 task_type: track.current_task_type().as_str_name().to_ascii_lowercase(),
1064 trigger_interval_sec: track.trigger_interval_sec,
1065 trigger_snapshot_count: track.trigger_snapshot_count,
1066 schedule_state: match track.state {
1067 CompactionTrackState::Idle { .. } => "idle".to_owned(),
1068 CompactionTrackState::PendingDispatch { .. }
1069 | CompactionTrackState::InFlight { .. } => "processing".to_owned(),
1070 },
1071 next_compaction_after_sec,
1072 pending_snapshot_count: Some(track.pending_commit_count),
1073 is_triggerable,
1074 }
1075 })
1076 .collect_vec();
1077
1078 statuses.sort_by_key(|status| status.sink_id);
1079 statuses
1080 }
1081
1082 pub fn handle_report_task(&self, report: IcebergReportTask) {
1083 let sink_id = SinkId::from(report.sink_id);
1084 let task_id = report.task_id;
1085 let status = IcebergReportTaskStatus::try_from(report.status)
1086 .unwrap_or(IcebergReportTaskStatus::Unspecified);
1087 let now = Instant::now();
1088
1089 let waiter = {
1090 let mut guard = self.inner.write();
1091 let mut waiter = None;
1092
1093 match guard.sink_schedules.get_mut(&sink_id) {
1094 Some(track) if track.is_processing_task(task_id) => {
1095 let finish_action = match status {
1096 IcebergReportTaskStatus::Success => track.finish_success(now),
1097 IcebergReportTaskStatus::Failed | IcebergReportTaskStatus::Unspecified => {
1098 tracing::warn!(
1099 sink_id = %sink_id,
1100 task_id,
1101 error_message = report.error_message.clone().unwrap_or_default(),
1102 "Iceberg compaction task reported failure"
1103 );
1104 track.finish_failed(now)
1105 }
1106 };
1107
1108 Self::apply_track_finish_action(&mut guard, sink_id, finish_action);
1109 waiter = guard.manual_compaction_waiters.remove(&sink_id);
1110 }
1111 Some(_) => {
1112 tracing::warn!(sink_id = %sink_id, task_id, "Ignoring stale iceberg compaction report");
1113 }
1114 None => {
1115 tracing::warn!(sink_id = %sink_id, task_id, "Received iceberg compaction report for unknown sink");
1116 }
1117 }
1118
1119 waiter
1120 };
1121
1122 if let Some(waiter) = waiter {
1123 Self::complete_manual_task_waiter(waiter, &report);
1124 }
1125 }
1126}
1127
1128#[cfg(test)]
1129mod tests;