risingwave_storage/hummock/compactor/iceberg_compaction/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::hummock::compactor::iceberg_compaction::iceberg_compactor_runner::IcebergCompactionPlanRunner;
16
17pub(crate) mod iceberg_compactor_runner;
18
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21
22use tokio::sync::Notify;
23
/// Identifier for a single Iceberg compaction task; uniqueness is the caller's responsibility.
type TaskId = u64;
25
26/// Task metadata for queue operations.
27///
28/// The actual runner payload is stored separately to support queue operations
29/// without moving ownership, and to allow tests without runner instances.
#[derive(Debug, Clone)]
pub struct IcebergTaskMeta {
    /// Unique task identifier (same value space as `TaskId`); deduplication
    /// is the caller's (Meta's) responsibility, not the queue's.
    pub task_id: u64,
    /// Must be in range `1..=max_parallelism`
    pub required_parallelism: u32,
}
36
/// A task handed out by `IcebergTaskQueue::pop`: the queue metadata plus the
/// optional runner payload that was registered at push time.
#[derive(Debug)]
pub struct PoppedIcebergTask {
    /// Metadata of the popped task.
    pub meta: IcebergTaskMeta,
    /// Runner payload if one was supplied on push; `None` otherwise.
    pub runner: Option<IcebergCompactionPlanRunner>,
}
42
/// Internal storage for the task queue.
struct IcebergTaskQueueInner {
    /// FIFO queue of waiting task metadata
    deque: VecDeque<IcebergTaskMeta>,
    /// Maps `task_id` to `required_parallelism` for tracking.
    /// Entries are inserted on push and removed on finish, so this map covers
    /// both waiting and running tasks.
    id_map: HashMap<TaskId, u32>,
    /// Sum of `required_parallelism` for all waiting tasks
    waiting_parallelism_sum: u32,
    /// Sum of `required_parallelism` for all running tasks
    running_parallelism_sum: u32,
    /// Optional runner payloads indexed by `task_id`; removed on pop or finish
    runners: HashMap<TaskId, IcebergCompactionPlanRunner>,
}
56
/// FIFO task queue with parallelism-based scheduling for Iceberg compaction.
///
/// Tasks execute in submission order when sufficient parallelism is available.
/// The queue tracks waiting and running tasks to prevent over-commitment of resources.
///
/// Constraints:
/// - Each task requires `1..=max_parallelism` units
/// - Total waiting parallelism cannot exceed `pending_parallelism_budget`
/// - Total running parallelism cannot exceed `max_parallelism`
/// - Tasks block until enough parallelism is available
///
/// Note: The queue does NOT deduplicate or reorder tasks. Task management
/// (deduplication, merging, cancellation) is Meta's responsibility.
///
/// The queue itself is not internally synchronized; concurrent users must
/// wrap it in a lock (methods take `&mut self`).
pub struct IcebergTaskQueue {
    inner: IcebergTaskQueueInner,
    /// Maximum concurrent parallelism for running tasks
    max_parallelism: u32,
    /// Maximum total parallelism for waiting tasks (backpressure limit)
    pending_parallelism_budget: u32,
    /// Optional notification for event-driven scheduling; set by `new_with_notify`
    schedule_notify: Option<Arc<Notify>>,
}
79
/// Outcome of `IcebergTaskQueue::push`.
#[derive(Debug, PartialEq, Eq)]
pub enum PushResult {
    /// Task accepted and enqueued at the tail of the FIFO queue
    Added,
    /// Would exceed `pending_parallelism_budget`
    RejectedCapacity,
    /// `required_parallelism` > `max_parallelism`
    RejectedTooLarge,
    /// `required_parallelism` == 0
    RejectedInvalidParallelism,
}
90
91impl IcebergTaskQueue {
92    pub fn new(max_parallelism: u32, pending_parallelism_budget: u32) -> Self {
93        assert!(max_parallelism > 0, "max_parallelism must be > 0");
94        assert!(
95            pending_parallelism_budget >= max_parallelism,
96            "pending budget should allow at least one task"
97        );
98        Self {
99            inner: IcebergTaskQueueInner {
100                deque: VecDeque::new(),
101                id_map: HashMap::new(),
102                waiting_parallelism_sum: 0,
103                running_parallelism_sum: 0,
104                runners: HashMap::new(),
105            },
106            max_parallelism,
107            pending_parallelism_budget,
108            schedule_notify: None,
109        }
110    }
111
112    pub fn new_with_notify(
113        max_parallelism: u32,
114        pending_parallelism_budget: u32,
115    ) -> (Self, Arc<Notify>) {
116        let notify = Arc::new(Notify::new());
117        let mut queue = Self::new(max_parallelism, pending_parallelism_budget);
118        queue.schedule_notify = Some(notify.clone());
119        (queue, notify)
120    }
121
122    pub async fn wait_schedulable(&self) -> bool {
123        if let Some(notify) = &self.schedule_notify {
124            // Check if we have tasks that can be scheduled right now
125            if self.has_schedulable_tasks() {
126                return true;
127            }
128            // Otherwise wait for notification
129            notify.notified().await;
130            self.has_schedulable_tasks()
131        } else {
132            self.has_schedulable_tasks()
133        }
134    }
135
136    fn has_schedulable_tasks(&self) -> bool {
137        if let Some(front_task) = self.inner.deque.front() {
138            let available_parallelism = self
139                .max_parallelism
140                .saturating_sub(self.inner.running_parallelism_sum);
141            available_parallelism >= front_task.required_parallelism
142        } else {
143            false
144        }
145    }
146
147    fn notify_schedulable(&self) {
148        if let Some(notify) = &self.schedule_notify
149            && self.has_schedulable_tasks()
150        {
151            notify.notify_one();
152        }
153    }
154
155    pub fn running_parallelism_sum(&self) -> u32 {
156        self.inner.running_parallelism_sum
157    }
158
159    pub fn waiting_parallelism_sum(&self) -> u32 {
160        self.inner.waiting_parallelism_sum
161    }
162
163    fn available_parallelism(&self) -> u32 {
164        self.max_parallelism
165            .saturating_sub(self.inner.running_parallelism_sum)
166    }
167
168    /// Push a task into the queue.
169    ///
170    /// The task is validated and added to the end of the FIFO queue if constraints are met.
171    pub fn push(
172        &mut self,
173        meta: IcebergTaskMeta,
174        runner: Option<IcebergCompactionPlanRunner>,
175    ) -> PushResult {
176        if meta.required_parallelism == 0 {
177            return PushResult::RejectedInvalidParallelism;
178        }
179        if meta.required_parallelism > self.max_parallelism {
180            return PushResult::RejectedTooLarge;
181        }
182
183        let new_total = self.inner.waiting_parallelism_sum + meta.required_parallelism;
184        if new_total > self.pending_parallelism_budget {
185            return PushResult::RejectedCapacity;
186        }
187
188        self.inner
189            .id_map
190            .insert(meta.task_id, meta.required_parallelism);
191        self.inner.waiting_parallelism_sum = new_total;
192        self.inner.deque.push_back(meta);
193
194        if let Some(r) = runner {
195            self.inner.runners.insert(r.task_id, r);
196        }
197
198        self.notify_schedulable();
199        PushResult::Added
200    }
201
202    /// Pop the next task if sufficient parallelism is available.
203    ///
204    /// Returns `None` if the queue is empty or the front task cannot fit
205    /// within the available parallelism budget.
206    pub fn pop(&mut self) -> Option<PoppedIcebergTask> {
207        let front = self.inner.deque.front()?;
208        if front.required_parallelism > self.available_parallelism() {
209            return None;
210        }
211
212        let meta = self.inner.deque.pop_front()?;
213        self.inner.waiting_parallelism_sum = self
214            .inner
215            .waiting_parallelism_sum
216            .saturating_sub(meta.required_parallelism);
217        self.inner.running_parallelism_sum = self
218            .inner
219            .running_parallelism_sum
220            .saturating_add(meta.required_parallelism);
221
222        let runner = self.inner.runners.remove(&meta.task_id);
223        Some(PoppedIcebergTask { meta, runner })
224    }
225
226    /// Mark a task as finished, freeing its parallelism for other tasks.
227    ///
228    /// Returns `true` if the task was found and removed, `false` otherwise.
229    pub fn finish_running(&mut self, task_id: TaskId) -> bool {
230        let Some(required) = self.inner.id_map.remove(&task_id) else {
231            return false;
232        };
233
234        self.inner.running_parallelism_sum =
235            self.inner.running_parallelism_sum.saturating_sub(required);
236        self.inner.runners.remove(&task_id);
237        self.notify_schedulable();
238        true
239    }
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds task metadata with the given id and parallelism requirement.
    fn meta(task_id: u64, parallelism: u32) -> IcebergTaskMeta {
        IcebergTaskMeta {
            task_id,
            required_parallelism: parallelism,
        }
    }

    #[test]
    fn test_basic_push_pop() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(meta(1, 4), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 4);

        let task = queue.pop().expect("should pop");
        assert_eq!(task.meta.task_id, 1);
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 4);

        assert!(queue.finish_running(1));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_fifo_ordering() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        for id in 1..=3 {
            assert_eq!(queue.push(meta(id, 2), None), PushResult::Added);
        }
        // Tasks must come back in submission order.
        for expected in 1..=3 {
            assert_eq!(queue.pop().unwrap().meta.task_id, expected);
        }
    }

    #[test]
    fn test_capacity_reject() {
        let mut queue = IcebergTaskQueue::new(4, 6);
        // Fill the pending budget exactly (3 + 3 = 6) ...
        assert_eq!(queue.push(meta(1, 3), None), PushResult::Added);
        assert_eq!(queue.push(meta(2, 3), None), PushResult::Added);
        // ... then one more unit would exceed it.
        assert_eq!(queue.push(meta(3, 1), None), PushResult::RejectedCapacity);
    }

    #[test]
    fn test_invalid_parallelism() {
        let mut queue = IcebergTaskQueue::new(4, 10);
        // Zero parallelism is invalid.
        assert_eq!(
            queue.push(meta(1, 0), None),
            PushResult::RejectedInvalidParallelism
        );
        // Exceeding max_parallelism is rejected as too large.
        assert_eq!(queue.push(meta(2, 5), None), PushResult::RejectedTooLarge);
    }

    #[test]
    fn test_pop_insufficient_parallelism() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(meta(1, 6), None), PushResult::Added);
        assert_eq!(queue.push(meta(2, 4), None), PushResult::Added);

        let first = queue.pop().unwrap();
        assert_eq!(first.meta.task_id, 1);
        // Only 2 units remain; the second task needs 4.
        assert!(queue.pop().is_none());

        // Completing the first task frees capacity for the second.
        assert!(queue.finish_running(1));
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.task_id, 2);
    }

    #[test]
    fn test_finish_nonexistent_task() {
        let mut queue = IcebergTaskQueue::new(4, 16);
        assert!(!queue.finish_running(999)); // unknown id is a no-op
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_parallelism_sum_accounting() {
        let mut queue = IcebergTaskQueue::new(10, 20);

        assert_eq!(queue.push(meta(1, 3), None), PushResult::Added);
        assert_eq!(queue.push(meta(2, 5), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 8);
        assert_eq!(queue.running_parallelism_sum(), 0);

        // Each pop moves parallelism from waiting to running.
        let _first = queue.pop().unwrap();
        assert_eq!(queue.waiting_parallelism_sum(), 5);
        assert_eq!(queue.running_parallelism_sum(), 3);

        let _second = queue.pop().unwrap();
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 8);

        // Each finish releases the running parallelism again.
        assert!(queue.finish_running(1));
        assert_eq!(queue.running_parallelism_sum(), 5);

        assert!(queue.finish_running(2));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_multiple_tasks_same_parallelism() {
        let mut queue = IcebergTaskQueue::new(10, 30);
        // Ten tasks of 3 units fill the budget exactly (total = 30).
        for id in 1..=10 {
            assert_eq!(queue.push(meta(id, 3), None), PushResult::Added);
        }
        assert_eq!(queue.waiting_parallelism_sum(), 30);

        // Exactly three tasks fit (9 of 10 units used); a fourth would need 12.
        for _ in 0..3 {
            assert!(queue.pop().is_some());
        }
        assert!(queue.pop().is_none());

        assert_eq!(queue.running_parallelism_sum(), 9);
        assert_eq!(queue.waiting_parallelism_sum(), 21);
    }

    #[test]
    fn test_empty_queue_behavior() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert!(queue.pop().is_none());
        assert!(!queue.finish_running(1));
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_runner_lifecycle() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        // A task pushed without a runner pops without one.
        assert_eq!(queue.push(meta(1, 4), None), PushResult::Added);
        let popped = queue.pop().unwrap();
        assert!(popped.runner.is_none());
        assert!(queue.finish_running(1));

        // The runner map holds no stale entries afterwards.
        assert!(queue.inner.runners.is_empty());
    }
}