1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use iceberg::spec::Operation;
21use iceberg::transaction::Transaction;
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::id::WorkerId;
25use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
26use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
27use risingwave_connector::sink::iceberg::{
28 CompactionType, IcebergConfig, should_enable_iceberg_cow,
29};
30use risingwave_connector::sink::{SinkError, SinkParam};
31use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
32use risingwave_pb::iceberg_compaction::{
33 IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
34};
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
37use tokio::sync::oneshot::Sender;
38use tokio::task::JoinHandle;
39use tonic::Streaming;
40
41use super::MetaSrvEnv;
42use crate::MetaResult;
43use crate::hummock::{
44 IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
45 IcebergCompactor, IcebergCompactorManagerRef,
46};
47use crate::manager::MetadataManager;
48use crate::rpc::metrics::MetaMetrics;
49
50pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;
51
52type CompactorChangeTx =
53 UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
54
55type CompactorChangeRx =
56 UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;
57
/// Copy of a track's schedule, captured at the moment the track enters `Processing`.
///
/// If the compaction task is not dispatched successfully, this copy is used to put the
/// track back to the schedule it had before.
#[derive(Clone, Debug)]
struct CompactionTrackSnapshot {
    /// Scheduled time the track had while idle. On restore, `None` falls back to "now".
    next_compaction_time: Option<Instant>,
}
63
/// Scheduling state of one sink's compaction track.
#[derive(Clone, Debug)]
enum CompactionTrackState {
    /// No task outstanding.
    ///
    /// The track becomes a candidate once `next_compaction_time` has been reached (and
    /// enough snapshots have accumulated).
    Idle {
        next_compaction_time: Instant,
    },
    /// A compaction handle was handed out for this track and has not been dropped yet.
    Processing,
}
72
73#[derive(Debug, Clone)]
74struct CompactionTrack {
75 task_type: TaskType,
76 trigger_interval_sec: u64,
77 trigger_snapshot_count: usize,
79 state: CompactionTrackState,
80}
81
82impl CompactionTrack {
83 fn should_trigger(&self, now: Instant, snapshot_count: usize) -> bool {
84 let next_compaction_time = match &self.state {
86 CompactionTrackState::Idle {
87 next_compaction_time,
88 } => *next_compaction_time,
89 CompactionTrackState::Processing => return false,
90 };
91
92 let time_ready = now >= next_compaction_time;
94 let snapshot_ready = snapshot_count >= self.trigger_snapshot_count;
95
96 time_ready && snapshot_ready
97 }
98
99 fn start_processing(&mut self) -> CompactionTrackSnapshot {
101 match &self.state {
102 CompactionTrackState::Idle {
103 next_compaction_time,
104 } => {
105 let snapshot = CompactionTrackSnapshot {
106 next_compaction_time: Some(*next_compaction_time),
107 };
108 self.state = CompactionTrackState::Processing;
109 snapshot
110 }
111 CompactionTrackState::Processing => {
112 unreachable!("Cannot start processing when already processing")
113 }
114 }
115 }
116
117 fn complete_processing(&mut self) {
119 match &self.state {
120 CompactionTrackState::Processing => {
121 self.state = CompactionTrackState::Idle {
122 next_compaction_time: Instant::now()
123 + std::time::Duration::from_secs(self.trigger_interval_sec),
124 };
125 }
126 CompactionTrackState::Idle { .. } => {
127 unreachable!("Cannot complete processing when not processing")
128 }
129 }
130 }
131
132 fn restore_from_snapshot(&mut self, snapshot: CompactionTrackSnapshot) {
134 self.state = CompactionTrackState::Idle {
135 next_compaction_time: snapshot.next_compaction_time.unwrap_or_else(Instant::now),
136 };
137 }
138
139 fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
140 if self.trigger_interval_sec == new_interval_sec {
141 return;
142 }
143
144 self.trigger_interval_sec = new_interval_sec;
145
146 match &mut self.state {
148 CompactionTrackState::Idle {
149 next_compaction_time,
150 } => {
151 *next_compaction_time = now + std::time::Duration::from_secs(new_interval_sec);
152 }
153 CompactionTrackState::Processing => {
154 }
156 }
157 }
158}
159
160pub struct IcebergCompactionHandle {
162 sink_id: SinkId,
163 task_type: TaskType,
164 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
165 metadata_manager: MetadataManager,
166 handle_success: bool,
167
168 track_snapshot: CompactionTrackSnapshot,
170}
171
172impl IcebergCompactionHandle {
173 fn new(
174 sink_id: SinkId,
175 task_type: TaskType,
176 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
177 metadata_manager: MetadataManager,
178 track_snapshot: CompactionTrackSnapshot,
179 ) -> Self {
180 Self {
181 sink_id,
182 task_type,
183 inner,
184 metadata_manager,
185 handle_success: false,
186 track_snapshot,
187 }
188 }
189
190 pub async fn send_compact_task(
191 mut self,
192 compactor: Arc<IcebergCompactor>,
193 task_id: u64,
194 ) -> MetaResult<()> {
195 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
196 let Some(prost_sink_catalog) = self
197 .metadata_manager
198 .catalog_controller
199 .get_sink_by_id(self.sink_id)
200 .await?
201 else {
202 tracing::warn!("Sink not found: {}", self.sink_id);
204 return Ok(());
205 };
206 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
207 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
208
209 let result =
210 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
211 task_id,
212 props: param.properties,
213 task_type: self.task_type as i32,
214 }));
215
216 if result.is_ok() {
217 self.handle_success = true;
218 }
219
220 result
221 }
222
223 pub fn sink_id(&self) -> SinkId {
224 self.sink_id
225 }
226}
227
228impl Drop for IcebergCompactionHandle {
229 fn drop(&mut self) {
230 let mut guard = self.inner.write();
231 if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id) {
232 if track.task_type == self.task_type {
234 if self.handle_success {
235 track.complete_processing();
236 } else {
237 track.restore_from_snapshot(self.track_snapshot.clone());
240 }
241 }
242 }
243 }
244}
245
246struct IcebergCompactionManagerInner {
247 pub sink_schedules: HashMap<SinkId, CompactionTrack>,
248}
249
250pub struct IcebergCompactionManager {
251 pub env: MetaSrvEnv,
252 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
253
254 metadata_manager: MetadataManager,
255 pub iceberg_compactor_manager: IcebergCompactorManagerRef,
256
257 compactor_streams_change_tx: CompactorChangeTx,
258
259 pub metrics: Arc<MetaMetrics>,
260}
261
262impl IcebergCompactionManager {
263 pub fn build(
264 env: MetaSrvEnv,
265 metadata_manager: MetadataManager,
266 iceberg_compactor_manager: IcebergCompactorManagerRef,
267 metrics: Arc<MetaMetrics>,
268 ) -> (Arc<Self>, CompactorChangeRx) {
269 let (compactor_streams_change_tx, compactor_streams_change_rx) =
270 tokio::sync::mpsc::unbounded_channel();
271 (
272 Arc::new(Self {
273 env,
274 inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
275 sink_schedules: HashMap::default(),
276 })),
277 metadata_manager,
278 iceberg_compactor_manager,
279 compactor_streams_change_tx,
280 metrics,
281 }),
282 compactor_streams_change_rx,
283 )
284 }
285
286 pub fn compaction_stat_loop(
287 manager: Arc<Self>,
288 mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
289 ) -> (JoinHandle<()>, Sender<()>) {
290 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
291 let join_handle = tokio::spawn(async move {
292 loop {
293 tokio::select! {
294 Some(stat) = rx.recv() => {
295 manager.update_iceberg_commit_info(stat).await;
296 },
297 _ = &mut shutdown_rx => {
298 tracing::info!("Iceberg compaction manager is stopped");
299 return;
300 }
301 }
302 }
303 });
304
305 (join_handle, shutdown_tx)
306 }
307
308 pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
309 let IcebergSinkCompactionUpdate {
310 sink_id,
311 compaction_interval,
312 force_compaction,
313 } = msg;
314
315 let track_exists = {
317 let guard = self.inner.read();
318 guard.sink_schedules.contains_key(&sink_id)
319 };
320
321 if !track_exists {
323 let iceberg_config = self.load_iceberg_config(sink_id).await;
325
326 let new_track = match iceberg_config {
327 Ok(config) => {
328 match self.create_compaction_track(sink_id, &config) {
330 Ok(track) => track,
331 Err(e) => {
332 tracing::error!(
333 error = ?e.as_report(),
334 "Failed to create compaction track from config for sink {}, using default Full track",
335 sink_id
336 );
337 CompactionTrack {
339 task_type: TaskType::Full,
340 trigger_interval_sec: compaction_interval,
341 trigger_snapshot_count: 10,
342 state: CompactionTrackState::Idle {
343 next_compaction_time: Instant::now()
344 + std::time::Duration::from_secs(compaction_interval),
345 },
346 }
347 }
348 }
349 }
350 Err(e) => {
351 tracing::error!(
352 error = ?e.as_report(),
353 "Failed to load iceberg config for sink {}, using default Full track",
354 sink_id
355 );
356 CompactionTrack {
358 task_type: TaskType::Full,
359 trigger_interval_sec: compaction_interval,
360 trigger_snapshot_count: 10,
361 state: CompactionTrackState::Idle {
362 next_compaction_time: Instant::now()
363 + std::time::Duration::from_secs(compaction_interval),
364 },
365 }
366 }
367 };
368
369 let mut guard = self.inner.write();
370 guard.sink_schedules.insert(sink_id, new_track);
371 }
372
373 let mut guard = self.inner.write();
375 if let Some(track) = guard.sink_schedules.get_mut(&sink_id) {
376 if force_compaction {
378 if let CompactionTrackState::Idle {
379 next_compaction_time,
380 } = &mut track.state
381 {
382 *next_compaction_time = Instant::now();
383 }
384 } else {
386 track.update_interval(compaction_interval, Instant::now());
388 }
389 } else {
390 tracing::error!(
391 "Failed to find compaction track for sink {} during update; configuration changes not applied.",
392 sink_id
393 );
394 }
395 }
396
397 fn create_compaction_track(
399 &self,
400 _sink_id: SinkId,
401 iceberg_config: &IcebergConfig,
402 ) -> MetaResult<CompactionTrack> {
403 let trigger_interval_sec = iceberg_config.compaction_interval_sec();
404 let trigger_snapshot_count = iceberg_config.trigger_snapshot_count();
405
406 let task_type =
408 if should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode)
409 {
410 TaskType::Full
411 } else {
412 match iceberg_config.compaction_type() {
414 CompactionType::Full => TaskType::Full,
415 CompactionType::SmallFiles => TaskType::SmallFiles,
416 CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
417 }
418 };
419
420 Ok(CompactionTrack {
421 task_type,
422 trigger_interval_sec,
423 trigger_snapshot_count,
424 state: CompactionTrackState::Idle {
425 next_compaction_time: Instant::now()
426 + std::time::Duration::from_secs(trigger_interval_sec),
427 },
428 })
429 }
430
431 pub async fn get_top_n_iceberg_commit_sink_ids(
435 &self,
436 n: usize,
437 ) -> Vec<IcebergCompactionHandle> {
438 let now = Instant::now();
439
440 let sink_ids: Vec<SinkId> = {
442 let guard = self.inner.read();
443 guard.sink_schedules.keys().cloned().collect()
444 };
445
446 let snapshot_count_futures = sink_ids
448 .iter()
449 .map(|sink_id| async move {
450 let count = self.get_snapshot_count(*sink_id).await?;
451 Some((*sink_id, count))
452 })
453 .collect::<Vec<_>>();
454
455 let snapshot_counts: HashMap<SinkId, usize> =
456 futures::future::join_all(snapshot_count_futures)
457 .await
458 .into_iter()
459 .flatten()
460 .collect();
461
462 let mut guard = self.inner.write();
463
464 let mut candidates = Vec::new();
466 for (sink_id, track) in &guard.sink_schedules {
467 let Some(&snapshot_count) = snapshot_counts.get(sink_id) else {
469 continue;
470 };
471 if track.should_trigger(now, snapshot_count) {
472 if let CompactionTrackState::Idle {
474 next_compaction_time,
475 } = track.state
476 {
477 candidates.push((*sink_id, track.task_type, next_compaction_time));
478 }
479 }
480 }
481
482 candidates.sort_by(|a, b| a.2.cmp(&b.2));
484
485 candidates
487 .into_iter()
488 .take(n)
489 .filter_map(|(sink_id, task_type, _)| {
490 let track = guard.sink_schedules.get_mut(&sink_id)?;
491
492 let track_snapshot = track.start_processing();
493
494 Some(IcebergCompactionHandle::new(
495 sink_id,
496 task_type,
497 self.inner.clone(),
498 self.metadata_manager.clone(),
499 track_snapshot,
500 ))
501 })
502 .collect()
503 }
504
505 pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
506 let mut guard = self.inner.write();
507 guard.sink_schedules.remove(&sink_id);
508 }
509
510 pub async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
511 let prost_sink_catalog = self
512 .metadata_manager
513 .catalog_controller
514 .get_sink_by_id(sink_id)
515 .await?
516 .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
517 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
518 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
519 Ok(param)
520 }
521
522 pub async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
523 let sink_param = self.get_sink_param(sink_id).await?;
524 let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
525 Ok(iceberg_config)
526 }
527
528 pub fn add_compactor_stream(
529 &self,
530 context_id: WorkerId,
531 req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
532 ) {
533 self.compactor_streams_change_tx
534 .send((context_id, req_stream))
535 .unwrap();
536 }
537
538 pub fn iceberg_compaction_event_loop(
539 iceberg_compaction_manager: Arc<Self>,
540 compactor_streams_change_rx: UnboundedReceiver<(
541 WorkerId,
542 Streaming<SubscribeIcebergCompactionEventRequest>,
543 )>,
544 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
545 let mut join_handle_vec = Vec::default();
546
547 let iceberg_compaction_event_handler =
548 IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
549
550 let iceberg_compaction_event_dispatcher =
551 IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
552
553 let event_loop = IcebergCompactionEventLoop::new(
554 iceberg_compaction_event_dispatcher,
555 iceberg_compaction_manager.metrics.clone(),
556 compactor_streams_change_rx,
557 );
558
559 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
560 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
561
562 join_handle_vec
563 }
564
565 pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
569 assert!(
570 interval_sec > 0,
571 "Iceberg GC interval must be greater than 0"
572 );
573 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
574 let join_handle = tokio::spawn(async move {
575 tracing::info!(
576 interval_sec = interval_sec,
577 "Starting Iceberg GC loop with configurable interval"
578 );
579 let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));
580
581 loop {
582 tokio::select! {
583 _ = interval.tick() => {
584 if let Err(e) = manager.perform_gc_operations().await {
585 tracing::error!(error = ?e.as_report(), "GC operations failed");
586 }
587 },
588 _ = &mut shutdown_rx => {
589 tracing::info!("Iceberg GC loop is stopped");
590 return;
591 }
592 }
593 }
594 });
595
596 (join_handle, shutdown_tx)
597 }
598
599 pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
602 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
603
604 let iceberg_config = self.load_iceberg_config(sink_id).await?;
606 let initial_table = iceberg_config.load_table().await?;
607 let initial_snapshot_id = initial_table
608 .metadata()
609 .current_snapshot()
610 .map(|s| s.snapshot_id())
611 .unwrap_or(0); let initial_timestamp = chrono::Utc::now().timestamp_millis();
613
614 let compactor = self
616 .iceberg_compactor_manager
617 .next_compactor()
618 .ok_or_else(|| anyhow!("No iceberg compactor available"))?;
619
620 let task_id = self
622 .env
623 .hummock_seq
624 .next_interval("compaction_task", 1)
625 .await?;
626
627 let sink_param = self.get_sink_param(sink_id).await?;
628
629 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
630 task_id,
631 props: sink_param.properties,
632 task_type: TaskType::Full as i32, }))?;
634
635 tracing::info!(
636 "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
637 sink_id,
638 task_id
639 );
640
641 self.wait_for_compaction_completion(
642 &sink_id,
643 iceberg_config,
644 initial_snapshot_id,
645 initial_timestamp,
646 task_id,
647 )
648 .await?;
649
650 Ok(task_id)
651 }
652
653 async fn wait_for_compaction_completion(
654 &self,
655 sink_id: &SinkId,
656 iceberg_config: IcebergConfig,
657 initial_snapshot_id: i64,
658 initial_timestamp: i64,
659 task_id: u64,
660 ) -> MetaResult<()> {
661 const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
662 const MAX_POLL_INTERVAL_SECS: u64 = 60;
663 const MAX_WAIT_TIME_SECS: u64 = 1800;
664 const BACKOFF_MULTIPLIER: f64 = 1.5;
665
666 let mut elapsed_time = 0;
667 let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;
668
669 let cow =
670 should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
671
672 while elapsed_time < MAX_WAIT_TIME_SECS {
673 let poll_interval = std::time::Duration::from_secs(current_interval_secs);
674 tokio::time::sleep(poll_interval).await;
675 elapsed_time += current_interval_secs;
676
677 tracing::info!(
678 "Checking iceberg compaction completion for sink {} task_id={}, elapsed={}s, interval={}s",
679 sink_id,
680 task_id,
681 elapsed_time,
682 current_interval_secs
683 );
684
685 let current_table = iceberg_config.load_table().await?;
686
687 let metadata = current_table.metadata();
688 let new_snapshots: Vec<_> = metadata
689 .snapshots()
690 .filter(|snapshot| {
691 let snapshot_timestamp = snapshot.timestamp_ms();
692 let snapshot_id = snapshot.snapshot_id();
693 snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
694 })
695 .collect();
696
697 for snapshot in new_snapshots {
698 let summary = snapshot.summary();
699 if cow {
700 if matches!(summary.operation, Operation::Overwrite) {
701 tracing::info!(
702 "Iceberg compaction completed for sink {} task_id={} with Overwrite operation",
703 sink_id,
704 task_id
705 );
706 return Ok(());
707 }
708 } else if matches!(summary.operation, Operation::Replace) {
709 tracing::info!(
710 "Iceberg compaction completed for sink {} task_id={} with Replace operation",
711 sink_id,
712 task_id
713 );
714 return Ok(());
715 }
716 }
717
718 current_interval_secs = std::cmp::min(
719 MAX_POLL_INTERVAL_SECS,
720 ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
721 );
722 }
723
724 Err(anyhow!(
725 "Iceberg compaction did not complete within {} seconds for sink {} (task_id={})",
726 MAX_WAIT_TIME_SECS,
727 sink_id,
728 task_id
729 )
730 .into())
731 }
732
733 async fn perform_gc_operations(&self) -> MetaResult<()> {
734 let sink_ids = {
735 let guard = self.inner.read();
736 guard.sink_schedules.keys().cloned().collect::<Vec<_>>()
737 };
738
739 tracing::info!("Starting GC operations for {} tables", sink_ids.len());
740
741 for sink_id in sink_ids {
742 if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
743 tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
744 }
745 }
746
747 tracing::info!("GC operations completed");
748 Ok(())
749 }
750
751 async fn get_snapshot_count(&self, sink_id: SinkId) -> Option<usize> {
754 let iceberg_config = self.load_iceberg_config(sink_id).await.ok()?;
755 let catalog = iceberg_config.create_catalog().await.ok()?;
756 let table_name = iceberg_config.full_table_name().ok()?;
757 let table = catalog.load_table(&table_name).await.ok()?;
758
759 let metadata = table.metadata();
760 let mut snapshots = metadata.snapshots().collect_vec();
761
762 if snapshots.is_empty() {
763 return Some(0);
764 }
765
766 snapshots.sort_by_key(|s| s.timestamp_ms());
768
769 let last_replace_index = snapshots
771 .iter()
772 .rposition(|snapshot| matches!(snapshot.summary().operation, Operation::Replace));
773
774 let snapshot_count = match last_replace_index {
776 Some(index) => snapshots.len() - index - 1,
777 None => snapshots.len(), };
779
780 Some(snapshot_count)
781 }
782
783 pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
784 const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; let now = chrono::Utc::now().timestamp_millis();
786
787 let iceberg_config = self.load_iceberg_config(sink_id).await?;
788 if !iceberg_config.enable_snapshot_expiration {
789 return Ok(());
790 }
791
792 let catalog = iceberg_config.create_catalog().await?;
793 let table = catalog
794 .load_table(&iceberg_config.full_table_name()?)
795 .await
796 .map_err(|e| SinkError::Iceberg(e.into()))?;
797
798 let metadata = table.metadata();
799 let mut snapshots = metadata.snapshots().collect_vec();
800 snapshots.sort_by_key(|s| s.timestamp_ms());
801
802 let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;
803
804 let snapshot_expiration_timestamp_ms =
805 match iceberg_config.snapshot_expiration_timestamp_ms(now) {
806 Some(timestamp) => timestamp,
807 None => default_snapshot_expiration_timestamp_ms,
808 };
809
810 if snapshots.is_empty()
811 || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
812 {
813 return Ok(());
815 }
816
817 tracing::info!(
818 catalog_name = iceberg_config.catalog_name(),
819 table_name = iceberg_config.full_table_name()?.to_string(),
820 %sink_id,
821 snapshots_len = snapshots.len(),
822 snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
823 snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
824 clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
825 clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
826 "try trigger snapshots expiration",
827 );
828
829 let tx = Transaction::new(&table);
830
831 let mut expired_snapshots = tx.expire_snapshot();
832
833 expired_snapshots = expired_snapshots.expire_older_than(snapshot_expiration_timestamp_ms);
834
835 if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
836 expired_snapshots = expired_snapshots.retain_last(retain_last);
837 }
838
839 expired_snapshots = expired_snapshots
840 .clear_expired_files(iceberg_config.snapshot_expiration_clear_expired_files);
841
842 expired_snapshots = expired_snapshots
843 .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);
844
845 let tx = expired_snapshots
846 .apply()
847 .await
848 .map_err(|e| SinkError::Iceberg(e.into()))?;
849
850 tx.commit(catalog.as_ref())
851 .await
852 .map_err(|e| SinkError::Iceberg(e.into()))?;
853
854 tracing::info!(
855 catalog_name = iceberg_config.catalog_name(),
856 table_name = iceberg_config.full_table_name()?.to_string(),
857 %sink_id,
858 "Expired snapshots for iceberg table",
859 );
860
861 Ok(())
862 }
863}