// risingwave_meta/manager/iceberg_compaction.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use iceberg::spec::Operation;
21use iceberg::transaction::{ApplyTransactionAction, Transaction};
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::id::WorkerId;
25use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
26use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
27use risingwave_connector::sink::iceberg::{
28    CompactionType, IcebergConfig, commit_branch, should_enable_iceberg_cow,
29};
30use risingwave_connector::sink::{SinkError, SinkParam};
31use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
32use risingwave_pb::iceberg_compaction::{
33    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
34};
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot::Sender;
38use tokio::task::JoinHandle;
39use tonic::Streaming;
40
41use super::MetaSrvEnv;
42use crate::MetaResult;
43use crate::hummock::{
44    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
45    IcebergCompactor, IcebergCompactorManagerRef,
46};
47use crate::manager::MetadataManager;
48use crate::rpc::metrics::MetaMetrics;
49
/// Shared, reference-counted handle to the [`IcebergCompactionManager`].
pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;

/// Sender used to hand a worker's compaction-event subscription stream
/// over to the event loop, keyed by the worker id.
type CompactorChangeTx =
    UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;

/// Receiving end of [`CompactorChangeTx`], consumed by the event loop.
type CompactorChangeRx =
    UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
57
/// Snapshot for restoring track state on failure
#[derive(Debug, Clone)]
struct CompactionTrackSnapshot {
    /// The `next_compaction_time` captured from the `Idle` state when processing
    /// started. `None` restores to an immediately-triggerable deadline (`Instant::now()`).
    next_compaction_time: Option<Instant>,
}
63
/// Compaction track states using type-safe state machine pattern
#[derive(Debug, Clone)]
enum CompactionTrackState {
    /// Ready to accept commits and check for trigger conditions.
    /// `next_compaction_time` is the earliest instant a time-based trigger fires.
    Idle { next_compaction_time: Instant },
    /// Compaction task is being processed
    Processing,
}
72
/// Per-sink compaction scheduling state.
#[derive(Debug, Clone)]
struct CompactionTrack {
    /// Kind of compaction task dispatched for this sink.
    task_type: TaskType,
    /// Interval between time-based compaction triggers, in seconds.
    trigger_interval_sec: u64,
    /// Minimum snapshot count threshold to trigger compaction (early trigger).
    /// Compaction triggers when `pending_snapshot_count` >= this threshold, even before interval expires.
    trigger_snapshot_count: usize,
    /// Current scheduling state: `Idle` with a deadline, or `Processing`.
    state: CompactionTrackState,
}
82
83impl CompactionTrack {
84    /// Determines if compaction should be triggered.
85    ///
86    /// Trigger conditions (OR logic):
87    /// 1. `snapshot_ready` - Snapshot count >= threshold (early trigger)
88    /// 2. `time_ready && has_snapshots` - Interval expired and there's at least 1 snapshot
89    ///
90    /// This ensures:
91    /// - `trigger_snapshot_count` is an early trigger threshold
92    /// - `compaction_interval_sec` is the maximum wait time (as long as there are new snapshots)
93    /// - Force compaction works by setting `next_compaction_time` to now
94    /// - No empty compaction runs (requires at least 1 snapshot for time-based trigger)
95    fn should_trigger(&self, now: Instant, snapshot_count: usize) -> bool {
96        // Only Idle state can trigger
97        let next_compaction_time = match &self.state {
98            CompactionTrackState::Idle {
99                next_compaction_time,
100            } => *next_compaction_time,
101            CompactionTrackState::Processing => return false,
102        };
103
104        // Check conditions
105        let time_ready = now >= next_compaction_time;
106        let snapshot_ready = snapshot_count >= self.trigger_snapshot_count;
107        let has_snapshots = snapshot_count > 0;
108
109        // OR logic: snapshot threshold triggers early,
110        // time trigger requires at least 1 snapshot to avoid empty compaction
111        snapshot_ready || (time_ready && has_snapshots)
112    }
113
114    /// Create snapshot and transition to Processing state
115    fn start_processing(&mut self) -> CompactionTrackSnapshot {
116        match &self.state {
117            CompactionTrackState::Idle {
118                next_compaction_time,
119            } => {
120                let snapshot = CompactionTrackSnapshot {
121                    next_compaction_time: Some(*next_compaction_time),
122                };
123                self.state = CompactionTrackState::Processing;
124                snapshot
125            }
126            CompactionTrackState::Processing => {
127                unreachable!("Cannot start processing when already processing")
128            }
129        }
130    }
131
132    /// Complete processing successfully
133    fn complete_processing(&mut self) {
134        match &self.state {
135            CompactionTrackState::Processing => {
136                self.state = CompactionTrackState::Idle {
137                    next_compaction_time: Instant::now()
138                        + std::time::Duration::from_secs(self.trigger_interval_sec),
139                };
140            }
141            CompactionTrackState::Idle { .. } => {
142                unreachable!("Cannot complete processing when not processing")
143            }
144        }
145    }
146
147    /// Restore from snapshot on failure
148    fn restore_from_snapshot(&mut self, snapshot: CompactionTrackSnapshot) {
149        self.state = CompactionTrackState::Idle {
150            next_compaction_time: snapshot.next_compaction_time.unwrap_or_else(Instant::now),
151        };
152    }
153
154    fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
155        if self.trigger_interval_sec == new_interval_sec {
156            return;
157        }
158
159        self.trigger_interval_sec = new_interval_sec;
160
161        // Reset next_compaction_time based on current state
162        match &mut self.state {
163            CompactionTrackState::Idle {
164                next_compaction_time,
165            } => {
166                *next_compaction_time = now + std::time::Duration::from_secs(new_interval_sec);
167            }
168            CompactionTrackState::Processing => {
169                // Keep Processing state, will reset time when completing
170            }
171        }
172    }
173}
174
// Removed CompactionScheduleState - each sink now only has one CompactionTrack
/// RAII handle for one dispatched compaction round.
///
/// On drop, the sink's track transitions back to `Idle`: via
/// `complete_processing` when the task was sent successfully
/// (`handle_success`), or restored from `track_snapshot` otherwise.
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    task_type: TaskType,
    /// Shared scheduling state; used by `Drop` to finalize the track.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    /// Set to true once the compact task has been sent to a compactor.
    handle_success: bool,

    /// Snapshot of the compaction track for recovery.
    track_snapshot: CompactionTrackSnapshot,
}
186
187impl IcebergCompactionHandle {
188    fn new(
189        sink_id: SinkId,
190        task_type: TaskType,
191        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
192        metadata_manager: MetadataManager,
193        track_snapshot: CompactionTrackSnapshot,
194    ) -> Self {
195        Self {
196            sink_id,
197            task_type,
198            inner,
199            metadata_manager,
200            handle_success: false,
201            track_snapshot,
202        }
203    }
204
205    pub async fn send_compact_task(
206        mut self,
207        compactor: Arc<IcebergCompactor>,
208        task_id: u64,
209    ) -> MetaResult<()> {
210        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
211        let Some(prost_sink_catalog) = self
212            .metadata_manager
213            .catalog_controller
214            .get_sink_by_id(self.sink_id)
215            .await?
216        else {
217            // The sink may be deleted, just return Ok.
218            tracing::warn!("Sink not found: {}", self.sink_id);
219            return Ok(());
220        };
221        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
222        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
223
224        let result =
225            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
226                task_id,
227                props: param.properties,
228                task_type: self.task_type as i32,
229            }));
230
231        if result.is_ok() {
232            self.handle_success = true;
233        }
234
235        result
236    }
237
238    pub fn sink_id(&self) -> SinkId {
239        self.sink_id
240    }
241}
242
243impl Drop for IcebergCompactionHandle {
244    fn drop(&mut self) {
245        let mut guard = self.inner.write();
246        if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id) {
247            // Only restore/complete if this handle's task_type matches the track's task_type
248            if track.task_type == self.task_type {
249                if self.handle_success {
250                    track.complete_processing();
251                } else {
252                    // If the handle is not successful, we need to restore the track from snapshot.
253                    // This is to avoid the case where the handle is dropped before the compaction task is sent.
254                    track.restore_from_snapshot(self.track_snapshot.clone());
255                }
256            }
257        }
258    }
259}
260
/// Mutable scheduling state guarded by the manager's `RwLock`.
struct IcebergCompactionManagerInner {
    /// One compaction track per sink.
    pub sink_schedules: HashMap<SinkId, CompactionTrack>,
}
264
/// Central manager for Iceberg sink compaction scheduling on the meta node.
pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    /// Per-sink scheduling state, shared with `IcebergCompactionHandle`s.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    /// Forwards newly subscribed compactor event streams to the event loop.
    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}
