risingwave_storage/hummock/compactor/iceberg_compaction/
report.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::time::SystemTime;
17
18use risingwave_pb::iceberg_compaction::{
19    SubscribeIcebergCompactionEventRequest, subscribe_iceberg_compaction_event_request,
20};
21use thiserror_ext::AsReport;
22use tokio::sync::mpsc;
23
24use super::TaskKey;
25
26#[derive(Debug)]
27pub(crate) struct IcebergPlanCompletion {
28    pub(crate) task_key: TaskKey,
29    pub(crate) error_message: Option<String>,
30}
31
32pub(crate) type IcebergTaskReport = subscribe_iceberg_compaction_event_request::ReportTask;
33
/// Outcome of trying to hand one or more task reports to the request channel.
///
/// The enum is field-less, so it derives the cheap standard traits; this lets callers log it
/// with `{:?}` and compare it in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ReportSendResult {
    /// Every report involved in the call was accepted by the request channel.
    Sent,
    /// A send failed and the unsent report was kept in the pending queue; the variant name
    /// indicates the caller is expected to restart the stream before retrying.
    RestartStream,
}
38
39pub(crate) struct IcebergTaskTracker {
40    sink_id: u32,
41    remaining_plans: usize,
42    successful_plans: usize,
43    first_error: Option<String>,
44}
45
46impl IcebergTaskTracker {
47    pub(crate) fn new(sink_id: u32, remaining_plans: usize) -> Self {
48        Self {
49            sink_id,
50            remaining_plans,
51            successful_plans: 0,
52            first_error: None,
53        }
54    }
55
56    pub(crate) fn record_completion(&mut self, error_message: Option<String>) {
57        debug_assert!(self.remaining_plans > 0);
58        self.remaining_plans -= 1;
59        if let Some(error_message) = error_message {
60            if self.first_error.is_none() {
61                self.first_error = Some(error_message);
62            }
63        } else {
64            self.successful_plans += 1;
65        }
66    }
67
68    pub(crate) fn is_finished(&self) -> bool {
69        self.remaining_plans == 0
70    }
71
72    pub(crate) fn into_report(self, task_id: u64) -> IcebergTaskReport {
73        let error_message = if self.successful_plans > 0 {
74            None
75        } else {
76            Some(
77                self.first_error
78                    .unwrap_or_else(|| "All admitted iceberg compaction plans failed".to_owned()),
79            )
80        };
81        build_iceberg_task_report(task_id, self.sink_id, error_message)
82    }
83}
84
85pub(crate) fn build_iceberg_task_report(
86    task_id: u64,
87    sink_id: u32,
88    error_message: Option<String>,
89) -> IcebergTaskReport {
90    subscribe_iceberg_compaction_event_request::ReportTask {
91        task_id,
92        sink_id,
93        status: if error_message.is_some() {
94            subscribe_iceberg_compaction_event_request::report_task::Status::Failed as i32
95        } else {
96            subscribe_iceberg_compaction_event_request::report_task::Status::Success as i32
97        },
98        error_message,
99    }
100}
101
102pub(crate) fn send_iceberg_task_report(
103    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
104    report_event: IcebergTaskReport,
105) -> Result<(), IcebergTaskReport> {
106    if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
107        event: Some(
108            subscribe_iceberg_compaction_event_request::Event::ReportTask(report_event.clone()),
109        ),
110        create_at: SystemTime::now()
111            .duration_since(std::time::UNIX_EPOCH)
112            .expect("Clock may have gone backwards")
113            .as_millis() as u64,
114    }) {
115        tracing::warn!(
116            error = %e.as_report(),
117            task_id = report_event.task_id,
118            sink_id = report_event.sink_id,
119            "Failed to report iceberg compaction task result - will retry on stream restart"
120        );
121        return Err(report_event);
122    }
123
124    Ok(())
125}
126
127pub(crate) fn send_or_buffer_iceberg_task_report(
128    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
129    pending_task_reports: &mut VecDeque<IcebergTaskReport>,
130    report: IcebergTaskReport,
131) -> ReportSendResult {
132    if let Err(report) = send_iceberg_task_report(request_sender, report) {
133        pending_task_reports.push_back(report);
134        return ReportSendResult::RestartStream;
135    }
136    ReportSendResult::Sent
137}
138
139pub(crate) fn flush_pending_iceberg_task_reports(
140    request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
141    pending_task_reports: &mut VecDeque<IcebergTaskReport>,
142) -> ReportSendResult {
143    while let Some(report_event) = pending_task_reports.pop_front() {
144        if let Err(report_event) = send_iceberg_task_report(request_sender, report_event) {
145            pending_task_reports.push_front(report_event);
146            return ReportSendResult::RestartStream;
147        }
148    }
149    ReportSendResult::Sent
150}
151
#[cfg(test)]
mod tests {
    use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request;
    use subscribe_iceberg_compaction_event_request::report_task::Status;

    use super::*;

    /// A send on a channel whose receiver is gone must hand the report back unchanged.
    #[test]
    fn test_send_iceberg_task_report_returns_payload_on_send_failure() {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
        drop(receiver);

        let original = build_iceberg_task_report(7, 9, Some("send failure".to_owned()));
        let returned = send_iceberg_task_report(&sender, original.clone()).unwrap_err();

        assert_eq!(returned.task_id, original.task_id);
        assert_eq!(returned.sink_id, original.sink_id);
        assert_eq!(returned.error_message, original.error_message);
    }

    /// One successful plan is enough for the task report to be a success.
    #[test]
    fn test_build_iceberg_task_result_partial_enqueue_is_success_if_admitted_plan_succeeds() {
        let mut tracker = IcebergTaskTracker::new(9, 1);
        tracker.record_completion(None);

        let task_report = tracker.into_report(7);

        assert_eq!(task_report.status, Status::Success as i32);
        assert!(task_report.error_message.is_none());
    }

    /// When every plan fails, the report is a failure carrying the first error message.
    #[test]
    fn test_build_iceberg_task_result_fails_if_all_admitted_plans_fail() {
        let mut tracker = IcebergTaskTracker::new(9, 2);
        for failure in ["first failure", "second failure"] {
            tracker.record_completion(Some(failure.to_owned()));
        }

        let task_report = tracker.into_report(7);

        assert_eq!(task_report.status, Status::Failed as i32);
        assert_eq!(task_report.error_message.as_deref(), Some("first failure"));
    }
}