risingwave_meta/manager/iceberg_compaction/
manual.rs1use anyhow::anyhow;
16use risingwave_connector::sink::catalog::SinkId;
17use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
18use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
19use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::ReportTask as IcebergReportTask;
20use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::report_task::Status as IcebergReportTaskStatus;
21use tokio::sync::oneshot;
22
23use super::*;
24use crate::hummock::sequence::next_compaction_task_id;
25
26impl IcebergCompactionManager {
27 fn register_manual_task_waiter(
28 &self,
29 task_id: u64,
30 ) -> MetaResult<oneshot::Receiver<MetaResult<()>>> {
31 let (tx, rx) = oneshot::channel();
32 match self.inner.write().manual_task_waiters.entry(task_id) {
33 std::collections::hash_map::Entry::Vacant(entry) => {
34 entry.insert(tx);
35 Ok(rx)
36 }
37 std::collections::hash_map::Entry::Occupied(_) => Err(anyhow!(
38 "manual iceberg compaction waiter already exists for task_id={}",
39 task_id
40 )
41 .into()),
42 }
43 }
44
45 fn clear_manual_task_waiter(&self, task_id: u64) {
46 self.inner.write().manual_task_waiters.remove(&task_id);
47 }
48
49 pub(super) fn complete_manual_task_if_any(&self, report: &IcebergReportTask) -> bool {
50 let Some(waiter) = self
51 .inner
52 .write()
53 .manual_task_waiters
54 .remove(&report.task_id)
55 else {
56 return false;
57 };
58
59 let status = IcebergReportTaskStatus::try_from(report.status)
60 .unwrap_or(IcebergReportTaskStatus::Unspecified);
61 let result = match status {
62 IcebergReportTaskStatus::Success => Ok(()),
63 IcebergReportTaskStatus::Failed | IcebergReportTaskStatus::Unspecified => {
64 let message = report
65 .error_message
66 .clone()
67 .unwrap_or_else(|| "manual iceberg compaction failed".to_owned());
68 Err(anyhow!(
69 "Manual iceberg compaction task {} for sink {} failed: {}",
70 report.task_id,
71 report.sink_id,
72 message
73 )
74 .into())
75 }
76 };
77
78 let _ = waiter.send(result);
79 true
80 }
81
82 pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
83 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
84
85 let compactor = self
86 .iceberg_compactor_manager
87 .next_compactor()
88 .ok_or_else(|| anyhow!("No iceberg compactor available"))?;
89
90 let task_id = next_compaction_task_id(&self.env).await?;
91 let sink_param = self.get_sink_param(sink_id).await?;
92 let waiter = self.register_manual_task_waiter(task_id)?;
93
94 if let Err(error) =
95 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
96 task_id,
97 sink_id: sink_id.as_raw_id(),
98 props: sink_param.properties,
99 task_type: TaskType::Full as i32,
100 }))
101 {
102 self.clear_manual_task_waiter(task_id);
103 return Err(error);
104 }
105
106 tracing::info!(
107 "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
108 sink_id,
109 task_id
110 );
111
112 self.wait_for_compaction_completion(&sink_id, task_id, waiter)
113 .await?;
114
115 Ok(task_id)
116 }
117
118 async fn wait_for_compaction_completion(
119 &self,
120 sink_id: &SinkId,
121 task_id: u64,
122 waiter: oneshot::Receiver<MetaResult<()>>,
123 ) -> MetaResult<()> {
124 let _cleanup_guard =
125 scopeguard::guard(task_id, |task_id| self.clear_manual_task_waiter(task_id));
126
127 match tokio::time::timeout(self.report_timeout(), waiter).await {
128 Ok(Ok(result)) => result,
129 Ok(Err(_)) => Err(anyhow!(
130 "Manual iceberg compaction waiter dropped unexpectedly for sink {} (task_id={})",
131 sink_id,
132 task_id
133 )
134 .into()),
135 Err(_) => Err(anyhow!(
136 "Iceberg compaction did not report completion within {} seconds for sink {} (task_id={})",
137 self.report_timeout().as_secs(),
138 sink_id,
139 task_id
140 )
141 .into()),
142 }
143 }
144}