1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Instant;
18
19use anyhow::anyhow;
20use iceberg::spec::Operation;
21use iceberg::transaction::Transaction;
22use itertools::Itertools;
23use parking_lot::RwLock;
24use risingwave_common::bail;
25use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
26use risingwave_connector::sink::catalog::{SinkCatalog, SinkId, SinkType};
27use risingwave_connector::sink::iceberg::{IcebergConfig, should_enable_iceberg_cow};
28use risingwave_connector::sink::{SinkError, SinkParam};
29use risingwave_pb::catalog::PbSink;
30use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
31use risingwave_pb::iceberg_compaction::{
32 IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
33};
34use thiserror_ext::AsReport;
35use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
36use tokio::sync::oneshot::Sender;
37use tokio::task::JoinHandle;
38use tonic::Streaming;
39
40use super::MetaSrvEnv;
41use crate::MetaResult;
42use crate::hummock::{
43 IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
44 IcebergCompactor, IcebergCompactorManagerRef,
45};
46use crate::manager::MetadataManager;
47use crate::rpc::metrics::MetaMetrics;
48
49pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;
50
51type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;
52
53type CompactorChangeRx =
54 UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;
55
/// Per-sink bookkeeping used to decide when a sink is due for compaction.
#[derive(Debug, Clone)]
struct CommitInfo {
    /// Number of commits counted since the counter was last reset
    /// (see [`CommitInfo::increase_count`]).
    count: usize,
    /// Earliest instant at which the sink may be scheduled again.
    ///
    /// `None` after [`CommitInfo::set_processing`], or when the configured
    /// interval is so large that the target instant cannot be represented.
    next_compaction_time: Option<Instant>,
    /// Compaction interval in seconds.
    compaction_interval: u64,
}

impl CommitInfo {
    /// Returns the instant `interval_secs` seconds from now.
    ///
    /// Uses `checked_add` because `Instant + Duration` panics on overflow and
    /// the interval is externally supplied; an unrepresentable instant yields
    /// `None`, i.e. "not scheduled".
    fn schedule_after(interval_secs: u64) -> Option<Instant> {
        Instant::now().checked_add(std::time::Duration::from_secs(interval_secs))
    }

    /// Resets the counter and clears the schedule.
    fn set_processing(&mut self) {
        self.count = 0;
        self.next_compaction_time = None;
    }

    /// Resets the counter and schedules the next compaction one full interval from now.
    fn initialize(&mut self) {
        self.count = 0;
        self.next_compaction_time = Self::schedule_after(self.compaction_interval);
    }

    /// Overwrites every field with the values from `commit_info`.
    fn replace(&mut self, commit_info: CommitInfo) {
        *self = commit_info;
    }

    /// Records one more commit.
    fn increase_count(&mut self) {
        self.count += 1;
    }

