use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use iceberg::spec::Operation;
use iceberg::transaction::Transaction;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::id::WorkerId;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
use risingwave_connector::sink::iceberg::{
    CompactionType, IcebergConfig, should_enable_iceberg_cow,
};
use risingwave_connector::sink::{SinkError, SinkParam};
use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
use risingwave_pb::iceberg_compaction::{
    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;

use super::MetaSrvEnv;
use crate::MetaResult;
use crate::hummock::{
    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
    IcebergCompactor, IcebergCompactorManagerRef,
};
use crate::manager::MetadataManager;
use crate::rpc::metrics::MetaMetrics;

pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;

type CompactorChangeTx =
    UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;

type CompactorChangeRx =
    UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;

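/// Snapshot of a track's schedule, taken when a compaction handle is created so the
/// previous schedule can be restored if the task is never sent.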
#[derive(Debug, Clone)]
struct CompactionTrackSnapshot {
    next_compaction_time: Option<Instant>,
}

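/// Scheduling state of a per-sink compaction track.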
#[derive(Debug, Clone)]
enum CompactionTrackState {
    /// Waiting until `next_compaction_time` before a new task can be triggered.
    Idle { next_compaction_time: Instant },
    /// A compaction task has been handed out and is still in flight.
    Processing,
}

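/// Per-sink compaction schedule: which task type to run, how often to trigger it, and
/// how many new snapshots must accumulate before a task is triggered.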
#[derive(Debug, Clone)]
struct CompactionTrack {
    task_type: TaskType,
    trigger_interval_sec: u64,
    trigger_snapshot_count: usize,
    state: CompactionTrackState,
}

impl CompactionTrack {
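    /// A compaction is triggered only when the track is idle, the scheduled trigger
    /// time has passed, and enough new snapshots have accumulated.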
    fn should_trigger(&self, now: Instant, snapshot_count: usize) -> bool {
        let next_compaction_time = match &self.state {
            CompactionTrackState::Idle {
                next_compaction_time,
            } => *next_compaction_time,
            CompactionTrackState::Processing => return false,
        };

        let time_ready = now >= next_compaction_time;
        let snapshot_ready = snapshot_count >= self.trigger_snapshot_count;

        time_ready && snapshot_ready
    }

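    /// Transition from `Idle` to `Processing`, returning a snapshot that can restore
    /// the previous schedule if the task is never sent successfully.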
    fn start_processing(&mut self) -> CompactionTrackSnapshot {
        match &self.state {
            CompactionTrackState::Idle {
                next_compaction_time,
            } => {
                let snapshot = CompactionTrackSnapshot {
                    next_compaction_time: Some(*next_compaction_time),
                };
                self.state = CompactionTrackState::Processing;
                snapshot
            }
            CompactionTrackState::Processing => {
                unreachable!("Cannot start processing when already processing")
            }
        }
    }

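    /// Transition from `Processing` back to `Idle`, scheduling the next trigger
    /// `trigger_interval_sec` seconds from now.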
    fn complete_processing(&mut self) {
        match &self.state {
            CompactionTrackState::Processing => {
                self.state = CompactionTrackState::Idle {
                    next_compaction_time: Instant::now()
                        + std::time::Duration::from_secs(self.trigger_interval_sec),
                };
            }
            CompactionTrackState::Idle { .. } => {
                unreachable!("Cannot complete processing when not processing")
            }
        }
    }

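    /// Restore the `Idle` state from a snapshot taken by `start_processing`.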
    fn restore_from_snapshot(&mut self, snapshot: CompactionTrackSnapshot) {
        self.state = CompactionTrackState::Idle {
            next_compaction_time: snapshot.next_compaction_time.unwrap_or_else(Instant::now),
        };
    }

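    /// Apply a new trigger interval. If the track is idle, the next trigger time is
    /// rescheduled relative to `now`; if it is processing, the new interval takes
    /// effect when `complete_processing` runs.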
    fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
        if self.trigger_interval_sec == new_interval_sec {
            return;
        }

        self.trigger_interval_sec = new_interval_sec;

        match &mut self.state {
            CompactionTrackState::Idle {
                next_compaction_time,
            } => {
                *next_compaction_time = now + std::time::Duration::from_secs(new_interval_sec);
            }
            CompactionTrackState::Processing => {
                // Nothing to reschedule now; the updated interval takes effect when
                // `complete_processing` transitions the track back to `Idle`.
            }
        }
    }
}

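/// Handle for a triggered compaction task. On drop it either completes the track's
/// processing state (if the task was sent successfully) or restores the previous
/// schedule from `track_snapshot`.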
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    task_type: TaskType,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    handle_success: bool,
    track_snapshot: CompactionTrackSnapshot,
}

impl IcebergCompactionHandle {
    fn new(
        sink_id: SinkId,
        task_type: TaskType,
        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
        metadata_manager: MetadataManager,
        track_snapshot: CompactionTrackSnapshot,
    ) -> Self {
        Self {
            sink_id,
            task_type,
            inner,
            metadata_manager,
            handle_success: false,
            track_snapshot,
        }
    }

    pub async fn send_compact_task(
        mut self,
        compactor: Arc<IcebergCompactor>,
        task_id: u64,
    ) -> MetaResult<()> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
        let Some(prost_sink_catalog) = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_id(self.sink_id)
            .await?
        else {
            tracing::warn!("Sink not found: {}", self.sink_id);
            return Ok(());
        };
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;

        let result =
            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
                task_id,
                props: param.properties,
                task_type: self.task_type as i32,
            }));

        if result.is_ok() {
            self.handle_success = true;
        }

        result
    }

    pub fn sink_id(&self) -> SinkId {
        self.sink_id
    }
}

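/// Dropping the handle settles the track's state: a successfully sent task completes
/// processing and schedules the next trigger; otherwise the previous schedule is restored.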
impl Drop for IcebergCompactionHandle {
    fn drop(&mut self) {
        let mut guard = self.inner.write();
        if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id) {
            if track.task_type == self.task_type {
                if self.handle_success {
                    track.complete_processing();
                } else {
                    track.restore_from_snapshot(self.track_snapshot.clone());
                }
            }
        }
    }
}

struct IcebergCompactionManagerInner {
    pub sink_schedules: HashMap<SinkId, CompactionTrack>,
}

pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}

impl IcebergCompactionManager {
    pub fn build(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        iceberg_compactor_manager: IcebergCompactorManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Arc<Self>, CompactorChangeRx) {
        let (compactor_streams_change_tx, compactor_streams_change_rx) =
            tokio::sync::mpsc::unbounded_channel();
        (
            Arc::new(Self {
                env,
                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
                    sink_schedules: HashMap::default(),
                })),
                metadata_manager,
                iceberg_compactor_manager,
                compactor_streams_change_tx,
                metrics,
            }),
            compactor_streams_change_rx,
        )
    }

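    /// Spawn a loop that consumes `IcebergSinkCompactionUpdate` messages from `rx` and
    /// applies them to the per-sink schedules until the shutdown sender fires.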
    pub fn compaction_stat_loop(
        manager: Arc<Self>,
        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(stat) = rx.recv() => {
                        manager.update_iceberg_commit_info(stat).await;
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compaction manager is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

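    /// Create or update the compaction track for a sink. A missing track is built from
    /// the sink's iceberg config (falling back to a default `Full` track on error);
    /// `force_compaction` makes an idle track eligible immediately, otherwise the
    /// trigger interval is refreshed.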
    pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
        let IcebergSinkCompactionUpdate {
            sink_id,
            compaction_interval,
            force_compaction,
        } = msg;

        let track_exists = {
            let guard = self.inner.read();
            guard.sink_schedules.contains_key(&sink_id)
        };

        if !track_exists {
            let iceberg_config = self.load_iceberg_config(sink_id).await;

            let new_track = match iceberg_config {
                Ok(config) => {
                    match self.create_compaction_track(sink_id, &config) {
                        Ok(track) => track,
                        Err(e) => {
                            tracing::error!(
                                error = ?e.as_report(),
                                "Failed to create compaction track from config for sink {}, using default Full track",
                                sink_id
                            );
                            CompactionTrack {
                                task_type: TaskType::Full,
                                trigger_interval_sec: compaction_interval,
                                trigger_snapshot_count: 10,
                                state: CompactionTrackState::Idle {
                                    next_compaction_time: Instant::now()
                                        + std::time::Duration::from_secs(compaction_interval),
                                },
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(
                        error = ?e.as_report(),
                        "Failed to load iceberg config for sink {}, using default Full track",
                        sink_id
                    );
                    CompactionTrack {
                        task_type: TaskType::Full,
                        trigger_interval_sec: compaction_interval,
                        trigger_snapshot_count: 10,
                        state: CompactionTrackState::Idle {
                            next_compaction_time: Instant::now()
                                + std::time::Duration::from_secs(compaction_interval),
                        },
                    }
                }
            };

            let mut guard = self.inner.write();
            guard.sink_schedules.insert(sink_id, new_track);
        }

        let mut guard = self.inner.write();
        if let Some(track) = guard.sink_schedules.get_mut(&sink_id) {
            if force_compaction {
                // Make the track eligible immediately; has no effect while processing.
                if let CompactionTrackState::Idle {
                    next_compaction_time,
                } = &mut track.state
                {
                    *next_compaction_time = Instant::now();
                }
            } else {
                track.update_interval(compaction_interval, Instant::now());
            }
        } else {
            tracing::error!(
                "Failed to find compaction track for sink {} during update; configuration changes not applied.",
                sink_id
            );
        }
    }

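    /// Build a compaction track from the sink's iceberg config. Copy-on-write tables
    /// always use `Full` compaction; otherwise the configured compaction type is used.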
    fn create_compaction_track(
        &self,
        _sink_id: SinkId,
        iceberg_config: &IcebergConfig,
    ) -> MetaResult<CompactionTrack> {
        let trigger_interval_sec = iceberg_config.compaction_interval_sec();
        let trigger_snapshot_count = iceberg_config.trigger_snapshot_count();

        let task_type = if should_enable_iceberg_cow(
            iceberg_config.r#type.as_str(),
            iceberg_config.write_mode.as_str(),
        ) {
            TaskType::Full
        } else {
            match iceberg_config.compaction_type() {
                CompactionType::Full => TaskType::Full,
                CompactionType::SmallFiles => TaskType::SmallFiles,
                CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
            }
        };

        Ok(CompactionTrack {
            task_type,
            trigger_interval_sec,
            trigger_snapshot_count,
            state: CompactionTrackState::Idle {
                next_compaction_time: Instant::now()
                    + std::time::Duration::from_secs(trigger_interval_sec),
            },
        })
    }

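    /// Select up to `n` sinks that are due for compaction, ordered by earliest scheduled
    /// trigger time, and mark their tracks as processing by handing out
    /// `IcebergCompactionHandle`s.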
    pub async fn get_top_n_iceberg_commit_sink_ids(
        &self,
        n: usize,
    ) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();

        let sink_ids: Vec<SinkId> = {
            let guard = self.inner.read();
            guard.sink_schedules.keys().cloned().collect()
        };

        // Fetch snapshot counts outside the lock; sinks whose count cannot be loaded
        // are skipped for this round.
        let snapshot_count_futures = sink_ids
            .iter()
            .map(|sink_id| async move {
                let count = self.get_snapshot_count(*sink_id).await?;
                Some((*sink_id, count))
            })
            .collect::<Vec<_>>();

        let snapshot_counts: HashMap<SinkId, usize> =
            futures::future::join_all(snapshot_count_futures)
                .await
                .into_iter()
                .flatten()
                .collect();

        let mut guard = self.inner.write();

        let mut candidates = Vec::new();
        for (sink_id, track) in &guard.sink_schedules {
            let Some(&snapshot_count) = snapshot_counts.get(sink_id) else {
                continue;
            };
            if track.should_trigger(now, snapshot_count) {
                if let CompactionTrackState::Idle {
                    next_compaction_time,
                } = track.state
                {
                    candidates.push((*sink_id, track.task_type, next_compaction_time));
                }
            }
        }

        // Earliest scheduled trigger time first.
        candidates.sort_by(|a, b| a.2.cmp(&b.2));

        candidates
            .into_iter()
            .take(n)
            .filter_map(|(sink_id, task_type, _)| {
                let track = guard.sink_schedules.get_mut(&sink_id)?;

                let track_snapshot = track.start_processing();

                Some(IcebergCompactionHandle::new(
                    sink_id,
                    task_type,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    track_snapshot,
                ))
            })
            .collect()
    }

    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
        let mut guard = self.inner.write();
        guard.sink_schedules.remove(&sink_id);
    }

    pub async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
        let prost_sink_catalog = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_id(sink_id)
            .await?
            .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        Ok(param)
    }

    pub async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
        Ok(iceberg_config)
    }

    pub fn add_compactor_stream(
        &self,
        context_id: WorkerId,
        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

    pub fn iceberg_compaction_event_loop(
        iceberg_compaction_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            WorkerId,
            Streaming<SubscribeIcebergCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let iceberg_compaction_event_handler =
            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());

        let iceberg_compaction_event_dispatcher =
            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);

        let event_loop = IcebergCompactionEventLoop::new(
            iceberg_compaction_event_dispatcher,
            iceberg_compaction_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

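    /// Spawn a periodic loop that runs GC (snapshot expiration) for all tracked sinks
    /// every `interval_sec` seconds until the shutdown sender fires.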
    pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
        assert!(
            interval_sec > 0,
            "Iceberg GC interval must be greater than 0"
        );
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            tracing::info!(
                interval_sec = interval_sec,
                "Starting Iceberg GC loop with configurable interval"
            );
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = manager.perform_gc_operations().await {
                            tracing::error!(error = ?e.as_report(), "GC operations failed");
                        }
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg GC loop is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

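    /// Trigger a `Full` compaction for one sink right away, bypassing the schedule, and
    /// block until a matching compaction commit appears in the table or the wait times
    /// out. Returns the task id on success.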
    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;

        // Record the current table state so completion can be detected by the
        // appearance of a newer compaction snapshot.
        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        let initial_table = iceberg_config.load_table().await?;
        let initial_snapshot_id = initial_table
            .metadata()
            .current_snapshot()
            .map(|s| s.snapshot_id())
            .unwrap_or(0);
        let initial_timestamp = chrono::Utc::now().timestamp_millis();

        let compactor = self
            .iceberg_compactor_manager
            .next_compactor()
            .ok_or_else(|| anyhow!("No iceberg compactor available"))?;

        let task_id = self
            .env
            .hummock_seq
            .next_interval("compaction_task", 1)
            .await?;

        let sink_param = self.get_sink_param(sink_id).await?;

        compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
            task_id,
            props: sink_param.properties,
            // Manual compaction always runs a full compaction.
            task_type: TaskType::Full as i32,
        }))?;

        tracing::info!(
            "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
            sink_id,
            task_id
        );

        self.wait_for_compaction_completion(
            &sink_id,
            iceberg_config,
            initial_snapshot_id,
            initial_timestamp,
            task_id,
        )
        .await?;

        Ok(task_id)
    }

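    /// Poll the table with exponential backoff until a snapshot newer than the starting
    /// point carries the expected compaction operation (`Overwrite` for copy-on-write
    /// tables, `Replace` otherwise), or fail after the maximum wait time.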
    async fn wait_for_compaction_completion(
        &self,
        sink_id: &SinkId,
        iceberg_config: IcebergConfig,
        initial_snapshot_id: i64,
        initial_timestamp: i64,
        task_id: u64,
    ) -> MetaResult<()> {
        const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
        const MAX_POLL_INTERVAL_SECS: u64 = 60;
        const MAX_WAIT_TIME_SECS: u64 = 1800;
        const BACKOFF_MULTIPLIER: f64 = 1.5;

        let mut elapsed_time = 0;
        let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;

        let cow = should_enable_iceberg_cow(
            iceberg_config.r#type.as_str(),
            iceberg_config.write_mode.as_str(),
        );

        while elapsed_time < MAX_WAIT_TIME_SECS {
            let poll_interval = std::time::Duration::from_secs(current_interval_secs);
            tokio::time::sleep(poll_interval).await;
            elapsed_time += current_interval_secs;

            tracing::info!(
                "Checking iceberg compaction completion for sink {} task_id={}, elapsed={}s, interval={}s",
                sink_id,
                task_id,
                elapsed_time,
                current_interval_secs
            );

            let current_table = iceberg_config.load_table().await?;

            let metadata = current_table.metadata();
            let new_snapshots: Vec<_> = metadata
                .snapshots()
                .filter(|snapshot| {
                    let snapshot_timestamp = snapshot.timestamp_ms();
                    let snapshot_id = snapshot.snapshot_id();
                    snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
                })
                .collect();

            for snapshot in new_snapshots {
                let summary = snapshot.summary();
                if cow {
                    if matches!(summary.operation, Operation::Overwrite) {
                        tracing::info!(
                            "Iceberg compaction completed for sink {} task_id={} with Overwrite operation",
                            sink_id,
                            task_id
                        );
                        return Ok(());
                    }
                } else if matches!(summary.operation, Operation::Replace) {
                    tracing::info!(
                        "Iceberg compaction completed for sink {} task_id={} with Replace operation",
                        sink_id,
                        task_id
                    );
                    return Ok(());
                }
            }

            current_interval_secs = std::cmp::min(
                MAX_POLL_INTERVAL_SECS,
                ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
            );
        }

        Err(anyhow!(
            "Iceberg compaction did not complete within {} seconds for sink {} (task_id={})",
            MAX_WAIT_TIME_SECS,
            sink_id,
            task_id
        )
        .into())
    }

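    /// Run snapshot expiration for every tracked sink, logging (but not propagating)
    /// per-sink failures.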
    async fn perform_gc_operations(&self) -> MetaResult<()> {
        let sink_ids = {
            let guard = self.inner.read();
            guard.sink_schedules.keys().cloned().collect::<Vec<_>>()
        };

        tracing::info!("Starting GC operations for {} tables", sink_ids.len());

        for sink_id in sink_ids {
            if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
            }
        }

        tracing::info!("GC operations completed");
        Ok(())
    }

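    /// Number of snapshots committed since the last `Replace` (compaction) snapshot, or
    /// the total snapshot count if the table has never been compacted. Returns `None`
    /// if the table cannot be loaded.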
    async fn get_snapshot_count(&self, sink_id: SinkId) -> Option<usize> {
        let iceberg_config = self.load_iceberg_config(sink_id).await.ok()?;
        let catalog = iceberg_config.create_catalog().await.ok()?;
        let table_name = iceberg_config.full_table_name().ok()?;
        let table = catalog.load_table(&table_name).await.ok()?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();

        if snapshots.is_empty() {
            return Some(0);
        }

        snapshots.sort_by_key(|s| s.timestamp_ms());

        // Index of the most recent `Replace` snapshot (the last compaction), if any.
        let last_replace_index = snapshots
            .iter()
            .rposition(|snapshot| matches!(snapshot.summary().operation, Operation::Replace));

        let snapshot_count = match last_replace_index {
            Some(index) => snapshots.len() - index - 1,
            None => snapshots.len(),
        };

        Some(snapshot_count)
    }

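    /// Expire old snapshots for a sink's table if snapshot expiration is enabled and the
    /// oldest snapshot is older than the configured (or default 24h) expiration threshold.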
    pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000;
        let now = chrono::Utc::now().timestamp_millis();

        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        if !iceberg_config.enable_snapshot_expiration {
            return Ok(());
        }

        let catalog = iceberg_config.create_catalog().await?;
        let table = catalog
            .load_table(&iceberg_config.full_table_name()?)
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();
        snapshots.sort_by_key(|s| s.timestamp_ms());

        let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;

        let snapshot_expiration_timestamp_ms =
            match iceberg_config.snapshot_expiration_timestamp_ms(now) {
                Some(timestamp) => timestamp,
                None => default_snapshot_expiration_timestamp_ms,
            };

        // Nothing to expire if there are no snapshots older than the threshold.
        if snapshots.is_empty()
            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
        {
            return Ok(());
        }

        tracing::info!(
            catalog_name = iceberg_config.catalog_name(),
            table_name = iceberg_config.full_table_name()?.to_string(),
            %sink_id,
            snapshots_len = snapshots.len(),
            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
            "try trigger snapshots expiration",
        );

        let tx = Transaction::new(&table);

        let mut expired_snapshots = tx.expire_snapshot();

        expired_snapshots = expired_snapshots.expire_older_than(snapshot_expiration_timestamp_ms);

        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
            expired_snapshots = expired_snapshots.retain_last(retain_last);
        }

        expired_snapshots = expired_snapshots
            .clear_expired_files(iceberg_config.snapshot_expiration_clear_expired_files);

        expired_snapshots = expired_snapshots
            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);

        let tx = expired_snapshots
            .apply()
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tx.commit(catalog.as_ref())
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tracing::info!(
            catalog_name = iceberg_config.catalog_name(),
            table_name = iceberg_config.full_table_name()?.to_string(),
            %sink_id,
            "Expired snapshots for iceberg table",
        );

        Ok(())
    }
}