276
277impl IcebergCompactionManager {
278    pub fn build(
279        env: MetaSrvEnv,
280        metadata_manager: MetadataManager,
281        iceberg_compactor_manager: IcebergCompactorManagerRef,
282        metrics: Arc<MetaMetrics>,
283    ) -> (Arc<Self>, CompactorChangeRx) {
284        let (compactor_streams_change_tx, compactor_streams_change_rx) =
285            tokio::sync::mpsc::unbounded_channel();
286        (
287            Arc::new(Self {
288                env,
289                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
290                    sink_schedules: HashMap::default(),
291                })),
292                metadata_manager,
293                iceberg_compactor_manager,
294                compactor_streams_change_tx,
295                metrics,
296            }),
297            compactor_streams_change_rx,
298        )
299    }
300
    /// Spawn a loop that applies sink compaction-config updates received on `rx`.
    ///
    /// Returns the spawned task's `JoinHandle` and a oneshot sender that stops
    /// the loop when fired.
    pub fn compaction_stat_loop(
        manager: Arc<Self>,
        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    // `Some(..)` pattern: once the channel closes this branch is
                    // disabled and only the shutdown signal remains.
                    Some(stat) = rx.recv() => {
                        manager.update_iceberg_commit_info(stat).await;
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compaction manager is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }
322
323    pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
324        let IcebergSinkCompactionUpdate {
325            sink_id,
326            compaction_interval,
327            force_compaction,
328        } = msg;
329
330        // Check if track exists
331        let track_exists = {
332            let guard = self.inner.read();
333            guard.sink_schedules.contains_key(&sink_id)
334        };
335
336        // Create track if it doesn't exist
337        if !track_exists {
338            // Load config first (async operation outside of lock)
339            let iceberg_config = self.load_iceberg_config(sink_id).await;
340
341            let new_track = match iceberg_config {
342                Ok(config) => {
343                    // Call synchronous create function with the config
344                    match self.create_compaction_track(sink_id, &config) {
345                        Ok(track) => track,
346                        Err(e) => {
347                            tracing::error!(
348                                error = ?e.as_report(),
349                                "Failed to create compaction track from config for sink {}, using default Full track",
350                                sink_id
351                            );
352                            // Fallback to default Full track
353                            CompactionTrack {
354                                task_type: TaskType::Full,
355                                trigger_interval_sec: compaction_interval,
356                                trigger_snapshot_count: 10,
357                                state: CompactionTrackState::Idle {
358                                    next_compaction_time: Instant::now()
359                                        + std::time::Duration::from_secs(compaction_interval),
360                                },
361                            }
362                        }
363                    }
364                }
365                Err(e) => {
366                    tracing::error!(
367                        error = ?e.as_report(),
368                        "Failed to load iceberg config for sink {}, using default Full track",
369                        sink_id
370                    );
371                    // Fallback to default Full track
372                    CompactionTrack {
373                        task_type: TaskType::Full,
374                        trigger_interval_sec: compaction_interval,
375                        trigger_snapshot_count: 10,
376                        state: CompactionTrackState::Idle {
377                            next_compaction_time: Instant::now()
378                                + std::time::Duration::from_secs(compaction_interval),
379                        },
380                    }
381                }
382            };
383
384            let mut guard = self.inner.write();
385            guard.sink_schedules.insert(sink_id, new_track);
386        }
387
388        // Update track
389        let mut guard = self.inner.write();
390        if let Some(track) = guard.sink_schedules.get_mut(&sink_id) {
391            // Force compaction: trigger immediately by setting next_compaction_time to now
392            if force_compaction {
393                if let CompactionTrackState::Idle {
394                    next_compaction_time,
395                } = &mut track.state
396                {
397                    *next_compaction_time = Instant::now();
398                }
399                // Skip Processing tracks - they will complete naturally
400            } else {
401                // Update interval if changed
402                track.update_interval(compaction_interval, Instant::now());
403            }
404        } else {
405            tracing::error!(
406                "Failed to find compaction track for sink {} during update; configuration changes not applied.",
407                sink_id
408            );
409        }
410    }
411
412    /// Create a compaction track for a sink based on its Iceberg configuration
413    fn create_compaction_track(
414        &self,
415        _sink_id: SinkId,
416        iceberg_config: &IcebergConfig,
417    ) -> MetaResult<CompactionTrack> {
418        let trigger_interval_sec = iceberg_config.compaction_interval_sec();
419        let trigger_snapshot_count = iceberg_config.trigger_snapshot_count();
420
421        // For `copy-on-write` mode, always use Full compaction regardless of config
422        let task_type =
423            if should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode)
424            {
425                TaskType::Full
426            } else {
427                // For `merge-on-read` mode, use configured compaction_type
428                match iceberg_config.compaction_type() {
429                    CompactionType::Full => TaskType::Full,
430                    CompactionType::SmallFiles => TaskType::SmallFiles,
431                    CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
432                }
433            };
434
435        Ok(CompactionTrack {
436            task_type,
437            trigger_interval_sec,
438            trigger_snapshot_count,
439            state: CompactionTrackState::Idle {
440                next_compaction_time: Instant::now()
441                    + std::time::Duration::from_secs(trigger_interval_sec),
442            },
443        })
444    }
445
446    /// Get the top N compaction tasks to trigger
447    /// Returns handles for tasks that are ready to be compacted
448    /// Sorted by commit count and next compaction time
449    pub async fn get_top_n_iceberg_commit_sink_ids(
450        &self,
451        n: usize,
452    ) -> Vec<IcebergCompactionHandle> {
453        let now = Instant::now();
454
455        // Collect all sink_ids to check
456        let sink_ids: Vec<SinkId> = {
457            let guard = self.inner.read();
458            guard.sink_schedules.keys().cloned().collect()
459        };
460
461        // Fetch snapshot counts for all sinks in parallel
462        let snapshot_count_futures = sink_ids
463            .iter()
464            .map(|sink_id| async move {
465                let count = self.get_pending_snapshot_count(*sink_id).await?;
466                Some((*sink_id, count))
467            })
468            .collect::<Vec<_>>();
469
470        let snapshot_counts: HashMap<SinkId, usize> =
471            futures::future::join_all(snapshot_count_futures)
472                .await
473                .into_iter()
474                .flatten()
475                .collect();
476
477        let mut guard = self.inner.write();
478
479        // Collect all triggerable tasks with their priority info
480        let mut candidates = Vec::new();
481        for (sink_id, track) in &guard.sink_schedules {
482            // Skip sinks that failed to get snapshot count
483            let Some(&snapshot_count) = snapshot_counts.get(sink_id) else {
484                continue;
485            };
486            if track.should_trigger(now, snapshot_count) {
487                // Extract next_time from Idle state (triggerable means Idle)
488                if let CompactionTrackState::Idle {
489                    next_compaction_time,
490                } = track.state
491                {
492                    candidates.push((*sink_id, track.task_type, next_compaction_time));
493                }
494            }
495        }
496
497        // Sort by next_compaction_time (ascending) - earlier times have higher priority
498        candidates.sort_by(|a, b| a.2.cmp(&b.2));
499
500        // Take top N and create handles
501        candidates
502            .into_iter()
503            .take(n)
504            .filter_map(|(sink_id, task_type, _)| {
505                let track = guard.sink_schedules.get_mut(&sink_id)?;
506
507                let track_snapshot = track.start_processing();
508
509                Some(IcebergCompactionHandle::new(
510                    sink_id,
511                    task_type,
512                    self.inner.clone(),
513                    self.metadata_manager.clone(),
514                    track_snapshot,
515                ))
516            })
517            .collect()
518    }
519
520    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
521        let mut guard = self.inner.write();
522        guard.sink_schedules.remove(&sink_id);
523    }
524
525    pub async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
526        let prost_sink_catalog = self
527            .metadata_manager
528            .catalog_controller
529            .get_sink_by_id(sink_id)
530            .await?
531            .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
532        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
533        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
534        Ok(param)
535    }
536
537    pub async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
538        let sink_param = self.get_sink_param(sink_id).await?;
539        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
540        Ok(iceberg_config)
541    }
542
543    pub fn add_compactor_stream(
544        &self,
545        context_id: WorkerId,
546        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
547    ) {
548        self.compactor_streams_change_tx
549            .send((context_id, req_stream))
550            .unwrap();
551    }
552
553    pub fn iceberg_compaction_event_loop(
554        iceberg_compaction_manager: Arc<Self>,
555        compactor_streams_change_rx: UnboundedReceiver<(
556            WorkerId,
557            Streaming<SubscribeIcebergCompactionEventRequest>,
558        )>,
559    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
560        let mut join_handle_vec = Vec::default();
561
562        let iceberg_compaction_event_handler =
563            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
564
565        let iceberg_compaction_event_dispatcher =
566            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
567
568        let event_loop = IcebergCompactionEventLoop::new(
569            iceberg_compaction_event_dispatcher,
570            iceberg_compaction_manager.metrics.clone(),
571            compactor_streams_change_rx,
572        );
573
574        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
575        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
576
577        join_handle_vec
578    }
579
    /// GC loop for expired snapshots management
    /// This is a separate loop that periodically checks all tracked Iceberg tables
    /// and performs garbage collection operations like expiring old snapshots
    ///
    /// Returns the spawned task's `JoinHandle` and a oneshot shutdown sender.
    ///
    /// # Panics
    ///
    /// Panics if `interval_sec` is 0.
    pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
        assert!(
            interval_sec > 0,
            "Iceberg GC interval must be greater than 0"
        );
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            tracing::info!(
                interval_sec = interval_sec,
                "Starting Iceberg GC loop with configurable interval"
            );
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // A failed pass is only logged; it is retried on the next tick.
                        if let Err(e) = manager.perform_gc_operations().await {
                            tracing::error!(error = ?e.as_report(), "GC operations failed");
                        }
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg GC loop is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }
613
    /// Trigger manual compaction for a specific sink and wait for completion
    /// This method records the initial snapshot, sends a compaction task, then waits for a new snapshot with replace operation
    ///
    /// Returns the dispatched task id on success.
    ///
    /// # Errors
    ///
    /// Fails when the Iceberg config cannot be loaded, no compactor is
    /// available, the task cannot be sent, or the wait times out.
    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;

        // Load the initial table state to get the current snapshot
        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        let initial_table = iceberg_config.load_table().await?;
        let initial_snapshot_id = initial_table
            .metadata()
            .current_snapshot()
            .map(|s| s.snapshot_id())
            .unwrap_or(0); // Use 0 if no snapshots exist
        let initial_timestamp = chrono::Utc::now().timestamp_millis();

        // Get a compactor to send the task to
        let compactor = self
            .iceberg_compactor_manager
            .next_compactor()
            .ok_or_else(|| anyhow!("No iceberg compactor available"))?;

        // Generate a unique task ID
        let task_id = self
            .env
            .hummock_seq
            .next_interval("compaction_task", 1)
            .await?;

        let sink_param = self.get_sink_param(sink_id).await?;

        compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
            task_id,
            props: sink_param.properties,
            task_type: TaskType::Full as i32, // default to full compaction
        }))?;

        tracing::info!(
            "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
            sink_id,
            task_id
        );

        // Poll until a compaction snapshot newer than the recorded baseline appears.
        self.wait_for_compaction_completion(
            &sink_id,
            iceberg_config,
            initial_snapshot_id,
            initial_timestamp,
            task_id,
        )
        .await?;

        Ok(task_id)
    }
