risingwave_meta/manager/iceberg_compaction/
schedule.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use itertools::Itertools;
20use parking_lot::RwLock;
21use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
22use risingwave_connector::sink::SinkParam;
23use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
24use risingwave_connector::sink::iceberg::{
25    CompactionType, IcebergConfig, should_enable_iceberg_cow,
26};
27use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
28use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
29use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::ReportTask as IcebergReportTask;
30use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::report_task::Status as IcebergReportTaskStatus;
31use thiserror_ext::AsReport;
32
33use super::*;
34
/// Scheduling state of one sink's compaction track (a small state machine).
///
/// Transitions driven by `CompactionTrack`:
/// - `Idle` -> `PendingDispatch` when the scheduler selects the sink;
/// - `PendingDispatch` -> `InFlight` once the task was sent successfully,
///   or back to `Idle` if dispatch fails before that;
/// - `InFlight` -> `Idle` when a report arrives or the report lease expires.
#[derive(Debug, Clone)]
enum CompactionTrackState {
    /// Accepting commits and waiting for a trigger condition.
    Idle {
        /// Earliest time at which a time-based trigger may fire.
        next_compaction_time: Instant,
    },
    /// Selected locally, but the task has not been handed to a compactor yet.
    PendingDispatch {
        /// `next_compaction_time` to restore if the dispatch fails.
        next_compaction_time_on_failure: Instant,
        /// Snapshot of the pending commit count taken at selection time.
        pending_commit_count_at_dispatch: usize,
    },
    /// Task handed to a compactor and awaiting its report.
    InFlight {
        /// Identifier the report must carry to be accepted.
        task_id: u64,
        /// Pending commits covered by this task (consumed on success).
        pending_commit_count_at_dispatch: usize,
        /// Lease expiry: if no report arrives by then, the task is treated
        /// as failed and becomes retryable.
        report_deadline: Instant,
    },
}
53
54#[derive(Debug, Clone)]
55pub(super) struct CompactionTrack {
56    task_type: TaskType,
57    trigger_interval_sec: u64,
58    /// Minimum pending commit threshold to trigger compaction early.
59    /// Compaction triggers when `pending_commit_count` >= this threshold, even before interval expires.
60    trigger_snapshot_count: usize,
61    report_timeout: Duration,
62    last_config_refresh_at: Instant,
63    pending_commit_count: usize,
64    state: CompactionTrackState,
65}
66
67impl CompactionTrack {
68    fn new(
69        task_type: TaskType,
70        trigger_interval_sec: u64,
71        trigger_snapshot_count: usize,
72        report_timeout: Duration,
73        now: Instant,
74    ) -> Self {
75        Self {
76            task_type,
77            trigger_interval_sec,
78            trigger_snapshot_count,
79            report_timeout,
80            last_config_refresh_at: now,
81            pending_commit_count: 0,
82            state: CompactionTrackState::Idle {
83                next_compaction_time: now + Duration::from_secs(trigger_interval_sec),
84            },
85        }
86    }
87
88    /// Determines if compaction should be triggered.
89    ///
90    /// Trigger conditions (OR logic):
91    /// 1. `commit_ready` - Pending commit count >= threshold (early trigger)
92    /// 2. `time_ready && has_commits` - Interval expired and there's at least 1 pending commit
93    ///
94    /// This ensures:
95    /// - `trigger_snapshot_count` is an early trigger threshold
96    /// - `compaction_interval_sec` is the maximum wait time (as long as there are new snapshots)
97    /// - Force compaction works by setting `next_compaction_time` to now
98    /// - No empty compaction runs (requires at least 1 snapshot for time-based trigger)
99    fn should_trigger(&self, now: Instant) -> bool {
100        let next_compaction_time = match &self.state {
101            CompactionTrackState::Idle {
102                next_compaction_time,
103            } => *next_compaction_time,
104            CompactionTrackState::PendingDispatch { .. }
105            | CompactionTrackState::InFlight { .. } => return false,
106        };
107
108        let time_ready = now >= next_compaction_time;
109        let commit_ready = self.pending_commit_count >= self.trigger_snapshot_count;
110        let has_commits = self.pending_commit_count > 0;
111
112        commit_ready || (time_ready && has_commits)
113    }
114
115    fn record_commit(&mut self) {
116        self.pending_commit_count = self.pending_commit_count.saturating_add(1);
117    }
118
119    fn record_force_compaction(&mut self, now: Instant) {
120        if let CompactionTrackState::Idle {
121            next_compaction_time,
122        } = &mut self.state
123        {
124            *next_compaction_time = now;
125            self.pending_commit_count = self.pending_commit_count.max(1);
126        }
127    }
128
129    fn needs_config_refresh(&self, now: Instant, refresh_interval: Duration) -> bool {
130        now.saturating_duration_since(self.last_config_refresh_at) >= refresh_interval
131    }
132
133    fn should_refresh_config(&self, now: Instant, refresh_interval: Duration) -> bool {
134        matches!(self.state, CompactionTrackState::Idle { .. })
135            && self.needs_config_refresh(now, refresh_interval)
136    }
137
138    fn mark_config_refreshed(&mut self, now: Instant) {
139        self.last_config_refresh_at = now;
140    }
141
142    fn start_processing(&mut self) {
143        match &self.state {
144            CompactionTrackState::Idle {
145                next_compaction_time,
146            } => {
147                self.state = CompactionTrackState::PendingDispatch {
148                    next_compaction_time_on_failure: *next_compaction_time,
149                    pending_commit_count_at_dispatch: self.pending_commit_count,
150                };
151            }
152            CompactionTrackState::PendingDispatch { .. }
153            | CompactionTrackState::InFlight { .. } => {
154                unreachable!("Cannot start processing when already processing")
155            }
156        }
157    }
158
159    fn mark_dispatched(&mut self, task_id: u64, now: Instant) {
160        let pending_commit_count_at_dispatch = match &self.state {
161            CompactionTrackState::PendingDispatch {
162                pending_commit_count_at_dispatch,
163                ..
164            } => *pending_commit_count_at_dispatch,
165            CompactionTrackState::Idle { .. } => unreachable!("Cannot mark dispatched when idle"),
166            CompactionTrackState::InFlight { .. } => {
167                unreachable!("Cannot mark dispatched when already in flight")
168            }
169        };
170        self.state = CompactionTrackState::InFlight {
171            task_id,
172            pending_commit_count_at_dispatch,
173            report_deadline: now + self.report_timeout,
174        };
175    }
176
177    fn is_pending_dispatch(&self) -> bool {
178        matches!(self.state, CompactionTrackState::PendingDispatch { .. })
179    }
180
181    fn is_processing_task(&self, task_id: u64) -> bool {
182        matches!(
183            &self.state,
184            CompactionTrackState::InFlight {
185                task_id: current_task_id,
186                ..
187            } if *current_task_id == task_id
188        )
189    }
190
191    fn is_report_timed_out(&self, now: Instant) -> bool {
192        matches!(
193            &self.state,
194            CompactionTrackState::InFlight {
195                report_deadline,
196                ..
197            } if now >= *report_deadline
198        )
199    }
200
201    fn finish_success(&mut self, now: Instant) {
202        match &self.state {
203            CompactionTrackState::InFlight {
204                pending_commit_count_at_dispatch,
205                ..
206            } => {
207                self.pending_commit_count = self
208                    .pending_commit_count
209                    .saturating_sub(*pending_commit_count_at_dispatch);
210                self.state = CompactionTrackState::Idle {
211                    next_compaction_time: now + Duration::from_secs(self.trigger_interval_sec),
212                };
213            }
214            CompactionTrackState::Idle { .. } => unreachable!("Cannot finish success when idle"),
215            CompactionTrackState::PendingDispatch { .. } => {
216                unreachable!("Cannot finish success before task dispatch")
217            }
218        }
219    }
220
221    fn finish_failed(&mut self, now: Instant) {
222        match &self.state {
223            CompactionTrackState::InFlight { .. } => {
224                self.state = CompactionTrackState::Idle {
225                    next_compaction_time: now,
226                };
227            }
228            CompactionTrackState::Idle { .. } => unreachable!("Cannot finish failed when idle"),
229            CompactionTrackState::PendingDispatch { .. } => {
230                unreachable!("Cannot finish failed before task dispatch")
231            }
232        }
233    }
234
235    /// Restore the idle scheduling state after a pre-dispatch failure.
236    ///
237    /// `pending_commit_count` is intentionally preserved so commits that arrive
238    /// while the track is pending dispatch are not lost if task dispatch fails
239    /// before the compactor accepts the task.
240    fn revert_pre_dispatch_failure(&mut self) {
241        match &self.state {
242            CompactionTrackState::PendingDispatch {
243                next_compaction_time_on_failure,
244                ..
245            } => {
246                self.state = CompactionTrackState::Idle {
247                    next_compaction_time: *next_compaction_time_on_failure,
248                };
249            }
250            CompactionTrackState::Idle { .. } => {
251                unreachable!("Cannot revert a pre-dispatch failure when idle")
252            }
253            CompactionTrackState::InFlight { .. } => {
254                unreachable!("Cannot revert a pre-dispatch failure after dispatch")
255            }
256        }
257    }
258
259    fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
260        if self.trigger_interval_sec == new_interval_sec {
261            return;
262        }
263
264        self.trigger_interval_sec = new_interval_sec;
265
266        match &mut self.state {
267            CompactionTrackState::Idle {
268                next_compaction_time,
269            } => {
270                *next_compaction_time = now + Duration::from_secs(new_interval_sec);
271            }
272            CompactionTrackState::PendingDispatch { .. }
273            | CompactionTrackState::InFlight { .. } => {}
274        }
275    }
276}
277
278pub(crate) struct IcebergCompactionHandle {
279    sink_id: SinkId,
280    task_type: TaskType,
281    inner: Arc<RwLock<IcebergCompactionManagerInner>>,
282    metadata_manager: MetadataManager,
283    handle_success: bool,
284}
285
286impl IcebergCompactionHandle {
287    fn new(
288        sink_id: SinkId,
289        task_type: TaskType,
290        inner: Arc<RwLock<IcebergCompactionManagerInner>>,
291        metadata_manager: MetadataManager,
292    ) -> Self {
293        Self {
294            sink_id,
295            task_type,
296            inner,
297            metadata_manager,
298            handle_success: false,
299        }
300    }
301
302    pub async fn send_compact_task(
303        mut self,
304        compactor: Arc<crate::hummock::IcebergCompactor>,
305        task_id: u64,
306    ) -> MetaResult<()> {
307        use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
308
309        let Some(prost_sink_catalog) = self
310            .metadata_manager
311            .catalog_controller
312            .get_sink_by_id(self.sink_id)
313            .await?
314        else {
315            tracing::warn!("Sink not found: {}", self.sink_id);
316            return Ok(());
317        };
318        let sink_catalog = SinkCatalog::from(prost_sink_catalog);
319        let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
320
321        let result =
322            compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
323                task_id,
324                sink_id: self.sink_id.as_raw_id(),
325                props: param.properties,
326                task_type: self.task_type as i32,
327            }));
328
329        if result.is_ok() {
330            let mut guard = self.inner.write();
331            let mut dispatched = false;
332            if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id)
333                && track.is_pending_dispatch()
334            {
335                track.mark_dispatched(task_id, Instant::now());
336                dispatched = true;
337            }
338            self.handle_success = dispatched;
339            if !dispatched {
340                tracing::warn!(
341                    sink_id = %self.sink_id,
342                    task_id,
343                    "Iceberg compaction task send succeeded but track was no longer pending dispatch"
344                );
345            }
346        }
347
348        result
349    }
350}
351
352impl Drop for IcebergCompactionHandle {
353    fn drop(&mut self) {
354        let mut guard = self.inner.write();
355        if !self.handle_success
356            && let Some(track) = guard.sink_schedules.get_mut(&self.sink_id)
357            && track.is_pending_dispatch()
358        {
359            track.revert_pre_dispatch_failure();
360        }
361    }
362}
363
/// What a sink update notification asks the scheduler to record.
#[derive(Debug, Clone, Copy)]
enum SinkUpdateKind {
    /// A regular commit: increments the pending commit count.
    Commit,
    /// An explicit request to compact an idle sink as soon as possible.
    ForceCompaction,
}
369
370impl SinkUpdateKind {
371    fn apply_to_track(self, track: &mut CompactionTrack, now: Instant) {
372        match self {
373            SinkUpdateKind::Commit => track.record_commit(),
374            SinkUpdateKind::ForceCompaction => track.record_force_compaction(now),
375        }
376    }
377}
378
379/// Result of the read-only preparation step before applying a sink update.
380///
381/// This bundles the original update intent together with the metadata loaded
382/// across the async gap, so the apply step can consume a single object.
383///
384/// `allow_track_initialization` stays `true` only when the sink had no track
385/// before the async config load. This lets the apply step initialize a new
386/// track for first-time updates, while preventing a stale update from
387/// resurrecting a track that disappeared during the async gap.
388struct PreparedSinkUpdate {
389    sink_id: SinkId,
390    kind: SinkUpdateKind,
391    now: Instant,
392    allow_track_initialization: bool,
393    loaded_config: Option<IcebergConfig>,
394}
395
396#[derive(Debug, Clone)]
397pub struct IcebergCompactionScheduleStatus {
398    pub sink_id: SinkId,
399    pub task_type: String,
400    pub trigger_interval_sec: u64,
401    pub trigger_snapshot_count: usize,
402    pub schedule_state: String,
403    pub next_compaction_after_sec: Option<u64>,
404    pub pending_snapshot_count: Option<usize>,
405    pub is_triggerable: bool,
406}
407
408impl IcebergCompactionManager {
409    fn refresh_schedule_config(
410        &self,
411        track: &mut CompactionTrack,
412        iceberg_config: &IcebergConfig,
413        now: Instant,
414    ) {
415        let (task_type, trigger_interval_sec, trigger_snapshot_count) =
416            self.resolve_schedule_values(iceberg_config);
417        track.task_type = task_type;
418        track.trigger_snapshot_count = trigger_snapshot_count;
419        track.update_interval(trigger_interval_sec, now);
420        track.mark_config_refreshed(now);
421    }
422
423    pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
424        let prepared_update = self.prepare_sink_update(msg, Instant::now()).await;
425
426        let mut guard = self.inner.write();
427        self.apply_sink_update(&mut guard, prepared_update);
428    }
429
430    async fn prepare_sink_update(
431        &self,
432        msg: IcebergSinkCompactionUpdate,
433        now: Instant,
434    ) -> PreparedSinkUpdate {
435        let IcebergSinkCompactionUpdate {
436            sink_id,
437            force_compaction,
438        } = msg;
439        let kind = if force_compaction {
440            SinkUpdateKind::ForceCompaction
441        } else {
442            SinkUpdateKind::Commit
443        };
444        let refresh_interval = self.config_refresh_interval();
445        let (allow_track_initialization, should_refresh_config) = {
446            let guard = self.inner.read();
447            match guard.sink_schedules.get(&sink_id) {
448                Some(track) => (false, track.should_refresh_config(now, refresh_interval)),
449                None => (true, true),
450            }
451        };
452
453        let loaded_config = if should_refresh_config {
454            match self.load_iceberg_config(sink_id).await {
455                Ok(config) => Some(config),
456                Err(e) => {
457                    tracing::warn!(
458                        error = ?e.as_report(),
459                        "Failed to load iceberg config for sink {}",
460                        sink_id
461                    );
462                    None
463                }
464            }
465        } else {
466            None
467        };
468
469        PreparedSinkUpdate {
470            sink_id,
471            kind,
472            now,
473            allow_track_initialization,
474            loaded_config,
475        }
476    }
477
478    fn apply_sink_update(
479        &self,
480        guard: &mut IcebergCompactionManagerInner,
481        prepared_update: PreparedSinkUpdate,
482    ) {
483        let PreparedSinkUpdate {
484            sink_id,
485            kind,
486            now,
487            allow_track_initialization,
488            loaded_config,
489        } = prepared_update;
490        let refresh_interval = self.config_refresh_interval();
491
492        match guard.sink_schedules.entry(sink_id) {
493            Entry::Occupied(entry) => {
494                let track = entry.into_mut();
495                if track.should_refresh_config(now, refresh_interval)
496                    && let Some(config) = loaded_config.as_ref()
497                {
498                    self.refresh_schedule_config(track, config, now);
499                }
500
501                kind.apply_to_track(track, now);
502            }
503            Entry::Vacant(entry) => {
504                if !allow_track_initialization {
505                    tracing::warn!(
506                        sink_id = %sink_id,
507                        "Ignoring iceberg compaction update because track disappeared before apply"
508                    );
509                    return;
510                }
511
512                let Some(config) = loaded_config.as_ref() else {
513                    tracing::warn!(
514                        sink_id = %sink_id,
515                        "Ignoring iceberg compaction update because sink config is unavailable"
516                    );
517                    return;
518                };
519
520                let track = entry.insert(self.create_compaction_track(config, now));
521                kind.apply_to_track(track, now);
522            }
523        }
524    }
525
526    fn create_compaction_track(
527        &self,
528        iceberg_config: &IcebergConfig,
529        now: Instant,
530    ) -> CompactionTrack {
531        let (task_type, trigger_interval_sec, trigger_snapshot_count) =
532            self.resolve_schedule_values(iceberg_config);
533
534        CompactionTrack::new(
535            task_type,
536            trigger_interval_sec,
537            trigger_snapshot_count,
538            self.report_timeout(),
539            now,
540        )
541    }
542
543    fn resolve_schedule_values(&self, iceberg_config: &IcebergConfig) -> (TaskType, u64, usize) {
544        (
545            if should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode)
546            {
547                TaskType::Full
548            } else {
549                match iceberg_config.compaction_type() {
550                    CompactionType::Full => TaskType::Full,
551                    CompactionType::SmallFiles => TaskType::SmallFiles,
552                    CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
553                }
554            },
555            iceberg_config.compaction_interval_sec(),
556            iceberg_config.trigger_snapshot_count(),
557        )
558    }
559
560    pub(crate) fn get_top_n_iceberg_commit_sink_ids(
561        &self,
562        n: usize,
563    ) -> Vec<IcebergCompactionHandle> {
564        let now = Instant::now();
565        let mut guard = self.inner.write();
566        for (&sink_id, track) in &mut guard.sink_schedules {
567            if track.is_report_timed_out(now) {
568                tracing::warn!(sink_id = %sink_id, "Iceberg compaction task report timed out");
569                track.finish_failed(now);
570            }
571        }
572
573        let mut candidates = Vec::new();
574        for (sink_id, track) in &guard.sink_schedules {
575            if track.should_trigger(now)
576                && let CompactionTrackState::Idle {
577                    next_compaction_time,
578                } = &track.state
579            {
580                candidates.push((*sink_id, track.task_type, *next_compaction_time));
581            }
582        }
583
584        candidates.sort_by(|a, b| a.2.cmp(&b.2));
585
586        candidates
587            .into_iter()
588            .take(n)
589            .filter_map(|(sink_id, task_type, _)| {
590                let track = guard.sink_schedules.get_mut(&sink_id)?;
591                track.start_processing();
592
593                Some(IcebergCompactionHandle::new(
594                    sink_id,
595                    task_type,
596                    self.inner.clone(),
597                    self.metadata_manager.clone(),
598                ))
599            })
600            .collect()
601    }
602
603    pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
604        let mut guard = self.inner.write();
605        guard.sink_schedules.remove(&sink_id);
606    }
607
608    pub fn list_compaction_statuses(&self) -> Vec<IcebergCompactionScheduleStatus> {
609        let now = Instant::now();
610        let schedules = {
611            let guard = self.inner.read();
612            guard
613                .sink_schedules
614                .iter()
615                .map(|(&sink_id, track)| (sink_id, track.clone()))
616                .collect_vec()
617        };
618
619        let mut statuses = schedules
620            .into_iter()
621            .map(|(sink_id, track)| {
622                let next_compaction_after_sec = match &track.state {
623                    CompactionTrackState::Idle {
624                        next_compaction_time,
625                    } => Some(
626                        next_compaction_time
627                            .saturating_duration_since(now)
628                            .as_secs(),
629                    ),
630                    CompactionTrackState::PendingDispatch { .. }
631                    | CompactionTrackState::InFlight { .. } => None,
632                };
633                let is_triggerable = track.should_trigger(now);
634
635                IcebergCompactionScheduleStatus {
636                    sink_id,
637                    task_type: track.task_type.as_str_name().to_ascii_lowercase(),
638                    trigger_interval_sec: track.trigger_interval_sec,
639                    trigger_snapshot_count: track.trigger_snapshot_count,
640                    schedule_state: match track.state {
641                        CompactionTrackState::Idle { .. } => "idle".to_owned(),
642                        CompactionTrackState::PendingDispatch { .. }
643                        | CompactionTrackState::InFlight { .. } => "processing".to_owned(),
644                    },
645                    next_compaction_after_sec,
646                    pending_snapshot_count: Some(track.pending_commit_count),
647                    is_triggerable,
648                }
649            })
650            .collect_vec();
651
652        statuses.sort_by_key(|status| status.sink_id);
653        statuses
654    }
655
656    pub fn handle_report_task(&self, report: IcebergReportTask) {
657        if self.complete_manual_task_if_any(&report) {
658            return;
659        }
660
661        let sink_id = SinkId::from(report.sink_id);
662        let task_id = report.task_id;
663        let status = IcebergReportTaskStatus::try_from(report.status)
664            .unwrap_or(IcebergReportTaskStatus::Unspecified);
665        let now = Instant::now();
666
667        let mut guard = self.inner.write();
668        let Some(track) = guard.sink_schedules.get_mut(&sink_id) else {
669            tracing::warn!(sink_id = %sink_id, task_id, "Received iceberg compaction report for unknown sink");
670            return;
671        };
672
673        if !track.is_processing_task(task_id) {
674            tracing::warn!(sink_id = %sink_id, task_id, "Ignoring stale iceberg compaction report");
675            return;
676        }
677
678        match status {
679            IcebergReportTaskStatus::Success => {
680                track.finish_success(now);
681            }
682            IcebergReportTaskStatus::Failed | IcebergReportTaskStatus::Unspecified => {
683                tracing::warn!(
684                    sink_id = %sink_id,
685                    task_id,
686                    error_message = report.error_message.unwrap_or_default(),
687                    "Iceberg compaction task reported failure"
688                );
689                track.finish_failed(now);
690            }
691        }
692    }
693}
694
695#[cfg(test)]
696mod tests;