risingwave_meta/manager/
iceberg_compaction.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use iceberg::spec::Operation;
21use iceberg::transaction::Transaction;
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::id::WorkerId;
25use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
26use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
27use risingwave_connector::sink::iceberg::{
28    CompactionType, IcebergConfig, should_enable_iceberg_cow,
29};
30use risingwave_connector::sink::{SinkError, SinkParam};
31use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
32use risingwave_pb::iceberg_compaction::{
33    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
34};
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot::Sender;
38use tokio::task::JoinHandle;
39use tonic::Streaming;
40
41use super::MetaSrvEnv;
42use crate::MetaResult;
43use crate::hummock::{
44    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
45    IcebergCompactor, IcebergCompactorManagerRef,
46};
47use crate::manager::MetadataManager;
48use crate::rpc::metrics::MetaMetrics;
49
50pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;
51
52type CompactorChangeTx =
53    UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
54
55type CompactorChangeRx =
56    UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
57
/// Scheduling state captured from a compaction track right before a task is started,
/// so the track can be put back into `Idle` if the task is never dispatched.
#[derive(Clone, Debug)]
struct CompactionTrackSnapshot {
    /// The `next_compaction_time` the track held while idle. When this is `None`,
    /// restoring falls back to the current instant.
    next_compaction_time: Option<Instant>,
}
63
/// The two states a compaction track can be in.
///
/// Only an `Idle` track can be triggered; a `Processing` track has a task in flight.
#[derive(Clone, Debug)]
enum CompactionTrackState {
    /// No task in flight. The track becomes eligible once `next_compaction_time` is reached
    /// (and the snapshot-count condition is met).
    Idle { next_compaction_time: Instant },
    /// A compaction task has been handed out and has not been completed or rolled back yet.
    Processing,
}
72
73#[derive(Debug, Clone)]
74struct CompactionTrack {
75    task_type: TaskType,
76    trigger_interval_sec: u64,
77    /// Minimum snapshot count threshold to trigger compaction
78    trigger_snapshot_count: usize,
79    state: CompactionTrackState,
80}
81
82impl CompactionTrack {
83    fn should_trigger(&self, now: Instant, snapshot_count: usize) -> bool {
84        // Only Idle state can trigger
85        let next_compaction_time = match &self.state {
86            CompactionTrackState::Idle {
87                next_compaction_time,
88            } => *next_compaction_time,
89            CompactionTrackState::Processing => return false,
90        };
91
92        // Check both time and snapshot count conditions
93        let time_ready = now >= next_compaction_time;
94        let snapshot_ready = snapshot_count >= self.trigger_snapshot_count;
95
96        time_ready && snapshot_ready
97    }
98
99    /// Create snapshot and transition to Processing state
100    fn start_processing(&mut self) -> CompactionTrackSnapshot {
101        match &self.state {
102            CompactionTrackState::Idle {
103                next_compaction_time,
104            } => {
105                let snapshot = CompactionTrackSnapshot {
106                    next_compaction_time: Some(*next_compaction_time),
107                };
108                self.state = CompactionTrackState::Processing;
109                snapshot
110            }
111            CompactionTrackState::Processing => {
112                unreachable!("Cannot start processing when already processing")
113            }
114        }
115    }
116
117    /// Complete processing successfully
118    fn complete_processing(&mut self) {
119        match &self.state {
120            CompactionTrackState::Processing => {
121                self.state = CompactionTrackState::Idle {
122                    next_compaction_time: Instant::now()
123                        + std::time::Duration::from_secs(self.trigger_interval_sec),
124                };
125            }
126            CompactionTrackState::Idle { .. } => {
127                unreachable!("Cannot complete processing when not processing")
128            }
129        }
130    }
131
132    /// Restore from snapshot on failure
133    fn restore_from_snapshot(&mut self, snapshot: CompactionTrackSnapshot) {
134        self.state = CompactionTrackState::Idle {
135            next_compaction_time: snapshot.next_compaction_time.unwrap_or_else(Instant::now),
136        };
137    }
138
139    fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
140        if self.trigger_interval_sec == new_interval_sec {
141            return;
142        }
143
144        self.trigger_interval_sec = new_interval_sec;
145
146        // Reset next_compaction_time based on current state
147        match &mut self.state {
148            CompactionTrackState::Idle {
149                next_compaction_time,
150            } => {
151                *next_compaction_time = now + std::time::Duration::from_secs(new_interval_sec);
152            }
153            CompactionTrackState::Processing => {
154                // Keep Processing state, will reset time when completing
155            }
156        }
157    }
158}
159
160// Removed CompactionScheduleState - each sink now only has one CompactionTrack
161pub struct IcebergCompactionHandle {
162    sink_id: SinkId,
163    task_type: TaskType,
164    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
165    metadata_manager: MetadataManager,
166    handle_success: bool,
167
168    /// Snapshot of the compaction track for recovery.
169    track_snapshot: CompactionTrackSnapshot,
170}
171
172impl IcebergCompactionHandle {
173    fn new(
174        sink_id: SinkId,
175        task_type: TaskType,
176        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
177        metadata_manager: MetadataManager,
178        track_snapshot: CompactionTrackSnapshot,
179    ) -> Self {
180        Self {
181            sink_id,
182            task_type,
183            inner,
184            metadata_manager,
185            handle_success: false,
186            track_snapshot,
187        }
188    }
189
190    pub async fn send_compact_task(
191        mut self,
192        compactor: Arc<IcebergCompactor>,
193        task_id: u64,
194    ) -> MetaResult<()> {
195        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
196        let Some(prost_sink_catalog) = self
197            .metadata_manager
198            .catalog_controller
199            .get_sink_by_id(self.sink_id)
200            .await?
201        else {
202            // The sink may be deleted, just return Ok.
203            tracing::warn!("Sink not found: {}", self.sink_id);
204            return Ok(());
205        };
206        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
207        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
208
209        let result =
210            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
211                task_id,
212                props: param.properties,
213                task_type: self.task_type as i32,
214            }));
215
216        if result.is_ok() {
217            self.handle_success = true;
218        }
219
220        result
221    }
222
223    pub fn sink_id(&self) -> SinkId {
224        self.sink_id
225    }
226}
227
228impl Drop for IcebergCompactionHandle {
229    fn drop(&mut self) {
230        let mut guard = self.inner.write();
231        if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id) {
232            // Only restore/complete if this handle's task_type matches the track's task_type
233            if track.task_type == self.task_type {
234                if self.handle_success {
235                    track.complete_processing();
236                } else {
237                    // If the handle is not successful, we need to restore the track from snapshot.
238                    // This is to avoid the case where the handle is dropped before the compaction task is sent.
239                    track.restore_from_snapshot(self.track_snapshot.clone());
240                }
241            }
242        }
243    }
244}
245
/// Mutable scheduler state shared (behind a lock) between the manager and the handles
/// it gives out.
struct IcebergCompactionManagerInner {
    /// One compaction track per sink, keyed by sink id.
    pub sink_schedules: HashMap<SinkId, CompactionTrack>,
}
249
/// Schedules Iceberg compaction tasks for sinks and runs snapshot expiration for them.
pub struct IcebergCompactionManager {
    /// Meta service environment; used here to allocate compaction task ids.
    pub env: MetaSrvEnv,
    /// Per-sink compaction tracks, shared with the handles created by this manager.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    /// Catalog access used to look up sink definitions.
    metadata_manager: MetadataManager,
    /// Source of compactors that manual compaction tasks are sent to.
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    /// Sending side of the channel on which new compactor request streams are announced.
    compactor_streams_change_tx: CompactorChangeTx,

