1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24 ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28 subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::hash_map::Entry;
43use std::collections::{HashMap, VecDeque};
44use std::marker::PhantomData;
45use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, SystemTime};
48
49use await_tree::{InstrumentAwait, SpanExt};
50pub use compaction_executor::CompactionExecutor;
51pub use compaction_filter::{
52 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, TtlCompactionFilter,
53};
54pub use context::{
55 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
60use iceberg_compaction::{
61 IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
62 ReportSendResult, build_iceberg_task_report, create_task_execution,
63 flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
64};
65pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
66use more_asserts::assert_ge;
67use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
68use risingwave_hummock_sdk::{
69 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
70};
71use risingwave_pb::hummock::compact_task::TaskStatus;
72use risingwave_pb::hummock::subscribe_compaction_event_request::{
73 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
74};
75use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
76use risingwave_pb::hummock::{
77 CompactTaskProgress, PbSstableFilterType, ReportCompactionTaskRequest,
78 SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
79};
80use risingwave_rpc_client::HummockMetaClient;
81pub use shared_buffer_compact::compact;
82use tokio::sync::oneshot::Sender;
83use tokio::task::JoinHandle;
84use tokio::time::Instant;
85
86pub use self::compaction_utils::{
87 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
88 check_flush_result,
89};
90pub use self::task_progress::TaskProgress;
91use super::multi_builder::CapacitySplitTableBuilder;
92use super::{
93 GetObjectId, HummockErrorInner, HummockResult, ObjectIdManager, SstableBuilderOptions,
94 Xor8FilterBuilder, Xor16FilterBuilder,
95};
96use crate::compaction_catalog_manager::{
97 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
98};
99use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
100use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
101use crate::hummock::compactor::iceberg_compaction::TaskKey;
102use crate::hummock::iterator::{Forward, HummockIterator};
103use crate::hummock::{
104 BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, FilterBuilder,
105 SharedComapctorObjectIdManager, SstableWriterFactory, UnifiedSstableWriterFactory,
106 validate_ssts,
107};
108use crate::monitor::CompactorMetrics;
109
/// Maximum time between two periodic status log lines when the logged state does not change
/// (used as the `LogThrottler` heartbeat interval): one minute.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_millis(60 * 1000);
112
/// Snapshot of the regular compactor's pull state taken on every periodic tick.
///
/// Two consecutive snapshots are compared by a `LogThrottler` so that the status line is
/// only emitted when something changed (or a heartbeat interval elapsed).
#[derive(Clone, Debug, Eq, PartialEq)]
struct CompactionLogState {
    // Sum of the parallelism of the compaction tasks currently running on this node.
    running_parallelism: u32,
    // `false` while a `PullTask` request is still waiting for a `PullTaskAck` from meta.
    pull_task_ack: bool,
    // Number of tasks requested from meta on this tick (0 when no pull was sent).
    pending_pull_task_count: u32,
}
120
/// Periodic status snapshot of the iceberg compactor.
///
/// Compared by `LogThrottler<IcebergCompactionLogState>` to suppress repeated identical
/// status lines. NOTE(review): the snapshot is populated outside this view (the periodic
/// pulling helper); field meanings follow their names — confirm there.
#[derive(Clone, Debug, Eq, PartialEq)]
struct IcebergCompactionLogState {
    running_parallelism: u32,
    waiting_parallelism: u32,
    available_parallelism: u32,
    pull_task_ack: bool,
    pending_pull_task_count: u32,
}
130
/// Suppresses repetitive status logs.
///
/// A line should be logged when the current state differs from the last one passed to
/// [`LogThrottler::update`], or when at least `heartbeat_interval` has passed since that
/// update (or since construction, if nothing was logged yet).
struct LogThrottler<T: PartialEq> {
    // State recorded by the most recent `update`; `None` until the first one.
    last_logged_state: Option<T>,
    // Time of the most recent `update` (construction time initially).
    last_heartbeat: Instant,
    // Maximum silence allowed while the state stays unchanged.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Creates a throttler that has not logged anything yet.
    fn new(heartbeat_interval: Duration) -> Self {
        let last_heartbeat = Instant::now();
        Self {
            heartbeat_interval,
            last_heartbeat,
            last_logged_state: None,
        }
    }

    /// Returns `true` if `current_state` should be logged now.
    fn should_log(&self, current_state: &T) -> bool {
        let state_changed = match &self.last_logged_state {
            None => true,
            Some(previous) => previous != current_state,
        };
        // Only consult the clock when the state is unchanged.
        state_changed || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Records that `current_state` has just been logged.
    fn update(&mut self, current_state: T) {
        self.last_heartbeat = Instant::now();
        self.last_logged_state = Some(current_state);
    }
}
159
160pub struct Compactor {
162 context: CompactorContext,
164 object_id_getter: Arc<dyn GetObjectId>,
165 task_config: TaskConfig,
166 options: SstableBuilderOptions,
167 get_id_time: Arc<AtomicU64>,
168}
169
/// Result of compacting one piece of a task: `(index, output SSTs, statistics)`.
/// NOTE(review): the leading `usize` is presumably the split index — confirm at the producer.
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
171
172impl Compactor {
173 pub fn new(
175 context: CompactorContext,
176 options: SstableBuilderOptions,
177 task_config: TaskConfig,
178 object_id_getter: Arc<dyn GetObjectId>,
179 ) -> Self {
180 Self {
181 context,
182 options,
183 task_config,
184 get_id_time: Arc::new(AtomicU64::new(0)),
185 object_id_getter,
186 }
187 }
188
189 async fn compact_key_range(
194 &self,
195 iter: impl HummockIterator<Direction = Forward>,
196 compaction_filter: impl CompactionFilter,
197 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
198 task_progress: Option<Arc<TaskProgress>>,
199 task_id: Option<HummockCompactionTaskId>,
200 split_index: Option<usize>,
201 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
202 let compact_timer = if self.context.is_share_buffer_compact {
204 self.context
205 .compactor_metrics
206 .write_build_l0_sst_duration
207 .start_timer()
208 } else {
209 self.context
210 .compactor_metrics
211 .compact_sst_duration
212 .start_timer()
213 };
214
215 let (split_table_outputs, table_stats_map) = {
216 let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
217 match (
218 self.task_config.sstable_filter_kind,
219 self.task_config.use_block_based_filter,
220 ) {
221 (PbSstableFilterType::SstableFilterXor8, true) => {
222 self.compact_key_range_impl::<_, BlockedXor8FilterBuilder>(
223 factory,
224 iter,
225 compaction_filter,
226 compaction_catalog_agent_ref,
227 task_progress.clone(),
228 self.object_id_getter.clone(),
229 )
230 .instrument_await("compact".verbose())
231 .await?
232 }
233 (PbSstableFilterType::SstableFilterXor8, false) => {
234 self.compact_key_range_impl::<_, Xor8FilterBuilder>(
235 factory,
236 iter,
237 compaction_filter,
238 compaction_catalog_agent_ref,
239 task_progress.clone(),
240 self.object_id_getter.clone(),
241 )
242 .instrument_await("compact".verbose())
243 .await?
244 }
245 (PbSstableFilterType::SstableFilterXor16, true) => {
246 self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
247 factory,
248 iter,
249 compaction_filter,
250 compaction_catalog_agent_ref,
251 task_progress.clone(),
252 self.object_id_getter.clone(),
253 )
254 .instrument_await("compact".verbose())
255 .await?
256 }
257 (PbSstableFilterType::SstableFilterXor16, false) => {
258 self.compact_key_range_impl::<_, Xor16FilterBuilder>(
259 factory,
260 iter,
261 compaction_filter,
262 compaction_catalog_agent_ref,
263 task_progress.clone(),
264 self.object_id_getter.clone(),
265 )
266 .instrument_await("compact".verbose())
267 .await?
268 }
269 (kind, _) => unreachable!("unsupported sstable filter kind in compactor: {kind:?}"),
270 }
271 };
272
273 compact_timer.observe_duration();
274
275 Self::report_progress(
276 self.context.compactor_metrics.clone(),
277 task_progress,
278 &split_table_outputs,
279 self.context.is_share_buffer_compact,
280 );
281
282 self.context
283 .compactor_metrics
284 .get_table_id_total_time_duration
285 .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
286
287 debug_assert!(
288 split_table_outputs
289 .iter()
290 .all(|table_info| table_info.sst_info.table_ids.is_sorted())
291 );
292
293 if task_id.is_some() {
294 tracing::info!(
296 "Finish Task {:?} split_index {:?} sst count {}",
297 task_id,
298 split_index,
299 split_table_outputs.len()
300 );
301 }
302 Ok((split_table_outputs, table_stats_map))
303 }
304
305 pub fn report_progress(
306 metrics: Arc<CompactorMetrics>,
307 task_progress: Option<Arc<TaskProgress>>,
308 ssts: &Vec<LocalSstableInfo>,
309 is_share_buffer_compact: bool,
310 ) {
311 for sst_info in ssts {
312 let sst_size = sst_info.file_size();
313 if let Some(tracker) = &task_progress {
314 tracker.inc_ssts_uploaded();
315 tracker.dec_num_pending_write_io();
316 }
317 if is_share_buffer_compact {
318 metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
319 } else {
320 metrics.compaction_upload_sst_counts.inc();
321 }
322 }
323 }
324
325 async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
326 &self,
327 writer_factory: F,
328 iter: impl HummockIterator<Direction = Forward>,
329 compaction_filter: impl CompactionFilter,
330 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
331 task_progress: Option<Arc<TaskProgress>>,
332 object_id_getter: Arc<dyn GetObjectId>,
333 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
334 let builder_factory = RemoteBuilderFactory::<F, B> {
335 object_id_getter,
336 limiter: self.context.memory_limiter.clone(),
337 options: self.options.clone(),
338 policy: self.task_config.cache_policy,
339 remote_rpc_cost: self.get_id_time.clone(),
340 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
341 sstable_writer_factory: writer_factory,
342 _phantom: PhantomData,
343 };
344
345 let mut sst_builder = CapacitySplitTableBuilder::new(
346 builder_factory,
347 self.context.compactor_metrics.clone(),
348 task_progress.clone(),
349 self.task_config.table_vnode_partition.clone(),
350 self.context
351 .storage_opts
352 .compactor_concurrent_uploading_sst_count,
353 compaction_catalog_agent_ref,
354 );
355 let compaction_statistics = compact_and_build_sst(
356 &mut sst_builder,
357 &self.task_config,
358 self.context.compactor_metrics.clone(),
359 iter,
360 compaction_filter,
361 )
362 .instrument_await("compact_and_build_sst".verbose())
363 .await?;
364
365 let ssts = sst_builder
366 .finish()
367 .instrument_await("builder_finish".verbose())
368 .await?;
369
370 Ok((ssts, compaction_statistics))
371 }
372}
373
374#[must_use]
377pub fn start_iceberg_compactor(
378 compactor_context: CompactorContext,
379 hummock_meta_client: Arc<dyn HummockMetaClient>,
380) -> (JoinHandle<()>, Sender<()>) {
381 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
382 let stream_retry_interval = Duration::from_secs(30);
383 let periodic_event_update_interval = Duration::from_millis(
384 compactor_context
385 .storage_opts
386 .iceberg_compaction_pull_interval_ms,
387 );
388 let worker_num = compactor_context.compaction_executor.worker_num();
389
390 let max_task_parallelism: u32 = (worker_num as f32
391 * compactor_context.storage_opts.compactor_max_task_multiplier)
392 .ceil() as u32;
393
394 const MAX_PULL_TASK_COUNT: u32 = 4;
395 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
396
397 assert_ge!(
398 compactor_context.storage_opts.compactor_max_task_multiplier,
399 0.0
400 );
401
402 let join_handle = tokio::spawn(async move {
403 let pending_parallelism_budget = (max_task_parallelism as f32
405 * compactor_context
406 .storage_opts
407 .iceberg_compaction_pending_parallelism_budget_multiplier)
408 .ceil() as u32;
409 let mut task_queue =
410 IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);
411
412 let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));
414
415 let (task_completion_tx, mut task_completion_rx) =
417 tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
418 let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
419 let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();
422
423 let mut min_interval = tokio::time::interval(stream_retry_interval);
424 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
425
426 let mut log_throttler =
428 LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
429
430 'start_stream: loop {
432 let mut pull_task_ack = true;
435 tokio::select! {
436 _ = min_interval.tick() => {},
438 _ = &mut shutdown_rx => {
440 tracing::info!("Compactor is shutting down");
441 return;
442 }
443 }
444
445 let (request_sender, response_event_stream) = match hummock_meta_client
446 .subscribe_iceberg_compaction_event()
447 .await
448 {
449 Ok((request_sender, response_event_stream)) => {
450 tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
451 (request_sender, response_event_stream)
452 }
453
454 Err(e) => {
455 tracing::warn!(
456 error = %e.as_report(),
457 "Subscribing to iceberg compaction tasks failed with error. Will retry.",
458 );
459 continue 'start_stream;
460 }
461 };
462
463 if matches!(
464 flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
465 ReportSendResult::RestartStream
466 ) {
467 continue 'start_stream;
468 }
469
470 pin_mut!(response_event_stream);
471
472 let _executor = compactor_context.compaction_executor.clone();
473
474 let mut event_loop_iteration_now = Instant::now();
476 'consume_stream: loop {
477 {
478 compactor_context
480 .compactor_metrics
481 .compaction_event_loop_iteration_latency
482 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
483 event_loop_iteration_now = Instant::now();
484 }
485
486 let request_sender = request_sender.clone();
487 let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
488 Some(plan_completion) = task_completion_rx.recv() => {
490 let task_key = plan_completion.task_key;
491 let error_message = plan_completion.error_message;
492 tracing::debug!(
493 task_id = task_key.0,
494 plan_index = task_key.1,
495 success = error_message.is_none(),
496 "Plan completed, updating queue state"
497 );
498 task_queue.finish_running(task_key);
499
500 let completed_task_id = task_key.0;
501 let Entry::Occupied(mut tracker_entry) =
502 task_trackers.entry(completed_task_id)
503 else {
504 continue 'consume_stream;
505 };
506 tracker_entry.get_mut().record_completion(error_message);
507 if !tracker_entry.get().is_finished() {
508 continue 'consume_stream;
509 }
510
511 let report = tracker_entry.remove().into_report(completed_task_id);
512 if matches!(
513 send_or_buffer_iceberg_task_report(
514 &request_sender,
515 &mut pending_task_reports,
516 report,
517 ),
518 ReportSendResult::RestartStream
519 ) {
520 continue 'start_stream;
521 }
522 continue 'consume_stream;
523 }
524
525 _ = task_queue.wait_schedulable() => {
527 schedule_queued_tasks(
528 &mut task_queue,
529 &compactor_context,
530 &shutdown_map,
531 &task_completion_tx,
532 );
533 continue 'consume_stream;
534 }
535
536 _ = periodic_event_interval.tick() => {
537 let should_restart_stream = handle_meta_task_pulling(
539 &mut pull_task_ack,
540 &task_queue,
541 max_task_parallelism,
542 max_pull_task_count,
543 &request_sender,
544 &mut log_throttler,
545 );
546
547 if should_restart_stream {
548 continue 'start_stream;
549 }
550 continue;
551 }
552 event = response_event_stream.next() => {
553 event
554 }
555
556 _ = &mut shutdown_rx => {
557 tracing::info!("Iceberg Compactor is shutting down");
558 return
559 }
560 };
561
562 match event {
563 Some(Ok(SubscribeIcebergCompactionEventResponse {
564 event,
565 create_at: _create_at,
566 })) => {
567 let event = match event {
568 Some(event) => event,
569 None => continue 'consume_stream,
570 };
571
572 match event {
573 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
574 let task_id = iceberg_compaction_task.task_id;
575 let sink_id = iceberg_compaction_task.sink_id;
576 let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
578 .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
579 .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
580 .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
581 .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
582 .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
583 .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
584 .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
585 .target_binpack_group_size_mb(
586 compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
587 )
588 .min_group_size_mb(
589 compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
590 )
591 .min_group_file_count(
592 compactor_context.storage_opts.iceberg_compaction_min_group_file_count
593 )
594 .build() {
595 Ok(config) => config,
596 Err(e) => {
597 tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
598 let report = build_iceberg_task_report(
599 task_id,
600 sink_id,
601 Some(format!(
602 "Failed to build iceberg compactor runner config: {}",
603 e.as_report()
604 )),
605 );
606 if matches!(
607 send_or_buffer_iceberg_task_report(
608 &request_sender,
609 &mut pending_task_reports,
610 report,
611 ),
612 ReportSendResult::RestartStream
613 ) {
614 continue 'start_stream;
615 }
616 continue 'consume_stream;
617 }
618 };
619
620 let task_execution = match create_task_execution(
622 iceberg_compaction_task,
623 compactor_runner_config,
624 compactor_context.compactor_metrics.clone(),
625 ).await {
626 Ok(task_execution) => task_execution,
627 Err(e) => {
628 tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
629 let report = build_iceberg_task_report(
630 task_id,
631 sink_id,
632 Some(format!(
633 "Failed to create iceberg compaction task execution: {}",
634 e.as_report()
635 )),
636 );
637 if matches!(
638 send_or_buffer_iceberg_task_report(
639 &request_sender,
640 &mut pending_task_reports,
641 report,
642 ),
643 ReportSendResult::RestartStream
644 ) {
645 continue 'start_stream;
646 }
647 continue 'consume_stream;
648 }
649 };
650
651 let sink_id = task_execution.sink_id;
652 let plan_runners = task_execution.plan_runners;
653
654 if plan_runners.is_empty() {
655 tracing::info!(task_id, sink_id, "No plans to execute");
656 let report = build_iceberg_task_report(task_id, sink_id, None);
657 if matches!(
658 send_or_buffer_iceberg_task_report(
659 &request_sender,
660 &mut pending_task_reports,
661 report,
662 ),
663 ReportSendResult::RestartStream
664 ) {
665 continue 'start_stream;
666 }
667 continue 'consume_stream;
668 }
669
670 let total_plans = plan_runners.len();
672 let mut enqueued_count = 0;
673
674 for runner in plan_runners {
675 let meta = runner.to_meta();
676 let plan_index = meta.plan_index;
677 let required_parallelism = runner.required_parallelism();
678 let push_result = task_queue.push(meta, Some(runner));
679
680 match push_result {
681 PushResult::Added => {
682 enqueued_count += 1;
683 tracing::debug!(
684 task_id = task_id,
685 plan_index = plan_index,
686 required_parallelism = required_parallelism,
687 "Iceberg plan runner added to queue"
688 );
689 },
690 PushResult::RejectedCapacity => {
691 tracing::warn!(
692 task_id = task_id,
693 required_parallelism = required_parallelism,
694 pending_budget = pending_parallelism_budget,
695 enqueued_count = enqueued_count,
696 total_plans = total_plans,
697 "Iceberg plan runner rejected - queue capacity exceeded"
698 );
699 break;
701 },
702 PushResult::RejectedTooLarge => {
703 tracing::error!(
704 task_id = task_id,
705 required_parallelism = required_parallelism,
706 max_parallelism = max_task_parallelism,
707 "Iceberg plan runner rejected - parallelism exceeds max"
708 );
709 },
710 PushResult::RejectedInvalidParallelism => {
711 tracing::error!(
712 task_id = task_id,
713 required_parallelism = required_parallelism,
714 "Iceberg plan runner rejected - invalid parallelism"
715 );
716 },
717 PushResult::RejectedDuplicate => {
718 tracing::error!(
719 task_id = task_id,
720 plan_index = plan_index,
721 "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
722 );
723 }
724 }
725 }
726
727 if enqueued_count == 0 {
728 let report = build_iceberg_task_report(
729 task_id,
730 sink_id,
731 Some("Failed to enqueue all iceberg compaction plans".to_owned()),
732 );
733 if matches!(
734 send_or_buffer_iceberg_task_report(
735 &request_sender,
736 &mut pending_task_reports,
737 report,
738 ),
739 ReportSendResult::RestartStream
740 ) {
741 continue 'start_stream;
742 }
743 } else {
744 task_trackers.insert(
745 task_id,
746 IcebergTaskTracker::new(sink_id, enqueued_count),
747 );
748 }
749
750 tracing::info!(
751 task_id = task_id,
752 sink_id = sink_id,
753 total_plans = total_plans,
754 enqueued_count = enqueued_count,
755 "Enqueued {} of {} Iceberg plan runners",
756 enqueued_count,
757 total_plans
758 );
759 },
760 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
761 pull_task_ack = true;
763 },
764 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CancelCompactTask(cancel_compact_task) => {
765 cancel_iceberg_task(
766 cancel_compact_task.task_id,
767 &mut task_queue,
768 &shutdown_map,
769 &mut task_trackers,
770 );
771 },
772 }
773 }
774 Some(Err(e)) => {
775 tracing::warn!("Failed to consume stream. {}", e.message());
776 continue 'start_stream;
777 }
778 _ => {
779 continue 'start_stream;
781 }
782 }
783 }
784 }
785 });
786
787 (join_handle, shutdown_tx)
788}
789
790#[must_use]
793pub fn start_compactor(
794 compactor_context: CompactorContext,
795 hummock_meta_client: Arc<dyn HummockMetaClient>,
796 object_id_manager: Arc<ObjectIdManager>,
797 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
798) -> (JoinHandle<()>, Sender<()>) {
799 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
800 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
801 let stream_retry_interval = Duration::from_secs(30);
802 let task_progress = compactor_context.task_progress_manager.clone();
803 let periodic_event_update_interval = Duration::from_millis(1000);
804
805 let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
806 * compactor_context.storage_opts.compactor_max_task_multiplier)
807 .ceil() as u32;
808 let running_task_parallelism = Arc::new(AtomicU32::new(0));
809
810 const MAX_PULL_TASK_COUNT: u32 = 4;
811 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
812
813 assert_ge!(
814 compactor_context.storage_opts.compactor_max_task_multiplier,
815 0.0
816 );
817
818 let join_handle = tokio::spawn(async move {
819 let shutdown_map = CompactionShutdownMap::default();
820 let mut min_interval = tokio::time::interval(stream_retry_interval);
821 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
822
823 let mut log_throttler =
825 LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
826
827 'start_stream: loop {
829 let mut pull_task_ack = true;
832 tokio::select! {
833 _ = min_interval.tick() => {},
835 _ = &mut shutdown_rx => {
837 tracing::info!("Compactor is shutting down");
838 return;
839 }
840 }
841
842 let (request_sender, response_event_stream) =
843 match hummock_meta_client.subscribe_compaction_event().await {
844 Ok((request_sender, response_event_stream)) => {
845 tracing::debug!("Succeeded subscribe_compaction_event.");
846 (request_sender, response_event_stream)
847 }
848
849 Err(e) => {
850 tracing::warn!(
851 error = %e.as_report(),
852 "Subscribing to compaction tasks failed with error. Will retry.",
853 );
854 continue 'start_stream;
855 }
856 };
857
858 pin_mut!(response_event_stream);
859
860 let executor = compactor_context.compaction_executor.clone();
861 let object_id_manager = object_id_manager.clone();
862
863 let mut event_loop_iteration_now = Instant::now();
865 'consume_stream: loop {
866 {
867 compactor_context
869 .compactor_metrics
870 .compaction_event_loop_iteration_latency
871 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
872 event_loop_iteration_now = Instant::now();
873 }
874
875 let running_task_parallelism = running_task_parallelism.clone();
876 let request_sender = request_sender.clone();
877 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
878 _ = periodic_event_interval.tick() => {
879 let progress_list = get_task_progress(task_progress.clone());
880
881 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
882 event: Some(RequestEvent::HeartBeat(
883 HeartBeat {
884 progress: progress_list
885 }
886 )),
887 create_at: SystemTime::now()
888 .duration_since(std::time::UNIX_EPOCH)
889 .expect("Clock may have gone backwards")
890 .as_millis() as u64,
891 }) {
892 tracing::warn!(error = %e.as_report(), "Failed to report task progress");
893 continue 'start_stream;
895 }
896
897
898 let mut pending_pull_task_count = 0;
899 if pull_task_ack {
900 pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
902
903 if pending_pull_task_count > 0 {
904 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
905 event: Some(RequestEvent::PullTask(
906 PullTask {
907 pull_task_count: pending_pull_task_count,
908 }
909 )),
910 create_at: SystemTime::now()
911 .duration_since(std::time::UNIX_EPOCH)
912 .expect("Clock may have gone backwards")
913 .as_millis() as u64,
914 }) {
915 tracing::warn!(error = %e.as_report(), "Failed to pull task");
916
917 continue 'start_stream;
919 } else {
920 pull_task_ack = false;
921 }
922 }
923 }
924
925 let running_count = running_task_parallelism.load(Ordering::SeqCst);
926 let current_state = CompactionLogState {
927 running_parallelism: running_count,
928 pull_task_ack,
929 pending_pull_task_count,
930 };
931
932 if log_throttler.should_log(¤t_state) {
934 tracing::info!(
935 running_parallelism_count = %current_state.running_parallelism,
936 pull_task_ack = %current_state.pull_task_ack,
937 pending_pull_task_count = %current_state.pending_pull_task_count
938 );
939 log_throttler.update(current_state);
940 }
941
942 continue;
943 }
944 event = response_event_stream.next() => {
945 event
946 }
947
948 _ = &mut shutdown_rx => {
949 tracing::info!("Compactor is shutting down");
950 return
951 }
952 };
953
954 fn send_report_task_event(
955 compact_task: &CompactTask,
956 table_stats: TableStatsMap,
957 object_timestamps: HashMap<HummockSstableObjectId, u64>,
958 request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
959 ) {
960 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
961 event: Some(RequestEvent::ReportTask(ReportTask {
962 task_id: compact_task.task_id,
963 task_status: compact_task.task_status.into(),
964 sorted_output_ssts: compact_task
965 .sorted_output_ssts
966 .iter()
967 .map(|sst| sst.into())
968 .collect(),
969 table_stats_change: to_prost_table_stats_map(table_stats),
970 object_timestamps,
971 })),
972 create_at: SystemTime::now()
973 .duration_since(std::time::UNIX_EPOCH)
974 .expect("Clock may have gone backwards")
975 .as_millis() as u64,
976 }) {
977 let task_id = compact_task.task_id;
978 tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
979 }
980 }
981
982 match event {
983 Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
984 let event = match event {
985 Some(event) => event,
986 None => continue 'consume_stream,
987 };
988 let shutdown = shutdown_map.clone();
989 let context = compactor_context.clone();
990 let consumed_latency_ms = SystemTime::now()
991 .duration_since(std::time::UNIX_EPOCH)
992 .expect("Clock may have gone backwards")
993 .as_millis() as u64
994 - create_at;
995 context
996 .compactor_metrics
997 .compaction_event_consumed_latency
998 .observe(consumed_latency_ms as _);
999
1000 let object_id_manager = object_id_manager.clone();
1001 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
1002
1003 match event {
1004 ResponseEvent::CompactTask(compact_task) => {
1005 let compact_task = CompactTask::from(compact_task);
1006 let parallelism =
1007 calculate_task_parallelism(&compact_task, &context);
1008
1009 assert_ne!(parallelism, 0, "splits cannot be empty");
1010
1011 if (max_task_parallelism
1012 - running_task_parallelism.load(Ordering::SeqCst))
1013 < parallelism as u32
1014 {
1015 tracing::warn!(
1016 "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
1017 compact_task.task_id,
1018 parallelism,
1019 max_task_parallelism,
1020 running_task_parallelism.load(Ordering::Relaxed),
1021 );
1022 let (compact_task, table_stats, object_timestamps) =
1023 compact_done(
1024 compact_task,
1025 context.clone(),
1026 vec![],
1027 TaskStatus::NoAvailCpuResourceCanceled,
1028 );
1029
1030 send_report_task_event(
1031 &compact_task,
1032 table_stats,
1033 object_timestamps,
1034 &request_sender,
1035 );
1036
1037 continue 'consume_stream;
1038 }
1039
1040 running_task_parallelism
1041 .fetch_add(parallelism as u32, Ordering::SeqCst);
1042 executor.spawn(async move {
1043 let (tx, rx) = tokio::sync::oneshot::channel();
1044 let task_id = compact_task.task_id;
1045 shutdown.lock().unwrap().insert(task_id, tx);
1046
1047 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
1048 context.clone(),
1049 compact_task,
1050 rx,
1051 object_id_manager.clone(),
1052 compaction_catalog_manager_ref.clone(),
1053 )
1054 .await;
1055
1056 shutdown.lock().unwrap().remove(&task_id);
1057 running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
1058
1059 send_report_task_event(
1060 &compact_task,
1061 table_stats,
1062 object_timestamps,
1063 &request_sender,
1064 );
1065
1066 let enable_check_compaction_result =
1067 context.storage_opts.check_compaction_result;
1068 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1069
1070 if enable_check_compaction_result && need_check_task {
1071 let read_table_ids = compact_task
1072 .get_table_ids_from_input_ssts()
1073 .collect::<Vec<_>>();
1074 match compaction_catalog_manager_ref.acquire(read_table_ids.into_iter().collect()).await {
1075 Ok(compaction_catalog_agent_ref) => {
1076 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1077 {
1078 Err(e) => {
1079 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
1080 }
1081 Ok(true) => (),
1082 Ok(false) => {
1083 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1084 }
1085 }
1086 },
1087 Err(e) => {
1088 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1089 }
1090 }
1091 }
1092 });
1093 }
1094 #[expect(deprecated)]
1095 ResponseEvent::VacuumTask(_) => {
1096 unreachable!("unexpected vacuum task");
1097 }
1098 #[expect(deprecated)]
1099 ResponseEvent::FullScanTask(_) => {
1100 unreachable!("unexpected scan task");
1101 }
1102 #[expect(deprecated)]
1103 ResponseEvent::ValidationTask(validation_task) => {
1104 let validation_task = ValidationTask::from(validation_task);
1105 executor.spawn(async move {
1106 validate_ssts(validation_task, context.sstable_store.clone())
1107 .await;
1108 });
1109 }
1110 ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
1111 .lock()
1112 .unwrap()
1113 .remove(&cancel_compact_task.task_id)
1114 {
1115 Some(tx) => {
1116 if tx.send(()).is_err() {
1117 tracing::warn!(
1118 "Cancellation of compaction task failed. task_id: {}",
1119 cancel_compact_task.task_id
1120 );
1121 }
1122 }
1123 _ => {
1124 tracing::warn!(
1125 "Attempting to cancel non-existent compaction task. task_id: {}",
1126 cancel_compact_task.task_id
1127 );
1128 }
1129 },
1130
1131 ResponseEvent::PullTaskAck(_pull_task_ack) => {
1132 pull_task_ack = true;
1134 }
1135 }
1136 }
1137 Some(Err(e)) => {
1138 tracing::warn!("Failed to consume stream. {}", e.message());
1139 continue 'start_stream;
1140 }
1141 _ => {
1142 continue 'start_stream;
1144 }
1145 }
1146 }
1147 }
1148 });
1149
1150 (join_handle, shutdown_tx)
1151}
1152
1153#[must_use]
1156pub fn start_shared_compactor(
1157 grpc_proxy_client: GrpcCompactorProxyClient,
1158 mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1159 context: CompactorContext,
1160) -> (JoinHandle<()>, Sender<()>) {
1161 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1162 let task_progress = context.task_progress_manager.clone();
1163 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1164 let periodic_event_update_interval = Duration::from_millis(1000);
1165
1166 let join_handle = tokio::spawn(async move {
1167 let shutdown_map = CompactionShutdownMap::default();
1168
1169 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1170 let executor = context.compaction_executor.clone();
1171 let report_heartbeat_client = grpc_proxy_client.clone();
1172 'consume_stream: loop {
1173 let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1174 _ = periodic_event_interval.tick() => {
1175 let progress_list = get_task_progress(task_progress.clone());
1176 let report_compaction_task_request = ReportCompactionTaskRequest{
1177 event: Some(ReportCompactionTaskEvent::HeartBeat(
1178 SharedHeartBeat {
1179 progress: progress_list
1180 }
1181 )),
1182 };
1183 if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1184 tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1185 }
1186 continue
1187 }
1188
1189
1190 _ = &mut shutdown_rx => {
1191 tracing::info!("Compactor is shutting down");
1192 return
1193 }
1194
1195 request = receiver.recv() => {
1196 request
1197 }
1198
1199 };
1200 match request {
1201 Some(request) => {
1202 let context = context.clone();
1203 let shutdown = shutdown_map.clone();
1204
1205 let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1206 executor.spawn(async move {
1207 let DispatchCompactionTaskRequest {
1208 tables,
1209 output_object_ids,
1210 task: dispatch_task,
1211 } = request.into_inner();
1212 let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1213 acc.insert(table.id, table);
1214 acc
1215 });
1216
1217 let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1218 output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1219 let shared_compactor_object_id_manager =
1220 SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1221 match dispatch_task.unwrap() {
1222 dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1223 let compact_task = CompactTask::from(&compact_task);
1224 let (tx, rx) = tokio::sync::oneshot::channel();
1225 let task_id = compact_task.task_id;
1226 shutdown.lock().unwrap().insert(task_id, tx);
1227
1228 let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1229 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1230 context.clone(),
1231 compact_task,
1232 rx,
1233 shared_compactor_object_id_manager,
1234 compaction_catalog_agent_ref.clone(),
1235 )
1236 .await;
1237 shutdown.lock().unwrap().remove(&task_id);
1238 let report_compaction_task_request = ReportCompactionTaskRequest {
1239 event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1240 compact_task: Some(PbCompactTask::from(&compact_task)),
1241 table_stats_change: to_prost_table_stats_map(table_stats),
1242 object_timestamps,
1243 })),
1244 };
1245
1246 match cloned_grpc_proxy_client
1247 .report_compaction_task(report_compaction_task_request)
1248 .await
1249 {
1250 Ok(_) => {
1251 let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1253 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1254 if enable_check_compaction_result && need_check_task {
1255 match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1256 Err(e) => {
1257 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1258 },
1259 Ok(true) => (),
1260 Ok(false) => {
1261 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1262 }
1263 }
1264 }
1265 }
1266 Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1267 }
1268
1269 }
1270 dispatch_compaction_task_request::Task::VacuumTask(_) => {
1271 unreachable!("unexpected vacuum task");
1272 }
1273 dispatch_compaction_task_request::Task::FullScanTask(_) => {
1274 unreachable!("unexpected scan task");
1275 }
1276 dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1277 let validation_task = ValidationTask::from(validation_task);
1278 validate_ssts(validation_task, context.sstable_store.clone()).await;
1279 }
1280 dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1281 match shutdown
1282 .lock()
1283 .unwrap()
1284 .remove(&cancel_compact_task.task_id)
1285 { Some(tx) => {
1286 if tx.send(()).is_err() {
1287 tracing::warn!(
1288 "Cancellation of compaction task failed. task_id: {}",
1289 cancel_compact_task.task_id
1290 );
1291 }
1292 } _ => {
1293 tracing::warn!(
1294 "Attempting to cancel non-existent compaction task. task_id: {}",
1295 cancel_compact_task.task_id
1296 );
1297 }}
1298 }
1299 }
1300 });
1301 }
1302 None => continue 'consume_stream,
1303 }
1304 }
1305 });
1306 (join_handle, shutdown_tx)
1307}
1308
1309fn get_task_progress(
1310 task_progress: Arc<
1311 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1312 >,
1313) -> Vec<CompactTaskProgress> {
1314 let mut progress_list = Vec::new();
1315 for (&task_id, progress) in &*task_progress.lock() {
1316 progress_list.push(progress.snapshot(task_id));
1317 }
1318 progress_list
1319}
1320
1321fn schedule_queued_tasks(
1323 task_queue: &mut IcebergTaskQueue,
1324 compactor_context: &CompactorContext,
1325 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1326 task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
1327) {
1328 while let Some(popped_task) = task_queue.pop() {
1329 let task_id = popped_task.meta.task_id;
1330 let plan_index = popped_task.meta.plan_index;
1331 let task_key = (task_id, plan_index);
1332
1333 let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1335
1336 let Some(runner) = popped_task.runner else {
1337 tracing::error!(
1338 task_id = task_id,
1339 plan_index = plan_index,
1340 "Popped task missing runner - this should not happen"
1341 );
1342 task_queue.finish_running(task_key);
1343 continue;
1344 };
1345
1346 let executor = compactor_context.compaction_executor.clone();
1347 let shutdown_map_clone = shutdown_map.clone();
1348 let completion_tx_clone = task_completion_tx.clone();
1349 let (tx, rx) = tokio::sync::oneshot::channel();
1350
1351 {
1352 let mut shutdown_guard = shutdown_map.lock().unwrap();
1353 shutdown_guard.insert(task_key, tx);
1354 }
1355
1356 tracing::info!(
1357 task_id = task_id,
1358 plan_index = plan_index,
1359 unique_ident = ?unique_ident,
1360 required_parallelism = popped_task.meta.required_parallelism,
1361 "Starting iceberg compaction task from queue"
1362 );
1363
1364 executor.spawn(async move {
1365 let _cleanup_guard = scopeguard::guard(shutdown_map_clone, move |shutdown_map| {
1366 let mut shutdown_guard = shutdown_map.lock().unwrap();
1367 shutdown_guard.remove(&task_key);
1368 });
1369
1370 let result = Box::pin(runner.compact(rx)).await;
1371
1372 let completion = match result {
1373 Ok(_) => IcebergPlanCompletion {
1374 task_key,
1375 error_message: None,
1376 },
1377 Err(e) => {
1378 if is_cancelled_iceberg_compaction_error(&e) {
1379 tracing::info!(
1380 task_id = task_key.0,
1381 plan_index = task_key.1,
1382 "Iceberg compaction plan cancelled"
1383 );
1384 } else {
1385 tracing::warn!(
1386 error = %e.as_report(),
1387 task_id = task_key.0,
1388 plan_index = task_key.1,
1389 "Failed to compact iceberg runner"
1390 );
1391 }
1392 IcebergPlanCompletion {
1393 task_key,
1394 error_message: Some(e.to_report_string()),
1395 }
1396 }
1397 };
1398
1399 if completion_tx_clone.send(completion).is_err() {
1400 tracing::warn!(
1401 task_id = task_key.0,
1402 plan_index = task_key.1,
1403 "Failed to notify task completion - main loop may have shut down"
1404 );
1405 }
1406 });
1407 }
1408}
1409
1410fn is_cancelled_iceberg_compaction_error(error: &crate::hummock::HummockError) -> bool {
1411 matches!(
1412 error.inner(),
1413 HummockErrorInner::CompactionExecutor(message) if message == "Plan cancelled"
1414 )
1415}
1416
1417fn cancel_iceberg_task(
1418 task_id: u64,
1419 task_queue: &mut IcebergTaskQueue,
1420 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1421 task_trackers: &mut HashMap<u64, IcebergTaskTracker>,
1422) {
1423 let cancelled_waiting = task_queue.cancel_waiting_task(task_id);
1428 let removed_tracker = task_trackers.remove(&task_id).is_some();
1429
1430 let cancelled_running = {
1431 let mut shutdown_guard = shutdown_map.lock().unwrap();
1432 let task_keys: Vec<_> = shutdown_guard
1433 .keys()
1434 .filter(|(running_task_id, _)| *running_task_id == task_id)
1435 .copied()
1436 .collect();
1437
1438 for task_key in &task_keys {
1439 if let Some(tx) = shutdown_guard.remove(task_key)
1440 && tx.send(()).is_err()
1441 {
1442 tracing::debug!(
1443 task_id = task_key.0,
1444 plan_index = task_key.1,
1445 "Iceberg compaction plan shutdown receiver already closed during cancellation"
1446 );
1447 }
1448 }
1449
1450 task_keys.len()
1451 };
1452
1453 if cancelled_waiting == 0 && cancelled_running == 0 && !removed_tracker {
1454 tracing::warn!(
1455 task_id = task_id,
1456 "Attempting to cancel non-existent iceberg compaction task"
1457 );
1458 } else {
1459 tracing::info!(
1460 task_id = task_id,
1461 cancelled_waiting = cancelled_waiting,
1462 cancelled_running = cancelled_running,
1463 removed_tracker = removed_tracker,
1464 "Cancelled iceberg compaction task"
1465 );
1466 }
1467}
1468
1469fn handle_meta_task_pulling(
1472 pull_task_ack: &mut bool,
1473 task_queue: &IcebergTaskQueue,
1474 max_task_parallelism: u32,
1475 max_pull_task_count: u32,
1476 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1477 log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1478) -> bool {
1479 let mut pending_pull_task_count = 0;
1480 if *pull_task_ack {
1481 let current_running_parallelism = task_queue.running_parallelism_sum();
1483 pending_pull_task_count =
1484 (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1485
1486 if pending_pull_task_count > 0 {
1487 if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1488 event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1489 subscribe_iceberg_compaction_event_request::PullTask {
1490 pull_task_count: pending_pull_task_count,
1491 },
1492 )),
1493 create_at: SystemTime::now()
1494 .duration_since(std::time::UNIX_EPOCH)
1495 .expect("Clock may have gone backwards")
1496 .as_millis() as u64,
1497 }) {
1498 tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1499 return true; } else {
1501 *pull_task_ack = false;
1502 }
1503 }
1504 }
1505
1506 let running_count = task_queue.running_parallelism_sum();
1507 let waiting_count = task_queue.waiting_parallelism_sum();
1508 let available_count = max_task_parallelism.saturating_sub(running_count);
1509 let current_state = IcebergCompactionLogState {
1510 running_parallelism: running_count,
1511 waiting_parallelism: waiting_count,
1512 available_parallelism: available_count,
1513 pull_task_ack: *pull_task_ack,
1514 pending_pull_task_count,
1515 };
1516
1517 if log_throttler.should_log(¤t_state) {
1519 tracing::info!(
1520 running_parallelism_count = %current_state.running_parallelism,
1521 waiting_parallelism_count = %current_state.waiting_parallelism,
1522 available_parallelism = %current_state.available_parallelism,
1523 pull_task_ack = %current_state.pull_task_ack,
1524 pending_pull_task_count = %current_state.pending_pull_task_count
1525 );
1526 log_throttler.update(current_state);
1527 }
1528
1529 false }
1531
#[cfg(test)]
mod tests {
    use super::*;

