risingwave_storage/hummock/compactor/iceberg_compaction/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub use self::iceberg_compactor_runner::create_task_execution;
16pub(crate) use self::report::{
17    IcebergPlanCompletion, IcebergTaskReport, IcebergTaskTracker, ReportSendResult,
18    build_iceberg_task_report, flush_pending_iceberg_task_reports,
19    send_or_buffer_iceberg_task_report,
20};
21use crate::hummock::compactor::iceberg_compaction::iceberg_compactor_runner::IcebergCompactionPlanRunner;
22
23pub(crate) mod iceberg_compactor_runner;
24pub(crate) mod report;
25
26use std::collections::{HashMap, VecDeque};
27use std::sync::Arc;
28
29use tokio::sync::Notify;
30
/// Unique key combining `(task_id, plan_index)` since one task can have multiple plans.
/// Used to index both the task-tracking map and the runner payload map.
pub(crate) type TaskKey = (u64, usize);
33
/// Task metadata for queue operations.
#[derive(Debug, Clone)]
pub struct IcebergTaskMeta {
    /// Identifier of the owning compaction task.
    pub task_id: u64,
    /// Index of this plan within the task; one task can carry multiple plans.
    pub plan_index: usize,
    /// Parallelism units this plan occupies while waiting or running.
    /// Must be in range `1..=max_parallelism`
    pub required_parallelism: u32,
}
42
/// A task handed out by `IcebergTaskQueue::pop`: the scheduling metadata plus
/// the optional runner payload that was attached at push time.
#[derive(Debug)]
pub struct PoppedIcebergTask {
    /// Metadata of the popped task (identity and required parallelism).
    pub meta: IcebergTaskMeta,
    /// Runner payload if one was supplied to `push`; `None` otherwise.
    pub runner: Option<IcebergCompactionPlanRunner>,
}
48
49impl IcebergTaskMeta {
50    fn key(&self) -> TaskKey {
51        (self.task_id, self.plan_index)
52    }
53}
54
/// Internal storage for the task queue.
///
/// Invariants maintained by `IcebergTaskQueue`:
/// - every waiting task appears in both `deque` and `id_map`;
/// - a running (popped) task appears only in `id_map`, until `finish_running`;
/// - the two parallelism sums mirror the tasks in those two states.
struct IcebergTaskQueueInner {
    /// FIFO queue of waiting task metadata
    deque: VecDeque<IcebergTaskMeta>,
    /// Maps `(task_id, plan_index)` to `required_parallelism` for tracking.
    /// Holds both waiting and running tasks; entries are inserted by `push`
    /// and removed only by `finish_running`.
    id_map: HashMap<TaskKey, u32>,
    /// Sum of `required_parallelism` for all waiting tasks
    waiting_parallelism_sum: u32,
    /// Sum of `required_parallelism` for all running tasks
    running_parallelism_sum: u32,
    /// Optional runner payloads indexed by `(task_id, plan_index)`;
    /// handed out on `pop`, dropped on `finish_running`.
    runners: HashMap<TaskKey, IcebergCompactionPlanRunner>,
}
68
/// FIFO task queue with parallelism-based scheduling for Iceberg compaction.
///
/// Tasks execute in submission order when sufficient parallelism is available.
/// The queue tracks waiting and running tasks to prevent over-commitment of resources.
///
/// Constraints:
/// - Each task requires `1..=max_parallelism` units
/// - Total waiting parallelism cannot exceed `pending_parallelism_budget`
/// - Total running parallelism cannot exceed `max_parallelism`
/// - Tasks block until enough parallelism is available
///
/// Note: The queue does NOT deduplicate or reorder tasks. Task management
/// (deduplication, merging, cancellation) is Meta's responsibility.
pub struct IcebergTaskQueue {
    /// Queue state: waiting deque, bookkeeping maps, parallelism sums.
    inner: IcebergTaskQueueInner,
    /// Maximum concurrent parallelism for running tasks
    max_parallelism: u32,
    /// Maximum total parallelism for waiting tasks (backpressure limit)
    pending_parallelism_budget: u32,
    /// Notification for event-driven scheduling; signalled by `push` and
    /// `finish_running` when the front task becomes schedulable.
    schedule_notify: Arc<Notify>,
}
91
/// Outcome of `IcebergTaskQueue::push`.
#[derive(Debug, PartialEq, Eq)]
pub enum PushResult {
    /// Task accepted and appended to the waiting queue
    Added,
    /// Would exceed `pending_parallelism_budget`
    RejectedCapacity,
    /// `required_parallelism` > `max_parallelism`
    RejectedTooLarge,
    /// `required_parallelism` == 0
    RejectedInvalidParallelism,
    /// Task with same `(task_id, plan_index)` already exists
    RejectedDuplicate,
}
104
105impl IcebergTaskQueue {
106    pub fn new(max_parallelism: u32, pending_parallelism_budget: u32) -> Self {
107        assert!(max_parallelism > 0, "max_parallelism must be > 0");
108        assert!(
109            pending_parallelism_budget >= max_parallelism,
110            "pending budget should allow at least one task"
111        );
112        Self {
113            inner: IcebergTaskQueueInner {
114                deque: VecDeque::new(),
115                id_map: HashMap::new(),
116                waiting_parallelism_sum: 0,
117                running_parallelism_sum: 0,
118                runners: HashMap::new(),
119            },
120            max_parallelism,
121            pending_parallelism_budget,
122            schedule_notify: Arc::new(Notify::new()),
123        }
124    }
125
126    /// Waits until there are tasks that can be scheduled.
127    ///
128    /// Returns `true` if there are schedulable tasks, `false` otherwise.
129    /// Use this in a `tokio::select!` to wake up when tasks become schedulable.
130    pub async fn wait_schedulable(&self) -> bool {
131        // Check if we have tasks that can be scheduled right now
132        if self.has_schedulable_tasks() {
133            return true;
134        }
135        // Otherwise wait for notification
136        self.schedule_notify.notified().await;
137        self.has_schedulable_tasks()
138    }
139
140    fn has_schedulable_tasks(&self) -> bool {
141        if let Some(front_task) = self.inner.deque.front() {
142            let available_parallelism = self
143                .max_parallelism
144                .saturating_sub(self.inner.running_parallelism_sum);
145            available_parallelism >= front_task.required_parallelism
146        } else {
147            false
148        }
149    }
150
151    fn notify_schedulable(&self) {
152        if self.has_schedulable_tasks() {
153            self.schedule_notify.notify_one();
154        }
155    }
156
157    pub fn running_parallelism_sum(&self) -> u32 {
158        self.inner.running_parallelism_sum
159    }
160
161    pub fn waiting_parallelism_sum(&self) -> u32 {
162        self.inner.waiting_parallelism_sum
163    }
164
165    fn available_parallelism(&self) -> u32 {
166        self.max_parallelism
167            .saturating_sub(self.inner.running_parallelism_sum)
168    }
169
170    /// Push a task into the queue.
171    ///
172    /// The task is validated and added to the end of the FIFO queue if constraints are met.
173    pub fn push(
174        &mut self,
175        meta: IcebergTaskMeta,
176        runner: Option<IcebergCompactionPlanRunner>,
177    ) -> PushResult {
178        if meta.required_parallelism == 0 {
179            return PushResult::RejectedInvalidParallelism;
180        }
181        if meta.required_parallelism > self.max_parallelism {
182            return PushResult::RejectedTooLarge;
183        }
184
185        let key = meta.key();
186
187        // Reject duplicate keys to prevent inconsistent state between id_map and deque
188        if self.inner.id_map.contains_key(&key) {
189            return PushResult::RejectedDuplicate;
190        }
191
192        let new_total = self.inner.waiting_parallelism_sum + meta.required_parallelism;
193        if new_total > self.pending_parallelism_budget {
194            return PushResult::RejectedCapacity;
195        }
196
197        self.inner.id_map.insert(key, meta.required_parallelism);
198        self.inner.waiting_parallelism_sum = new_total;
199
200        if let Some(r) = runner {
201            self.inner.runners.insert(key, r);
202        }
203
204        self.inner.deque.push_back(meta);
205
206        self.notify_schedulable();
207        PushResult::Added
208    }
209
210    /// Pop the next task if sufficient parallelism is available.
211    ///
212    /// Returns `None` if the queue is empty or the front task cannot fit
213    /// within the available parallelism budget.
214    pub fn pop(&mut self) -> Option<PoppedIcebergTask> {
215        let front = self.inner.deque.front()?;
216        if front.required_parallelism > self.available_parallelism() {
217            return None;
218        }
219
220        let meta = self.inner.deque.pop_front()?;
221        self.inner.waiting_parallelism_sum = self
222            .inner
223            .waiting_parallelism_sum
224            .saturating_sub(meta.required_parallelism);
225        self.inner.running_parallelism_sum = self
226            .inner
227            .running_parallelism_sum
228            .saturating_add(meta.required_parallelism);
229
230        let runner = self.inner.runners.remove(&meta.key());
231        Some(PoppedIcebergTask { meta, runner })
232    }
233
234    /// Mark a task as finished, freeing its parallelism for other tasks.
235    ///
236    /// Returns `true` if the task was found and removed, `false` otherwise.
237    pub fn finish_running(&mut self, task_key: TaskKey) -> bool {
238        let Some(required) = self.inner.id_map.remove(&task_key) else {
239            tracing::warn!(
240                task_id = task_key.0,
241                plan_index = task_key.1,
242                "finish_running called for unknown task key, possible bug: double-finish or invalid key"
243            );
244            return false;
245        };
246
247        self.inner.running_parallelism_sum =
248            self.inner.running_parallelism_sum.saturating_sub(required);
249        self.inner.runners.remove(&task_key);
250        self.notify_schedulable();
251        true
252    }
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for test task metadata.
    fn make_meta(task_id: u64, plan_index: usize, parallelism: u32) -> IcebergTaskMeta {
        IcebergTaskMeta {
            task_id,
            plan_index,
            required_parallelism: parallelism,
        }
    }

    #[test]
    fn test_basic_push_pop() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(make_meta(1, 0, 4), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 4);

        let popped = queue.pop().expect("should pop");
        assert_eq!(popped.meta.task_id, 1);
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 4);

        assert!(queue.finish_running((1, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_fifo_ordering() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        for id in 1..=3u64 {
            assert_eq!(queue.push(make_meta(id, 0, 2), None), PushResult::Added);
        }
        // Pops come back in exactly the order they were pushed.
        for id in 1..=3u64 {
            assert_eq!(queue.pop().unwrap().meta.task_id, id);
        }
    }

    #[test]
    fn test_capacity_reject() {
        let mut queue = IcebergTaskQueue::new(4, 6);
        assert_eq!(queue.push(make_meta(1, 0, 3), None), PushResult::Added);
        // Budget now fully used (3 + 3 == 6).
        assert_eq!(queue.push(make_meta(2, 0, 3), None), PushResult::Added);
        // One more unit would overflow the pending budget.
        assert_eq!(
            queue.push(make_meta(3, 0, 1), None),
            PushResult::RejectedCapacity
        );
    }

    #[test]
    fn test_invalid_parallelism() {
        let mut queue = IcebergTaskQueue::new(4, 10);
        // Zero parallelism is never valid.
        assert_eq!(
            queue.push(make_meta(1, 0, 0), None),
            PushResult::RejectedInvalidParallelism
        );
        // Exceeding max_parallelism is rejected outright.
        assert_eq!(
            queue.push(make_meta(2, 0, 5), None),
            PushResult::RejectedTooLarge
        );
    }

    #[test]
    fn test_duplicate_key_rejected() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(make_meta(1, 0, 3), None), PushResult::Added);
        // A second push with the same (task_id, plan_index) is refused...
        assert_eq!(
            queue.push(make_meta(1, 0, 5), None),
            PushResult::RejectedDuplicate
        );
        // ...and leaves the waiting sum untouched.
        assert_eq!(queue.waiting_parallelism_sum(), 3);

        // A different plan_index under the same task_id is independent.
        assert_eq!(queue.push(make_meta(1, 1, 2), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 5);

        // Once the plan has been popped and finished, its key is free again.
        let popped = queue.pop().unwrap();
        assert_eq!(popped.meta.task_id, 1);
        assert_eq!(popped.meta.plan_index, 0);
        queue.finish_running((1, 0));

        assert_eq!(queue.push(make_meta(1, 0, 4), None), PushResult::Added);
    }

    #[test]
    fn test_pop_insufficient_parallelism() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(make_meta(1, 0, 6), None), PushResult::Added);
        assert_eq!(queue.push(make_meta(2, 0, 4), None), PushResult::Added);

        let first = queue.pop().unwrap();
        assert_eq!(first.meta.task_id, 1);
        // Only 2 units remain; the next task needs 4.
        assert!(queue.pop().is_none());

        // Releasing the first task makes the second schedulable.
        assert!(queue.finish_running((1, 0)));
        let second = queue.pop().unwrap();
        assert_eq!(second.meta.task_id, 2);
    }

    #[test]
    fn test_finish_nonexistent_task() {
        let mut queue = IcebergTaskQueue::new(4, 16);
        assert!(!queue.finish_running((999, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_double_finish() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert_eq!(queue.push(make_meta(1, 0, 4), None), PushResult::Added);
        queue.pop().unwrap();
        assert_eq!(queue.running_parallelism_sum(), 4);

        // First finish releases the parallelism.
        assert!(queue.finish_running((1, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);

        // A repeated finish on the same key returns false (warn log path).
        assert!(!queue.finish_running((1, 0)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_max_parallelism_boundary() {
        // Smallest legal configuration: pending budget equals max parallelism.
        let mut queue = IcebergTaskQueue::new(4, 4);

        // A task using the full parallelism is still accepted...
        assert_eq!(queue.push(make_meta(1, 0, 4), None), PushResult::Added);
        // ...but the pending budget is now exhausted.
        assert_eq!(
            queue.push(make_meta(2, 0, 1), None),
            PushResult::RejectedCapacity
        );

        // The full-width task can run.
        let popped = queue.pop().unwrap();
        assert_eq!(popped.meta.required_parallelism, 4);
        assert_eq!(queue.running_parallelism_sum(), 4);

        // Waiting slots are free again, but no running capacity remains.
        assert_eq!(queue.push(make_meta(3, 0, 1), None), PushResult::Added);
        assert!(queue.pop().is_none());
    }

    #[test]
    fn test_same_task_id_multiple_plans() {
        let mut queue = IcebergTaskQueue::new(10, 30);
        let task_id = 1u64;

        // Plans of one task are tracked independently by plan_index.
        assert_eq!(queue.push(make_meta(task_id, 0, 3), None), PushResult::Added);
        assert_eq!(queue.push(make_meta(task_id, 1, 4), None), PushResult::Added);
        assert_eq!(queue.push(make_meta(task_id, 2, 2), None), PushResult::Added);
        assert_eq!(queue.waiting_parallelism_sum(), 9);

        for expected_plan in 0..3 {
            let popped = queue.pop().unwrap();
            assert_eq!(popped.meta.task_id, task_id);
            assert_eq!(popped.meta.plan_index, expected_plan);
        }
        assert_eq!(queue.running_parallelism_sum(), 9);

        // Plans may finish in any order.
        assert!(queue.finish_running((task_id, 1)));
        assert_eq!(queue.running_parallelism_sum(), 5);
        assert!(queue.finish_running((task_id, 0)));
        assert!(queue.finish_running((task_id, 2)));
        assert_eq!(queue.running_parallelism_sum(), 0);
    }

    #[test]
    fn test_empty_queue_behavior() {
        let mut queue = IcebergTaskQueue::new(8, 32);
        assert!(queue.pop().is_none());
        assert!(!queue.finish_running((1, 0)));
        assert_eq!(queue.waiting_parallelism_sum(), 0);
        assert_eq!(queue.running_parallelism_sum(), 0);
    }
}