risingwave_meta/manager/
iceberg_compaction.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use iceberg::spec::Operation;
21use iceberg::transaction::Transaction;
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::id::WorkerId;
25use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
26use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
27use risingwave_connector::sink::iceberg::{
28    CompactionType, IcebergConfig, should_enable_iceberg_cow,
29};
30use risingwave_connector::sink::{SinkError, SinkParam};
31use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
32use risingwave_pb::iceberg_compaction::{
33    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
34};
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot::Sender;
38use tokio::task::JoinHandle;
39use tonic::Streaming;
40
41use super::MetaSrvEnv;
42use crate::MetaResult;
43use crate::hummock::{
44    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
45    IcebergCompactor, IcebergCompactorManagerRef,
46};
47use crate::manager::MetadataManager;
48use crate::rpc::metrics::MetaMetrics;
49
50pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;
51
52type CompactorChangeTx =
53    UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
54
55type CompactorChangeRx =
56    UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
57
/// Snapshot of a [`CompactionTrack`]'s scheduling state, taken when the track
/// starts processing so that it can be restored if the task is not dispatched.
#[derive(Debug, Clone)]
struct CompactionTrackSnapshot {
    /// The `next_compaction_time` the track held while it was `Idle`.
    /// When this is `None`, restoring falls back to `Instant::now()`.
    next_compaction_time: Option<Instant>,
}
63
/// Scheduling state of a [`CompactionTrack`].
///
/// A track is either waiting for its trigger conditions (`Idle`) or has a
/// compaction task in flight (`Processing`).
#[derive(Debug, Clone)]
enum CompactionTrackState {
    /// Ready to accept commits and check for trigger conditions.
    /// `next_compaction_time` is the earliest instant at which the track may trigger.
    Idle { next_compaction_time: Instant },
    /// Compaction task is being processed; the track never triggers in this state.
    Processing,
}
72
/// Per-sink compaction schedule: which kind of task to run and when it may trigger.
#[derive(Debug, Clone)]
struct CompactionTrack {
    /// Kind of compaction task that is sent for this sink.
    task_type: TaskType,
    /// Interval, in seconds, added to "now" to compute the next compaction time.
    trigger_interval_sec: u64,
    /// Minimum snapshot count threshold to trigger compaction
    trigger_snapshot_count: usize,
    /// Current position in the `Idle` / `Processing` state machine.
    state: CompactionTrackState,
}
81
82impl CompactionTrack {
83    fn should_trigger(&self, now: Instant, snapshot_count: usize) -> bool {
84        // Only Idle state can trigger
85        let next_compaction_time = match &self.state {
86            CompactionTrackState::Idle {
87                next_compaction_time,
88            } => *next_compaction_time,
89            CompactionTrackState::Processing => return false,
90        };
91
92        // Check both time and snapshot count conditions
93        let time_ready = now >= next_compaction_time;
94        let snapshot_ready = snapshot_count >= self.trigger_snapshot_count;
95
96        time_ready && snapshot_ready
97    }
98
99    /// Create snapshot and transition to Processing state
100    fn start_processing(&mut self) -> CompactionTrackSnapshot {
101        match &self.state {
102            CompactionTrackState::Idle {
103                next_compaction_time,
104            } => {
105                let snapshot = CompactionTrackSnapshot {
106                    next_compaction_time: Some(*next_compaction_time),
107                };
108                self.state = CompactionTrackState::Processing;
109                snapshot
110            }
111            CompactionTrackState::Processing => {
112                unreachable!("Cannot start processing when already processing")
113            }
114        }
115    }
116
117    /// Complete processing successfully
118    fn complete_processing(&mut self) {
119        match &self.state {
120            CompactionTrackState::Processing => {
121                self.state = CompactionTrackState::Idle {
122                    next_compaction_time: Instant::now()
123                        + std::time::Duration::from_secs(self.trigger_interval_sec),
124                };
125            }
126            CompactionTrackState::Idle { .. } => {
127                unreachable!("Cannot complete processing when not processing")
128            }
129        }
130    }
131
132    /// Restore from snapshot on failure
133    fn restore_from_snapshot(&mut self, snapshot: CompactionTrackSnapshot) {
134        self.state = CompactionTrackState::Idle {
135            next_compaction_time: snapshot.next_compaction_time.unwrap_or_else(Instant::now),
136        };
137    }
138
139    fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
140        if self.trigger_interval_sec == new_interval_sec {
141            return;
142        }
143
144        self.trigger_interval_sec = new_interval_sec;
145
146        // Reset next_compaction_time based on current state
147        match &mut self.state {
148            CompactionTrackState::Idle {
149                next_compaction_time,
150            } => {
151                *next_compaction_time = now + std::time::Duration::from_secs(new_interval_sec);
152            }
153            CompactionTrackState::Processing => {
154                // Keep Processing state, will reset time when completing
155            }
156        }
157    }
158}
159
/// Handle for one compaction task that has been scheduled for a sink.
///
/// Each sink has a single `CompactionTrack`; the handle refers to it by `sink_id`
/// and updates it when the handle is dropped.
pub struct IcebergCompactionHandle {
    /// Sink whose compaction track this handle belongs to.
    sink_id: SinkId,
    /// Task type of the track at the time the handle was created.
    task_type: TaskType,
    /// Shared manager state that holds the per-sink compaction tracks.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    /// Used to look up the sink catalog when the task is sent.
    metadata_manager: MetadataManager,
    /// Set to `true` once the compaction task event was sent successfully.
    handle_success: bool,

