use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use iceberg::table::Table;
use iceberg::transaction::Transaction;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bail;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::sink::{SinkError, SinkParam};
use risingwave_pb::catalog::PbSink;
use risingwave_pb::iceberg_compaction::{
    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;

use super::MetaSrvEnv;
use crate::MetaResult;
use crate::hummock::{
    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
    IcebergCompactor, IcebergCompactorManagerRef,
};
use crate::manager::MetadataManager;
use crate::rpc::metrics::MetaMetrics;

pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;

type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

type CompactorChangeRx =
    UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

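/// Per-sink commit statistics used to decide when a sink's Iceberg table
/// should be compacted: the number of commits observed since the last
/// compaction, the earliest time the next compaction may be triggered, and
/// the configured compaction interval in seconds.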
#[derive(Debug, Clone)]
struct CommitInfo {
    count: usize,
    next_compaction_time: Option<Instant>,
    compaction_interval: u64,
}

impl CommitInfo {
    fn set_processing(&mut self) {
        self.count = 0;
        self.next_compaction_time.take();
    }

    fn initialize(&mut self) {
        self.count = 0;
        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(self.compaction_interval));
    }

    fn replace(&mut self, commit_info: CommitInfo) {
        self.count = commit_info.count;
        self.next_compaction_time = commit_info.next_compaction_time;
        self.compaction_interval = commit_info.compaction_interval;
    }

    fn increase_count(&mut self) {
        self.count += 1;
    }

    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;

        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(compaction_interval));
    }
}

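/// A single-use handle for dispatching one compaction task for a sink. The
/// handle snapshots the sink's [`CommitInfo`]; on drop it either
/// re-initializes the sink's commit state (if the task was sent successfully)
/// or restores the snapshot (if it was not), so pending commits are not lost.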
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    handle_success: bool,

    commit_info: CommitInfo,
}

impl IcebergCompactionHandle {
    fn new(
        sink_id: SinkId,
        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
        metadata_manager: MetadataManager,
        commit_info: CommitInfo,
    ) -> Self {
        Self {
            sink_id,
            inner,
            metadata_manager,
            handle_success: false,
            commit_info,
        }
    }

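    /// Builds the sink's [`SinkParam`] from the catalog and sends a
    /// `CompactTask` event with the given `task_id` to the chosen compactor.
    /// Marks the handle as successful only if the event was sent successfully.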
    pub async fn send_compact_task(
        mut self,
        compactor: Arc<IcebergCompactor>,
        task_id: u64,
    ) -> MetaResult<()> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
        let prost_sink_catalog: PbSink = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
            .await?
            .remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        let result =
            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
                task_id,
                props: param.properties,
            }));

        if result.is_ok() {
            self.handle_success = true;
        }

        result
    }

    pub fn sink_id(&self) -> SinkId {
        self.sink_id
    }
}

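// Restore the sink's commit bookkeeping when the handle goes out of scope: a
// successfully dispatched task resets the counter and schedules the next
// compaction window, while a failed dispatch puts the original state back.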
impl Drop for IcebergCompactionHandle {
    fn drop(&mut self) {
        if self.handle_success {
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.initialize();
            }
        } else {
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.replace(self.commit_info.clone());
            }
        }
    }
}

struct IcebergCompactionManagerInner {
    pub iceberg_commits: HashMap<SinkId, CommitInfo>,
}

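/// Coordinates Iceberg sink compaction from the meta node: tracks commit
/// activity per sink, picks the sinks most in need of compaction, dispatches
/// tasks to registered compactors, and periodically expires old table
/// snapshots.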
pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}

impl IcebergCompactionManager {
    pub fn build(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        iceberg_compactor_manager: IcebergCompactorManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Arc<Self>, CompactorChangeRx) {
        let (compactor_streams_change_tx, compactor_streams_change_rx) =
            tokio::sync::mpsc::unbounded_channel();
        (
            Arc::new(Self {
                env,
                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
                    iceberg_commits: HashMap::default(),
                })),
                metadata_manager,
                iceberg_compactor_manager,
                compactor_streams_change_tx,
                metrics,
            }),
            compactor_streams_change_rx,
        )
    }

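    /// Spawns a loop that applies [`IcebergSinkCompactionUpdate`]s received
    /// from sinks to the per-sink commit statistics. Returns the task's join
    /// handle together with a oneshot sender used to shut the loop down.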
    pub fn compaction_stat_loop(
        manager: Arc<Self>,
        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(stat) = rx.recv() => {
                        manager.update_iceberg_commit_info(stat);
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compaction manager is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

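    /// Records one commit for the given sink, creating its [`CommitInfo`] on
    /// first sight, and reschedules the next compaction if the reported
    /// compaction interval has changed.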
    pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
        let mut guard = self.inner.write();

        let IcebergSinkCompactionUpdate {
            sink_id,
            compaction_interval,
        } = msg;

        let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
            count: 0,
            next_compaction_time: Some(
                Instant::now() + std::time::Duration::from_secs(compaction_interval),
            ),
            compaction_interval,
        });

        commit_info.increase_count();
        if commit_info.compaction_interval != compaction_interval {
            commit_info.update_compaction_interval(compaction_interval);
        }
    }

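    /// Returns handles for up to `n` sinks that are due for compaction: sinks
    /// with at least one pending commit whose `next_compaction_time` has
    /// passed, ordered by pending commit count (highest first) with the next
    /// compaction time as a tie-breaker. Selected sinks are marked as
    /// processing so they are not picked again until their handle is dropped.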
    pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();
        let mut guard = self.inner.write();
        guard
            .iceberg_commits
            .iter_mut()
            .filter(|(_, commit_info)| {
                commit_info.count > 0
                    && if let Some(next_compaction_time) = commit_info.next_compaction_time {
                        next_compaction_time <= now
                    } else {
                        false
                    }
            })
            .sorted_by(|a, b| {
                b.1.count
                    .cmp(&a.1.count)
                    .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
            })
            .take(n)
            .map(|(sink_id, commit_info)| {
                let handle = IcebergCompactionHandle::new(
                    *sink_id,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    commit_info.clone(),
                );

                commit_info.set_processing();

                handle
            })
            .collect::<Vec<_>>()
    }

    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
        let mut guard = self.inner.write();
        guard.iceberg_commits.remove(&sink_id);
    }

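    /// Looks up the sink in the catalog and converts it into a [`SinkParam`],
    /// failing if the sink no longer exists.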
    pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
        let mut sinks = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![sink_id.sink_id as i32])
            .await?;
        if sinks.is_empty() {
            bail!("Sink not found: {}", sink_id.sink_id);
        }
        let prost_sink_catalog: PbSink = sinks.remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        Ok(param)
    }

    #[allow(dead_code)]
    pub async fn load_iceberg_table(&self, sink_id: &SinkId) -> MetaResult<Table> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
        let table = iceberg_config.load_table().await?;
        Ok(table)
    }

    pub async fn load_iceberg_config(&self, sink_id: &SinkId) -> MetaResult<IcebergConfig> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
        Ok(iceberg_config)
    }

    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

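    /// Wires the compaction event handler and dispatcher into an event loop
    /// that consumes newly registered compactor streams, then spawns it.
    /// Returns the join handles and shutdown senders of the spawned tasks.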
    pub fn iceberg_compaction_event_loop(
        iceberg_compaction_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeIcebergCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let iceberg_compaction_event_handler =
            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());

        let iceberg_compaction_event_dispatcher =
            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);

        let event_loop = IcebergCompactionEventLoop::new(
            iceberg_compaction_event_dispatcher,
            iceberg_compaction_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

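    /// Spawns a background loop that runs garbage-collection work (currently
    /// snapshot expiration) over all tracked sinks on an hourly interval.
    /// Returns the task's join handle and a oneshot sender used to stop it.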
    pub fn gc_loop(manager: Arc<Self>) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            const GC_LOOP_INTERVAL_SECS: u64 = 3600;
            let mut interval =
                tokio::time::interval(std::time::Duration::from_secs(GC_LOOP_INTERVAL_SECS));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = manager.perform_gc_operations().await {
                            tracing::error!(error = ?e.as_report(), "GC operations failed");
                        }
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg GC loop is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

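    /// Runs one GC pass: snapshots the set of tracked sink ids and attempts
    /// snapshot expiration for each of them, logging (but not propagating)
    /// per-sink failures.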
    async fn perform_gc_operations(&self) -> MetaResult<()> {
        let sink_ids = {
            let guard = self.inner.read();
            guard.iceberg_commits.keys().cloned().collect::<Vec<_>>()
        };

        tracing::info!("Starting GC operations for {} tables", sink_ids.len());

        for sink_id in sink_ids {
            if let Err(e) = self.check_and_expire_snapshots(&sink_id).await {
                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id.sink_id);
            }
        }

        tracing::info!("GC operations completed");
        Ok(())
    }

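    /// Checks whether the sink's Iceberg table has snapshots older than the
    /// default 24-hour window and, if snapshot expiration is enabled for the
    /// sink, applies and commits an expiration transaction that also clears
    /// expired files and metadata. A no-op if no snapshot is old enough.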
    async fn check_and_expire_snapshots(&self, sink_id: &SinkId) -> MetaResult<()> {
        // Default retention window: snapshots older than 24 hours are eligible for expiration.
        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000;

        let now = chrono::Utc::now().timestamp_millis();
        let expired_older_than = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;

        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        if !iceberg_config.enable_snapshot_expiration {
            return Ok(());
        }

        let catalog = iceberg_config.create_catalog().await?;
        let table = catalog
            .load_table(&iceberg_config.full_table_name()?)
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();
        snapshots.sort_by_key(|s| s.timestamp_ms());

        if snapshots.is_empty() || snapshots.first().unwrap().timestamp_ms() > expired_older_than {
            return Ok(());
        }

        tracing::info!(
            "Catalog {} table {} sink-id {} has {} snapshots, triggering expiration",
            iceberg_config.catalog_name(),
            iceberg_config.full_table_name()?,
            sink_id.sink_id,
            snapshots.len(),
        );

        let tx = Transaction::new(&table);

        let expired_snapshots = tx
            .expire_snapshot()
            .clear_expired_files(true)
            .clear_expired_meta_data(true);

        let tx = expired_snapshots
            .apply()
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;
        tx.commit(catalog.as_ref())
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tracing::info!(
            "Expired snapshots for iceberg catalog {} table {} sink-id {}",
            iceberg_config.catalog_name(),
            iceberg_config.full_table_name()?,
            sink_id.sink_id,
        );

        Ok(())
    }
}