use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use iceberg::spec::Operation;
use iceberg::transaction::Transaction;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_common::bail;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkId, SinkType};
use risingwave_connector::sink::iceberg::{IcebergConfig, should_enable_iceberg_cow};
use risingwave_connector::sink::{SinkError, SinkParam};
use risingwave_pb::catalog::PbSink;
use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
use risingwave_pb::iceberg_compaction::{
    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;

use super::MetaSrvEnv;
use crate::MetaResult;
use crate::hummock::{
    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
    IcebergCompactor, IcebergCompactorManagerRef,
};
use crate::manager::MetadataManager;
use crate::rpc::metrics::MetaMetrics;

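/// Shared, reference-counted handle to the [`IcebergCompactionManager`].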
pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;

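/// Channel endpoints used to hand newly subscribed compactor event streams
/// (compactor context id plus its gRPC request stream) over to the event loop.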
type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

type CompactorChangeRx =
    UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

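/// Per-sink commit statistics used to decide when an Iceberg sink is due for compaction.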
#[derive(Debug, Clone)]
struct CommitInfo {
    count: usize,
    next_compaction_time: Option<Instant>,
    compaction_interval: u64,
}

impl CommitInfo {
    fn set_processing(&mut self) {
        self.count = 0;
        self.next_compaction_time.take();
    }

    fn initialize(&mut self) {
        self.count = 0;
        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(self.compaction_interval));
    }

    fn replace(&mut self, commit_info: CommitInfo) {
        self.count = commit_info.count;
        self.next_compaction_time = commit_info.next_compaction_time;
        self.compaction_interval = commit_info.compaction_interval;
    }

    fn increase_count(&mut self) {
        self.count += 1;
    }

    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;

        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(compaction_interval));
    }
}

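/// RAII handle for a compaction task that has been scheduled for a sink.
///
/// On drop, the sink's [`CommitInfo`] is either re-initialized (if the task was
/// successfully sent to a compactor) or restored to its previous state (if not).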
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    handle_success: bool,

    commit_info: CommitInfo,
}

impl IcebergCompactionHandle {
    fn new(
        sink_id: SinkId,
        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
        metadata_manager: MetadataManager,
        commit_info: CommitInfo,
    ) -> Self {
        Self {
            sink_id,
            inner,
            metadata_manager,
            handle_success: false,
            commit_info,
        }
    }

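    /// Dispatches a compaction task for this sink to the given compactor.
    ///
    /// The task type is chosen from the sink type: append-only sinks request small-data-file
    /// compaction when the `IcebergCompaction` license feature is available, and a full
    /// compaction otherwise. Marks the handle as successful if the event is sent.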
    pub async fn send_compact_task(
        mut self,
        compactor: Arc<IcebergCompactor>,
        task_id: u64,
    ) -> MetaResult<()> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
        let prost_sink_catalog: PbSink = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
            .await?
            .remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        let task_type: TaskType = match param.sink_type {
            SinkType::AppendOnly | SinkType::ForceAppendOnly => {
                if risingwave_common::license::Feature::IcebergCompaction
                    .check_available()
                    .is_ok()
                {
                    TaskType::SmallDataFileCompaction
                } else {
                    TaskType::FullCompaction
                }
            }

            _ => TaskType::FullCompaction,
        };
        let result =
            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
                task_id,
                props: param.properties,
                task_type: task_type as i32,
            }));

        if result.is_ok() {
            self.handle_success = true;
        }

        result
    }

    pub fn sink_id(&self) -> SinkId {
        self.sink_id
    }
}

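/// Restores the per-sink commit bookkeeping when the handle goes out of scope: a
/// successfully dispatched task resets the counter and schedules the next compaction
/// window, while an unsent task puts the previous commit info back.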
impl Drop for IcebergCompactionHandle {
    fn drop(&mut self) {
        if self.handle_success {
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.initialize();
            }
        } else {
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.replace(self.commit_info.clone());
            }
        }
    }
}

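/// Commit statistics per Iceberg sink, keyed by sink id.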
struct IcebergCompactionManagerInner {
    pub iceberg_commits: HashMap<SinkId, CommitInfo>,
}

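/// Tracks commit activity of Iceberg sinks and schedules compaction and
/// snapshot-expiration work on the registered Iceberg compactors.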
pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}

impl IcebergCompactionManager {
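    /// Creates the manager together with the receiver half of the compactor-stream change
    /// channel, which the caller later passes to [`Self::iceberg_compaction_event_loop`].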
    pub fn build(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        iceberg_compactor_manager: IcebergCompactorManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Arc<Self>, CompactorChangeRx) {
        let (compactor_streams_change_tx, compactor_streams_change_rx) =
            tokio::sync::mpsc::unbounded_channel();
        (
            Arc::new(Self {
                env,
                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
                    iceberg_commits: HashMap::default(),
                })),
                metadata_manager,
                iceberg_compactor_manager,
                compactor_streams_change_tx,
                metrics,
            }),
            compactor_streams_change_rx,
        )
    }

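    /// Spawns a task that consumes sink commit notifications and folds them into the
    /// per-sink commit statistics. Returns the join handle and a shutdown sender.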
    pub fn compaction_stat_loop(
        manager: Arc<Self>,
        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(stat) = rx.recv() => {
                        manager.update_iceberg_commit_info(stat);
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compaction manager is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

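    /// Records one commit for the sink and picks up any change to its compaction interval,
    /// rescheduling the next compaction time if the interval changed.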
    pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
        let mut guard = self.inner.write();

        let IcebergSinkCompactionUpdate {
            sink_id,
            compaction_interval,
        } = msg;

        let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
            count: 0,
            next_compaction_time: Some(
                Instant::now() + std::time::Duration::from_secs(compaction_interval),
            ),
            compaction_interval,
        });

        commit_info.increase_count();
        if commit_info.compaction_interval != compaction_interval {
            commit_info.update_compaction_interval(compaction_interval);
        }
    }

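    /// Returns handles for up to `n` sinks that are due for compaction, i.e. sinks with at
    /// least one commit whose `next_compaction_time` has passed, preferring those with the
    /// most commits. The selected sinks are marked as processing until their handles drop.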
    pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();
        let mut guard = self.inner.write();
        guard
            .iceberg_commits
            .iter_mut()
            .filter(|(_, commit_info)| {
                commit_info.count > 0
                    && if let Some(next_compaction_time) = commit_info.next_compaction_time {
                        next_compaction_time <= now
                    } else {
                        false
                    }
            })
            .sorted_by(|a, b| {
                b.1.count
                    .cmp(&a.1.count)
                    .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
            })
            .take(n)
            .map(|(sink_id, commit_info)| {
                let handle = IcebergCompactionHandle::new(
                    *sink_id,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    commit_info.clone(),
                );

                commit_info.set_processing();

                handle
            })
            .collect::<Vec<_>>()
    }

    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
        let mut guard = self.inner.write();
        guard.iceberg_commits.remove(&sink_id);
    }

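    /// Loads the sink catalog for `sink_id` and converts it into a [`SinkParam`].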
    pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
        let mut sinks = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![sink_id.sink_id as i32])
            .await?;
        if sinks.is_empty() {
            bail!("Sink not found: {}", sink_id.sink_id);
        }
        let prost_sink_catalog: PbSink = sinks.remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        Ok(param)
    }

    pub async fn load_iceberg_config(&self, sink_id: &SinkId) -> MetaResult<IcebergConfig> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
        Ok(iceberg_config)
    }

    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

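    /// Wires the compactor-stream change receiver into an [`IcebergCompactionEventLoop`]
    /// and starts it, returning the spawned task's join handle and shutdown sender.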
    pub fn iceberg_compaction_event_loop(
        iceberg_compaction_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeIcebergCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let iceberg_compaction_event_handler =
            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());

        let iceberg_compaction_event_dispatcher =
            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);

        let event_loop = IcebergCompactionEventLoop::new(
            iceberg_compaction_event_dispatcher,
            iceberg_compaction_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

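    /// Spawns the background garbage-collection loop, which periodically runs snapshot
    /// expiration for all tracked sinks. Returns the join handle and a shutdown sender.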
    pub fn gc_loop(manager: Arc<Self>) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            const GC_LOOP_INTERVAL_SECS: u64 = 3600;
            let mut interval =
                tokio::time::interval(std::time::Duration::from_secs(GC_LOOP_INTERVAL_SECS));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = manager.perform_gc_operations().await {
                            tracing::error!(error = ?e.as_report(), "GC operations failed");
                        }
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg GC loop is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

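    /// Manually triggers a full compaction for the given sink, then polls the Iceberg table
    /// until a snapshot produced by the compaction appears (or a timeout is hit).
    /// Returns the id of the dispatched compaction task.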
    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;

        let iceberg_config = self.load_iceberg_config(&sink_id).await?;
        let initial_table = iceberg_config.load_table().await?;
        let initial_snapshot_id = initial_table
            .metadata()
            .current_snapshot()
            .map(|s| s.snapshot_id())
            .unwrap_or(0);
        let initial_timestamp = chrono::Utc::now().timestamp_millis();

        let compactor = self
            .iceberg_compactor_manager
            .next_compactor()
            .ok_or_else(|| anyhow!("No iceberg compactor available"))?;

        let task_id = self
            .env
            .hummock_seq
            .next_interval("compaction_task", 1)
            .await?;

        let sink_param = self.get_sink_param(&sink_id).await?;

        compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
            task_id,
            props: sink_param.properties,
            task_type: TaskType::FullCompaction as i32,
        }))?;

        tracing::info!(
            "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
            sink_id.sink_id,
            task_id
        );

        self.wait_for_compaction_completion(
            &sink_id,
            iceberg_config,
            initial_snapshot_id,
            initial_timestamp,
            task_id,
        )
        .await?;

        Ok(task_id)
    }

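    /// Polls the Iceberg table after a manual compaction was dispatched and returns once a
    /// new snapshot written by the compaction shows up: an `Overwrite` snapshot when
    /// copy-on-write is enabled for this sink, otherwise a `Replace` snapshot. Polling uses
    /// exponential backoff and gives up after a fixed maximum wait time.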
    async fn wait_for_compaction_completion(
        &self,
        sink_id: &SinkId,
        iceberg_config: IcebergConfig,
        initial_snapshot_id: i64,
        initial_timestamp: i64,
        task_id: u64,
    ) -> MetaResult<()> {
        const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
        const MAX_POLL_INTERVAL_SECS: u64 = 60;
        const MAX_WAIT_TIME_SECS: u64 = 1800;
        const BACKOFF_MULTIPLIER: f64 = 1.5;

        let mut elapsed_time = 0;
        let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;

        let cow = should_enable_iceberg_cow(
            iceberg_config.r#type.as_str(),
            iceberg_config.write_mode.as_str(),
        );

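        // Reload the table on each iteration and look for a snapshot newer than the baseline
        // captured before the task was dispatched; back off exponentially between polls.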
        while elapsed_time < MAX_WAIT_TIME_SECS {
            let poll_interval = std::time::Duration::from_secs(current_interval_secs);
            tokio::time::sleep(poll_interval).await;
            elapsed_time += current_interval_secs;

            let current_table = iceberg_config.load_table().await?;

            let metadata = current_table.metadata();
            let new_snapshots: Vec<_> = metadata
                .snapshots()
                .filter(|snapshot| {
                    let snapshot_timestamp = snapshot.timestamp_ms();
                    let snapshot_id = snapshot.snapshot_id();
                    snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
                })
                .collect();

            for snapshot in new_snapshots {
                let summary = snapshot.summary();
                if cow {
                    if matches!(summary.operation, Operation::Overwrite) {
                        return Ok(());
                    }
                } else if matches!(summary.operation, Operation::Replace) {
                    return Ok(());
                }
            }

            current_interval_secs = std::cmp::min(
                MAX_POLL_INTERVAL_SECS,
                ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
            );
        }

        Err(anyhow!(
            "Compaction did not complete within {} seconds for sink {} (task_id={})",
            MAX_WAIT_TIME_SECS,
            sink_id.sink_id,
            task_id
        )
        .into())
    }

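    /// Runs one round of garbage collection: snapshot expiration is attempted for every sink
    /// currently tracked in the commit statistics; failures are logged per sink and do not
    /// abort the round.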
    async fn perform_gc_operations(&self) -> MetaResult<()> {
        let sink_ids = {
            let guard = self.inner.read();
            guard.iceberg_commits.keys().cloned().collect::<Vec<_>>()
        };

        tracing::info!("Starting GC operations for {} tables", sink_ids.len());

        for sink_id in sink_ids {
            if let Err(e) = self.check_and_expire_snapshots(&sink_id).await {
                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id.sink_id);
            }
        }

        tracing::info!("GC operations completed");
        Ok(())
    }

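    /// Expires old snapshots of the sink's Iceberg table if snapshot expiration is enabled.
    ///
    /// The expiration cutoff comes from the sink configuration when set, and otherwise
    /// defaults to snapshots older than 24 hours. If the oldest snapshot is newer than the
    /// cutoff, nothing is done; otherwise an expire-snapshots transaction is applied and
    /// committed to the catalog.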
    pub async fn check_and_expire_snapshots(&self, sink_id: &SinkId) -> MetaResult<()> {
        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000;
        let now = chrono::Utc::now().timestamp_millis();

        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        if !iceberg_config.enable_snapshot_expiration {
            return Ok(());
        }

        let catalog = iceberg_config.create_catalog().await?;
        let table = catalog
            .load_table(&iceberg_config.full_table_name()?)
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();
        snapshots.sort_by_key(|s| s.timestamp_ms());

        let default_snapshot_expiration_timestamp_ms = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;

        let snapshot_expiration_timestamp_ms =
            match iceberg_config.snapshot_expiration_timestamp_ms(now) {
                Some(timestamp) => timestamp,
                None => default_snapshot_expiration_timestamp_ms,
            };

        if snapshots.is_empty()
            || snapshots.first().unwrap().timestamp_ms() > snapshot_expiration_timestamp_ms
        {
            return Ok(());
        }

        tracing::info!(
            catalog_name = iceberg_config.catalog_name(),
            table_name = iceberg_config.full_table_name()?.to_string(),
            sink_id = sink_id.sink_id,
            snapshots_len = snapshots.len(),
            snapshot_expiration_timestamp_ms = snapshot_expiration_timestamp_ms,
            snapshot_expiration_retain_last = ?iceberg_config.snapshot_expiration_retain_last,
            clear_expired_files = ?iceberg_config.snapshot_expiration_clear_expired_files,
            clear_expired_meta_data = ?iceberg_config.snapshot_expiration_clear_expired_meta_data,
            "trying to trigger snapshot expiration",
        );

        let tx = Transaction::new(&table);

        let mut expired_snapshots = tx.expire_snapshot();

        expired_snapshots = expired_snapshots.expire_older_than(snapshot_expiration_timestamp_ms);

        if let Some(retain_last) = iceberg_config.snapshot_expiration_retain_last {
            expired_snapshots = expired_snapshots.retain_last(retain_last);
        }

        expired_snapshots = expired_snapshots
            .clear_expired_files(iceberg_config.snapshot_expiration_clear_expired_files);

        expired_snapshots = expired_snapshots
            .clear_expired_meta_data(iceberg_config.snapshot_expiration_clear_expired_meta_data);

        let tx = expired_snapshots
            .apply()
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tx.commit(catalog.as_ref())
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tracing::info!(
            catalog_name = iceberg_config.catalog_name(),
            table_name = iceberg_config.full_table_name()?.to_string(),
            sink_id = sink_id.sink_id,
            "Expired snapshots for iceberg table",
        );

        Ok(())
    }
}