risingwave_storage/hummock/compactor/iceberg_compaction/
mod.rs1use crate::hummock::compactor::iceberg_compaction::iceberg_compactor_runner::IcebergCompactionPlanRunner;
16
17pub(crate) mod iceberg_compactor_runner;
18
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21
22use tokio::sync::Notify;
23
/// Identifier of a queued compaction task; used as the key of the queue's
/// bookkeeping maps and as the argument of `finish_running`.
type TaskId = u64;
25
/// Scheduling metadata describing one task handled by the task queue.
#[derive(Clone, Debug)]
pub struct IcebergTaskMeta {
    /// Identifier under which the task is tracked.
    pub task_id: u64,
    /// Number of parallelism units the task occupies while it runs.
    pub required_parallelism: u32,
}
36
37#[derive(Debug)]
38pub struct PoppedIcebergTask {
39 pub meta: IcebergTaskMeta,
40 pub runner: Option<IcebergCompactionPlanRunner>,
41}
42
43struct IcebergTaskQueueInner {
45 deque: VecDeque<IcebergTaskMeta>,
47 id_map: HashMap<TaskId, u32>,
49 waiting_parallelism_sum: u32,
51 running_parallelism_sum: u32,
53 runners: HashMap<TaskId, IcebergCompactionPlanRunner>,
55}
56
57pub struct IcebergTaskQueue {
71 inner: IcebergTaskQueueInner,
72 max_parallelism: u32,
74 pending_parallelism_budget: u32,
76 schedule_notify: Option<Arc<Notify>>,
78}
79
/// Outcome of an attempt to enqueue a task.
#[derive(Debug, PartialEq, Eq)]
pub enum PushResult {
    /// The task was accepted and is now waiting in the queue.
    Added,
    /// Accepting the task would push the waiting parallelism over the pending budget.
    RejectedCapacity,
    /// The task requires more parallelism than the queue can ever provide.
    RejectedTooLarge,
    /// The task declared a required parallelism of zero.
    RejectedInvalidParallelism,
}
90
91impl IcebergTaskQueue {
92 pub fn new(max_parallelism: u32, pending_parallelism_budget: u32) -> Self {
93 assert!(max_parallelism > 0, "max_parallelism must be > 0");
94 assert!(
95 pending_parallelism_budget >= max_parallelism,
96 "pending budget should allow at least one task"
97 );
98 Self {
99 inner: IcebergTaskQueueInner {
100 deque: VecDeque::new(),
101 id_map: HashMap::new(),
102 waiting_parallelism_sum: 0,
103 running_parallelism_sum: 0,
104 runners: HashMap::new(),
105 },
106 max_parallelism,
107 pending_parallelism_budget,
108 schedule_notify: None,
109 }
110 }
111
112 pub fn new_with_notify(
113 max_parallelism: u32,
114 pending_parallelism_budget: u32,
115 ) -> (Self, Arc<Notify>) {
116 let notify = Arc::new(Notify::new());
117 let mut queue = Self::new(max_parallelism, pending_parallelism_budget);
118 queue.schedule_notify = Some(notify.clone());
119 (queue, notify)
120 }
121
122 pub async fn wait_schedulable(&self) -> bool {
123 if let Some(notify) = &self.schedule_notify {
124 if self.has_schedulable_tasks() {
126 return true;
127 }
128 notify.notified().await;
130 self.has_schedulable_tasks()
131 } else {
132 self.has_schedulable_tasks()
133 }
134 }
135
136 fn has_schedulable_tasks(&self) -> bool {
137 if let Some(front_task) = self.inner.deque.front() {
138 let available_parallelism = self
139 .max_parallelism
140 .saturating_sub(self.inner.running_parallelism_sum);
141 available_parallelism >= front_task.required_parallelism
142 } else {
143 false
144 }
145 }
146
147 fn notify_schedulable(&self) {
148 if let Some(notify) = &self.schedule_notify
149 && self.has_schedulable_tasks()
150 {
151 notify.notify_one();
152 }
153 }
154
155 pub fn running_parallelism_sum(&self) -> u32 {
156 self.inner.running_parallelism_sum
157 }
158
159 pub fn waiting_parallelism_sum(&self) -> u32 {
160 self.inner.waiting_parallelism_sum
161 }
162
163 fn available_parallelism(&self) -> u32 {
164 self.max_parallelism
165 .saturating_sub(self.inner.running_parallelism_sum)
166 }
167
168 pub fn push(
172 &mut self,
173 meta: IcebergTaskMeta,
174 runner: Option<IcebergCompactionPlanRunner>,
175 ) -> PushResult {
176 if meta.required_parallelism == 0 {
177 return PushResult::RejectedInvalidParallelism;
178 }
179 if meta.required_parallelism > self.max_parallelism {
180 return PushResult::RejectedTooLarge;
181 }
182
183 let new_total = self.inner.waiting_parallelism_sum + meta.required_parallelism;
184 if new_total > self.pending_parallelism_budget {
185 return PushResult::RejectedCapacity;
186 }
187
188 self.inner
189 .id_map
190 .insert(meta.task_id, meta.required_parallelism);
191 self.inner.waiting_parallelism_sum = new_total;
192 self.inner.deque.push_back(meta);
193
194 if let Some(r) = runner {
195 self.inner.runners.insert(r.task_id, r);
196 }
197
198 self.notify_schedulable();
199 PushResult::Added
200 }
201
202 pub fn pop(&mut self) -> Option<PoppedIcebergTask> {
207 let front = self.inner.deque.front()?;
208 if front.required_parallelism > self.available_parallelism() {
209 return None;
210 }
211
212 let meta = self.inner.deque.pop_front()?;
213 self.inner.waiting_parallelism_sum = self
214 .inner
215 .waiting_parallelism_sum
216 .saturating_sub(meta.required_parallelism);
217 self.inner.running_parallelism_sum = self
218 .inner
219 .running_parallelism_sum
220 .saturating_add(meta.required_parallelism);
221
222 let runner = self.inner.runners.remove(&meta.task_id);
223 Some(PoppedIcebergTask { meta, runner })
224 }
225
226 pub fn finish_running(&mut self, task_id: TaskId) -> bool {
230 let Some(required) = self.inner.id_map.remove(&task_id) else {
231 return false;
232 };
233
234 self.inner.running_parallelism_sum =
235 self.inner.running_parallelism_sum.saturating_sub(required);
236 self.inner.runners.remove(&task_id);
237 self.notify_schedulable();
238 true
239 }
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds task metadata with the given id and required parallelism.
    fn mk_meta(id: u64, p: u32) -> IcebergTaskMeta {
        IcebergTaskMeta {
            task_id: id,
            required_parallelism: p,
        }
    }

    /// A single task moves from waiting to running to finished.
    #[test]
    fn test_basic_push_pop() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, 4), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 4);

        let task = queue.pop().expect("should pop");
        assert_eq!(task.meta.task_id, 1);
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 4);

        assert!(queue.finish_running(1));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    /// Tasks are popped in the order they were pushed.
    #[test]
    fn test_fifo_ordering() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        for id in 1..=3 {
            assert_eq!(queue.push(mk_meta(id, 2), None), PushResult::Added);
        }
        for expected_id in 1..=3 {
            assert_eq!(queue.pop().unwrap().meta.task_id, expected_id);
        }
    }

    /// A push that would exceed the pending budget is rejected.
    #[test]
    fn test_capacity_reject() {
        let mut queue = IcebergTaskQueue::new(4, 6);
        assert_eq!(queue.push(mk_meta(1, 3), None), PushResult::Added);
        // Waiting parallelism is now exactly at the budget of 6.
        assert_eq!(queue.push(mk_meta(2, 3), None), PushResult::Added);
        assert_eq!(
            queue.push(mk_meta(3, 1), None),
            PushResult::RejectedCapacity
        );
    }

    /// Zero parallelism and parallelism above the maximum are both rejected.
    #[test]
    fn test_invalid_parallelism() {
        let mut queue = IcebergTaskQueue::new(4, 10);
        let zero_parallelism = queue.push(mk_meta(1, 0), None);
        assert_eq!(zero_parallelism, PushResult::RejectedInvalidParallelism);

        let above_max = queue.push(mk_meta(2, 5), None);
        assert_eq!(above_max, PushResult::RejectedTooLarge);
    }

    /// The front task blocks until enough running parallelism is released.
    #[test]
    fn test_pop_insufficient_parallelism() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, 6), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, 4), None), PushResult::Added);

        let first = queue.pop().unwrap();
        assert_eq!(first.meta.task_id, 1);
        // Only 2 units remain available, task 2 needs 4.
        assert!(queue.pop().is_none());

        assert!(queue.finish_running(1));
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.task_id, 2);
    }

    /// Finishing an unknown task is a no-op that reports `false`.
    #[test]
    fn test_finish_nonexistent_task() {
        let mut queue = IcebergTaskQueue::new(4, 16);
        assert!(!queue.finish_running(999));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    /// Waiting and running sums follow every push, pop and finish.
    #[test]
    fn test_parallelism_sum_accounting() {
        let mut queue = IcebergTaskQueue::new(10, 20);

        assert_eq!(queue.push(mk_meta(1, 3), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, 5), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 8);
        assert_eq!(queue.running_parallelism_sum(), 0);

        let _first = queue.pop().unwrap();
        assert_eq!(queue.waiting_parallelism_sum(), 5);
        assert_eq!(queue.running_parallelism_sum(), 3);

        let _second = queue.pop().unwrap();
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 8);

        assert!(queue.finish_running(1));
        assert_eq!(queue.running_parallelism_sum(), 5);

        assert!(queue.finish_running(2));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    /// Only as many equal-sized tasks run as fit into the maximum parallelism.
    #[test]
    fn test_multiple_tasks_same_parallelism() {
        let mut queue = IcebergTaskQueue::new(10, 30);
        for id in 1..=10 {
            assert_eq!(queue.push(mk_meta(id, 3), None), PushResult::Added);
        }
        assert_eq!(queue.waiting_parallelism_sum(), 30);

        // Three tasks of 3 units fit into 10; the fourth would need 12.
        for _ in 0..3 {
            assert!(queue.pop().is_some());
        }
        assert!(queue.pop().is_none());
        assert_eq!(queue.running_parallelism_sum(), 9);
        assert_eq!(queue.waiting_parallelism_sum(), 21);
    }

    /// Operations on an empty queue leave the counters at zero.
    #[test]
    fn test_empty_queue_behavior() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert!(queue.pop().is_none());
        assert!(!queue.finish_running(1));
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    /// A task pushed without a runner pops without one and leaves no runner behind.
    #[test]
    fn test_runner_lifecycle() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, 4), None), PushResult::Added);

        let task = queue.pop().unwrap();
        assert!(task.runner.is_none());
        assert!(queue.finish_running(1));

        assert!(queue.inner.runners.is_empty());
    }
}