risingwave_meta/manager/iceberg_compaction/
manual.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_connector::sink::catalog::SinkId;
17use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::ReportTask as IcebergReportTask;
18use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::report_task::Status as IcebergReportTaskStatus;
19
20use super::*;
21
22impl IcebergCompactionManager {
23    pub(super) fn complete_manual_task_waiter(
24        waiter: ManualCompactionWaiter,
25        report: &IcebergReportTask,
26    ) {
27        let status = IcebergReportTaskStatus::try_from(report.status)
28            .unwrap_or(IcebergReportTaskStatus::Unspecified);
29        let result = match status {
30            IcebergReportTaskStatus::Success => Ok(report.task_id),
31            IcebergReportTaskStatus::Failed | IcebergReportTaskStatus::Unspecified => {
32                let message = report
33                    .error_message
34                    .clone()
35                    .unwrap_or_else(|| "manual iceberg compaction failed".to_owned());
36                Err(anyhow!(
37                    "Manual iceberg compaction task {} for sink {} failed: {}",
38                    report.task_id,
39                    report.sink_id,
40                    message
41                )
42                .into())
43            }
44        };
45
46        let _ = waiter.send(result);
47    }
48
49    pub async fn trigger_manual_compaction(&self, sink_id: SinkId) -> MetaResult<u64> {
50        // Fast-fail before registering a waiter when no compactor can pull the
51        // scheduler task.
52        if self.iceberg_compactor_manager.compactor_num() == 0 {
53            return Err(anyhow!("No iceberg compactor available").into());
54        }
55
56        let waiter = self.start_manual_compaction(sink_id).await?;
57        let cleanup_guard = scopeguard::guard(sink_id, |sink_id| {
58            self.cancel_manual_compaction_waiter(sink_id)
59        });
60
61        tracing::info!(
62            "Manual compaction requested for sink {}, waiting for completion...",
63            sink_id
64        );
65
66        match waiter.await {
67            Ok(result) => {
68                let _ = scopeguard::ScopeGuard::into_inner(cleanup_guard);
69                result
70            }
71            Err(_) => {
72                let _ = scopeguard::ScopeGuard::into_inner(cleanup_guard);
73                Err(anyhow!(
74                    "Manual iceberg compaction waiter dropped unexpectedly for sink {}",
75                    sink_id,
76                )
77                .into())
78            }
79        }
80    }
81}