risingwave_meta/manager/iceberg_compaction.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use iceberg::table::Table;
use iceberg::transaction::Transaction;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::sink::{SinkError, SinkParam};
use risingwave_pb::catalog::PbSink;
use risingwave_pb::iceberg_compaction::{
    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;

use super::MetaSrvEnv;
use crate::MetaResult;
use crate::hummock::{
    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
    IcebergCompactor, IcebergCompactorManagerRef,
};
use crate::manager::MetadataManager;
use crate::rpc::metrics::MetaMetrics;
pub type IcebergCompactionManagerRef = Arc<IcebergCompactionManager>;

type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

type CompactorChangeRx =
    UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

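/// Per-sink bookkeeping used to schedule compactions: the number of commits
/// accumulated since the last compaction, the earliest time the next
/// compaction may be triggered (`None` while a task is in flight), and the
/// compaction interval in seconds.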
#[derive(Debug, Clone)]
struct CommitInfo {
    count: usize,
    next_compaction_time: Option<Instant>,
    compaction_interval: u64,
}

impl CommitInfo {
    fn set_processing(&mut self) {
        self.count = 0;
        // Setting `next_compaction_time` to `None` marks this sink as
        // currently being processed.
        self.next_compaction_time.take();
    }

    fn initialize(&mut self) {
        self.count = 0;
        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(self.compaction_interval));
    }

    fn replace(&mut self, commit_info: CommitInfo) {
        self.count = commit_info.count;
        self.next_compaction_time = commit_info.next_compaction_time;
        self.compaction_interval = commit_info.compaction_interval;
    }

    fn increase_count(&mut self) {
        self.count += 1;
    }

    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;

        // Reset the next compaction time.
        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(compaction_interval));
    }
}

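/// RAII-style handle for a compaction slot picked by
/// `get_top_n_iceberg_commit_sink_ids`. Its `Drop` implementation either
/// re-arms the sink's compaction timer (after a successfully sent task) or
/// restores the saved commit info (if no task was sent).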
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    handle_success: bool,

    /// Saved commit info, used to restore the sink's state on drop if the
    /// compaction task was not sent successfully.
    commit_info: CommitInfo,
}

impl IcebergCompactionHandle {
    fn new(
        sink_id: SinkId,
        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
        metadata_manager: MetadataManager,
        commit_info: CommitInfo,
    ) -> Self {
        Self {
            sink_id,
            inner,
            metadata_manager,
            handle_success: false,
            commit_info,
        }
    }

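    /// Resolves the sink's catalog into a `SinkParam` and forwards a
    /// `CompactTask` event carrying the sink's properties to the given
    /// compactor. On success the handle is marked so that `Drop` re-arms the
    /// compaction timer instead of restoring the saved commit info.
    ///
    /// A minimal sketch, assuming a `compactor` already obtained from the
    /// compactor manager and an externally assigned `task_id`:
    ///
    /// ```ignore
    /// handle.send_compact_task(compactor, task_id).await?;
    /// ```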
    pub async fn send_compact_task(
        mut self,
        compactor: Arc<IcebergCompactor>,
        task_id: u64,
    ) -> MetaResult<()> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
        let prost_sink_catalog: PbSink = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
            .await?
            .remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        let result =
            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
                // TODO: use Iceberg's own compaction task id.
                task_id,
                props: param.properties,
            }));

        if result.is_ok() {
            self.handle_success = true;
        }

        result
    }

    pub fn sink_id(&self) -> SinkId {
        self.sink_id
    }
}

impl Drop for IcebergCompactionHandle {
    fn drop(&mut self) {
        if self.handle_success {
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.initialize();
            }
        } else {
            // If the task was not sent successfully, restore the commit info
            // to its original state. This covers the case where the handle is
            // dropped before the compaction task is sent, so the sink's
            // pending commits are not lost.
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.replace(self.commit_info.clone());
            }
        }
    }
}

struct IcebergCompactionManagerInner {
    pub iceberg_commits: HashMap<SinkId, CommitInfo>,
}

pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}

impl IcebergCompactionManager {
    pub fn build(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        iceberg_compactor_manager: IcebergCompactorManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Arc<Self>, CompactorChangeRx) {
        let (compactor_streams_change_tx, compactor_streams_change_rx) =
            tokio::sync::mpsc::unbounded_channel();
        (
            Arc::new(Self {
                env,
                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
                    iceberg_commits: HashMap::default(),
                })),
                metadata_manager,
                iceberg_compactor_manager,
                compactor_streams_change_tx,
                metrics,
            }),
            compactor_streams_change_rx,
        )
    }

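    /// Spawns a background task that consumes `IcebergSinkCompactionUpdate`
    /// messages and folds them into the per-sink commit info. Returns the
    /// join handle together with a oneshot sender used to request shutdown.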
    pub fn compaction_stat_loop(
        manager: Arc<Self>,
        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(stat) = rx.recv() => {
                        manager.update_iceberg_commit_info(stat);
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compaction manager is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

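    /// Records one commit for the sink in `msg`, creating the tracking entry
    /// on first sight, and re-arms the timer if the reported compaction
    /// interval differs from the tracked one.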
    pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
        let mut guard = self.inner.write();

        let IcebergSinkCompactionUpdate {
            sink_id,
            compaction_interval,
        } = msg;

        // If the compaction interval has changed, re-arm the timer here; the
        // commit info itself is reset when a compaction task is sent or
        // initialized.
        let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
            count: 0,
            next_compaction_time: Some(
                Instant::now() + std::time::Duration::from_secs(compaction_interval),
            ),
            compaction_interval,
        });

        commit_info.increase_count();
        if commit_info.compaction_interval != compaction_interval {
            commit_info.update_compaction_interval(compaction_interval);
        }
    }

    /// Returns compaction handles for the top `n` sinks with pending commits,
    /// sorted by commit count (descending), then by next compaction time.
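    ///
    /// A minimal usage sketch; `dispatch` here is a hypothetical helper that
    /// picks a compactor and calls `send_compact_task`:
    ///
    /// ```ignore
    /// let handles = manager.get_top_n_iceberg_commit_sink_ids(4);
    /// for handle in handles {
    ///     // A handle that fails to send restores the sink's state on drop.
    ///     dispatch(handle).await;
    /// }
    /// ```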
    pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();
        let mut guard = self.inner.write();
        guard
            .iceberg_commits
            .iter_mut()
            .filter(|(_, commit_info)| {
                commit_info.count > 0
                    && commit_info
                        .next_compaction_time
                        .is_some_and(|next_compaction_time| next_compaction_time <= now)
            })
            .sorted_by(|a, b| {
                b.1.count
                    .cmp(&a.1.count)
                    .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
            })
            .take(n)
            .map(|(sink_id, commit_info)| {
                let handle = IcebergCompactionHandle::new(
                    *sink_id,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    commit_info.clone(),
                );

                // Reset the commit count and next compaction time so the same
                // sink is not picked up again before this handle resolves.
                commit_info.set_processing();

                handle
            })
            .collect::<Vec<_>>()
    }

    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
        let mut guard = self.inner.write();
        guard.iceberg_commits.remove(&sink_id);
    }

    pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
        let prost_sink_catalog: PbSink = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![sink_id.sink_id as i32])
            .await?
            .remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        Ok(param)
    }

    #[allow(dead_code)]
    pub async fn load_iceberg_table(&self, sink_id: &SinkId) -> MetaResult<Table> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
        let table = iceberg_config.load_table().await?;
        Ok(table)
    }

    pub async fn load_iceberg_config(&self, sink_id: &SinkId) -> MetaResult<IcebergConfig> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
        Ok(iceberg_config)
    }

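    /// Registers the event stream of a newly subscribed compactor, keyed by
    /// `context_id`. The stream is forwarded to the compaction event loop via
    /// the unbounded channel created in `build`.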
    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

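    /// Assembles the handler/dispatcher stack and starts the compaction event
    /// loop that consumes compactor stream changes. Returns the loop's join
    /// handle paired with its shutdown sender.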
    pub fn iceberg_compaction_event_loop(
        iceberg_compaction_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeIcebergCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let iceberg_compaction_event_handler =
            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());

        let iceberg_compaction_event_dispatcher =
            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);

        let event_loop = IcebergCompactionEventLoop::new(
            iceberg_compaction_event_dispatcher,
            iceberg_compaction_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    /// Garbage-collection loop for expired snapshot management.
    /// Periodically checks all tracked Iceberg tables and performs GC
    /// operations such as expiring old snapshots.
    pub fn gc_loop(manager: Arc<Self>) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            // Run GC every hour by default.
            const GC_LOOP_INTERVAL_SECS: u64 = 3600;
            let mut interval =
                tokio::time::interval(std::time::Duration::from_secs(GC_LOOP_INTERVAL_SECS));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = manager.perform_gc_operations().await {
                            tracing::error!(error = ?e.as_report(), "GC operations failed");
                        }
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg GC loop is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

    /// Perform GC operations on all tracked Iceberg tables.
    async fn perform_gc_operations(&self) -> MetaResult<()> {
        // Get all sink IDs that are currently tracked.
        let sink_ids = {
            let guard = self.inner.read();
            guard.iceberg_commits.keys().cloned().collect::<Vec<_>>()
        };

        tracing::info!("Starting GC operations for {} tables", sink_ids.len());

        for sink_id in sink_ids {
            if let Err(e) = self.check_and_expire_snapshots(&sink_id).await {
                // Continue with the other tables even if one fails.
                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id.sink_id);
            }
        }

        tracing::info!("GC operations completed");
        Ok(())
    }

    /// Check the snapshots of a specific table and trigger expiration if needed.
    async fn check_and_expire_snapshots(&self, sink_id: &SinkId) -> MetaResult<()> {
        // Configurable threshold; could be moved to the config later.
        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; // 1 day
        let now = chrono::Utc::now().timestamp_millis();
        let expired_older_than = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;

        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        if !iceberg_config.enable_snapshot_expiration {
            return Ok(());
        }

        let catalog = iceberg_config.create_catalog().await?;
        let table = catalog
            .load_table(&iceberg_config.full_table_name()?)
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();
        snapshots.sort_by_key(|s| s.timestamp_ms());

        // Snapshots are sorted ascending by timestamp, so the first one is the
        // oldest. If even the oldest snapshot is newer than the cutoff, there
        // is nothing to expire; return early to avoid committing an empty
        // table update.
        if snapshots.is_empty() || snapshots.first().unwrap().timestamp_ms() > expired_older_than {
            return Ok(());
        }

        tracing::info!(
            "Catalog {} table {} sink-id {} has {} snapshots; triggering expiration",
            iceberg_config.catalog_name(),
            iceberg_config.full_table_name()?,
            sink_id.sink_id,
            snapshots.len(),
        );

        let tx = Transaction::new(&table);

        // TODO: use config.
        let expired_snapshots = tx
            .expire_snapshot()
            .clear_expired_files(true)
            .clear_expired_meta_data(true);

        let tx = expired_snapshots
            .apply()
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;
        tx.commit(catalog.as_ref())
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tracing::info!(
            "Expired snapshots for iceberg catalog {} table {} sink-id {}",
            iceberg_config.catalog_name(),
            iceberg_config.full_table_name()?,
            sink_id.sink_id,
        );

        Ok(())
    }
}
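
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch exercising the `CommitInfo` state transitions the
    // scheduler relies on: counting commits, entering the processing state,
    // and re-arming the timer. The 60s/120s intervals are arbitrary values
    // chosen for the test.
    #[test]
    fn commit_info_lifecycle() {
        let mut info = CommitInfo {
            count: 0,
            next_compaction_time: Some(Instant::now()),
            compaction_interval: 60,
        };

        info.increase_count();
        info.increase_count();
        assert_eq!(info.count, 2);

        // Picking the sink up for compaction clears the timer; `None` means a
        // task is in flight.
        info.set_processing();
        assert_eq!(info.count, 0);
        assert!(info.next_compaction_time.is_none());

        // A successfully sent task re-arms the timer one interval ahead.
        info.initialize();
        assert!(info.next_compaction_time.is_some());

        // Changing the interval also re-arms the timer.
        info.update_compaction_interval(120);
        assert_eq!(info.compaction_interval, 120);
    }
}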