risingwave_meta/manager/iceberg_compaction/
gc.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use iceberg::transaction::{ApplyTransactionAction, Transaction};
16use itertools::Itertools;
17use risingwave_connector::sink::SinkError;
18use risingwave_connector::sink::catalog::SinkId;
19use thiserror_ext::AsReport;
20use tokio::sync::oneshot::Sender;
21use tokio::task::JoinHandle;
22
23use super::*;
24
/// Fallback maximum snapshot age, in milliseconds (one day). Used to derive the
/// expiration cutoff when the sink's Iceberg config does not yield one.
const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 1_000 * 60 * 60 * 24;
26
27fn snapshot_expiration_cutoff_ms(iceberg_config: &IcebergConfig, now: i64) -> i64 {
28    iceberg_config
29        .snapshot_expiration_timestamp_ms(now)
30        .unwrap_or(now - MAX_SNAPSHOT_AGE_MS_DEFAULT)
31}
32
33impl IcebergCompactionManager {
34    pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
35        assert!(
36            interval_sec > 0,
37            "Iceberg GC interval must be greater than 0"
38        );
39        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
40        let join_handle = tokio::spawn(async move {
41            tracing::info!(
42                interval_sec = interval_sec,
43                "Starting Iceberg GC loop with configurable interval"
44            );
45            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));
46
47            loop {
48                tokio::select! {
49                    _ = interval.tick() => {
50                        if let Err(e) = manager.perform_gc_operations().await {
51                            tracing::error!(error = ?e.as_report(), "GC operations failed");
52                        }
53                    },
54                    _ = &mut shutdown_rx => {
55                        tracing::info!("Iceberg GC loop is stopped");
56                        return;
57                    }
58                }
59            }
60        });
61
62        (join_handle, shutdown_tx)
63    }
64
65    async fn perform_gc_operations(&self) -> MetaResult<()> {
66        let sink_ids = {
67            let guard = self.inner.read();
68            guard
69                .snapshot_expiration_sink_ids
70                .iter()
71                .cloned()
72                .collect::<Vec<_>>()
73        };
74
75        tracing::info!("Starting GC operations for {} tables", sink_ids.len());
76
77        for sink_id in sink_ids {
78            if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
79                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
80            }
81        }
82
83        tracing::info!("GC operations completed");
84        Ok(())
85    }
86
87    pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
88        let now = chrono::Utc::now().timestamp_millis();
89
90        let iceberg_config = self.load_iceberg_config(sink_id).await?;
91        if !iceberg_config.enable_snapshot_expiration {
92            let mut guard = self.inner.write();
93            guard.snapshot_expiration_sink_ids.remove(&sink_id);
94            return Ok(());
95        }
96
97        let processing_gc_watermark_snapshot = {
98            let guard = self.inner.read();
99            guard
100                .sink_schedules
101                .get(&sink_id)
102                .and_then(|track| track.processing_gc_watermark_snapshot())
103                .map(|snapshot| snapshot.cloned())
104        };
105
106        let mut snapshot_expiration_timestamp_ms =
107            snapshot_expiration_cutoff_ms(&iceberg_config, now);
108
109        // Outer `None` means no active compaction task. Inner `None` means an
110        // active task exists without a safe snapshot watermark, so GC skips.
111        match processing_gc_watermark_snapshot {
112            None => {}
113            Some(None) => {
114                tracing::info!(
115                    catalog_name = iceberg_config.catalog_name(),
116                    table_name = iceberg_config.full_table_name()?.to_string(),
117                    %sink_id,
118                    "Skip snapshots expiration because an iceberg compaction task has no observed GC watermark",
119                );
120                return Ok(());
121            }
122            Some(Some(snapshot)) => {
123                // A running compaction task may still need snapshots up to its
124                // captured watermark, so GC must not expire newer snapshots.
125                snapshot_expiration_timestamp_ms =
126                    snapshot_expiration_timestamp_ms.min(snapshot.timestamp_ms);
127                tracing::info!(
128                    catalog_name = iceberg_config.catalog_name(),
129                    table_name = iceberg_config.full_table_name()?.to_string(),
130                    %sink_id,
131                    gc_watermark_branch = %snapshot.branch,
132                    gc_watermark_snapshot_id = snapshot.snapshot_id,
133                    gc_watermark_timestamp_ms = snapshot.timestamp_ms,
134                    protected_snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
135                    "Protect snapshots expiration with iceberg compaction GC watermark",
136                );
137            }
138        }
139
140        let catalog = iceberg_config.create_catalog().await?;
141        let mut table = catalog
142            .load_table(&iceberg_config.full_table_name()?)
143            .await
144            .map_err(|e| SinkError::Iceberg(e.into()))?;
145
146        let metadata = table.metadata();
147        let mut snapshots = metadata.snapshots().collect_vec();
148        snapshots.sort_by_key(|s| s.timestamp_ms());
149
150        if snapshots.is_empty()
151            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
152        {
153            return Ok(());
154        }
155
156        tracing::info!(
157            catalog_name = iceberg_config.catalog_name(),
158            table_name = iceberg_config.full_table_name()?.to_string(),
159            %sink_id,
160            snapshots_len = snapshots.len(),
161            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
162            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
163            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
164            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
165            "try trigger snapshots expiration",
166        );
167
168        let txn = Transaction::new(&table);
169
170        let mut expired_snapshots = txn
171            .expire_snapshot()
172            .expire_older_than(snapshot_expiration_timestamp_ms)
173            .clear_expire_files(iceberg_config.snapshot_expiration_clear_expired_files)
174            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);
175
176        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
177            expired_snapshots = expired_snapshots.retain_last(retain_last);
178        }
179
180        let before_metadata = table.metadata_ref();
181        let tx = expired_snapshots
182            .apply(txn)
183            .map_err(|e| SinkError::Iceberg(e.into()))?;
184        table = tx
185            .commit(catalog.as_ref())
186            .await
187            .map_err(|e| SinkError::Iceberg(e.into()))?;
188
189        if iceberg_config.snapshot_expiration_clear_expired_files {
190            table
191                .cleanup_expired_files(&before_metadata)
192                .await
193                .map_err(|e| SinkError::Iceberg(e.into()))?;
194        }
195
196        tracing::info!(
197            catalog_name = iceberg_config.catalog_name(),
198            table_name = iceberg_config.full_table_name()?.to_string(),
199            %sink_id,
200            "Expired snapshots for iceberg table",
201        );
202
203        Ok(())
204    }
205}