1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24 ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28 subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::hash_map::Entry;
43use std::collections::{HashMap, VecDeque};
44use std::marker::PhantomData;
45use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, SystemTime};
48
49use await_tree::{InstrumentAwait, SpanExt};
50pub use compaction_executor::CompactionExecutor;
51pub use compaction_filter::{
52 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, TtlCompactionFilter,
53};
54pub use context::{
55 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
60use iceberg_compaction::{
61 IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
62 ReportSendResult, build_iceberg_task_report, create_task_execution,
63 flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
64};
65pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
66use more_asserts::assert_ge;
67use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
68use risingwave_hummock_sdk::{
69 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
70};
71use risingwave_pb::hummock::compact_task::TaskStatus;
72use risingwave_pb::hummock::subscribe_compaction_event_request::{
73 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
74};
75use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
76use risingwave_pb::hummock::{
77 CompactTaskProgress, PbSstableFilterType, ReportCompactionTaskRequest,
78 SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
79};
80use risingwave_rpc_client::HummockMetaClient;
81pub use shared_buffer_compact::compact;
82use tokio::sync::oneshot::Sender;
83use tokio::task::JoinHandle;
84use tokio::time::Instant;
85
86pub use self::compaction_utils::{
87 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
88 check_flush_result,
89};
90pub use self::task_progress::TaskProgress;
91use super::multi_builder::CapacitySplitTableBuilder;
92use super::{
93 GetObjectId, HummockErrorInner, HummockResult, ObjectIdManager, SstableBuilderOptions,
94 Xor8FilterBuilder, Xor16FilterBuilder,
95};
96use crate::compaction_catalog_manager::{
97 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
98};
99use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
100use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
101use crate::hummock::compactor::iceberg_compaction::TaskKey;
102use crate::hummock::iterator::{Forward, HummockIterator};
103use crate::hummock::{
104 BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, FilterBuilder,
105 SharedComapctorObjectIdManager, SstableWriterFactory, UnifiedSstableWriterFactory,
106 validate_ssts,
107};
108use crate::monitor::CompactorMetrics;
109
/// Heartbeat interval handed to the `LogThrottler`s created by the compactor event loops in
/// this file: once this much time has passed since the last recorded log, the state is logged
/// again even if it did not change.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);
112
/// Snapshot of the values the regular compactor event loop logs on its periodic tick.
///
/// Snapshots are compared with `==` so that an unchanged state is not logged again on every
/// tick.
#[derive(Clone, Debug, Eq, PartialEq)]
struct CompactionLogState {
    /// Value of the running-task parallelism counter when the snapshot was taken.
    running_parallelism: u32,
    /// Value of the event loop's `pull_task_ack` flag when the snapshot was taken.
    pull_task_ack: bool,
    /// Number of tasks requested from meta in this tick (0 when no pull was sent).
    pending_pull_task_count: u32,
}
120
/// Snapshot type used with `LogThrottler` by the iceberg compactor event loop.
///
/// NOTE(review): the fields are populated outside this part of the file; the descriptions
/// below are inferred from the field names and should be confirmed against the code that
/// builds the snapshot.
#[derive(Clone, Debug, Eq, PartialEq)]
struct IcebergCompactionLogState {
    /// Presumably the parallelism consumed by running plans.
    running_parallelism: u32,
    /// Presumably the parallelism requested by queued (not yet running) plans.
    waiting_parallelism: u32,
    /// Presumably the parallelism still free for new plans.
    available_parallelism: u32,
    /// Presumably the event loop's `pull_task_ack` flag at snapshot time.
    pull_task_ack: bool,
    /// Presumably the number of tasks requested from meta in this tick.
    pending_pull_task_count: u32,
}
130
/// Decides whether a periodically sampled state is worth logging.
///
/// A state should be logged when it differs from the last recorded one, or when at least
/// `heartbeat_interval` has elapsed since the last recorded log (or since construction).
struct LogThrottler<T: PartialEq> {
    /// State passed to the most recent `update` call; `None` until the first call.
    last_logged_state: Option<T>,
    /// Time of construction or of the most recent `update` call.
    last_heartbeat: Instant,
    /// Longest time an unchanged state may go without being logged.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Creates a throttler with no recorded state; the heartbeat clock starts now.
    fn new(heartbeat_interval: Duration) -> Self {
        let last_heartbeat = Instant::now();
        Self {
            last_logged_state: None,
            last_heartbeat,
            heartbeat_interval,
        }
    }

    /// Returns `true` if `current_state` should be logged.
    ///
    /// This does not record anything; call `update` after actually logging.
    fn should_log(&self, current_state: &T) -> bool {
        match &self.last_logged_state {
            // Unchanged state: only log once the heartbeat interval has elapsed.
            Some(previous) if previous == current_state => {
                self.last_heartbeat.elapsed() >= self.heartbeat_interval
            }
            // Nothing recorded yet, or the state changed.
            _ => true,
        }
    }

    /// Records `current_state` as logged and restarts the heartbeat clock.
    fn update(&mut self, current_state: T) {
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = Instant::now();
    }
}
159
160pub struct Compactor {
162 context: CompactorContext,
164 object_id_getter: Arc<dyn GetObjectId>,
165 task_config: TaskConfig,
166 options: SstableBuilderOptions,
167 get_id_time: Arc<AtomicU64>,
168}
169
170pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
171
172impl Compactor {
173 pub fn new(
175 context: CompactorContext,
176 options: SstableBuilderOptions,
177 task_config: TaskConfig,
178 object_id_getter: Arc<dyn GetObjectId>,
179 ) -> Self {
180 Self {
181 context,
182 options,
183 task_config,
184 get_id_time: Arc::new(AtomicU64::new(0)),
185 object_id_getter,
186 }
187 }
188
189 async fn compact_key_range(
194 &self,
195 iter: impl HummockIterator<Direction = Forward>,
196 compaction_filter: impl CompactionFilter,
197 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
198 task_progress: Option<Arc<TaskProgress>>,
199 task_id: Option<HummockCompactionTaskId>,
200 split_index: Option<usize>,
201 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
202 let compact_timer = if self.context.is_share_buffer_compact {
204 self.context
205 .compactor_metrics
206 .write_build_l0_sst_duration
207 .start_timer()
208 } else {
209 self.context
210 .compactor_metrics
211 .compact_sst_duration
212 .start_timer()
213 };
214
215 let (split_table_outputs, table_stats_map) = {
216 let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
217 match (
218 self.task_config.sstable_filter_kind,
219 self.task_config.use_block_based_filter,
220 ) {
221 (PbSstableFilterType::SstableFilterXor8, true) => {
222 self.compact_key_range_impl::<_, BlockedXor8FilterBuilder>(
223 factory,
224 iter,
225 compaction_filter,
226 compaction_catalog_agent_ref,
227 task_progress.clone(),
228 self.object_id_getter.clone(),
229 )
230 .instrument_await("compact".verbose())
231 .await?
232 }
233 (PbSstableFilterType::SstableFilterXor8, false) => {
234 self.compact_key_range_impl::<_, Xor8FilterBuilder>(
235 factory,
236 iter,
237 compaction_filter,
238 compaction_catalog_agent_ref,
239 task_progress.clone(),
240 self.object_id_getter.clone(),
241 )
242 .instrument_await("compact".verbose())
243 .await?
244 }
245 (PbSstableFilterType::SstableFilterXor16, true) => {
246 self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
247 factory,
248 iter,
249 compaction_filter,
250 compaction_catalog_agent_ref,
251 task_progress.clone(),
252 self.object_id_getter.clone(),
253 )
254 .instrument_await("compact".verbose())
255 .await?
256 }
257 (PbSstableFilterType::SstableFilterXor16, false) => {
258 self.compact_key_range_impl::<_, Xor16FilterBuilder>(
259 factory,
260 iter,
261 compaction_filter,
262 compaction_catalog_agent_ref,
263 task_progress.clone(),
264 self.object_id_getter.clone(),
265 )
266 .instrument_await("compact".verbose())
267 .await?
268 }
269 (kind, _) => unreachable!("unsupported sstable filter kind in compactor: {kind:?}"),
270 }
271 };
272
273 compact_timer.observe_duration();
274
275 Self::report_progress(
276 self.context.compactor_metrics.clone(),
277 task_progress,
278 &split_table_outputs,
279 self.context.is_share_buffer_compact,
280 );
281
282 self.context
283 .compactor_metrics
284 .get_table_id_total_time_duration
285 .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
286
287 debug_assert!(
288 split_table_outputs
289 .iter()
290 .all(|table_info| table_info.sst_info.table_ids.is_sorted())
291 );
292
293 if task_id.is_some() {
294 tracing::info!(
296 "Finish Task {:?} split_index {:?} sst count {}",
297 task_id,
298 split_index,
299 split_table_outputs.len()
300 );
301 }
302 Ok((split_table_outputs, table_stats_map))
303 }
304
305 pub fn report_progress(
306 metrics: Arc<CompactorMetrics>,
307 task_progress: Option<Arc<TaskProgress>>,
308 ssts: &Vec<LocalSstableInfo>,
309 is_share_buffer_compact: bool,
310 ) {
311 for sst_info in ssts {
312 let sst_size = sst_info.file_size();
313 if let Some(tracker) = &task_progress {
314 tracker.inc_ssts_uploaded();
315 tracker.dec_num_pending_write_io();
316 }
317 if is_share_buffer_compact {
318 metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
319 } else {
320 metrics.compaction_upload_sst_counts.inc();
321 }
322 }
323 }
324
325 async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
326 &self,
327 writer_factory: F,
328 iter: impl HummockIterator<Direction = Forward>,
329 compaction_filter: impl CompactionFilter,
330 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
331 task_progress: Option<Arc<TaskProgress>>,
332 object_id_getter: Arc<dyn GetObjectId>,
333 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
334 let builder_factory = RemoteBuilderFactory::<F, B> {
335 object_id_getter,
336 limiter: self.context.memory_limiter.clone(),
337 options: self.options.clone(),
338 policy: self.task_config.cache_policy,
339 remote_rpc_cost: self.get_id_time.clone(),
340 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
341 sstable_writer_factory: writer_factory,
342 _phantom: PhantomData,
343 };
344
345 let mut sst_builder = CapacitySplitTableBuilder::new(
346 builder_factory,
347 self.context.compactor_metrics.clone(),
348 task_progress.clone(),
349 self.task_config.table_vnode_partition.clone(),
350 self.context
351 .storage_opts
352 .compactor_concurrent_uploading_sst_count,
353 compaction_catalog_agent_ref,
354 );
355 let compaction_statistics = compact_and_build_sst(
356 &mut sst_builder,
357 &self.task_config,
358 self.context.compactor_metrics.clone(),
359 iter,
360 compaction_filter,
361 )
362 .instrument_await("compact_and_build_sst".verbose())
363 .await?;
364
365 let ssts = sst_builder
366 .finish()
367 .instrument_await("builder_finish".verbose())
368 .await?;
369
370 Ok((ssts, compaction_statistics))
371 }
372}
373
374#[must_use]
377pub fn start_iceberg_compactor(
378 compactor_context: CompactorContext,
379 hummock_meta_client: Arc<dyn HummockMetaClient>,
380) -> (JoinHandle<()>, Sender<()>) {
381 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
382 let stream_retry_interval = Duration::from_secs(30);
383 let periodic_event_update_interval = Duration::from_millis(
384 compactor_context
385 .storage_opts
386 .iceberg_compaction_pull_interval_ms,
387 );
388 let worker_num = compactor_context.compaction_executor.worker_num();
389
390 let max_task_parallelism: u32 = (worker_num as f32
391 * compactor_context.storage_opts.compactor_max_task_multiplier)
392 .ceil() as u32;
393
394 const MAX_PULL_TASK_COUNT: u32 = 4;
395 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
396
397 assert_ge!(
398 compactor_context.storage_opts.compactor_max_task_multiplier,
399 0.0
400 );
401
402 let join_handle = tokio::spawn(async move {
403 let pending_parallelism_budget = (max_task_parallelism as f32
405 * compactor_context
406 .storage_opts
407 .iceberg_compaction_pending_parallelism_budget_multiplier)
408 .ceil() as u32;
409 let mut task_queue =
410 IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);
411
412 let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));
414
415 let (task_completion_tx, mut task_completion_rx) =
417 tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
418 let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
419 let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();
422
423 let mut min_interval = tokio::time::interval(stream_retry_interval);
424 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
425
426 let mut log_throttler =
428 LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
429
430 'start_stream: loop {
432 let mut pull_task_ack = true;
435 tokio::select! {
436 _ = min_interval.tick() => {},
438 _ = &mut shutdown_rx => {
440 tracing::info!("Compactor is shutting down");
441 return;
442 }
443 }
444
445 let (request_sender, response_event_stream) = match hummock_meta_client
446 .subscribe_iceberg_compaction_event()
447 .await
448 {
449 Ok((request_sender, response_event_stream)) => {
450 tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
451 (request_sender, response_event_stream)
452 }
453
454 Err(e) => {
455 tracing::warn!(
456 error = %e.as_report(),
457 "Subscribing to iceberg compaction tasks failed with error. Will retry.",
458 );
459 continue 'start_stream;
460 }
461 };
462
463 if matches!(
464 flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
465 ReportSendResult::RestartStream
466 ) {
467 continue 'start_stream;
468 }
469
470 pin_mut!(response_event_stream);
471
472 let _executor = compactor_context.compaction_executor.clone();
473
474 let mut event_loop_iteration_now = Instant::now();
476 'consume_stream: loop {
477 {
478 compactor_context
480 .compactor_metrics
481 .compaction_event_loop_iteration_latency
482 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
483 event_loop_iteration_now = Instant::now();
484 }
485
486 let request_sender = request_sender.clone();
487 let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
488 Some(plan_completion) = task_completion_rx.recv() => {
490 let task_key = plan_completion.task_key;
491 let error_message = plan_completion.error_message;
492 tracing::debug!(
493 task_id = task_key.0,
494 plan_index = task_key.1,
495 success = error_message.is_none(),
496 "Plan completed, updating queue state"
497 );
498 task_queue.finish_running(task_key);
499
500 let completed_task_id = task_key.0;
501 let Entry::Occupied(mut tracker_entry) =
502 task_trackers.entry(completed_task_id)
503 else {
504 continue 'consume_stream;
505 };
506 tracker_entry.get_mut().record_completion(error_message);
507 if !tracker_entry.get().is_finished() {
508 continue 'consume_stream;
509 }
510
511 let report = tracker_entry.remove().into_report(completed_task_id);
512 if matches!(
513 send_or_buffer_iceberg_task_report(
514 &request_sender,
515 &mut pending_task_reports,
516 report,
517 ),
518 ReportSendResult::RestartStream
519 ) {
520 continue 'start_stream;
521 }
522 continue 'consume_stream;
523 }
524
525 _ = task_queue.wait_schedulable() => {
527 schedule_queued_tasks(
528 &mut task_queue,
529 &compactor_context,
530 &shutdown_map,
531 &task_completion_tx,
532 );
533 continue 'consume_stream;
534 }
535
536 _ = periodic_event_interval.tick() => {
537 let should_restart_stream = handle_meta_task_pulling(
539 &mut pull_task_ack,
540 &task_queue,
541 max_task_parallelism,
542 max_pull_task_count,
543 &request_sender,
544 &mut log_throttler,
545 );
546
547 if should_restart_stream {
548 continue 'start_stream;
549 }
550 continue;
551 }
552 event = response_event_stream.next() => {
553 event
554 }
555
556 _ = &mut shutdown_rx => {
557 tracing::info!("Iceberg Compactor is shutting down");
558 return
559 }
560 };
561
562 match event {
563 Some(Ok(SubscribeIcebergCompactionEventResponse {
564 event,
565 create_at: _create_at,
566 })) => {
567 let event = match event {
568 Some(event) => event,
569 None => continue 'consume_stream,
570 };
571
572 match event {
573 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
574 let task_id = iceberg_compaction_task.task_id;
575 let sink_id = iceberg_compaction_task.sink_id;
576 let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
578 .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
579 .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
580 .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
581 .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
582 .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
583 .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
584 .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
585 .enable_prefetch(compactor_context.storage_opts.iceberg_compaction_enable_prefetch)
586 .target_binpack_group_size_mb(
587 compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
588 )
589 .min_group_size_mb(
590 compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
591 )
592 .min_group_file_count(
593 compactor_context.storage_opts.iceberg_compaction_min_group_file_count
594 )
595 .build() {
596 Ok(config) => config,
597 Err(e) => {
598 tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
599 let report = build_iceberg_task_report(
600 task_id,
601 sink_id,
602 Some(format!(
603 "Failed to build iceberg compactor runner config: {}",
604 e.as_report()
605 )),
606 );
607 if matches!(
608 send_or_buffer_iceberg_task_report(
609 &request_sender,
610 &mut pending_task_reports,
611 report,
612 ),
613 ReportSendResult::RestartStream
614 ) {
615 continue 'start_stream;
616 }
617 continue 'consume_stream;
618 }
619 };
620
621 let task_execution = match create_task_execution(
623 iceberg_compaction_task,
624 compactor_runner_config,
625 compactor_context.compactor_metrics.clone(),
626 ).await {
627 Ok(task_execution) => task_execution,
628 Err(e) => {
629 tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
630 let report = build_iceberg_task_report(
631 task_id,
632 sink_id,
633 Some(format!(
634 "Failed to create iceberg compaction task execution: {}",
635 e.as_report()
636 )),
637 );
638 if matches!(
639 send_or_buffer_iceberg_task_report(
640 &request_sender,
641 &mut pending_task_reports,
642 report,
643 ),
644 ReportSendResult::RestartStream
645 ) {
646 continue 'start_stream;
647 }
648 continue 'consume_stream;
649 }
650 };
651
652 let sink_id = task_execution.sink_id;
653 let plan_runners = task_execution.plan_runners;
654
655 if plan_runners.is_empty() {
656 tracing::info!(task_id, sink_id, "No plans to execute");
657 let report = build_iceberg_task_report(task_id, sink_id, None);
658 if matches!(
659 send_or_buffer_iceberg_task_report(
660 &request_sender,
661 &mut pending_task_reports,
662 report,
663 ),
664 ReportSendResult::RestartStream
665 ) {
666 continue 'start_stream;
667 }
668 continue 'consume_stream;
669 }
670
671 let total_plans = plan_runners.len();
673 let mut enqueued_count = 0;
674
675 for runner in plan_runners {
676 let meta = runner.to_meta();
677 let plan_index = meta.plan_index;
678 let required_parallelism = runner.required_parallelism();
679 let push_result = task_queue.push(meta, Some(runner));
680
681 match push_result {
682 PushResult::Added => {
683 enqueued_count += 1;
684 tracing::debug!(
685 task_id = task_id,
686 plan_index = plan_index,
687 required_parallelism = required_parallelism,
688 "Iceberg plan runner added to queue"
689 );
690 },
691 PushResult::RejectedCapacity => {
692 tracing::warn!(
693 task_id = task_id,
694 required_parallelism = required_parallelism,
695 pending_budget = pending_parallelism_budget,
696 enqueued_count = enqueued_count,
697 total_plans = total_plans,
698 "Iceberg plan runner rejected - queue capacity exceeded"
699 );
700 break;
702 },
703 PushResult::RejectedTooLarge => {
704 tracing::error!(
705 task_id = task_id,
706 required_parallelism = required_parallelism,
707 max_parallelism = max_task_parallelism,
708 "Iceberg plan runner rejected - parallelism exceeds max"
709 );
710 },
711 PushResult::RejectedInvalidParallelism => {
712 tracing::error!(
713 task_id = task_id,
714 required_parallelism = required_parallelism,
715 "Iceberg plan runner rejected - invalid parallelism"
716 );
717 },
718 PushResult::RejectedDuplicate => {
719 tracing::error!(
720 task_id = task_id,
721 plan_index = plan_index,
722 "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
723 );
724 }
725 }
726 }
727
728 if enqueued_count == 0 {
729 let report = build_iceberg_task_report(
730 task_id,
731 sink_id,
732 Some("Failed to enqueue all iceberg compaction plans".to_owned()),
733 );
734 if matches!(
735 send_or_buffer_iceberg_task_report(
736 &request_sender,
737 &mut pending_task_reports,
738 report,
739 ),
740 ReportSendResult::RestartStream
741 ) {
742 continue 'start_stream;
743 }
744 } else {
745 task_trackers.insert(
746 task_id,
747 IcebergTaskTracker::new(sink_id, enqueued_count),
748 );
749 }
750
751 tracing::info!(
752 task_id = task_id,
753 sink_id = sink_id,
754 total_plans = total_plans,
755 enqueued_count = enqueued_count,
756 "Enqueued {} of {} Iceberg plan runners",
757 enqueued_count,
758 total_plans
759 );
760 },
761 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
762 pull_task_ack = true;
764 },
765 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CancelCompactTask(cancel_compact_task) => {
766 cancel_iceberg_task(
767 cancel_compact_task.task_id,
768 &mut task_queue,
769 &shutdown_map,
770 &mut task_trackers,
771 );
772 },
773 }
774 }
775 Some(Err(e)) => {
776 tracing::warn!("Failed to consume stream. {}", e.message());
777 continue 'start_stream;
778 }
779 _ => {
780 continue 'start_stream;
782 }
783 }
784 }
785 }
786 });
787
788 (join_handle, shutdown_tx)
789}
790
791#[must_use]
794pub fn start_compactor(
795 compactor_context: CompactorContext,
796 hummock_meta_client: Arc<dyn HummockMetaClient>,
797 object_id_manager: Arc<ObjectIdManager>,
798 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
799) -> (JoinHandle<()>, Sender<()>) {
800 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
801 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
802 let stream_retry_interval = Duration::from_secs(30);
803 let task_progress = compactor_context.task_progress_manager.clone();
804 let periodic_event_update_interval = Duration::from_millis(1000);
805
806 let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
807 * compactor_context.storage_opts.compactor_max_task_multiplier)
808 .ceil() as u32;
809 let running_task_parallelism = Arc::new(AtomicU32::new(0));
810
811 const MAX_PULL_TASK_COUNT: u32 = 4;
812 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
813
814 assert_ge!(
815 compactor_context.storage_opts.compactor_max_task_multiplier,
816 0.0
817 );
818
819 let join_handle = tokio::spawn(async move {
820 let shutdown_map = CompactionShutdownMap::default();
821 let mut min_interval = tokio::time::interval(stream_retry_interval);
822 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
823
824 let mut log_throttler =
826 LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
827
828 'start_stream: loop {
830 let mut pull_task_ack = true;
833 tokio::select! {
834 _ = min_interval.tick() => {},
836 _ = &mut shutdown_rx => {
838 tracing::info!("Compactor is shutting down");
839 return;
840 }
841 }
842
843 let (request_sender, response_event_stream) =
844 match hummock_meta_client.subscribe_compaction_event().await {
845 Ok((request_sender, response_event_stream)) => {
846 tracing::debug!("Succeeded subscribe_compaction_event.");
847 (request_sender, response_event_stream)
848 }
849
850 Err(e) => {
851 tracing::warn!(
852 error = %e.as_report(),
853 "Subscribing to compaction tasks failed with error. Will retry.",
854 );
855 continue 'start_stream;
856 }
857 };
858
859 pin_mut!(response_event_stream);
860
861 let executor = compactor_context.compaction_executor.clone();
862 let object_id_manager = object_id_manager.clone();
863
864 let mut event_loop_iteration_now = Instant::now();
866 'consume_stream: loop {
867 {
868 compactor_context
870 .compactor_metrics
871 .compaction_event_loop_iteration_latency
872 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
873 event_loop_iteration_now = Instant::now();
874 }
875
876 let running_task_parallelism = running_task_parallelism.clone();
877 let request_sender = request_sender.clone();
878 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
879 _ = periodic_event_interval.tick() => {
880 let progress_list = get_task_progress(task_progress.clone());
881
882 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
883 event: Some(RequestEvent::HeartBeat(
884 HeartBeat {
885 progress: progress_list
886 }
887 )),
888 create_at: SystemTime::now()
889 .duration_since(std::time::UNIX_EPOCH)
890 .expect("Clock may have gone backwards")
891 .as_millis() as u64,
892 }) {
893 tracing::warn!(error = %e.as_report(), "Failed to report task progress");
894 continue 'start_stream;
896 }
897
898
899 let mut pending_pull_task_count = 0;
900 if pull_task_ack {
901 pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
903
904 if pending_pull_task_count > 0 {
905 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
906 event: Some(RequestEvent::PullTask(
907 PullTask {
908 pull_task_count: pending_pull_task_count,
909 }
910 )),
911 create_at: SystemTime::now()
912 .duration_since(std::time::UNIX_EPOCH)
913 .expect("Clock may have gone backwards")
914 .as_millis() as u64,
915 }) {
916 tracing::warn!(error = %e.as_report(), "Failed to pull task");
917
918 continue 'start_stream;
920 } else {
921 pull_task_ack = false;
922 }
923 }
924 }
925
926 let running_count = running_task_parallelism.load(Ordering::SeqCst);
927 let current_state = CompactionLogState {
928 running_parallelism: running_count,
929 pull_task_ack,
930 pending_pull_task_count,
931 };
932
933 if log_throttler.should_log(¤t_state) {
935 tracing::info!(
936 running_parallelism_count = %current_state.running_parallelism,
937 pull_task_ack = %current_state.pull_task_ack,
938 pending_pull_task_count = %current_state.pending_pull_task_count
939 );
940 log_throttler.update(current_state);
941 }
942
943 continue;
944 }
945 event = response_event_stream.next() => {
946 event
947 }
948
949 _ = &mut shutdown_rx => {
950 tracing::info!("Compactor is shutting down");
951 return
952 }
953 };
954
955 fn send_report_task_event(
956 compact_task: &CompactTask,
957 table_stats: TableStatsMap,
958 object_timestamps: HashMap<HummockSstableObjectId, u64>,
959 request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
960 ) {
961 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
962 event: Some(RequestEvent::ReportTask(ReportTask {
963 task_id: compact_task.task_id,
964 task_status: compact_task.task_status.into(),
965 sorted_output_ssts: compact_task
966 .sorted_output_ssts
967 .iter()
968 .map(|sst| sst.into())
969 .collect(),
970 table_stats_change: to_prost_table_stats_map(table_stats),
971 object_timestamps,
972 })),
973 create_at: SystemTime::now()
974 .duration_since(std::time::UNIX_EPOCH)
975 .expect("Clock may have gone backwards")
976 .as_millis() as u64,
977 }) {
978 let task_id = compact_task.task_id;
979 tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
980 }
981 }
982
983 match event {
984 Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
985 let event = match event {
986 Some(event) => event,
987 None => continue 'consume_stream,
988 };
989 let shutdown = shutdown_map.clone();
990 let context = compactor_context.clone();
991 let consumed_latency_ms = SystemTime::now()
992 .duration_since(std::time::UNIX_EPOCH)
993 .expect("Clock may have gone backwards")
994 .as_millis() as u64
995 - create_at;
996 context
997 .compactor_metrics
998 .compaction_event_consumed_latency
999 .observe(consumed_latency_ms as _);
1000
1001 let object_id_manager = object_id_manager.clone();
1002 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
1003
1004 match event {
1005 ResponseEvent::CompactTask(compact_task) => {
1006 let compact_task = CompactTask::from(compact_task);
1007 let parallelism =
1008 calculate_task_parallelism(&compact_task, &context);
1009
1010 assert_ne!(parallelism, 0, "splits cannot be empty");
1011
1012 if (max_task_parallelism
1013 - running_task_parallelism.load(Ordering::SeqCst))
1014 < parallelism as u32
1015 {
1016 tracing::warn!(
1017 "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
1018 compact_task.task_id,
1019 parallelism,
1020 max_task_parallelism,
1021 running_task_parallelism.load(Ordering::Relaxed),
1022 );
1023 let (compact_task, table_stats, object_timestamps) =
1024 compact_done(
1025 compact_task,
1026 context.clone(),
1027 vec![],
1028 TaskStatus::NoAvailCpuResourceCanceled,
1029 );
1030
1031 send_report_task_event(
1032 &compact_task,
1033 table_stats,
1034 object_timestamps,
1035 &request_sender,
1036 );
1037
1038 continue 'consume_stream;
1039 }
1040
1041 running_task_parallelism
1042 .fetch_add(parallelism as u32, Ordering::SeqCst);
1043 executor.spawn(async move {
1044 let (tx, rx) = tokio::sync::oneshot::channel();
1045 let task_id = compact_task.task_id;
1046 shutdown.lock().unwrap().insert(task_id, tx);
1047
1048 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
1049 context.clone(),
1050 compact_task,
1051 rx,
1052 object_id_manager.clone(),
1053 compaction_catalog_manager_ref.clone(),
1054 )
1055 .await;
1056
1057 shutdown.lock().unwrap().remove(&task_id);
1058 running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
1059
1060 send_report_task_event(
1061 &compact_task,
1062 table_stats,
1063 object_timestamps,
1064 &request_sender,
1065 );
1066
1067 let enable_check_compaction_result =
1068 context.storage_opts.check_compaction_result;
1069 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1070
1071 if enable_check_compaction_result && need_check_task {
1072 let read_table_ids = compact_task
1073 .get_table_ids_from_input_ssts()
1074 .collect::<Vec<_>>();
1075 match compaction_catalog_manager_ref.acquire(read_table_ids.into_iter().collect()).await {
1076 Ok(compaction_catalog_agent_ref) => {
1077 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1078 {
1079 Err(e) => {
1080 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
1081 }
1082 Ok(true) => (),
1083 Ok(false) => {
1084 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1085 }
1086 }
1087 },
1088 Err(e) => {
1089 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1090 }
1091 }
1092 }
1093 });
1094 }
1095 #[expect(deprecated)]
1096 ResponseEvent::VacuumTask(_) => {
1097 unreachable!("unexpected vacuum task");
1098 }
1099 #[expect(deprecated)]
1100 ResponseEvent::FullScanTask(_) => {
1101 unreachable!("unexpected scan task");
1102 }
1103 #[expect(deprecated)]
1104 ResponseEvent::ValidationTask(validation_task) => {
1105 let validation_task = ValidationTask::from(validation_task);
1106 executor.spawn(async move {
1107 validate_ssts(validation_task, context.sstable_store.clone())
1108 .await;
1109 });
1110 }
1111 ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
1112 .lock()
1113 .unwrap()
1114 .remove(&cancel_compact_task.task_id)
1115 {
1116 Some(tx) => {
1117 if tx.send(()).is_err() {
1118 tracing::warn!(
1119 "Cancellation of compaction task failed. task_id: {}",
1120 cancel_compact_task.task_id
1121 );
1122 }
1123 }
1124 _ => {
1125 tracing::warn!(
1126 "Attempting to cancel non-existent compaction task. task_id: {}",
1127 cancel_compact_task.task_id
1128 );
1129 }
1130 },
1131
1132 ResponseEvent::PullTaskAck(_pull_task_ack) => {
1133 pull_task_ack = true;
1135 }
1136 }
1137 }
1138 Some(Err(e)) => {
1139 tracing::warn!("Failed to consume stream. {}", e.message());
1140 continue 'start_stream;
1141 }
1142 _ => {
1143 continue 'start_stream;
1145 }
1146 }
1147 }
1148 }
1149 });
1150
1151 (join_handle, shutdown_tx)
1152}
1153
1154#[must_use]
1157pub fn start_shared_compactor(
1158 grpc_proxy_client: GrpcCompactorProxyClient,
1159 mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1160 context: CompactorContext,
1161) -> (JoinHandle<()>, Sender<()>) {
1162 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1163 let task_progress = context.task_progress_manager.clone();
1164 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1165 let periodic_event_update_interval = Duration::from_millis(1000);
1166
1167 let join_handle = tokio::spawn(async move {
1168 let shutdown_map = CompactionShutdownMap::default();
1169
1170 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1171 let executor = context.compaction_executor.clone();
1172 let report_heartbeat_client = grpc_proxy_client.clone();
1173 'consume_stream: loop {
1174 let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1175 _ = periodic_event_interval.tick() => {
1176 let progress_list = get_task_progress(task_progress.clone());
1177 let report_compaction_task_request = ReportCompactionTaskRequest{
1178 event: Some(ReportCompactionTaskEvent::HeartBeat(
1179 SharedHeartBeat {
1180 progress: progress_list
1181 }
1182 )),
1183 };
1184 if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1185 tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1186 }
1187 continue
1188 }
1189
1190
1191 _ = &mut shutdown_rx => {
1192 tracing::info!("Compactor is shutting down");
1193 return
1194 }
1195
1196 request = receiver.recv() => {
1197 request
1198 }
1199
1200 };
1201 match request {
1202 Some(request) => {
1203 let context = context.clone();
1204 let shutdown = shutdown_map.clone();
1205
1206 let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1207 executor.spawn(async move {
1208 let DispatchCompactionTaskRequest {
1209 tables,
1210 output_object_ids,
1211 task: dispatch_task,
1212 } = request.into_inner();
1213 let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1214 acc.insert(table.id, table);
1215 acc
1216 });
1217
1218 let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1219 output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1220 let shared_compactor_object_id_manager =
1221 SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1222 match dispatch_task.unwrap() {
1223 dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1224 let compact_task = CompactTask::from(&compact_task);
1225 let (tx, rx) = tokio::sync::oneshot::channel();
1226 let task_id = compact_task.task_id;
1227 shutdown.lock().unwrap().insert(task_id, tx);
1228
1229 let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1230 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1231 context.clone(),
1232 compact_task,
1233 rx,
1234 shared_compactor_object_id_manager,
1235 compaction_catalog_agent_ref.clone(),
1236 )
1237 .await;
1238 shutdown.lock().unwrap().remove(&task_id);
1239 let report_compaction_task_request = ReportCompactionTaskRequest {
1240 event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1241 compact_task: Some(PbCompactTask::from(&compact_task)),
1242 table_stats_change: to_prost_table_stats_map(table_stats),
1243 object_timestamps,
1244 })),
1245 };
1246
1247 match cloned_grpc_proxy_client
1248 .report_compaction_task(report_compaction_task_request)
1249 .await
1250 {
1251 Ok(_) => {
1252 let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1254 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1255 if enable_check_compaction_result && need_check_task {
1256 match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1257 Err(e) => {
1258 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1259 },
1260 Ok(true) => (),
1261 Ok(false) => {
1262 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1263 }
1264 }
1265 }
1266 }
1267 Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1268 }
1269
1270 }
1271 dispatch_compaction_task_request::Task::VacuumTask(_) => {
1272 unreachable!("unexpected vacuum task");
1273 }
1274 dispatch_compaction_task_request::Task::FullScanTask(_) => {
1275 unreachable!("unexpected scan task");
1276 }
1277 dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1278 let validation_task = ValidationTask::from(validation_task);
1279 validate_ssts(validation_task, context.sstable_store.clone()).await;
1280 }
1281 dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1282 match shutdown
1283 .lock()
1284 .unwrap()
1285 .remove(&cancel_compact_task.task_id)
1286 { Some(tx) => {
1287 if tx.send(()).is_err() {
1288 tracing::warn!(
1289 "Cancellation of compaction task failed. task_id: {}",
1290 cancel_compact_task.task_id
1291 );
1292 }
1293 } _ => {
1294 tracing::warn!(
1295 "Attempting to cancel non-existent compaction task. task_id: {}",
1296 cancel_compact_task.task_id
1297 );
1298 }}
1299 }
1300 }
1301 });
1302 }
1303 None => continue 'consume_stream,
1304 }
1305 }
1306 });
1307 (join_handle, shutdown_tx)
1308}
1309
1310fn get_task_progress(
1311 task_progress: Arc<
1312 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1313 >,
1314) -> Vec<CompactTaskProgress> {
1315 let mut progress_list = Vec::new();
1316 for (&task_id, progress) in &*task_progress.lock() {
1317 progress_list.push(progress.snapshot(task_id));
1318 }
1319 progress_list
1320}
1321
1322fn schedule_queued_tasks(
1324 task_queue: &mut IcebergTaskQueue,
1325 compactor_context: &CompactorContext,
1326 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1327 task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
1328) {
1329 while let Some(popped_task) = task_queue.pop() {
1330 let task_id = popped_task.meta.task_id;
1331 let plan_index = popped_task.meta.plan_index;
1332 let task_key = (task_id, plan_index);
1333
1334 let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1336
1337 let Some(runner) = popped_task.runner else {
1338 tracing::error!(
1339 task_id = task_id,
1340 plan_index = plan_index,
1341 "Popped task missing runner - this should not happen"
1342 );
1343 task_queue.finish_running(task_key);
1344 continue;
1345 };
1346
1347 let executor = compactor_context.compaction_executor.clone();
1348 let shutdown_map_clone = shutdown_map.clone();
1349 let completion_tx_clone = task_completion_tx.clone();
1350 let (tx, rx) = tokio::sync::oneshot::channel();
1351
1352 {
1353 let mut shutdown_guard = shutdown_map.lock().unwrap();
1354 shutdown_guard.insert(task_key, tx);
1355 }
1356
1357 tracing::info!(
1358 task_id = task_id,
1359 plan_index = plan_index,
1360 unique_ident = ?unique_ident,
1361 required_parallelism = popped_task.meta.required_parallelism,
1362 "Starting iceberg compaction task from queue"
1363 );
1364
1365 executor.spawn(async move {
1366 let _cleanup_guard = scopeguard::guard(shutdown_map_clone, move |shutdown_map| {
1367 let mut shutdown_guard = shutdown_map.lock().unwrap();
1368 shutdown_guard.remove(&task_key);
1369 });
1370
1371 let result = Box::pin(runner.compact(rx)).await;
1372
1373 let completion = match result {
1374 Ok(_) => IcebergPlanCompletion {
1375 task_key,
1376 error_message: None,
1377 },
1378 Err(e) => {
1379 if is_cancelled_iceberg_compaction_error(&e) {
1380 tracing::info!(
1381 task_id = task_key.0,
1382 plan_index = task_key.1,
1383 "Iceberg compaction plan cancelled"
1384 );
1385 } else {
1386 tracing::warn!(
1387 error = %e.as_report(),
1388 task_id = task_key.0,
1389 plan_index = task_key.1,
1390 "Failed to compact iceberg runner"
1391 );
1392 }
1393 IcebergPlanCompletion {
1394 task_key,
1395 error_message: Some(e.to_report_string()),
1396 }
1397 }
1398 };
1399
1400 if completion_tx_clone.send(completion).is_err() {
1401 tracing::warn!(
1402 task_id = task_key.0,
1403 plan_index = task_key.1,
1404 "Failed to notify task completion - main loop may have shut down"
1405 );
1406 }
1407 });
1408 }
1409}
1410
1411fn is_cancelled_iceberg_compaction_error(error: &crate::hummock::HummockError) -> bool {
1412 matches!(
1413 error.inner(),
1414 HummockErrorInner::CompactionExecutor(message) if message == "Plan cancelled"
1415 )
1416}
1417
1418fn cancel_iceberg_task(
1419 task_id: u64,
1420 task_queue: &mut IcebergTaskQueue,
1421 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1422 task_trackers: &mut HashMap<u64, IcebergTaskTracker>,
1423) {
1424 let cancelled_waiting = task_queue.cancel_waiting_task(task_id);
1429 let removed_tracker = task_trackers.remove(&task_id).is_some();
1430
1431 let cancelled_running = {
1432 let mut shutdown_guard = shutdown_map.lock().unwrap();
1433 let task_keys: Vec<_> = shutdown_guard
1434 .keys()
1435 .filter(|(running_task_id, _)| *running_task_id == task_id)
1436 .copied()
1437 .collect();
1438
1439 for task_key in &task_keys {
1440 if let Some(tx) = shutdown_guard.remove(task_key)
1441 && tx.send(()).is_err()
1442 {
1443 tracing::debug!(
1444 task_id = task_key.0,
1445 plan_index = task_key.1,
1446 "Iceberg compaction plan shutdown receiver already closed during cancellation"
1447 );
1448 }
1449 }
1450
1451 task_keys.len()
1452 };
1453
1454 if cancelled_waiting == 0 && cancelled_running == 0 && !removed_tracker {
1455 tracing::warn!(
1456 task_id = task_id,
1457 "Attempting to cancel non-existent iceberg compaction task"
1458 );
1459 } else {
1460 tracing::info!(
1461 task_id = task_id,
1462 cancelled_waiting = cancelled_waiting,
1463 cancelled_running = cancelled_running,
1464 removed_tracker = removed_tracker,
1465 "Cancelled iceberg compaction task"
1466 );
1467 }
1468}
1469
1470fn handle_meta_task_pulling(
1473 pull_task_ack: &mut bool,
1474 task_queue: &IcebergTaskQueue,
1475 max_task_parallelism: u32,
1476 max_pull_task_count: u32,
1477 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1478 log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1479) -> bool {
1480 let mut pending_pull_task_count = 0;
1481 if *pull_task_ack {
1482 let current_running_parallelism = task_queue.running_parallelism_sum();
1484 pending_pull_task_count =
1485 (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1486
1487 if pending_pull_task_count > 0 {
1488 if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1489 event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1490 subscribe_iceberg_compaction_event_request::PullTask {
1491 pull_task_count: pending_pull_task_count,
1492 },
1493 )),
1494 create_at: SystemTime::now()
1495 .duration_since(std::time::UNIX_EPOCH)
1496 .expect("Clock may have gone backwards")
1497 .as_millis() as u64,
1498 }) {
1499 tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1500 return true; } else {
1502 *pull_task_ack = false;
1503 }
1504 }
1505 }
1506
1507 let running_count = task_queue.running_parallelism_sum();
1508 let waiting_count = task_queue.waiting_parallelism_sum();
1509 let available_count = max_task_parallelism.saturating_sub(running_count);
1510 let current_state = IcebergCompactionLogState {
1511 running_parallelism: running_count,
1512 waiting_parallelism: waiting_count,
1513 available_parallelism: available_count,
1514 pull_task_ack: *pull_task_ack,
1515 pending_pull_task_count,
1516 };
1517
1518 if log_throttler.should_log(¤t_state) {
1520 tracing::info!(
1521 running_parallelism_count = %current_state.running_parallelism,
1522 waiting_parallelism_count = %current_state.waiting_parallelism,
1523 available_parallelism = %current_state.available_parallelism,
1524 pull_task_ack = %current_state.pull_task_ack,
1525 pending_pull_task_count = %current_state.pending_pull_task_count
1526 );
1527 log_throttler.update(current_state);
1528 }
1529
1530 false }
1532
#[cfg(test)]
mod tests {
    use super::*;

