1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use iceberg::spec::Operation;
21use iceberg::transaction::{ApplyTransactionAction, Transaction};
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::id::WorkerId;
25use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
26use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
27use risingwave_connector::sink::iceberg::{
28 CompactionType, IcebergConfig, commit_branch, should_enable_iceberg_cow,
29};
30use risingwave_connector::sink::{SinkError, SinkParam};
31use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
32use risingwave_pb::iceberg_compaction::{
33 IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
34};
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot::Sender;
38use tokio::task::JoinHandle;
39use tonic::Streaming;
40
41use super::MetaSrvEnv;
42use crate::MetaResult;
43use crate::hummock::{
44 IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
45 IcebergCompactor, IcebergCompactorManagerRef,
46};
47use crate::manager::MetadataManager;
48use crate::rpc::metrics::MetaMetrics;
49
/// Shared handle to the [`IcebergCompactionManager`].
pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;

/// Sender used to hand a newly subscribed compactor's event stream
/// (keyed by its worker id) over to the compaction event loop.
type CompactorChangeTx =
    UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;

/// Receiver side of [`CompactorChangeTx`]; consumed by the event loop.
type CompactorChangeRx =
    UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
57
/// Point-in-time capture of a track's schedule, taken when a compaction task
/// is dispatched so the schedule can be rolled back if the dispatch fails.
#[derive(Debug, Clone)]
struct CompactionTrackSnapshot {
    // The `Idle` deadline in effect before dispatch; `None` makes restoration
    // fall back to `Instant::now()` (see `restore_from_snapshot`).
    next_compaction_time: Option<Instant>,
}
63
/// Scheduling state of a sink's compaction track.
#[derive(Debug, Clone)]
enum CompactionTrackState {
    /// No task in flight; eligible to trigger once `next_compaction_time`
    /// passes (or enough snapshots accumulate first).
    Idle { next_compaction_time: Instant },
    /// A compaction task has been handed out and has not completed yet.
    Processing,
}
72
/// Per-sink compaction scheduling state: what kind of task to run, the
/// triggering thresholds, and whether a task is currently in flight.
#[derive(Debug, Clone)]
struct CompactionTrack {
    // Kind of compaction task dispatched for this sink.
    task_type: TaskType,
    // Seconds between time-based triggers; the next deadline is set to
    // `now + trigger_interval_sec` when a task completes.
    trigger_interval_sec: u64,
    // Pending snapshot count that triggers compaction regardless of time.
    trigger_snapshot_count: usize,
    // Current scheduling state (idle with a deadline, or processing).
    state: CompactionTrackState,
}
82
83impl CompactionTrack {
84 fn should_trigger(&self, now: Instant, snapshot_count: usize) -> bool {
96 let next_compaction_time = match &self.state {
98 CompactionTrackState::Idle {
99 next_compaction_time,
100 } => *next_compaction_time,
101 CompactionTrackState::Processing => return false,
102 };
103
104 let time_ready = now >= next_compaction_time;
106 let snapshot_ready = snapshot_count >= self.trigger_snapshot_count;
107 let has_snapshots = snapshot_count > 0;
108
109 snapshot_ready || (time_ready && has_snapshots)
112 }
113
114 fn start_processing(&mut self) -> CompactionTrackSnapshot {
116 match &self.state {
117 CompactionTrackState::Idle {
118 next_compaction_time,
119 } => {
120 let snapshot = CompactionTrackSnapshot {
121 next_compaction_time: Some(*next_compaction_time),
122 };
123 self.state = CompactionTrackState::Processing;
124 snapshot
125 }
126 CompactionTrackState::Processing => {
127 unreachable!("Cannot start processing when already processing")
128 }
129 }
130 }
131
132 fn complete_processing(&mut self) {
134 match &self.state {
135 CompactionTrackState::Processing => {
136 self.state = CompactionTrackState::Idle {
137 next_compaction_time: Instant::now()
138 + std::time::Duration::from_secs(self.trigger_interval_sec),
139 };
140 }
141 CompactionTrackState::Idle { .. } => {
142 unreachable!("Cannot complete processing when not processing")
143 }
144 }
145 }
146
147 fn restore_from_snapshot(&mut self, snapshot: CompactionTrackSnapshot) {
149 self.state = CompactionTrackState::Idle {
150 next_compaction_time: snapshot.next_compaction_time.unwrap_or_else(Instant::now),
151 };
152 }
153
154 fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
155 if self.trigger_interval_sec == new_interval_sec {
156 return;
157 }
158
159 self.trigger_interval_sec = new_interval_sec;
160
161 match &mut self.state {
163 CompactionTrackState::Idle {
164 next_compaction_time,
165 } => {
166 *next_compaction_time = now + std::time::Duration::from_secs(new_interval_sec);
167 }
168 CompactionTrackState::Processing => {
169 }
171 }
172 }
173}
174
/// Handle representing permission to dispatch one compaction task for a sink.
/// Dropping it settles the track's state: a successful dispatch advances the
/// schedule, anything else restores the pre-dispatch snapshot (see `Drop`).
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    task_type: TaskType,
    // Shared scheduling state; updated when the handle is dropped.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    // Set to true only after `send_compact_task` delivered the task.
    handle_success: bool,

    // Schedule captured at reservation time, used for rollback on failure.
    track_snapshot: CompactionTrackSnapshot,
}
186
187impl IcebergCompactionHandle {
188 fn new(
189 sink_id: SinkId,
190 task_type: TaskType,
191 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
192 metadata_manager: MetadataManager,
193 track_snapshot: CompactionTrackSnapshot,
194 ) -> Self {
195 Self {
196 sink_id,
197 task_type,
198 inner,
199 metadata_manager,
200 handle_success: false,
201 track_snapshot,
202 }
203 }
204
205 pub async fn send_compact_task(
206 mut self,
207 compactor: Arc<IcebergCompactor>,
208 task_id: u64,
209 ) -> MetaResult<()> {
210 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
211 let Some(prost_sink_catalog) = self
212 .metadata_manager
213 .catalog_controller
214 .get_sink_by_id(self.sink_id)
215 .await?
216 else {
217 tracing::warn!("Sink not found: {}", self.sink_id);
219 return Ok(());
220 };
221 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
222 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
223
224 let result =
225 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
226 task_id,
227 props: param.properties,
228 task_type: self.task_type as i32,
229 }));
230
231 if result.is_ok() {
232 self.handle_success = true;
233 }
234
235 result
236 }
237
238 pub fn sink_id(&self) -> SinkId {
239 self.sink_id
240 }
241}
242
243impl Drop for IcebergCompactionHandle {
244 fn drop(&mut self) {
245 let mut guard = self.inner.write();
246 if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id) {
247 if track.task_type == self.task_type {
249 if self.handle_success {
250 track.complete_processing();
251 } else {
252 track.restore_from_snapshot(self.track_snapshot.clone());
255 }
256 }
257 }
258 }
259}
260
/// Mutable state shared between the manager and outstanding handles.
struct IcebergCompactionManagerInner {
    // Per-sink compaction scheduling state.
    pub sink_schedules: HashMap<SinkId, CompactionTrack>,
}
264
/// Read-only view of one sink's compaction schedule, produced by
/// [`IcebergCompactionManager::list_compaction_statuses`] for introspection.
#[derive(Debug, Clone)]
pub struct IcebergCompactionScheduleStatus {
    pub sink_id: SinkId,
    // Lowercased `TaskType` name.
    pub task_type: String,
    pub trigger_interval_sec: u64,
    pub trigger_snapshot_count: usize,
    // "idle" or "processing".
    pub schedule_state: String,
    // Seconds until the next time-based trigger; `None` while processing.
    pub next_compaction_after_sec: Option<u64>,
    // `None` when the count could not be determined (e.g. config load failed).
    pub pending_snapshot_count: Option<usize>,
    // Whether `should_trigger` currently holds for this track.
    pub is_triggerable: bool,
}
276
/// Coordinates iceberg sink compaction: tracks per-sink schedules, selects
/// sinks that are due, dispatches tasks to compactor nodes, and runs
/// snapshot GC/expiration.
pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    // Shared scheduling state; also referenced by outstanding handles.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    // Forwards newly subscribed compactor event streams to the event loop.
    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}
