risingwave_meta/manager/iceberg_compaction.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use iceberg::table::Table;
use iceberg::transaction::Transaction;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bail;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::sink::{SinkError, SinkParam};
use risingwave_pb::catalog::PbSink;
use risingwave_pb::iceberg_compaction::{
    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;

use super::MetaSrvEnv;
use crate::MetaResult;
use crate::hummock::{
    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
    IcebergCompactor, IcebergCompactorManagerRef,
};
use crate::manager::MetadataManager;
use crate::rpc::metrics::MetaMetrics;

pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;

type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

type CompactorChangeRx =
    UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

/// Tracks commit activity and the compaction schedule for a single Iceberg sink.
/// `next_compaction_time` is `Some(..)` when a compaction is scheduled and `None`
/// while a compaction task for the sink is in flight.
#[derive(Debug, Clone)]
struct CommitInfo {
    count: usize,
    next_compaction_time: Option<Instant>,
    compaction_interval: u64,
}

impl CommitInfo {
    fn set_processing(&mut self) {
        self.count = 0;
        // Setting `next_compaction_time` to `None` marks the sink as currently processing.
        self.next_compaction_time.take();
    }

    fn initialize(&mut self) {
        self.count = 0;
        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(self.compaction_interval));
    }

    fn replace(&mut self, commit_info: CommitInfo) {
        self.count = commit_info.count;
        self.next_compaction_time = commit_info.next_compaction_time;
        self.compaction_interval = commit_info.compaction_interval;
    }

    fn increase_count(&mut self) {
        self.count += 1;
    }

    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;

        // Reset the next compaction time based on the new interval.
        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(compaction_interval));
    }
}

pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    handle_success: bool,

    /// A snapshot of the sink's commit info, kept so the original state can be
    /// restored if the handle is dropped without sending a task.
    commit_info: CommitInfo,
}

impl IcebergCompactionHandle {
    fn new(
        sink_id: SinkId,
        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
        metadata_manager: MetadataManager,
        commit_info: CommitInfo,
    ) -> Self {
        Self {
            sink_id,
            inner,
            metadata_manager,
            handle_success: false,
            commit_info,
        }
    }

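    /// Resolves the sink's catalog entry into a `SinkParam` and dispatches a
    /// compaction task to the given compactor. On a successful send the handle
    /// is marked successful, so dropping it re-arms the sink's compaction
    /// schedule instead of restoring the previous commit info.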
    pub async fn send_compact_task(
        mut self,
        compactor: Arc<IcebergCompactor>,
        task_id: u64,
    ) -> MetaResult<()> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
        let prost_sink_catalog: PbSink = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
            .await?
            .remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        let result =
            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
                // TODO: use Iceberg's compaction task ID.
                task_id,
                props: param.properties,
            }));

        if result.is_ok() {
            self.handle_success = true;
        }

        result
    }

    pub fn sink_id(&self) -> SinkId {
        self.sink_id
    }
}

impl Drop for IcebergCompactionHandle {
    fn drop(&mut self) {
        if self.handle_success {
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.initialize();
            }
        } else {
            // If the task was never sent successfully, restore the commit info
            // to its original state so the sink is not left stuck in the
            // "processing" state and can be rescheduled.
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.replace(self.commit_info.clone());
            }
        }
    }
}

struct IcebergCompactionManagerInner {
    pub iceberg_commits: HashMap<SinkId, CommitInfo>,
}

pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}

impl IcebergCompactionManager {
    pub fn build(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        iceberg_compactor_manager: IcebergCompactorManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Arc<Self>, CompactorChangeRx) {
        let (compactor_streams_change_tx, compactor_streams_change_rx) =
            tokio::sync::mpsc::unbounded_channel();
        (
            Arc::new(Self {
                env,
                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
                    iceberg_commits: HashMap::default(),
                })),
                metadata_manager,
                iceberg_compactor_manager,
                compactor_streams_change_tx,
                metrics,
            }),
            compactor_streams_change_rx,
        )
    }

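    /// Spawns the loop that consumes `IcebergSinkCompactionUpdate` messages from
    /// committing sinks and folds them into the per-sink `CommitInfo` map.
    /// Returns the task's `JoinHandle` and a oneshot sender used for shutdown.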
    pub fn compaction_stat_loop(
        manager: Arc<Self>,
        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(stat) = rx.recv() => {
                        manager.update_iceberg_commit_info(stat);
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compaction manager is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

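    /// Records one committed epoch for the sink, creating its `CommitInfo` on
    /// first sight and refreshing the schedule if the sink's configured
    /// compaction interval has changed.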
    pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
        let mut guard = self.inner.write();

        let IcebergSinkCompactionUpdate {
            sink_id,
            compaction_interval,
        } = msg;

        // If the compaction interval has changed, reset the commit info; the
        // schedule is re-armed when a compaction task is sent or initialized.
        let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
            count: 0,
            next_compaction_time: Some(
                Instant::now() + std::time::Duration::from_secs(compaction_interval),
            ),
            compaction_interval,
        });

        commit_info.increase_count();
        if commit_info.compaction_interval != compaction_interval {
            commit_info.update_compaction_interval(compaction_interval);
        }
    }

    /// Returns handles for the top-N sinks that are due for compaction,
    /// sorted by commit count (descending) and then by next compaction time.
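    ///
    /// # Example (illustrative sketch; compactor acquisition and task-id
    /// allocation are elided and depend on the caller's scheduling logic)
    ///
    /// ```ignore
    /// for handle in manager.get_top_n_iceberg_commit_sink_ids(10) {
    ///     // `compactor` and `task_id` are supplied by the caller.
    ///     handle.send_compact_task(compactor.clone(), task_id).await?;
    /// }
    /// ```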
    pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();
        let mut guard = self.inner.write();
        guard
            .iceberg_commits
            .iter_mut()
            .filter(|(_, commit_info)| {
                commit_info.count > 0
                    && if let Some(next_compaction_time) = commit_info.next_compaction_time {
                        next_compaction_time <= now
                    } else {
                        // `None` means a compaction task is already in flight.
                        false
                    }
            })
            .sorted_by(|a, b| {
                b.1.count
                    .cmp(&a.1.count)
                    .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
            })
            .take(n)
            .map(|(sink_id, commit_info)| {
                // Snapshot the commit info into the handle, then mark the sink
                // as processing so it cannot be selected twice before the task
                // completes or the handle is dropped.
                let handle = IcebergCompactionHandle::new(
                    *sink_id,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    commit_info.clone(),
                );

                commit_info.set_processing();

                handle
            })
            .collect::<Vec<_>>()
    }

    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
        let mut guard = self.inner.write();
        guard.iceberg_commits.remove(&sink_id);
    }

    pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
        let mut sinks = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![sink_id.sink_id as i32])
            .await?;
        if sinks.is_empty() {
            bail!("Sink not found: {}", sink_id.sink_id);
        }
        let prost_sink_catalog: PbSink = sinks.remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        Ok(param)
    }

    #[allow(dead_code)]
    pub async fn load_iceberg_table(&self, sink_id: &SinkId) -> MetaResult<Table> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
        let table = iceberg_config.load_table().await?;
        Ok(table)
    }

    pub async fn load_iceberg_config(&self, sink_id: &SinkId) -> MetaResult<IcebergConfig> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
        Ok(iceberg_config)
    }

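    /// Registers a new compactor event stream. The pair is forwarded over the
    /// channel created in `build` and consumed by the event loop spawned in
    /// `iceberg_compaction_event_loop`.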
    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

    pub fn iceberg_compaction_event_loop(
        iceberg_compaction_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeIcebergCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let iceberg_compaction_event_handler =
            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());

        let iceberg_compaction_event_dispatcher =
            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);

        let event_loop = IcebergCompactionEventLoop::new(
            iceberg_compaction_event_dispatcher,
            iceberg_compaction_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    /// GC loop for expired-snapshot management.
    /// This is a separate loop that periodically checks all tracked Iceberg tables
    /// and performs garbage-collection operations such as expiring old snapshots.
    pub fn gc_loop(manager: Arc<Self>) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            // Run GC every hour by default. Note that `tokio::time::interval`
            // completes its first tick immediately, so one GC pass runs at startup.
            const GC_LOOP_INTERVAL_SECS: u64 = 3600;
            let mut interval =
                tokio::time::interval(std::time::Duration::from_secs(GC_LOOP_INTERVAL_SECS));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = manager.perform_gc_operations().await {
                            tracing::error!(error = ?e.as_report(), "GC operations failed");
                        }
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg GC loop is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

    /// Perform GC operations on all tracked Iceberg tables.
    async fn perform_gc_operations(&self) -> MetaResult<()> {
        // Get all sink IDs that are currently tracked.
        let sink_ids = {
            let guard = self.inner.read();
            guard.iceberg_commits.keys().cloned().collect::<Vec<_>>()
        };

        tracing::info!("Starting GC operations for {} tables", sink_ids.len());

        for sink_id in sink_ids {
            if let Err(e) = self.check_and_expire_snapshots(&sink_id).await {
                // Continue with other tables even if one fails.
                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id.sink_id);
            }
        }

        tracing::info!("GC operations completed");
        Ok(())
    }

    /// Check a table's snapshots and trigger expiration if the oldest snapshot
    /// is older than the retention threshold.
    async fn check_and_expire_snapshots(&self, sink_id: &SinkId) -> MetaResult<()> {
        // Default threshold; could be made configurable later.
        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; // 1 day
        let now = chrono::Utc::now().timestamp_millis();
        let expired_older_than = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;

        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        if !iceberg_config.enable_snapshot_expiration {
            return Ok(());
        }

        let catalog = iceberg_config.create_catalog().await?;
        let table = catalog
            .load_table(&iceberg_config.full_table_name()?)
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();
        snapshots.sort_by_key(|s| s.timestamp_ms());

        // If even the oldest snapshot is newer than the cutoff, there is nothing
        // to expire; return early to avoid committing empty table updates.
        if snapshots.is_empty() || snapshots.first().unwrap().timestamp_ms() > expired_older_than {
            return Ok(());
        }

        tracing::info!(
            "Catalog {} table {} sink-id {} has {} snapshots, triggering expiration",
            iceberg_config.catalog_name(),
            iceberg_config.full_table_name()?,
            sink_id.sink_id,
            snapshots.len(),
        );

        let tx = Transaction::new(&table);

        // TODO: use config values for the expiration parameters.
        let expired_snapshots = tx
            .expire_snapshot()
            .clear_expired_files(true)
            .clear_expired_meta_data(true);

        let tx = expired_snapshots
            .apply()
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;
        tx.commit(catalog.as_ref())
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tracing::info!(
            "Expired snapshots for iceberg catalog {} table {} sink-id {}",
            iceberg_config.catalog_name(),
            iceberg_config.full_table_name()?,
            sink_id.sink_id,
        );

        Ok(())
    }
}
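
// A minimal sketch of unit tests for `CommitInfo`'s state transitions. The
// interval values below are illustrative only, not defaults used anywhere in
// the codebase.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn commit_info_state_transitions() {
        let mut info = CommitInfo {
            count: 0,
            next_compaction_time: None,
            compaction_interval: 10,
        };

        // `initialize` arms the schedule and clears the commit count.
        info.initialize();
        assert_eq!(info.count, 0);
        assert!(info.next_compaction_time.is_some());

        // Commits accumulate until the sink is picked for compaction.
        info.increase_count();
        info.increase_count();
        assert_eq!(info.count, 2);

        // `set_processing` clears the schedule, marking a task as in flight.
        info.set_processing();
        assert_eq!(info.count, 0);
        assert!(info.next_compaction_time.is_none());

        // Changing the interval re-arms the schedule.
        info.update_compaction_interval(20);
        assert_eq!(info.compaction_interval, 20);
        assert!(info.next_compaction_time.is_some());
    }
}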