risingwave_meta/manager/
iceberg_compaction.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use iceberg::spec::Operation;
21use iceberg::transaction::{ApplyTransactionAction, Transaction};
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::id::WorkerId;
25use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
26use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
27use risingwave_connector::sink::iceberg::{
28    CompactionType, IcebergConfig, commit_branch, should_enable_iceberg_cow,
29};
30use risingwave_connector::sink::{SinkError, SinkParam};
31use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
32use risingwave_pb::iceberg_compaction::{
33    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
34};
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot::Sender;
38use tokio::task::JoinHandle;
39use tonic::Streaming;
40
41use super::MetaSrvEnv;
42use crate::MetaResult;
43use crate::hummock::{
44    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
45    IcebergCompactor, IcebergCompactorManagerRef,
46};
47use crate::manager::MetadataManager;
48use crate::rpc::metrics::MetaMetrics;
49
50pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;
51
52type CompactorChangeTx =
53    UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
54
55type CompactorChangeRx =
56    UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
57
/// Snapshot for restoring track state on failure.
///
/// Captured by `CompactionTrack::start_processing` right before a track
/// transitions to `Processing`, and replayed by `restore_from_snapshot`
/// when the dispatched task fails (see `IcebergCompactionHandle::drop`).
#[derive(Debug, Clone)]
struct CompactionTrackSnapshot {
    // The Idle state's `next_compaction_time` at snapshot time.
    // `None` falls back to `Instant::now()` on restore.
    next_compaction_time: Option<Instant>,
}
63
/// Compaction track states using type-safe state machine pattern.
///
/// Transitions (see the `CompactionTrack` impl):
/// `Idle -> Processing` via `start_processing`;
/// `Processing -> Idle` via `complete_processing` (success) or
/// `restore_from_snapshot` (failure/rollback).
#[derive(Debug, Clone)]
enum CompactionTrackState {
    /// Ready to accept commits and check for trigger conditions
    Idle { next_compaction_time: Instant },
    /// Compaction task is being processed
    Processing,
}
72
/// Per-sink compaction schedule: which task type to run, when to run it,
/// and whether a task is currently in flight.
#[derive(Debug, Clone)]
struct CompactionTrack {
    task_type: TaskType,
    /// Interval between time-based compaction triggers, in seconds.
    trigger_interval_sec: u64,
    /// Minimum snapshot count threshold to trigger compaction (early trigger).
    /// Compaction triggers when `pending_snapshot_count` >= this threshold, even before interval expires.
    trigger_snapshot_count: usize,
    state: CompactionTrackState,
}
82
83impl CompactionTrack {
84    /// Determines if compaction should be triggered.
85    ///
86    /// Trigger conditions (OR logic):
87    /// 1. `snapshot_ready` - Snapshot count >= threshold (early trigger)
88    /// 2. `time_ready && has_snapshots` - Interval expired and there's at least 1 snapshot
89    ///
90    /// This ensures:
91    /// - `trigger_snapshot_count` is an early trigger threshold
92    /// - `compaction_interval_sec` is the maximum wait time (as long as there are new snapshots)
93    /// - Force compaction works by setting `next_compaction_time` to now
94    /// - No empty compaction runs (requires at least 1 snapshot for time-based trigger)
95    fn should_trigger(&self, now: Instant, snapshot_count: usize) -> bool {
96        // Only Idle state can trigger
97        let next_compaction_time = match &self.state {
98            CompactionTrackState::Idle {
99                next_compaction_time,
100            } => *next_compaction_time,
101            CompactionTrackState::Processing => return false,
102        };
103
104        // Check conditions
105        let time_ready = now >= next_compaction_time;
106        let snapshot_ready = snapshot_count >= self.trigger_snapshot_count;
107        let has_snapshots = snapshot_count > 0;
108
109        // OR logic: snapshot threshold triggers early,
110        // time trigger requires at least 1 snapshot to avoid empty compaction
111        snapshot_ready || (time_ready && has_snapshots)
112    }
113
114    /// Create snapshot and transition to Processing state
115    fn start_processing(&mut self) -> CompactionTrackSnapshot {
116        match &self.state {
117            CompactionTrackState::Idle {
118                next_compaction_time,
119            } => {
120                let snapshot = CompactionTrackSnapshot {
121                    next_compaction_time: Some(*next_compaction_time),
122                };
123                self.state = CompactionTrackState::Processing;
124                snapshot
125            }
126            CompactionTrackState::Processing => {
127                unreachable!("Cannot start processing when already processing")
128            }
129        }
130    }
131
132    /// Complete processing successfully
133    fn complete_processing(&mut self) {
134        match &self.state {
135            CompactionTrackState::Processing => {
136                self.state = CompactionTrackState::Idle {
137                    next_compaction_time: Instant::now()
138                        + std::time::Duration::from_secs(self.trigger_interval_sec),
139                };
140            }
141            CompactionTrackState::Idle { .. } => {
142                unreachable!("Cannot complete processing when not processing")
143            }
144        }
145    }
146
147    /// Restore from snapshot on failure
148    fn restore_from_snapshot(&mut self, snapshot: CompactionTrackSnapshot) {
149        self.state = CompactionTrackState::Idle {
150            next_compaction_time: snapshot.next_compaction_time.unwrap_or_else(Instant::now),
151        };
152    }
153
154    fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
155        if self.trigger_interval_sec == new_interval_sec {
156            return;
157        }
158
159        self.trigger_interval_sec = new_interval_sec;
160
161        // Reset next_compaction_time based on current state
162        match &mut self.state {
163            CompactionTrackState::Idle {
164                next_compaction_time,
165            } => {
166                *next_compaction_time = now + std::time::Duration::from_secs(new_interval_sec);
167            }
168            CompactionTrackState::Processing => {
169                // Keep Processing state, will reset time when completing
170            }
171        }
172    }
173}
174
// Each sink has exactly one CompactionTrack (the former per-sink
// CompactionScheduleState was removed).
/// Handle for one dispatched compaction task.
///
/// A handle is created only after its track has transitioned to `Processing`
/// (see `get_top_n_iceberg_commit_sink_ids`); on drop it either completes the
/// track or rolls it back from `track_snapshot` (see the `Drop` impl).
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    task_type: TaskType,
    /// Shared schedule state; written back in `Drop`.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    /// Set to `true` once the task was successfully sent to a compactor.
    handle_success: bool,

    /// Snapshot of the compaction track for recovery.
    track_snapshot: CompactionTrackSnapshot,
}
186
187impl IcebergCompactionHandle {
188    fn new(
189        sink_id: SinkId,
190        task_type: TaskType,
191        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
192        metadata_manager: MetadataManager,
193        track_snapshot: CompactionTrackSnapshot,
194    ) -> Self {
195        Self {
196            sink_id,
197            task_type,
198            inner,
199            metadata_manager,
200            handle_success: false,
201            track_snapshot,
202        }
203    }
204
205    pub async fn send_compact_task(
206        mut self,
207        compactor: Arc<IcebergCompactor>,
208        task_id: u64,
209    ) -> MetaResult<()> {
210        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
211        let Some(prost_sink_catalog) = self
212            .metadata_manager
213            .catalog_controller
214            .get_sink_by_id(self.sink_id)
215            .await?
216        else {
217            // The sink may be deleted, just return Ok.
218            tracing::warn!("Sink not found: {}", self.sink_id);
219            return Ok(());
220        };
221        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
222        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
223
224        let result =
225            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
226                task_id,
227                props: param.properties,
228                task_type: self.task_type as i32,
229            }));
230
231        if result.is_ok() {
232            self.handle_success = true;
233        }
234
235        result
236    }
237
238    pub fn sink_id(&self) -> SinkId {
239        self.sink_id
240    }
241}
242
243impl Drop for IcebergCompactionHandle {
244    fn drop(&mut self) {
245        let mut guard = self.inner.write();
246        if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id) {
247            // Only restore/complete if this handle's task_type matches the track's task_type
248            if track.task_type == self.task_type {
249                if self.handle_success {
250                    track.complete_processing();
251                } else {
252                    // If the handle is not successful, we need to restore the track from snapshot.
253                    // This is to avoid the case where the handle is dropped before the compaction task is sent.
254                    track.restore_from_snapshot(self.track_snapshot.clone());
255                }
256            }
257        }
258    }
259}
260
/// Mutable schedule state behind the manager's `RwLock`. Also shared with
/// every `IcebergCompactionHandle`, which updates tracks in its `Drop` impl.
struct IcebergCompactionManagerInner {
    pub sink_schedules: HashMap<SinkId, CompactionTrack>,
}
264
/// Point-in-time view of a sink's compaction schedule, produced by
/// `list_compaction_statuses`.
#[derive(Debug, Clone)]
pub struct IcebergCompactionScheduleStatus {
    pub sink_id: SinkId,
    /// Lower-cased name of the track's task type.
    pub task_type: String,
    pub trigger_interval_sec: u64,
    pub trigger_snapshot_count: usize,
    /// Either "idle" or "processing".
    pub schedule_state: String,
    /// Seconds until the next time-based trigger; `None` while processing.
    pub next_compaction_after_sec: Option<u64>,
    /// Snapshots accumulated since the last compaction; `None` if the table
    /// could not be inspected.
    pub pending_snapshot_count: Option<usize>,
    /// Whether `CompactionTrack::should_trigger` holds right now.
    pub is_triggerable: bool,
}
276
/// Meta-side coordinator for Iceberg sink compaction: tracks per-sink
/// schedules, hands out compaction tasks to compactor workers, and runs
/// periodic snapshot GC.
pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    /// Per-sink schedule state, shared with the handles it creates.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    /// Forwards newly connected compactor event streams to the event loop.
    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}
