risingwave_meta/manager/iceberg_compaction/
manual.rs1use anyhow::anyhow;
16use risingwave_connector::sink::catalog::SinkId;
17use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::ReportTask as IcebergReportTask;
18use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::report_task::Status as IcebergReportTaskStatus;
19
20use super::*;
21
22impl IcebergCompactionManager {
23 pub(super) fn complete_manual_task_waiter(
24 waiter: ManualCompactionWaiter,
25 report: &IcebergReportTask,
26 ) {
27 let status = IcebergReportTaskStatus::try_from(report.status)
28 .unwrap_or(IcebergReportTaskStatus::Unspecified);
29 let result = match status {
30 IcebergReportTaskStatus::Success => Ok(report.task_id),
31 IcebergReportTaskStatus::Failed | IcebergReportTaskStatus::Unspecified => {
32 let message = report
33 .error_message
34 .clone()
35 .unwrap_or_else(|| "manual iceberg compaction failed".to_owned());
36 Err(anyhow!(
37 "Manual iceberg compaction task {} for sink {} failed: {}",
38 report.task_id,
39 report.sink_id,
40 message
41 )
42 .into())
43 }
44 };
45
46 let _ = waiter.send(result);
47 }
48
49 pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
50 if self.iceberg_compactor_manager.compactor_num() == 0 {
53 return Err(anyhow!("No iceberg compactor available").into());
54 }
55
56 let waiter = self.start_manual_compaction(sink_id).await?;
57 let cleanup_guard = scopeguard::guard(sink_id, |sink_id| {
58 self.cancel_manual_compaction_waiter(sink_id)
59 });
60
61 tracing::info!(
62 "Manual compaction requested for sink {}, waiting for completion...",
63 sink_id
64 );
65
66 match waiter.await {
67 Ok(result) => {
68 let _ = scopeguard::ScopeGuard::into_inner(cleanup_guard);
69 result
70 }
71 Err(_) => {
72 let _ = scopeguard::ScopeGuard::into_inner(cleanup_guard);
73 Err(anyhow!(
74 "Manual iceberg compaction waiter dropped unexpectedly for sink {}",
75 sink_id,
76 )
77 .into())
78 }
79 }
80 }
81}