1use crate::hummock::compactor::iceberg_compaction::iceberg_compactor_runner::IcebergCompactionPlanRunner;
16
17pub(crate) mod iceberg_compactor_runner;
18
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21
22use tokio::sync::Notify;
23
/// Identifies a single compaction plan: `(task_id, plan_index)`.
///
/// One task id may own several plans, so the plan index is part of the key.
pub(crate) type TaskKey = (u64, usize);
26
/// Scheduling metadata for one plan of an Iceberg compaction task.
#[derive(Debug, Clone)]
pub struct IcebergTaskMeta {
    /// Identifier of the owning compaction task.
    pub task_id: u64,
    /// Index of this plan within the task; together with `task_id` it forms the queue key.
    pub plan_index: usize,
    /// Units of parallelism this plan occupies while it runs (the queue rejects 0).
    pub required_parallelism: u32,
}
35
36#[derive(Debug)]
37pub struct PoppedIcebergTask {
38 pub meta: IcebergTaskMeta,
39 pub runner: Option<IcebergCompactionPlanRunner>,
40}
41
42impl IcebergTaskMeta {
43 fn key(&self) -> TaskKey {
44 (self.task_id, self.plan_index)
45 }
46}
47
48struct IcebergTaskQueueInner {
50 deque: VecDeque<IcebergTaskMeta>,
52 id_map: HashMap<TaskKey, u32>,
54 waiting_parallelism_sum: u32,
56 running_parallelism_sum: u32,
58 runners: HashMap<TaskKey, IcebergCompactionPlanRunner>,
60}
61
62pub struct IcebergTaskQueue {
76 inner: IcebergTaskQueueInner,
77 max_parallelism: u32,
79 pending_parallelism_budget: u32,
81 schedule_notify: Arc<Notify>,
83}
84
/// Outcome of `IcebergTaskQueue::push`.
#[derive(Debug, PartialEq, Eq)]
pub enum PushResult {
    /// The task was enqueued.
    Added,
    /// Enqueuing would push the waiting parallelism sum past the pending budget.
    RejectedCapacity,
    /// The task needs more parallelism than the queue's `max_parallelism`, so it could never run.
    RejectedTooLarge,
    /// The task declared a `required_parallelism` of 0.
    RejectedInvalidParallelism,
    /// A task with the same `(task_id, plan_index)` is already waiting or running.
    RejectedDuplicate,
}
97
98impl IcebergTaskQueue {
99 pub fn new(max_parallelism: u32, pending_parallelism_budget: u32) -> Self {
100 assert!(max_parallelism > 0, "max_parallelism must be > 0");
101 assert!(
102 pending_parallelism_budget >= max_parallelism,
103 "pending budget should allow at least one task"
104 );
105 Self {
106 inner: IcebergTaskQueueInner {
107 deque: VecDeque::new(),
108 id_map: HashMap::new(),
109 waiting_parallelism_sum: 0,
110 running_parallelism_sum: 0,
111 runners: HashMap::new(),
112 },
113 max_parallelism,
114 pending_parallelism_budget,
115 schedule_notify: Arc::new(Notify::new()),
116 }
117 }
118
119 pub async fn wait_schedulable(&self) -> bool {
124 if self.has_schedulable_tasks() {
126 return true;
127 }
128 self.schedule_notify.notified().await;
130 self.has_schedulable_tasks()
131 }
132
133 fn has_schedulable_tasks(&self) -> bool {
134 if let Some(front_task) = self.inner.deque.front() {
135 let available_parallelism = self
136 .max_parallelism
137 .saturating_sub(self.inner.running_parallelism_sum);
138 available_parallelism >= front_task.required_parallelism
139 } else {
140 false
141 }
142 }
143
144 fn notify_schedulable(&self) {
145 if self.has_schedulable_tasks() {
146 self.schedule_notify.notify_one();
147 }
148 }
149
150 pub fn running_parallelism_sum(&self) -> u32 {
151 self.inner.running_parallelism_sum
152 }
153
154 pub fn waiting_parallelism_sum(&self) -> u32 {
155 self.inner.waiting_parallelism_sum
156 }
157
158 fn available_parallelism(&self) -> u32 {
159 self.max_parallelism
160 .saturating_sub(self.inner.running_parallelism_sum)
161 }
162
163 pub fn push(
167 &mut self,
168 meta: IcebergTaskMeta,
169 runner: Option<IcebergCompactionPlanRunner>,
170 ) -> PushResult {
171 if meta.required_parallelism == 0 {
172 return PushResult::RejectedInvalidParallelism;
173 }
174 if meta.required_parallelism > self.max_parallelism {
175 return PushResult::RejectedTooLarge;
176 }
177
178 let key = meta.key();
179
180 if self.inner.id_map.contains_key(&key) {
182 return PushResult::RejectedDuplicate;
183 }
184
185 let new_total = self.inner.waiting_parallelism_sum + meta.required_parallelism;
186 if new_total > self.pending_parallelism_budget {
187 return PushResult::RejectedCapacity;
188 }
189
190 self.inner.id_map.insert(key, meta.required_parallelism);
191 self.inner.waiting_parallelism_sum = new_total;
192
193 if let Some(r) = runner {
194 self.inner.runners.insert(key, r);
195 }
196
197 self.inner.deque.push_back(meta);
198
199 self.notify_schedulable();
200 PushResult::Added
201 }
202
203 pub fn pop(&mut self) -> Option<PoppedIcebergTask> {
208 let front = self.inner.deque.front()?;
209 if front.required_parallelism > self.available_parallelism() {
210 return None;
211 }
212
213 let meta = self.inner.deque.pop_front()?;
214 self.inner.waiting_parallelism_sum = self
215 .inner
216 .waiting_parallelism_sum
217 .saturating_sub(meta.required_parallelism);
218 self.inner.running_parallelism_sum = self
219 .inner
220 .running_parallelism_sum
221 .saturating_add(meta.required_parallelism);
222
223 let runner = self.inner.runners.remove(&meta.key());
224 Some(PoppedIcebergTask { meta, runner })
225 }
226
227 pub fn finish_running(&mut self, task_key: TaskKey) -> bool {
231 let Some(required) = self.inner.id_map.remove(&task_key) else {
232 tracing::warn!(
233 task_id = task_key.0,
234 plan_index = task_key.1,
235 "finish_running called for unknown task key, possible bug: double-finish or invalid key"
236 );
237 return false;
238 };
239
240 self.inner.running_parallelism_sum =
241 self.inner.running_parallelism_sum.saturating_sub(required);
242 self.inner.runners.remove(&task_key);
243 self.notify_schedulable();
244 true
245 }
246}
247
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the meta for plan `(id, plan_index)` requiring `p` units of parallelism.
    fn mk_meta(id: u64, plan_index: usize, p: u32) -> IcebergTaskMeta {
        IcebergTaskMeta {
            task_id: id,
            plan_index,
            required_parallelism: p,
        }
    }

    #[test]
    fn test_basic_push_pop() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
        assert_eq!(q.waiting_parallelism_sum(), 4);

        let popped = q.pop().expect("should pop");
        assert_eq!(popped.meta.task_id, 1);
        // No runner was supplied at push time, so none is handed back.
        assert!(popped.runner.is_none());
        assert_eq!(q.waiting_parallelism_sum(), 0);
        assert_eq!(q.running_parallelism_sum(), 4);

        assert!(q.finish_running((1, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_fifo_ordering() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 2), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 2), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(3, 0, 2), None), PushResult::Added);

        assert_eq!(q.pop().unwrap().meta.task_id, 1);
        assert_eq!(q.pop().unwrap().meta.task_id, 2);
        assert_eq!(q.pop().unwrap().meta.task_id, 3);
    }

    #[test]
    fn test_capacity_reject() {
        let mut q = IcebergTaskQueue::new(4, 6);
        assert_eq!(q.push(mk_meta(1, 0, 3), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 3), None), PushResult::Added);
        // The waiting sum already equals the budget of 6, so even a 1-wide task is rejected.
        assert_eq!(q.push(mk_meta(3, 0, 1), None), PushResult::RejectedCapacity);
        assert_eq!(q.waiting_parallelism_sum(), 6);
    }

    #[test]
    fn test_invalid_parallelism() {
        let mut q = IcebergTaskQueue::new(4, 10);
        assert_eq!(
            q.push(mk_meta(1, 0, 0), None),
            PushResult::RejectedInvalidParallelism
        );
        // 5 > max_parallelism (4): such a task could never be scheduled.
        assert_eq!(q.push(mk_meta(2, 0, 5), None), PushResult::RejectedTooLarge);
        assert_eq!(q.waiting_parallelism_sum(), 0);
    }

    #[test]
    fn test_duplicate_key_rejected() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 3), None), PushResult::Added);
        // Same (task_id, plan_index) is rejected regardless of its parallelism.
        assert_eq!(
            q.push(mk_meta(1, 0, 5), None),
            PushResult::RejectedDuplicate
        );
        assert_eq!(q.waiting_parallelism_sum(), 3);

        // A different plan index of the same task is a distinct key.
        assert_eq!(q.push(mk_meta(1, 1, 2), None), PushResult::Added);
        assert_eq!(q.waiting_parallelism_sum(), 5);

        let p = q.pop().unwrap();
        assert_eq!(p.meta.task_id, 1);
        assert_eq!(p.meta.plan_index, 0);
        assert!(q.finish_running((1, 0)));

        // Once finished, the key may be reused.
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
    }

    #[test]
    fn test_pop_insufficient_parallelism() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 6), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 4), None), PushResult::Added);

        let p1 = q.pop().unwrap();
        assert_eq!(p1.meta.task_id, 1);
        // Only 2 units remain; task 2 needs 4.
        assert!(q.pop().is_none());

        assert!(q.finish_running((1, 0)));
        let p2 = q.pop().unwrap();
        assert_eq!(p2.meta.task_id, 2);
    }

    #[test]
    fn test_head_of_line_blocking() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 6), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 4), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(3, 0, 1), None), PushResult::Added);

        assert_eq!(q.pop().unwrap().meta.task_id, 1);
        // Task 3 would fit in the 2 remaining units, but strict FIFO only looks at the head
        // (task 2), which does not fit.
        assert!(q.pop().is_none());
        assert_eq!(q.waiting_parallelism_sum(), 5);
        assert_eq!(q.running_parallelism_sum(), 6);
    }

    #[test]
    fn test_finish_nonexistent_task() {
        let mut q = IcebergTaskQueue::new(4, 16);
        assert!(!q.finish_running((999, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_double_finish() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
        q.pop().unwrap();
        assert_eq!(q.running_parallelism_sum(), 4);

        assert!(q.finish_running((1, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);

        // A second finish for the same key is reported and leaves the accounting untouched.
        assert!(!q.finish_running((1, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_max_parallelism_boundary() {
        let mut q = IcebergTaskQueue::new(4, 4);

        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
        // The pending budget (4) is exhausted by the first task.
        assert_eq!(q.push(mk_meta(2, 0, 1), None), PushResult::RejectedCapacity);

        let p = q.pop().unwrap();
        assert_eq!(p.meta.required_parallelism, 4);
        assert_eq!(q.running_parallelism_sum(), 4);

        // Running tasks do not count against the pending budget...
        assert_eq!(q.push(mk_meta(3, 0, 1), None), PushResult::Added);
        // ...but they do consume all of max_parallelism.
        assert!(q.pop().is_none());
    }

    #[test]
    fn test_same_task_id_multiple_plans() {
        let mut q = IcebergTaskQueue::new(10, 30);
        let task_id = 1u64;

        assert_eq!(q.push(mk_meta(task_id, 0, 3), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(task_id, 1, 4), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(task_id, 2, 2), None), PushResult::Added);
        assert_eq!(q.waiting_parallelism_sum(), 9);

        for i in 0..3 {
            let p = q.pop().unwrap();
            assert_eq!(p.meta.task_id, task_id);
            assert_eq!(p.meta.plan_index, i);
        }
        assert_eq!(q.running_parallelism_sum(), 9);

        // Plans may finish in any order; each releases only its own parallelism.
        assert!(q.finish_running((task_id, 1)));
        assert_eq!(q.running_parallelism_sum(), 5);
        assert!(q.finish_running((task_id, 0)));
        assert!(q.finish_running((task_id, 2)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_empty_queue_behavior() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert!(q.pop().is_none());
        assert!(!q.finish_running((1, 0)));
        assert_eq!(q.waiting_parallelism_sum(), 0);
        assert_eq!(q.running_parallelism_sum(), 0);
    }
}