    /// Snapshot of the compaction track for recovery.
    track_snapshot: CompactionTrackSnapshot,
}
171
172impl IcebergCompactionHandle {
173    fn new(
174        sink_id: SinkId,
175        task_type: TaskType,
176        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
177        metadata_manager: MetadataManager,
178        track_snapshot: CompactionTrackSnapshot,
179    ) -> Self {
180        Self {
181            sink_id,
182            task_type,
183            inner,
184            metadata_manager,
185            handle_success: false,
186            track_snapshot,
187        }
188    }
189
190    pub async fn send_compact_task(
191        mut self,
192        compactor: Arc<IcebergCompactor>,
193        task_id: u64,
194    ) -> MetaResult<()> {
195        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
196        let Some(prost_sink_catalog) = self
197            .metadata_manager
198            .catalog_controller
199            .get_sink_by_id(self.sink_id)
200            .await?
201        else {
202            // The sink may be deleted, just return Ok.
203            tracing::warn!("Sink not found: {}", self.sink_id);
204            return Ok(());
205        };
206        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
207        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
208
209        let result =
210            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
211                task_id,
212                props: param.properties,
213                task_type: self.task_type as i32,
214            }));
215
216        if result.is_ok() {
217            self.handle_success = true;
218        }
219
220        result
221    }
222
    /// Returns the id of the sink this handle was created for.
    pub fn sink_id(&self) -> SinkId {
        self.sink_id
    }
226}
227
228impl Drop for IcebergCompactionHandle {
229    fn drop(&mut self) {
230        let mut guard = self.inner.write();
231        if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id) {
232            // Only restore/complete if this handle's task_type matches the track's task_type
233            if track.task_type == self.task_type {
234                if self.handle_success {
235                    track.complete_processing();
236                } else {
237                    // If the handle is not successful, we need to restore the track from snapshot.
238                    // This is to avoid the case where the handle is dropped before the compaction task is sent.
239                    track.restore_from_snapshot(self.track_snapshot.clone());
240                }
241            }
242        }
243    }
244}
245
/// Mutable state of the compaction manager, guarded by a lock in
/// [`IcebergCompactionManager`] and shared with [`IcebergCompactionHandle`]s.
struct IcebergCompactionManagerInner {
    /// One compaction track per sink, keyed by sink id.
    pub sink_schedules: HashMap<SinkId, CompactionTrack>,
}
249
/// Schedules Iceberg compaction tasks per sink and runs the related background
/// loops (commit-info updates, compactor event handling, snapshot GC).
pub struct IcebergCompactionManager {
    /// Meta service environment; used here to allocate compaction task ids.
    pub env: MetaSrvEnv,
    /// Per-sink compaction tracks, shared with outstanding handles.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    /// Used to look up sink catalogs.
    metadata_manager: MetadataManager,
    /// Source of compactors to which compaction tasks are sent.
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    /// Sends newly subscribed compactor streams to the compaction event loop.
    compactor_streams_change_tx: CompactorChangeTx,