    /// Cancelling a task whose plans are all still queued must drop every waiting plan
    /// (bringing the queue's waiting parallelism back to 0) and remove the task's tracker.
    #[test]
    fn test_cancel_iceberg_task_removes_waiting_plans_and_tracker() {
        let task_id = 42;
        let mut task_queue = IcebergTaskQueue::new(10, 30);
        // Two plans of the same task, queued without runners.
        assert_eq!(
            task_queue.push(
                iceberg_compaction::IcebergTaskMeta {
                    task_id,
                    plan_index: 0,
                    required_parallelism: 3,
                },
                None,
            ),
            PushResult::Added
        );
        assert_eq!(
            task_queue.push(
                iceberg_compaction::IcebergTaskMeta {
                    task_id,
                    plan_index: 1,
                    required_parallelism: 4,
                },
                None,
            ),
            PushResult::Added
        );
        // 3 + 4 parallelism is waiting before cancellation.
        assert_eq!(task_queue.waiting_parallelism_sum(), 7);

        // No running plans: the shutdown map is empty.
        let shutdown_map = Arc::new(Mutex::new(HashMap::new()));
        let mut task_trackers = HashMap::from([(task_id, IcebergTaskTracker::new(10, 2))]);

        cancel_iceberg_task(task_id, &mut task_queue, &shutdown_map, &mut task_trackers);

        assert_eq!(task_queue.waiting_parallelism_sum(), 0);
        assert!(!task_trackers.contains_key(&task_id));
    }

