risingwave_meta/manager/iceberg_compaction/
gc.rs1use iceberg::transaction::{ApplyTransactionAction, Transaction};
16use itertools::Itertools;
17use risingwave_connector::sink::SinkError;
18use risingwave_connector::sink::catalog::SinkId;
19use thiserror_ext::AsReport;
20use tokio::sync::oneshot::Sender;
21use tokio::task::JoinHandle;
22
23use super::*;
24
/// Fallback maximum snapshot age in milliseconds (24 hours).
///
/// Subtracted from "now" to derive the expiration cutoff when the sink configuration
/// does not yield an explicit expiration timestamp.
// ms per second * seconds per minute * minutes per hour * hours per day
const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 1_000 * 60 * 60 * 24;
26
27fn snapshot_expiration_cutoff_ms(iceberg_config: &IcebergConfig, now: i64) -> i64 {
28 iceberg_config
29 .snapshot_expiration_timestamp_ms(now)
30 .unwrap_or(now - MAX_SNAPSHOT_AGE_MS_DEFAULT)
31}
32
33impl IcebergCompactionManager {
34 pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
35 assert!(
36 interval_sec > 0,
37 "Iceberg GC interval must be greater than 0"
38 );
39 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
40 let join_handle = tokio::spawn(async move {
41 tracing::info!(
42 interval_sec = interval_sec,
43 "Starting Iceberg GC loop with configurable interval"
44 );
45 let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));
46
47 loop {
48 tokio::select! {
49 _ = interval.tick() => {
50 if let Err(e) = manager.perform_gc_operations().await {
51 tracing::error!(error = ?e.as_report(), "GC operations failed");
52 }
53 },
54 _ = &mut shutdown_rx => {
55 tracing::info!("Iceberg GC loop is stopped");
56 return;
57 }
58 }
59 }
60 });
61
62 (join_handle, shutdown_tx)
63 }
64
65 async fn perform_gc_operations(&self) -> MetaResult<()> {
66 let sink_ids = {
67 let guard = self.inner.read();
68 guard
69 .snapshot_expiration_sink_ids
70 .iter()
71 .cloned()
72 .collect::<Vec<_>>()
73 };
74
75 tracing::info!("Starting GC operations for {} tables", sink_ids.len());
76
77 for sink_id in sink_ids {
78 if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
79 tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
80 }
81 }
82
83 tracing::info!("GC operations completed");
84 Ok(())
85 }
86
87 pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
88 let now = chrono::Utc::now().timestamp_millis();
89
90 let iceberg_config = self.load_iceberg_config(sink_id).await?;
91 if !iceberg_config.enable_snapshot_expiration {
92 let mut guard = self.inner.write();
93 guard.snapshot_expiration_sink_ids.remove(&sink_id);
94 return Ok(());
95 }
96
97 let processing_gc_watermark_snapshot = {
98 let guard = self.inner.read();
99 guard
100 .sink_schedules
101 .get(&sink_id)
102 .and_then(|track| track.processing_gc_watermark_snapshot())
103 .map(|snapshot| snapshot.cloned())
104 };
105
106 let mut snapshot_expiration_timestamp_ms =
107 snapshot_expiration_cutoff_ms(&iceberg_config, now);
108
109 match processing_gc_watermark_snapshot {
112 None => {}
113 Some(None) => {
114 tracing::info!(
115 catalog_name = iceberg_config.catalog_name(),
116 table_name = iceberg_config.full_table_name()?.to_string(),
117 %sink_id,
118 "Skip snapshots expiration because an iceberg compaction task has no observed GC watermark",
119 );
120 return Ok(());
121 }
122 Some(Some(snapshot)) => {
123 snapshot_expiration_timestamp_ms =
126 snapshot_expiration_timestamp_ms.min(snapshot.timestamp_ms);
127 tracing::info!(
128 catalog_name = iceberg_config.catalog_name(),
129 table_name = iceberg_config.full_table_name()?.to_string(),
130 %sink_id,
131 gc_watermark_branch = %snapshot.branch,
132 gc_watermark_snapshot_id = snapshot.snapshot_id,
133 gc_watermark_timestamp_ms = snapshot.timestamp_ms,
134 protected_snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
135 "Protect snapshots expiration with iceberg compaction GC watermark",
136 );
137 }
138 }
139
140 let catalog = iceberg_config.create_catalog().await?;
141 let mut table = catalog
142 .load_table(&iceberg_config.full_table_name()?)
143 .await
144 .map_err(|e| SinkError::Iceberg(e.into()))?;
145
146 let metadata = table.metadata();
147 let mut snapshots = metadata.snapshots().collect_vec();
148 snapshots.sort_by_key(|s| s.timestamp_ms());
149
150 if snapshots.is_empty()
151 || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
152 {
153 return Ok(());
154 }
155
156 tracing::info!(
157 catalog_name = iceberg_config.catalog_name(),
158 table_name = iceberg_config.full_table_name()?.to_string(),
159 %sink_id,
160 snapshots_len = snapshots.len(),
161 snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
162 snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
163 clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
164 clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
165 "try trigger snapshots expiration",
166 );
167
168 let txn = Transaction::new(&table);
169
170 let mut expired_snapshots = txn
171 .expire_snapshot()
172 .expire_older_than(snapshot_expiration_timestamp_ms)
173 .clear_expire_files(iceberg_config.snapshot_expiration_clear_expired_files)
174 .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);
175
176 if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
177 expired_snapshots = expired_snapshots.retain_last(retain_last);
178 }
179
180 let before_metadata = table.metadata_ref();
181 let tx = expired_snapshots
182 .apply(txn)
183 .map_err(|e| SinkError::Iceberg(e.into()))?;
184 table = tx
185 .commit(catalog.as_ref())
186 .await
187 .map_err(|e| SinkError::Iceberg(e.into()))?;
188
189 if iceberg_config.snapshot_expiration_clear_expired_files {
190 table
191 .cleanup_expired_files(&before_metadata)
192 .await
193 .map_err(|e| SinkError::Iceberg(e.into()))?;
194 }
195
196 tracing::info!(
197 catalog_name = iceberg_config.catalog_name(),
198 table_name = iceberg_config.full_table_name()?.to_string(),
199 %sink_id,
200 "Expired snapshots for iceberg table",
201 );
202
203 Ok(())
204 }
205}