    /// Cancelling a task that only has waiting plans empties the waiting parallelism of the
    /// queue and removes the task's tracker.
    #[test]
    fn test_cancel_iceberg_task_removes_waiting_plans_and_tracker() {
        let task_id = 42;
        let mut task_queue = IcebergTaskQueue::new(10, 30);
        for (plan_index, required_parallelism) in [(0, 3), (1, 4)] {
            let meta = iceberg_compaction::IcebergTaskMeta {
                task_id,
                plan_index,
                required_parallelism,
            };
            assert_eq!(task_queue.push(meta, None), PushResult::Added);
        }
        assert_eq!(task_queue.waiting_parallelism_sum(), 7);

        let shutdown_map = Arc::new(Mutex::new(HashMap::new()));
        let mut task_trackers = HashMap::from([(task_id, IcebergTaskTracker::new(10, 2))]);

        cancel_iceberg_task(task_id, &mut task_queue, &shutdown_map, &mut task_trackers);

        assert_eq!(task_queue.waiting_parallelism_sum(), 0);
        assert!(!task_trackers.contains_key(&task_id));
    }

    /// Cancelling a task with a registered shutdown sender signals that sender, removes it
    /// from the shutdown map and removes the task's tracker.
    #[test]
    fn test_cancel_iceberg_task_shuts_down_running_plans_and_tracker() {
        let task_id = 43;
        let mut task_queue = IcebergTaskQueue::new(10, 30);
        let (cancel_tx, mut cancel_rx) = tokio::sync::oneshot::channel();
        let shutdown_map = Arc::new(Mutex::new(HashMap::from([((task_id, 0), cancel_tx)])));
        let mut task_trackers = HashMap::from([(task_id, IcebergTaskTracker::new(10, 1))]);

        cancel_iceberg_task(task_id, &mut task_queue, &shutdown_map, &mut task_trackers);

        assert!(cancel_rx.try_recv().is_ok());
        assert!(shutdown_map.lock().unwrap().is_empty());
        assert!(!task_trackers.contains_key(&task_id));
    }

    /// Log states compare equal only when all of their fields are equal.
    #[test]
    fn test_log_state_equality() {
        let compaction_state = |running_parallelism| CompactionLogState {
            running_parallelism,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_eq!(compaction_state(10), compaction_state(10));
        assert_ne!(compaction_state(10), compaction_state(11));

        let iceberg_state = |waiting_parallelism| IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_ne!(iceberg_state(5), iceberg_state(6));
    }

    /// A state is logged the first time it is seen and suppressed once it has been recorded.
    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let states = [10, 11].map(|running_parallelism| CompactionLogState {
            running_parallelism,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        });

        for state in states {
            // Not recorded yet: must be logged.
            assert!(throttler.should_log(&state));
            throttler.update(state.clone());
            // Recorded and unchanged: must be suppressed.
            assert!(!throttler.should_log(&state));
        }
    }

    /// An unchanged state is logged again once more time than the interval passed to
    /// `LogThrottler::new` has elapsed.
    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let state = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttler.should_log(&state));
        throttler.update(state.clone());
        assert!(!throttler.should_log(&state));

        // Wait for longer than the 10 ms interval configured above.
        std::thread::sleep(Duration::from_millis(15));
        assert!(throttler.should_log(&state));
    }
}