    /// Meta metrics handle, passed on to the compaction event loop.
    pub metrics: Arc<MetaMetrics>,
}
261
262impl IcebergCompactionManager {
263    pub fn build(
264        env: MetaSrvEnv,
265        metadata_manager: MetadataManager,
266        iceberg_compactor_manager: IcebergCompactorManagerRef,
267        metrics: Arc<MetaMetrics>,
268    ) -> (Arc<Self>, CompactorChangeRx) {
269        let (compactor_streams_change_tx, compactor_streams_change_rx) =
270            tokio::sync::mpsc::unbounded_channel();
271        (
272            Arc::new(Self {
273                env,
274                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
275                    sink_schedules: HashMap::default(),
276                })),
277                metadata_manager,
278                iceberg_compactor_manager,
279                compactor_streams_change_tx,
280                metrics,
281            }),
282            compactor_streams_change_rx,
283        )
284    }
285
286    pub fn compaction_stat_loop(
287        manager: Arc<Self>,
288        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
289    ) -> (JoinHandle<()>, Sender<()>) {
290        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
291        let join_handle = tokio::spawn(async move {
292            loop {
293                tokio::select! {
294                    Some(stat) = rx.recv() => {
295                        manager.update_iceberg_commit_info(stat).await;
296                    },
297                    _ = &mut shutdown_rx => {
298                        tracing::info!("Iceberg compaction manager is stopped");
299                        return;
300                    }
301                }
302            }
303        });
304
305        (join_handle, shutdown_tx)
306    }
307
308    pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
309        let IcebergSinkCompactionUpdate {
310            sink_id,
311            compaction_interval,
312            force_compaction,
313        } = msg;
314
315        // Check if track exists
316        let track_exists = {
317            let guard = self.inner.read();
318            guard.sink_schedules.contains_key(&sink_id)
319        };
320
321        // Create track if it doesn't exist
322        if !track_exists {
323            // Load config first (async operation outside of lock)
324            let iceberg_config = self.load_iceberg_config(sink_id).await;
325
326            let new_track = match iceberg_config {
327                Ok(config) => {
328                    // Call synchronous create function with the config
329                    match self.create_compaction_track(sink_id, &config) {
330                        Ok(track) => track,
331                        Err(e) => {
332                            tracing::error!(
333                                error = ?e.as_report(),
334                                "Failed to create compaction track from config for sink {}, using default Full track",
335                                sink_id
336                            );
337                            // Fallback to default Full track
338                            CompactionTrack {
339                                task_type: TaskType::Full,
340                                trigger_interval_sec: compaction_interval,
341                                trigger_snapshot_count: 10,
342                                state: CompactionTrackState::Idle {
343                                    next_compaction_time: Instant::now()
344                                        + std::time::Duration::from_secs(compaction_interval),
345                                },
346                            }
347                        }
348                    }
349                }
350                Err(e) => {
351                    tracing::error!(
352                        error = ?e.as_report(),
353                        "Failed to load iceberg config for sink {}, using default Full track",
354                        sink_id
355                    );
356                    // Fallback to default Full track
357                    CompactionTrack {
358                        task_type: TaskType::Full,
359                        trigger_interval_sec: compaction_interval,
360                        trigger_snapshot_count: 10,
361                        state: CompactionTrackState::Idle {
362                            next_compaction_time: Instant::now()
363                                + std::time::Duration::from_secs(compaction_interval),
364                        },
365                    }
366                }
367            };
368
369            let mut guard = self.inner.write();
370            guard.sink_schedules.insert(sink_id, new_track);
371        }
372
373        // Update track
374        let mut guard = self.inner.write();
375        if let Some(track) = guard.sink_schedules.get_mut(&sink_id) {
376            // Force compaction: set to trigger immediately if in Idle state
377            if force_compaction {
378                if let CompactionTrackState::Idle {
379                    next_compaction_time,
380                } = &mut track.state
381                {
382                    *next_compaction_time = Instant::now();
383                }
384                // Skip Processing tracks - they will complete naturally
385            } else {
386                // Update interval if changed
387                track.update_interval(compaction_interval, Instant::now());
388            }
389        } else {
390            tracing::error!(
391                "Failed to find compaction track for sink {} during update; configuration changes not applied.",
392                sink_id
393            );
394        }
395    }
396
397    /// Create a compaction track for a sink based on its Iceberg configuration
398    fn create_compaction_track(
399        &self,
400        _sink_id: SinkId,
401        iceberg_config: &IcebergConfig,
402    ) -> MetaResult<CompactionTrack> {
403        let trigger_interval_sec = iceberg_config.compaction_interval_sec();
404        let trigger_snapshot_count = iceberg_config.trigger_snapshot_count();
405
406        // For `copy-on-write` mode, always use Full compaction regardless of config
407        let task_type = if should_enable_iceberg_cow(
408            iceberg_config.r#type.as_str(),
409            iceberg_config.write_mode.as_str(),
410        ) {
411            TaskType::Full
412        } else {
413            // For `merge-on-read` mode, use configured compaction_type
414            match iceberg_config.compaction_type() {
415                CompactionType::Full => TaskType::Full,
416                CompactionType::SmallFiles => TaskType::SmallFiles,
417                CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
418            }
419        };
420
421        Ok(CompactionTrack {
422            task_type,
423            trigger_interval_sec,
424            trigger_snapshot_count,
425            state: CompactionTrackState::Idle {
426                next_compaction_time: Instant::now()
427                    + std::time::Duration::from_secs(trigger_interval_sec),
428            },
429        })
430    }
431
432    /// Get the top N compaction tasks to trigger
433    /// Returns handles for tasks that are ready to be compacted
434    /// Sorted by commit count and next compaction time
435    pub async fn get_top_n_iceberg_commit_sink_ids(
436        &self,
437        n: usize,
438    ) -> Vec<IcebergCompactionHandle> {
439        let now = Instant::now();
440
441        // Collect all sink_ids to check
442        let sink_ids: Vec<SinkId> = {
443            let guard = self.inner.read();
444            guard.sink_schedules.keys().cloned().collect()
445        };
446
447        // Fetch snapshot counts for all sinks in parallel
448        let snapshot_count_futures = sink_ids
449            .iter()
450            .map(|sink_id| async move {
451                let count = self.get_snapshot_count(*sink_id).await?;
452                Some((*sink_id, count))
453            })
454            .collect::<Vec<_>>();
455
456        let snapshot_counts: HashMap<SinkId, usize> =
457            futures::future::join_all(snapshot_count_futures)
458                .await
459                .into_iter()
460                .flatten()
461                .collect();
462
463        let mut guard = self.inner.write();
464
465        // Collect all triggerable tasks with their priority info
466        let mut candidates = Vec::new();
467        for (sink_id, track) in &guard.sink_schedules {
468            // Skip sinks that failed to get snapshot count
469            let Some(&snapshot_count) = snapshot_counts.get(sink_id) else {
470                continue;
471            };
472            if track.should_trigger(now, snapshot_count) {
473                // Extract next_time from Idle state (triggerable means Idle)
474                if let CompactionTrackState::Idle {
475                    next_compaction_time,
476                } = track.state
477                {
478                    candidates.push((*sink_id, track.task_type, next_compaction_time));
479                }
480            }
481        }
482
483        // Sort by next_compaction_time (ascending) - earlier times have higher priority
484        candidates.sort_by(|a, b| a.2.cmp(&b.2));
485
486        // Take top N and create handles
487        candidates
488            .into_iter()
489            .take(n)
490            .filter_map(|(sink_id, task_type, _)| {
491                let track = guard.sink_schedules.get_mut(&sink_id)?;
492
493                let track_snapshot = track.start_processing();
494
495                Some(IcebergCompactionHandle::new(
496                    sink_id,
497                    task_type,
498                    self.inner.clone(),
499                    self.metadata_manager.clone(),
500                    track_snapshot,
501                ))
502            })
503            .collect()
504    }
505
506    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
507        let mut guard = self.inner.write();
508        guard.sink_schedules.remove(&sink_id);
509    }
510
511    pub async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
512        let prost_sink_catalog = self
513            .metadata_manager
514            .catalog_controller
515            .get_sink_by_id(sink_id)
516            .await?
517            .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
518        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
519        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
520        Ok(param)
521    }
522
523    pub async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
524        let sink_param = self.get_sink_param(sink_id).await?;
525        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
526        Ok(iceberg_config)
527    }
528
529    pub fn add_compactor_stream(
530        &self,
531        context_id: WorkerId,
532        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
533    ) {
534        self.compactor_streams_change_tx
535            .send((context_id, req_stream))
536            .unwrap();
537    }
538
539    pub fn iceberg_compaction_event_loop(
540        iceberg_compaction_manager: Arc<Self>,
541        compactor_streams_change_rx: UnboundedReceiver<(
542            WorkerId,
543            Streaming<SubscribeIcebergCompactionEventRequest>,
544        )>,
545    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
546        let mut join_handle_vec = Vec::default();
547
548        let iceberg_compaction_event_handler =
549            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
550
551        let iceberg_compaction_event_dispatcher =
552            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
553
554        let event_loop = IcebergCompactionEventLoop::new(
555            iceberg_compaction_event_dispatcher,
556            iceberg_compaction_manager.metrics.clone(),
557            compactor_streams_change_rx,
558        );
559
560        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
561        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
562
563        join_handle_vec
564    }
565
566    /// GC loop for expired snapshots management
567    /// This is a separate loop that periodically checks all tracked Iceberg tables
568    /// and performs garbage collection operations like expiring old snapshots
569    pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
570        assert!(
571            interval_sec > 0,
572            "Iceberg GC interval must be greater than 0"
573        );
574        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
575        let join_handle = tokio::spawn(async move {
576            tracing::info!(
577                interval_sec = interval_sec,
578                "Starting Iceberg GC loop with configurable interval"
579            );
580            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));
581
582            loop {
583                tokio::select! {
584                    _ = interval.tick() => {
585                        if let Err(e) = manager.perform_gc_operations().await {
586                            tracing::error!(error = ?e.as_report(), "GC operations failed");
587                        }
588                    },
589                    _ = &mut shutdown_rx => {
590                        tracing::info!("Iceberg GC loop is stopped");
591                        return;
592                    }
593                }
594            }
595        });
596
597        (join_handle, shutdown_tx)
598    }
599
600    /// Trigger manual compaction for a specific sink and wait for completion
601    /// This method records the initial snapshot, sends a compaction task, then waits for a new snapshot with replace operation
602    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
603        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
604
605        // Load the initial table state to get the current snapshot
606        let iceberg_config = self.load_iceberg_config(sink_id).await?;
607        let initial_table = iceberg_config.load_table().await?;
608        let initial_snapshot_id = initial_table
609            .metadata()
610            .current_snapshot()
611            .map(|s| s.snapshot_id())
612            .unwrap_or(0); // Use 0 if no snapshots exist
613        let initial_timestamp = chrono::Utc::now().timestamp_millis();
614
615        // Get a compactor to send the task to
616        let compactor = self
617            .iceberg_compactor_manager
618            .next_compactor()
619            .ok_or_else(|| anyhow!("No iceberg compactor available"))?;
620
621        // Generate a unique task ID
622        let task_id = self
623            .env
624            .hummock_seq
625            .next_interval("compaction_task", 1)
626            .await?;
627
628        let sink_param = self.get_sink_param(sink_id).await?;
629
630        compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
631            task_id,
632            props: sink_param.properties,
633            task_type: TaskType::Full as i32, // default to full compaction
634        }))?;
635
636        tracing::info!(
637            "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
638            sink_id,
639            task_id
640        );
641
642        self.wait_for_compaction_completion(
643            &sink_id,
644            iceberg_config,
645            initial_snapshot_id,
646            initial_timestamp,
647            task_id,
648        )
649        .await?;
650
651        Ok(task_id)
652    }
653
654    async fn wait_for_compaction_completion(
655        &self,
656        sink_id: &SinkId,
657        iceberg_config: IcebergConfig,
658        initial_snapshot_id: i64,
659        initial_timestamp: i64,
660        task_id: u64,
661    ) -> MetaResult<()> {
662        const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
663        const MAX_POLL_INTERVAL_SECS: u64 = 60;
664        const MAX_WAIT_TIME_SECS: u64 = 1800;
665        const BACKOFF_MULTIPLIER: f64 = 1.5;
666
667        let mut elapsed_time = 0;
668        let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;
669
670        let cow = should_enable_iceberg_cow(
671            iceberg_config.r#type.as_str(),
672            iceberg_config.write_mode.as_str(),
673        );
674
675        while elapsed_time < MAX_WAIT_TIME_SECS {
676            let poll_interval = std::time::Duration::from_secs(current_interval_secs);
677            tokio::time::sleep(poll_interval).await;
678            elapsed_time += current_interval_secs;
679
680            tracing::info!(
681                "Checking iceberg compaction completion for sink {} task_id={}, elapsed={}s, interval={}s",
682                sink_id,
683                task_id,
684                elapsed_time,
685                current_interval_secs
686            );
687
688            let current_table = iceberg_config.load_table().await?;
689
690            let metadata = current_table.metadata();
691            let new_snapshots: Vec<_> = metadata
692                .snapshots()
693                .filter(|snapshot| {
694                    let snapshot_timestamp = snapshot.timestamp_ms();
695                    let snapshot_id = snapshot.snapshot_id();
696                    snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
697                })
698                .collect();
699
700            for snapshot in new_snapshots {
701                let summary = snapshot.summary();
702                if cow {
703                    if matches!(summary.operation, Operation::Overwrite) {
704                        tracing::info!(
705                            "Iceberg compaction completed for sink {} task_id={} with Overwrite operation",
706                            sink_id,
707                            task_id
708                        );
709                        return Ok(());
710                    }
711                } else if matches!(summary.operation, Operation::Replace) {
712                    tracing::info!(
713                        "Iceberg compaction completed for sink {} task_id={} with Replace operation",
714                        sink_id,
715                        task_id
716                    );
717                    return Ok(());
718                }
719            }
720
721            current_interval_secs = std::cmp::min(
722                MAX_POLL_INTERVAL_SECS,
723                ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
724            );
725        }
726
727        Err(anyhow!(
728            "Iceberg compaction did not complete within {} seconds for sink {} (task_id={})",
729            MAX_WAIT_TIME_SECS,
730            sink_id,
731            task_id
732        )
733        .into())
734    }
735
736    async fn perform_gc_operations(&self) -> MetaResult<()> {
737        let sink_ids = {
738            let guard = self.inner.read();
739            guard.sink_schedules.keys().cloned().collect::<Vec<_>>()
740        };
741
742        tracing::info!("Starting GC operations for {} tables", sink_ids.len());
743
744        for sink_id in sink_ids {
745            if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
746                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
747            }
748        }
749
750        tracing::info!("GC operations completed");
751        Ok(())
752    }
753
754    /// Get the snapshot count for a sink's Iceberg table
755    /// Returns None if the table cannot be loaded
756    async fn get_snapshot_count(&self, sink_id: SinkId) -> Option<usize> {
757        let iceberg_config = self.load_iceberg_config(sink_id).await.ok()?;
758        let catalog = iceberg_config.create_catalog().await.ok()?;
759        let table_name = iceberg_config.full_table_name().ok()?;
760        let table = catalog.load_table(&table_name).await.ok()?;
761
762        let metadata = table.metadata();
763        let mut snapshots = metadata.snapshots().collect_vec();
764
765        if snapshots.is_empty() {
766            return Some(0);
767        }
768
769        // Sort snapshots by timestamp
770        snapshots.sort_by_key(|s| s.timestamp_ms());
771
772        // Find the last Replace operation snapshot
773        let last_replace_index = snapshots
774            .iter()
775            .rposition(|snapshot| matches!(snapshot.summary().operation, Operation::Replace));
776
777        // Calculate count from last Replace to the latest snapshot
778        let snapshot_count = match last_replace_index {
779            Some(index) => snapshots.len() - index - 1,
780            None => snapshots.len(), // No Replace found, count all snapshots
781        };
782
783        Some(snapshot_count)
784    }
785
786    pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
787        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; // 24 hours
788        let now = chrono::Utc::now().timestamp_millis();
789
790        let iceberg_config = self.load_iceberg_config(sink_id).await?;
791        if !iceberg_config.enable_snapshot_expiration {
792            return Ok(());
793        }
794
795        let catalog = iceberg_config.create_catalog().await?;
796        let table = catalog
797            .load_table(&iceberg_config.full_table_name()?)
798            .await
799            .map_err(|e| SinkError::Iceberg(e.into()))?;
800
801        let metadata = table.metadata();
802        let mut snapshots = metadata.snapshots().collect_vec();
803        snapshots.sort_by_key(|s| s.timestamp_ms());
804
805        let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;
806
807        let snapshot_expiration_timestamp_ms =
808            match iceberg_config.snapshot_expiration_timestamp_ms(now) {
809                Some(timestamp) => timestamp,
810                None => default_snapshot_expiration_timestamp_ms,
811            };
812
813        if snapshots.is_empty()
814            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
815        {
816            // avoid commit empty table updates
817            return Ok(());
818        }
819
820        tracing::info!(
821            catalog_name = iceberg_config.catalog_name(),
822            table_name = iceberg_config.full_table_name()?.to_string(),
823            %sink_id,
824            snapshots_len = snapshots.len(),
825            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
826            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
827            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
828            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
829            "try trigger snapshots expiration",
830        );
831
832        let tx = Transaction::new(&table);
833
834        let mut expired_snapshots = tx.expire_snapshot();
835
836        expired_snapshots = expired_snapshots.expire_older_than(snapshot_expiration_timestamp_ms);
837
838        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
839            expired_snapshots = expired_snapshots.retain_last(retain_last);
840        }
841
842        expired_snapshots = expired_snapshots
843            .clear_expired_files(iceberg_config.snapshot_expiration_clear_expired_files);
844
845        expired_snapshots = expired_snapshots
846            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);
847
848        let tx = expired_snapshots
849            .apply()
850            .await
851            .map_err(|e| SinkError::Iceberg(e.into()))?;
852
853        tx.commit(catalog.as_ref())
854            .await
855            .map_err(|e| SinkError::Iceberg(e.into()))?;
856
857        tracing::info!(
858            catalog_name = iceberg_config.catalog_name(),
859            table_name = iceberg_config.full_table_name()?.to_string(),
860            %sink_id,
861            "Expired snapshots for iceberg table",
862        );
863
864        Ok(())
865    }
866}