288
289impl IcebergCompactionManager {
290 pub fn build(
291 env: MetaSrvEnv,
292 metadata_manager: MetadataManager,
293 iceberg_compactor_manager: IcebergCompactorManagerRef,
294 metrics: Arc<MetaMetrics>,
295 ) -> (Arc<Self>, CompactorChangeRx) {
296 let (compactor_streams_change_tx, compactor_streams_change_rx) =
297 tokio::sync::mpsc::unbounded_channel();
298 (
299 Arc::new(Self {
300 env,
301 inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
302 sink_schedules: HashMap::default(),
303 })),
304 metadata_manager,
305 iceberg_compactor_manager,
306 compactor_streams_change_tx,
307 metrics,
308 }),
309 compactor_streams_change_rx,
310 )
311 }
312
313 pub fn compaction_stat_loop(
314 manager: Arc<Self>,
315 mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
316 ) -> (JoinHandle<()>, Sender<()>) {
317 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
318 let join_handle = tokio::spawn(async move {
319 loop {
320 tokio::select! {
321 Some(stat) = rx.recv() => {
322 manager.update_iceberg_commit_info(stat).await;
323 },
324 _ = &mut shutdown_rx => {
325 tracing::info!("Iceberg compaction manager is stopped");
326 return;
327 }
328 }
329 }
330 });
331
332 (join_handle, shutdown_tx)
333 }
334
335 pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
336 let IcebergSinkCompactionUpdate {
337 sink_id,
338 compaction_interval,
339 force_compaction,
340 } = msg;
341
342 let track_exists = {
344 let guard = self.inner.read();
345 guard.sink_schedules.contains_key(&sink_id)
346 };
347
348 if !track_exists {
350 let iceberg_config = self.load_iceberg_config(sink_id).await;
352
353 let new_track = match iceberg_config {
354 Ok(config) => {
355 match self.create_compaction_track(sink_id, &config) {
357 Ok(track) => track,
358 Err(e) => {
359 tracing::error!(
360 error = ?e.as_report(),
361 "Failed to create compaction track from config for sink {}, using default Full track",
362 sink_id
363 );
364 CompactionTrack {
366 task_type: TaskType::Full,
367 trigger_interval_sec: compaction_interval,
368 trigger_snapshot_count: 10,
369 state: CompactionTrackState::Idle {
370 next_compaction_time: Instant::now()
371 + std::time::Duration::from_secs(compaction_interval),
372 },
373 }
374 }
375 }
376 }
377 Err(e) => {
378 tracing::error!(
379 error = ?e.as_report(),
380 "Failed to load iceberg config for sink {}, using default Full track",
381 sink_id
382 );
383 CompactionTrack {
385 task_type: TaskType::Full,
386 trigger_interval_sec: compaction_interval,
387 trigger_snapshot_count: 10,
388 state: CompactionTrackState::Idle {
389 next_compaction_time: Instant::now()
390 + std::time::Duration::from_secs(compaction_interval),
391 },
392 }
393 }
394 };
395
396 let mut guard = self.inner.write();
397 guard.sink_schedules.insert(sink_id, new_track);
398 }
399
400 let mut guard = self.inner.write();
402 if let Some(track) = guard.sink_schedules.get_mut(&sink_id) {
403 if force_compaction {
405 if let CompactionTrackState::Idle {
406 next_compaction_time,
407 } = &mut track.state
408 {
409 *next_compaction_time = Instant::now();
410 }
411 } else {
413 track.update_interval(compaction_interval, Instant::now());
415 }
416 } else {
417 tracing::error!(
418 "Failed to find compaction track for sink {} during update; configuration changes not applied.",
419 sink_id
420 );
421 }
422 }
423
424 fn create_compaction_track(
426 &self,
427 _sink_id: SinkId,
428 iceberg_config: &IcebergConfig,
429 ) -> MetaResult<CompactionTrack> {
430 let trigger_interval_sec = iceberg_config.compaction_interval_sec();
431 let trigger_snapshot_count = iceberg_config.trigger_snapshot_count();
432
433 let task_type =
435 if should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode)
436 {
437 TaskType::Full
438 } else {
439 match iceberg_config.compaction_type() {
441 CompactionType::Full => TaskType::Full,
442 CompactionType::SmallFiles => TaskType::SmallFiles,
443 CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
444 }
445 };
446
447 Ok(CompactionTrack {
448 task_type,
449 trigger_interval_sec,
450 trigger_snapshot_count,
451 state: CompactionTrackState::Idle {
452 next_compaction_time: Instant::now()
453 + std::time::Duration::from_secs(trigger_interval_sec),
454 },
455 })
456 }
457
    /// Selects up to `n` sinks that are due for compaction and reserves them.
    ///
    /// Pending snapshot counts are fetched without holding the lock (the
    /// lookups are async); eligibility is then evaluated under the write
    /// lock against `now`. Each returned handle moves its track to
    /// `Processing`; dropping the handle settles the schedule (see
    /// [`IcebergCompactionHandle`]).
    pub async fn get_top_n_iceberg_commit_sink_ids(
        &self,
        n: usize,
    ) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();

        // Snapshot the key set first so no lock is held across the awaits below.
        let sink_ids: Vec<SinkId> = {
            let guard = self.inner.read();
            guard.sink_schedules.keys().cloned().collect()
        };

        let snapshot_count_futures = sink_ids
            .iter()
            .map(|sink_id| async move {
                let count = self.get_pending_snapshot_count(*sink_id).await?;
                Some((*sink_id, count))
            })
            .collect::<Vec<_>>();

        // Sinks whose snapshot count could not be determined are skipped
        // (`flatten` drops the `None`s).
        let snapshot_counts: HashMap<SinkId, usize> =
            futures::future::join_all(snapshot_count_futures)
                .await
                .into_iter()
                .flatten()
                .collect();

        let mut guard = self.inner.write();

        // Collect eligible (sink, task type, deadline) triples; only idle
        // tracks qualify (`should_trigger` already rejects processing ones).
        let mut candidates = Vec::new();
        for (sink_id, track) in &guard.sink_schedules {
            let Some(&snapshot_count) = snapshot_counts.get(sink_id) else {
                continue;
            };
            if track.should_trigger(now, snapshot_count) {
                if let CompactionTrackState::Idle {
                    next_compaction_time,
                } = track.state
                {
                    candidates.push((*sink_id, track.task_type, next_compaction_time));
                }
            }
        }

        // Oldest deadline first, so the most overdue sinks win the `n` slots.
        candidates.sort_by(|a, b| a.2.cmp(&b.2));

        candidates
            .into_iter()
            .take(n)
            .filter_map(|(sink_id, task_type, _)| {
                let track = guard.sink_schedules.get_mut(&sink_id)?;

                // Reserve the slot; the snapshot allows rollback on failure.
                let track_snapshot = track.start_processing();

                Some(IcebergCompactionHandle::new(
                    sink_id,
                    task_type,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    track_snapshot,
                ))
            })
            .collect()
    }
