risingwave_meta/manager/
iceberg_compaction.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use iceberg::spec::Operation;
21use iceberg::transaction::Transaction;
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::bail;
25use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
26use risingwave_connector::sink::catalog::{SinkCatalog, SinkId, SinkType};
27use risingwave_connector::sink::iceberg::{IcebergConfig, should_enable_iceberg_cow};
28use risingwave_connector::sink::{SinkError, SinkParam};
29use risingwave_pb::catalog::PbSink;
30use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
31use risingwave_pb::iceberg_compaction::{
32    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
33};
34use thiserror_ext::AsReport;
35use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
36use tokio::sync::oneshot::Sender;
37use tokio::task::JoinHandle;
38use tonic::Streaming;
39
40use super::MetaSrvEnv;
41use crate::MetaResult;
42use crate::hummock::{
43    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
44    IcebergCompactor, IcebergCompactorManagerRef,
45};
46use crate::manager::MetadataManager;
47use crate::rpc::metrics::MetaMetrics;
48
/// Shared, reference-counted handle to the [`IcebergCompactionManager`].
pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;

/// Sender used to hand a newly subscribed compactor event stream
/// (keyed by the compactor's context id) over to the event loop.
type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

/// Receiver half of [`CompactorChangeTx`]; consumed by the event loop.
type CompactorChangeRx =
    UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;
55
/// Per-sink commit statistics that drive the compaction schedule.
///
/// `next_compaction_time == None` means a compaction task for this sink is
/// currently being processed, so the sink must not be scheduled again.
#[derive(Debug, Clone)]
struct CommitInfo {
    count: usize,
    next_compaction_time: Option<Instant>,
    compaction_interval: u64,
}

impl CommitInfo {
    /// Marks this sink as having a compaction in flight: the commit counter
    /// is cleared and the schedule slot is emptied (`None` == "processing").
    fn set_processing(&mut self) {
        self.count = 0;
        self.next_compaction_time = None;
    }

    /// Clears the counter and schedules the next compaction one full
    /// `compaction_interval` from now.
    fn initialize(&mut self) {
        let interval = std::time::Duration::from_secs(self.compaction_interval);
        self.count = 0;
        self.next_compaction_time = Some(Instant::now() + interval);
    }

    /// Overwrites every field with the values from `commit_info`
    /// (used to roll back to a previously taken snapshot).
    fn replace(&mut self, commit_info: CommitInfo) {
        let CommitInfo {
            count,
            next_compaction_time,
            compaction_interval,
        } = commit_info;
        self.count = count;
        self.next_compaction_time = next_compaction_time;
        self.compaction_interval = compaction_interval;
    }

    /// Records one more commit against this sink.
    fn increase_count(&mut self) {
        self.count += 1;
    }

    /// Switches to a new interval and reschedules the next compaction
    /// relative to now so the timer stays consistent with the new interval.
    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;
        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(compaction_interval));
    }
}
94
/// A one-shot handle for dispatching a compaction task for a single sink.
///
/// While the handle is alive the sink's `CommitInfo` is in the "processing"
/// state; on drop the state is either re-armed (task sent successfully) or
/// rolled back to the snapshot stored in `commit_info`.
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    // Set to `true` once a compact task has been sent successfully;
    // selects between re-arm and rollback in the `Drop` impl.
    handle_success: bool,

    /// The commit info of the iceberg compaction handle for recovery.
    commit_info: CommitInfo,
}
104
105impl IcebergCompactionHandle {
106    fn new(
107        sink_id: SinkId,
108        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
109        metadata_manager: MetadataManager,
110        commit_info: CommitInfo,
111    ) -> Self {
112        Self {
113            sink_id,
114            inner,
115            metadata_manager,
116            handle_success: false,
117            commit_info,
118        }
119    }
120
121    pub async fn send_compact_task(
122        mut self,
123        compactor: Arc<IcebergCompactor>,
124        task_id: u64,
125    ) -> MetaResult<()> {
126        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
127        let mut sinks = self
128            .metadata_manager
129            .catalog_controller
130            .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
131            .await?;
132        if sinks.is_empty() {
133            // The sink may be deleted, just return Ok.
134            tracing::warn!("Sink not found: {}", self.sink_id.sink_id);
135            return Ok(());
136        }
137        let prost_sink_catalog: PbSink = sinks.remove(0);
138        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
139        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
140        let task_type: TaskType = match param.sink_type {
141            SinkType::AppendOnly | SinkType::ForceAppendOnly => {
142                if risingwave_common::license::Feature::IcebergCompaction
143                    .check_available()
144                    .is_ok()
145                {
146                    TaskType::SmallDataFileCompaction
147                } else {
148                    TaskType::FullCompaction
149                }
150            }
151
152            _ => TaskType::FullCompaction,
153        };
154        let result =
155            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
156                task_id,
157                props: param.properties,
158                task_type: task_type as i32,
159            }));
160
161        if result.is_ok() {
162            self.handle_success = true;
163        }
164
165        result
166    }
167
168    pub fn sink_id(&self) -> SinkId {
169        self.sink_id
170    }
171}
172
173impl Drop for IcebergCompactionHandle {
174    fn drop(&mut self) {
175        if self.handle_success {
176            let mut guard = self.inner.write();
177            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
178                commit_info.initialize();
179            }
180        } else {
181            // If the handle is not successful, we need to reset the commit info
182            // to the original state.
183            // This is to avoid the case where the handle is dropped before the
184            // compaction task is sent.
185            let mut guard = self.inner.write();
186            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
187                commit_info.replace(self.commit_info.clone());
188            }
189        }
190    }
191}
192
/// State shared between the manager and every live [`IcebergCompactionHandle`],
/// guarded by a `parking_lot::RwLock`.
struct IcebergCompactionManagerInner {
    /// Per-sink commit statistics driving the compaction schedule.
    pub iceberg_commits: HashMap<SinkId, CommitInfo>,
}
196
/// Tracks per-sink iceberg commit activity and schedules compaction / GC work
/// across the registered iceberg compactors.
pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    /// Shared scheduling state; also held by outstanding compaction handles.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    /// Registry of connected iceberg compactor nodes.
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    /// Forwards newly subscribed compactor event streams to the event loop.
    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}