288
289impl IcebergCompactionManager {
290    pub fn build(
291        env: MetaSrvEnv,
292        metadata_manager: MetadataManager,
293        iceberg_compactor_manager: IcebergCompactorManagerRef,
294        metrics: Arc<MetaMetrics>,
295    ) -> (Arc<Self>, CompactorChangeRx) {
296        let (compactor_streams_change_tx, compactor_streams_change_rx) =
297            tokio::sync::mpsc::unbounded_channel();
298        (
299            Arc::new(Self {
300                env,
301                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
302                    sink_schedules: HashMap::default(),
303                })),
304                metadata_manager,
305                iceberg_compactor_manager,
306                compactor_streams_change_tx,
307                metrics,
308            }),
309            compactor_streams_change_rx,
310        )
311    }
312
313    pub fn compaction_stat_loop(
314        manager: Arc<Self>,
315        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
316    ) -> (JoinHandle<()>, Sender<()>) {
317        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
318        let join_handle = tokio::spawn(async move {
319            loop {
320                tokio::select! {
321                    Some(stat) = rx.recv() => {
322                        manager.update_iceberg_commit_info(stat).await;
323                    },
324                    _ = &mut shutdown_rx => {
325                        tracing::info!("Iceberg compaction manager is stopped");
326                        return;
327                    }
328                }
329            }
330        });
331
332        (join_handle, shutdown_tx)
333    }
334
335    pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
336        let IcebergSinkCompactionUpdate {
337            sink_id,
338            compaction_interval,
339            force_compaction,
340        } = msg;
341
342        // Check if track exists
343        let track_exists = {
344            let guard = self.inner.read();
345            guard.sink_schedules.contains_key(&sink_id)
346        };
347
348        // Create track if it doesn't exist
349        if !track_exists {
350            // Load config first (async operation outside of lock)
351            let iceberg_config = self.load_iceberg_config(sink_id).await;
352
353            let new_track = match iceberg_config {
354                Ok(config) => {
355                    // Call synchronous create function with the config
356                    match self.create_compaction_track(sink_id, &config) {
357                        Ok(track) => track,
358                        Err(e) => {
359                            tracing::error!(
360                                error = ?e.as_report(),
361                                "Failed to create compaction track from config for sink {}, using default Full track",
362                                sink_id
363                            );
364                            // Fallback to default Full track
365                            CompactionTrack {
366                                task_type: TaskType::Full,
367                                trigger_interval_sec: compaction_interval,
368                                trigger_snapshot_count: 10,
369                                state: CompactionTrackState::Idle {
370                                    next_compaction_time: Instant::now()
371                                        + std::time::Duration::from_secs(compaction_interval),
372                                },
373                            }
374                        }
375                    }
376                }
377                Err(e) => {
378                    tracing::error!(
379                        error = ?e.as_report(),
380                        "Failed to load iceberg config for sink {}, using default Full track",
381                        sink_id
382                    );
383                    // Fallback to default Full track
384                    CompactionTrack {
385                        task_type: TaskType::Full,
386                        trigger_interval_sec: compaction_interval,
387                        trigger_snapshot_count: 10,
388                        state: CompactionTrackState::Idle {
389                            next_compaction_time: Instant::now()
390                                + std::time::Duration::from_secs(compaction_interval),
391                        },
392                    }
393                }
394            };
395
396            let mut guard = self.inner.write();
397            guard.sink_schedules.insert(sink_id, new_track);
398        }
399
400        // Update track
401        let mut guard = self.inner.write();
402        if let Some(track) = guard.sink_schedules.get_mut(&sink_id) {
403            // Force compaction: trigger immediately by setting next_compaction_time to now
404            if force_compaction {
405                if let CompactionTrackState::Idle {
406                    next_compaction_time,
407                } = &mut track.state
408                {
409                    *next_compaction_time = Instant::now();
410                }
411                // Skip Processing tracks - they will complete naturally
412            } else {
413                // Update interval if changed
414                track.update_interval(compaction_interval, Instant::now());
415            }
416        } else {
417            tracing::error!(
418                "Failed to find compaction track for sink {} during update; configuration changes not applied.",
419                sink_id
420            );
421        }
422    }
423
424    /// Create a compaction track for a sink based on its Iceberg configuration
425    fn create_compaction_track(
426        &self,
427        _sink_id: SinkId,
428        iceberg_config: &IcebergConfig,
429    ) -> MetaResult<CompactionTrack> {
430        let trigger_interval_sec = iceberg_config.compaction_interval_sec();
431        let trigger_snapshot_count = iceberg_config.trigger_snapshot_count();
432
433        // For `copy-on-write` mode, always use Full compaction regardless of config
434        let task_type =
435            if should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode)
436            {
437                TaskType::Full
438            } else {
439                // For `merge-on-read` mode, use configured compaction_type
440                match iceberg_config.compaction_type() {
441                    CompactionType::Full => TaskType::Full,
442                    CompactionType::SmallFiles => TaskType::SmallFiles,
443                    CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
444                }
445            };
446
447        Ok(CompactionTrack {
448            task_type,
449            trigger_interval_sec,
450            trigger_snapshot_count,
451            state: CompactionTrackState::Idle {
452                next_compaction_time: Instant::now()
453                    + std::time::Duration::from_secs(trigger_interval_sec),
454            },
455        })
456    }
457
    /// Get the top N compaction tasks to trigger.
    /// Returns handles for tasks that are ready to be compacted,
    /// sorted by next compaction time (earliest first).
    ///
    /// NOTE(review): the write lock is held from candidate collection through
    /// `start_processing`, which guarantees a triggerable track is still Idle
    /// when it is transitioned — do not split this critical section.
    pub async fn get_top_n_iceberg_commit_sink_ids(
        &self,
        n: usize,
    ) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();

        // Collect all sink_ids to check (read lock only; released before the awaits).
        let sink_ids: Vec<SinkId> = {
            let guard = self.inner.read();
            guard.sink_schedules.keys().cloned().collect()
        };

        // Fetch snapshot counts for all sinks in parallel. Each future yields
        // `None` on failure so one bad table doesn't fail the whole batch.
        let snapshot_count_futures = sink_ids
            .iter()
            .map(|sink_id| async move {
                let count = self.get_pending_snapshot_count(*sink_id).await?;
                Some((*sink_id, count))
            })
            .collect::<Vec<_>>();

        let snapshot_counts: HashMap<SinkId, usize> =
            futures::future::join_all(snapshot_count_futures)
                .await
                .into_iter()
                .flatten()
                .collect();

        let mut guard = self.inner.write();

        // Collect all triggerable tasks with their priority info
        let mut candidates = Vec::new();
        for (sink_id, track) in &guard.sink_schedules {
            // Skip sinks that failed to get snapshot count
            let Some(&snapshot_count) = snapshot_counts.get(sink_id) else {
                continue;
            };
            if track.should_trigger(now, snapshot_count) {
                // Extract next_time from Idle state (triggerable means Idle)
                if let CompactionTrackState::Idle {
                    next_compaction_time,
                } = track.state
                {
                    candidates.push((*sink_id, track.task_type, next_compaction_time));
                }
            }
        }

        // Sort by next_compaction_time (ascending) - earlier times have higher priority
        candidates.sort_by(|a, b| a.2.cmp(&b.2));

        // Take top N and create handles
        candidates
            .into_iter()
            .take(n)
            .filter_map(|(sink_id, task_type, _)| {
                let track = guard.sink_schedules.get_mut(&sink_id)?;

                // Transition Idle -> Processing; the returned snapshot lets the
                // handle roll the schedule back if dispatch fails (see `Drop`).
                let track_snapshot = track.start_processing();

                Some(IcebergCompactionHandle::new(
                    sink_id,
                    task_type,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    track_snapshot,
                ))
            })
            .collect()
    }