531
532 pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
533 let mut guard = self.inner.write();
534 guard.sink_schedules.remove(&sink_id);
535 }
536
    /// Reports the scheduling status of every tracked sink, sorted by sink id.
    ///
    /// Tracks are cloned out of the lock first so the async snapshot-count
    /// lookups below run without holding it.
    pub async fn list_compaction_statuses(&self) -> Vec<IcebergCompactionScheduleStatus> {
        let now = Instant::now();
        let schedules = {
            let guard = self.inner.read();
            guard
                .sink_schedules
                .iter()
                .map(|(&sink_id, track)| (sink_id, track.clone()))
                .collect_vec()
        };

        let mut statuses =
            futures::future::join_all(schedules.into_iter().map(|(sink_id, track)| async move {
                // `None` when the count could not be determined.
                let pending_snapshot_count = self.get_pending_snapshot_count(sink_id).await;
                // Seconds until the next time-based trigger; `None` while
                // a task is in flight.
                let next_compaction_after_sec = match &track.state {
                    CompactionTrackState::Idle {
                        next_compaction_time,
                    } => Some(
                        next_compaction_time
                            .saturating_duration_since(now)
                            .as_secs(),
                    ),
                    CompactionTrackState::Processing => None,
                };
                // Unknown snapshot count is reported as not triggerable.
                let is_triggerable = pending_snapshot_count
                    .map(|snapshot_count| track.should_trigger(now, snapshot_count))
                    .unwrap_or(false);

                IcebergCompactionScheduleStatus {
                    sink_id,
                    task_type: track.task_type.as_str_name().to_ascii_lowercase(),
                    trigger_interval_sec: track.trigger_interval_sec,
                    trigger_snapshot_count: track.trigger_snapshot_count,
                    schedule_state: match track.state {
                        CompactionTrackState::Idle { .. } => "idle".to_owned(),
                        CompactionTrackState::Processing => "processing".to_owned(),
                    },
                    next_compaction_after_sec,
                    pending_snapshot_count,
                    is_triggerable,
                }
            }))
            .await;

        statuses.sort_by_key(|status| status.sink_id);
        statuses
    }