    /// Cancelling a task with a running plan must signal that plan's shutdown channel, remove
    /// its entry from the shutdown map, and remove the task's tracker.
    #[test]
    fn test_cancel_iceberg_task_shuts_down_running_plans_and_tracker() {
        let task_id = 43;
        let task_key = (task_id, 0);
        let mut task_queue = IcebergTaskQueue::new(10, 30);
        let shutdown_map = Arc::new(Mutex::new(HashMap::new()));
        // Simulate a running plan by registering its shutdown sender directly.
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        shutdown_map.lock().unwrap().insert(task_key, shutdown_tx);
        let mut task_trackers = HashMap::from([(task_id, IcebergTaskTracker::new(10, 1))]);

        cancel_iceberg_task(task_id, &mut task_queue, &shutdown_map, &mut task_trackers);

        // The shutdown signal was delivered and the registration is gone.
        assert!(shutdown_rx.try_recv().is_ok());
        assert!(shutdown_map.lock().unwrap().is_empty());
        assert!(!task_trackers.contains_key(&task_id));
    }

    /// Log states compare field by field, so any differing field makes them unequal.
    #[test]
    fn test_log_state_equality() {
        // CompactionLogState: identical fields are equal; a different running parallelism is not.
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state3 = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_eq!(state1, state2);
        assert_ne!(state1, state3);

        // IcebergCompactionLogState: differing only in waiting parallelism is enough to be unequal.
        let ice_state1 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 5,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_state2 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 6,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_ne!(ice_state1, ice_state2);
    }

    /// With a long (60 s) heartbeat interval, the throttler only asks to log when the state
    /// differs from the last state passed to `update`.
    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        // Nothing logged yet: the first state should be logged.
        assert!(throttler.should_log(&state1));
        throttler.update(state1.clone());

        // Same state again, well within the interval: suppressed.
        assert!(!throttler.should_log(&state1));

        // A changed state should be logged immediately.
        assert!(throttler.should_log(&state2));
        throttler.update(state2.clone());

        // And is suppressed once recorded.
        assert!(!throttler.should_log(&state2));
    }

    /// With a short (10 ms) heartbeat interval, an unchanged state is logged again once the
    /// interval has elapsed.
    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let state = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        // First observation is logged.
        assert!(throttler.should_log(&state));
        throttler.update(state.clone());

        // Unchanged and within the interval: suppressed.
        assert!(!throttler.should_log(&state));

        // Sleep past the 10 ms interval.
        std::thread::sleep(Duration::from_millis(15));

        // Unchanged, but the heartbeat interval has elapsed: logged again.
        assert!(throttler.should_log(&state));
    }
}