667
    /// Poll the Iceberg table until a compaction snapshot newer than the
    /// recorded baseline appears, or time out.
    ///
    /// Completion is detected by a snapshot with `timestamp_ms` >
    /// `initial_timestamp` (and id != `initial_snapshot_id`) whose summary
    /// operation is `Overwrite` in COW mode, or `Replace` otherwise.
    /// Polls with exponential backoff (2s, x1.5, capped at 60s) and gives up
    /// after 30 minutes of accumulated sleep time.
    async fn wait_for_compaction_completion(
        &self,
        sink_id: &SinkId,
        iceberg_config: IcebergConfig,
        initial_snapshot_id: i64,
        initial_timestamp: i64,
        task_id: u64,
    ) -> MetaResult<()> {
        const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
        const MAX_POLL_INTERVAL_SECS: u64 = 60;
        const MAX_WAIT_TIME_SECS: u64 = 1800;
        const BACKOFF_MULTIPLIER: f64 = 1.5;

        // Note: `elapsed_time` accumulates only the sleep intervals, not the
        // time spent loading table metadata, so the real wall-clock timeout can
        // be somewhat longer than MAX_WAIT_TIME_SECS.
        let mut elapsed_time = 0;
        let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;

        let cow =
            should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode);

        while elapsed_time < MAX_WAIT_TIME_SECS {
            let poll_interval = std::time::Duration::from_secs(current_interval_secs);
            tokio::time::sleep(poll_interval).await;
            elapsed_time += current_interval_secs;

            tracing::info!(
                "Checking iceberg compaction completion for sink {} task_id={}, elapsed={}s, interval={}s",
                sink_id,
                task_id,
                elapsed_time,
                current_interval_secs
            );

            // Reload metadata each round to observe freshly committed snapshots.
            let current_table = iceberg_config.load_table().await?;

            let metadata = current_table.metadata();
            let new_snapshots: Vec<_> = metadata
                .snapshots()
                .filter(|snapshot| {
                    let snapshot_timestamp = snapshot.timestamp_ms();
                    let snapshot_id = snapshot.snapshot_id();
                    snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
                })
                .collect();

            for snapshot in new_snapshots {
                let summary = snapshot.summary();
                if cow {
                    // COW compaction commits an Overwrite to the main branch.
                    if matches!(summary.operation, Operation::Overwrite) {
                        tracing::info!(
                            "Iceberg compaction completed for sink {} task_id={} with Overwrite operation",
                            sink_id,
                            task_id
                        );
                        return Ok(());
                    }
                } else if matches!(summary.operation, Operation::Replace) {
                    // MOR compaction commits a Replace operation.
                    tracing::info!(
                        "Iceberg compaction completed for sink {} task_id={} with Replace operation",
                        sink_id,
                        task_id
                    );
                    return Ok(());
                }
            }

            // Exponential backoff, capped at MAX_POLL_INTERVAL_SECS.
            current_interval_secs = std::cmp::min(
                MAX_POLL_INTERVAL_SECS,
                ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
            );
        }

        Err(anyhow!(
            "Iceberg compaction did not complete within {} seconds for sink {} (task_id={})",
            MAX_WAIT_TIME_SECS,
            sink_id,
            task_id
        )
        .into())
    }
