Skip to main content

risingwave_storage/hummock/compactor/iceberg_compaction/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub use self::iceberg_compactor_runner::create_task_execution;
16pub(crate) use self::report::{
17    IcebergPlanCompletion, IcebergTaskReport, IcebergTaskTracker, ReportSendResult,
18    build_iceberg_task_report, flush_pending_iceberg_task_reports,
19    send_or_buffer_iceberg_task_report,
20};
21use crate::hummock::compactor::iceberg_compaction::iceberg_compactor_runner::IcebergCompactionPlanRunner;
22
23pub(crate) mod iceberg_compactor_runner;
24pub(crate) mod report;
25
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28
29use tokio::sync::Notify;
30
/// Identifies one plan of one task as `(task_id, plan_index)`.
///
/// A single task can carry multiple plans, so the task id alone is not unique.
pub(crate) type TaskKey = (u64, usize);
33
/// Task metadata for queue operations.
///
/// The pair `(task_id, plan_index)` identifies the entry inside the queue.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IcebergTaskMeta {
    /// Id of the task this plan belongs to.
    pub task_id: u64,
    /// Index of the plan within its task.
    pub plan_index: usize,
    /// Must be in range `1..=max_parallelism`
    pub required_parallelism: u32,
}
42
43#[derive(Debug)]
44pub struct PoppedIcebergTask {
45    pub meta: IcebergTaskMeta,
46    pub runner: Option<IcebergCompactionPlanRunner>,
47}
48
49impl IcebergTaskMeta {
50    fn key(&self) -> TaskKey {
51        (self.task_id, self.plan_index)
52    }
53}
54
55/// Internal storage for the task queue.
56struct IcebergTaskQueueInner {
57    /// FIFO queue of waiting task metadata
58    deque: VecDeque<IcebergTaskMeta>,
59    /// Maps `(task_id, plan_index)` to `required_parallelism` for tracking
60    id_map: HashMap<TaskKey, u32>,
61    /// Sum of `required_parallelism` for all waiting tasks
62    waiting_parallelism_sum: u32,
63    /// Sum of `required_parallelism` for all running tasks
64    running_parallelism_sum: u32,
65    /// Optional runner payloads indexed by `(task_id, plan_index)`
66    runners: HashMap<TaskKey, IcebergCompactionPlanRunner>,
67}
68
69/// FIFO task queue with parallelism-based scheduling for Iceberg compaction.
70///
71/// Tasks execute in submission order when sufficient parallelism is available.
72/// The queue tracks waiting and running tasks to prevent over-commitment of resources.
73///
74/// Constraints:
75/// - Each task requires `1..=max_parallelism` units
76/// - Total waiting parallelism cannot exceed `pending_parallelism_budget`
77/// - Total running parallelism cannot exceed `max_parallelism`
78/// - Tasks block until enough parallelism is available
79///
80/// Note: The queue does NOT deduplicate or reorder tasks. Task management
81/// (deduplication, merging, cancellation) is Meta's responsibility.
82pub struct IcebergTaskQueue {
83    inner: IcebergTaskQueueInner,
84    /// Maximum concurrent parallelism for running tasks
85    max_parallelism: u32,
86    /// Maximum total parallelism for waiting tasks (backpressure limit)
87    pending_parallelism_budget: u32,
88    /// Notification for event-driven scheduling
89    schedule_notify: Arc<Notify>,
90}
91
/// Outcome of pushing a task into the queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushResult {
    /// The task was appended to the queue
    Added,
    /// Would exceed `pending_parallelism_budget`
    RejectedCapacity,
    /// `required_parallelism` > `max_parallelism`
    RejectedTooLarge,
    /// `required_parallelism` == 0
    RejectedInvalidParallelism,
    /// Task with same `(task_id, plan_index)` already exists
    RejectedDuplicate,
}
104
105impl IcebergTaskQueue {
106    pub fn new(max_parallelism: u32, pending_parallelism_budget: u32) -> Self {
107        assert!(max_parallelism > 0, "max_parallelism must be > 0");
108        assert!(
109            pending_parallelism_budget >= max_parallelism,
110            "pending budget should allow at least one task"
111        );
112        Self {
113            inner: IcebergTaskQueueInner {
114                deque: VecDeque::new(),
115                id_map: HashMap::new(),
116                waiting_parallelism_sum: 0,
117                running_parallelism_sum: 0,
118                runners: HashMap::new(),
119            },
120            max_parallelism,
121            pending_parallelism_budget,
122            schedule_notify: Arc::new(Notify::new()),
123        }
124    }
125
126    /// Waits until there are tasks that can be scheduled.
127    ///
128    /// Returns `true` if there are schedulable tasks, `false` otherwise.
129    /// Use this in a `tokio::select!` to wake up when tasks become schedulable.
130    pub async fn wait_schedulable(&self) -> bool {
131        // Check if we have tasks that can be scheduled right now
132        if self.has_schedulable_tasks() {
133            return true;
134        }
135        // Otherwise wait for notification
136        self.schedule_notify.notified().await;
137        self.has_schedulable_tasks()
138    }
139
140    fn has_schedulable_tasks(&self) -> bool {
141        if let Some(front_task) = self.inner.deque.front() {
142            let available_parallelism = self
143                .max_parallelism
144                .saturating_sub(self.inner.running_parallelism_sum);
145            available_parallelism >= front_task.required_parallelism
146        } else {
147            false
148        }
149    }
150
151    fn notify_schedulable(&self) {
152        if self.has_schedulable_tasks() {
153            self.schedule_notify.notify_one();
154        }
155    }
156
157    pub fn running_parallelism_sum(&self) -> u32 {
158        self.inner.running_parallelism_sum
159    }
160
161    pub fn waiting_parallelism_sum(&self) -> u32 {
162        self.inner.waiting_parallelism_sum
163    }
164
165    fn available_parallelism(&self) -> u32 {
166        self.max_parallelism
167            .saturating_sub(self.inner.running_parallelism_sum)
168    }
169
170    /// Push a task into the queue.
171    ///
172    /// The task is validated and added to the end of the FIFO queue if constraints are met.
173    pub fn push(
174        &mut self,
175        meta: IcebergTaskMeta,
176        runner: Option<IcebergCompactionPlanRunner>,
177    ) -> PushResult {
178        if meta.required_parallelism == 0 {
179            return PushResult::RejectedInvalidParallelism;
180        }
181        if meta.required_parallelism > self.max_parallelism {
182            return PushResult::RejectedTooLarge;
183        }
184
185        let key = meta.key();
186
187        // Reject duplicate keys to prevent inconsistent state between id_map and deque
188        if self.inner.id_map.contains_key(&key) {
189            return PushResult::RejectedDuplicate;
190        }
191
192        let new_total = self.inner.waiting_parallelism_sum + meta.required_parallelism;
193        if new_total > self.pending_parallelism_budget {
194            return PushResult::RejectedCapacity;
195        }
196
197        self.inner.id_map.insert(key, meta.required_parallelism);
198        self.inner.waiting_parallelism_sum = new_total;
199
200        if let Some(r) = runner {
201            self.inner.runners.insert(key, r);
202        }
203
204        self.inner.deque.push_back(meta);
205
206        self.notify_schedulable();
207        PushResult::Added
208    }
209
210    /// Pop the next task if sufficient parallelism is available.
211    ///
212    /// Returns `None` if the queue is empty or the front task cannot fit
213    /// within the available parallelism budget.
214    pub fn pop(&mut self) -> Option<PoppedIcebergTask> {
215        let front = self.inner.deque.front()?;
216        if front.required_parallelism > self.available_parallelism() {
217            return None;
218        }
219
220        let meta = self.inner.deque.pop_front()?;
221        self.inner.waiting_parallelism_sum = self
222            .inner
223            .waiting_parallelism_sum
224            .saturating_sub(meta.required_parallelism);
225        self.inner.running_parallelism_sum = self
226            .inner
227            .running_parallelism_sum
228            .saturating_add(meta.required_parallelism);
229
230        let runner = self.inner.runners.remove(&meta.key());
231        Some(PoppedIcebergTask { meta, runner })
232    }
233
234    /// Mark a task as finished, freeing its parallelism for other tasks.
235    ///
236    /// Returns `true` if the task was found and removed, `false` otherwise.
237    pub fn finish_running(&mut self, task_key: TaskKey) -> bool {
238        let Some(required) = self.inner.id_map.remove(&task_key) else {
239            tracing::warn!(
240                task_id = task_key.0,
241                plan_index = task_key.1,
242                "finish_running called for unknown task key, possible bug: double-finish or invalid key"
243            );
244            return false;
245        };
246
247        self.inner.running_parallelism_sum =
248            self.inner.running_parallelism_sum.saturating_sub(required);
249        self.inner.runners.remove(&task_key);
250        self.notify_schedulable();
251        true
252    }
253
254    /// Cancel all waiting plans belonging to the given task.
255    ///
256    /// Returns the number of waiting plans removed from the queue.
257    pub fn cancel_waiting_task(&mut self, task_id: u64) -> usize {
258        let mut retained = VecDeque::with_capacity(self.inner.deque.len());
259        let mut cancelled_parallelism = 0;
260        let mut cancelled_count = 0;
261
262        while let Some(meta) = self.inner.deque.pop_front() {
263            if meta.task_id == task_id {
264                cancelled_parallelism += meta.required_parallelism;
265                cancelled_count += 1;
266                self.inner.id_map.remove(&meta.key());
267                self.inner.runners.remove(&meta.key());
268            } else {
269                retained.push_back(meta);
270            }
271        }
272
273        self.inner.deque = retained;
274        self.inner.waiting_parallelism_sum = self
275            .inner
276            .waiting_parallelism_sum
277            .saturating_sub(cancelled_parallelism);
278
279        if cancelled_count > 0 {
280            self.notify_schedulable();
281        }
282
283        cancelled_count
284    }
285}
286
/// Unit tests for the queue's bookkeeping: admission checks, FIFO order, and the
/// waiting/running parallelism sums across push, pop, finish and cancel.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds task metadata for `(id, plan_index)` requiring `p` units of parallelism.
    fn mk_meta(id: u64, plan_index: usize, p: u32) -> IcebergTaskMeta {
        IcebergTaskMeta {
            task_id: id,
            plan_index,
            required_parallelism: p,
        }
    }

    #[test]
    fn test_basic_push_pop() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
        assert_eq!(q.waiting_parallelism_sum(), 4);

        let popped = q.pop().expect("should pop");
        assert_eq!(popped.meta.task_id, 1);
        assert_eq!(q.waiting_parallelism_sum(), 0);
        assert_eq!(q.running_parallelism_sum(), 4);

        assert!(q.finish_running((1, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_fifo_ordering() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 2), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 2), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(3, 0, 2), None), PushResult::Added);

        assert_eq!(q.pop().unwrap().meta.task_id, 1);
        assert_eq!(q.pop().unwrap().meta.task_id, 2);
        assert_eq!(q.pop().unwrap().meta.task_id, 3);
    }

    #[test]
    fn test_capacity_reject() {
        let mut q = IcebergTaskQueue::new(4, 6);
        assert_eq!(q.push(mk_meta(1, 0, 3), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 3), None), PushResult::Added); // sum=6
        assert_eq!(q.push(mk_meta(3, 0, 1), None), PushResult::RejectedCapacity); // would exceed
    }

    #[test]
    fn test_invalid_parallelism() {
        let mut q = IcebergTaskQueue::new(4, 10);
        assert_eq!(
            q.push(mk_meta(1, 0, 0), None),
            PushResult::RejectedInvalidParallelism
        );
        assert_eq!(q.push(mk_meta(2, 0, 5), None), PushResult::RejectedTooLarge); // > max
    }

    #[test]
    fn test_duplicate_key_rejected() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 3), None), PushResult::Added);
        // Same (task_id, plan_index) should be rejected
        assert_eq!(
            q.push(mk_meta(1, 0, 5), None),
            PushResult::RejectedDuplicate
        );
        // Parallelism sum should not have changed
        assert_eq!(q.waiting_parallelism_sum(), 3);

        // Different plan_index is allowed
        assert_eq!(q.push(mk_meta(1, 1, 2), None), PushResult::Added);
        assert_eq!(q.waiting_parallelism_sum(), 5);

        // After pop and finish, the key can be reused
        let p = q.pop().unwrap();
        assert_eq!(p.meta.task_id, 1);
        assert_eq!(p.meta.plan_index, 0);
        assert!(q.finish_running((1, 0)));

        // Now the same key can be pushed again
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
    }

    #[test]
    fn test_pop_insufficient_parallelism() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 6), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 4), None), PushResult::Added);

        let p1 = q.pop().unwrap();
        assert_eq!(p1.meta.task_id, 1);
        // Not enough remaining parallelism (only 2 left)
        assert!(q.pop().is_none());

        // Finish first, then second becomes schedulable
        assert!(q.finish_running((1, 0)));
        let p2 = q.pop().unwrap();
        assert_eq!(p2.meta.task_id, 2);
    }

    #[test]
    fn test_finish_nonexistent_task() {
        let mut q = IcebergTaskQueue::new(4, 16);
        assert!(!q.finish_running((999, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_double_finish() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
        q.pop().unwrap();
        assert_eq!(q.running_parallelism_sum(), 4);

        // First finish succeeds
        assert!(q.finish_running((1, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);

        // Second finish on same key returns false (triggers warn log)
        assert!(!q.finish_running((1, 0)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_max_parallelism_boundary() {
        // pending_budget == max_parallelism: minimal valid configuration
        let mut q = IcebergTaskQueue::new(4, 4);

        // Task with required_parallelism == max_parallelism should be accepted
        assert_eq!(q.push(mk_meta(1, 0, 4), None), PushResult::Added);
        // Budget exhausted
        assert_eq!(q.push(mk_meta(2, 0, 1), None), PushResult::RejectedCapacity);

        // Can pop and run at full parallelism
        let p = q.pop().unwrap();
        assert_eq!(p.meta.required_parallelism, 4);
        assert_eq!(q.running_parallelism_sum(), 4);

        // The pending budget is free again, so the push is accepted, but there is
        // no room for any new running task
        assert_eq!(q.push(mk_meta(3, 0, 1), None), PushResult::Added);
        assert!(q.pop().is_none());
    }

    #[test]
    fn test_same_task_id_multiple_plans() {
        let mut q = IcebergTaskQueue::new(10, 30);
        let task_id = 1u64;

        // Same task_id with different plan_index are independent
        assert_eq!(q.push(mk_meta(task_id, 0, 3), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(task_id, 1, 4), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(task_id, 2, 2), None), PushResult::Added);
        assert_eq!(q.waiting_parallelism_sum(), 9);

        // Pop all
        for i in 0..3 {
            let p = q.pop().unwrap();
            assert_eq!(p.meta.task_id, task_id);
            assert_eq!(p.meta.plan_index, i);
        }
        assert_eq!(q.running_parallelism_sum(), 9);

        // Finish out of order
        assert!(q.finish_running((task_id, 1)));
        assert_eq!(q.running_parallelism_sum(), 5);
        assert!(q.finish_running((task_id, 0)));
        assert!(q.finish_running((task_id, 2)));
        assert_eq!(q.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_cancel_waiting_task_only_removes_waiting_plans() {
        let mut q = IcebergTaskQueue::new(10, 30);
        let task_id = 1u64;

        assert_eq!(q.push(mk_meta(task_id, 0, 3), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(task_id, 1, 4), None), PushResult::Added);
        assert_eq!(q.push(mk_meta(2, 0, 2), None), PushResult::Added);

        let popped = q.pop().unwrap();
        assert_eq!(popped.meta.task_id, task_id);
        assert_eq!(popped.meta.plan_index, 0);
        assert_eq!(q.running_parallelism_sum(), 3);
        assert_eq!(q.waiting_parallelism_sum(), 6);

        assert_eq!(q.cancel_waiting_task(task_id), 1);
        assert_eq!(q.running_parallelism_sum(), 3);
        assert_eq!(q.waiting_parallelism_sum(), 2);

        assert!(q.finish_running((task_id, 0)));
        let next = q.pop().unwrap();
        assert_eq!(next.meta.task_id, 2);
        assert_eq!(next.meta.plan_index, 0);
    }

    #[test]
    fn test_cancel_unknown_task_is_noop() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert_eq!(q.push(mk_meta(1, 0, 3), None), PushResult::Added);

        // No waiting plan belongs to task 42, so nothing is removed
        assert_eq!(q.cancel_waiting_task(42), 0);
        assert_eq!(q.waiting_parallelism_sum(), 3);
        assert_eq!(q.running_parallelism_sum(), 0);

        assert_eq!(q.pop().unwrap().meta.task_id, 1);
    }

    #[test]
    fn test_empty_queue_behavior() {
        let mut q = IcebergTaskQueue::new(8, 32);
        assert!(q.pop().is_none());
        assert!(!q.finish_running((1, 0)));
        assert_eq!(q.waiting_parallelism_sum(), 0);
        assert_eq!(q.running_parallelism_sum(), 0);
    }
}