use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use iceberg::table::Table;
use iceberg::transaction::Transaction;
use itertools::Itertools;
use parking_lot::RwLock;
use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::sink::{SinkError, SinkParam};
use risingwave_pb::catalog::PbSink;
use risingwave_pb::iceberg_compaction::{
    IcebergCompactionTask, SubscribeIcebergCompactionEventRequest,
};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::Streaming;

use super::MetaSrvEnv;
use crate::MetaResult;
use crate::hummock::{
    IcebergCompactionEventDispatcher, IcebergCompactionEventHandler, IcebergCompactionEventLoop,
    IcebergCompactor, IcebergCompactorManagerRef,
};
use crate::manager::MetadataManager;
use crate::rpc::metrics::MetaMetrics;

pub type IcebergCompactionManagerRef = std::sync::Arc<IcebergCompactionManager>;

type CompactorChangeTx = UnboundedSender<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

type CompactorChangeRx =
    UnboundedReceiver<(u32, Streaming<SubscribeIcebergCompactionEventRequest>)>;

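/// Per-sink commit statistics: how many Iceberg commits have been observed since the last
/// compaction and the earliest time at which the sink becomes eligible for the next compaction.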
#[derive(Debug, Clone)]
struct CommitInfo {
    count: usize,
    next_compaction_time: Option<Instant>,
    compaction_interval: u64,
}

impl CommitInfo {
    fn set_processing(&mut self) {
        self.count = 0;
        self.next_compaction_time.take();
    }

    fn initialize(&mut self) {
        self.count = 0;
        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(self.compaction_interval));
    }

    fn replace(&mut self, commit_info: CommitInfo) {
        self.count = commit_info.count;
        self.next_compaction_time = commit_info.next_compaction_time;
        self.compaction_interval = commit_info.compaction_interval;
    }

    fn increase_count(&mut self) {
        self.count += 1;
    }

    fn update_compaction_interval(&mut self, compaction_interval: u64) {
        self.compaction_interval = compaction_interval;

        self.next_compaction_time =
            Some(Instant::now() + std::time::Duration::from_secs(compaction_interval));
    }
}

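/// A handle for one picked compaction candidate. Dropping the handle either re-arms the sink's
/// commit info (when the task was dispatched successfully) or restores the snapshot captured
/// when the handle was created (when dispatch failed), so the sink is not lost from scheduling.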
pub struct IcebergCompactionHandle {
    sink_id: SinkId,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
    metadata_manager: MetadataManager,
    handle_success: bool,

    /// Snapshot of the sink's `CommitInfo` taken when the handle was created, used to restore
    /// the state if the compaction task fails to be sent.
    commit_info: CommitInfo,
}

impl IcebergCompactionHandle {
    fn new(
        sink_id: SinkId,
        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
        metadata_manager: MetadataManager,
        commit_info: CommitInfo,
    ) -> Self {
        Self {
            sink_id,
            inner,
            metadata_manager,
            handle_success: false,
            commit_info,
        }
    }

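    /// Sends a compaction task for this sink to the given compactor. On success the handle is
    /// marked as handled so that dropping it re-arms the sink's compaction schedule.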
    pub async fn send_compact_task(
        mut self,
        compactor: Arc<IcebergCompactor>,
        task_id: u64,
    ) -> MetaResult<()> {
        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
        let prost_sink_catalog: PbSink = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![self.sink_id.sink_id as i32])
            .await?
            .remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        let result =
            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
                task_id,
                props: param.properties,
            }));

        if result.is_ok() {
            self.handle_success = true;
        }

        result
    }

    pub fn sink_id(&self) -> SinkId {
        self.sink_id
    }
}

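/// Restores the per-sink commit state when the handle goes out of scope: a successfully
/// dispatched task resets the counters and schedules the next compaction window, while a failed
/// dispatch puts back the state captured when the handle was created.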
impl Drop for IcebergCompactionHandle {
    fn drop(&mut self) {
        if self.handle_success {
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.initialize();
            }
        } else {
            // The task was not sent successfully; put back the commit info snapshot captured
            // when this handle was created so the sink stays eligible for future scheduling.
            let mut guard = self.inner.write();
            if let Some(commit_info) = guard.iceberg_commits.get_mut(&self.sink_id) {
                commit_info.replace(self.commit_info.clone());
            }
        }
    }
}

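/// Shared state guarded by the manager's lock: commit statistics keyed by sink id.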
struct IcebergCompactionManagerInner {
    pub iceberg_commits: HashMap<SinkId, CommitInfo>,
}

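/// Tracks Iceberg sink commit activity reported by sinks, schedules table compaction for the
/// busiest sinks, and periodically expires old snapshots of the managed Iceberg tables.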
pub struct IcebergCompactionManager {
    pub env: MetaSrvEnv,
    inner: Arc<RwLock<IcebergCompactionManagerInner>>,

    metadata_manager: MetadataManager,
    pub iceberg_compactor_manager: IcebergCompactorManagerRef,

    compactor_streams_change_tx: CompactorChangeTx,

    pub metrics: Arc<MetaMetrics>,
}

impl IcebergCompactionManager {
    pub fn build(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        iceberg_compactor_manager: IcebergCompactorManagerRef,
        metrics: Arc<MetaMetrics>,
    ) -> (Arc<Self>, CompactorChangeRx) {
        let (compactor_streams_change_tx, compactor_streams_change_rx) =
            tokio::sync::mpsc::unbounded_channel();
        (
            Arc::new(Self {
                env,
                inner: Arc::new(RwLock::new(IcebergCompactionManagerInner {
                    iceberg_commits: HashMap::default(),
                })),
                metadata_manager,
                iceberg_compactor_manager,
                compactor_streams_change_tx,
                metrics,
            }),
            compactor_streams_change_rx,
        )
    }

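    /// Spawns the loop that consumes commit statistics reported by Iceberg sinks and folds them
    /// into the per-sink `CommitInfo`. Returns the join handle and a shutdown sender.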
    pub fn compaction_stat_loop(
        manager: Arc<Self>,
        mut rx: UnboundedReceiver<IcebergSinkCompactionUpdate>,
    ) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    Some(stat) = rx.recv() => {
                        manager.update_iceberg_commit_info(stat);
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg compaction manager is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

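    /// Records one commit for the given sink, creating its `CommitInfo` on first sight, and
    /// re-schedules the next compaction time if the reported compaction interval has changed.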
    pub fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
        let mut guard = self.inner.write();

        let IcebergSinkCompactionUpdate {
            sink_id,
            compaction_interval,
        } = msg;

        let commit_info = guard.iceberg_commits.entry(sink_id).or_insert(CommitInfo {
            count: 0,
            next_compaction_time: Some(
                Instant::now() + std::time::Duration::from_secs(compaction_interval),
            ),
            compaction_interval,
        });

        commit_info.increase_count();
        if commit_info.compaction_interval != compaction_interval {
            commit_info.update_compaction_interval(compaction_interval);
        }
    }

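    /// Picks up to `n` sinks that are due for compaction (non-zero commit count and a
    /// `next_compaction_time` that has already passed), ordered by commit count (highest first),
    /// and marks them as processing. The returned handles restore or re-arm the state on drop.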
    pub fn get_top_n_iceberg_commit_sink_ids(&self, n: usize) -> Vec<IcebergCompactionHandle> {
        let now = Instant::now();
        let mut guard = self.inner.write();
        guard
            .iceberg_commits
            .iter_mut()
            .filter(|(_, commit_info)| {
                commit_info.count > 0
                    && if let Some(next_compaction_time) = commit_info.next_compaction_time {
                        next_compaction_time <= now
                    } else {
                        false
                    }
            })
            .sorted_by(|a, b| {
                b.1.count
                    .cmp(&a.1.count)
                    .then_with(|| b.1.next_compaction_time.cmp(&a.1.next_compaction_time))
            })
            .take(n)
            .map(|(sink_id, commit_info)| {
                let handle = IcebergCompactionHandle::new(
                    *sink_id,
                    self.inner.clone(),
                    self.metadata_manager.clone(),
                    commit_info.clone(),
                );

                commit_info.set_processing();

                handle
            })
            .collect::<Vec<_>>()
    }

    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
        let mut guard = self.inner.write();
        guard.iceberg_commits.remove(&sink_id);
    }

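    /// Loads the sink catalog for `sink_id` from the metadata store and converts it into a
    /// `SinkParam`.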
    pub async fn get_sink_param(&self, sink_id: &SinkId) -> MetaResult<SinkParam> {
        let prost_sink_catalog: PbSink = self
            .metadata_manager
            .catalog_controller
            .get_sink_by_ids(vec![sink_id.sink_id as i32])
            .await?
            .remove(0);
        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
        Ok(param)
    }

    #[allow(dead_code)]
    pub async fn load_iceberg_table(&self, sink_id: &SinkId) -> MetaResult<Table> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
        let table = iceberg_config.load_table().await?;
        Ok(table)
    }

    pub async fn load_iceberg_config(&self, sink_id: &SinkId) -> MetaResult<IcebergConfig> {
        let sink_param = self.get_sink_param(sink_id).await?;
        let iceberg_config = IcebergConfig::from_btreemap(sink_param.properties.clone())?;
        Ok(iceberg_config)
    }

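    /// Registers the event stream of a newly subscribed compactor so the event loop can start
    /// dispatching compaction events to it.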
    pub fn add_compactor_stream(
        &self,
        context_id: u32,
        req_stream: Streaming<SubscribeIcebergCompactionEventRequest>,
    ) {
        self.compactor_streams_change_tx
            .send((context_id, req_stream))
            .unwrap();
    }

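    /// Starts the event loop that handles compaction events from subscribed compactors and
    /// returns the join handles and shutdown senders of the spawned tasks.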
    pub fn iceberg_compaction_event_loop(
        iceberg_compaction_manager: Arc<Self>,
        compactor_streams_change_rx: UnboundedReceiver<(
            u32,
            Streaming<SubscribeIcebergCompactionEventRequest>,
        )>,
    ) -> Vec<(JoinHandle<()>, Sender<()>)> {
        let mut join_handle_vec = Vec::default();

        let iceberg_compaction_event_handler =
            IcebergCompactionEventHandler::new(iceberg_compaction_manager.clone());

        let iceberg_compaction_event_dispatcher =
            IcebergCompactionEventDispatcher::new(iceberg_compaction_event_handler);

        let event_loop = IcebergCompactionEventLoop::new(
            iceberg_compaction_event_dispatcher,
            iceberg_compaction_manager.metrics.clone(),
            compactor_streams_change_rx,
        );

        let (event_loop_join_handle, event_loop_shutdown_tx) = event_loop.run();
        join_handle_vec.push((event_loop_join_handle, event_loop_shutdown_tx));

        join_handle_vec
    }

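    /// Spawns the periodic GC loop that expires old snapshots of all tracked Iceberg tables.
    /// Returns the join handle and a shutdown sender.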
    pub fn gc_loop(manager: Arc<Self>) -> (JoinHandle<()>, Sender<()>) {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            const GC_LOOP_INTERVAL_SECS: u64 = 3600;

            let mut interval =
                tokio::time::interval(std::time::Duration::from_secs(GC_LOOP_INTERVAL_SECS));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = manager.perform_gc_operations().await {
                            tracing::error!(error = ?e.as_report(), "GC operations failed");
                        }
                    },
                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg GC loop is stopped");
                        return;
                    }
                }
            }
        });

        (join_handle, shutdown_tx)
    }

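    /// Runs one GC pass: iterates over all tracked sinks and tries to expire old snapshots of
    /// their Iceberg tables. Failures are logged per sink and do not abort the pass.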
    async fn perform_gc_operations(&self) -> MetaResult<()> {
        let sink_ids = {
            let guard = self.inner.read();
            guard.iceberg_commits.keys().cloned().collect::<Vec<_>>()
        };

        tracing::info!("Starting GC operations for {} tables", sink_ids.len());

        for sink_id in sink_ids {
            if let Err(e) = self.check_and_expire_snapshots(&sink_id).await {
                tracing::error!(error = ?e.as_report(), "Failed to perform GC for sink {}", sink_id.sink_id);
            }
        }

        tracing::info!("GC operations completed");
        Ok(())
    }

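    /// Expires snapshots of the Iceberg table behind `sink_id` that are older than one day,
    /// provided snapshot expiration is enabled for the sink. Skips the table when its oldest
    /// snapshot is still within the retention window.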
    async fn check_and_expire_snapshots(&self, sink_id: &SinkId) -> MetaResult<()> {
        // Expire snapshots older than one day by default.
        const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 24 * 60 * 60 * 1000;

        let now = chrono::Utc::now().timestamp_millis();
        let expired_older_than = now - MAX_SNAPSHOT_AGE_MS_DEFAULT;

        let iceberg_config = self.load_iceberg_config(sink_id).await?;
        if !iceberg_config.enable_snapshot_expiration {
            return Ok(());
        }

        let catalog = iceberg_config.create_catalog().await?;
        let table = catalog
            .load_table(&iceberg_config.full_table_name()?)
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        let metadata = table.metadata();
        let mut snapshots = metadata.snapshots().collect_vec();
        snapshots.sort_by_key(|s| s.timestamp_ms());

        // Nothing to do if there are no snapshots or the oldest one is still within retention.
        if snapshots.is_empty() || snapshots.first().unwrap().timestamp_ms() > expired_older_than {
            return Ok(());
        }

        tracing::info!(
            "Catalog {} table {} sink-id {} has {} snapshots, trying to trigger expiration",
            iceberg_config.catalog_name(),
            iceberg_config.full_table_name()?,
            sink_id.sink_id,
            snapshots.len(),
        );

        let tx = Transaction::new(&table);

        let expired_snapshots = tx
            .expire_snapshot()
            .clear_expired_files(true)
            .clear_expired_meta_data(true);

        let tx = expired_snapshots
            .apply()
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;
        tx.commit(catalog.as_ref())
            .await
            .map_err(|e| SinkError::Iceberg(e.into()))?;

        tracing::info!(
            "Expired snapshots for iceberg catalog {} table {} sink-id {}",
            iceberg_config.catalog_name(),
            iceberg_config.full_table_name()?,
            sink_id.sink_id,
        );

        Ok(())
    }
}
472}