531
532    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
533        let mut guard = self.inner.write();
534        guard.sink_schedules.remove(&sink_id);
535    }
536
537    pub async fn list_compaction_statuses(&self) -> Vec<IcebergCompactionScheduleStatus> {
538        let now = Instant::now();
539        let schedules = {
540            let guard = self.inner.read();
541            guard
542                .sink_schedules
543                .iter()
544                .map(|(&sink_id, track)| (sink_id, track.clone()))
545                .collect_vec()
546        };
547
548        let mut statuses =
549            futures::future::join_all(schedules.into_iter().map(|(sink_id, track)| async move {
550                let pending_snapshot_count = self.get_pending_snapshot_count(sink_id).await;
551                let next_compaction_after_sec = match &track.state {
552                    CompactionTrackState::Idle {
553                        next_compaction_time,
554                    } => Some(
555                        next_compaction_time
556                            .saturating_duration_since(now)
557                            .as_secs(),
558                    ),
559                    CompactionTrackState::Processing => None,
560                };
561                let is_triggerable = pending_snapshot_count
562                    .map(|snapshot_count| track.should_trigger(now, snapshot_count))
563                    .unwrap_or(false);
564
565                IcebergCompactionScheduleStatus {
566                    sink_id,
567                    task_type: track.task_type.as_str_name().to_ascii_lowercase(),
568                    trigger_interval_sec: track.trigger_interval_sec,
569                    trigger_snapshot_count: track.trigger_snapshot_count,
570                    schedule_state: match track.state {
571                        CompactionTrackState::Idle { .. } => "idle".to_owned(),
572                        CompactionTrackState::Processing => "processing".to_owned(),
573                    },
574                    next_compaction_after_sec,
575                    pending_snapshot_count,
576                    is_triggerable,
577                }
578            }))
579            .await;
580
581        statuses.sort_by_key(|status| status.sink_id);
582        statuses
583    }
584
585    pub async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
586        let prost_sink_catalog = self
587            .metadata_manager
588            .catalog_controller
589            .get_sink_by_id(sink_id)
590            .await?
591            .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
592        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
593        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
594        Ok(param)
595    }
596
597    pub async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
598        let sink_param = self.get_sink_param(sink_id).await?;
599        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
600        Ok(iceberg_config)
601    }
602
603    pub fn add_compactor_stream(
604        &self,
605        context_id: WorkerId,
606        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
607    ) {
608        self.compactor_streams_change_tx
609            .send((context_id, req_stream))
610            .unwrap();
611    }
612
613    pub fn iceberg_compaction_event_loop(
614        iceberg_compaction_manager: Arc<Self>,
615        compactor_streams_change_rx: UnboundedReceiver<(
616            WorkerId,
617            Streaming<SubscribeIcebergCompactionEventRequest>,
618        )>,
619    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
620        let mut join_handle_vec = Vec::default();
621
622        let iceberg_compaction_event_handler =
623            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
624
625        let iceberg_compaction_event_dispatcher =
626            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
627
628        let event_loop = IcebergCompactionEventLoop::new(
629            iceberg_compaction_event_dispatcher,
630            iceberg_compaction_manager.metrics.clone(),
631            compactor_streams_change_rx,
632        );
633
634        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
635        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
636
637        join_handle_vec
638    }
639
640    /// GC loop for expired snapshots management
641    /// This is a separate loop that periodically checks all tracked Iceberg tables
642    /// and performs garbage collection operations like expiring old snapshots
643    pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
644        assert!(
645            interval_sec > 0,
646            "Iceberg GC interval must be greater than 0"
647        );
648        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
649        let join_handle = tokio::spawn(async move {
650            tracing::info!(
651                interval_sec = interval_sec,
652                "Starting Iceberg GC loop with configurable interval"
653            );
654            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));
655
656            loop {
657                tokio::select! {
658                    _ = interval.tick() => {
659                        if let Err(e) = manager.perform_gc_operations().await {
660                            tracing::error!(error = ?e.as_report(), "GC operations failed");
661                        }
662                    },
663                    _ = &mut shutdown_rx => {
664                        tracing::info!("Iceberg GC loop is stopped");
665                        return;
666                    }
667                }
668            }
669        });
670
671        (join_handle, shutdown_tx)
672    }
673
    /// Trigger manual compaction for a specific sink and wait for completion.
    /// This method records the initial snapshot, sends a compaction task, then
    /// waits for a new snapshot carrying the compaction commit operation.
    ///
    /// Returns the id of the dispatched compaction task.
    ///
    /// # Errors
    /// Fails if the sink config cannot be loaded, no compactor is available,
    /// the task cannot be sent, or the compaction does not finish within the
    /// wait window of `wait_for_compaction_completion`.
    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;

        // Load the initial table state to get the current snapshot; completion
        // is later detected as "a snapshot newer than this one".
        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        let initial_table = iceberg_config.load_table().await?;
        let initial_snapshot_id = initial_table
            .metadata()
            .current_snapshot()
            .map(|s| s.snapshot_id())
            .unwrap_or(0); // Use 0 if no snapshots exist
        let initial_timestamp = chrono::Utc::now().timestamp_millis();

        // Get a compactor to send the task to
        let compactor = self
            .iceberg_compactor_manager
            .next_compactor()
            .ok_or_else(|| anyhow!("No iceberg compactor available"))?;

        // Generate a unique task ID
        let task_id = self
            .env
            .hummock_seq
            .next_interval("compaction_task", 1)
            .await?;

        let sink_param = self.get_sink_param(sink_id).await?;

        compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
            task_id,
            props: sink_param.properties,
            task_type: TaskType::Full as i32, // default to full compaction
        }))?;

        tracing::info!(
            "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
            sink_id,
            task_id
        );

        // Block until a compaction-produced snapshot appears (or time out).
        self.wait_for_compaction_completion(
            &sink_id,
            iceberg_config,
            initial_snapshot_id,
            initial_timestamp,
            task_id,
        )
        .await?;

        Ok(task_id)
    }
