risingwave_storage/hummock/compactor/iceberg_compaction/
report.rs1use std::collections::VecDeque;
16use std::time::SystemTime;
17
18use risingwave_pb::iceberg_compaction::{
19 SubscribeIcebergCompactionEventRequest, subscribe_iceberg_compaction_event_request,
20};
21use thiserror_ext::AsReport;
22use tokio::sync::mpsc;
23
24use super::TaskKey;
25
26#[derive(Debug)]
27pub(crate) struct IcebergPlanCompletion {
28 pub(crate) task_key: TaskKey,
29 pub(crate) error_message: Option<String>,
30}
31
32pub(crate) type IcebergTaskReport = subscribe_iceberg_compaction_event_request::ReportTask;
33
34pub(crate) enum ReportSendResult {
35 Sent,
36 RestartStream,
37}
38
39pub(crate) struct IcebergTaskTracker {
40 sink_id: u32,
41 remaining_plans: usize,
42 successful_plans: usize,
43 first_error: Option<String>,
44}
45
46impl IcebergTaskTracker {
47 pub(crate) fn new(sink_id: u32, remaining_plans: usize) -> Self {
48 Self {
49 sink_id,
50 remaining_plans,
51 successful_plans: 0,
52 first_error: None,
53 }
54 }
55
56 pub(crate) fn record_completion(&mut self, error_message: Option<String>) {
57 debug_assert!(self.remaining_plans > 0);
58 self.remaining_plans -= 1;
59 if let Some(error_message) = error_message {
60 if self.first_error.is_none() {
61 self.first_error = Some(error_message);
62 }
63 } else {
64 self.successful_plans += 1;
65 }
66 }
67
68 pub(crate) fn is_finished(&self) -> bool {
69 self.remaining_plans == 0
70 }
71
72 pub(crate) fn into_report(self, task_id: u64) -> IcebergTaskReport {
73 let error_message = if self.successful_plans > 0 {
74 None
75 } else {
76 Some(
77 self.first_error
78 .unwrap_or_else(|| "All admitted iceberg compaction plans failed".to_owned()),
79 )
80 };
81 build_iceberg_task_report(task_id, self.sink_id, error_message)
82 }
83}
84
85pub(crate) fn build_iceberg_task_report(
86 task_id: u64,
87 sink_id: u32,
88 error_message: Option<String>,
89) -> IcebergTaskReport {
90 subscribe_iceberg_compaction_event_request::ReportTask {
91 task_id,
92 sink_id,
93 status: if error_message.is_some() {
94 subscribe_iceberg_compaction_event_request::report_task::Status::Failed as i32
95 } else {
96 subscribe_iceberg_compaction_event_request::report_task::Status::Success as i32
97 },
98 error_message,
99 }
100}
101
102pub(crate) fn send_iceberg_task_report(
103 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
104 report_event: IcebergTaskReport,
105) -> Result<(), IcebergTaskReport> {
106 if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
107 event: Some(
108 subscribe_iceberg_compaction_event_request::Event::ReportTask(report_event.clone()),
109 ),
110 create_at: SystemTime::now()
111 .duration_since(std::time::UNIX_EPOCH)
112 .expect("Clock may have gone backwards")
113 .as_millis() as u64,
114 }) {
115 tracing::warn!(
116 error = %e.as_report(),
117 task_id = report_event.task_id,
118 sink_id = report_event.sink_id,
119 "Failed to report iceberg compaction task result - will retry on stream restart"
120 );
121 return Err(report_event);
122 }
123
124 Ok(())
125}
126
127pub(crate) fn send_or_buffer_iceberg_task_report(
128 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
129 pending_task_reports: &mut VecDeque<IcebergTaskReport>,
130 report: IcebergTaskReport,
131) -> ReportSendResult {
132 if let Err(report) = send_iceberg_task_report(request_sender, report) {
133 pending_task_reports.push_back(report);
134 return ReportSendResult::RestartStream;
135 }
136 ReportSendResult::Sent
137}
138
139pub(crate) fn flush_pending_iceberg_task_reports(
140 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
141 pending_task_reports: &mut VecDeque<IcebergTaskReport>,
142) -> ReportSendResult {
143 while let Some(report_event) = pending_task_reports.pop_front() {
144 if let Err(report_event) = send_iceberg_task_report(request_sender, report_event) {
145 pending_task_reports.push_front(report_event);
146 return ReportSendResult::RestartStream;
147 }
148 }
149 ReportSendResult::Sent
150}
151
152#[cfg(test)]
153mod tests {
154 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request;
155
156 use super::*;
157
158 #[test]
159 fn test_send_iceberg_task_report_returns_payload_on_send_failure() {
160 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
161 drop(rx);
162
163 let report = build_iceberg_task_report(7, 9, Some("send failure".to_owned()));
164 let failed_report = send_iceberg_task_report(&tx, report.clone()).unwrap_err();
165
166 assert_eq!(failed_report.task_id, report.task_id);
167 assert_eq!(failed_report.sink_id, report.sink_id);
168 assert_eq!(failed_report.error_message, report.error_message);
169 }
170
171 #[test]
172 fn test_build_iceberg_task_result_partial_enqueue_is_success_if_admitted_plan_succeeds() {
173 let mut tracker = IcebergTaskTracker::new(9, 1);
174 tracker.record_completion(None);
175
176 let report = tracker.into_report(7);
177
178 assert_eq!(
179 report.status,
180 subscribe_iceberg_compaction_event_request::report_task::Status::Success as i32
181 );
182 assert!(report.error_message.is_none());
183 }
184
185 #[test]
186 fn test_build_iceberg_task_result_fails_if_all_admitted_plans_fail() {
187 let mut tracker = IcebergTaskTracker::new(9, 2);
188 tracker.record_completion(Some("first failure".to_owned()));
189 tracker.record_completion(Some("second failure".to_owned()));
190
191 let report = tracker.into_report(7);
192
193 assert_eq!(
194 report.status,
195 subscribe_iceberg_compaction_event_request::report_task::Status::Failed as i32
196 );
197 assert_eq!(report.error_message.as_deref(), Some("first failure"));
198 }
199}