208
impl IcebergCompactionManager {
    /// Builds the manager together with the receiver half of the
    /// compactor-stream change channel, which the caller later feeds into
    /// [`Self::iceberg_compaction_event_loop`].
    pub fn build(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        iceberg_compactor_manager: IcebergCompactorManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Arc<Self>, CompactorChangeRx) {
        let (compactor_streams_change_tx, compactor_streams_change_rx) =
            tokio::sync::mpsc::unbounded_channel();
        (
            Arc::new(Self {
                env,
                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
                    iceberg_commits: HashMap::default(),
                })),
                metadata_manager,
                iceberg_compactor_manager,
                compactor_streams_change_tx,
                metrics,
            }),
            compactor_streams_change_rx,
        )
    }

    /// Spawns a task that consumes sink commit notifications from `rx` and
    /// folds them into the per-sink commit statistics.
    ///
    /// Returns the join handle and a one-shot sender used to stop the loop.
    pub fn compaction_stat_loop(
        manager: Arc<Self>,
        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(stat) = rx.recv() => {
                        manager.update_iceberg_commit_info(stat);
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compaction manager is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

    /// Records one sink commit and keeps the sink's compaction interval in
    /// sync with the latest value reported by the sink.
    ///
    /// `force_compaction` is implemented by treating the interval as 0 so the
    /// sink becomes immediately eligible for scheduling.
    pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
        let mut guard = self.inner.write();

        let IcebergSinkCompactionUpdate {
            sink_id,
            compaction_interval,
            force_compaction,
        } = msg;

        let compaction_interval = if force_compaction {
            0
        } else {
            compaction_interval
        };

        // if the compaction interval is changed, we need to reset the commit info when the compaction task is sent or initialized
        let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
            count: 0,
            next_compaction_time: Some(
                Instant::now() + std::time::Duration::from_secs(compaction_interval),
            ),
            compaction_interval,
        });

        commit_info.increase_count();
        if commit_info.compaction_interval != compaction_interval {
            // An interval change also reschedules the next compaction time.
            commit_info.update_compaction_interval(compaction_interval);
        }
    }

    /// Get the top N iceberg commit sink ids
    /// Sorted by commit count and next compaction time
    ///
    /// Only sinks with at least one pending commit whose scheduled time has
    /// arrived are considered. Each returned handle flips its sink into the
    /// "processing" state (`next_compaction_time = None`) so the same sink
    /// cannot be picked twice while a task is in flight.
    pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();
        let mut guard = self.inner.write();
        guard
            .iceberg_commits
            .iter_mut()
            .filter(|(_, commit_info)| {
                // `next_compaction_time == None` means "already processing":
                // such sinks are filtered out.
                commit_info.count > 0
                    && if let Some(next_compaction_time) = commit_info.next_compaction_time {
                        next_compaction_time <= now
                    } else {
                        false
                    }
            })
            .sorted_by(|a, b| {
                // Descending by commit count, ties broken by the later
                // scheduled compaction time first.
                b.1.count
                    .cmp(&a.1.count)
                    .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
            })
            .take(n)
            .map(|(sink_id, commit_info)| {
                // reset the commit count and next compaction time and avoid double call
                let handle = IcebergCompactionHandle::new(
                    *sink_id,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    commit_info.clone(),
                );

                commit_info.set_processing();

                handle
            })
            .collect::<Vec<_>>()
    }

    /// Drops all tracked commit statistics for `sink_id`
    /// (e.g. when the sink is removed).
    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
        let mut guard = self.inner.write();
        guard.iceberg_commits.remove(&sink_id);
    }

    /// Loads the sink catalog for `sink_id` and converts it into a `SinkParam`.
    ///
    /// Unlike `IcebergCompactionHandle::send_compact_task`, a missing sink is
    /// an error here.
    pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
        let mut sinks = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![sink_id.sink_id as i32])
            .await?;
        if sinks.is_empty() {
            bail!("Sink not found: {}", sink_id.sink_id);
        }
        let prost_sink_catalog: PbSink = sinks.remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        Ok(param)
    }

    /// Builds an `IcebergConfig` from the sink's properties.
    pub async fn load_iceberg_config(&self, sink_id: &SinkId) -> MetaResult<IcebergConfig> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
        Ok(iceberg_config)
    }

    /// Hands a newly subscribed compactor event stream to the event loop.
    ///
    /// Panics if the event loop's receiver has been dropped, which would mean
    /// the manager is in an unrecoverable state.
    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

    /// Wires up the event handler/dispatcher and starts the iceberg
    /// compaction event loop; returns (join handle, shutdown sender) pairs
    /// for every spawned loop.
    pub fn iceberg_compaction_event_loop(
        iceberg_compaction_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeIcebergCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let iceberg_compaction_event_handler =
            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());

        let iceberg_compaction_event_dispatcher =
            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);

        let event_loop = IcebergCompactionEventLoop::new(
            iceberg_compaction_event_dispatcher,
            iceberg_compaction_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

    /// GC loop for expired snapshots management
    /// This is a separate loop that periodically checks all tracked Iceberg tables
    /// and performs garbage collection operations like expiring old snapshots
    pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
        // A zero interval would make `tokio::time::interval` panic / busy-loop.
        assert!(
            interval_sec > 0,
            "Iceberg GC interval must be greater than 0"
        );
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            tracing::info!(
                interval_sec = interval_sec,
                "Starting Iceberg GC loop with configurable interval"
            );
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // GC failures are logged and retried on the next tick.
                        if let Err(e) = manager.perform_gc_operations().await {
                            tracing::error!(error = ?e.as_report(), "GC operations failed");
                        }
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg GC loop is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

    /// Trigger manual compaction for a specific sink and wait for completion
    /// This method records the initial snapshot, sends a compaction task, then waits for a new snapshot with replace operation
    ///
    /// Returns the id of the dispatched compaction task.
    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;

        // Load the initial table state to get the current snapshot
        let iceberg_config = self.load_iceberg_config(&sink_id).await?;
        let initial_table = iceberg_config.load_table().await?;
        let initial_snapshot_id = initial_table
            .metadata()
            .current_snapshot()
            .map(|s| s.snapshot_id())
            .unwrap_or(0); // Use 0 if no snapshots exist
        let initial_timestamp = chrono::Utc::now().timestamp_millis();

        // Get a compactor to send the task to
        let compactor = self
            .iceberg_compactor_manager
            .next_compactor()
            .ok_or_else(|| anyhow!("No iceberg compactor available"))?;

        // Generate a unique task ID
        let task_id = self
            .env
            .hummock_seq
            .next_interval("compaction_task", 1)
            .await?;

        let sink_param = self.get_sink_param(&sink_id).await?;

        compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
            task_id,
            props: sink_param.properties,
            task_type: TaskType::FullCompaction as i32, // default to full compaction
        }))?;

        tracing::info!(
            "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
            sink_id.sink_id,
            task_id
        );

        self.wait_for_compaction_completion(
            &sink_id,
            iceberg_config,
            initial_snapshot_id,
            initial_timestamp,
            task_id,
        )
        .await?;

        Ok(task_id)
    }

    /// Polls the iceberg table with exponential backoff until a snapshot newer
    /// than (`initial_snapshot_id`, `initial_timestamp`) appears whose
    /// operation signals a finished compaction, or until the overall deadline
    /// (`MAX_WAIT_TIME_SECS`) expires.
    ///
    /// For copy-on-write tables the compaction commit shows up as an
    /// `Overwrite` snapshot; otherwise as a `Replace` snapshot.
    async fn wait_for_compaction_completion(
        &self,
        sink_id: &SinkId,
        iceberg_config: IcebergConfig,
        initial_snapshot_id: i64,
        initial_timestamp: i64,
        task_id: u64,
    ) -> MetaResult<()> {
        const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
        const MAX_POLL_INTERVAL_SECS: u64 = 60;
        const MAX_WAIT_TIME_SECS: u64 = 1800;
        const BACKOFF_MULTIPLIER: f64 = 1.5;

        let mut elapsed_time = 0;
        let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;

        let cow = should_enable_iceberg_cow(
            iceberg_config.r#type.as_str(),
            iceberg_config.write_mode.as_str(),
        );

        while elapsed_time < MAX_WAIT_TIME_SECS {
            let poll_interval = std::time::Duration::from_secs(current_interval_secs);
            tokio::time::sleep(poll_interval).await;
            elapsed_time += current_interval_secs;

            let current_table = iceberg_config.load_table().await?;

            let metadata = current_table.metadata();
            // Snapshots committed after the manual trigger, excluding the one
            // that was current when we started.
            let new_snapshots: Vec<_> = metadata
                .snapshots()
                .filter(|snapshot| {
                    let snapshot_timestamp = snapshot.timestamp_ms();
                    let snapshot_id = snapshot.snapshot_id();
                    snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
                })
                .collect();

            for snapshot in new_snapshots {
                let summary = snapshot.summary();
                if cow {
                    if matches!(summary.operation, Operation::Overwrite) {
                        return Ok(());
                    }
                } else if matches!(summary.operation, Operation::Replace) {
                    return Ok(());
                }
            }

            // Exponential backoff, capped at MAX_POLL_INTERVAL_SECS.
            current_interval_secs = std::cmp::min(
                MAX_POLL_INTERVAL_SECS,
                ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
            );
        }

        Err(anyhow!(
            "Compaction did not complete within {} seconds for sink {} (task_id={})",
            MAX_WAIT_TIME_SECS,
            sink_id.sink_id,
            task_id
        )
        .into())
    }

    /// Runs one GC pass over every sink currently tracked in the commit map.
    /// Per-sink failures are logged and do not abort the pass.
    async fn perform_gc_operations(&self) -> MetaResult<()> {
        // Copy the keys out so the lock is not held across the awaits below.
        let sink_ids = {
            let guard = self.inner.read();
            guard.iceberg_commits.keys().cloned().collect::<Vec<_>>()
        };

        tracing::info!("Starting GC operations for {} tables", sink_ids.len());

        for sink_id in sink_ids {
            if let Err(e) = self.check_and_expire_snapshots(&sink_id).await {
                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id.sink_id);
            }
        }

        tracing::info!("GC operations completed");
        Ok(())
    }

    /// Expires old snapshots of the sink's iceberg table if snapshot
    /// expiration is enabled for it.
    ///
    /// The expiration cutoff comes from the sink's configuration when set,
    /// otherwise snapshots older than 24 hours are expired. The transaction is
    /// skipped entirely when no snapshot is old enough, to avoid committing
    /// empty table updates.
    pub async fn check_and_expire_snapshots(&self, sink_id: &SinkId) -> MetaResult<()> {
        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; // 24 hours
        let now = chrono::Utc::now().timestamp_millis();

        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        if !iceberg_config.enable_snapshot_expiration {
            return Ok(());
        }

        let catalog = iceberg_config.create_catalog().await?;
        let table = catalog
            .load_table(&iceberg_config.full_table_name()?)
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();
        // Oldest first, so `first()` below is the oldest snapshot.
        snapshots.sort_by_key(|s| s.timestamp_ms());

        let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;

        let snapshot_expiration_timestamp_ms =
            match iceberg_config.snapshot_expiration_timestamp_ms(now) {
                Some(timestamp) => timestamp,
                None => default_snapshot_expiration_timestamp_ms,
            };

        if snapshots.is_empty()
            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
        {
            // avoid commit empty table updates
            return Ok(());
        }

        tracing::info!(
            catalog_name = iceberg_config.catalog_name(),
            table_name = iceberg_config.full_table_name()?.to_string(),
            sink_id = sink_id.sink_id,
            snapshots_len = snapshots.len(),
            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
            "try trigger snapshots expiration",
        );

        let tx = Transaction::new(&table);

        let mut expired_snapshots = tx.expire_snapshot();

        expired_snapshots = expired_snapshots.expire_older_than(snapshot_expiration_timestamp_ms);

        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
            expired_snapshots = expired_snapshots.retain_last(retain_last);
        }

        expired_snapshots = expired_snapshots
            .clear_expired_files(iceberg_config.snapshot_expiration_clear_expired_files);

        expired_snapshots = expired_snapshots
            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);

        let tx = expired_snapshots
            .apply()
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tx.commit(catalog.as_ref())
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tracing::info!(
            catalog_name = iceberg_config.catalog_name(),
            table_name = iceberg_config.full_table_name()?.to_string(),
            sink_id = sink_id.sink_id,
            "Expired snapshots for iceberg table",
        );

        Ok(())
    }
}