1use crate::hummock::compactor::iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunner;
16
17pub(crate) mod iceberg_compactor_runner;
18
19use std::collections::{HashMap, HashSet, VecDeque};
20use std::sync::Arc;
21
22use tokio::sync::Notify;
23
/// Numeric identifier of one submitted task.
type TaskId = u64;

/// Key used to detect duplicate submissions of the same logical task.
type TaskUniqueIdent = String;

/// Scheduling metadata describing a single task handled by the queue.
#[derive(Clone, Debug)]
pub struct IcebergTaskMeta {
    /// Identifier of this particular submission.
    pub task_id: TaskId,
    /// Deduplication key: the queue tracks waiting and running tasks by this value.
    pub unique_ident: TaskUniqueIdent,
    /// Instant supplied by the producer (presumably the moment the task was enqueued).
    pub enqueue_at: std::time::Instant,
    /// Number of parallelism units the task occupies while it is running.
    pub required_parallelism: u32,
}
37
38#[derive(Debug)]
39pub struct PoppedIcebergTask {
40 pub meta: IcebergTaskMeta,
41 pub runner: Option<IcebergCompactorRunner>,
42}
43
44struct IcebergTaskQueueInner {
46 deque: VecDeque<IcebergTaskMeta>, waiting: HashSet<TaskUniqueIdent>, running: HashSet<TaskUniqueIdent>, id_map: HashMap<TaskId, (TaskUniqueIdent, u32)>, waiting_parallelism_sum: u32, running_parallelism_sum: u32, runners: HashMap<TaskId, IcebergCompactorRunner>, }
54
55pub struct IcebergTaskQueue {
70 inner: IcebergTaskQueueInner,
71 max_parallelism: u32,
73 pending_parallelism_budget: u32,
75 schedule_notify: Option<Arc<Notify>>,
77}
78
79#[derive(Debug, PartialEq, Eq)]
80pub enum PushResult {
81 Added,
82 Replaced { old_task_id: TaskId },
83 RejectedRunningDuplicate, RejectedCapacity, RejectedTooLarge, RejectedInvalidParallelism, }
88
89impl IcebergTaskQueue {
90 pub fn new(max_parallelism: u32, pending_parallelism_budget: u32) -> Self {
91 assert!(max_parallelism > 0, "max_parallelism must be > 0");
92 assert!(
93 pending_parallelism_budget >= max_parallelism,
94 "pending budget should allow at least one task"
95 );
96 Self {
97 inner: IcebergTaskQueueInner {
98 deque: VecDeque::new(),
99 waiting: HashSet::new(),
100 running: HashSet::new(),
101 id_map: HashMap::new(),
102 waiting_parallelism_sum: 0,
103 running_parallelism_sum: 0,
104 runners: HashMap::new(),
105 },
106 max_parallelism,
107 pending_parallelism_budget,
108 schedule_notify: None,
109 }
110 }
111
112 pub fn new_with_notify(
113 max_parallelism: u32,
114 pending_parallelism_budget: u32,
115 ) -> (Self, Arc<Notify>) {
116 let notify = Arc::new(Notify::new());
117 let mut queue = Self::new(max_parallelism, pending_parallelism_budget);
118 queue.schedule_notify = Some(notify.clone());
119 (queue, notify)
120 }
121
122 pub async fn wait_schedulable(&self) -> bool {
123 if let Some(notify) = &self.schedule_notify {
124 if self.has_schedulable_tasks() {
126 return true;
127 }
128 notify.notified().await;
130 self.has_schedulable_tasks()
131 } else {
132 self.has_schedulable_tasks()
133 }
134 }
135
136 fn has_schedulable_tasks(&self) -> bool {
137 if let Some(front_task) = self.inner.deque.front() {
138 let available_parallelism = self
139 .max_parallelism
140 .saturating_sub(self.inner.running_parallelism_sum);
141 available_parallelism >= front_task.required_parallelism
142 } else {
143 false
144 }
145 }
146
147 fn notify_schedulable(&self) {
148 if let Some(notify) = &self.schedule_notify
149 && self.has_schedulable_tasks()
150 {
151 notify.notify_one();
152 }
153 }
154
155 pub fn running_parallelism_sum(&self) -> u32 {
156 self.inner.running_parallelism_sum
157 }
158
159 pub fn waiting_parallelism_sum(&self) -> u32 {
160 self.inner.waiting_parallelism_sum
161 }
162
163 fn available_parallelism(&self) -> u32 {
164 self.max_parallelism
165 .saturating_sub(self.inner.running_parallelism_sum)
166 }
167
168 pub fn push(
177 &mut self,
178 meta: IcebergTaskMeta,
179 runner: Option<IcebergCompactorRunner>,
180 ) -> PushResult {
181 let uid = &meta.unique_ident;
182 if meta.required_parallelism == 0 {
183 return PushResult::RejectedInvalidParallelism;
184 }
185 if meta.required_parallelism > self.max_parallelism {
186 return PushResult::RejectedTooLarge;
187 }
188 if self.inner.running.contains(uid) {
189 return PushResult::RejectedRunningDuplicate;
190 }
191
192 if self.inner.waiting.contains(uid) {
193 for slot in &mut self.inner.deque {
194 if slot.unique_ident == *uid {
195 let old_task_id = slot.task_id;
196 let old_required = slot.required_parallelism;
197 let new_sum = self.inner.waiting_parallelism_sum - old_required
198 + meta.required_parallelism;
199 if new_sum > self.pending_parallelism_budget {
200 return PushResult::RejectedCapacity;
201 }
202 self.inner.id_map.remove(&old_task_id);
204 slot.task_id = meta.task_id;
205 slot.enqueue_at = meta.enqueue_at;
206 slot.required_parallelism = meta.required_parallelism;
207 self.inner
208 .id_map
209 .insert(slot.task_id, (uid.clone(), slot.required_parallelism));
210 self.inner.waiting_parallelism_sum = new_sum;
211 if let Some(r) = runner {
212 self.inner.runners.remove(&old_task_id);
214 self.inner.runners.insert(slot.task_id, r);
215 } else {
216 if old_task_id != slot.task_id
218 && let Some(old_runner) = self.inner.runners.remove(&old_task_id)
219 {
220 self.inner.runners.insert(slot.task_id, old_runner);
221 }
222 }
223 return PushResult::Replaced { old_task_id };
224 }
225 }
226 panic!(
229 "IcebergTaskQueue invariant violated: waiting contains {uid} but deque missing. waiting_size={}, deque_len={}, id_map_size={}, waiting_parallelism_sum={}, running_parallelism_sum={}",
230 self.inner.waiting.len(),
231 self.inner.deque.len(),
232 self.inner.id_map.len(),
233 self.inner.waiting_parallelism_sum,
234 self.inner.running_parallelism_sum,
235 );
236 }
237 if self.inner.waiting_parallelism_sum + meta.required_parallelism
239 > self.pending_parallelism_budget
240 {
241 return PushResult::RejectedCapacity;
242 }
243 self.inner.waiting.insert(uid.clone());
244 self.inner
245 .id_map
246 .insert(meta.task_id, (uid.clone(), meta.required_parallelism));
247 self.inner.waiting_parallelism_sum += meta.required_parallelism;
248 self.inner.deque.push_back(meta);
249 if let Some(r) = runner {
250 self.inner.runners.insert(r.task_id, r);
251 }
252 self.notify_schedulable();
254 PushResult::Added
255 }
256
257 pub fn pop(&mut self) -> Option<PoppedIcebergTask> {
259 let front = self.inner.deque.front()?;
260 if front.required_parallelism > self.available_parallelism() {
261 return None;
262 }
263 let meta = self.inner.deque.pop_front()?;
264 let uid = meta.unique_ident.clone();
265 debug_assert!(self.inner.waiting.contains(&uid));
266 self.inner.waiting.remove(&uid);
267 self.inner.running.insert(uid);
268 self.inner.waiting_parallelism_sum = self
269 .inner
270 .waiting_parallelism_sum
271 .saturating_sub(meta.required_parallelism);
272 self.inner.running_parallelism_sum = self
273 .inner
274 .running_parallelism_sum
275 .saturating_add(meta.required_parallelism);
276 let runner = self.inner.runners.remove(&meta.task_id);
277 Some(PoppedIcebergTask { meta, runner })
278 }
279
280 pub fn finish_running(&mut self, task_id: TaskId) -> bool {
281 let Some((uid, required)) = self.inner.id_map.remove(&task_id) else {
282 return false;
283 };
284 if self.inner.running.remove(&uid) {
285 self.inner.running_parallelism_sum =
286 self.inner.running_parallelism_sum.saturating_sub(required);
287 self.inner.runners.remove(&task_id);
289 self.notify_schedulable();
291 true
292 } else {
293 false
294 }
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds task metadata stamped with the current instant.
    fn mk_meta(id: u64, ident: &str, p: u32) -> IcebergTaskMeta {
        let enqueue_at = std::time::Instant::now();
        IcebergTaskMeta {
            task_id: id,
            unique_ident: ident.to_owned(),
            enqueue_at,
            required_parallelism: p,
        }
    }

    /// Push, pop and finish a single task. No runner is supplied here, so the popped
    /// runner is expected to be absent.
    #[test]
    fn test_push_pop_with_runner() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        let outcome = queue.push(mk_meta(1, "t1", 4), None);
        assert_eq!(outcome, PushResult::Added);

        let task = queue.pop().expect("should pop");
        assert_eq!(task.meta.task_id, 1);
        assert!(task.runner.is_none());

        assert!(queue.finish_running(1));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    /// Re-pushing a waiting ident replaces the entry and reports the old task id.
    #[test]
    fn test_replacement_keep_old_runner() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        let first = mk_meta(10, "same", 3);
        let _ = queue.push(first, None);

        let second = mk_meta(11, "same", 5);
        let outcome = queue.push(second, None);
        assert_eq!(outcome, PushResult::Replaced { old_task_id: 10 });

        let task = queue.pop().unwrap();
        assert_eq!(task.meta.task_id, 11);
        assert!(task.runner.is_none());
    }

    /// Once the pending budget is used up, further tasks are rejected.
    #[test]
    fn test_capacity_reject() {
        let mut queue = IcebergTaskQueue::new(4, 6);
        for (id, ident) in [(1, "a"), (2, "b")] {
            assert_eq!(queue.push(mk_meta(id, ident, 3), None), PushResult::Added);
        }
        let overflow = queue.push(mk_meta(3, "c", 1), None);
        assert_eq!(overflow, PushResult::RejectedCapacity);
    }

    /// Zero parallelism and parallelism above the maximum are both refused.
    #[test]
    fn test_invalid_parallelism() {
        let mut queue = IcebergTaskQueue::new(4, 10);

        let zero = queue.push(mk_meta(1, "a", 0), None);
        assert_eq!(zero, PushResult::RejectedInvalidParallelism);

        let oversized = queue.push(mk_meta(2, "a", 5), None);
        assert_eq!(oversized, PushResult::RejectedTooLarge);
    }

    /// An ident cannot be queued again while it is running, but can afterwards.
    #[test]
    fn test_running_duplicate_reject() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "x", 4), None), PushResult::Added);
        let _running = queue.pop().unwrap();

        let duplicate = queue.push(mk_meta(2, "x", 4), None);
        assert_eq!(duplicate, PushResult::RejectedRunningDuplicate);

        assert!(queue.finish_running(1));
        let retry = queue.push(mk_meta(3, "x", 4), None);
        assert_eq!(retry, PushResult::Added);
    }

    /// A replacement that would break the budget is rejected and the old entry survives.
    #[test]
    fn test_replacement_exceed_budget_reject() {
        let mut queue = IcebergTaskQueue::new(8, 10);
        assert_eq!(queue.push(mk_meta(1, "a", 4), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, "b", 4), None), PushResult::Added);

        let too_big = queue.push(mk_meta(3, "b", 7), None);
        assert_eq!(too_big, PushResult::RejectedCapacity);

        let first = queue.pop().unwrap();
        assert_eq!(first.meta.unique_ident, "a");
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.unique_ident, "b");
        assert_eq!(second.meta.required_parallelism, 4);
    }

    /// A replaced task keeps its position in the queue.
    #[test]
    fn test_replacement_position_preserved() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "a", 3), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, "b", 3), None), PushResult::Added);

        let outcome = queue.push(mk_meta(10, "a", 5), None);
        assert_eq!(outcome, PushResult::Replaced { old_task_id: 1 });

        let first = queue.pop().unwrap();
        assert_eq!(first.meta.task_id, 10);
        assert_eq!(first.meta.unique_ident, "a");
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.unique_ident, "b");
    }

    /// The front task blocks the queue until enough parallelism is released.
    #[test]
    fn test_pop_insufficient_parallelism() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "a", 6), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, "b", 4), None), PushResult::Added);

        let first = queue.pop().unwrap();
        assert_eq!(first.meta.unique_ident, "a");
        // Only 2 units are free while "a" runs; "b" needs 4.
        assert!(queue.pop().is_none());

        assert!(queue.finish_running(1));
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.unique_ident, "b");
    }

    /// Finishing an unknown task id is a no-op.
    #[test]
    fn test_finish_running_nonexistent() {
        let mut queue = IcebergTaskQueue::new(4, 16);
        assert!(!queue.finish_running(999));
        assert_eq!(queue.running_parallelism_sum(), 0);
        assert_eq!(queue.waiting_parallelism_sum(), 0);
    }

    /// Finishing a running task gives its parallelism back.
    #[test]
    fn test_finish_running_updates_sums() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "a", 5), None), PushResult::Added);
        let _ = queue.pop().unwrap();
        assert_eq!(queue.running_parallelism_sum(), 5);

        assert!(queue.finish_running(1));
        assert_eq!(queue.running_parallelism_sum(), 0);
        assert!(queue.pop().is_none());
    }

    /// Replacing a waiting task adjusts the waiting sum to the new requirement.
    #[test]
    fn test_replacement_parallelism_sum_adjustment() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "a", 3), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 3);

        let outcome = queue.push(mk_meta(2, "a", 6), None);
        assert_eq!(outcome, PushResult::Replaced { old_task_id: 1 });
        assert_eq!(queue.waiting_parallelism_sum(), 6);

        let task = queue.pop().unwrap();
        assert_eq!(task.meta.required_parallelism, 6);
        assert!(queue.finish_running(task.meta.task_id));
    }
}