1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use iceberg::spec::Operation;
21use iceberg::transaction::Transaction;
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::bail;
25use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
26use risingwave_connector::sink::catalog::{SinkCatalog, SinkId, SinkType};
27use risingwave_connector::sink::iceberg::{IcebergConfig, should_enable_iceberg_cow};
28use risingwave_connector::sink::{SinkError, SinkParam};
29use risingwave_pb::catalog::PbSink;
30use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
31use risingwave_pb::iceberg_compaction::{
32 IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
33};
34use thiserror_ext::AsReport;
35use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
36use tokio::sync::oneshot::Sender;
37use tokio::task::JoinHandle;
38use tonic::Streaming;
39
40use super::MetaSrvEnv;
41use crate::MetaResult;
42use crate::hummock::{
43 IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
44 IcebergCompactor, IcebergCompactorManagerRef,
45};
46use crate::manager::MetadataManager;
47use crate::rpc::metrics::MetaMetrics;
48
49pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;
50
51type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;
52
53type CompactorChangeRx =
54 UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;
55
/// Per-sink bookkeeping used to decide when a sink is due for compaction.
#[derive(Debug, Clone)]
struct CommitInfo {
    /// Number of commits recorded since the counter was last reset.
    count: usize,
    /// Earliest instant at which the sink may be compacted again.
    /// Cleared (`None`) by [`CommitInfo::set_processing`].
    next_compaction_time: Option<Instant>,
    /// Spacing between compactions, in seconds.
    compaction_interval: u64,
}

impl CommitInfo {
    /// Returns the instant lying `interval_secs` seconds after "now".
    fn deadline_after(interval_secs: u64) -> Instant {
        Instant::now() + std::time::Duration::from_secs(interval_secs)
    }

    /// Marks the sink as being processed: the commit counter is reset and
    /// the compaction deadline is removed.
    fn set_processing(&mut self) {
        self.count = 0;
        self.next_compaction_time = None;
    }

    /// Resets the commit counter and schedules the next compaction one full
    /// interval from now.
    fn initialize(&mut self) {
        self.count = 0;
        self.next_compaction_time = Some(Self::deadline_after(self.compaction_interval));
    }

    /// Overwrites every field of `self` with the values from `commit_info`.
    fn replace(&mut self, commit_info: CommitInfo) {
        *self = commit_info;
    }

    /// Records one more commit.
    fn increase_count(&mut self) {
        self.count += 1;
    }

    /// Stores a new interval (in seconds) and re-schedules the next
    /// compaction relative to the current time using that interval.
    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;
        self.next_compaction_time = Some(Self::deadline_after(compaction_interval));
    }
}
94
/// Handle representing one pending compaction of a single sink.
///
/// When the handle is dropped, the sink's entry in the shared commit map is
/// updated: re-initialized if the task was sent successfully, otherwise
/// restored from the copy stored in `commit_info`.
pub struct IcebergCompactionHandle {
    /// Sink this handle refers to.
    sink_id: SinkId,
    /// Shared commit-tracking state that is written back to on drop.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    /// Used to look up the sink catalog when the task is sent.
    metadata_manager: MetadataManager,
    /// `true` once the compaction task was handed to a compactor without error.
    handle_success: bool,