584
585 pub async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
586 let prost_sink_catalog = self
587 .metadata_manager
588 .catalog_controller
589 .get_sink_by_id(sink_id)
590 .await?
591 .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
592 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
593 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
594 Ok(param)
595 }
596
597 pub async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
598 let sink_param = self.get_sink_param(sink_id).await?;
599 let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
600 Ok(iceberg_config)
601 }
602
603 pub fn add_compactor_stream(
604 &self,
605 context_id: WorkerId,
606 req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
607 ) {
608 self.compactor_streams_change_tx
609 .send((context_id, req_stream))
610 .unwrap();
611 }
612
613 pub fn iceberg_compaction_event_loop(
614 iceberg_compaction_manager: Arc<Self>,
615 compactor_streams_change_rx: UnboundedReceiver<(
616 WorkerId,
617 Streaming<SubscribeIcebergCompactionEventRequest>,
618 )>,
619 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
620 let mut join_handle_vec = Vec::default();
621
622 let iceberg_compaction_event_handler =
623 IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
624
625 let iceberg_compaction_event_dispatcher =
626 IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
627
628 let event_loop = IcebergCompactionEventLoop::new(
629 iceberg_compaction_event_dispatcher,
630 iceberg_compaction_manager.metrics.clone(),
631 compactor_streams_change_rx,
632 );
633
634 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
635 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
636
637 join_handle_vec
638 }
639
640 pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
644 assert!(
645 interval_sec > 0,
646 "Iceberg GC interval must be greater than 0"
647 );
648 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
649 let join_handle = tokio::spawn(async move {
650 tracing::info!(
651 interval_sec = interval_sec,
652 "Starting Iceberg GC loop with configurable interval"
653 );
654 let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));
655
656 loop {
657 tokio::select! {
658 _ = interval.tick() => {
659 if let Err(e) = manager.perform_gc_operations().await {
660 tracing::error!(error = ?e.as_report(), "GC operations failed");
661 }
662 },
663 _ = &mut shutdown_rx => {
664 tracing::info!("Iceberg GC loop is stopped");
665 return;
666 }
667 }
668 }
669 });
670
671 (join_handle, shutdown_tx)
672 }
673
674 pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
677 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
678
679 let iceberg_config = self.load_iceberg_config(sink_id).await?;
681 let initial_table = iceberg_config.load_table().await?;
682 let initial_snapshot_id = initial_table
683 .metadata()
684 .current_snapshot()
685 .map(|s| s.snapshot_id())
686 .unwrap_or(0); let initial_timestamp = chrono::Utc::now().timestamp_millis();
688
689 let compactor = self
691 .iceberg_compactor_manager
692 .next_compactor()
693 .ok_or_else(|| anyhow!("No iceberg compactor available"))?;
694
695 let task_id = self
697 .env
698 .hummock_seq
699 .next_interval("compaction_task", 1)
700 .await?;
701
702 let sink_param = self.get_sink_param(sink_id).await?;
703
704 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
705 task_id,
706 props: sink_param.properties,
707 task_type: TaskType::Full as i32, }))?;
709
710 tracing::info!(
711 "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
712 sink_id,
713 task_id
714 );
715
716 self.wait_for_compaction_completion(
717 &sink_id,
718 iceberg_config,
719 initial_snapshot_id,
720 initial_timestamp,
721 task_id,
722 )
723 .await?;
724
725 Ok(task_id)
726 }
727
    /// Polls the iceberg table until the manually triggered compaction
    /// appears to have committed, or a 30-minute deadline elapses.
    ///
    /// Completion heuristic: a snapshot newer than `initial_timestamp` (with
    /// an id different from `initial_snapshot_id`) whose operation is
    /// `Overwrite` in copy-on-write mode, or `Replace` otherwise.
    ///
    /// # Errors
    /// Returns an error if the table cannot be reloaded or if no matching
    /// snapshot shows up within `MAX_WAIT_TIME_SECS`.
    async fn wait_for_compaction_completion(
        &self,
        sink_id: &SinkId,
        iceberg_config: IcebergConfig,
        initial_snapshot_id: i64,
        initial_timestamp: i64,
        task_id: u64,
    ) -> MetaResult<()> {
        // Exponential backoff: 2s start, x1.5 per round, capped at 60s.
        const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
        const MAX_POLL_INTERVAL_SECS: u64 = 60;
        const MAX_WAIT_TIME_SECS: u64 = 1800;
        const BACKOFF_MULTIPLIER: f64 = 1.5;

        let mut elapsed_time = 0;
        let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;

        let cow =
            should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode);

        while elapsed_time < MAX_WAIT_TIME_SECS {
            let poll_interval = std::time::Duration::from_secs(current_interval_secs);
            tokio::time::sleep(poll_interval).await;
            elapsed_time += current_interval_secs;

            tracing::info!(
                "Checking iceberg compaction completion for sink {} task_id={}, elapsed={}s, interval={}s",
                sink_id,
                task_id,
                elapsed_time,
                current_interval_secs
            );

            // Reload fresh metadata on every poll.
            let current_table = iceberg_config.load_table().await?;

            let metadata = current_table.metadata();
            let new_snapshots: Vec<_> = metadata
                .snapshots()
                .filter(|snapshot| {
                    let snapshot_timestamp = snapshot.timestamp_ms();
                    let snapshot_id = snapshot.snapshot_id();
                    snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
                })
                .collect();

            for snapshot in new_snapshots {
                let summary = snapshot.summary();
                if cow {
                    if matches!(summary.operation, Operation::Overwrite) {
                        tracing::info!(
                            "Iceberg compaction completed for sink {} task_id={} with Overwrite operation",
                            sink_id,
                            task_id
                        );
                        return Ok(());
                    }
                } else if matches!(summary.operation, Operation::Replace) {
                    tracing::info!(
                        "Iceberg compaction completed for sink {} task_id={} with Replace operation",
                        sink_id,
                        task_id
                    );
                    return Ok(());
                }
            }

            current_interval_secs = std::cmp::min(
                MAX_POLL_INTERVAL_SECS,
                ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
            );
        }

        Err(anyhow!(
            "Iceberg compaction did not complete within {} seconds for sink {} (task_id={})",
            MAX_WAIT_TIME_SECS,
            sink_id,
            task_id
        )
        .into())
    }
