1pub use self::iceberg_compactor_runner::create_task_execution;
16pub(crate) use self::report::{
17 IcebergPlanCompletion, IcebergTaskReport, IcebergTaskTracker, ReportSendResult,
18 build_iceberg_task_report, flush_pending_iceberg_task_reports,
19 send_or_buffer_iceberg_task_report,
20};
21use crate::hummock::compactor::iceberg_compaction::iceberg_compactor_runner::IcebergCompactionPlanRunner;
22
23pub(crate) mod iceberg_compactor_runner;
24pub(crate) mod report;
25
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28
29use tokio::sync::Notify;
30
/// Identifies one queued plan as a `(task_id, plan_index)` pair.
///
/// Several plans may share a `task_id`; they are told apart by `plan_index`.
pub(crate) type TaskKey = (u64, usize);
33
/// Scheduling metadata for a single plan submitted to [`IcebergTaskQueue`].
#[derive(Debug, Clone)]
pub struct IcebergTaskMeta {
    /// Id of the task this plan belongs to; first component of the queue key.
    pub task_id: u64,
    /// Index of the plan within its task; second component of the queue key.
    pub plan_index: usize,
    /// Amount of parallelism the plan occupies while it runs.
    ///
    /// `IcebergTaskQueue::push` rejects a value of zero and any value larger
    /// than the queue's `max_parallelism`.
    pub required_parallelism: u32,
}
42
/// A task handed out by `IcebergTaskQueue::pop`.
#[derive(Debug)]
pub struct PoppedIcebergTask {
    /// Metadata the task was pushed with.
    pub meta: IcebergTaskMeta,
    /// Runner that was supplied together with the task at push time, if any.
    pub runner: Option<IcebergCompactionPlanRunner>,
}
48
49impl IcebergTaskMeta {
50 fn key(&self) -> TaskKey {
51 (self.task_id, self.plan_index)
52 }
53}
54
/// Bookkeeping state owned by [`IcebergTaskQueue`].
struct IcebergTaskQueueInner {
    /// Waiting tasks in arrival order: `push` appends at the back, `pop` takes
    /// from the front.
    deque: VecDeque<IcebergTaskMeta>,
    /// Required parallelism of every tracked task, keyed by task key.
    ///
    /// An entry is inserted by `push` and removed by `finish_running`, so it
    /// covers both the waiting and the running phase and is what duplicate
    /// detection relies on.
    id_map: HashMap<TaskKey, u32>,
    /// Summed `required_parallelism` of the tasks currently in `deque`.
    waiting_parallelism_sum: u32,
    /// Summed `required_parallelism` of the tasks popped but not yet finished.
    running_parallelism_sum: u32,
    /// Runners supplied at push time, held here until the task is popped.
    runners: HashMap<TaskKey, IcebergCompactionPlanRunner>,
}
68
/// FIFO queue of Iceberg compaction plans with parallelism accounting.
///
/// Tasks wait in arrival order and are only handed out while the summed
/// parallelism of running tasks stays within `max_parallelism`.
pub struct IcebergTaskQueue {
    /// Waiting/running bookkeeping.
    inner: IcebergTaskQueueInner,
    /// Upper bound on the summed parallelism of running tasks; also the largest
    /// `required_parallelism` a single task may ask for.
    max_parallelism: u32,
    /// Upper bound on the summed parallelism of waiting tasks.
    pending_parallelism_budget: u32,
    /// Signalled after a push or a finish when the front task fits into the
    /// available parallelism; awaited by `wait_schedulable`.
    schedule_notify: Arc<Notify>,
}
91
/// Outcome of `IcebergTaskQueue::push`.
#[derive(Debug, PartialEq, Eq)]
pub enum PushResult {
    /// The task was accepted and appended to the waiting queue.
    Added,
    /// Accepting the task would push the waiting parallelism past the pending
    /// budget.
    RejectedCapacity,
    /// The task asks for more parallelism than the queue's `max_parallelism`.
    RejectedTooLarge,
    /// The task asks for zero parallelism.
    RejectedInvalidParallelism,
    /// A task with the same `(task_id, plan_index)` key is already waiting or
    /// running.
    RejectedDuplicate,
}
104
105impl IcebergTaskQueue {
106 pub fn new(max_parallelism: u32, pending_parallelism_budget: u32) -> Self {
107 assert!(max_parallelism > 0, "max_parallelism must be > 0");
108 assert!(
109 pending_parallelism_budget >= max_parallelism,
110 "pending budget should allow at least one task"
111 );
112 Self {
113 inner: IcebergTaskQueueInner {
114 deque: VecDeque::new(),
115 id_map: HashMap::new(),
116 waiting_parallelism_sum: 0,
117 running_parallelism_sum: 0,
118 runners: HashMap::new(),
119 },
120 max_parallelism,
121 pending_parallelism_budget,
122 schedule_notify: Arc::new(Notify::new()),
123 }
124 }
125
126 pub async fn wait_schedulable(&self) -> bool {
131 if self.has_schedulable_tasks() {
133 return true;
134 }
135 self.schedule_notify.notified().await;
137 self.has_schedulable_tasks()
138 }
139
140 fn has_schedulable_tasks(&self) -> bool {
141 if let Some(front_task) = self.inner.deque.front() {
142 let available_parallelism = self
143 .max_parallelism
144 .saturating_sub(self.inner.running_parallelism_sum);
145 available_parallelism >= front_task.required_parallelism
146 } else {
147 false
148 }
149 }
150
151 fn notify_schedulable(&self) {
152 if self.has_schedulable_tasks() {
153 self.schedule_notify.notify_one();
154 }
155 }
156
157 pub fn running_parallelism_sum(&self) -> u32 {
158 self.inner.running_parallelism_sum
159 }
160
161 pub fn waiting_parallelism_sum(&self) -> u32 {
162 self.inner.waiting_parallelism_sum
163 }
164
165 fn available_parallelism(&self) -> u32 {
166 self.max_parallelism
167 .saturating_sub(self.inner.running_parallelism_sum)
168 }
169
170 pub fn push(
174 &mut self,
175 meta: IcebergTaskMeta,
176 runner: Option<IcebergCompactionPlanRunner>,
177 ) -> PushResult {
178 if meta.required_parallelism == 0 {
179 return PushResult::RejectedInvalidParallelism;
180 }
181 if meta.required_parallelism > self.max_parallelism {
182 return PushResult::RejectedTooLarge;
183 }
184
185 let key = meta.key();
186
187 if self.inner.id_map.contains_key(&key) {
189 return PushResult::RejectedDuplicate;
190 }
191
192 let new_total = self.inner.waiting_parallelism_sum + meta.required_parallelism;
193 if new_total > self.pending_parallelism_budget {
194 return PushResult::RejectedCapacity;
195 }
196
197 self.inner.id_map.insert(key, meta.required_parallelism);
198 self.inner.waiting_parallelism_sum = new_total;
199
200 if let Some(r) = runner {
201 self.inner.runners.insert(key, r);
202 }
203
204 self.inner.deque.push_back(meta);
205
206 self.notify_schedulable();
207 PushResult::Added
208 }
209
210 pub fn pop(&mut self) -> Option<PoppedIcebergTask> {
215 let front = self.inner.deque.front()?;
216 if front.required_parallelism > self.available_parallelism() {
217 return None;
218 }
219
220 let meta = self.inner.deque.pop_front()?;
221 self.inner.waiting_parallelism_sum = self
222 .inner
223 .waiting_parallelism_sum
224 .saturating_sub(meta.required_parallelism);
225 self.inner.running_parallelism_sum = self
226 .inner
227 .running_parallelism_sum
228 .saturating_add(meta.required_parallelism);
229
230 let runner = self.inner.runners.remove(&meta.key());
231 Some(PoppedIcebergTask { meta, runner })
232 }
233
234 pub fn finish_running(&mut self, task_key: TaskKey) -> bool {
238 let Some(required) = self.inner.id_map.remove(&task_key) else {
239 tracing::warn!(
240 task_id = task_key.0,
241 plan_index = task_key.1,
242 "finish_running called for unknown task key, possible bug: double-finish or invalid key"
243 );
244 return false;
245 };
246
247 self.inner.running_parallelism_sum =
248 self.inner.running_parallelism_sum.saturating_sub(required);
249 self.inner.runners.remove(&task_key);
250 self.notify_schedulable();
251 true
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building task metadata in the tests below.
    fn mk_meta(id: u64, plan_index: usize, p: u32) -> IcebergTaskMeta {
        IcebergTaskMeta {
            task_id: id,
            plan_index,
            required_parallelism: p,
        }
    }

    /// One task travels through push → pop → finish and the sums follow it.
    #[test]
    fn test_basic_push_pop() {
        let mut queue = IcebergTaskQueue::new(8, 32);

        let pushed = queue.push(mk_meta(1, 0, 4), None);
        assert_eq!(pushed, PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 4);

        let task = queue.pop().expect("should pop");
        assert_eq!(task.meta.task_id, 1);
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 4);

        assert!(queue.finish_running((1, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    /// Tasks come out in the order they went in.
    #[test]
    fn test_fifo_ordering() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, 0, 2), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, 0, 2), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(3, 0, 2), None), PushResult::Added);

        assert_eq!(queue.pop().unwrap().meta.task_id, 1);
        assert_eq!(queue.pop().unwrap().meta.task_id, 2);
        assert_eq!(queue.pop().unwrap().meta.task_id, 3);
    }

    /// A push that would exceed the pending budget is refused.
    #[test]
    fn test_capacity_reject() {
        let mut queue = IcebergTaskQueue::new(4, 6);
        assert_eq!(queue.push(mk_meta(1, 0, 3), None), PushResult::Added);
        // Waiting sum now equals the budget exactly.
        assert_eq!(queue.push(mk_meta(2, 0, 3), None), PushResult::Added);
        // One more unit does not fit.
        assert_eq!(
            queue.push(mk_meta(3, 0, 1), None),
            PushResult::RejectedCapacity
        );
    }

    /// Zero parallelism and parallelism above the maximum are both refused.
    #[test]
    fn test_invalid_parallelism() {
        let mut queue = IcebergTaskQueue::new(4, 10);
        assert_eq!(
            queue.push(mk_meta(1, 0, 0), None),
            PushResult::RejectedInvalidParallelism
        );
        assert_eq!(
            queue.push(mk_meta(2, 0, 5), None),
            PushResult::RejectedTooLarge
        );
    }

    /// A key is reserved from push until finish, across waiting and running.
    #[test]
    fn test_duplicate_key_rejected() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, 0, 3), None), PushResult::Added);

        // Same key again, different parallelism: still a duplicate.
        assert_eq!(
            queue.push(mk_meta(1, 0, 5), None),
            PushResult::RejectedDuplicate
        );
        assert_eq!(queue.waiting_parallelism_sum(), 3);

        // Same task id but another plan index is a distinct key.
        assert_eq!(queue.push(mk_meta(1, 1, 2), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 5);

        let first = queue.pop().unwrap();
        assert_eq!(first.meta.task_id, 1);
        assert_eq!(first.meta.plan_index, 0);
        queue.finish_running((1, 0));

        // Once finished, the key may be reused.
        assert_eq!(queue.push(mk_meta(1, 0, 4), None), PushResult::Added);
    }

    /// The front task blocks the queue until enough parallelism is released.
    #[test]
    fn test_pop_insufficient_parallelism() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, 0, 6), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, 0, 4), None), PushResult::Added);

        let first = queue.pop().unwrap();
        assert_eq!(first.meta.task_id, 1);
        // Only 2 of 8 are free; the second task needs 4.
        assert!(queue.pop().is_none());

        assert!(queue.finish_running((1, 0)));
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.task_id, 2);
    }

    /// Finishing a key that was never pushed is reported and changes nothing.
    #[test]
    fn test_finish_nonexistent_task() {
        let mut queue = IcebergTaskQueue::new(4, 16);
        assert!(!queue.finish_running((999, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    /// Only the first finish of a task succeeds.
    #[test]
    fn test_double_finish() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, 0, 4), None), PushResult::Added);
        queue.pop().unwrap();
        assert_eq!(queue.running_parallelism_sum(), 4);

        assert!(queue.finish_running((1, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);

        assert!(!queue.finish_running((1, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    /// Budget and maximum both equal to a single task's parallelism.
    #[test]
    fn test_max_parallelism_boundary() {
        let mut queue = IcebergTaskQueue::new(4, 4);

        assert_eq!(queue.push(mk_meta(1, 0, 4), None), PushResult::Added);
        // The pending budget is fully used by the first task.
        assert_eq!(
            queue.push(mk_meta(2, 0, 1), None),
            PushResult::RejectedCapacity
        );

        let running = queue.pop().unwrap();
        assert_eq!(running.meta.required_parallelism, 4);
        assert_eq!(queue.running_parallelism_sum(), 4);

        // Popping freed the pending budget, but no parallelism is available.
        assert_eq!(queue.push(mk_meta(3, 0, 1), None), PushResult::Added);
        assert!(queue.pop().is_none());
    }

    /// Plans of one task are tracked independently by plan index.
    #[test]
    fn test_same_task_id_multiple_plans() {
        let mut queue = IcebergTaskQueue::new(10, 30);
        let task_id = 1u64;

        assert_eq!(queue.push(mk_meta(task_id, 0, 3), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(task_id, 1, 4), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(task_id, 2, 2), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 9);

        for expected_plan in 0..3 {
            let popped = queue.pop().unwrap();
            assert_eq!(popped.meta.task_id, task_id);
            assert_eq!(popped.meta.plan_index, expected_plan);
        }
        assert_eq!(queue.running_parallelism_sum(), 9);

        // Finishing out of order is fine.
        assert!(queue.finish_running((task_id, 1)));
        assert_eq!(queue.running_parallelism_sum(), 5);
        assert!(queue.finish_running((task_id, 0)));
        assert!(queue.finish_running((task_id, 2)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    /// A fresh queue has nothing to pop or finish.
    #[test]
    fn test_empty_queue_behavior() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert!(queue.pop().is_none());
        assert!(!queue.finish_running((1, 0)));
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 0);
    }
}