    /// Copy of the sink's commit info taken when the handle was created.
    commit_info: CommitInfo,
}
104
105impl IcebergCompactionHandle {
106 fn new(
107 sink_id: SinkId,
108 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
109 metadata_manager: MetadataManager,
110 commit_info: CommitInfo,
111 ) -> Self {
112 Self {
113 sink_id,
114 inner,
115 metadata_manager,
116 handle_success: false,
117 commit_info,
118 }
119 }
120
121 pub async fn send_compact_task(
122 mut self,
123 compactor: Arc<IcebergCompactor>,
124 task_id: u64,
125 ) -> MetaResult<()> {
126 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
127 let mut sinks = self
128 .metadata_manager
129 .catalog_controller
130 .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
131 .await?;
132 if sinks.is_empty() {
133 tracing::warn!("Sink not found: {}", self.sink_id.sink_id);
135 return Ok(());
136 }
137 let prost_sink_catalog: PbSink = sinks.remove(0);
138 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
139 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
140 let task_type: TaskType = match param.sink_type {
141 SinkType::AppendOnly | SinkType::ForceAppendOnly => {
142 if risingwave_common::license::Feature::IcebergCompaction
143 .check_available()
144 .is_ok()
145 {
146 TaskType::SmallDataFileCompaction
147 } else {
148 TaskType::FullCompaction
149 }
150 }
151
152 _ => TaskType::FullCompaction,
153 };
154 let result =
155 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
156 task_id,
157 props: param.properties,
158 task_type: task_type as i32,
159 }));
160
161 if result.is_ok() {
162 self.handle_success = true;
163 }
164
165 result
166 }
167
168 pub fn sink_id(&self) -> SinkId {
169 self.sink_id
170 }
171}
172
173impl Drop for IcebergCompactionHandle {
174 fn drop(&mut self) {
175 if self.handle_success {
176 let mut guard = self.inner.write();
177 if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
178 commit_info.initialize();
179 }
180 } else {
181 let mut guard = self.inner.write();
186 if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
187 commit_info.replace(self.commit_info.clone());
188 }
189 }
190 }
191}
192
/// Mutable state of the compaction manager, kept behind a shared lock.
struct IcebergCompactionManagerInner {
    /// Commit bookkeeping for each tracked sink, keyed by sink id.
    pub iceberg_commits: HashMap<SinkId, CommitInfo>,
}
196
/// Tracks commits of Iceberg sinks and drives their compaction and
/// snapshot-expiration tasks.
pub struct IcebergCompactionManager {
    /// Meta service environment (used here to allocate task ids).
    pub env: MetaSrvEnv,
    /// Per-sink commit bookkeeping, shared with outstanding compaction handles.
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    /// Used to look up sink catalogs.
    metadata_manager: MetadataManager,
    /// Source of compactors that tasks are sent to.
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    /// Sending half of the channel announcing newly subscribed compactor streams.
    compactor_streams_change_tx: CompactorChangeTx,

