risingwave_meta/manager/iceberg_compaction/
manual.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_connector::sink::catalog::SinkId;
17use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
18use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
19use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::ReportTask as IcebergReportTask;
20use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::report_task::Status as IcebergReportTaskStatus;
21use tokio::sync::oneshot;
22
23use super::*;
24use crate::hummock::sequence::next_compaction_task_id;
25
26impl IcebergCompactionManager {
27    fn register_manual_task_waiter(
28        &self,
29        task_id: u64,
30    ) -> MetaResult<oneshot::Receiver<MetaResult<()>>> {
31        let (tx, rx) = oneshot::channel();
32        match self.inner.write().manual_task_waiters.entry(task_id) {
33            std::collections::hash_map::Entry::Vacant(entry) => {
34                entry.insert(tx);
35                Ok(rx)
36            }
37            std::collections::hash_map::Entry::Occupied(_) => Err(anyhow!(
38                "manual iceberg compaction waiter already exists for task_id={}",
39                task_id
40            )
41            .into()),
42        }
43    }
44
45    fn clear_manual_task_waiter(&self, task_id: u64) {
46        self.inner.write().manual_task_waiters.remove(&task_id);
47    }
48
49    pub(super) fn complete_manual_task_if_any(&self, report: &IcebergReportTask) -> bool {
50        let Some(waiter) = self
51            .inner
52            .write()
53            .manual_task_waiters
54            .remove(&report.task_id)
55        else {
56            return false;
57        };
58
59        let status = IcebergReportTaskStatus::try_from(report.status)
60            .unwrap_or(IcebergReportTaskStatus::Unspecified);
61        let result = match status {
62            IcebergReportTaskStatus::Success => Ok(()),
63            IcebergReportTaskStatus::Failed | IcebergReportTaskStatus::Unspecified => {
64                let message = report
65                    .error_message
66                    .clone()
67                    .unwrap_or_else(|| "manual iceberg compaction failed".to_owned());
68                Err(anyhow!(
69                    "Manual iceberg compaction task {} for sink {} failed: {}",
70                    report.task_id,
71                    report.sink_id,
72                    message
73                )
74                .into())
75            }
76        };
77
78        let _ = waiter.send(result);
79        true
80    }
81
82    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
83        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
84
85        let compactor = self
86            .iceberg_compactor_manager
87            .next_compactor()
88            .ok_or_else(|| anyhow!("No iceberg compactor available"))?;
89
90        let task_id = next_compaction_task_id(&self.env).await?;
91        let sink_param = self.get_sink_param(sink_id).await?;
92        let waiter = self.register_manual_task_waiter(task_id)?;
93
94        if let Err(error) =
95            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
96                task_id,
97                sink_id: sink_id.as_raw_id(),
98                props: sink_param.properties,
99                task_type: TaskType::Full as i32,
100            }))
101        {
102            self.clear_manual_task_waiter(task_id);
103            return Err(error);
104        }
105
106        tracing::info!(
107            "Manual compaction triggered for sink {} with task ID {}, waiting for completion...",
108            sink_id,
109            task_id
110        );
111
112        self.wait_for_compaction_completion(&sink_id, task_id, waiter)
113            .await?;
114
115        Ok(task_id)
116    }
117
118    async fn wait_for_compaction_completion(
119        &self,
120        sink_id: &SinkId,
121        task_id: u64,
122        waiter: oneshot::Receiver<MetaResult<()>>,
123    ) -> MetaResult<()> {
124        let _cleanup_guard =
125            scopeguard::guard(task_id, |task_id| self.clear_manual_task_waiter(task_id));
126
127        match tokio::time::timeout(self.report_timeout(), waiter).await {
128            Ok(Ok(result)) => result,
129            Ok(Err(_)) => Err(anyhow!(
130                "Manual iceberg compaction waiter dropped unexpectedly for sink {} (task_id={})",
131                sink_id,
132                task_id
133            )
134            .into()),
135            Err(_) => Err(anyhow!(
136                "Iceberg compaction did not report completion within {} seconds for sink {} (task_id={})",
137                self.report_timeout().as_secs(),
138                sink_id,
139                task_id
140            )
141            .into()),
142        }
143    }
144}