807
808 async fn perform_gc_operations(&self) -> MetaResult<()> {
809 let sink_ids = {
810 let guard = self.inner.read();
811 guard.sink_schedules.keys().cloned().collect::<Vec<_>>()
812 };
813
814 tracing::info!("Starting GC operations for {} tables", sink_ids.len());
815
816 for sink_id in sink_ids {
817 if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
818 tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
819 }
820 }
821
822 tracing::info!("GC operations completed");
823 Ok(())
824 }
825
    /// Best-effort count of snapshots committed since the last compaction.
    ///
    /// Returns `None` whenever the config, catalog, or table cannot be
    /// loaded (callers treat that as "skip this sink").
    async fn get_pending_snapshot_count(&self, sink_id: SinkId) -> Option<usize> {
        let iceberg_config = self.load_iceberg_config(sink_id).await.ok()?;
        let is_cow_mode =
            should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
        let catalog = iceberg_config.create_catalog().await.ok()?;
        let table_name = iceberg_config.full_table_name().ok()?;
        let table = catalog.load_table(&table_name).await.ok()?;
        let metadata = table.metadata();

        if is_cow_mode {
            // NOTE(review): assumes the table's current snapshot is the head
            // produced by the last compaction, while ingestion commits land
            // on a separate branch — confirm against the sink writer.
            let last_compaction_timestamp = metadata
                .current_snapshot()
                .map(|s| s.timestamp_ms())
                .unwrap_or(0); let branch = commit_branch(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
            // `?` on the Option: no branch head means no count.
            let current_snapshot = metadata.snapshot_for_ref(&branch)?;

            // Walk the branch's ancestry, counting snapshots newer than the
            // last compaction; stop at the first older one.
            let mut count = 0;
            let mut snapshot_id = Some(current_snapshot.snapshot_id());

            while let Some(id) = snapshot_id {
                let snapshot = metadata.snapshot_by_id(id)?;
                if snapshot.timestamp_ms() > last_compaction_timestamp {
                    count += 1;
                    snapshot_id = snapshot.parent_snapshot_id();
                } else {
                    break;
                }
            }

            Some(count)
        } else {
            // Merge-on-read: count snapshots after the most recent `Replace`
            // (compaction) snapshot; with no `Replace` yet, every snapshot
            // counts as pending.
            let mut snapshots = metadata.snapshots().collect_vec();
            if snapshots.is_empty() {
                return Some(0);
            }

            snapshots.sort_by_key(|s| s.timestamp_ms());

            let last_replace_index = snapshots
                .iter()
                .rposition(|s| matches!(s.summary().operation, Operation::Replace));

            let count = match last_replace_index {
                Some(index) => snapshots.len() - index - 1,
                None => snapshots.len(),
            };

            Some(count)
        }
    }
892
    /// Expires old snapshots for the sink's iceberg table according to the
    /// sink's snapshot-expiration settings.
    ///
    /// No-op when expiration is disabled, the table has no snapshots, or
    /// even the oldest snapshot is younger than the expiration cutoff.
    ///
    /// # Errors
    /// Propagates config-load, catalog, and commit failures.
    pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
        // Default cutoff: snapshots older than 24 hours are eligible.
        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; let now = chrono::Utc::now().timestamp_millis();

        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        if !iceberg_config.enable_snapshot_expiration {
            return Ok(());
        }

        let catalog = iceberg_config.create_catalog().await?;
        let mut table = catalog
            .load_table(&iceberg_config.full_table_name()?)
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();
        snapshots.sort_by_key(|s| s.timestamp_ms());

        let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;

        // Sink-configured cutoff wins; otherwise fall back to the default.
        let snapshot_expiration_timestamp_ms =
            match iceberg_config.snapshot_expiration_timestamp_ms(now) {
                Some(timestamp) => timestamp,
                None => default_snapshot_expiration_timestamp_ms,
            };

        // Nothing to expire if the oldest snapshot is newer than the cutoff.
        if snapshots.is_empty()
            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
        {
            return Ok(());
        }

        tracing::info!(
            catalog_name = iceberg_config.catalog_name(),
            table_name = iceberg_config.full_table_name()?.to_string(),
            %sink_id,
            snapshots_len = snapshots.len(),
            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
            "try trigger snapshots expiration",
        );

        let txn = Transaction::new(&table);

        let mut expired_snapshots = txn
            .expire_snapshot()
            .expire_older_than(snapshot_expiration_timestamp_ms)
            .clear_expire_files(iceberg_config.snapshot_expiration_clear_expired_files)
            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);

        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
            expired_snapshots = expired_snapshots.retain_last(retain_last);
        }

        // Keep the pre-commit metadata so expired data files can be cleaned
        // up by diffing against it after the commit.
        let before_metadata = table.metadata_ref();
        let tx = expired_snapshots
            .apply(txn)
            .map_err(|e| SinkError::Iceberg(e.into()))?;
        table = tx
            .commit(catalog.as_ref())
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        if iceberg_config.snapshot_expiration_clear_expired_files {
            table
                .cleanup_expired_files(&before_metadata)
                .await
                .map_err(|e| SinkError::Iceberg(e.into()))?;
        }

        tracing::info!(
            catalog_name = iceberg_config.catalog_name(),
            table_name = iceberg_config.full_table_name()?.to_string(),
            %sink_id,
            "Expired snapshots for iceberg table",
        );

        Ok(())
    }
976}