risingwave_meta/manager/iceberg_compaction/
gc.rs1use iceberg::transaction::{ApplyTransactionAction, Transaction};
16use itertools::Itertools;
17use risingwave_connector::sink::SinkError;
18use risingwave_connector::sink::catalog::SinkId;
19use thiserror_ext::AsReport;
20use tokio::sync::oneshot::Sender;
21use tokio::task::JoinHandle;
22
23use super::*;
24
25impl IcebergCompactionManager {
26 pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
27 assert!(
28 interval_sec > 0,
29 "Iceberg GC interval must be greater than 0"
30 );
31 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
32 let join_handle = tokio::spawn(async move {
33 tracing::info!(
34 interval_sec = interval_sec,
35 "Starting Iceberg GC loop with configurable interval"
36 );
37 let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));
38
39 loop {
40 tokio::select! {
41 _ = interval.tick() => {
42 if let Err(e) = manager.perform_gc_operations().await {
43 tracing::error!(error = ?e.as_report(), "GC operations failed");
44 }
45 },
46 _ = &mut shutdown_rx => {
47 tracing::info!("Iceberg GC loop is stopped");
48 return;
49 }
50 }
51 }
52 });
53
54 (join_handle, shutdown_tx)
55 }
56
57 async fn perform_gc_operations(&self) -> MetaResult<()> {
58 let sink_ids = {
59 let guard = self.inner.read();
60 guard.sink_schedules.keys().cloned().collect::<Vec<_>>()
61 };
62
63 tracing::info!("Starting GC operations for {} tables", sink_ids.len());
64
65 for sink_id in sink_ids {
66 if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
67 tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
68 }
69 }
70
71 tracing::info!("GC operations completed");
72 Ok(())
73 }
74
75 pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
76 const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000;
77 let now = chrono::Utc::now().timestamp_millis();
78
79 let iceberg_config = self.load_iceberg_config(sink_id).await?;
80 if !iceberg_config.enable_snapshot_expiration {
81 return Ok(());
82 }
83
84 let catalog = iceberg_config.create_catalog().await?;
85 let mut table = catalog
86 .load_table(&iceberg_config.full_table_name()?)
87 .await
88 .map_err(|e| SinkError::Iceberg(e.into()))?;
89
90 let metadata = table.metadata();
91 let mut snapshots = metadata.snapshots().collect_vec();
92 snapshots.sort_by_key(|s| s.timestamp_ms());
93
94 let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;
95
96 let snapshot_expiration_timestamp_ms =
97 match iceberg_config.snapshot_expiration_timestamp_ms(now) {
98 Some(timestamp) => timestamp,
99 None => default_snapshot_expiration_timestamp_ms,
100 };
101
102 if snapshots.is_empty()
103 || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
104 {
105 return Ok(());
106 }
107
108 tracing::info!(
109 catalog_name = iceberg_config.catalog_name(),
110 table_name = iceberg_config.full_table_name()?.to_string(),
111 %sink_id,
112 snapshots_len = snapshots.len(),
113 snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
114 snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
115 clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
116 clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
117 "try trigger snapshots expiration",
118 );
119
120 let txn = Transaction::new(&table);
121
122 let mut expired_snapshots = txn
123 .expire_snapshot()
124 .expire_older_than(snapshot_expiration_timestamp_ms)
125 .clear_expire_files(iceberg_config.snapshot_expiration_clear_expired_files)
126 .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);
127
128 if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
129 expired_snapshots = expired_snapshots.retain_last(retain_last);
130 }
131
132 let before_metadata = table.metadata_ref();
133 let tx = expired_snapshots
134 .apply(txn)
135 .map_err(|e| SinkError::Iceberg(e.into()))?;
136 table = tx
137 .commit(catalog.as_ref())
138 .await
139 .map_err(|e| SinkError::Iceberg(e.into()))?;
140
141 if iceberg_config.snapshot_expiration_clear_expired_files {
142 table
143 .cleanup_expired_files(&before_metadata)
144 .await
145 .map_err(|e| SinkError::Iceberg(e.into()))?;
146 }
147
148 tracing::info!(
149 catalog_name = iceberg_config.catalog_name(),
150 table_name = iceberg_config.full_table_name()?.to_string(),
151 %sink_id,
152 "Expired snapshots for iceberg table",
153 );
154
155 Ok(())
156 }
157}