use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use iceberg::spec::Operation;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::id::WorkerId;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
use risingwave_connector::sink::iceberg::{
    CompactionType, IcebergConfig, commit_branch, should_enable_iceberg_cow,
};
use risingwave_connector::sink::{SinkError, SinkParam};
use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
use risingwave_pb::iceberg_compaction::{
    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;

use super::MetaSrvEnv;
use crate::MetaResult;
use crate::hummock::{
    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
    IcebergCompactor, IcebergCompactorManagerRef,
};
use crate::manager::MetadataManager;
use crate::rpc::metrics::MetaMetrics;

pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;

type CompactorChangeTx =
    UnboundedSender<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;

type CompactorChangeRx =
    UnboundedReceiver<(WorkerId, Streaming<SubscribeIcebergCompactionEventRequest>)>;

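/// Snapshot of a [`CompactionTrack`]'s schedule, captured when a compaction handle is issued so
/// the previous schedule can be restored if the task fails to dispatch.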
#[derive(Debug, Clone)]
struct CompactionTrackSnapshot {
    next_compaction_time: Option<Instant>,
}

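/// Scheduling state of a sink's compaction track: either idle and waiting for the next trigger,
/// or currently processing a dispatched compaction task.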
#[derive(Debug, Clone)]
enum CompactionTrackState {
    Idle { next_compaction_time: Instant },
    Processing,
}

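/// Per-sink compaction schedule: the task type to issue, the trigger thresholds, and the current
/// scheduling state.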
#[derive(Debug, Clone)]
struct CompactionTrack {
    task_type: TaskType,
    trigger_interval_sec: u64,
    trigger_snapshot_count: usize,
    state: CompactionTrackState,
}

impl CompactionTrack {
    fn should_trigger(&self, now: Instant, snapshot_count: usize) -> bool {
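        // A track that is already processing never re-triggers. Otherwise, trigger when enough
        // snapshots have accumulated, or when the time-based deadline has passed and there is at
        // least one pending snapshot.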
        let next_compaction_time = match &self.state {
            CompactionTrackState::Idle {
                next_compaction_time,
            } => *next_compaction_time,
            CompactionTrackState::Processing => return false,
        };

        let time_ready = now >= next_compaction_time;
        let snapshot_ready = snapshot_count >= self.trigger_snapshot_count;
        let has_snapshots = snapshot_count > 0;

        snapshot_ready || (time_ready && has_snapshots)
    }

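    /// Transition the track to `Processing`, returning a snapshot of the previous schedule so it
    /// can be restored if dispatching the task fails.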
    fn start_processing(&mut self) -> CompactionTrackSnapshot {
        match &self.state {
            CompactionTrackState::Idle {
                next_compaction_time,
            } => {
                let snapshot = CompactionTrackSnapshot {
                    next_compaction_time: Some(*next_compaction_time),
                };
                self.state = CompactionTrackState::Processing;
                snapshot
            }
            CompactionTrackState::Processing => {
                unreachable!("Cannot start processing when already processing")
            }
        }
    }

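    /// Mark the dispatched compaction as finished and schedule the next run one full interval
    /// from now.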
    fn complete_processing(&mut self) {
        match &self.state {
            CompactionTrackState::Processing => {
                self.state = CompactionTrackState::Idle {
                    next_compaction_time: Instant::now()
                        + std::time::Duration::from_secs(self.trigger_interval_sec),
                };
            }
            CompactionTrackState::Idle { .. } => {
                unreachable!("Cannot complete processing when not processing")
            }
        }
    }

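    /// Revert the track to `Idle` using the schedule captured in `snapshot`, falling back to
    /// `Instant::now()` if no next compaction time was recorded.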
    fn restore_from_snapshot(&mut self, snapshot: CompactionTrackSnapshot) {
        self.state = CompactionTrackState::Idle {
            next_compaction_time: snapshot.next_compaction_time.unwrap_or_else(Instant::now),
        };
    }

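    /// Apply a new trigger interval. An idle track has its next compaction time reset to
    /// `now + new_interval_sec`; a processing track picks up the new interval when it completes.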
    fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
        if self.trigger_interval_sec == new_interval_sec {
            return;
        }

        self.trigger_interval_sec = new_interval_sec;

        match &mut self.state {
            CompactionTrackState::Idle {
                next_compaction_time,
            } => {
                *next_compaction_time = now + std::time::Duration::from_secs(new_interval_sec);
            }
            CompactionTrackState::Processing => {
                // Nothing to reset here: the new interval takes effect when processing completes.
            }
        }
    }
}

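/// RAII handle for one scheduled compaction of a sink. Dropping the handle settles the sink's
/// track: if the task was sent successfully the schedule advances, otherwise the previous
/// schedule is restored.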
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    task_type: TaskType,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    handle_success: bool,

    track_snapshot: CompactionTrackSnapshot,
}

impl IcebergCompactionHandle {
    fn new(
        sink_id: SinkId,
        task_type: TaskType,
        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
        metadata_manager: MetadataManager,
        track_snapshot: CompactionTrackSnapshot,
    ) -> Self {
        Self {
            sink_id,
            task_type,
            inner,
            metadata_manager,
            handle_success: false,
            track_snapshot,
        }
    }

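    /// Resolve the sink's catalog entry into a [`SinkParam`] and send the compaction task to the
    /// given compactor. On success the handle is marked so that dropping it advances the schedule
    /// instead of restoring it.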
    pub async fn send_compact_task(
        mut self,
        compactor: Arc<IcebergCompactor>,
        task_id: u64,
    ) -> MetaResult<()> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
        let Some(prost_sink_catalog) = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_id(self.sink_id)
            .await?
        else {
            tracing::warn!("Sink not found: {}", self.sink_id);
            // The sink may have been dropped in the meantime; skip the task.
            return Ok(());
        };
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;

        let result =
            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
                task_id,
                props: param.properties,
                task_type: self.task_type as i32,
            }));

        if result.is_ok() {
            self.handle_success = true;
        }

        result
    }

    pub fn sink_id(&self) -> SinkId {
        self.sink_id
    }
}

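// Settle the track when the handle is released. The task-type check guards against the track
// having been recreated with a different compaction type while this handle was in flight.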
impl Drop for IcebergCompactionHandle {
    fn drop(&mut self) {
        let mut guard = self.inner.write();
        if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id) {
            if track.task_type == self.task_type {
                if self.handle_success {
                    track.complete_processing();
                } else {
                    track.restore_from_snapshot(self.track_snapshot.clone());
                }
            }
        }
    }
}

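/// Shared mutable state of the manager: the compaction schedule for each sink.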
struct IcebergCompactionManagerInner {
    pub sink_schedules: HashMap<SinkId, CompactionTrack>,
}

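/// Tracks per-sink Iceberg compaction schedules and dispatches compaction tasks to registered
/// iceberg compactors.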
pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}

impl IcebergCompactionManager {
    pub fn build(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        iceberg_compactor_manager: IcebergCompactorManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Arc<Self>, CompactorChangeRx) {
        let (compactor_streams_change_tx, compactor_streams_change_rx) =
            tokio::sync::mpsc::unbounded_channel();
        (
            Arc::new(Self {
                env,
                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
                    sink_schedules: HashMap::default(),
                })),
                metadata_manager,
                iceberg_compactor_manager,
                compactor_streams_change_tx,
                metrics,
            }),
            compactor_streams_change_rx,
        )
    }

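    /// Spawn a loop that consumes [`IcebergSinkCompactionUpdate`] messages from sinks and applies
    /// them to the compaction schedules. Returns the join handle and a shutdown sender.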
    pub fn compaction_stat_loop(
        manager: Arc<Self>,
        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(stat) = rx.recv() => {
                        manager.update_iceberg_commit_info(stat).await;
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compaction manager is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

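    /// Handle a commit-info update from a sink: lazily create the sink's compaction track from
    /// its Iceberg config (falling back to a default `Full` track on error), then either force an
    /// immediate trigger or apply the new compaction interval.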
    pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
        let IcebergSinkCompactionUpdate {
            sink_id,
            compaction_interval,
            force_compaction,
        } = msg;

        let track_exists = {
            let guard = self.inner.read();
            guard.sink_schedules.contains_key(&sink_id)
        };

        if !track_exists {
            // First update for this sink: build a track from its Iceberg config.
            let iceberg_config = self.load_iceberg_config(sink_id).await;

            let new_track = match iceberg_config {
                Ok(config) => {
                    match self.create_compaction_track(sink_id, &config) {
                        Ok(track) => track,
                        Err(e) => {
                            tracing::error!(
                                error = ?e.as_report(),
                                "Failed to create compaction track from config for sink {}, using default Full track",
                                sink_id
                            );
                            CompactionTrack {
                                task_type: TaskType::Full,
                                trigger_interval_sec: compaction_interval,
                                trigger_snapshot_count: 10,
                                state: CompactionTrackState::Idle {
                                    next_compaction_time: Instant::now()
                                        + std::time::Duration::from_secs(compaction_interval),
                                },
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::error!(
                        error = ?e.as_report(),
                        "Failed to load iceberg config for sink {}, using default Full track",
                        sink_id
                    );
                    CompactionTrack {
                        task_type: TaskType::Full,
                        trigger_interval_sec: compaction_interval,
                        trigger_snapshot_count: 10,
                        state: CompactionTrackState::Idle {
                            next_compaction_time: Instant::now()
                                + std::time::Duration::from_secs(compaction_interval),
                        },
                    }
                }
            };

            let mut guard = self.inner.write();
            guard.sink_schedules.insert(sink_id, new_track);
        }

        let mut guard = self.inner.write();
        if let Some(track) = guard.sink_schedules.get_mut(&sink_id) {
            if force_compaction {
                // Make the track eligible for the next scheduling pass immediately.
                if let CompactionTrackState::Idle {
                    next_compaction_time,
                } = &mut track.state
                {
                    *next_compaction_time = Instant::now();
                }
            } else {
                track.update_interval(compaction_interval, Instant::now());
            }
        } else {
            tracing::error!(
                "Failed to find compaction track for sink {} during update; configuration changes not applied.",
                sink_id
            );
        }
    }

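    /// Build a compaction track from the sink's Iceberg config. Copy-on-write tables always use
    /// `Full` compaction; otherwise the configured compaction type is mapped to the matching task
    /// type.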
    fn create_compaction_track(
        &self,
        _sink_id: SinkId,
        iceberg_config: &IcebergConfig,
    ) -> MetaResult<CompactionTrack> {
        let trigger_interval_sec = iceberg_config.compaction_interval_sec();
        let trigger_snapshot_count = iceberg_config.trigger_snapshot_count();

        let task_type =
            if should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode)
            {
                TaskType::Full
            } else {
                match iceberg_config.compaction_type() {
                    CompactionType::Full => TaskType::Full,
                    CompactionType::SmallFiles => TaskType::SmallFiles,
                    CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
                }
            };

        Ok(CompactionTrack {
            task_type,
            trigger_interval_sec,
            trigger_snapshot_count,
            state: CompactionTrackState::Idle {
                next_compaction_time: Instant::now()
                    + std::time::Duration::from_secs(trigger_interval_sec),
            },
        })
    }

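    /// Select up to `n` sinks that are due for compaction, ordered by earliest scheduled time,
    /// and mark their tracks as processing. Returns one [`IcebergCompactionHandle`] per selected
    /// sink.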
    pub async fn get_top_n_iceberg_commit_sink_ids(
        &self,
        n: usize,
    ) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();

        let sink_ids: Vec<SinkId> = {
            let guard = self.inner.read();
            guard.sink_schedules.keys().cloned().collect()
        };

        // Fetch pending snapshot counts outside the lock; sinks whose count cannot be loaded are
        // skipped for this round.
        let snapshot_count_futures = sink_ids
            .iter()
            .map(|sink_id| async move {
                let count = self.get_pending_snapshot_count(*sink_id).await?;
                Some((*sink_id, count))
            })
            .collect::<Vec<_>>();

        let snapshot_counts: HashMap<SinkId, usize> =
            futures::future::join_all(snapshot_count_futures)
                .await
                .into_iter()
                .flatten()
                .collect();

        let mut guard = self.inner.write();

        let mut candidates = Vec::new();
        for (sink_id, track) in &guard.sink_schedules {
            let Some(&snapshot_count) = snapshot_counts.get(sink_id) else {
                continue;
            };
            if track.should_trigger(now, snapshot_count) {
                if let CompactionTrackState::Idle {
                    next_compaction_time,
                } = track.state
                {
                    candidates.push((*sink_id, track.task_type, next_compaction_time));
                }
            }
        }

        // Earliest scheduled compaction time first.
        candidates.sort_by(|a, b| a.2.cmp(&b.2));

        candidates
            .into_iter()
            .take(n)
            .filter_map(|(sink_id, task_type, _)| {
                let track = guard.sink_schedules.get_mut(&sink_id)?;

                let track_snapshot = track.start_processing();

                Some(IcebergCompactionHandle::new(
                    sink_id,
                    task_type,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    track_snapshot,
                ))
            })
            .collect()
    }

    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
        let mut guard = self.inner.write();
        guard.sink_schedules.remove(&sink_id);
    }

    pub async fn get_sink_param(&self, sink_id: SinkId) -> MetaResult<SinkParam> {
        let prost_sink_catalog = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_id(sink_id)
            .await?
            .ok_or_else(|| anyhow!("Sink not found: {}", sink_id))?;
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        Ok(param)
    }

    pub async fn load_iceberg_config(&self, sink_id: SinkId) -> MetaResult<IcebergConfig> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties)?;
        Ok(iceberg_config)
    }

    pub fn add_compactor_stream(
        &self,
        context_id: WorkerId,
        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

    pub fn iceberg_compaction_event_loop(
        iceberg_compaction_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            WorkerId,
            Streaming<SubscribeIcebergCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let iceberg_compaction_event_handler =
            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());

        let iceberg_compaction_event_dispatcher =
            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);

        let event_loop = IcebergCompactionEventLoop::new(
            iceberg_compaction_event_dispatcher,
            iceberg_compaction_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }
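
    /// Spawn a periodic garbage-collection loop that runs every `interval_sec` seconds and
    /// expires old snapshots for every tracked sink. Returns the join handle and a shutdown
    /// sender.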
    pub fn gc_loop(manager: Arc<Self>, interval_sec: u64) -> (JoinHandle<()>, Sender<()>) {
        assert!(
            interval_sec > 0,
            "Iceberg GC interval must be greater than 0"
        );
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            tracing::info!(
                interval_sec = interval_sec,
                "Starting Iceberg GC loop with configurable interval"
            );
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_sec));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = manager.perform_gc_operations().await {
                            tracing::error!(error = ?e.as_report(), "GC operations failed");
                        }
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg GC loop is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }
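
    /// Trigger a full compaction for the given sink immediately, bypassing the schedule, and wait
    /// until a matching compaction snapshot appears in the table (or a timeout is reached).
    /// Returns the task id.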
    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;

        // Record the current snapshot and timestamp so completion can be detected later.
        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        let initial_table = iceberg_config.load_table().await?;
        let initial_snapshot_id = initial_table
            .metadata()
            .current_snapshot()
            .map(|s| s.snapshot_id())
            .unwrap_or(0);
        let initial_timestamp = chrono::Utc::now().timestamp_millis();

        let compactor = self
            .iceberg_compactor_manager
            .next_compactor()
            .ok_or_else(|| anyhow!("No iceberg compactor available"))?;

        let task_id = self
            .env
            .hummock_seq
            .next_interval("compaction_task", 1)
            .await?;

        let sink_param = self.get_sink_param(sink_id).await?;

        compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
            task_id,
            props: sink_param.properties,
            task_type: TaskType::Full as i32,
        }))?;

        tracing::info!(
            "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
            sink_id,
            task_id
        );

        self.wait_for_compaction_completion(
            &sink_id,
            iceberg_config,
            initial_snapshot_id,
            initial_timestamp,
            task_id,
        )
        .await?;

        Ok(task_id)
    }

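    /// Poll the table with exponential backoff until a snapshot produced by the compaction shows
    /// up: an `Overwrite` snapshot for copy-on-write tables, a `Replace` snapshot otherwise.
    /// Gives up after a fixed maximum wait time.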
    async fn wait_for_compaction_completion(
        &self,
        sink_id: &SinkId,
        iceberg_config: IcebergConfig,
        initial_snapshot_id: i64,
        initial_timestamp: i64,
        task_id: u64,
    ) -> MetaResult<()> {
        const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
        const MAX_POLL_INTERVAL_SECS: u64 = 60;
        const MAX_WAIT_TIME_SECS: u64 = 1800;
        const BACKOFF_MULTIPLIER: f64 = 1.5;

        let mut elapsed_time = 0;
        let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;

        let cow =
            should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode);

        while elapsed_time < MAX_WAIT_TIME_SECS {
            let poll_interval = std::time::Duration::from_secs(current_interval_secs);
            tokio::time::sleep(poll_interval).await;
            elapsed_time += current_interval_secs;

            tracing::info!(
                "Checking iceberg compaction completion for sink {} task_id={}, elapsed={}s, interval={}s",
                sink_id,
                task_id,
                elapsed_time,
                current_interval_secs
            );

            let current_table = iceberg_config.load_table().await?;

            let metadata = current_table.metadata();
            let new_snapshots: Vec<_> = metadata
                .snapshots()
                .filter(|snapshot| {
                    let snapshot_timestamp = snapshot.timestamp_ms();
                    let snapshot_id = snapshot.snapshot_id();
                    snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
                })
                .collect();

            for snapshot in new_snapshots {
                let summary = snapshot.summary();
                if cow {
                    if matches!(summary.operation, Operation::Overwrite) {
                        tracing::info!(
                            "Iceberg compaction completed for sink {} task_id={} with Overwrite operation",
                            sink_id,
                            task_id
                        );
                        return Ok(());
                    }
                } else if matches!(summary.operation, Operation::Replace) {
                    tracing::info!(
                        "Iceberg compaction completed for sink {} task_id={} with Replace operation",
                        sink_id,
                        task_id
                    );
                    return Ok(());
                }
            }

            current_interval_secs = std::cmp::min(
                MAX_POLL_INTERVAL_SECS,
                ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
            );
        }

        Err(anyhow!(
            "Iceberg compaction did not complete within {} seconds for sink {} (task_id={})",
            MAX_WAIT_TIME_SECS,
            sink_id,
            task_id
        )
        .into())
    }

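    /// Run snapshot expiration for every sink that currently has a compaction schedule, logging
    /// and continuing on per-sink failures.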
    async fn perform_gc_operations(&self) -> MetaResult<()> {
        let sink_ids = {
            let guard = self.inner.read();
            guard.sink_schedules.keys().cloned().collect::<Vec<_>>()
        };

        tracing::info!("Starting GC operations for {} tables", sink_ids.len());

        for sink_id in sink_ids {
            if let Err(e) = self.check_and_expire_snapshots(sink_id).await {
                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id);
            }
        }

        tracing::info!("GC operations completed");
        Ok(())
    }

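    /// Count snapshots committed since the last compaction, returning `None` if the table or its
    /// config cannot be loaded. For copy-on-write tables this walks the commit branch's ancestry
    /// and counts snapshots newer than the current main-branch snapshot; otherwise it counts
    /// snapshots after the most recent `Replace` snapshot.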
    async fn get_pending_snapshot_count(&self, sink_id: SinkId) -> Option<usize> {
        let iceberg_config = self.load_iceberg_config(sink_id).await.ok()?;
        let is_cow_mode =
            should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
        let catalog = iceberg_config.create_catalog().await.ok()?;
        let table_name = iceberg_config.full_table_name().ok()?;
        let table = catalog.load_table(&table_name).await.ok()?;
        let metadata = table.metadata();

        if is_cow_mode {
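            // The last compaction is reflected by the current snapshot on the main branch; count
            // how many newer snapshots have piled up on the commit branch since then.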
            let last_compaction_timestamp = metadata
                .current_snapshot()
                .map(|s| s.timestamp_ms())
                .unwrap_or(0);
            let branch = commit_branch(iceberg_config.r#type.as_str(), iceberg_config.write_mode);
            let current_snapshot = metadata.snapshot_for_ref(&branch)?;

            let mut count = 0;
            let mut snapshot_id = Some(current_snapshot.snapshot_id());

            while let Some(id) = snapshot_id {
                let snapshot = metadata.snapshot_by_id(id)?;
                if snapshot.timestamp_ms() > last_compaction_timestamp {
                    count += 1;
                    snapshot_id = snapshot.parent_snapshot_id();
                } else {
                    break;
                }
            }

            Some(count)
        } else {
            let mut snapshots = metadata.snapshots().collect_vec();
            if snapshots.is_empty() {
                return Some(0);
            }

            snapshots.sort_by_key(|s| s.timestamp_ms());

            // Everything after the most recent `Replace` snapshot is still uncompacted.
            let last_replace_index = snapshots
                .iter()
                .rposition(|s| matches!(s.summary().operation, Operation::Replace));

            let count = match last_replace_index {
                Some(index) => snapshots.len() - index - 1,
                None => snapshots.len(),
            };

            Some(count)
        }
    }

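    /// Expire old snapshots for the sink's table if snapshot expiration is enabled. Snapshots
    /// older than the configured expiration timestamp (default: 24 hours) are expired in a single
    /// transaction, optionally cleaning up expired data files and metadata.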
    pub async fn check_and_expire_snapshots(&self, sink_id: SinkId) -> MetaResult<()> {
        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; // 1 day
        let now = chrono::Utc::now().timestamp_millis();

        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        if !iceberg_config.enable_snapshot_expiration {
            return Ok(());
        }

        let catalog = iceberg_config.create_catalog().await?;
        let mut table = catalog
            .load_table(&iceberg_config.full_table_name()?)
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();
        snapshots.sort_by_key(|s| s.timestamp_ms());

        let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;

        let snapshot_expiration_timestamp_ms =
            match iceberg_config.snapshot_expiration_timestamp_ms(now) {
                Some(timestamp) => timestamp,
                None => default_snapshot_expiration_timestamp_ms,
            };

        // Nothing to do if there are no snapshots old enough to expire.
        if snapshots.is_empty()
            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
        {
            return Ok(());
        }

        tracing::info!(
            catalog_name = iceberg_config.catalog_name(),
            table_name = iceberg_config.full_table_name()?.to_string(),
            %sink_id,
            snapshots_len = snapshots.len(),
            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
            "try trigger snapshots expiration",
        );

        let txn = Transaction::new(&table);

        let mut expired_snapshots = txn
            .expire_snapshot()
            .expire_older_than(snapshot_expiration_timestamp_ms)
            .clear_expire_files(iceberg_config.snapshot_expiration_clear_expired_files)
            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);

        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
            expired_snapshots = expired_snapshots.retain_last(retain_last);
        }

        let before_metadata = table.metadata_ref();
        let tx = expired_snapshots
            .apply(txn)
            .map_err(|e| SinkError::Iceberg(e.into()))?;
        table = tx
            .commit(catalog.as_ref())
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        if iceberg_config.snapshot_expiration_clear_expired_files {
            table
                .cleanup_expired_files(&before_metadata)
                .await
                .map_err(|e| SinkError::Iceberg(e.into()))?;
        }

        tracing::info!(
            catalog_name = iceberg_config.catalog_name(),
            table_name = iceberg_config.full_table_name()?.to_string(),
            %sink_id,
            "Expired snapshots for iceberg table",
        );

        Ok(())
    }
}