risingwave_storage/hummock/compactor/iceberg_compaction/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::hummock::compactor::iceberg_compaction::iceberg_compactor_runner::IcebergCompactionPlanRunner;

pub(crate) mod iceberg_compactor_runner;

use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use tokio::sync::Notify;

/// Unique key combining `(task_id, plan_index)` since one task can have multiple plans.
pub(crate) type TaskKey = (u64, usize);

/// Task metadata for queue operations.
#[derive(Debug, Clone)]
pub struct IcebergTaskMeta {
    pub task_id: u64,
    pub plan_index: usize,
    /// Must be in range `1..=max_parallelism`
    pub required_parallelism: u32,
}

#[derive(Debug)]
pub struct PoppedIcebergTask {
    pub meta: IcebergTaskMeta,
    pub runner: Option<IcebergCompactionPlanRunner>,
}

impl IcebergTaskMeta {
    fn key(&self) -> TaskKey {
        (self.task_id, self.plan_index)
    }
}

/// Internal storage for the task queue.
struct IcebergTaskQueueInner {
    /// FIFO queue of waiting task metadata
    deque: VecDeque<IcebergTaskMeta>,
    /// Maps `(task_id, plan_index)` to `required_parallelism` for tracking
    id_map: HashMap<TaskKey, u32>,
    /// Sum of `required_parallelism` for all waiting tasks
    waiting_parallelism_sum: u32,
    /// Sum of `required_parallelism` for all running tasks
    running_parallelism_sum: u32,
    /// Optional runner payloads indexed by `(task_id, plan_index)`
    runners: HashMap<TaskKey, IcebergCompactionPlanRunner>,
}

/// FIFO task queue with parallelism-based scheduling for Iceberg compaction.
///
/// Tasks execute in submission order when sufficient parallelism is available.
/// The queue tracks waiting and running tasks to prevent over-commitment of resources.
///
/// Constraints:
/// - Each task requires `1..=max_parallelism` units
/// - Total waiting parallelism cannot exceed `pending_parallelism_budget`
/// - Total running parallelism cannot exceed `max_parallelism`
/// - The front task stays queued until enough parallelism is free
///
/// Note: The queue does NOT deduplicate or reorder tasks. Task management
/// (deduplication, merging, cancellation) is Meta's responsibility.
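///
/// A minimal usage sketch (values are illustrative; the block is not compiled as a
/// doctest because these types are crate-internal):
///
/// ```ignore
/// let mut queue = IcebergTaskQueue::new(/* max_parallelism */ 8, /* pending budget */ 32);
/// let meta = IcebergTaskMeta { task_id: 1, plan_index: 0, required_parallelism: 4 };
/// assert_eq!(queue.push(meta, None), PushResult::Added);
///
/// // 4 units fit within the available parallelism of 8, so the task can be popped.
/// let popped = queue.pop().expect("front task fits");
/// // ...run the plan, then release its parallelism so later tasks can be scheduled:
/// assert!(queue.finish_running((popped.meta.task_id, popped.meta.plan_index)));
/// ```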
pub struct IcebergTaskQueue {
    inner: IcebergTaskQueueInner,
    /// Maximum concurrent parallelism for running tasks
    max_parallelism: u32,
    /// Maximum total parallelism for waiting tasks (backpressure limit)
    pending_parallelism_budget: u32,
    /// Notification for event-driven scheduling
    schedule_notify: Arc<Notify>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum PushResult {
    Added,
    /// Would exceed `pending_parallelism_budget`
    RejectedCapacity,
    /// `required_parallelism` > `max_parallelism`
    RejectedTooLarge,
    /// `required_parallelism` == 0
    RejectedInvalidParallelism,
    /// Task with same `(task_id, plan_index)` already exists
    RejectedDuplicate,
}

impl IcebergTaskQueue {
    pub fn new(max_parallelism: u32, pending_parallelism_budget: u32) -> Self {
        assert!(max_parallelism > 0, "max_parallelism must be > 0");
        assert!(
            pending_parallelism_budget >= max_parallelism,
            "pending_parallelism_budget must be >= max_parallelism so at least one maximal task can wait"
        );
        Self {
            inner: IcebergTaskQueueInner {
                deque: VecDeque::new(),
                id_map: HashMap::new(),
                waiting_parallelism_sum: 0,
                running_parallelism_sum: 0,
                runners: HashMap::new(),
            },
            max_parallelism,
            pending_parallelism_budget,
            schedule_notify: Arc::new(Notify::new()),
        }
    }

    /// Waits until there are tasks that can be scheduled.
    ///
    /// Returns `true` if the front task can be scheduled right now. If nothing is
    /// schedulable yet, this awaits a notification and re-checks; a `false` return
    /// means the wakeup was spurious and the caller should simply wait again.
    /// Use this in a `tokio::select!` to wake up when tasks become schedulable.
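    ///
    /// A sketch of the intended consumer loop (illustrative; `shutdown` is an assumed
    /// external signal and the spawning of runners is elided):
    ///
    /// ```ignore
    /// loop {
    ///     tokio::select! {
    ///         schedulable = queue.wait_schedulable() => {
    ///             if schedulable {
    ///                 while let Some(task) = queue.pop() {
    ///                     // spawn `task.runner` and call `finish_running` when it completes
    ///                 }
    ///             }
    ///         }
    ///         _ = &mut shutdown => break,
    ///     }
    /// }
    /// ```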
    pub async fn wait_schedulable(&self) -> bool {
        // Check if we have tasks that can be scheduled right now
        if self.has_schedulable_tasks() {
            return true;
        }
        // Otherwise wait for notification
        self.schedule_notify.notified().await;
        self.has_schedulable_tasks()
    }

    fn has_schedulable_tasks(&self) -> bool {
        if let Some(front_task) = self.inner.deque.front() {
            let available_parallelism = self
                .max_parallelism
                .saturating_sub(self.inner.running_parallelism_sum);
            available_parallelism >= front_task.required_parallelism
        } else {
            false
        }
    }

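    /// Wake a waiter only when the front task actually fits. `Notify::notify_one`
    /// stores a permit when no task is currently waiting, so a `wait_schedulable`
    /// call that starts after this notification still returns promptly.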
    fn notify_schedulable(&self) {
        if self.has_schedulable_tasks() {
            self.schedule_notify.notify_one();
        }
    }

    pub fn running_parallelism_sum(&self) -> u32 {
        self.inner.running_parallelism_sum
    }

    pub fn waiting_parallelism_sum(&self) -> u32 {
        self.inner.waiting_parallelism_sum
    }

    fn available_parallelism(&self) -> u32 {
        self.max_parallelism
            .saturating_sub(self.inner.running_parallelism_sum)
    }

    /// Push a task into the queue.
    ///
    /// The task is validated and added to the end of the FIFO queue if constraints are met.
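    ///
    /// Sketch of handling the result (illustrative; the caller-side reactions are
    /// assumptions and not part of this module):
    ///
    /// ```ignore
    /// match queue.push(meta, None) {
    ///     PushResult::Added => {}
    ///     PushResult::RejectedCapacity => { /* keep the task pending upstream and retry later */ }
    ///     other => tracing::warn!(?other, "iceberg compaction task rejected"),
    /// }
    /// ```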
    pub fn push(
        &mut self,
        meta: IcebergTaskMeta,
        runner: Option<IcebergCompactionPlanRunner>,
    ) -> PushResult {
        if meta.required_parallelism == 0 {
            return PushResult::RejectedInvalidParallelism;
        }
        if meta.required_parallelism > self.max_parallelism {
            return PushResult::RejectedTooLarge;
        }

        let key = meta.key();

        // Reject duplicate keys to prevent inconsistent state between id_map and deque
        if self.inner.id_map.contains_key(&key) {
            return PushResult::RejectedDuplicate;
        }

        let new_total = self.inner.waiting_parallelism_sum + meta.required_parallelism;
        if new_total > self.pending_parallelism_budget {
            return PushResult::RejectedCapacity;
        }

        self.inner.id_map.insert(key, meta.required_parallelism);
        self.inner.waiting_parallelism_sum = new_total;

        if let Some(r) = runner {
            self.inner.runners.insert(key, r);
        }

        self.inner.deque.push_back(meta);

        self.notify_schedulable();
        PushResult::Added
    }

    /// Pop the next task if sufficient parallelism is available.
    ///
    /// Returns `None` if the queue is empty or the front task cannot fit
    /// within the available parallelism budget.
    pub fn pop(&mut self) -> Option<PoppedIcebergTask> {
        let front = self.inner.deque.front()?;
        if front.required_parallelism > self.available_parallelism() {
            return None;
        }

        let meta = self.inner.deque.pop_front()?;
        self.inner.waiting_parallelism_sum = self
            .inner
            .waiting_parallelism_sum
            .saturating_sub(meta.required_parallelism);
        self.inner.running_parallelism_sum = self
            .inner
            .running_parallelism_sum
            .saturating_add(meta.required_parallelism);

        let runner = self.inner.runners.remove(&meta.key());
        Some(PoppedIcebergTask { meta, runner })
    }

    /// Mark a task as finished, freeing its parallelism for other tasks.
    ///
    /// Returns `true` if the task was found and removed, `false` otherwise.
    pub fn finish_running(&mut self, task_key: TaskKey) -> bool {
        let Some(required) = self.inner.id_map.remove(&task_key) else {
            tracing::warn!(
                task_id = task_key.0,
                plan_index = task_key.1,
                "finish_running called for unknown task key, possible bug: double-finish or invalid key"
            );
            return false;
        };

        self.inner.running_parallelism_sum =
            self.inner.running_parallelism_sum.saturating_sub(required);
        self.inner.runners.remove(&task_key);
        self.notify_schedulable();
        true
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn mk_meta(id: u64, plan_index: usize, p: u32) -> IcebergTaskMeta {
        IcebergTaskMeta {
            task_id: id,
            plan_index,
            required_parallelism: p,
        }
    }

    #[test]
    fn test_basic_push_pop() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
        assert_eq!(q.waiting_parallelism_sum(), 4);

        let popped = q.pop().expect("should pop");
        assert_eq!(popped.meta.task_id, 1);
        assert_eq!(q.waiting_parallelism_sum(), 0);
        assert_eq!(q.running_parallelism_sum(), 4);

        assert!(q.finish_running((1, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_fifo_ordering() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 2), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 2), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(3, 0, 2), None), PushResult::Added);

        assert_eq!(q.pop().unwrap().meta.task_id, 1);
        assert_eq!(q.pop().unwrap().meta.task_id, 2);
        assert_eq!(q.pop().unwrap().meta.task_id, 3);
    }

    #[test]
    fn test_capacity_reject() {
        let mut q = IcebergTaskQueue::new(4, 6);
        assert_eq!(q.push(mk_meta(1, 0, 3), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 3), None), PushResult::Added); // sum=6
        assert_eq!(q.push(mk_meta(3, 0, 1), None), PushResult::RejectedCapacity); // would exceed
    }

    #[test]
    fn test_invalid_parallelism() {
        let mut q = IcebergTaskQueue::new(4, 10);
        assert_eq!(
            q.push(mk_meta(1, 0, 0), None),
            PushResult::RejectedInvalidParallelism
        );
        assert_eq!(q.push(mk_meta(2, 0, 5), None), PushResult::RejectedTooLarge); // > max
    }

    #[test]
    fn test_duplicate_key_rejected() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 3), None), PushResult::Added);
        // Same (task_id, plan_index) should be rejected
        assert_eq!(
            q.push(mk_meta(1, 0, 5), None),
            PushResult::RejectedDuplicate
        );
        // Parallelism sum should not have changed
        assert_eq!(q.waiting_parallelism_sum(), 3);

        // Different plan_index is allowed
        assert_eq!(q.push(mk_meta(1, 1, 2), None), PushResult::Added);
        assert_eq!(q.waiting_parallelism_sum(), 5);

        // After pop and finish, the key can be reused
        let p = q.pop().unwrap();
        assert_eq!(p.meta.task_id, 1);
        assert_eq!(p.meta.plan_index, 0);
        q.finish_running((1, 0));

        // Now the same key can be pushed again
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
    }

    #[test]
    fn test_pop_insufficient_parallelism() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 6), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 4), None), PushResult::Added);

        let p1 = q.pop().unwrap();
        assert_eq!(p1.meta.task_id, 1);
        // Not enough remaining parallelism (only 2 left)
        assert!(q.pop().is_none());

        // Finish first, then second becomes schedulable
        assert!(q.finish_running((1, 0)));
        let p2 = q.pop().unwrap();
        assert_eq!(p2.meta.task_id, 2);
    }

    #[test]
    fn test_finish_nonexistent_task() {
        let mut q = IcebergTaskQueue::new(4, 16);
        assert!(!q.finish_running((999, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_double_finish() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
        q.pop().unwrap();
        assert_eq!(q.running_parallelism_sum(), 4);

        // First finish succeeds
        assert!(q.finish_running((1, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);

        // Second finish on same key returns false (triggers warn log)
        assert!(!q.finish_running((1, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_max_parallelism_boundary() {
        // pending_budget == max_parallelism: minimal valid configuration
        let mut q = IcebergTaskQueue::new(4, 4);

        // Task with required_parallelism == max_parallelism should be accepted
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
        // Budget exhausted
        assert_eq!(q.push(mk_meta(2, 0, 1), None), PushResult::RejectedCapacity);

        // Can pop and run at full parallelism
        let p = q.pop().unwrap();
        assert_eq!(p.meta.required_parallelism, 4);
        assert_eq!(q.running_parallelism_sum(), 4);

        // No room for any new running task
        assert_eq!(q.push(mk_meta(3, 0, 1), None), PushResult::Added);
        assert!(q.pop().is_none());
    }

    #[test]
    fn test_same_task_id_multiple_plans() {
        let mut q = IcebergTaskQueue::new(10, 30);
        let task_id = 1u64;

        // Same task_id with different plan_index are independent
        assert_eq!(q.push(mk_meta(task_id, 0, 3), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(task_id, 1, 4), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(task_id, 2, 2), None), PushResult::Added);
        assert_eq!(q.waiting_parallelism_sum(), 9);

        // Pop all
        for i in 0..3 {
            let p = q.pop().unwrap();
            assert_eq!(p.meta.task_id, task_id);
            assert_eq!(p.meta.plan_index, i);
        }
        assert_eq!(q.running_parallelism_sum(), 9);

        // Finish out of order
        assert!(q.finish_running((task_id, 1)));
        assert_eq!(q.running_parallelism_sum(), 5);
        assert!(q.finish_running((task_id, 0)));
        assert!(q.finish_running((task_id, 2)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_empty_queue_behavior() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert!(q.pop().is_none());
        assert!(!q.finish_running((1, 0)));
        assert_eq!(q.waiting_parallelism_sum(), 0);
        assert_eq!(q.running_parallelism_sum(), 0);
    }
}