risingwave_meta/manager/iceberg_compaction.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use iceberg::spec::Operation;
use iceberg::transaction::Transaction;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bail;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkId, SinkType};
use risingwave_connector::sink::iceberg::{IcebergConfig, should_enable_iceberg_cow};
use risingwave_connector::sink::{SinkError, SinkParam};
use risingwave_pb::catalog::PbSink;
use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
use risingwave_pb::iceberg_compaction::{
    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;

use super::MetaSrvEnv;
use crate::MetaResult;
use crate::hummock::{
    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
    IcebergCompactor, IcebergCompactorManagerRef,
};
use crate::manager::MetadataManager;
use crate::rpc::metrics::MetaMetrics;

pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;

type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

type CompactorChangeRx =
    UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;
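
/// Per-sink commit bookkeeping: how many sink commits have landed since the
/// last compaction, and when the next compaction may be scheduled.
/// A `next_compaction_time` of `None` means a compaction is currently in flight.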
#[derive(Debug, Clone)]
struct CommitInfo {
    count: usize,
    next_compaction_time: Option<Instant>,
    compaction_interval: u64,
}

impl CommitInfo {
    fn set_processing(&mut self) {
        self.count = 0;
        // Setting `next_compaction_time` to `None` marks this sink as having a
        // compaction in flight.
        self.next_compaction_time.take();
    }

    fn initialize(&mut self) {
        self.count = 0;
        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(self.compaction_interval));
    }

    fn replace(&mut self, commit_info: CommitInfo) {
        self.count = commit_info.count;
        self.next_compaction_time = commit_info.next_compaction_time;
        self.compaction_interval = commit_info.compaction_interval;
    }

    fn increase_count(&mut self) {
        self.count += 1;
    }

    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;

        // Reset the next compaction time so the new interval takes effect immediately.
        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(compaction_interval));
    }
}
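
/// A single-use handle for dispatching one compaction task for one sink.
/// On drop it either re-arms the sink's compaction schedule (if the task was
/// sent successfully) or restores the commit info captured at creation time,
/// so pending commit counts are not lost.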
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    handle_success: bool,

    /// Snapshot of the commit info at handle creation, used to restore the
    /// sink's state if the compaction task is never sent.
    commit_info: CommitInfo,
}

impl IcebergCompactionHandle {
    fn new(
        sink_id: SinkId,
        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
        metadata_manager: MetadataManager,
        commit_info: CommitInfo,
    ) -> Self {
        Self {
            sink_id,
            inner,
            metadata_manager,
            handle_success: false,
            commit_info,
        }
    }
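
    /// Sends a compaction task for this sink to the given compactor.
    /// Append-only sinks only produce data files, so a small-data-file
    /// compaction suffices; other sink types get a full compaction.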
    pub async fn send_compact_task(
        mut self,
        compactor: Arc<IcebergCompactor>,
        task_id: u64,
    ) -> MetaResult<()> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
        let prost_sink_catalog: PbSink = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
            .await?
            .remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        let task_type: TaskType = match param.sink_type {
            SinkType::AppendOnly | SinkType::ForceAppendOnly => TaskType::SmallDataFileCompaction,
            _ => TaskType::FullCompaction,
        };
        let result =
            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
                task_id,
                props: param.properties,
                task_type: task_type as i32,
            }));

        if result.is_ok() {
            self.handle_success = true;
        }

        result
    }

    pub fn sink_id(&self) -> SinkId {
        self.sink_id
    }
}

impl Drop for IcebergCompactionHandle {
    fn drop(&mut self) {
        if self.handle_success {
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.initialize();
            }
        } else {
            // The task was never sent (the handle was dropped early or the send
            // failed), so restore the commit info to its original state rather
            // than losing the pending commit count.
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.replace(self.commit_info.clone());
            }
        }
    }
}
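
/// Shared state: per-sink commit statistics, also mutated by outstanding
/// [`IcebergCompactionHandle`]s on drop.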
struct IcebergCompactionManagerInner {
    pub iceberg_commits: HashMap<SinkId, CommitInfo>,
}
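
/// Coordinates compaction of iceberg sink tables: tracks commit activity per
/// sink, selects sinks that are due for compaction, dispatches tasks to
/// compactor nodes, and periodically expires old table snapshots.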
pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}

impl IcebergCompactionManager {
    pub fn build(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        iceberg_compactor_manager: IcebergCompactorManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Arc<Self>, CompactorChangeRx) {
        let (compactor_streams_change_tx, compactor_streams_change_rx) =
            tokio::sync::mpsc::unbounded_channel();
        (
            Arc::new(Self {
                env,
                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
                    iceberg_commits: HashMap::default(),
                })),
                metadata_manager,
                iceberg_compactor_manager,
                compactor_streams_change_tx,
                metrics,
            }),
            compactor_streams_change_rx,
        )
    }
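
    /// Spawns a loop that consumes commit notifications from iceberg sinks and
    /// folds them into the per-sink commit statistics. Returns the join handle
    /// and a shutdown sender.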
    pub fn compaction_stat_loop(
        manager: Arc<Self>,
        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(stat) = rx.recv() => {
                        manager.update_iceberg_commit_info(stat);
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compaction manager is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }
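
    /// Records one sink commit: bumps the per-sink commit counter and, if the
    /// sink's compaction interval changed, re-arms the compaction timer with
    /// the new interval.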
    pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
        let mut guard = self.inner.write();

        let IcebergSinkCompactionUpdate {
            sink_id,
            compaction_interval,
        } = msg;

        let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
            count: 0,
            next_compaction_time: Some(
                Instant::now() + std::time::Duration::from_secs(compaction_interval),
            ),
            compaction_interval,
        });

        commit_info.increase_count();
        // If the compaction interval has changed, reset the timer so the new
        // interval takes effect immediately.
        if commit_info.compaction_interval != compaction_interval {
            commit_info.update_compaction_interval(compaction_interval);
        }
    }

    /// Returns handles for the top `n` sinks that are due for compaction,
    /// ordered by commit count (descending) and then by next compaction time.
    pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();
        let mut guard = self.inner.write();
        guard
            .iceberg_commits
            .iter_mut()
            .filter(|(_, commit_info)| {
                // Only consider sinks with pending commits whose timer has
                // fired; a `next_compaction_time` of `None` means a compaction
                // is already in flight.
                commit_info.count > 0
                    && if let Some(next_compaction_time) = commit_info.next_compaction_time {
                        next_compaction_time <= now
                    } else {
                        false
                    }
            })
            .sorted_by(|a, b| {
                b.1.count
                    .cmp(&a.1.count)
                    .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
            })
            .take(n)
            .map(|(sink_id, commit_info)| {
                let handle = IcebergCompactionHandle::new(
                    *sink_id,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    commit_info.clone(),
                );

                // Reset the commit count and next compaction time so the same
                // sink is not handed out twice while this handle is pending.
                commit_info.set_processing();

                handle
            })
            .collect::<Vec<_>>()
    }
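
    /// Drops the tracked commit statistics for the given sink.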
    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
        let mut guard = self.inner.write();
        guard.iceberg_commits.remove(&sink_id);
    }
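
    /// Loads the sink catalog for `sink_id` and converts it into a [`SinkParam`].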
    pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
        let mut sinks = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![sink_id.sink_id as i32])
            .await?;
        if sinks.is_empty() {
            bail!("Sink not found: {}", sink_id.sink_id);
        }
        let prost_sink_catalog: PbSink = sinks.remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        Ok(param)
    }
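
    /// Builds the [`IcebergConfig`] for a sink from its sink properties.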
    pub async fn load_iceberg_config(&self, sink_id: &SinkId) -> MetaResult<IcebergConfig> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
        Ok(iceberg_config)
    }
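
    /// Registers a newly subscribed compactor event stream; the event loop
    /// picks it up via the change channel.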
    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }
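
    /// Spawns the event loop that dispatches events arriving on subscribed
    /// compactor streams.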
    pub fn iceberg_compaction_event_loop(
        iceberg_compaction_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeIcebergCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let iceberg_compaction_event_handler =
            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());

        let iceberg_compaction_event_dispatcher =
            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);

        let event_loop = IcebergCompactionEventLoop::new(
            iceberg_compaction_event_dispatcher,
            iceberg_compaction_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    /// GC loop for expired snapshot management.
    /// A separate loop that periodically checks all tracked iceberg tables and
    /// performs garbage collection operations such as expiring old snapshots.
    pub fn gc_loop(manager: Arc<Self>) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            // Run GC every hour by default.
            const GC_LOOP_INTERVAL_SECS: u64 = 3600;
            let mut interval =
                tokio::time::interval(std::time::Duration::from_secs(GC_LOOP_INTERVAL_SECS));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = manager.perform_gc_operations().await {
                            tracing::error!(error = ?e.as_report(), "GC operations failed");
                        }
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg GC loop is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

    /// Triggers a manual compaction for the given sink and waits for it to complete.
    /// Records the current snapshot, sends a compaction task, then polls until a
    /// new snapshot produced by the compaction appears.
    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;

        // Load the initial table state to get the current snapshot.
        let iceberg_config = self.load_iceberg_config(&sink_id).await?;
        let initial_table = iceberg_config.load_table().await?;
        let initial_snapshot_id = initial_table
            .metadata()
            .current_snapshot()
            .map(|s| s.snapshot_id())
            .unwrap_or(0); // Use 0 if no snapshots exist.
        let initial_timestamp = chrono::Utc::now().timestamp_millis();

        // Get a compactor to send the task to.
        let compactor = self
            .iceberg_compactor_manager
            .next_compactor()
            .ok_or_else(|| anyhow!("No iceberg compactor available"))?;

        // Generate a unique task ID.
        let task_id = self
            .env
            .hummock_seq
            .next_interval("compaction_task", 1)
            .await?;

        let sink_param = self.get_sink_param(&sink_id).await?;

        compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
            task_id,
            props: sink_param.properties,
            task_type: TaskType::FullCompaction as i32, // Default to full compaction.
        }))?;

        tracing::info!(
            "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
            sink_id.sink_id,
            task_id
        );

        self.wait_for_compaction_completion(
            &sink_id,
            iceberg_config,
            initial_snapshot_id,
            initial_timestamp,
            task_id,
        )
        .await?;

        Ok(task_id)
    }
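
    /// Polls the table with exponential backoff until a snapshot newer than
    /// the initial one appears whose operation indicates a finished compaction
    /// (`Overwrite` for copy-on-write tables, `Replace` otherwise), or fails
    /// after `MAX_WAIT_TIME_SECS`.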
    async fn wait_for_compaction_completion(
        &self,
        sink_id: &SinkId,
        iceberg_config: IcebergConfig,
        initial_snapshot_id: i64,
        initial_timestamp: i64,
        task_id: u64,
    ) -> MetaResult<()> {
        const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
        const MAX_POLL_INTERVAL_SECS: u64 = 60;
        const MAX_WAIT_TIME_SECS: u64 = 1800;
        const BACKOFF_MULTIPLIER: f64 = 1.5;

        let mut elapsed_time = 0;
        let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;

        let cow = should_enable_iceberg_cow(
            iceberg_config.r#type.as_str(),
            iceberg_config.write_mode.as_str(),
        );

        while elapsed_time < MAX_WAIT_TIME_SECS {
            let poll_interval = std::time::Duration::from_secs(current_interval_secs);
            tokio::time::sleep(poll_interval).await;
            elapsed_time += current_interval_secs;

            let current_table = iceberg_config.load_table().await?;

            let metadata = current_table.metadata();
            let new_snapshots: Vec<_> = metadata
                .snapshots()
                .filter(|snapshot| {
                    let snapshot_timestamp = snapshot.timestamp_ms();
                    let snapshot_id = snapshot.snapshot_id();
                    snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
                })
                .collect();

            for snapshot in new_snapshots {
                let summary = snapshot.summary();
                if cow {
                    if matches!(summary.operation, Operation::Overwrite) {
                        return Ok(());
                    }
                } else if matches!(summary.operation, Operation::Replace) {
                    return Ok(());
                }
            }

            current_interval_secs = std::cmp::min(
                MAX_POLL_INTERVAL_SECS,
                ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
            );
        }

        Err(anyhow!(
            "Compaction did not complete within {} seconds for sink {} (task_id={})",
            MAX_WAIT_TIME_SECS,
            sink_id.sink_id,
            task_id
        )
        .into())
    }
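
    /// Runs snapshot expiration for every sink currently tracked in the commit map.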
    async fn perform_gc_operations(&self) -> MetaResult<()> {
        let sink_ids = {
            let guard = self.inner.read();
            guard.iceberg_commits.keys().cloned().collect::<Vec<_>>()
        };

        tracing::info!("Starting GC operations for {} tables", sink_ids.len());

        for sink_id in sink_ids {
            if let Err(e) = self.check_and_expire_snapshots(&sink_id).await {
                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id.sink_id);
            }
        }

        tracing::info!("GC operations completed");
        Ok(())
    }
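
    /// Expires snapshots older than 24 hours (the current hard-coded default)
    /// for the given sink's table, if snapshot expiration is enabled in its
    /// config.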
    async fn check_and_expire_snapshots(&self, sink_id: &SinkId) -> MetaResult<()> {
        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000;
        let now = chrono::Utc::now().timestamp_millis();
        let expired_older_than = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;

        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        if !iceberg_config.enable_snapshot_expiration {
            return Ok(());
        }

        let catalog = iceberg_config.create_catalog().await?;
        let table = catalog
            .load_table(&iceberg_config.full_table_name()?)
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();
        snapshots.sort_by_key(|s| s.timestamp_ms());

        if snapshots.is_empty() || snapshots.first().unwrap().timestamp_ms() > expired_older_than {
            // No snapshot is old enough to expire; avoid committing an empty
            // table update.
            return Ok(());
        }

        tracing::info!(
            "Catalog {} table {} sink-id {} has {} snapshots, triggering snapshot expiration",
            iceberg_config.catalog_name(),
            iceberg_config.full_table_name()?,
            sink_id.sink_id,
            snapshots.len(),
        );

        let tx = Transaction::new(&table);

        // TODO: make expiration parameters configurable.
        let expired_snapshots = tx
            .expire_snapshot()
            .clear_expired_files(true)
            .clear_expired_meta_data(true);

        let tx = expired_snapshots
            .apply()
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;
        tx.commit(catalog.as_ref())
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tracing::info!(
            "Expired snapshots for iceberg catalog {} table {} sink-id {}",
            iceberg_config.catalog_name(),
            iceberg_config.full_table_name()?,
            sink_id.sink_id,
        );

        Ok(())
    }
}