    /// Stores the new interval and reschedules the next compaction relative to now.
    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;
        self.next_compaction_time = Self::schedule_after(compaction_interval);
    }
}
94
95pub struct IcebergCompactionHandle {
96 sink_id: SinkId,
97 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
98 metadata_manager: MetadataManager,
99 handle_success: bool,
100
101 commit_info: CommitInfo,
103}
104
105impl IcebergCompactionHandle {
106 fn new(
107 sink_id: SinkId,
108 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
109 metadata_manager: MetadataManager,
110 commit_info: CommitInfo,
111 ) -> Self {
112 Self {
113 sink_id,
114 inner,
115 metadata_manager,
116 handle_success: false,
117 commit_info,
118 }
119 }
120
121 pub async fn send_compact_task(
122 mut self,
123 compactor: Arc<IcebergCompactor>,
124 task_id: u64,
125 ) -> MetaResult<()> {
126 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
127 let prost_sink_catalog: PbSink = self
128 .metadata_manager
129 .catalog_controller
130 .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
131 .await?
132 .remove(0);
133 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
134 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
135 let task_type: TaskType = match param.sink_type {
136 SinkType::AppendOnly | SinkType::ForceAppendOnly => TaskType::SmallDataFileCompaction,
137
138 _ => TaskType::FullCompaction,
139 };
140 let result =
141 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
142 task_id,
143 props: param.properties,
144 task_type: task_type as i32,
145 }));
146
147 if result.is_ok() {
148 self.handle_success = true;
149 }
150
151 result
152 }
153
154 pub fn sink_id(&self) -> SinkId {
155 self.sink_id
156 }
157}
158
159impl Drop for IcebergCompactionHandle {
160 fn drop(&mut self) {
161 if self.handle_success {
162 let mut guard = self.inner.write();
163 if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
164 commit_info.initialize();
165 }
166 } else {
167 let mut guard = self.inner.write();
172 if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
173 commit_info.replace(self.commit_info.clone());
174 }
175 }
176 }
177}
178
179struct IcebergCompactionManagerInner {
180 pub iceberg_commits: HashMap<SinkId, CommitInfo>,
181}
182
183pub struct IcebergCompactionManager {
184 pub env: MetaSrvEnv,
185 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
186
187 metadata_manager: MetadataManager,
188 pub iceberg_compactor_manager: IcebergCompactorManagerRef,
189
190 compactor_streams_change_tx: CompactorChangeTx,
191
192 pub metrics: Arc<MetaMetrics>,
193}
194
195impl IcebergCompactionManager {
196 pub fn build(
197 env: MetaSrvEnv,
198 metadata_manager: MetadataManager,
199 iceberg_compactor_manager: IcebergCompactorManagerRef,
200 metrics: Arc<MetaMetrics>,
201 ) -> (Arc<Self>, CompactorChangeRx) {
202 let (compactor_streams_change_tx, compactor_streams_change_rx) =
203 tokio::sync::mpsc::unbounded_channel();
204 (
205 Arc::new(Self {
206 env,
207 inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
208 iceberg_commits: HashMap::default(),
209 })),
210 metadata_manager,
211 iceberg_compactor_manager,
212 compactor_streams_change_tx,
213 metrics,
214 }),
215 compactor_streams_change_rx,
216 )
217 }
218
219 pub fn compaction_stat_loop(
220 manager: Arc<Self>,
221 mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
222 ) -> (JoinHandle<()>, Sender<()>) {
223 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
224 let join_handle = tokio::spawn(async move {
225 loop {
226 tokio::select! {
227 Some(stat) = rx.recv() => {
228 manager.update_iceberg_commit_info(stat);
229 },
230 _ = &mut shutdown_rx => {
231 tracing::info!("Iceberg compaction manager is stopped");
232 return;
233 }
234 }
235 }
236 });
237
238 (join_handle, shutdown_tx)
239 }
240
241 pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
242 let mut guard = self.inner.write();
243
244 let IcebergSinkCompactionUpdate {
245 sink_id,
246 compaction_interval,
247 } = msg;
248
249 let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
251 count: 0,
252 next_compaction_time: Some(
253 Instant::now() + std::time::Duration::from_secs(compaction_interval),
254 ),
255 compaction_interval,
256 });
257
258 commit_info.increase_count();
259 if commit_info.compaction_interval != compaction_interval {
260 commit_info.update_compaction_interval(compaction_interval);
261 }
262 }
263
264 pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
267 let now = Instant::now();
268 let mut guard = self.inner.write();
269 guard
270 .iceberg_commits
271 .iter_mut()
272 .filter(|(_, commit_info)| {
273 commit_info.count > 0
274 && if let Some(next_compaction_time) = commit_info.next_compaction_time {
275 next_compaction_time <= now
276 } else {
277 false
278 }
279 })
280 .sorted_by(|a, b| {
281 b.1.count
282 .cmp(&a.1.count)
283 .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
284 })
285 .take(n)
286 .map(|(sink_id, commit_info)| {
287 let handle = IcebergCompactionHandle::new(
289 *sink_id,
290 self.inner.clone(),
291 self.metadata_manager.clone(),
292 commit_info.clone(),
293 );
294
295 commit_info.set_processing();
296
297 handle
298 })
299 .collect::<Vec<_>>()
300 }
301
302 pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
303 let mut guard = self.inner.write();
304 guard.iceberg_commits.remove(&sink_id);
305 }
306
307 pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
308 let mut sinks = self
309 .metadata_manager
310 .catalog_controller
311 .get_sink_by_ids(vec![sink_id.sink_id as i32])
312 .await?;
313 if sinks.is_empty() {
314 bail!("Sink not found: {}", sink_id.sink_id);
315 }
316 let prost_sink_catalog: PbSink = sinks.remove(0);
317 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
318 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
319 Ok(param)
320 }
321
322 pub async fn load_iceberg_config(&self, sink_id: &SinkId) -> MetaResult<IcebergConfig> {
323 let sink_param = self.get_sink_param(sink_id).await?;
324 let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
325 Ok(iceberg_config)
326 }
327
328 pub fn add_compactor_stream(
329 &self,
330 context_id: u32,
331 req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
332 ) {
333 self.compactor_streams_change_tx
334 .send((context_id, req_stream))
335 .unwrap();
336 }
337
338 pub fn iceberg_compaction_event_loop(
339 iceberg_compaction_manager: Arc<Self>,
340 compactor_streams_change_rx: UnboundedReceiver<(
341 u32,
342 Streaming<SubscribeIcebergCompactionEventRequest>,
343 )>,
344 ) -> Vec<(JoinHandle<()>, Sender<()>)> {
345 let mut join_handle_vec = Vec::default();
346
347 let iceberg_compaction_event_handler =
348 IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());
349
350 let iceberg_compaction_event_dispatcher =
351 IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);
352
353 let event_loop = IcebergCompactionEventLoop::new(
354 iceberg_compaction_event_dispatcher,
355 iceberg_compaction_manager.metrics.clone(),
356 compactor_streams_change_rx,
357 );
358
359 let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
360 join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));
361
362 join_handle_vec
363 }
364
365 pub fn gc_loop(manager: Arc<Self>) -> (JoinHandle<()>, Sender<()>) {
369 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
370 let join_handle = tokio::spawn(async move {
371 const GC_LOOP_INTERVAL_SECS: u64 = 3600;
373 let mut interval =
374 tokio::time::interval(std::time::Duration::from_secs(GC_LOOP_INTERVAL_SECS));
375
376 loop {
377 tokio::select! {
378 _ = interval.tick() => {
379 if let Err(e) = manager.perform_gc_operations().await {
380 tracing::error!(error = ?e.as_report(), "GC operations failed");
381 }
382 },
383 _ = &mut shutdown_rx => {
384 tracing::info!("Iceberg GC loop is stopped");
385 return;
386 }
387 }
388 }
389 });
390
391 (join_handle, shutdown_tx)
392 }
393
394 pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
397 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
398
399 let iceberg_config = self.load_iceberg_config(&sink_id).await?;
401 let initial_table = iceberg_config.load_table().await?;
402 let initial_snapshot_id = initial_table
403 .metadata()
404 .current_snapshot()
405 .map(|s| s.snapshot_id())
406 .unwrap_or(0); let initial_timestamp = chrono::Utc::now().timestamp_millis();
408
409 let compactor = self
411 .iceberg_compactor_manager
412 .next_compactor()
413 .ok_or_else(|| anyhow!("No iceberg compactor available"))?;
414
415 let task_id = self
417 .env
418 .hummock_seq
419 .next_interval("compaction_task", 1)
420 .await?;
421
422 let sink_param = self.get_sink_param(&sink_id).await?;
423
424 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
425 task_id,
426 props: sink_param.properties,
427 task_type: TaskType::FullCompaction as i32, }))?;
429
430 tracing::info!(
431 "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
432 sink_id.sink_id,
433 task_id
434 );
435
436 self.wait_for_compaction_completion(
437 &sink_id,
438 iceberg_config,
439 initial_snapshot_id,
440 initial_timestamp,
441 task_id,
442 )
443 .await?;
444
445 Ok(task_id)
446 }
447
448 async fn wait_for_compaction_completion(
449 &self,
450 sink_id: &SinkId,
451 iceberg_config: IcebergConfig,
452 initial_snapshot_id: i64,
453 initial_timestamp: i64,
454 task_id: u64,
455 ) -> MetaResult<()> {
456 const INITIAL_POLL_INTERVAL_SECS: u64 = 2;
457 const MAX_POLL_INTERVAL_SECS: u64 = 60;
458 const MAX_WAIT_TIME_SECS: u64 = 1800;
459 const BACKOFF_MULTIPLIER: f64 = 1.5;
460
461 let mut elapsed_time = 0;
462 let mut current_interval_secs = INITIAL_POLL_INTERVAL_SECS;
463
464 let cow = should_enable_iceberg_cow(
465 iceberg_config.r#type.as_str(),
466 iceberg_config.write_mode.as_str(),
467 );
468
469 while elapsed_time < MAX_WAIT_TIME_SECS {
470 let poll_interval = std::time::Duration::from_secs(current_interval_secs);
471 tokio::time::sleep(poll_interval).await;
472 elapsed_time += current_interval_secs;
473
474 let current_table = iceberg_config.load_table().await?;
475
476 let metadata = current_table.metadata();
477 let new_snapshots: Vec<_> = metadata
478 .snapshots()
479 .filter(|snapshot| {
480 let snapshot_timestamp = snapshot.timestamp_ms();
481 let snapshot_id = snapshot.snapshot_id();
482 snapshot_timestamp > initial_timestamp && snapshot_id != initial_snapshot_id
483 })
484 .collect();
485
486 for snapshot in new_snapshots {
487 let summary = snapshot.summary();
488 if cow {
489 if matches!(summary.operation, Operation::Overwrite) {
490 return Ok(());
491 }
492 } else if matches!(summary.operation, Operation::Replace) {
493 return Ok(());
494 }
495 }
496
497 current_interval_secs = std::cmp::min(
498 MAX_POLL_INTERVAL_SECS,
499 ((current_interval_secs as f64) * BACKOFF_MULTIPLIER) as u64,
500 );
501 }
502
503 Err(anyhow!(
504 "Compaction did not complete within {} seconds for sink {} (task_id={})",
505 MAX_WAIT_TIME_SECS,
506 sink_id.sink_id,
507 task_id
508 )
509 .into())
510 }
511
512 async fn perform_gc_operations(&self) -> MetaResult<()> {
513 let sink_ids = {
514 let guard = self.inner.read();
515 guard.iceberg_commits.keys().cloned().collect::<Vec<_>>()
516 };
517
518 tracing::info!("Starting GC operations for {} tables", sink_ids.len());
519
520 for sink_id in sink_ids {
521 if let Err(e) = self.check_and_expire_snapshots(&sink_id).await {
522 tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id.sink_id);
523 }
524 }
525
526 tracing::info!("GC operations completed");
527 Ok(())
528 }
529
530 async fn check_and_expire_snapshots(&self, sink_id: &SinkId) -> MetaResult<()> {
531 const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000;
532 let now = chrono::Utc::now().timestamp_millis();
533 let expired_older_than = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;
534
535 let iceberg_config = self.load_iceberg_config(sink_id).await?;
536 if !iceberg_config.enable_snapshot_expiration {
537 return Ok(());
538 }
539
540 let catalog = iceberg_config.create_catalog().await?;
541 let table = catalog
542 .load_table(&iceberg_config.full_table_name()?)
543 .await
544 .map_err(|e| SinkError::Iceberg(e.into()))?;
545
546 let metadata = table.metadata();
547 let mut snapshots = metadata.snapshots().collect_vec();
548 snapshots.sort_by_key(|s| s.timestamp_ms());
549
550 if snapshots.is_empty() || snapshots.first().unwrap().timestamp_ms() > expired_older_than {
551 return Ok(());
553 }
554
555 tracing::info!(
556 "Catalog {} table {} sink-id {} has {} snapshots try trigger expiration",
557 iceberg_config.catalog_name(),
558 iceberg_config.full_table_name()?,
559 sink_id.sink_id,
560 snapshots.len(),
561 );
562
563 let tx = Transaction::new(&table);
564
565 let expired_snapshots = tx
567 .expire_snapshot()
568 .clear_expired_files(true)
569 .clear_expired_meta_data(true);
570
571 let tx = expired_snapshots
572 .apply()
573 .await
574 .map_err(|e| SinkError::Iceberg(e.into()))?;
575 tx.commit(catalog.as_ref())
576 .await
577 .map_err(|e| SinkError::Iceberg(e.into()))?;
578
579 tracing::info!(
580 "Expired snapshots for iceberg catalog {} table {} sink-id {}",
581 iceberg_config.catalog_name(),
582 iceberg_config.full_table_name()?,
583 sink_id.sink_id,
584 );
585
586 Ok(())
587 }
588}