risingwave_meta/manager/iceberg_compaction/
gc.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use iceberg::transaction::{ApplyTransactionAction, Transaction};
16use itertools::Itertools;
17use risingwave_connector::sink::SinkError;
18use risingwave_connector::sink::catalog::SinkId;
19use thiserror_ext::AsReport;
20use tokio::sync::oneshot::Sender;
21use tokio::task::JoinHandle;
22
23use super::*;
24
25impl IcebergCompactionManager {
26    pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
27        assert!(
28            interval_sec > 0,
29            "Iceberg GC interval must be greater than 0"
30        );
31        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
32        let join_handle = tokio::spawn(async move {
33            tracing::info!(
34                interval_sec = interval_sec,
35                "Starting Iceberg GC loop with configurable interval"
36            );
37            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));
38
39            loop {
40                tokio::select! {
41                    _ = interval.tick() => {
42                        if let Err(e) = manager.perform_gc_operations().await {
43                            tracing::error!(error = ?e.as_report(), "GC operations failed");
44                        }
45                    },
46                    _ = &mut shutdown_rx => {
47                        tracing::info!("Iceberg GC loop is stopped");
48                        return;
49                    }
50                }
51            }
52        });
53
54        (join_handle, shutdown_tx)
55    }
56
57    async fn perform_gc_operations(&self) -> MetaResult<()> {
58        let sink_ids = {
59            let guard = self.inner.read();
60            guard.sink_schedules.keys().cloned().collect::<Vec<_>>()
61        };
62
63        tracing::info!("Starting GC operations for {} tables", sink_ids.len());
64
65        for sink_id in sink_ids {
66            if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
67                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
68            }
69        }
70
71        tracing::info!("GC operations completed");
72        Ok(())
73    }
74
75    pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
76        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000;
77        let now = chrono::Utc::now().timestamp_millis();
78
79        let iceberg_config = self.load_iceberg_config(sink_id).await?;
80        if !iceberg_config.enable_snapshot_expiration {
81            return Ok(());
82        }
83
84        let catalog = iceberg_config.create_catalog().await?;
85        let mut table = catalog
86            .load_table(&iceberg_config.full_table_name()?)
87            .await
88            .map_err(|e| SinkError::Iceberg(e.into()))?;
89
90        let metadata = table.metadata();
91        let mut snapshots = metadata.snapshots().collect_vec();
92        snapshots.sort_by_key(|s| s.timestamp_ms());
93
94        let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;
95
96        let snapshot_expiration_timestamp_ms =
97            match iceberg_config.snapshot_expiration_timestamp_ms(now) {
98                Some(timestamp) => timestamp,
99                None => default_snapshot_expiration_timestamp_ms,
100            };
101
102        if snapshots.is_empty()
103            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
104        {
105            return Ok(());
106        }
107
108        tracing::info!(
109            catalog_name = iceberg_config.catalog_name(),
110            table_name = iceberg_config.full_table_name()?.to_string(),
111            %sink_id,
112            snapshots_len = snapshots.len(),
113            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
114            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
115            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
116            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
117            "try trigger snapshots expiration",
118        );
119
120        let txn = Transaction::new(&table);
121
122        let mut expired_snapshots = txn
123            .expire_snapshot()
124            .expire_older_than(snapshot_expiration_timestamp_ms)
125            .clear_expire_files(iceberg_config.snapshot_expiration_clear_expired_files)
126            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);
127
128        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
129            expired_snapshots = expired_snapshots.retain_last(retain_last);
130        }
131
132        let before_metadata = table.metadata_ref();
133        let tx = expired_snapshots
134            .apply(txn)
135            .map_err(|e| SinkError::Iceberg(e.into()))?;
136        table = tx
137            .commit(catalog.as_ref())
138            .await
139            .map_err(|e| SinkError::Iceberg(e.into()))?;
140
141        if iceberg_config.snapshot_expiration_clear_expired_files {
142            table
143                .cleanup_expired_files(&before_metadata)
144                .await
145                .map_err(|e| SinkError::Iceberg(e.into()))?;
146        }
147
148        tracing::info!(
149            catalog_name = iceberg_config.catalog_name(),
150            table_name = iceberg_config.full_table_name()?.to_string(),
151            %sink_id,
152            "Expired snapshots for iceberg table",
153        );
154
155        Ok(())
156    }
157}