747
748    async fn perform_gc_operations(&self) -> MetaResult<()> {
749        let sink_ids = {
750            let guard = self.inner.read();
751            guard.sink_schedules.keys().cloned().collect::<Vec<_>>()
752        };
753
754        tracing::info!("Starting GC operations for {} tables", sink_ids.len());
755
756        for sink_id in sink_ids {
757            if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
758                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
759            }
760        }
761
762        tracing::info!("GC operations completed");
763        Ok(())
764    }
765
    /// Get the number of snapshots pending compaction for a sink's Iceberg table.
    /// Returns None if the table cannot be loaded.
    ///
    /// Counts snapshots since last compaction:
    /// - For COW mode: Counts snapshots on `ingestion` branch with timestamp > current snapshot on main
    /// - For MOR (merge-on-read) mode: Counts snapshots since last `Replace` on main branch
    async fn get_pending_snapshot_count(&self, sink_id: SinkId) -> Option<usize> {
        let iceberg_config = self.load_iceberg_config(sink_id).await.ok()?;
        let is_cow_mode =
            should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
        let catalog = iceberg_config.create_catalog().await.ok()?;
        let table_name = iceberg_config.full_table_name().ok()?;
        let table = catalog.load_table(&table_name).await.ok()?;
        let metadata = table.metadata();

        if is_cow_mode {
            // COW mode: count snapshots on ingestion branch since last compaction.
            // Compaction writes Overwrite to main branch, so the current snapshot on main
            // is the last compaction point.

            // Get last compaction timestamp from main branch's current snapshot
            let last_compaction_timestamp = metadata
                .current_snapshot()
                .map(|s| s.timestamp_ms())
                .unwrap_or(0); // 0 means no compaction has happened yet

            // Count snapshots on ingestion branch with timestamp > last compaction
            let branch = commit_branch(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
            // `?` here: no such branch ref means the count cannot be determined.
            let current_snapshot = metadata.snapshot_for_ref(&branch)?;

            let mut count = 0;
            let mut snapshot_id = Some(current_snapshot.snapshot_id());

            // Walk the parent chain from the branch head backwards.
            while let Some(id) = snapshot_id {
                let snapshot = metadata.snapshot_by_id(id)?;
                if snapshot.timestamp_ms() > last_compaction_timestamp {
                    count += 1;
                    snapshot_id = snapshot.parent_snapshot_id();
                } else {
                    // Reached snapshots before or at last compaction, stop counting
                    break;
                }
            }

            Some(count)
        } else {
            // MOR mode: simple rposition on all snapshots (main branch only)
            let mut snapshots = metadata.snapshots().collect_vec();
            if snapshots.is_empty() {
                return Some(0);
            }

            // Order chronologically so rposition finds the most recent Replace.
            snapshots.sort_by_key(|s| s.timestamp_ms());

            let last_replace_index = snapshots
                .iter()
                .rposition(|s| matches!(s.summary().operation, Operation::Replace));

            // Snapshots strictly after the last Replace; all of them if none found.
            let count = match last_replace_index {
                Some(index) => snapshots.len() - index - 1,
                None => snapshots.len(),
            };

            Some(count)
        }
    }
832
833    pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
834        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; // 24 hours
835        let now = chrono::Utc::now().timestamp_millis();
836
837        let iceberg_config = self.load_iceberg_config(sink_id).await?;
838        if !iceberg_config.enable_snapshot_expiration {
839            return Ok(());
840        }
841
842        let catalog = iceberg_config.create_catalog().await?;
843        let mut table = catalog
844            .load_table(&iceberg_config.full_table_name()?)
845            .await
846            .map_err(|e| SinkError::Iceberg(e.into()))?;
847
848        let metadata = table.metadata();
849        let mut snapshots = metadata.snapshots().collect_vec();
850        snapshots.sort_by_key(|s| s.timestamp_ms());
851
852        let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;
853
854        let snapshot_expiration_timestamp_ms =
855            match iceberg_config.snapshot_expiration_timestamp_ms(now) {
856                Some(timestamp) => timestamp,
857                None => default_snapshot_expiration_timestamp_ms,
858            };
859
860        if snapshots.is_empty()
861            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
862        {
863            // avoid commit empty table updates
864            return Ok(());
865        }
866
867        tracing::info!(
868            catalog_name = iceberg_config.catalog_name(),
869            table_name = iceberg_config.full_table_name()?.to_string(),
870            %sink_id,
871            snapshots_len = snapshots.len(),
872            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
873            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
874            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
875            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
876            "try trigger snapshots expiration",
877        );
878
879        let txn = Transaction::new(&table);
880
881        let mut expired_snapshots = txn
882            .expire_snapshot()
883            .expire_older_than(snapshot_expiration_timestamp_ms)
884            .clear_expire_files(iceberg_config.snapshot_expiration_clear_expired_files)
885            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);
886
887        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
888            expired_snapshots = expired_snapshots.retain_last(retain_last);
889        }
890
891        let before_metadata = table.metadata_ref();
892        let tx = expired_snapshots
893            .apply(txn)
894            .map_err(|e| SinkError::Iceberg(e.into()))?;
895        table = tx
896            .commit(catalog.as_ref())
897            .await
898            .map_err(|e| SinkError::Iceberg(e.into()))?;
899
900        if iceberg_config.snapshot_expiration_clear_expired_files {
901            table
902                .cleanup_expired_files(&before_metadata)
903                .await
904                .map_err(|e| SinkError::Iceberg(e.into()))?;
905        }
906
907        tracing::info!(
908            catalog_name = iceberg_config.catalog_name(),
909            table_name = iceberg_config.full_table_name()?.to_string(),
910            %sink_id,
911            "Expired snapshots for iceberg table",
912        );
913
914        Ok(())
915    }
916}