1pub use self::iceberg_compactor_runner::create_task_execution;
16pub(crate) use self::report::{
17 IcebergPlanCompletion, IcebergTaskReport, IcebergTaskTracker, ReportSendResult,
18 build_iceberg_task_report, flush_pending_iceberg_task_reports,
19 send_or_buffer_iceberg_task_report,
20};
21use crate::hummock::compactor::iceberg_compaction::iceberg_compactor_runner::IcebergCompactionPlanRunner;
22
23pub(crate) mod iceberg_compactor_runner;
24pub(crate) mod report;
25
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28
29use tokio::sync::Notify;
30
/// Identifies one queued plan as `(task_id, plan_index)`.
pub(crate) type TaskKey = (u64, usize);
33
/// Scheduling metadata for a single plan of an iceberg compaction task.
#[derive(Clone, Debug)]
pub struct IcebergTaskMeta {
    /// Id of the task this plan belongs to.
    pub task_id: u64,
    /// Index of the plan within its task; together with `task_id` it forms the queue key.
    pub plan_index: usize,
    /// Amount of parallelism the plan occupies while it is running.
    pub required_parallelism: u32,
}
42
43#[derive(Debug)]
44pub struct PoppedIcebergTask {
45 pub meta: IcebergTaskMeta,
46 pub runner: Option<IcebergCompactionPlanRunner>,
47}
48
49impl IcebergTaskMeta {
50 fn key(&self) -> TaskKey {
51 (self.task_id, self.plan_index)
52 }
53}
54
55struct IcebergTaskQueueInner {
57 deque: VecDeque<IcebergTaskMeta>,
59 id_map: HashMap<TaskKey, u32>,
61 waiting_parallelism_sum: u32,
63 running_parallelism_sum: u32,
65 runners: HashMap<TaskKey, IcebergCompactionPlanRunner>,
67}
68
69pub struct IcebergTaskQueue {
83 inner: IcebergTaskQueueInner,
84 max_parallelism: u32,
86 pending_parallelism_budget: u32,
88 schedule_notify: Arc<Notify>,
90}
91
/// Outcome of pushing a plan into the queue.
#[derive(Debug, Eq, PartialEq)]
pub enum PushResult {
    /// The plan was accepted and is now waiting.
    Added,
    /// Accepting the plan would push the waiting parallelism above the pending budget.
    RejectedCapacity,
    /// The plan requires more parallelism than the queue's maximum.
    RejectedTooLarge,
    /// The plan requires zero parallelism.
    RejectedInvalidParallelism,
    /// A plan with the same `(task_id, plan_index)` key is already tracked.
    RejectedDuplicate,
}
104
105impl IcebergTaskQueue {
106 pub fn new(max_parallelism: u32, pending_parallelism_budget: u32) -> Self {
107 assert!(max_parallelism > 0, "max_parallelism must be > 0");
108 assert!(
109 pending_parallelism_budget >= max_parallelism,
110 "pending budget should allow at least one task"
111 );
112 Self {
113 inner: IcebergTaskQueueInner {
114 deque: VecDeque::new(),
115 id_map: HashMap::new(),
116 waiting_parallelism_sum: 0,
117 running_parallelism_sum: 0,
118 runners: HashMap::new(),
119 },
120 max_parallelism,
121 pending_parallelism_budget,
122 schedule_notify: Arc::new(Notify::new()),
123 }
124 }
125
126 pub async fn wait_schedulable(&self) -> bool {
131 if self.has_schedulable_tasks() {
133 return true;
134 }
135 self.schedule_notify.notified().await;
137 self.has_schedulable_tasks()
138 }
139
140 fn has_schedulable_tasks(&self) -> bool {
141 if let Some(front_task) = self.inner.deque.front() {
142 let available_parallelism = self
143 .max_parallelism
144 .saturating_sub(self.inner.running_parallelism_sum);
145 available_parallelism >= front_task.required_parallelism
146 } else {
147 false
148 }
149 }
150
151 fn notify_schedulable(&self) {
152 if self.has_schedulable_tasks() {
153 self.schedule_notify.notify_one();
154 }
155 }
156
157 pub fn running_parallelism_sum(&self) -> u32 {
158 self.inner.running_parallelism_sum
159 }
160
161 pub fn waiting_parallelism_sum(&self) -> u32 {
162 self.inner.waiting_parallelism_sum
163 }
164
165 fn available_parallelism(&self) -> u32 {
166 self.max_parallelism
167 .saturating_sub(self.inner.running_parallelism_sum)
168 }
169
170 pub fn push(
174 &mut self,
175 meta: IcebergTaskMeta,
176 runner: Option<IcebergCompactionPlanRunner>,
177 ) -> PushResult {
178 if meta.required_parallelism == 0 {
179 return PushResult::RejectedInvalidParallelism;
180 }
181 if meta.required_parallelism > self.max_parallelism {
182 return PushResult::RejectedTooLarge;
183 }
184
185 let key = meta.key();
186
187 if self.inner.id_map.contains_key(&key) {
189 return PushResult::RejectedDuplicate;
190 }
191
192 let new_total = self.inner.waiting_parallelism_sum + meta.required_parallelism;
193 if new_total > self.pending_parallelism_budget {
194 return PushResult::RejectedCapacity;
195 }
196
197 self.inner.id_map.insert(key, meta.required_parallelism);
198 self.inner.waiting_parallelism_sum = new_total;
199
200 if let Some(r) = runner {
201 self.inner.runners.insert(key, r);
202 }
203
204 self.inner.deque.push_back(meta);
205
206 self.notify_schedulable();
207 PushResult::Added
208 }
209
210 pub fn pop(&mut self) -> Option<PoppedIcebergTask> {
215 let front = self.inner.deque.front()?;
216 if front.required_parallelism > self.available_parallelism() {
217 return None;
218 }
219
220 let meta = self.inner.deque.pop_front()?;
221 self.inner.waiting_parallelism_sum = self
222 .inner
223 .waiting_parallelism_sum
224 .saturating_sub(meta.required_parallelism);
225 self.inner.running_parallelism_sum = self
226 .inner
227 .running_parallelism_sum
228 .saturating_add(meta.required_parallelism);
229
230 let runner = self.inner.runners.remove(&meta.key());
231 Some(PoppedIcebergTask { meta, runner })
232 }
233
234 pub fn finish_running(&mut self, task_key: TaskKey) -> bool {
238 let Some(required) = self.inner.id_map.remove(&task_key) else {
239 tracing::warn!(
240 task_id = task_key.0,
241 plan_index = task_key.1,
242 "finish_running called for unknown task key, possible bug: double-finish or invalid key"
243 );
244 return false;
245 };
246
247 self.inner.running_parallelism_sum =
248 self.inner.running_parallelism_sum.saturating_sub(required);
249 self.inner.runners.remove(&task_key);
250 self.notify_schedulable();
251 true
252 }
253
254 pub fn cancel_waiting_task(&mut self, task_id: u64) -> usize {
258 let mut retained = VecDeque::with_capacity(self.inner.deque.len());
259 let mut cancelled_parallelism = 0;
260 let mut cancelled_count = 0;
261
262 while let Some(meta) = self.inner.deque.pop_front() {
263 if meta.task_id == task_id {
264 cancelled_parallelism += meta.required_parallelism;
265 cancelled_count += 1;
266 self.inner.id_map.remove(&meta.key());
267 self.inner.runners.remove(&meta.key());
268 } else {
269 retained.push_back(meta);
270 }
271 }
272
273 self.inner.deque = retained;
274 self.inner.waiting_parallelism_sum = self
275 .inner
276 .waiting_parallelism_sum
277 .saturating_sub(cancelled_parallelism);
278
279 if cancelled_count > 0 {
280 self.notify_schedulable();
281 }
282
283 cancelled_count
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds plan metadata for `(task_id, plan_index)` that asks for `parallelism`.
    fn plan(task_id: u64, plan_index: usize, parallelism: u32) -> IcebergTaskMeta {
        IcebergTaskMeta {
            task_id,
            plan_index,
            required_parallelism: parallelism,
        }
    }

    /// Pushes a plan without a runner and returns the queue's verdict.
    fn push_plan(
        queue: &mut IcebergTaskQueue,
        task_id: u64,
        plan_index: usize,
        parallelism: u32,
    ) -> PushResult {
        queue.push(plan(task_id, plan_index, parallelism), None)
    }

    #[test]
    fn test_basic_push_pop() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(push_plan(&mut queue, 1, 0, 4), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 4);

        let task = queue.pop().expect("should pop");
        assert_eq!(task.meta.task_id, 1);
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 4);

        assert!(queue.finish_running((1, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_fifo_ordering() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        for task_id in 1..=3u64 {
            assert_eq!(push_plan(&mut queue, task_id, 0, 2), PushResult::Added);
        }

        // Plans come back in exactly the order they were pushed.
        for task_id in 1..=3u64 {
            assert_eq!(queue.pop().unwrap().meta.task_id, task_id);
        }
    }

    #[test]
    fn test_capacity_reject() {
        let mut queue = IcebergTaskQueue::new(4, 6);
        assert_eq!(push_plan(&mut queue, 1, 0, 3), PushResult::Added);
        assert_eq!(push_plan(&mut queue, 2, 0, 3), PushResult::Added);
        // Waiting sum is 6 already; one more unit exceeds the pending budget.
        assert_eq!(push_plan(&mut queue, 3, 0, 1), PushResult::RejectedCapacity);
    }

    #[test]
    fn test_invalid_parallelism() {
        let mut queue = IcebergTaskQueue::new(4, 10);
        assert_eq!(
            push_plan(&mut queue, 1, 0, 0),
            PushResult::RejectedInvalidParallelism
        );
        // 5 is above `max_parallelism` (4).
        assert_eq!(push_plan(&mut queue, 2, 0, 5), PushResult::RejectedTooLarge);
    }

    #[test]
    fn test_duplicate_key_rejected() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(push_plan(&mut queue, 1, 0, 3), PushResult::Added);

        // Same key again, even with a different parallelism, is refused and changes nothing.
        assert_eq!(
            push_plan(&mut queue, 1, 0, 5),
            PushResult::RejectedDuplicate
        );
        assert_eq!(queue.waiting_parallelism_sum(), 3);

        // Same task id with another plan index is a different key.
        assert_eq!(push_plan(&mut queue, 1, 1, 2), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 5);

        let first = queue.pop().unwrap();
        assert_eq!(first.meta.task_id, 1);
        assert_eq!(first.meta.plan_index, 0);
        queue.finish_running((1, 0));

        // Once finished, the key may be reused.
        assert_eq!(push_plan(&mut queue, 1, 0, 4), PushResult::Added);
    }

    #[test]
    fn test_pop_insufficient_parallelism() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(push_plan(&mut queue, 1, 0, 6), PushResult::Added);
        assert_eq!(push_plan(&mut queue, 2, 0, 4), PushResult::Added);

        let first = queue.pop().unwrap();
        assert_eq!(first.meta.task_id, 1);
        // Only 2 units are free while the next plan needs 4.
        assert!(queue.pop().is_none());

        assert!(queue.finish_running((1, 0)));
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.task_id, 2);
    }

    #[test]
    fn test_finish_nonexistent_task() {
        let mut queue = IcebergTaskQueue::new(4, 16);
        assert!(!queue.finish_running((999, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_double_finish() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(push_plan(&mut queue, 1, 0, 4), PushResult::Added);
        queue.pop().unwrap();
        assert_eq!(queue.running_parallelism_sum(), 4);

        assert!(queue.finish_running((1, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);

        // The second finish is reported as a failure and must not disturb the counters.
        assert!(!queue.finish_running((1, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_max_parallelism_boundary() {
        let mut queue = IcebergTaskQueue::new(4, 4);

        assert_eq!(push_plan(&mut queue, 1, 0, 4), PushResult::Added);
        assert_eq!(push_plan(&mut queue, 2, 0, 1), PushResult::RejectedCapacity);

        let running = queue.pop().unwrap();
        assert_eq!(running.meta.required_parallelism, 4);
        assert_eq!(queue.running_parallelism_sum(), 4);

        // The pending budget is free again, but nothing can run until the first plan finishes.
        assert_eq!(push_plan(&mut queue, 3, 0, 1), PushResult::Added);
        assert!(queue.pop().is_none());
    }

    #[test]
    fn test_same_task_id_multiple_plans() {
        let mut queue = IcebergTaskQueue::new(10, 30);
        let task_id = 1u64;

        assert_eq!(push_plan(&mut queue, task_id, 0, 3), PushResult::Added);
        assert_eq!(push_plan(&mut queue, task_id, 1, 4), PushResult::Added);
        assert_eq!(push_plan(&mut queue, task_id, 2, 2), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 9);

        for expected_plan_index in 0..3 {
            let task = queue.pop().unwrap();
            assert_eq!(task.meta.task_id, task_id);
            assert_eq!(task.meta.plan_index, expected_plan_index);
        }
        assert_eq!(queue.running_parallelism_sum(), 9);

        // Plans may finish in any order.
        assert!(queue.finish_running((task_id, 1)));
        assert_eq!(queue.running_parallelism_sum(), 5);
        assert!(queue.finish_running((task_id, 0)));
        assert!(queue.finish_running((task_id, 2)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_cancel_waiting_task_only_removes_waiting_plans() {
        let mut queue = IcebergTaskQueue::new(10, 30);
        let task_id = 1u64;

        assert_eq!(push_plan(&mut queue, task_id, 0, 3), PushResult::Added);
        assert_eq!(push_plan(&mut queue, task_id, 1, 4), PushResult::Added);
        assert_eq!(push_plan(&mut queue, 2, 0, 2), PushResult::Added);

        let running = queue.pop().unwrap();
        assert_eq!(running.meta.task_id, task_id);
        assert_eq!(running.meta.plan_index, 0);
        assert_eq!(queue.running_parallelism_sum(), 3);
        assert_eq!(queue.waiting_parallelism_sum(), 6);

        // Only plan 1 is still waiting; plan 0 is running and stays accounted for.
        assert_eq!(queue.cancel_waiting_task(task_id), 1);
        assert_eq!(queue.running_parallelism_sum(), 3);
        assert_eq!(queue.waiting_parallelism_sum(), 2);

        assert!(queue.finish_running((task_id, 0)));
        let next = queue.pop().unwrap();
        assert_eq!(next.meta.task_id, 2);
        assert_eq!(next.meta.plan_index, 0);
    }

    #[test]
    fn test_empty_queue_behavior() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert!(queue.pop().is_none());
        assert!(!queue.finish_running((1, 0)));
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 0);
    }
}