    /// Meta metrics handed to the compaction event loop.
    pub metrics: Arc<MetaMetrics>,
}
261
262impl IcebergCompactionManager {
263    pub fn build(
264        env: MetaSrvEnv,
265        metadata_manager: MetadataManager,
266        iceberg_compactor_manager: IcebergCompactorManagerRef,
267        metrics: Arc<MetaMetrics>,
268    ) -> (Arc<Self>, CompactorChangeRx) {
269        let (compactor_streams_change_tx, compactor_streams_change_rx) =
270            tokio::sync::mpsc::unbounded_channel();
271        (
272            Arc::new(Self {
273                env,
274                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
275                    sink_schedules: HashMap::default(),
276                })),
277                metadata_manager,
278                iceberg_compactor_manager,
279                compactor_streams_change_tx,
280                metrics,
281            }),
282            compactor_streams_change_rx,
283        )
284    }
285
286    pub fn compaction_stat_loop(
287        manager: Arc<Self>,
288        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
289    ) -> (JoinHandle<()>, Sender<()>) {
290        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
291        let join_handle = tokio::spawn(async move {
292            loop {
293                tokio::select! {
294                    Some(stat) = rx.recv() => {
295                        manager.update_iceberg_commit_info(stat).await;
296                    },
297                    _ = &mut shutdown_rx => {
298                        tracing::info!("Iceberg compaction manager is stopped");
299                        return;
300                    }
301                }
302            }
303        });
304
305        (join_handle, shutdown_tx)
306    }
307
308    pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
309        let IcebergSinkCompactionUpdate {
310            sink_id,
311            compaction_interval,
312            force_compaction,
313        } = msg;
314
315        // Check if track exists
316        let track_exists = {
317            let guard = self.inner.read();
318            guard.sink_schedules.contains_key(&sink_id)
319        };
320
321        // Create track if it doesn't exist
322        if !track_exists {
323            // Load config first (async operation outside of lock)
324            let iceberg_config = self.load_iceberg_config(sink_id).await;
325
326            let new_track = match iceberg_config {
327                Ok(config) => {
328                    // Call synchronous create function with the config
329                    match self.create_compaction_track(sink_id, &config) {
330                        Ok(track) => track,
331                        Err(e) => {
332                            tracing::error!(
333                                error = ?e.as_report(),
334                                "Failed to create compaction track from config for sink {}, using default Full track",
335                                sink_id
336                            );
337                            // Fallback to default Full track
338                            CompactionTrack {
339                                task_type: TaskType::Full,
340                                trigger_interval_sec: compaction_interval,
341                                trigger_snapshot_count: 10,
342                                state: CompactionTrackState::Idle {
343                                    next_compaction_time: Instant::now()
344                                        + std::time::Duration::from_secs(compaction_interval),
345                                },
346                            }
347                        }
348                    }
349                }
350                Err(e) => {
351                    tracing::error!(
352                        error = ?e.as_report(),
353                        "Failed to load iceberg config for sink {}, using default Full track",
354                        sink_id
355                    );
356                    // Fallback to default Full track
357                    CompactionTrack {
358                        task_type: TaskType::Full,
359                        trigger_interval_sec: compaction_interval,
360                        trigger_snapshot_count: 10,
361                        state: CompactionTrackState::Idle {
362                            next_compaction_time: Instant::now()
363                                + std::time::Duration::from_secs(compaction_interval),
364                        },
365                    }
366                }
367            };
368
369            let mut guard = self.inner.write();
370            guard.sink_schedules.insert(sink_id, new_track);
371        }
372
373        // Update track
374        let mut guard = self.inner.write();
375        if let Some(track) = guard.sink_schedules.get_mut(&sink_id) {
376            // Force compaction: set to trigger immediately if in Idle state
377            if force_compaction {
378                if let CompactionTrackState::Idle {
379                    next_compaction_time,
380                } = &mut track.state
381                {
382                    *next_compaction_time = Instant::now();
383                }
384                // Skip Processing tracks - they will complete naturally
385            } else {
386                // Update interval if changed
387                track.update_interval(compaction_interval, Instant::now());
388            }
389        } else {
390            tracing::error!(
391                "Failed to find compaction track for sink {} during update; configuration changes not applied.",
392                sink_id
393            );
394        }
395    }
396
397    /// Create a compaction track for a sink based on its Iceberg configuration
398    fn create_compaction_track(
399        &self,
400        _sink_id: SinkId,
401        iceberg_config: &IcebergConfig,
402    ) -> MetaResult<CompactionTrack> {
403        let trigger_interval_sec = iceberg_config.compaction_interval_sec();
404        let trigger_snapshot_count = iceberg_config.trigger_snapshot_count();
405
406        // For `copy-on-write` mode, always use Full compaction regardless of config
407        let task_type =
408            if should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode)
409            {
410                TaskType::Full
411            } else {
412                // For `merge-on-read` mode, use configured compaction_type
413                match iceberg_config.compaction_type() {
414                    CompactionType::Full => TaskType::Full,
415                    CompactionType::SmallFiles => TaskType::SmallFiles,
416                    CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
417                }
418            };
419
420        Ok(CompactionTrack {
421            task_type,
422            trigger_interval_sec,
423            trigger_snapshot_count,
424            state: CompactionTrackState::Idle {
425                next_compaction_time: Instant::now()
426                    + std::time::Duration::from_secs(trigger_interval_sec),
427            },
428        })
429    }
430
431    /// Get the top N compaction tasks to trigger
432    /// Returns handles for tasks that are ready to be compacted
433    /// Sorted by commit count and next compaction time
434    pub async fn get_top_n_iceberg_commit_sink_ids(
435        &self,
436        n: usize,
437    ) -> Vec<IcebergCompactionHandle> {
438        let now = Instant::now();
439
440        // Collect all sink_ids to check
441        let sink_ids: Vec<SinkId> = {
442            let guard = self.inner.read();
443            guard.sink_schedules.keys().cloned().collect()
444        };
445
446        // Fetch snapshot counts for all sinks in parallel
447        let snapshot_count_futures = sink_ids
448            .iter()
449            .map(|sink_id| async move {
450                let count = self.get_snapshot_count(*sink_id).await?;
451                Some((*sink_id, count))
452            })
453            .collect::<Vec<_>>();
454
455        let snapshot_counts: HashMap<SinkId, usize> =
456            futures::future::join_all(snapshot_count_futures)
457                .await
458                .into_iter()
459                .flatten()
460                .collect();
461
462        let mut guard = self.inner.write();
463
464        // Collect all triggerable tasks with their priority info
465        let mut candidates = Vec::new();
466        for (sink_id, track) in &guard.sink_schedules {
467            // Skip sinks that failed to get snapshot count
468            let Some(&snapshot_count) = snapshot_counts.get(sink_id) else {
469                continue;
470            };
471            if track.should_trigger(now, snapshot_count) {
472                // Extract next_time from Idle state (triggerable means Idle)
473                if let CompactionTrackState::Idle {
474                    next_compaction_time,
475                } = track.state
476                {
477                    candidates.push((*sink_id, track.task_type, next_compaction_time));
478                }
479            }
480        }
481
482        // Sort by next_compaction_time (ascending) - earlier times have higher priority
483        candidates.sort_by(|a, b| a.2.cmp(&b.2));
484
485        // Take top N and create handles
486        candidates
487            .into_iter()
488            .take(n)
489            .filter_map(|(sink_id, task_type, _)| {
490                let track = guard.sink_schedules.get_mut(&sink_id)?;
491
492                let track_snapshot = track.start_processing();
493
494                Some(IcebergCompactionHandle::new(
495                    sink_id,
496                    task_type,
497                    self.inner.clone(),
498                    self.metadata_manager.clone(),
499                    track_snapshot,
500                ))
501            })
502            .collect()
503    }
504
505    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
506        let mut guard = self.inner.write();
507        guard.sink_schedules.remove(&sink_id);
508    }
509
510    pub async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
511        let prost_sink_catalog = self
512            .metadata_manager
513            .catalog_controller
514            .get_sink_by_id(sink_id)
515            .await?
516            .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
517        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
518        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
519        Ok(param)
520    }
521
522    pub async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
523        let sink_param = self.get_sink_param(sink_id).await?;
524        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
525        Ok(iceberg_config)
526    }
527
528    pub fn add_compactor_stream(
529        &self,
530        context_id: WorkerId,
531        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
532    ) {
533        self.compactor_streams_change_tx
534            .send((context_id, req_stream))
535            .unwrap();
536    }
537
538    pub fn iceberg_compaction_event_loop(
539        iceberg_compaction_manager: Arc<Self>,
540        compactor_streams_change_rx: UnboundedReceiver<(
541            WorkerId,
542            Streaming<SubscribeIcebergCompactionEventRequest>,
543        )>,
544    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
545        let mut join_handle_vec = Vec::default();
546
547        let iceberg_compaction_event_handler =
548            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
549
550        let iceberg_compaction_event_dispatcher =
551            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
552
553        let event_loop = IcebergCompactionEventLoop::new(
554            iceberg_compaction_event_dispatcher,
555            iceberg_compaction_manager.metrics.clone(),
556            compactor_streams_change_rx,
557        );
558
559        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
560        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
561
562        join_handle_vec
563    }
564
565    /// GC loop for expired snapshots management
566    /// This is a separate loop that periodically checks all tracked Iceberg tables
567    /// and performs garbage collection operations like expiring old snapshots
568    pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
569        assert!(
570            interval_sec > 0,
571            "Iceberg GC interval must be greater than 0"
572        );
573        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
574        let join_handle = tokio::spawn(async move {
575            tracing::info!(
576                interval_sec = interval_sec,
577                "Starting Iceberg GC loop with configurable interval"
578            );
579            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));
580
581            loop {
582                tokio::select! {
583                    _ = interval.tick() => {
584                        if let Err(e) = manager.perform_gc_operations().await {
585                            tracing::error!(error = ?e.as_report(), "GC operations failed");
586                        }
587                    },
588                    _ = &mut shutdown_rx => {
589                        tracing::info!("Iceberg GC loop is stopped");
590                        return;
591                    }
592                }
593            }
594        });
595
596        (join_handle, shutdown_tx)
597    }
598
599    /// Trigger manual compaction for a specific sink and wait for completion
600    /// This method records the initial snapshot, sends a compaction task, then waits for a new snapshot with replace operation
601    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
602        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
603
604        // Load the initial table state to get the current snapshot
605        let iceberg_config = self.load_iceberg_config(sink_id).await?;
606        let initial_table = iceberg_config.load_table().await?;
607        let initial_snapshot_id = initial_table
608            .metadata()
609            .current_snapshot()
610            .map(|s| s.snapshot_id())
611            .unwrap_or(0); // Use 0 if no snapshots exist
612        let initial_timestamp = chrono::Utc::now().timestamp_millis();
613
614        // Get a compactor to send the task to
615        let compactor = self
616            .iceberg_compactor_manager
617            .next_compactor()
618            .ok_or_else(|| anyhow!("No iceberg compactor available"))?;
619
620        // Generate a unique task ID
621        let task_id = self
622            .env
623            .hummock_seq
624            .next_interval("compaction_task", 1)
625            .await?;
626
627        let sink_param = self.get_sink_param(sink_id).await?;
628
629        compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
630            task_id,
631            props: sink_param.properties,
632            task_type: TaskType::Full as i32, // default to full compaction
633        }))?;
634
635        tracing::info!(
636            "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
637            sink_id,
638            task_id
639        );
640
641        self.wait_for_compaction_completion(
642            &sink_id,
643            iceberg_config,
644            initial_snapshot_id,
645            initial_timestamp,
646            task_id,
647        )
648        .await?;
649
650        Ok(task_id)
651    }
652
653    async fn wait_for_compaction_completion(
654        &self,
655        sink_id: &SinkId,
656        iceberg_config: IcebergConfig,
657        initial_snapshot_id: i64,
658        initial_timestamp: i64,
659        task_id: u64,
660    ) -> MetaResult<()> {
661        const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
662        const MAX_POLL_INTERVAL_SECS: u64 = 60;
663        const MAX_WAIT_TIME_SECS: u64 = 1800;
664        const BACKOFF_MULTIPLIER: f64 = 1.5;
665
666        let mut elapsed_time = 0;
667        let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;
668
669        let cow =
670            should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
671
672        while elapsed_time < MAX_WAIT_TIME_SECS {
673            let poll_interval = std::time::Duration::from_secs(current_interval_secs);
674            tokio::time::sleep(poll_interval).await;
675            elapsed_time += current_interval_secs;
676
677            tracing::info!(
678                "Checking iceberg compaction completion for sink {} task_id={}, elapsed={}s, interval={}s",
679                sink_id,
680                task_id,
681                elapsed_time,
682                current_interval_secs
683            );
684
685            let current_table = iceberg_config.load_table().await?;
686
687            let metadata = current_table.metadata();
688            let new_snapshots: Vec<_> = metadata
689                .snapshots()
690                .filter(|snapshot| {
691                    let snapshot_timestamp = snapshot.timestamp_ms();
692                    let snapshot_id = snapshot.snapshot_id();
693                    snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
694                })
695                .collect();
696
697            for snapshot in new_snapshots {
698                let summary = snapshot.summary();
699                if cow {
700                    if matches!(summary.operation, Operation::Overwrite) {
701                        tracing::info!(
702                            "Iceberg compaction completed for sink {} task_id={} with Overwrite operation",
703                            sink_id,
704                            task_id
705                        );
706                        return Ok(());
707                    }
708                } else if matches!(summary.operation, Operation::Replace) {
709                    tracing::info!(
710                        "Iceberg compaction completed for sink {} task_id={} with Replace operation",
711                        sink_id,
712                        task_id
713                    );
714                    return Ok(());
715                }
716            }
717
718            current_interval_secs = std::cmp::min(
719                MAX_POLL_INTERVAL_SECS,
720                ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
721            );
722        }
723
724        Err(anyhow!(
725            "Iceberg compaction did not complete within {} seconds for sink {} (task_id={})",
726            MAX_WAIT_TIME_SECS,
727            sink_id,
728            task_id
729        )
730        .into())
731    }
732
733    async fn perform_gc_operations(&self) -> MetaResult<()> {
734        let sink_ids = {
735            let guard = self.inner.read();
736            guard.sink_schedules.keys().cloned().collect::<Vec<_>>()
737        };
738
739        tracing::info!("Starting GC operations for {} tables", sink_ids.len());
740
741        for sink_id in sink_ids {
742            if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
743                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
744            }
745        }
746
747        tracing::info!("GC operations completed");
748        Ok(())
749    }
750
751    /// Get the snapshot count for a sink's Iceberg table
752    /// Returns None if the table cannot be loaded
753    async fn get_snapshot_count(&self, sink_id: SinkId) -> Option<usize> {
754        let iceberg_config = self.load_iceberg_config(sink_id).await.ok()?;
755        let catalog = iceberg_config.create_catalog().await.ok()?;
756        let table_name = iceberg_config.full_table_name().ok()?;
757        let table = catalog.load_table(&table_name).await.ok()?;
758
759        let metadata = table.metadata();
760        let mut snapshots = metadata.snapshots().collect_vec();
761
762        if snapshots.is_empty() {
763            return Some(0);
764        }
765
766        // Sort snapshots by timestamp
767        snapshots.sort_by_key(|s| s.timestamp_ms());
768
769        // Find the last Replace operation snapshot
770        let last_replace_index = snapshots
771            .iter()
772            .rposition(|snapshot| matches!(snapshot.summary().operation, Operation::Replace));
773
774        // Calculate count from last Replace to the latest snapshot
775        let snapshot_count = match last_replace_index {
776            Some(index) => snapshots.len() - index - 1,
777            None => snapshots.len(), // No Replace found, count all snapshots
778        };
779
780        Some(snapshot_count)
781    }
782
783    pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
784        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; // 24 hours
785        let now = chrono::Utc::now().timestamp_millis();
786
787        let iceberg_config = self.load_iceberg_config(sink_id).await?;
788        if !iceberg_config.enable_snapshot_expiration {
789            return Ok(());
790        }
791
792        let catalog = iceberg_config.create_catalog().await?;
793        let table = catalog
794            .load_table(&iceberg_config.full_table_name()?)
795            .await
796            .map_err(|e| SinkError::Iceberg(e.into()))?;
797
798        let metadata = table.metadata();
799        let mut snapshots = metadata.snapshots().collect_vec();
800        snapshots.sort_by_key(|s| s.timestamp_ms());
801
802        let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;
803
804        let snapshot_expiration_timestamp_ms =
805            match iceberg_config.snapshot_expiration_timestamp_ms(now) {
806                Some(timestamp) => timestamp,
807                None => default_snapshot_expiration_timestamp_ms,
808            };
809
810        if snapshots.is_empty()
811            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
812        {
813            // avoid commit empty table updates
814            return Ok(());
815        }
816
817        tracing::info!(
818            catalog_name = iceberg_config.catalog_name(),
819            table_name = iceberg_config.full_table_name()?.to_string(),
820            %sink_id,
821            snapshots_len = snapshots.len(),
822            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
823            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
824            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
825            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
826            "try trigger snapshots expiration",
827        );
828
829        let tx = Transaction::new(&table);
830
831        let mut expired_snapshots = tx.expire_snapshot();
832
833        expired_snapshots = expired_snapshots.expire_older_than(snapshot_expiration_timestamp_ms);
834
835        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
836            expired_snapshots = expired_snapshots.retain_last(retain_last);
837        }
838
839        expired_snapshots = expired_snapshots
840            .clear_expired_files(iceberg_config.snapshot_expiration_clear_expired_files);
841
842        expired_snapshots = expired_snapshots
843            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);
844
845        let tx = expired_snapshots
846            .apply()
847            .await
848            .map_err(|e| SinkError::Iceberg(e.into()))?;
849
850        tx.commit(catalog.as_ref())
851            .await
852            .map_err(|e| SinkError::Iceberg(e.into()))?;
853
854        tracing::info!(
855            catalog_name = iceberg_config.catalog_name(),
856            table_name = iceberg_config.full_table_name()?.to_string(),
857            %sink_id,
858            "Expired snapshots for iceberg table",
859        );
860
861        Ok(())
862    }
863}