1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24 ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28 subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::hash_map::Entry;
43use std::collections::{HashMap, VecDeque};
44use std::marker::PhantomData;
45use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, SystemTime};
48
49use await_tree::{InstrumentAwait, SpanExt};
50pub use compaction_executor::CompactionExecutor;
51pub use compaction_filter::{
52 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, TtlCompactionFilter,
53};
54pub use context::{
55 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
60use iceberg_compaction::{
61 IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
62 ReportSendResult, build_iceberg_task_report, create_task_execution,
63 flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
64};
65pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
66use more_asserts::assert_ge;
67use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
68use risingwave_hummock_sdk::{
69 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
70};
71use risingwave_pb::hummock::compact_task::TaskStatus;
72use risingwave_pb::hummock::subscribe_compaction_event_request::{
73 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
74};
75use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
76use risingwave_pb::hummock::{
77 CompactTaskProgress, PbSstableFilterLayout, PbSstableFilterType, ReportCompactionTaskRequest,
78 SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
79};
80use risingwave_rpc_client::HummockMetaClient;
81pub use shared_buffer_compact::compact;
82use tokio::sync::oneshot::Sender;
83use tokio::task::JoinHandle;
84use tokio::time::Instant;
85
86pub use self::compaction_utils::{
87 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
88 check_flush_result,
89};
90pub use self::task_progress::TaskProgress;
91use super::multi_builder::CapacitySplitTableBuilder;
92use super::{
93 GetObjectId, HummockError, HummockErrorInner, HummockResult, ObjectIdManager,
94 SstableBuilderOptions, Xor8FilterBuilder, Xor16FilterBuilder,
95};
96use crate::compaction_catalog_manager::{
97 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
98};
99use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
100use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
101use crate::hummock::compactor::iceberg_compaction::TaskKey;
102use crate::hummock::iterator::{Forward, HummockIterator};
103use crate::hummock::{
104 BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, FilterBuilder, NoneFilterBuilder,
105 SharedComapctorObjectIdManager, SstableWriterFactory, UnifiedSstableWriterFactory,
106 validate_ssts,
107};
108use crate::monitor::CompactorMetrics;
109
/// Longest time an unchanged compactor scheduling state may go without being logged
/// again; used as the heartbeat interval of the event loops' [`LogThrottler`]s.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_millis(60_000);
112
/// Snapshot of the compactor's task-pulling state.
///
/// The event loop compares consecutive snapshots through a [`LogThrottler`] so that
/// identical states are not logged on every tick.
#[derive(Clone, Debug, Eq, PartialEq)]
struct CompactionLogState {
    /// Parallelism currently consumed by running compaction tasks.
    running_parallelism: u32,
    /// Whether the last pull request has been acknowledged by meta.
    pull_task_ack: bool,
    /// Number of tasks requested by the pull issued on this tick (0 if none).
    pending_pull_task_count: u32,
}
120
/// Snapshot of the iceberg compactor's scheduling state.
///
/// Compared by a [`LogThrottler`] in the iceberg event loop so that unchanged states
/// are only re-logged on the heartbeat interval.
#[derive(Clone, Debug, Eq, PartialEq)]
struct IcebergCompactionLogState {
    /// Parallelism held by running plans.
    running_parallelism: u32,
    /// Parallelism of plans queued but not yet running.
    waiting_parallelism: u32,
    /// Parallelism still free for new plans.
    available_parallelism: u32,
    /// Whether the last pull request has been acknowledged by meta.
    pull_task_ack: bool,
    /// Number of tasks requested by the most recent pull (0 if none).
    pending_pull_task_count: u32,
}
130
/// Rate-limits repetitive status logs.
///
/// A log is due when the observed state differs from the last logged one, or when
/// `heartbeat_interval` has elapsed since the last recorded log. An unchanged state is
/// therefore still re-logged periodically.
///
/// The `PartialEq` requirement lives on the impl rather than on the struct definition,
/// since only the comparison in [`LogThrottler::should_log`] needs it.
struct LogThrottler<T> {
    /// State recorded by the most recent [`LogThrottler::update`]; `None` until then.
    last_logged_state: Option<T>,
    /// Instant of the most recent [`LogThrottler::update`] (initially: construction time).
    last_heartbeat: Instant,
    /// Maximum time an unchanged state may go without being logged again.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Creates a throttler with no logged state; the first `should_log` call returns `true`.
    fn new(heartbeat_interval: Duration) -> Self {
        Self {
            last_logged_state: None,
            last_heartbeat: Instant::now(),
            heartbeat_interval,
        }
    }

    /// Returns `true` if `current_state` differs from the last logged state, or if the
    /// heartbeat interval has elapsed since the last [`LogThrottler::update`].
    fn should_log(&self, current_state: &T) -> bool {
        self.last_logged_state.as_ref() != Some(current_state)
            || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Records `current_state` as logged at the current instant.
    fn update(&mut self, current_state: T) {
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = Instant::now();
    }
}
159
160pub struct Compactor {
162 context: CompactorContext,
164 object_id_getter: Arc<dyn GetObjectId>,
165 task_config: TaskConfig,
166 options: SstableBuilderOptions,
167 get_id_time: Arc<AtomicU64>,
168}
169
170pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
171
172impl Compactor {
173 pub fn new(
175 context: CompactorContext,
176 options: SstableBuilderOptions,
177 task_config: TaskConfig,
178 object_id_getter: Arc<dyn GetObjectId>,
179 ) -> Self {
180 Self {
181 context,
182 options,
183 task_config,
184 get_id_time: Arc::new(AtomicU64::new(0)),
185 object_id_getter,
186 }
187 }
188
189 async fn compact_key_range(
194 &self,
195 iter: impl HummockIterator<Direction = Forward>,
196 compaction_filter: impl CompactionFilter,
197 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
198 task_progress: Option<Arc<TaskProgress>>,
199 task_id: Option<HummockCompactionTaskId>,
200 split_index: Option<usize>,
201 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
202 let compact_timer = if self.context.is_share_buffer_compact {
204 self.context
205 .compactor_metrics
206 .write_build_l0_sst_duration
207 .start_timer()
208 } else {
209 self.context
210 .compactor_metrics
211 .compact_sst_duration
212 .start_timer()
213 };
214
215 let (split_table_outputs, table_stats_map) = {
216 let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
217 match (
218 self.task_config.sstable_filter_type,
219 self.task_config.sstable_filter_layout,
220 ) {
221 (PbSstableFilterType::SstableFilterNone, _) => {
222 self.compact_key_range_impl::<_, NoneFilterBuilder>(
223 factory,
224 iter,
225 compaction_filter,
226 compaction_catalog_agent_ref,
227 task_progress.clone(),
228 self.object_id_getter.clone(),
229 )
230 .instrument_await("compact".verbose())
231 .await?
232 }
233 (PbSstableFilterType::SstableFilterXor8, PbSstableFilterLayout::Blocked) => {
234 self.compact_key_range_impl::<_, BlockedXor8FilterBuilder>(
235 factory,
236 iter,
237 compaction_filter,
238 compaction_catalog_agent_ref,
239 task_progress.clone(),
240 self.object_id_getter.clone(),
241 )
242 .instrument_await("compact".verbose())
243 .await?
244 }
245 (PbSstableFilterType::SstableFilterXor8, PbSstableFilterLayout::Plain) => {
246 self.compact_key_range_impl::<_, Xor8FilterBuilder>(
247 factory,
248 iter,
249 compaction_filter,
250 compaction_catalog_agent_ref,
251 task_progress.clone(),
252 self.object_id_getter.clone(),
253 )
254 .instrument_await("compact".verbose())
255 .await?
256 }
257 (PbSstableFilterType::SstableFilterXor16, PbSstableFilterLayout::Blocked) => {
258 self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
259 factory,
260 iter,
261 compaction_filter,
262 compaction_catalog_agent_ref,
263 task_progress.clone(),
264 self.object_id_getter.clone(),
265 )
266 .instrument_await("compact".verbose())
267 .await?
268 }
269 (PbSstableFilterType::SstableFilterXor16, PbSstableFilterLayout::Plain) => {
270 self.compact_key_range_impl::<_, Xor16FilterBuilder>(
271 factory,
272 iter,
273 compaction_filter,
274 compaction_catalog_agent_ref,
275 task_progress.clone(),
276 self.object_id_getter.clone(),
277 )
278 .instrument_await("compact".verbose())
279 .await?
280 }
281 (filter_type, layout) => {
282 return Err(HummockError::compaction_executor(format!(
283 "unresolved SST filter layout in task config: {:?}, {:?}",
284 filter_type, layout
285 )));
286 }
287 }
288 };
289
290 compact_timer.observe_duration();
291
292 Self::report_progress(
293 self.context.compactor_metrics.clone(),
294 task_progress,
295 &split_table_outputs,
296 self.context.is_share_buffer_compact,
297 );
298
299 self.context
300 .compactor_metrics
301 .get_table_id_total_time_duration
302 .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
303
304 debug_assert!(
305 split_table_outputs
306 .iter()
307 .all(|table_info| table_info.sst_info.table_ids.is_sorted())
308 );
309
310 if task_id.is_some() {
311 tracing::info!(
313 "Finish Task {:?} split_index {:?} sst count {}",
314 task_id,
315 split_index,
316 split_table_outputs.len()
317 );
318 }
319 Ok((split_table_outputs, table_stats_map))
320 }
321
322 pub fn report_progress(
323 metrics: Arc<CompactorMetrics>,
324 task_progress: Option<Arc<TaskProgress>>,
325 ssts: &Vec<LocalSstableInfo>,
326 is_share_buffer_compact: bool,
327 ) {
328 for sst_info in ssts {
329 let sst_size = sst_info.file_size();
330 if let Some(tracker) = &task_progress {
331 tracker.inc_ssts_uploaded();
332 tracker.dec_num_pending_write_io();
333 }
334 if is_share_buffer_compact {
335 metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
336 } else {
337 metrics.compaction_upload_sst_counts.inc();
338 }
339 }
340 }
341
342 async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
343 &self,
344 writer_factory: F,
345 iter: impl HummockIterator<Direction = Forward>,
346 compaction_filter: impl CompactionFilter,
347 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
348 task_progress: Option<Arc<TaskProgress>>,
349 object_id_getter: Arc<dyn GetObjectId>,
350 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
351 let builder_factory = RemoteBuilderFactory::<F, B> {
352 object_id_getter,
353 limiter: self.context.memory_limiter.clone(),
354 options: self.options.clone(),
355 policy: self.task_config.cache_policy,
356 remote_rpc_cost: self.get_id_time.clone(),
357 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
358 sstable_writer_factory: writer_factory,
359 _phantom: PhantomData,
360 };
361
362 let mut sst_builder = CapacitySplitTableBuilder::new(
363 builder_factory,
364 self.context.compactor_metrics.clone(),
365 task_progress.clone(),
366 self.task_config.table_vnode_partition.clone(),
367 self.context
368 .storage_opts
369 .compactor_concurrent_uploading_sst_count,
370 compaction_catalog_agent_ref,
371 );
372 let compaction_statistics = compact_and_build_sst(
373 &mut sst_builder,
374 &self.task_config,
375 self.context.compactor_metrics.clone(),
376 iter,
377 compaction_filter,
378 )
379 .instrument_await("compact_and_build_sst".verbose())
380 .await?;
381
382 let ssts = sst_builder
383 .finish()
384 .instrument_await("builder_finish".verbose())
385 .await?;
386
387 Ok((ssts, compaction_statistics))
388 }
389}
390
391#[must_use]
394pub fn start_iceberg_compactor(
395 compactor_context: CompactorContext,
396 hummock_meta_client: Arc<dyn HummockMetaClient>,
397) -> (JoinHandle<()>, Sender<()>) {
398 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
399 let stream_retry_interval = Duration::from_secs(30);
400 let periodic_event_update_interval = Duration::from_millis(
401 compactor_context
402 .storage_opts
403 .iceberg_compaction_pull_interval_ms,
404 );
405 let worker_num = compactor_context.compaction_executor.worker_num();
406
407 let max_task_parallelism: u32 = (worker_num as f32
408 * compactor_context.storage_opts.compactor_max_task_multiplier)
409 .ceil() as u32;
410
411 const MAX_PULL_TASK_COUNT: u32 = 4;
412 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
413
414 assert_ge!(
415 compactor_context.storage_opts.compactor_max_task_multiplier,
416 0.0
417 );
418
419 let join_handle = tokio::spawn(async move {
420 let pending_parallelism_budget = (max_task_parallelism as f32
422 * compactor_context
423 .storage_opts
424 .iceberg_compaction_pending_parallelism_budget_multiplier)
425 .ceil() as u32;
426 let mut task_queue =
427 IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);
428
429 let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));
431
432 let (task_completion_tx, mut task_completion_rx) =
434 tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
435 let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
436 let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();
439
440 let mut min_interval = tokio::time::interval(stream_retry_interval);
441 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
442
443 let mut log_throttler =
445 LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
446
447 'start_stream: loop {
449 let mut pull_task_ack = true;
452 tokio::select! {
453 _ = min_interval.tick() => {},
455 _ = &mut shutdown_rx => {
457 tracing::info!("Compactor is shutting down");
458 return;
459 }
460 }
461
462 let (request_sender, response_event_stream) = match hummock_meta_client
463 .subscribe_iceberg_compaction_event()
464 .await
465 {
466 Ok((request_sender, response_event_stream)) => {
467 tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
468 (request_sender, response_event_stream)
469 }
470
471 Err(e) => {
472 tracing::warn!(
473 error = %e.as_report(),
474 "Subscribing to iceberg compaction tasks failed with error. Will retry.",
475 );
476 continue 'start_stream;
477 }
478 };
479
480 if matches!(
481 flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
482 ReportSendResult::RestartStream
483 ) {
484 continue 'start_stream;
485 }
486
487 pin_mut!(response_event_stream);
488
489 let _executor = compactor_context.compaction_executor.clone();
490
491 let mut event_loop_iteration_now = Instant::now();
493 'consume_stream: loop {
494 {
495 compactor_context
497 .compactor_metrics
498 .compaction_event_loop_iteration_latency
499 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
500 event_loop_iteration_now = Instant::now();
501 }
502
503 let request_sender = request_sender.clone();
504 let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
505 Some(plan_completion) = task_completion_rx.recv() => {
507 let task_key = plan_completion.task_key;
508 let error_message = plan_completion.error_message;
509 tracing::debug!(
510 task_id = task_key.0,
511 plan_index = task_key.1,
512 success = error_message.is_none(),
513 "Plan completed, updating queue state"
514 );
515 task_queue.finish_running(task_key);
516
517 let completed_task_id = task_key.0;
518 let Entry::Occupied(mut tracker_entry) =
519 task_trackers.entry(completed_task_id)
520 else {
521 continue 'consume_stream;
522 };
523 tracker_entry.get_mut().record_completion(error_message);
524 if !tracker_entry.get().is_finished() {
525 continue 'consume_stream;
526 }
527
528 let report = tracker_entry.remove().into_report(completed_task_id);
529 if matches!(
530 send_or_buffer_iceberg_task_report(
531 &request_sender,
532 &mut pending_task_reports,
533 report,
534 ),
535 ReportSendResult::RestartStream
536 ) {
537 continue 'start_stream;
538 }
539 continue 'consume_stream;
540 }
541
542 _ = task_queue.wait_schedulable() => {
544 schedule_queued_tasks(
545 &mut task_queue,
546 &compactor_context,
547 &shutdown_map,
548 &task_completion_tx,
549 );
550 continue 'consume_stream;
551 }
552
553 _ = periodic_event_interval.tick() => {
554 let should_restart_stream = handle_meta_task_pulling(
556 &mut pull_task_ack,
557 &task_queue,
558 max_task_parallelism,
559 max_pull_task_count,
560 &request_sender,
561 &mut log_throttler,
562 );
563
564 if should_restart_stream {
565 continue 'start_stream;
566 }
567 continue;
568 }
569 event = response_event_stream.next() => {
570 event
571 }
572
573 _ = &mut shutdown_rx => {
574 tracing::info!("Iceberg Compactor is shutting down");
575 return
576 }
577 };
578
579 match event {
580 Some(Ok(SubscribeIcebergCompactionEventResponse {
581 event,
582 create_at: _create_at,
583 })) => {
584 let event = match event {
585 Some(event) => event,
586 None => continue 'consume_stream,
587 };
588
589 match event {
590 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
591 let task_id = iceberg_compaction_task.task_id;
592 let sink_id = iceberg_compaction_task.sink_id;
593 let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
595 .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
596 .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
597 .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
598 .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
599 .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
600 .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
601 .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
602 .enable_prefetch(compactor_context.storage_opts.iceberg_compaction_enable_prefetch)
603 .target_binpack_group_size_mb(
604 compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
605 )
606 .min_group_size_mb(
607 compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
608 )
609 .min_group_file_count(
610 compactor_context.storage_opts.iceberg_compaction_min_group_file_count
611 )
612 .build() {
613 Ok(config) => config,
614 Err(e) => {
615 tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
616 let report = build_iceberg_task_report(
617 task_id,
618 sink_id,
619 Some(format!(
620 "Failed to build iceberg compactor runner config: {}",
621 e.as_report()
622 )),
623 );
624 if matches!(
625 send_or_buffer_iceberg_task_report(
626 &request_sender,
627 &mut pending_task_reports,
628 report,
629 ),
630 ReportSendResult::RestartStream
631 ) {
632 continue 'start_stream;
633 }
634 continue 'consume_stream;
635 }
636 };
637
638 let task_execution = match create_task_execution(
640 iceberg_compaction_task,
641 compactor_runner_config,
642 compactor_context.compactor_metrics.clone(),
643 ).await {
644 Ok(task_execution) => task_execution,
645 Err(e) => {
646 tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
647 let report = build_iceberg_task_report(
648 task_id,
649 sink_id,
650 Some(format!(
651 "Failed to create iceberg compaction task execution: {}",
652 e.as_report()
653 )),
654 );
655 if matches!(
656 send_or_buffer_iceberg_task_report(
657 &request_sender,
658 &mut pending_task_reports,
659 report,
660 ),
661 ReportSendResult::RestartStream
662 ) {
663 continue 'start_stream;
664 }
665 continue 'consume_stream;
666 }
667 };
668
669 let sink_id = task_execution.sink_id;
670 let plan_runners = task_execution.plan_runners;
671
672 if plan_runners.is_empty() {
673 tracing::info!(task_id, sink_id, "No plans to execute");
674 let report = build_iceberg_task_report(task_id, sink_id, None);
675 if matches!(
676 send_or_buffer_iceberg_task_report(
677 &request_sender,
678 &mut pending_task_reports,
679 report,
680 ),
681 ReportSendResult::RestartStream
682 ) {
683 continue 'start_stream;
684 }
685 continue 'consume_stream;
686 }
687
688 let total_plans = plan_runners.len();
690 let mut enqueued_count = 0;
691
692 for runner in plan_runners {
693 let meta = runner.to_meta();
694 let plan_index = meta.plan_index;
695 let required_parallelism = runner.required_parallelism();
696 let push_result = task_queue.push(meta, Some(runner));
697
698 match push_result {
699 PushResult::Added => {
700 enqueued_count += 1;
701 tracing::debug!(
702 task_id = task_id,
703 plan_index = plan_index,
704 required_parallelism = required_parallelism,
705 "Iceberg plan runner added to queue"
706 );
707 },
708 PushResult::RejectedCapacity => {
709 tracing::warn!(
710 task_id = task_id,
711 required_parallelism = required_parallelism,
712 pending_budget = pending_parallelism_budget,
713 enqueued_count = enqueued_count,
714 total_plans = total_plans,
715 "Iceberg plan runner rejected - queue capacity exceeded"
716 );
717 break;
719 },
720 PushResult::RejectedTooLarge => {
721 tracing::error!(
722 task_id = task_id,
723 required_parallelism = required_parallelism,
724 max_parallelism = max_task_parallelism,
725 "Iceberg plan runner rejected - parallelism exceeds max"
726 );
727 },
728 PushResult::RejectedInvalidParallelism => {
729 tracing::error!(
730 task_id = task_id,
731 required_parallelism = required_parallelism,
732 "Iceberg plan runner rejected - invalid parallelism"
733 );
734 },
735 PushResult::RejectedDuplicate => {
736 tracing::error!(
737 task_id = task_id,
738 plan_index = plan_index,
739 "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
740 );
741 }
742 }
743 }
744
745 if enqueued_count == 0 {
746 let report = build_iceberg_task_report(
747 task_id,
748 sink_id,
749 Some("Failed to enqueue all iceberg compaction plans".to_owned()),
750 );
751 if matches!(
752 send_or_buffer_iceberg_task_report(
753 &request_sender,
754 &mut pending_task_reports,
755 report,
756 ),
757 ReportSendResult::RestartStream
758 ) {
759 continue 'start_stream;
760 }
761 } else {
762 task_trackers.insert(
763 task_id,
764 IcebergTaskTracker::new(sink_id, enqueued_count),
765 );
766 }
767
768 tracing::info!(
769 task_id = task_id,
770 sink_id = sink_id,
771 total_plans = total_plans,
772 enqueued_count = enqueued_count,
773 "Enqueued {} of {} Iceberg plan runners",
774 enqueued_count,
775 total_plans
776 );
777 },
778 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
779 pull_task_ack = true;
781 },
782 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CancelCompactTask(cancel_compact_task) => {
783 cancel_iceberg_task(
784 cancel_compact_task.task_id,
785 &mut task_queue,
786 &shutdown_map,
787 &mut task_trackers,
788 );
789 },
790 }
791 }
792 Some(Err(e)) => {
793 tracing::warn!("Failed to consume stream. {}", e.message());
794 continue 'start_stream;
795 }
796 _ => {
797 continue 'start_stream;
799 }
800 }
801 }
802 }
803 });
804
805 (join_handle, shutdown_tx)
806}
807
808#[must_use]
811pub fn start_compactor(
812 compactor_context: CompactorContext,
813 hummock_meta_client: Arc<dyn HummockMetaClient>,
814 object_id_manager: Arc<ObjectIdManager>,
815 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
816) -> (JoinHandle<()>, Sender<()>) {
817 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
818 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
819 let stream_retry_interval = Duration::from_secs(30);
820 let task_progress = compactor_context.task_progress_manager.clone();
821 let periodic_event_update_interval = Duration::from_millis(1000);
822
823 let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
824 * compactor_context.storage_opts.compactor_max_task_multiplier)
825 .ceil() as u32;
826 let running_task_parallelism = Arc::new(AtomicU32::new(0));
827
828 const MAX_PULL_TASK_COUNT: u32 = 4;
829 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
830
831 assert_ge!(
832 compactor_context.storage_opts.compactor_max_task_multiplier,
833 0.0
834 );
835
836 let join_handle = tokio::spawn(async move {
837 let shutdown_map = CompactionShutdownMap::default();
838 let mut min_interval = tokio::time::interval(stream_retry_interval);
839 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
840
841 let mut log_throttler =
843 LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
844
845 'start_stream: loop {
847 let mut pull_task_ack = true;
850 tokio::select! {
851 _ = min_interval.tick() => {},
853 _ = &mut shutdown_rx => {
855 tracing::info!("Compactor is shutting down");
856 return;
857 }
858 }
859
860 let (request_sender, response_event_stream) =
861 match hummock_meta_client.subscribe_compaction_event().await {
862 Ok((request_sender, response_event_stream)) => {
863 tracing::debug!("Succeeded subscribe_compaction_event.");
864 (request_sender, response_event_stream)
865 }
866
867 Err(e) => {
868 tracing::warn!(
869 error = %e.as_report(),
870 "Subscribing to compaction tasks failed with error. Will retry.",
871 );
872 continue 'start_stream;
873 }
874 };
875
876 pin_mut!(response_event_stream);
877
878 let executor = compactor_context.compaction_executor.clone();
879 let object_id_manager = object_id_manager.clone();
880
881 let mut event_loop_iteration_now = Instant::now();
883 'consume_stream: loop {
884 {
885 compactor_context
887 .compactor_metrics
888 .compaction_event_loop_iteration_latency
889 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
890 event_loop_iteration_now = Instant::now();
891 }
892
893 let running_task_parallelism = running_task_parallelism.clone();
894 let request_sender = request_sender.clone();
895 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
896 _ = periodic_event_interval.tick() => {
897 let progress_list = get_task_progress(task_progress.clone());
898
899 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
900 event: Some(RequestEvent::HeartBeat(
901 HeartBeat {
902 progress: progress_list
903 }
904 )),
905 create_at: SystemTime::now()
906 .duration_since(std::time::UNIX_EPOCH)
907 .expect("Clock may have gone backwards")
908 .as_millis() as u64,
909 }) {
910 tracing::warn!(error = %e.as_report(), "Failed to report task progress");
911 continue 'start_stream;
913 }
914
915
916 let mut pending_pull_task_count = 0;
917 if pull_task_ack {
918 pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
920
921 if pending_pull_task_count > 0 {
922 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
923 event: Some(RequestEvent::PullTask(
924 PullTask {
925 pull_task_count: pending_pull_task_count,
926 }
927 )),
928 create_at: SystemTime::now()
929 .duration_since(std::time::UNIX_EPOCH)
930 .expect("Clock may have gone backwards")
931 .as_millis() as u64,
932 }) {
933 tracing::warn!(error = %e.as_report(), "Failed to pull task");
934
935 continue 'start_stream;
937 } else {
938 pull_task_ack = false;
939 }
940 }
941 }
942
943 let running_count = running_task_parallelism.load(Ordering::SeqCst);
944 let current_state = CompactionLogState {
945 running_parallelism: running_count,
946 pull_task_ack,
947 pending_pull_task_count,
948 };
949
950 if log_throttler.should_log(¤t_state) {
952 tracing::info!(
953 running_parallelism_count = %current_state.running_parallelism,
954 pull_task_ack = %current_state.pull_task_ack,
955 pending_pull_task_count = %current_state.pending_pull_task_count
956 );
957 log_throttler.update(current_state);
958 }
959
960 continue;
961 }
962 event = response_event_stream.next() => {
963 event
964 }
965
966 _ = &mut shutdown_rx => {
967 tracing::info!("Compactor is shutting down");
968 return
969 }
970 };
971
972 fn send_report_task_event(
973 compact_task: &CompactTask,
974 table_stats: TableStatsMap,
975 object_timestamps: HashMap<HummockSstableObjectId, u64>,
976 request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
977 ) {
978 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
979 event: Some(RequestEvent::ReportTask(ReportTask {
980 task_id: compact_task.task_id,
981 task_status: compact_task.task_status.into(),
982 sorted_output_ssts: compact_task
983 .sorted_output_ssts
984 .iter()
985 .map(|sst| sst.into())
986 .collect(),
987 table_stats_change: to_prost_table_stats_map(table_stats),
988 object_timestamps,
989 })),
990 create_at: SystemTime::now()
991 .duration_since(std::time::UNIX_EPOCH)
992 .expect("Clock may have gone backwards")
993 .as_millis() as u64,
994 }) {
995 let task_id = compact_task.task_id;
996 tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
997 }
998 }
999
1000 match event {
1001 Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
1002 let event = match event {
1003 Some(event) => event,
1004 None => continue 'consume_stream,
1005 };
1006 let shutdown = shutdown_map.clone();
1007 let context = compactor_context.clone();
1008 let consumed_latency_ms = SystemTime::now()
1009 .duration_since(std::time::UNIX_EPOCH)
1010 .expect("Clock may have gone backwards")
1011 .as_millis() as u64
1012 - create_at;
1013 context
1014 .compactor_metrics
1015 .compaction_event_consumed_latency
1016 .observe(consumed_latency_ms as _);
1017
1018 let object_id_manager = object_id_manager.clone();
1019 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
1020
1021 match event {
1022 ResponseEvent::CompactTask(compact_task) => {
1023 let compact_task = CompactTask::from(compact_task);
1024 let parallelism =
1025 calculate_task_parallelism(&compact_task, &context);
1026
1027 assert_ne!(parallelism, 0, "splits cannot be empty");
1028
1029 if (max_task_parallelism
1030 - running_task_parallelism.load(Ordering::SeqCst))
1031 < parallelism as u32
1032 {
1033 tracing::warn!(
1034 "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
1035 compact_task.task_id,
1036 parallelism,
1037 max_task_parallelism,
1038 running_task_parallelism.load(Ordering::Relaxed),
1039 );
1040 let (compact_task, table_stats, object_timestamps) =
1041 compact_done(
1042 compact_task,
1043 context.clone(),
1044 vec![],
1045 TaskStatus::NoAvailCpuResourceCanceled,
1046 );
1047
1048 send_report_task_event(
1049 &compact_task,
1050 table_stats,
1051 object_timestamps,
1052 &request_sender,
1053 );
1054
1055 continue 'consume_stream;
1056 }
1057
1058 running_task_parallelism
1059 .fetch_add(parallelism as u32, Ordering::SeqCst);
1060 executor.spawn(async move {
1061 let (tx, rx) = tokio::sync::oneshot::channel();
1062 let task_id = compact_task.task_id;
1063 shutdown.lock().unwrap().insert(task_id, tx);
1064
1065 let (
1066 (compact_task, table_stats, object_timestamps),
1067 _memory_tracker,
1068 ) = compactor_runner::compact(
1069 context.clone(),
1070 compact_task,
1071 rx,
1072 object_id_manager.clone(),
1073 compaction_catalog_manager_ref.clone(),
1074 )
1075 .await;
1076
1077 shutdown.lock().unwrap().remove(&task_id);
1078 running_task_parallelism
1079 .fetch_sub(parallelism as u32, Ordering::SeqCst);
1080
1081 send_report_task_event(
1082 &compact_task,
1083 table_stats,
1084 object_timestamps,
1085 &request_sender,
1086 );
1087
1088 let enable_check_compaction_result =
1089 context.storage_opts.check_compaction_result;
1090 let need_check_task =
1091 !compact_task.sorted_output_ssts.is_empty()
1092 && compact_task.task_status == TaskStatus::Success;
1093
1094 if enable_check_compaction_result && need_check_task {
1095 let read_table_ids = compact_task
1096 .get_table_ids_from_input_ssts()
1097 .collect::<Vec<_>>();
1098 match compaction_catalog_manager_ref.acquire(read_table_ids).await {
1099 Ok(compaction_catalog_agent_ref) => {
1100 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1101 {
1102 Err(e) => {
1103 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
1104 }
1105 Ok(true) => (),
1106 Ok(false) => {
1107 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1108 }
1109 }
1110 },
1111 Err(e) => {
1112 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1113 }
1114 }
1115 }
1116 });
1117 }
1118 #[expect(deprecated)]
1119 ResponseEvent::VacuumTask(_) => {
1120 unreachable!("unexpected vacuum task");
1121 }
1122 #[expect(deprecated)]
1123 ResponseEvent::FullScanTask(_) => {
1124 unreachable!("unexpected scan task");
1125 }
1126 #[expect(deprecated)]
1127 ResponseEvent::ValidationTask(validation_task) => {
1128 let validation_task = ValidationTask::from(validation_task);
1129 executor.spawn(async move {
1130 validate_ssts(validation_task, context.sstable_store.clone())
1131 .await;
1132 });
1133 }
1134 ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
1135 .lock()
1136 .unwrap()
1137 .remove(&cancel_compact_task.task_id)
1138 {
1139 Some(tx) => {
1140 if tx.send(()).is_err() {
1141 tracing::warn!(
1142 "Cancellation of compaction task failed. task_id: {}",
1143 cancel_compact_task.task_id
1144 );
1145 }
1146 }
1147 _ => {
1148 tracing::warn!(
1149 "Attempting to cancel non-existent compaction task. task_id: {}",
1150 cancel_compact_task.task_id
1151 );
1152 }
1153 },
1154
1155 ResponseEvent::PullTaskAck(_pull_task_ack) => {
1156 pull_task_ack = true;
1158 }
1159 }
1160 }
1161 Some(Err(e)) => {
1162 tracing::warn!("Failed to consume stream. {}", e.message());
1163 continue 'start_stream;
1164 }
1165 _ => {
1166 continue 'start_stream;
1168 }
1169 }
1170 }
1171 }
1172 });
1173
1174 (join_handle, shutdown_tx)
1175}
1176
1177#[must_use]
1180pub fn start_shared_compactor(
1181 grpc_proxy_client: GrpcCompactorProxyClient,
1182 mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1183 context: CompactorContext,
1184) -> (JoinHandle<()>, Sender<()>) {
1185 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1186 let task_progress = context.task_progress_manager.clone();
1187 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1188 let periodic_event_update_interval = Duration::from_millis(1000);
1189
1190 let join_handle = tokio::spawn(async move {
1191 let shutdown_map = CompactionShutdownMap::default();
1192
1193 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1194 let executor = context.compaction_executor.clone();
1195 let report_heartbeat_client = grpc_proxy_client.clone();
1196 'consume_stream: loop {
1197 let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1198 _ = periodic_event_interval.tick() => {
1199 let progress_list = get_task_progress(task_progress.clone());
1200 let report_compaction_task_request = ReportCompactionTaskRequest{
1201 event: Some(ReportCompactionTaskEvent::HeartBeat(
1202 SharedHeartBeat {
1203 progress: progress_list
1204 }
1205 )),
1206 };
1207 if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1208 tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1209 }
1210 continue
1211 }
1212
1213
1214 _ = &mut shutdown_rx => {
1215 tracing::info!("Compactor is shutting down");
1216 return
1217 }
1218
1219 request = receiver.recv() => {
1220 request
1221 }
1222
1223 };
1224 match request {
1225 Some(request) => {
1226 let context = context.clone();
1227 let shutdown = shutdown_map.clone();
1228
1229 let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1230 executor.spawn(async move {
1231 let DispatchCompactionTaskRequest {
1232 tables,
1233 output_object_ids,
1234 task: dispatch_task,
1235 } = request.into_inner();
1236 let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1237 acc.insert(table.id, table);
1238 acc
1239 });
1240
1241 let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1242 output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1243 let shared_compactor_object_id_manager =
1244 SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1245 match dispatch_task.unwrap() {
1246 dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1247 let compact_task = CompactTask::from(&compact_task);
1248 let (tx, rx) = tokio::sync::oneshot::channel();
1249 let task_id = compact_task.task_id;
1250 shutdown.lock().unwrap().insert(task_id, tx);
1251
1252 let compaction_catalog_manager_ref =
1253 Arc::new(CompactionCatalogManager::new_preloaded(
1254 table_id_to_catalog,
1255 ));
1256 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
1257 context.clone(),
1258 compact_task,
1259 rx,
1260 shared_compactor_object_id_manager,
1261 compaction_catalog_manager_ref.clone(),
1262 )
1263 .await;
1264 shutdown.lock().unwrap().remove(&task_id);
1265 let report_compaction_task_request = ReportCompactionTaskRequest {
1266 event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1267 compact_task: Some(PbCompactTask::from(&compact_task)),
1268 table_stats_change: to_prost_table_stats_map(table_stats),
1269 object_timestamps,
1270 })),
1271 };
1272
1273 match cloned_grpc_proxy_client
1274 .report_compaction_task(report_compaction_task_request)
1275 .await
1276 {
1277 Ok(_) => {
1278 let enable_check_compaction_result =
1280 context.storage_opts.check_compaction_result;
1281 let need_check_task =
1282 !compact_task.sorted_output_ssts.is_empty()
1283 && compact_task.task_status
1284 == TaskStatus::Success;
1285 if enable_check_compaction_result && need_check_task {
1286 let read_table_ids = compact_task
1287 .get_table_ids_from_input_ssts()
1288 .collect::<Vec<_>>();
1289 match compaction_catalog_manager_ref.acquire(read_table_ids).await {
1290 Ok(compaction_catalog_agent_ref) => {
1291 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1292 {
1293 Err(e) => {
1294 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1295 }
1296 Ok(true) => (),
1297 Ok(false) => {
1298 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1299 }
1300 }
1301 }
1302 Err(e) => {
1303 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1304 }
1305 }
1306 }
1307 }
1308 Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1309 }
1310
1311 }
1312 dispatch_compaction_task_request::Task::VacuumTask(_) => {
1313 unreachable!("unexpected vacuum task");
1314 }
1315 dispatch_compaction_task_request::Task::FullScanTask(_) => {
1316 unreachable!("unexpected scan task");
1317 }
1318 dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1319 let validation_task = ValidationTask::from(validation_task);
1320 validate_ssts(validation_task, context.sstable_store.clone()).await;
1321 }
1322 dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1323 match shutdown
1324 .lock()
1325 .unwrap()
1326 .remove(&cancel_compact_task.task_id)
1327 { Some(tx) => {
1328 if tx.send(()).is_err() {
1329 tracing::warn!(
1330 "Cancellation of compaction task failed. task_id: {}",
1331 cancel_compact_task.task_id
1332 );
1333 }
1334 } _ => {
1335 tracing::warn!(
1336 "Attempting to cancel non-existent compaction task. task_id: {}",
1337 cancel_compact_task.task_id
1338 );
1339 }}
1340 }
1341 }
1342 });
1343 }
1344 None => continue 'consume_stream,
1345 }
1346 }
1347 });
1348 (join_handle, shutdown_tx)
1349}
1350
1351fn get_task_progress(
1352 task_progress: Arc<
1353 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1354 >,
1355) -> Vec<CompactTaskProgress> {
1356 let mut progress_list = Vec::new();
1357 for (&task_id, progress) in &*task_progress.lock() {
1358 progress_list.push(progress.snapshot(task_id));
1359 }
1360 progress_list
1361}
1362
1363fn schedule_queued_tasks(
1365 task_queue: &mut IcebergTaskQueue,
1366 compactor_context: &CompactorContext,
1367 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1368 task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
1369) {
1370 while let Some(popped_task) = task_queue.pop() {
1371 let task_id = popped_task.meta.task_id;
1372 let plan_index = popped_task.meta.plan_index;
1373 let task_key = (task_id, plan_index);
1374
1375 let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1377
1378 let Some(runner) = popped_task.runner else {
1379 tracing::error!(
1380 task_id = task_id,
1381 plan_index = plan_index,
1382 "Popped task missing runner - this should not happen"
1383 );
1384 task_queue.finish_running(task_key);
1385 continue;
1386 };
1387
1388 let executor = compactor_context.compaction_executor.clone();
1389 let shutdown_map_clone = shutdown_map.clone();
1390 let completion_tx_clone = task_completion_tx.clone();
1391 let (tx, rx) = tokio::sync::oneshot::channel();
1392
1393 {
1394 let mut shutdown_guard = shutdown_map.lock().unwrap();
1395 shutdown_guard.insert(task_key, tx);
1396 }
1397
1398 tracing::info!(
1399 task_id = task_id,
1400 plan_index = plan_index,
1401 unique_ident = ?unique_ident,
1402 required_parallelism = popped_task.meta.required_parallelism,
1403 "Starting iceberg compaction task from queue"
1404 );
1405
1406 executor.spawn(async move {
1407 let _cleanup_guard = scopeguard::guard(shutdown_map_clone, move |shutdown_map| {
1408 let mut shutdown_guard = shutdown_map.lock().unwrap();
1409 shutdown_guard.remove(&task_key);
1410 });
1411
1412 let result = Box::pin(runner.compact(rx)).await;
1413
1414 let completion = match result {
1415 Ok(_) => IcebergPlanCompletion {
1416 task_key,
1417 error_message: None,
1418 },
1419 Err(e) => {
1420 if is_cancelled_iceberg_compaction_error(&e) {
1421 tracing::info!(
1422 task_id = task_key.0,
1423 plan_index = task_key.1,
1424 "Iceberg compaction plan cancelled"
1425 );
1426 } else {
1427 tracing::warn!(
1428 error = %e.as_report(),
1429 task_id = task_key.0,
1430 plan_index = task_key.1,
1431 "Failed to compact iceberg runner"
1432 );
1433 }
1434 IcebergPlanCompletion {
1435 task_key,
1436 error_message: Some(e.to_report_string()),
1437 }
1438 }
1439 };
1440
1441 if completion_tx_clone.send(completion).is_err() {
1442 tracing::warn!(
1443 task_id = task_key.0,
1444 plan_index = task_key.1,
1445 "Failed to notify task completion - main loop may have shut down"
1446 );
1447 }
1448 });
1449 }
1450}
1451
1452fn is_cancelled_iceberg_compaction_error(error: &crate::hummock::HummockError) -> bool {
1453 matches!(
1454 error.inner(),
1455 HummockErrorInner::CompactionExecutor(message) if message == "Plan cancelled"
1456 )
1457}
1458
1459fn cancel_iceberg_task(
1460 task_id: u64,
1461 task_queue: &mut IcebergTaskQueue,
1462 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1463 task_trackers: &mut HashMap<u64, IcebergTaskTracker>,
1464) {
1465 let cancelled_waiting = task_queue.cancel_waiting_task(task_id);
1470 let removed_tracker = task_trackers.remove(&task_id).is_some();
1471
1472 let cancelled_running = {
1473 let mut shutdown_guard = shutdown_map.lock().unwrap();
1474 let task_keys: Vec<_> = shutdown_guard
1475 .keys()
1476 .filter(|(running_task_id, _)| *running_task_id == task_id)
1477 .copied()
1478 .collect();
1479
1480 for task_key in &task_keys {
1481 if let Some(tx) = shutdown_guard.remove(task_key)
1482 && tx.send(()).is_err()
1483 {
1484 tracing::debug!(
1485 task_id = task_key.0,
1486 plan_index = task_key.1,
1487 "Iceberg compaction plan shutdown receiver already closed during cancellation"
1488 );
1489 }
1490 }
1491
1492 task_keys.len()
1493 };
1494
1495 if cancelled_waiting == 0 && cancelled_running == 0 && !removed_tracker {
1496 tracing::warn!(
1497 task_id = task_id,
1498 "Attempting to cancel non-existent iceberg compaction task"
1499 );
1500 } else {
1501 tracing::info!(
1502 task_id = task_id,
1503 cancelled_waiting = cancelled_waiting,
1504 cancelled_running = cancelled_running,
1505 removed_tracker = removed_tracker,
1506 "Cancelled iceberg compaction task"
1507 );
1508 }
1509}
1510
1511fn handle_meta_task_pulling(
1514 pull_task_ack: &mut bool,
1515 task_queue: &IcebergTaskQueue,
1516 max_task_parallelism: u32,
1517 max_pull_task_count: u32,
1518 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1519 log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1520) -> bool {
1521 let mut pending_pull_task_count = 0;
1522 if *pull_task_ack {
1523 let current_running_parallelism = task_queue.running_parallelism_sum();
1525 pending_pull_task_count =
1526 (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1527
1528 if pending_pull_task_count > 0 {
1529 if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1530 event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1531 subscribe_iceberg_compaction_event_request::PullTask {
1532 pull_task_count: pending_pull_task_count,
1533 },
1534 )),
1535 create_at: SystemTime::now()
1536 .duration_since(std::time::UNIX_EPOCH)
1537 .expect("Clock may have gone backwards")
1538 .as_millis() as u64,
1539 }) {
1540 tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1541 return true; } else {
1543 *pull_task_ack = false;
1544 }
1545 }
1546 }
1547
1548 let running_count = task_queue.running_parallelism_sum();
1549 let waiting_count = task_queue.waiting_parallelism_sum();
1550 let available_count = max_task_parallelism.saturating_sub(running_count);
1551 let current_state = IcebergCompactionLogState {
1552 running_parallelism: running_count,
1553 waiting_parallelism: waiting_count,
1554 available_parallelism: available_count,
1555 pull_task_ack: *pull_task_ack,
1556 pending_pull_task_count,
1557 };
1558
1559 if log_throttler.should_log(¤t_state) {
1561 tracing::info!(
1562 running_parallelism_count = %current_state.running_parallelism,
1563 waiting_parallelism_count = %current_state.waiting_parallelism,
1564 available_parallelism = %current_state.available_parallelism,
1565 pull_task_ack = %current_state.pull_task_ack,
1566 pending_pull_task_count = %current_state.pending_pull_task_count
1567 );
1568 log_throttler.update(current_state);
1569 }
1570
1571 false }
1573
#[cfg(test)]
mod tests {
    use super::*;

