// risingwave_storage/hummock/compactor/iceberg_compaction/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::hummock::compactor::iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunner;
16
17pub(crate) mod iceberg_compactor_runner;
18
19use std::collections::{HashMap, HashSet, VecDeque};
20use std::sync::Arc;
21
22use tokio::sync::Notify;
23
/// Queue-assigned identifier for a single compaction task.
type TaskId = u64;
/// Per-table deduplication key.
type TaskUniqueIdent = String; // format!("{}-{:?}", catalog_name, table_ident)
26
/// Metadata of a task waiting (or running) – independent from the actual runner implementation.
/// Runner (payload) is stored separately so tests can omit it.
#[derive(Debug, Clone)]
pub struct IcebergTaskMeta {
    /// Queue-assigned id; key into the queue's `id_map` and runner-payload map.
    pub task_id: TaskId,
    /// Deduplication key (see `TaskUniqueIdent`): at most one waiting and one
    /// running task may carry the same ident.
    pub unique_ident: TaskUniqueIdent,
    /// Instant recorded by the producer when the task was created/enqueued.
    pub enqueue_at: std::time::Instant,
    /// Estimated parallelism required to execute this task (must be >0 and <= `max_parallelism`)
    pub required_parallelism: u32,
}
37
/// What `IcebergTaskQueue::pop` hands to the scheduler: the task metadata plus
/// the runner payload, if one was supplied at push time (tests may omit it).
#[derive(Debug)]
pub struct PoppedIcebergTask {
    // Metadata moved out of the waiting deque.
    pub meta: IcebergTaskMeta,
    // `None` when the task was pushed without a runner payload.
    pub runner: Option<IcebergCompactorRunner>,
}
43
/// Internal storage. `waiting` and `running` are disjoint.
struct IcebergTaskQueueInner {
    deque: VecDeque<IcebergTaskMeta>,  // FIFO of waiting task metadata
    waiting: HashSet<TaskUniqueIdent>, // unique_idents waiting
    running: HashSet<TaskUniqueIdent>, // unique_idents currently running
    // Covers both waiting and running tasks; entries are removed on
    // replacement (old id) and on finish. task_id -> (unique_ident, required_parallelism).
    id_map: HashMap<TaskId, (TaskUniqueIdent, u32)>,
    waiting_parallelism_sum: u32, // sum(required_parallelism) for waiting tasks only
    running_parallelism_sum: u32, // sum(required_parallelism) for running tasks only
    runners: HashMap<TaskId, IcebergCompactorRunner>, /* optional runner payloads (may be absent in tests) */
}
54
/// FIFO compaction task queue with lightweight replacement and parallelism budgeting.
/// Features:
/// - Push with replacement (same `unique_ident` waiting -> metadata/runner updated in place).
/// - Pop moves the head (if it fits remaining parallelism) to the running set.
/// - Reject duplicate if the same `unique_ident` is already running.
/// - Capacity control: reject when adding/replacing would exceed `pending_parallelism_budget`
///   (sum of waiting tasks' `required_parallelism`). No eviction beyond hard rejection.
/// - Optional notification when new tasks become schedulable.
///
/// Invariants (enforced by logic; violation triggers panic):
/// - `waiting ∩ running = ∅`.
/// - Each waiting `unique_ident` appears exactly once in `deque`.
/// - `waiting_parallelism_sum = Σ required_parallelism(waiting)`.
/// - `running_parallelism_sum = Σ required_parallelism(running)`.
pub struct IcebergTaskQueue {
    inner: IcebergTaskQueueInner,
    /// Maximum parallelism that a single task may require (cluster effective max / scheduling window upper bound).
    max_parallelism: u32,
    /// Budget for `sum(required_parallelism)` of waiting tasks (buffer), e.g. 4 * `max_parallelism`.
    pending_parallelism_budget: u32,
    /// Notification for when tasks become schedulable.
    /// Set only via `new_with_notify`; when `None`, `wait_schedulable` never blocks.
    schedule_notify: Option<Arc<Notify>>,
}
78
/// Outcome of `IcebergTaskQueue::push`.
#[derive(Debug, PartialEq, Eq)]
pub enum PushResult {
    /// Appended as a brand-new waiting task.
    Added,
    /// A waiting task with the same ident was updated in place (FIFO position kept).
    Replaced { old_task_id: TaskId },
    RejectedRunningDuplicate,   // same unique ident already running
    RejectedCapacity,           // waiting parallelism budget exceeded and no replacement happened
    RejectedTooLarge,           // required_parallelism > max_parallelism
    RejectedInvalidParallelism, // required_parallelism == 0
}
88
89impl IcebergTaskQueue {
90    pub fn new(max_parallelism: u32, pending_parallelism_budget: u32) -> Self {
91        assert!(max_parallelism > 0, "max_parallelism must be > 0");
92        assert!(
93            pending_parallelism_budget >= max_parallelism,
94            "pending budget should allow at least one task"
95        );
96        Self {
97            inner: IcebergTaskQueueInner {
98                deque: VecDeque::new(),
99                waiting: HashSet::new(),
100                running: HashSet::new(),
101                id_map: HashMap::new(),
102                waiting_parallelism_sum: 0,
103                running_parallelism_sum: 0,
104                runners: HashMap::new(),
105            },
106            max_parallelism,
107            pending_parallelism_budget,
108            schedule_notify: None,
109        }
110    }
111
112    pub fn new_with_notify(
113        max_parallelism: u32,
114        pending_parallelism_budget: u32,
115    ) -> (Self, Arc<Notify>) {
116        let notify = Arc::new(Notify::new());
117        let mut queue = Self::new(max_parallelism, pending_parallelism_budget);
118        queue.schedule_notify = Some(notify.clone());
119        (queue, notify)
120    }
121
122    pub async fn wait_schedulable(&self) -> bool {
123        if let Some(notify) = &self.schedule_notify {
124            // Check if we have tasks that can be scheduled right now
125            if self.has_schedulable_tasks() {
126                return true;
127            }
128            // Otherwise wait for notification
129            notify.notified().await;
130            self.has_schedulable_tasks()
131        } else {
132            self.has_schedulable_tasks()
133        }
134    }
135
136    fn has_schedulable_tasks(&self) -> bool {
137        if let Some(front_task) = self.inner.deque.front() {
138            let available_parallelism = self
139                .max_parallelism
140                .saturating_sub(self.inner.running_parallelism_sum);
141            available_parallelism >= front_task.required_parallelism
142        } else {
143            false
144        }
145    }
146
147    fn notify_schedulable(&self) {
148        if let Some(notify) = &self.schedule_notify
149            && self.has_schedulable_tasks()
150        {
151            notify.notify_one();
152        }
153    }
154
155    pub fn running_parallelism_sum(&self) -> u32 {
156        self.inner.running_parallelism_sum
157    }
158
159    pub fn waiting_parallelism_sum(&self) -> u32 {
160        self.inner.waiting_parallelism_sum
161    }
162
163    fn available_parallelism(&self) -> u32 {
164        self.max_parallelism
165            .saturating_sub(self.inner.running_parallelism_sum)
166    }
167
168    /// Enqueue semantics:
169    /// - Waiting duplicate: replace in place (position preserved) -> `Replaced { old_task_id }`.
170    ///   * If new runner is `Some`, replace runner; else keep old runner.
171    /// - Running duplicate: reject -> `RejectedRunningDuplicate`.
172    /// - Budget exceeded (sum of waiting parallelism would surpass `pending_parallelism_budget`): `RejectedCapacity`.
173    /// - `required_parallelism == 0`: `RejectedInvalidParallelism`.
174    /// - `required_parallelism > max_parallelism`: `RejectedTooLarge`.
175    /// - Otherwise append -> `Added`.
176    pub fn push(
177        &mut self,
178        meta: IcebergTaskMeta,
179        runner: Option<IcebergCompactorRunner>,
180    ) -> PushResult {
181        let uid = &meta.unique_ident;
182        if meta.required_parallelism == 0 {
183            return PushResult::RejectedInvalidParallelism;
184        }
185        if meta.required_parallelism > self.max_parallelism {
186            return PushResult::RejectedTooLarge;
187        }
188        if self.inner.running.contains(uid) {
189            return PushResult::RejectedRunningDuplicate;
190        }
191
192        if self.inner.waiting.contains(uid) {
193            for slot in &mut self.inner.deque {
194                if slot.unique_ident == *uid {
195                    let old_task_id = slot.task_id;
196                    let old_required = slot.required_parallelism;
197                    let new_sum = self.inner.waiting_parallelism_sum - old_required
198                        + meta.required_parallelism;
199                    if new_sum > self.pending_parallelism_budget {
200                        return PushResult::RejectedCapacity;
201                    }
202                    // meta replacement (position preserved)
203                    self.inner.id_map.remove(&old_task_id);
204                    slot.task_id = meta.task_id;
205                    slot.enqueue_at = meta.enqueue_at;
206                    slot.required_parallelism = meta.required_parallelism;
207                    self.inner
208                        .id_map
209                        .insert(slot.task_id, (uid.clone(), slot.required_parallelism));
210                    self.inner.waiting_parallelism_sum = new_sum;
211                    if let Some(r) = runner {
212                        // replace runner only if provided
213                        self.inner.runners.remove(&old_task_id);
214                        self.inner.runners.insert(slot.task_id, r);
215                    } else {
216                        // retain old runner mapping
217                        if old_task_id != slot.task_id
218                            && let Some(old_runner) = self.inner.runners.remove(&old_task_id)
219                        {
220                            self.inner.runners.insert(slot.task_id, old_runner);
221                        }
222                    }
223                    return PushResult::Replaced { old_task_id };
224                }
225            }
226            // Invariant violation: `waiting` contains `uid` but no corresponding entry in `deque`.
227            // This indicates internal inconsistency (likely a logic regression). Crash fast to surface the bug.
228            panic!(
229                "IcebergTaskQueue invariant violated: waiting contains {uid} but deque missing. waiting_size={}, deque_len={}, id_map_size={}, waiting_parallelism_sum={}, running_parallelism_sum={}",
230                self.inner.waiting.len(),
231                self.inner.deque.len(),
232                self.inner.id_map.len(),
233                self.inner.waiting_parallelism_sum,
234                self.inner.running_parallelism_sum,
235            );
236        }
237        // New unique ident
238        if self.inner.waiting_parallelism_sum + meta.required_parallelism
239            > self.pending_parallelism_budget
240        {
241            return PushResult::RejectedCapacity;
242        }
243        self.inner.waiting.insert(uid.clone());
244        self.inner
245            .id_map
246            .insert(meta.task_id, (uid.clone(), meta.required_parallelism));
247        self.inner.waiting_parallelism_sum += meta.required_parallelism;
248        self.inner.deque.push_back(meta);
249        if let Some(r) = runner {
250            self.inner.runners.insert(r.task_id, r);
251        }
252        // Notify that we might have schedulable tasks now
253        self.notify_schedulable();
254        PushResult::Added
255    }
256
257    // Pop the head task (strict FIFO) if it fits remaining parallelism.
258    pub fn pop(&mut self) -> Option<PoppedIcebergTask> {
259        let front = self.inner.deque.front()?;
260        if front.required_parallelism > self.available_parallelism() {
261            return None;
262        }
263        let meta = self.inner.deque.pop_front()?;
264        let uid = meta.unique_ident.clone();
265        debug_assert!(self.inner.waiting.contains(&uid));
266        self.inner.waiting.remove(&uid);
267        self.inner.running.insert(uid);
268        self.inner.waiting_parallelism_sum = self
269            .inner
270            .waiting_parallelism_sum
271            .saturating_sub(meta.required_parallelism);
272        self.inner.running_parallelism_sum = self
273            .inner
274            .running_parallelism_sum
275            .saturating_add(meta.required_parallelism);
276        let runner = self.inner.runners.remove(&meta.task_id);
277        Some(PoppedIcebergTask { meta, runner })
278    }
279
280    pub fn finish_running(&mut self, task_id: TaskId) -> bool {
281        let Some((uid, required)) = self.inner.id_map.remove(&task_id) else {
282            return false;
283        };
284        if self.inner.running.remove(&uid) {
285            self.inner.running_parallelism_sum =
286                self.inner.running_parallelism_sum.saturating_sub(required);
287            // Runner lifecycle: consumed by logic after take_runner; cleanup here if still in map to avoid leak.
288            self.inner.runners.remove(&task_id);
289            // Notify that we might have schedulable tasks now (capacity freed up)
290            self.notify_schedulable();
291            true
292        } else {
293            false
294        }
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building task metadata in tests.
    fn mk_meta(id: u64, ident: &str, p: u32) -> IcebergTaskMeta {
        IcebergTaskMeta {
            task_id: id,
            unique_ident: ident.to_owned(),
            required_parallelism: p,
            enqueue_at: std::time::Instant::now(),
        }
    }

    #[test]
    fn test_push_pop_with_runner() {
        // max_parallelism = 8, waiting budget = 32. Building a real runner is
        // involved, so runner presence is exercised via `None` only.
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "t1", 4), None), PushResult::Added);
        let popped = queue.pop().expect("should pop");
        assert_eq!(popped.meta.task_id, 1);
        assert!(popped.runner.is_none());
        assert!(queue.finish_running(1));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_replacement_keep_old_runner() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        let _ = queue.push(mk_meta(10, "same", 3), None);
        // Re-push the same ident with a new id and parallelism; still no runner.
        let outcome = queue.push(mk_meta(11, "same", 5), None);
        assert!(matches!(outcome, PushResult::Replaced { old_task_id: 10 }));
        let popped = queue.pop().unwrap();
        assert_eq!(popped.meta.task_id, 11);
        assert!(popped.runner.is_none());
    }

    #[test]
    fn test_capacity_reject() {
        // max_parallelism = 4, waiting budget = 6.
        let mut queue = IcebergTaskQueue::new(4, 6);
        assert_eq!(queue.push(mk_meta(1, "a", 3), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, "b", 3), None), PushResult::Added);
        // Budget exhausted (3 + 3 = 6): a third unique ident is rejected.
        assert_eq!(
            queue.push(mk_meta(3, "c", 1), None),
            PushResult::RejectedCapacity
        );
    }

    #[test]
    fn test_invalid_parallelism() {
        let mut queue = IcebergTaskQueue::new(4, 10);
        assert_eq!(
            queue.push(mk_meta(1, "a", 0), None),
            PushResult::RejectedInvalidParallelism
        );
        // 5 exceeds max_parallelism (4).
        assert_eq!(
            queue.push(mk_meta(2, "a", 5), None),
            PushResult::RejectedTooLarge
        );
    }

    #[test]
    fn test_running_duplicate_reject() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "x", 4), None), PushResult::Added);
        let _running = queue.pop().unwrap();
        // "x" is running now, so a duplicate push is rejected...
        assert_eq!(
            queue.push(mk_meta(2, "x", 4), None),
            PushResult::RejectedRunningDuplicate
        );
        assert!(queue.finish_running(1));
        // ...but accepted again once it has finished.
        assert_eq!(queue.push(mk_meta(3, "x", 4), None), PushResult::Added);
    }

    #[test]
    fn test_replacement_exceed_budget_reject() {
        // Budget 10, waiting sum 8; replacing "b" with 7 would give 4 + 7 = 11.
        let mut queue = IcebergTaskQueue::new(8, 10);
        assert_eq!(queue.push(mk_meta(1, "a", 4), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, "b", 4), None), PushResult::Added);
        assert_eq!(
            queue.push(mk_meta(3, "b", 7), None),
            PushResult::RejectedCapacity
        );
        // A failed replacement must leave both order and parallelism untouched.
        let first = queue.pop().unwrap();
        assert_eq!(first.meta.unique_ident, "a");
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.unique_ident, "b");
        assert_eq!(second.meta.required_parallelism, 4);
    }

    #[test]
    fn test_replacement_position_preserved() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "a", 3), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, "b", 3), None), PushResult::Added);
        // Replacing the head keeps it at the head.
        assert!(matches!(
            queue.push(mk_meta(10, "a", 5), None),
            PushResult::Replaced { old_task_id: 1 }
        ));
        let first = queue.pop().unwrap();
        assert_eq!(first.meta.task_id, 10);
        assert_eq!(first.meta.unique_ident, "a");
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.unique_ident, "b");
    }

    #[test]
    fn test_pop_insufficient_parallelism() {
        // The first task takes 6 of 8 units; the second (4) cannot run alongside.
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "a", 6), None), PushResult::Added);
        assert_eq!(queue.push(mk_meta(2, "b", 4), None), PushResult::Added);
        let first = queue.pop().unwrap();
        assert_eq!(first.meta.unique_ident, "a");
        // Only 2 units remain, so the head does not fit.
        assert!(queue.pop().is_none());
        // Releasing the first task makes the second schedulable.
        assert!(queue.finish_running(1));
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.unique_ident, "b");
    }

    #[test]
    fn test_finish_running_nonexistent() {
        let mut queue = IcebergTaskQueue::new(4, 16);
        // An unknown task id is ignored and leaves the accounting untouched.
        assert!(!queue.finish_running(999));
        assert_eq!(queue.running_parallelism_sum(), 0);
        assert_eq!(queue.waiting_parallelism_sum(), 0);
    }

    #[test]
    fn test_finish_running_updates_sums() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "a", 5), None), PushResult::Added);
        let _running = queue.pop().unwrap();
        assert_eq!(queue.running_parallelism_sum(), 5);
        assert!(queue.finish_running(1));
        assert_eq!(queue.running_parallelism_sum(), 0);
        // Nothing left to schedule.
        assert!(queue.pop().is_none());
    }

    #[test]
    fn test_replacement_parallelism_sum_adjustment() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(mk_meta(1, "a", 3), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 3);
        // Replacing with a larger requirement bumps the waiting sum.
        assert!(matches!(
            queue.push(mk_meta(2, "a", 6), None),
            PushResult::Replaced { old_task_id: 1 }
        ));
        assert_eq!(queue.waiting_parallelism_sum(), 6);
        // The popped task carries the new parallelism.
        let popped = queue.pop().unwrap();
        assert_eq!(popped.meta.required_parallelism, 6);
        assert!(queue.finish_running(popped.meta.task_id));
    }
}