727
    /// Polls the Iceberg table until a snapshot produced by compaction
    /// appears, with exponential backoff between polls.
    ///
    /// Completion is detected as a snapshot newer than the initial one whose
    /// summary operation is `Overwrite` (COW mode) or `Replace` (MOR mode).
    ///
    /// # Errors
    /// Returns an error if loading the table fails, or if no matching snapshot
    /// appears within `MAX_WAIT_TIME_SECS`.
    async fn wait_for_compaction_completion(
        &self,
        sink_id: &SinkId,
        iceberg_config: IcebergConfig,
        initial_snapshot_id: i64,
        initial_timestamp: i64,
        task_id: u64,
    ) -> MetaResult<()> {
        // Backoff schedule: 2s, 3s, 4s, ... capped at 60s; total budget 30min.
        const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
        const MAX_POLL_INTERVAL_SECS: u64 = 60;
        const MAX_WAIT_TIME_SECS: u64 = 1800;
        const BACKOFF_MULTIPLIER: f64 = 1.5;

        // NOTE(review): `elapsed_time` accumulates sleep time only; table-load
        // latency is not counted, so wall-clock wait can exceed the budget.
        let mut elapsed_time = 0;
        let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;

        let cow =
            should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode);

        while elapsed_time < MAX_WAIT_TIME_SECS {
            let poll_interval = std::time::Duration::from_secs(current_interval_secs);
            tokio::time::sleep(poll_interval).await;
            elapsed_time += current_interval_secs;

            tracing::info!(
                "Checking iceberg compaction completion for sink {} task_id={}, elapsed={}s, interval={}s",
                sink_id,
                task_id,
                elapsed_time,
                current_interval_secs
            );

            // Reload the table each round to observe freshly committed snapshots.
            let current_table = iceberg_config.load_table().await?;

            let metadata = current_table.metadata();
            // Candidate snapshots: committed after our start and not the one we
            // started from.
            let new_snapshots: Vec<_> = metadata
                .snapshots()
                .filter(|snapshot| {
                    let snapshot_timestamp = snapshot.timestamp_ms();
                    let snapshot_id = snapshot.snapshot_id();
                    snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
                })
                .collect();

            for snapshot in new_snapshots {
                let summary = snapshot.summary();
                if cow {
                    // COW compaction commits an Overwrite snapshot.
                    if matches!(summary.operation, Operation::Overwrite) {
                        tracing::info!(
                            "Iceberg compaction completed for sink {} task_id={} with Overwrite operation",
                            sink_id,
                            task_id
                        );
                        return Ok(());
                    }
                } else if matches!(summary.operation, Operation::Replace) {
                    // MOR compaction commits a Replace snapshot.
                    tracing::info!(
                        "Iceberg compaction completed for sink {} task_id={} with Replace operation",
                        sink_id,
                        task_id
                    );
                    return Ok(());
                }
            }

            // Exponential backoff, capped at MAX_POLL_INTERVAL_SECS.
            current_interval_secs = std::cmp::min(
                MAX_POLL_INTERVAL_SECS,
                ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
            );
        }

        Err(anyhow!(
            "Iceberg compaction did not complete within {} seconds for sink {} (task_id={})",
            MAX_WAIT_TIME_SECS,
            sink_id,
            task_id
        )
        .into())
    }
