1use std::collections::hash_map::Entry;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use itertools::Itertools;
20use parking_lot::RwLock;
21use risingwave_connector::connector_common::IcebergSinkCompactionUpdate;
22use risingwave_connector::sink::SinkParam;
23use risingwave_connector::sink::catalog::{SinkCatalog, SinkId};
24use risingwave_connector::sink::iceberg::{
25 CompactionType, IcebergConfig, should_enable_iceberg_cow,
26};
27use risingwave_pb::iceberg_compaction::IcebergCompactionTask;
28use risingwave_pb::iceberg_compaction::iceberg_compaction_task::TaskType;
29use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::ReportTask as IcebergReportTask;
30use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_request::report_task::Status as IcebergReportTaskStatus;
31use thiserror_ext::AsReport;
32
33use super::*;
34
/// Scheduling state of one sink's compaction track.
///
/// A track is picked while `Idle`, waits in `PendingDispatch` until the task
/// has actually been sent, and stays `InFlight` until the task is reported
/// (or its report deadline passes).
#[derive(Debug, Clone)]
enum CompactionTrackState {
    /// No task is outstanding. The timer condition is met once the current
    /// time reaches `next_compaction_time`.
    Idle { next_compaction_time: Instant },
    /// The track has been picked but the task has not been sent yet.
    PendingDispatch {
        /// `Idle` deadline captured when the track was picked; it is restored
        /// when the dispatch attempt is abandoned.
        next_compaction_time_on_failure: Instant,
        /// Value of the track's pending commit counter when it was picked.
        pending_commit_count_at_dispatch: usize,
    },
    /// A task has been sent and its report is awaited.
    InFlight {
        /// Id of the task whose report is expected.
        task_id: u64,
        /// Pending commit counter captured at pick time; subtracted from the
        /// live counter when the task succeeds.
        pending_commit_count_at_dispatch: usize,
        /// Instant from which the task counts as timed out.
        report_deadline: Instant,
    },
}
53
54#[derive(Debug, Clone)]
55pub(super) struct CompactionTrack {
56 task_type: TaskType,
57 trigger_interval_sec: u64,
58 trigger_snapshot_count: usize,
61 report_timeout: Duration,
62 last_config_refresh_at: Instant,
63 pending_commit_count: usize,
64 state: CompactionTrackState,
65}
66
67impl CompactionTrack {
68 fn new(
69 task_type: TaskType,
70 trigger_interval_sec: u64,
71 trigger_snapshot_count: usize,
72 report_timeout: Duration,
73 now: Instant,
74 ) -> Self {
75 Self {
76 task_type,
77 trigger_interval_sec,
78 trigger_snapshot_count,
79 report_timeout,
80 last_config_refresh_at: now,
81 pending_commit_count: 0,
82 state: CompactionTrackState::Idle {
83 next_compaction_time: now + Duration::from_secs(trigger_interval_sec),
84 },
85 }
86 }
87
88 fn should_trigger(&self, now: Instant) -> bool {
100 let next_compaction_time = match &self.state {
101 CompactionTrackState::Idle {
102 next_compaction_time,
103 } => *next_compaction_time,
104 CompactionTrackState::PendingDispatch { .. }
105 | CompactionTrackState::InFlight { .. } => return false,
106 };
107
108 let time_ready = now >= next_compaction_time;
109 let commit_ready = self.pending_commit_count >= self.trigger_snapshot_count;
110 let has_commits = self.pending_commit_count > 0;
111
112 commit_ready || (time_ready && has_commits)
113 }
114
115 fn record_commit(&mut self) {
116 self.pending_commit_count = self.pending_commit_count.saturating_add(1);
117 }
118
119 fn record_force_compaction(&mut self, now: Instant) {
120 if let CompactionTrackState::Idle {
121 next_compaction_time,
122 } = &mut self.state
123 {
124 *next_compaction_time = now;
125 self.pending_commit_count = self.pending_commit_count.max(1);
126 }
127 }
128
129 fn needs_config_refresh(&self, now: Instant, refresh_interval: Duration) -> bool {
130 now.saturating_duration_since(self.last_config_refresh_at) >= refresh_interval
131 }
132
133 fn should_refresh_config(&self, now: Instant, refresh_interval: Duration) -> bool {
134 matches!(self.state, CompactionTrackState::Idle { .. })
135 && self.needs_config_refresh(now, refresh_interval)
136 }
137
138 fn mark_config_refreshed(&mut self, now: Instant) {
139 self.last_config_refresh_at = now;
140 }
141
142 fn start_processing(&mut self) {
143 match &self.state {
144 CompactionTrackState::Idle {
145 next_compaction_time,
146 } => {
147 self.state = CompactionTrackState::PendingDispatch {
148 next_compaction_time_on_failure: *next_compaction_time,
149 pending_commit_count_at_dispatch: self.pending_commit_count,
150 };
151 }
152 CompactionTrackState::PendingDispatch { .. }
153 | CompactionTrackState::InFlight { .. } => {
154 unreachable!("Cannot start processing when already processing")
155 }
156 }
157 }
158
159 fn mark_dispatched(&mut self, task_id: u64, now: Instant) {
160 let pending_commit_count_at_dispatch = match &self.state {
161 CompactionTrackState::PendingDispatch {
162 pending_commit_count_at_dispatch,
163 ..
164 } => *pending_commit_count_at_dispatch,
165 CompactionTrackState::Idle { .. } => unreachable!("Cannot mark dispatched when idle"),
166 CompactionTrackState::InFlight { .. } => {
167 unreachable!("Cannot mark dispatched when already in flight")
168 }
169 };
170 self.state = CompactionTrackState::InFlight {
171 task_id,
172 pending_commit_count_at_dispatch,
173 report_deadline: now + self.report_timeout,
174 };
175 }
176
177 fn is_pending_dispatch(&self) -> bool {
178 matches!(self.state, CompactionTrackState::PendingDispatch { .. })
179 }
180
181 fn is_processing_task(&self, task_id: u64) -> bool {
182 matches!(
183 &self.state,
184 CompactionTrackState::InFlight {
185 task_id: current_task_id,
186 ..
187 } if *current_task_id == task_id
188 )
189 }
190
191 fn is_report_timed_out(&self, now: Instant) -> bool {
192 matches!(
193 &self.state,
194 CompactionTrackState::InFlight {
195 report_deadline,
196 ..
197 } if now >= *report_deadline
198 )
199 }
200
201 fn finish_success(&mut self, now: Instant) {
202 match &self.state {
203 CompactionTrackState::InFlight {
204 pending_commit_count_at_dispatch,
205 ..
206 } => {
207 self.pending_commit_count = self
208 .pending_commit_count
209 .saturating_sub(*pending_commit_count_at_dispatch);
210 self.state = CompactionTrackState::Idle {
211 next_compaction_time: now + Duration::from_secs(self.trigger_interval_sec),
212 };
213 }
214 CompactionTrackState::Idle { .. } => unreachable!("Cannot finish success when idle"),
215 CompactionTrackState::PendingDispatch { .. } => {
216 unreachable!("Cannot finish success before task dispatch")
217 }
218 }
219 }
220
221 fn finish_failed(&mut self, now: Instant) {
222 match &self.state {
223 CompactionTrackState::InFlight { .. } => {
224 self.state = CompactionTrackState::Idle {
225 next_compaction_time: now,
226 };
227 }
228 CompactionTrackState::Idle { .. } => unreachable!("Cannot finish failed when idle"),
229 CompactionTrackState::PendingDispatch { .. } => {
230 unreachable!("Cannot finish failed before task dispatch")
231 }
232 }
233 }
234
235 fn revert_pre_dispatch_failure(&mut self) {
241 match &self.state {
242 CompactionTrackState::PendingDispatch {
243 next_compaction_time_on_failure,
244 ..
245 } => {
246 self.state = CompactionTrackState::Idle {
247 next_compaction_time: *next_compaction_time_on_failure,
248 };
249 }
250 CompactionTrackState::Idle { .. } => {
251 unreachable!("Cannot revert a pre-dispatch failure when idle")
252 }
253 CompactionTrackState::InFlight { .. } => {
254 unreachable!("Cannot revert a pre-dispatch failure after dispatch")
255 }
256 }
257 }
258
259 fn update_interval(&mut self, new_interval_sec: u64, now: Instant) {
260 if self.trigger_interval_sec == new_interval_sec {
261 return;
262 }
263
264 self.trigger_interval_sec = new_interval_sec;
265
266 match &mut self.state {
267 CompactionTrackState::Idle {
268 next_compaction_time,
269 } => {
270 *next_compaction_time = now + Duration::from_secs(new_interval_sec);
271 }
272 CompactionTrackState::PendingDispatch { .. }
273 | CompactionTrackState::InFlight { .. } => {}
274 }
275 }
276}
277
278pub(crate) struct IcebergCompactionHandle {
279 sink_id: SinkId,
280 task_type: TaskType,
281 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
282 metadata_manager: MetadataManager,
283 handle_success: bool,
284}
285
286impl IcebergCompactionHandle {
287 fn new(
288 sink_id: SinkId,
289 task_type: TaskType,
290 inner: Arc<RwLock<IcebergCompactionManagerInner>>,
291 metadata_manager: MetadataManager,
292 ) -> Self {
293 Self {
294 sink_id,
295 task_type,
296 inner,
297 metadata_manager,
298 handle_success: false,
299 }
300 }
301
302 pub async fn send_compact_task(
303 mut self,
304 compactor: Arc<crate::hummock::IcebergCompactor>,
305 task_id: u64,
306 ) -> MetaResult<()> {
307 use risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event as IcebergResponseEvent;
308
309 let Some(prost_sink_catalog) = self
310 .metadata_manager
311 .catalog_controller
312 .get_sink_by_id(self.sink_id)
313 .await?
314 else {
315 tracing::warn!("Sink not found: {}", self.sink_id);
316 return Ok(());
317 };
318 let sink_catalog = SinkCatalog::from(prost_sink_catalog);
319 let param = SinkParam::try_from_sink_catalog(sink_catalog)?;
320
321 let result =
322 compactor.send_event(IcebergResponseEvent::CompactTask(IcebergCompactionTask {
323 task_id,
324 sink_id: self.sink_id.as_raw_id(),
325 props: param.properties,
326 task_type: self.task_type as i32,
327 }));
328
329 if result.is_ok() {
330 let mut guard = self.inner.write();
331 let mut dispatched = false;
332 if let Some(track) = guard.sink_schedules.get_mut(&self.sink_id)
333 && track.is_pending_dispatch()
334 {
335 track.mark_dispatched(task_id, Instant::now());
336 dispatched = true;
337 }
338 self.handle_success = dispatched;
339 if !dispatched {
340 tracing::warn!(
341 sink_id = %self.sink_id,
342 task_id,
343 "Iceberg compaction task send succeeded but track was no longer pending dispatch"
344 );
345 }
346 }
347
348 result
349 }
350}
351
352impl Drop for IcebergCompactionHandle {
353 fn drop(&mut self) {
354 let mut guard = self.inner.write();
355 if !self.handle_success
356 && let Some(track) = guard.sink_schedules.get_mut(&self.sink_id)
357 && track.is_pending_dispatch()
358 {
359 track.revert_pre_dispatch_failure();
360 }
361 }
362}
363
/// What a sink update asks the scheduler to do with the sink's track.
#[derive(Debug, Clone, Copy)]
enum SinkUpdateKind {
    /// Record one more commit on the track.
    Commit,
    /// Request a compaction as soon as the track is idle.
    ForceCompaction,
}
369
370impl SinkUpdateKind {
371 fn apply_to_track(self, track: &mut CompactionTrack, now: Instant) {
372 match self {
373 SinkUpdateKind::Commit => track.record_commit(),
374 SinkUpdateKind::ForceCompaction => track.record_force_compaction(now),
375 }
376 }
377}
378
379struct PreparedSinkUpdate {
389 sink_id: SinkId,
390 kind: SinkUpdateKind,
391 now: Instant,
392 allow_track_initialization: bool,
393 loaded_config: Option<IcebergConfig>,
394}
395
396#[derive(Debug, Clone)]
397pub struct IcebergCompactionScheduleStatus {
398 pub sink_id: SinkId,
399 pub task_type: String,
400 pub trigger_interval_sec: u64,
401 pub trigger_snapshot_count: usize,
402 pub schedule_state: String,
403 pub next_compaction_after_sec: Option<u64>,
404 pub pending_snapshot_count: Option<usize>,
405 pub is_triggerable: bool,
406}
407
408impl IcebergCompactionManager {
409 fn refresh_schedule_config(
410 &self,
411 track: &mut CompactionTrack,
412 iceberg_config: &IcebergConfig,
413 now: Instant,
414 ) {
415 let (task_type, trigger_interval_sec, trigger_snapshot_count) =
416 self.resolve_schedule_values(iceberg_config);
417 track.task_type = task_type;
418 track.trigger_snapshot_count = trigger_snapshot_count;
419 track.update_interval(trigger_interval_sec, now);
420 track.mark_config_refreshed(now);
421 }
422
423 pub async fn update_iceberg_commit_info(&self, msg: IcebergSinkCompactionUpdate) {
424 let prepared_update = self.prepare_sink_update(msg, Instant::now()).await;
425
426 let mut guard = self.inner.write();
427 self.apply_sink_update(&mut guard, prepared_update);
428 }
429
430 async fn prepare_sink_update(
431 &self,
432 msg: IcebergSinkCompactionUpdate,
433 now: Instant,
434 ) -> PreparedSinkUpdate {
435 let IcebergSinkCompactionUpdate {
436 sink_id,
437 force_compaction,
438 } = msg;
439 let kind = if force_compaction {
440 SinkUpdateKind::ForceCompaction
441 } else {
442 SinkUpdateKind::Commit
443 };
444 let refresh_interval = self.config_refresh_interval();
445 let (allow_track_initialization, should_refresh_config) = {
446 let guard = self.inner.read();
447 match guard.sink_schedules.get(&sink_id) {
448 Some(track) => (false, track.should_refresh_config(now, refresh_interval)),
449 None => (true, true),
450 }
451 };
452
453 let loaded_config = if should_refresh_config {
454 match self.load_iceberg_config(sink_id).await {
455 Ok(config) => Some(config),
456 Err(e) => {
457 tracing::warn!(
458 error = ?e.as_report(),
459 "Failed to load iceberg config for sink {}",
460 sink_id
461 );
462 None
463 }
464 }
465 } else {
466 None
467 };
468
469 PreparedSinkUpdate {
470 sink_id,
471 kind,
472 now,
473 allow_track_initialization,
474 loaded_config,
475 }
476 }
477
478 fn apply_sink_update(
479 &self,
480 guard: &mut IcebergCompactionManagerInner,
481 prepared_update: PreparedSinkUpdate,
482 ) {
483 let PreparedSinkUpdate {
484 sink_id,
485 kind,
486 now,
487 allow_track_initialization,
488 loaded_config,
489 } = prepared_update;
490 let refresh_interval = self.config_refresh_interval();
491
492 match guard.sink_schedules.entry(sink_id) {
493 Entry::Occupied(entry) => {
494 let track = entry.into_mut();
495 if track.should_refresh_config(now, refresh_interval)
496 && let Some(config) = loaded_config.as_ref()
497 {
498 self.refresh_schedule_config(track, config, now);
499 }
500
501 kind.apply_to_track(track, now);
502 }
503 Entry::Vacant(entry) => {
504 if !allow_track_initialization {
505 tracing::warn!(
506 sink_id = %sink_id,
507 "Ignoring iceberg compaction update because track disappeared before apply"
508 );
509 return;
510 }
511
512 let Some(config) = loaded_config.as_ref() else {
513 tracing::warn!(
514 sink_id = %sink_id,
515 "Ignoring iceberg compaction update because sink config is unavailable"
516 );
517 return;
518 };
519
520 let track = entry.insert(self.create_compaction_track(config, now));
521 kind.apply_to_track(track, now);
522 }
523 }
524 }
525
526 fn create_compaction_track(
527 &self,
528 iceberg_config: &IcebergConfig,
529 now: Instant,
530 ) -> CompactionTrack {
531 let (task_type, trigger_interval_sec, trigger_snapshot_count) =
532 self.resolve_schedule_values(iceberg_config);
533
534 CompactionTrack::new(
535 task_type,
536 trigger_interval_sec,
537 trigger_snapshot_count,
538 self.report_timeout(),
539 now,
540 )
541 }
542
543 fn resolve_schedule_values(&self, iceberg_config: &IcebergConfig) -> (TaskType, u64, usize) {
544 (
545 if should_enable_iceberg_cow(iceberg_config.r#type.as_str(), iceberg_config.write_mode)
546 {
547 TaskType::Full
548 } else {
549 match iceberg_config.compaction_type() {
550 CompactionType::Full => TaskType::Full,
551 CompactionType::SmallFiles => TaskType::SmallFiles,
552 CompactionType::FilesWithDelete => TaskType::FilesWithDelete,
553 }
554 },
555 iceberg_config.compaction_interval_sec(),
556 iceberg_config.trigger_snapshot_count(),
557 )
558 }
559
560 pub(crate) fn get_top_n_iceberg_commit_sink_ids(
561 &self,
562 n: usize,
563 ) -> Vec<IcebergCompactionHandle> {
564 let now = Instant::now();
565 let mut guard = self.inner.write();
566 for (&sink_id, track) in &mut guard.sink_schedules {
567 if track.is_report_timed_out(now) {
568 tracing::warn!(sink_id = %sink_id, "Iceberg compaction task report timed out");
569 track.finish_failed(now);
570 }
571 }
572
573 let mut candidates = Vec::new();
574 for (sink_id, track) in &guard.sink_schedules {
575 if track.should_trigger(now)
576 && let CompactionTrackState::Idle {
577 next_compaction_time,
578 } = &track.state
579 {
580 candidates.push((*sink_id, track.task_type, *next_compaction_time));
581 }
582 }
583
584 candidates.sort_by(|a, b| a.2.cmp(&b.2));
585
586 candidates
587 .into_iter()
588 .take(n)
589 .filter_map(|(sink_id, task_type, _)| {
590 let track = guard.sink_schedules.get_mut(&sink_id)?;
591 track.start_processing();
592
593 Some(IcebergCompactionHandle::new(
594 sink_id,
595 task_type,
596 self.inner.clone(),
597 self.metadata_manager.clone(),
598 ))
599 })
600 .collect()
601 }
602
603 pub fn clear_iceberg_commits_by_sink_id(&self, sink_id: SinkId) {
604 let mut guard = self.inner.write();
605 guard.sink_schedules.remove(&sink_id);
606 }
607
608 pub fn list_compaction_statuses(&self) -> Vec<IcebergCompactionScheduleStatus> {
609 let now = Instant::now();
610 let schedules = {
611 let guard = self.inner.read();
612 guard
613 .sink_schedules
614 .iter()
615 .map(|(&sink_id, track)| (sink_id, track.clone()))
616 .collect_vec()
617 };
618
619 let mut statuses = schedules
620 .into_iter()
621 .map(|(sink_id, track)| {
622 let next_compaction_after_sec = match &track.state {
623 CompactionTrackState::Idle {
624 next_compaction_time,
625 } => Some(
626 next_compaction_time
627 .saturating_duration_since(now)
628 .as_secs(),
629 ),
630 CompactionTrackState::PendingDispatch { .. }
631 | CompactionTrackState::InFlight { .. } => None,
632 };
633 let is_triggerable = track.should_trigger(now);
634
635 IcebergCompactionScheduleStatus {
636 sink_id,
637 task_type: track.task_type.as_str_name().to_ascii_lowercase(),
638 trigger_interval_sec: track.trigger_interval_sec,
639 trigger_snapshot_count: track.trigger_snapshot_count,
640 schedule_state: match track.state {
641 CompactionTrackState::Idle { .. } => "idle".to_owned(),
642 CompactionTrackState::PendingDispatch { .. }
643 | CompactionTrackState::InFlight { .. } => "processing".to_owned(),
644 },
645 next_compaction_after_sec,
646 pending_snapshot_count: Some(track.pending_commit_count),
647 is_triggerable,
648 }
649 })
650 .collect_vec();
651
652 statuses.sort_by_key(|status| status.sink_id);
653 statuses
654 }
655
656 pub fn handle_report_task(&self, report: IcebergReportTask) {
657 if self.complete_manual_task_if_any(&report) {
658 return;
659 }
660
661 let sink_id = SinkId::from(report.sink_id);
662 let task_id = report.task_id;
663 let status = IcebergReportTaskStatus::try_from(report.status)
664 .unwrap_or(IcebergReportTaskStatus::Unspecified);
665 let now = Instant::now();
666
667 let mut guard = self.inner.write();
668 let Some(track) = guard.sink_schedules.get_mut(&sink_id) else {
669 tracing::warn!(sink_id = %sink_id, task_id, "Received iceberg compaction report for unknown sink");
670 return;
671 };
672
673 if !track.is_processing_task(task_id) {
674 tracing::warn!(sink_id = %sink_id, task_id, "Ignoring stale iceberg compaction report");
675 return;
676 }
677
678 match status {
679 IcebergReportTaskStatus::Success => {
680 track.finish_success(now);
681 }
682 IcebergReportTaskStatus::Failed | IcebergReportTaskStatus::Unspecified => {
683 tracing::warn!(
684 sink_id = %sink_id,
685 task_id,
686 error_message = report.error_message.unwrap_or_default(),
687 "Iceberg compaction task reported failure"
688 );
689 track.finish_failed(now);
690 }
691 }
692 }
693}
694
695#[cfg(test)]
696mod tests;