    /// Cancelling a task whose plans are only waiting drains them from the queue and drops the
    /// task's tracker.
    #[test]
    fn test_cancel_iceberg_task_removes_waiting_plans_and_tracker() {
        let task_id = 42;
        let mut task_queue = IcebergTaskQueue::new(10, 30);
        assert_eq!(
            task_queue.push(
                iceberg_compaction::IcebergTaskMeta {
                    task_id,
                    plan_index: 0,
                    required_parallelism: 3,
                },
                None,
            ),
            PushResult::Added
        );
        assert_eq!(
            task_queue.push(
                iceberg_compaction::IcebergTaskMeta {
                    task_id,
                    plan_index: 1,
                    required_parallelism: 4,
                },
                None,
            ),
            PushResult::Added
        );
        assert_eq!(task_queue.waiting_parallelism_sum(), 7);

        let shutdown_map = Arc::new(Mutex::new(HashMap::new()));
        let mut task_trackers = HashMap::from([(task_id, IcebergTaskTracker::new(10, 2))]);

        cancel_iceberg_task(task_id, &mut task_queue, &shutdown_map, &mut task_trackers);

        assert_eq!(task_queue.waiting_parallelism_sum(), 0);
        assert!(!task_trackers.contains_key(&task_id));
    }

    /// Cancelling a task with a running plan signals that plan's shutdown receiver, clears the
    /// shutdown map, and drops the tracker.
    #[test]
    fn test_cancel_iceberg_task_shuts_down_running_plans_and_tracker() {
        let task_id = 43;
        let task_key = (task_id, 0);
        let mut task_queue = IcebergTaskQueue::new(10, 30);
        let shutdown_map = Arc::new(Mutex::new(HashMap::new()));
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
        shutdown_map.lock().unwrap().insert(task_key, shutdown_tx);
        let mut task_trackers = HashMap::from([(task_id, IcebergTaskTracker::new(10, 1))]);

        cancel_iceberg_task(task_id, &mut task_queue, &shutdown_map, &mut task_trackers);

        assert!(shutdown_rx.try_recv().is_ok());
        assert!(shutdown_map.lock().unwrap().is_empty());
        assert!(!task_trackers.contains_key(&task_id));
    }