    /// Meta metrics handed to the compaction event loop.
    pub metrics: Arc<MetaMetrics>,
}
208
209impl IcebergCompactionManager {
210 pub fn build(
211 env: MetaSrvEnv,
212 metadata_manager: MetadataManager,
213 iceberg_compactor_manager: IcebergCompactorManagerRef,
214 metrics: Arc<MetaMetrics>,
215 ) -> (Arc<Self>, CompactorChangeRx) {
216 let (compactor_streams_change_tx, compactor_streams_change_rx) =
217 tokio::sync::mpsc::unbounded_channel();
218 (
219 Arc::new(Self {
220 env,
221 inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
222 iceberg_commits: HashMap::default(),
223 })),
224 metadata_manager,
225 iceberg_compactor_manager,
226 compactor_streams_change_tx,
227 metrics,
228 }),
229 compactor_streams_change_rx,
230 )
231 }
232
233 pub fn compaction_stat_loop(
234 manager: Arc<Self>,
235 mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
236 ) -> (JoinHandle<()>, Sender<()>) {
237 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
238 let join_handle = tokio::spawn(async move {
239 loop {
240 tokio::select! {
241 Some(stat) = rx.recv() => {
242 manager.update_iceberg_commit_info(stat);
243 },
244 _ = &mut shutdown_rx => {
245 tracing::info!("Iceberg compaction manager is stopped");
246 return;
247 }
248 }
249 }
250 });
251
252 (join_handle, shutdown_tx)
253 }
254
255 pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
256 let mut guard = self.inner.write();
257
258 let IcebergSinkCompactionUpdate {
259 sink_id,
260 compaction_interval,
261 } = msg;
262
263 let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
265 count: 0,
266 next_compaction_time: Some(
267 Instant::now() + std::time::Duration::from_secs(compaction_interval),
268 ),
269 compaction_interval,
270 });
271
272 commit_info.increase_count();
273 if commit_info.compaction_interval != compaction_interval {
274 commit_info.update_compaction_interval(compaction_interval);
275 }
276 }
277
278 pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
281 let now = Instant::now();
282 let mut guard = self.inner.write();
283 guard
284 .iceberg_commits
285 .iter_mut()
286 .filter(|(_, commit_info)| {
287 commit_info.count > 0
288 && if let Some(next_compaction_time) = commit_info.next_compaction_time {
289 next_compaction_time <= now
290 } else {
291 false
292 }
293 })
294 .sorted_by(|a, b| {
295 b.1.count
296 .cmp(&a.1.count)
297 .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
298 })
299 .take(n)
300 .map(|(sink_id, commit_info)| {
301 let handle = IcebergCompactionHandle::new(
303 *sink_id,
304 self.inner.clone(),
305 self.metadata_manager.clone(),
306 commit_info.clone(),
307 );
308
309 commit_info.set_processing();
310
311 handle
312 })
313 .collect::<Vec<_>>()
314 }
315
316 pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
317 let mut guard = self.inner.write();
318 guard.iceberg_commits.remove(&sink_id);
319 }
320
321 pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
322 let mut sinks = self
323 .metadata_manager
324 .catalog_controller
325 .get_sink_by_ids(vec![sink_id.sink_id as i32])
326 .await?;
327 if sinks.is_empty() {
328 bail!("Sink not found: {}", sink_id.sink_id);
329 }
330 let prost_sink_catalog: PbSink = sinks.remove(0);
331 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
332 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
333 Ok(param)
334 }
335
336 pub async fn load_iceberg_config(&self, sink_id: &SinkId) -> MetaResult<IcebergConfig> {
337 let sink_param = self.get_sink_param(sink_id).await?;
338 let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
339 Ok(iceberg_config)
340 }
341
342 pub fn add_compactor_stream(
343 &self,
344 context_id: u32,
345 req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
346 ) {
347 self.compactor_streams_change_tx
348 .send((context_id, req_stream))
349 .unwrap();
350 }
351
352 pub fn iceberg_compaction_event_loop(
353 iceberg_compaction_manager: Arc<Self>,
354 compactor_streams_change_rx: UnboundedReceiver<(
355 u32,
356 Streaming<SubscribeIcebergCompactionEventRequest>,
357 )>,
358 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
359 let mut join_handle_vec = Vec::default();
360
361 let iceberg_compaction_event_handler =
362 IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
363
364 let iceberg_compaction_event_dispatcher =
365 IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
366
367 let event_loop = IcebergCompactionEventLoop::new(
368 iceberg_compaction_event_dispatcher,
369 iceberg_compaction_manager.metrics.clone(),
370 compactor_streams_change_rx,
371 );
372
373 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
374 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
375
376 join_handle_vec
377 }
378
379 pub fn gc_loop(manager: Arc<Self>) -> (JoinHandle<()>, Sender<()>) {
383 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
384 let join_handle = tokio::spawn(async move {
385 const GC_LOOP_INTERVAL_SECS: u64 = 3600;
387 let mut interval =
388 tokio::time::interval(std::time::Duration::from_secs(GC_LOOP_INTERVAL_SECS));
389
390 loop {
391 tokio::select! {
392 _ = interval.tick() => {
393 if let Err(e) = manager.perform_gc_operations().await {
394 tracing::error!(error = ?e.as_report(), "GC operations failed");
395 }
396 },
397 _ = &mut shutdown_rx => {
398 tracing::info!("Iceberg GC loop is stopped");
399 return;
400 }
401 }
402 }
403 });
404
405 (join_handle, shutdown_tx)
406 }
407
408 pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
411 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
412
413 let iceberg_config = self.load_iceberg_config(&sink_id).await?;
415 let initial_table = iceberg_config.load_table().await?;
416 let initial_snapshot_id = initial_table
417 .metadata()
418 .current_snapshot()
419 .map(|s| s.snapshot_id())
420 .unwrap_or(0); let initial_timestamp = chrono::Utc::now().timestamp_millis();
422
423 let compactor = self
425 .iceberg_compactor_manager
426 .next_compactor()
427 .ok_or_else(|| anyhow!("No iceberg compactor available"))?;
428
429 let task_id = self
431 .env
432 .hummock_seq
433 .next_interval("compaction_task", 1)
434 .await?;
435
436 let sink_param = self.get_sink_param(&sink_id).await?;
437
438 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
439 task_id,
440 props: sink_param.properties,
441 task_type: TaskType::FullCompaction as i32, }))?;
443
444 tracing::info!(
445 "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
446 sink_id.sink_id,
447 task_id
448 );
449
450 self.wait_for_compaction_completion(
451 &sink_id,
452 iceberg_config,
453 initial_snapshot_id,
454 initial_timestamp,
455 task_id,
456 )
457 .await?;
458
459 Ok(task_id)
460 }
461
462 async fn wait_for_compaction_completion(
463 &self,
464 sink_id: &SinkId,
465 iceberg_config: IcebergConfig,
466 initial_snapshot_id: i64,
467 initial_timestamp: i64,
468 task_id: u64,
469 ) -> MetaResult<()> {
470 const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
471 const MAX_POLL_INTERVAL_SECS: u64 = 60;
472 const MAX_WAIT_TIME_SECS: u64 = 1800;
473 const BACKOFF_MULTIPLIER: f64 = 1.5;
474
475 let mut elapsed_time = 0;
476 let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;
477
478 let cow = should_enable_iceberg_cow(
479 iceberg_config.r#type.as_str(),
480 iceberg_config.write_mode.as_str(),
481 );
482
483 while elapsed_time < MAX_WAIT_TIME_SECS {
484 let poll_interval = std::time::Duration::from_secs(current_interval_secs);
485 tokio::time::sleep(poll_interval).await;
486 elapsed_time += current_interval_secs;
487
488 let current_table = iceberg_config.load_table().await?;
489
490 let metadata = current_table.metadata();
491 let new_snapshots: Vec<_> = metadata
492 .snapshots()
493 .filter(|snapshot| {
494 let snapshot_timestamp = snapshot.timestamp_ms();
495 let snapshot_id = snapshot.snapshot_id();
496 snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
497 })
498 .collect();
499
500 for snapshot in new_snapshots {
501 let summary = snapshot.summary();
502 if cow {
503 if matches!(summary.operation, Operation::Overwrite) {
504 return Ok(());
505 }
506 } else if matches!(summary.operation, Operation::Replace) {
507 return Ok(());
508 }
509 }
510
511 current_interval_secs = std::cmp::min(
512 MAX_POLL_INTERVAL_SECS,
513 ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
514 );
515 }
516
517 Err(anyhow!(
518 "Compaction did not complete within {} seconds for sink {} (task_id={})",
519 MAX_WAIT_TIME_SECS,
520 sink_id.sink_id,
521 task_id
522 )
523 .into())
524 }
525
526 async fn perform_gc_operations(&self) -> MetaResult<()> {
527 let sink_ids = {
528 let guard = self.inner.read();
529 guard.iceberg_commits.keys().cloned().collect::<Vec<_>>()
530 };
531
532 tracing::info!("Starting GC operations for {} tables", sink_ids.len());
533
534 for sink_id in sink_ids {
535 if let Err(e) = self.check_and_expire_snapshots(&sink_id).await {
536 tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id.sink_id);
537 }
538 }
539
540 tracing::info!("GC operations completed");
541 Ok(())
542 }
543
544 pub async fn check_and_expire_snapshots(&self, sink_id: &SinkId) -> MetaResult<()> {
545 const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000; let now = chrono::Utc::now().timestamp_millis();
547
548 let iceberg_config = self.load_iceberg_config(sink_id).await?;
549 if !iceberg_config.enable_snapshot_expiration {
550 return Ok(());
551 }
552
553 let catalog = iceberg_config.create_catalog().await?;
554 let table = catalog
555 .load_table(&iceberg_config.full_table_name()?)
556 .await
557 .map_err(|e| SinkError::Iceberg(e.into()))?;
558
559 let metadata = table.metadata();
560 let mut snapshots = metadata.snapshots().collect_vec();
561 snapshots.sort_by_key(|s| s.timestamp_ms());
562
563 let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;
564
565 let snapshot_expiration_timestamp_ms =
566 match iceberg_config.snapshot_expiration_timestamp_ms(now) {
567 Some(timestamp) => timestamp,
568 None => default_snapshot_expiration_timestamp_ms,
569 };
570
571 if snapshots.is_empty()
572 || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
573 {
574 return Ok(());
576 }
577
578 tracing::info!(
579 catalog_name = iceberg_config.catalog_name(),
580 table_name = iceberg_config.full_table_name()?.to_string(),
581 sink_id = sink_id.sink_id,
582 snapshots_len = snapshots.len(),
583 snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
584 snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
585 clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
586 clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
587 "try trigger snapshots expiration",
588 );
589
590 let tx = Transaction::new(&table);
591
592 let mut expired_snapshots = tx.expire_snapshot();
593
594 expired_snapshots = expired_snapshots.expire_older_than(snapshot_expiration_timestamp_ms);
595
596 if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
597 expired_snapshots = expired_snapshots.retain_last(retain_last);
598 }
599
600 expired_snapshots = expired_snapshots
601 .clear_expired_files(iceberg_config.snapshot_expiration_clear_expired_files);
602
603 expired_snapshots = expired_snapshots
604 .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);
605
606 let tx = expired_snapshots
607 .apply()
608 .await
609 .map_err(|e| SinkError::Iceberg(e.into()))?;
610
611 tx.commit(catalog.as_ref())
612 .await
613 .map_err(|e| SinkError::Iceberg(e.into()))?;
614
615 tracing::info!(
616 catalog_name = iceberg_config.catalog_name(),
617 table_name = iceberg_config.full_table_name()?.to_string(),
618 sink_id = sink_id.sink_id,
619 "Expired snapshots for iceberg table",
620 );
621
622 Ok(())
623 }
624}