807
808    async fn perform_gc_operations(&self) -> MetaResult<()> {
809        let sink_ids = {
810            let guard = self.inner.read();
811            guard.sink_schedules.keys().cloned().collect::<Vec<_>>()
812        };
813
814        tracing::info!("Starting GC operations for {} tables", sink_ids.len());
815
816        for sink_id in sink_ids {
817            if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
818                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
819            }
820        }
821
822        tracing::info!("GC operations completed");
823        Ok(())
824    }
825
    /// Get the number of snapshots pending compaction for a sink's Iceberg table.
    /// Returns None if the table cannot be loaded.
    ///
    /// Counts snapshots since last compaction:
    /// - For COW (copy-on-write) mode: counts snapshots on the ingestion branch with
    ///   timestamp > the current snapshot on main (compaction commits to main)
    /// - For MOR (merge-on-read) mode: counts snapshots since the last `Replace`
    ///   operation on the main branch
    async fn get_pending_snapshot_count(&self, sink_id: SinkId) -> Option<usize> {
        // Best-effort metric: any failure loading the config/catalog/table yields
        // None instead of surfacing an error.
        let iceberg_config = self.load_iceberg_config(sink_id).await.ok()?;
        let is_cow_mode =
            should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
        let catalog = iceberg_config.create_catalog().await.ok()?;
        let table_name = iceberg_config.full_table_name().ok()?;
        let table = catalog.load_table(&table_name).await.ok()?;
        let metadata = table.metadata();

        if is_cow_mode {
            // COW mode: count snapshots on ingestion branch since last compaction.
            // Compaction writes Overwrite to main branch, so the current snapshot on main
            // is the last compaction point.

            // Get last compaction timestamp from main branch's current snapshot
            let last_compaction_timestamp = metadata
                .current_snapshot()
                .map(|s| s.timestamp_ms())
                .unwrap_or(0); // 0 means no compaction has happened yet

            // Count snapshots on ingestion branch with timestamp > last compaction
            let branch = commit_branch(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
            let current_snapshot = metadata.snapshot_for_ref(&branch)?;

            let mut count = 0;
            let mut snapshot_id = Some(current_snapshot.snapshot_id());

            // Walk the parent chain from the branch head (newest first); a missing
            // snapshot id anywhere in the chain aborts with None via `?`.
            while let Some(id) = snapshot_id {
                let snapshot = metadata.snapshot_by_id(id)?;
                if snapshot.timestamp_ms() > last_compaction_timestamp {
                    count += 1;
                    snapshot_id = snapshot.parent_snapshot_id();
                } else {
                    // Reached snapshots before or at last compaction, stop counting
                    break;
                }
            }

            Some(count)
        } else {
            // MOR mode: simple rposition on all snapshots (main branch only)
            let mut snapshots = metadata.snapshots().collect_vec();
            if snapshots.is_empty() {
                return Some(0);
            }

            // Order by commit time so rposition finds the most recent Replace
            // (i.e. the last compaction) regardless of iteration order.
            snapshots.sort_by_key(|s| s.timestamp_ms());

            let last_replace_index = snapshots
                .iter()
                .rposition(|s| matches!(s.summary().operation, Operation::Replace));

            // Everything after the last Replace is pending; with no Replace yet,
            // every snapshot counts as pending.
            let count = match last_replace_index {
                Some(index) => snapshots.len() - index - 1,
                None => snapshots.len(),
            };

            Some(count)
        }
    }
892
893    pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
894        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; // 24 hours
895        let now = chrono::Utc::now().timestamp_millis();
896
897        let iceberg_config = self.load_iceberg_config(sink_id).await?;
898        if !iceberg_config.enable_snapshot_expiration {
899            return Ok(());
900        }
901
902        let catalog = iceberg_config.create_catalog().await?;
903        let mut table = catalog
904            .load_table(&iceberg_config.full_table_name()?)
905            .await
906            .map_err(|e| SinkError::Iceberg(e.into()))?;
907
908        let metadata = table.metadata();
909        let mut snapshots = metadata.snapshots().collect_vec();
910        snapshots.sort_by_key(|s| s.timestamp_ms());
911
912        let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;
913
914        let snapshot_expiration_timestamp_ms =
915            match iceberg_config.snapshot_expiration_timestamp_ms(now) {
916                Some(timestamp) => timestamp,
917                None => default_snapshot_expiration_timestamp_ms,
918            };
919
920        if snapshots.is_empty()
921            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
922        {
923            // avoid commit empty table updates
924            return Ok(());
925        }
926
927        tracing::info!(
928            catalog_name = iceberg_config.catalog_name(),
929            table_name = iceberg_config.full_table_name()?.to_string(),
930            %sink_id,
931            snapshots_len = snapshots.len(),
932            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
933            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
934            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
935            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
936            "try trigger snapshots expiration",
937        );
938
939        let txn = Transaction::new(&table);
940
941        let mut expired_snapshots = txn
942            .expire_snapshot()
943            .expire_older_than(snapshot_expiration_timestamp_ms)
944            .clear_expire_files(iceberg_config.snapshot_expiration_clear_expired_files)
945            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);
946
947        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
948            expired_snapshots = expired_snapshots.retain_last(retain_last);
949        }
950
951        let before_metadata = table.metadata_ref();
952        let tx = expired_snapshots
953            .apply(txn)
954            .map_err(|e| SinkError::Iceberg(e.into()))?;
955        table = tx
956            .commit(catalog.as_ref())
957            .await
958            .map_err(|e| SinkError::Iceberg(e.into()))?;
959
960        if iceberg_config.snapshot_expiration_clear_expired_files {
961            table
962                .cleanup_expired_files(&before_metadata)
963                .await
964                .map_err(|e| SinkError::Iceberg(e.into()))?;
965        }
966
967        tracing::info!(
968            catalog_name = iceberg_config.catalog_name(),
969            table_name = iceberg_config.full_table_name()?.to_string(),
970            %sink_id,
971            "Expired snapshots for iceberg table",
972        );
973
974        Ok(())
975    }
976}