    /// Log states compare field-by-field.
    #[test]
    fn test_log_state_equality() {
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state3 = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_eq!(state1, state2);
        assert_ne!(state1, state3);

        let ice_state1 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 5,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_state2 = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 6,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        assert_ne!(ice_state1, ice_state2);
    }

    /// Within the heartbeat window, the throttler logs only when the state changes.
    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let state1 = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state2 = CompactionLogState {
            running_parallelism: 11,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        // First observation always logs.
        assert!(throttler.should_log(&state1));
        throttler.update(state1.clone());

        // Same state again: suppressed.
        assert!(!throttler.should_log(&state1));

        // Changed state: logs.
        assert!(throttler.should_log(&state2));
        throttler.update(state2.clone());

        assert!(!throttler.should_log(&state2));
    }

    /// An unchanged state is logged again once the heartbeat interval has elapsed.
    #[test]
    fn test_log_throttler_heartbeat() {
        // The interval is wide enough that the "suppressed right after update" assertion
        // cannot race the clock on a loaded machine; the sleep comfortably exceeds it.
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(100));
        let state = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttler.should_log(&state));
        throttler.update(state.clone());

        assert!(!throttler.should_log(&state));

        std::thread::sleep(Duration::from_millis(150));

        assert!(throttler.should_log(&state));
    }
}