1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24 ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28 subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::hash_map::Entry;
43use std::collections::{HashMap, VecDeque};
44use std::marker::PhantomData;
45use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, SystemTime};
48
49use await_tree::{InstrumentAwait, SpanExt};
50pub use compaction_executor::CompactionExecutor;
51pub use compaction_filter::{
52 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, TtlCompactionFilter,
53};
54pub use context::{
55 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
60use iceberg_compaction::{
61 IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
62 ReportSendResult, build_iceberg_task_report, create_task_execution,
63 flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
64};
65pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
66use more_asserts::assert_ge;
67use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
68use risingwave_hummock_sdk::{
69 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
70};
71use risingwave_pb::hummock::compact_task::TaskStatus;
72use risingwave_pb::hummock::subscribe_compaction_event_request::{
73 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
74};
75use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
76use risingwave_pb::hummock::{
77 CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
78 SubscribeCompactionEventResponse,
79};
80use risingwave_rpc_client::HummockMetaClient;
81pub use shared_buffer_compact::{compact, merge_imms_in_memory};
82use tokio::sync::oneshot::Sender;
83use tokio::task::JoinHandle;
84use tokio::time::Instant;
85
86pub use self::compaction_utils::{
87 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
88 check_flush_result,
89};
90pub use self::task_progress::TaskProgress;
91use super::multi_builder::CapacitySplitTableBuilder;
92use super::{
93 GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
94};
95use crate::compaction_catalog_manager::{
96 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
97};
98use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
99use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
100use crate::hummock::compactor::iceberg_compaction::TaskKey;
101use crate::hummock::iterator::{Forward, HummockIterator};
102use crate::hummock::{
103 BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
104 UnifiedSstableWriterFactory, validate_ssts,
105};
106use crate::monitor::CompactorMetrics;
107
/// Heartbeat interval handed to the `LogThrottler`s of the compactor event loops: once this much
/// time has passed since the last status log, the status is logged again even if it is unchanged.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);
110
/// Snapshot of the values printed by the periodic status log of the Hummock compactor event
/// loop. Two snapshots compare equal when none of the logged values changed.
#[derive(Clone, Debug, Eq, PartialEq)]
struct CompactionLogState {
    /// Parallelism currently occupied by running compaction tasks.
    running_parallelism: u32,
    /// Value of the `pull_task_ack` flag at the time the snapshot was taken.
    pull_task_ack: bool,
    /// Number of tasks requested by the pull request of this tick (0 if none was sent).
    pending_pull_task_count: u32,
}
118
/// Snapshot type used as the state of the `LogThrottler` owned by the iceberg compactor event
/// loop. Two snapshots compare equal when none of the fields changed.
///
/// NOTE(review): the fields are filled in by a helper that is not part of this chunk; the
/// per-field descriptions below are inferred from the names and should be confirmed there.
#[derive(Clone, Debug, Eq, PartialEq)]
struct IcebergCompactionLogState {
    /// Presumably the parallelism occupied by running plans.
    running_parallelism: u32,
    /// Presumably the parallelism requested by plans still waiting in the queue.
    waiting_parallelism: u32,
    /// Presumably the parallelism still free for new plans.
    available_parallelism: u32,
    /// Presumably the `pull_task_ack` flag at the time the snapshot was taken.
    pull_task_ack: bool,
    /// Presumably the number of tasks requested by the latest pull request.
    pending_pull_task_count: u32,
}
128
/// Decides whether a recurring status line is worth logging again.
///
/// A line should be logged when the state differs from the one recorded by the last
/// [`LogThrottler::update`] call, or when `heartbeat_interval` has elapsed since that call
/// (or since construction if `update` was never called).
struct LogThrottler<T: PartialEq> {
    /// State recorded by the most recent `update`; `None` until the first one.
    last_logged_state: Option<T>,
    /// Time of the most recent `update` (initially the construction time).
    last_heartbeat: Instant,
    /// Maximum silence tolerated for an unchanged state.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Creates a throttler with no recorded state, so the first `should_log` returns `true`.
    fn new(heartbeat_interval: Duration) -> Self {
        let last_heartbeat = Instant::now();
        Self {
            last_logged_state: None,
            last_heartbeat,
            heartbeat_interval,
        }
    }

    /// Returns `true` if `current_state` differs from the recorded state or the heartbeat
    /// interval has elapsed. Does not modify the throttler.
    fn should_log(&self, current_state: &T) -> bool {
        let state_changed = match &self.last_logged_state {
            Some(previous) => previous != current_state,
            None => true,
        };
        // The clock is only consulted when the state itself is unchanged.
        state_changed || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Records `current_state` as logged and restarts the heartbeat timer.
    fn update(&mut self, current_state: T) {
        self.last_heartbeat = Instant::now();
        self.last_logged_state = Some(current_state);
    }
}
157
158pub struct Compactor {
160 context: CompactorContext,
162 object_id_getter: Arc<dyn GetObjectId>,
163 task_config: TaskConfig,
164 options: SstableBuilderOptions,
165 get_id_time: Arc<AtomicU64>,
166}
167
168pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
169
170impl Compactor {
171 pub fn new(
173 context: CompactorContext,
174 options: SstableBuilderOptions,
175 task_config: TaskConfig,
176 object_id_getter: Arc<dyn GetObjectId>,
177 ) -> Self {
178 Self {
179 context,
180 options,
181 task_config,
182 get_id_time: Arc::new(AtomicU64::new(0)),
183 object_id_getter,
184 }
185 }
186
187 async fn compact_key_range(
192 &self,
193 iter: impl HummockIterator<Direction = Forward>,
194 compaction_filter: impl CompactionFilter,
195 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
196 task_progress: Option<Arc<TaskProgress>>,
197 task_id: Option<HummockCompactionTaskId>,
198 split_index: Option<usize>,
199 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
200 let compact_timer = if self.context.is_share_buffer_compact {
202 self.context
203 .compactor_metrics
204 .write_build_l0_sst_duration
205 .start_timer()
206 } else {
207 self.context
208 .compactor_metrics
209 .compact_sst_duration
210 .start_timer()
211 };
212
213 let (split_table_outputs, table_stats_map) = {
214 let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
215 if self.task_config.use_block_based_filter {
216 self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
217 factory,
218 iter,
219 compaction_filter,
220 compaction_catalog_agent_ref,
221 task_progress.clone(),
222 self.object_id_getter.clone(),
223 )
224 .instrument_await("compact".verbose())
225 .await?
226 } else {
227 self.compact_key_range_impl::<_, Xor16FilterBuilder>(
228 factory,
229 iter,
230 compaction_filter,
231 compaction_catalog_agent_ref,
232 task_progress.clone(),
233 self.object_id_getter.clone(),
234 )
235 .instrument_await("compact".verbose())
236 .await?
237 }
238 };
239
240 compact_timer.observe_duration();
241
242 Self::report_progress(
243 self.context.compactor_metrics.clone(),
244 task_progress,
245 &split_table_outputs,
246 self.context.is_share_buffer_compact,
247 );
248
249 self.context
250 .compactor_metrics
251 .get_table_id_total_time_duration
252 .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
253
254 debug_assert!(
255 split_table_outputs
256 .iter()
257 .all(|table_info| table_info.sst_info.table_ids.is_sorted())
258 );
259
260 if task_id.is_some() {
261 tracing::info!(
263 "Finish Task {:?} split_index {:?} sst count {}",
264 task_id,
265 split_index,
266 split_table_outputs.len()
267 );
268 }
269 Ok((split_table_outputs, table_stats_map))
270 }
271
272 pub fn report_progress(
273 metrics: Arc<CompactorMetrics>,
274 task_progress: Option<Arc<TaskProgress>>,
275 ssts: &Vec<LocalSstableInfo>,
276 is_share_buffer_compact: bool,
277 ) {
278 for sst_info in ssts {
279 let sst_size = sst_info.file_size();
280 if let Some(tracker) = &task_progress {
281 tracker.inc_ssts_uploaded();
282 tracker.dec_num_pending_write_io();
283 }
284 if is_share_buffer_compact {
285 metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
286 } else {
287 metrics.compaction_upload_sst_counts.inc();
288 }
289 }
290 }
291
292 async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
293 &self,
294 writer_factory: F,
295 iter: impl HummockIterator<Direction = Forward>,
296 compaction_filter: impl CompactionFilter,
297 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
298 task_progress: Option<Arc<TaskProgress>>,
299 object_id_getter: Arc<dyn GetObjectId>,
300 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
301 let builder_factory = RemoteBuilderFactory::<F, B> {
302 object_id_getter,
303 limiter: self.context.memory_limiter.clone(),
304 options: self.options.clone(),
305 policy: self.task_config.cache_policy,
306 remote_rpc_cost: self.get_id_time.clone(),
307 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
308 sstable_writer_factory: writer_factory,
309 _phantom: PhantomData,
310 };
311
312 let mut sst_builder = CapacitySplitTableBuilder::new(
313 builder_factory,
314 self.context.compactor_metrics.clone(),
315 task_progress.clone(),
316 self.task_config.table_vnode_partition.clone(),
317 self.context
318 .storage_opts
319 .compactor_concurrent_uploading_sst_count,
320 compaction_catalog_agent_ref,
321 );
322 let compaction_statistics = compact_and_build_sst(
323 &mut sst_builder,
324 &self.task_config,
325 self.context.compactor_metrics.clone(),
326 iter,
327 compaction_filter,
328 )
329 .instrument_await("compact_and_build_sst".verbose())
330 .await?;
331
332 let ssts = sst_builder
333 .finish()
334 .instrument_await("builder_finish".verbose())
335 .await?;
336
337 Ok((ssts, compaction_statistics))
338 }
339}
340
341#[must_use]
344pub fn start_iceberg_compactor(
345 compactor_context: CompactorContext,
346 hummock_meta_client: Arc<dyn HummockMetaClient>,
347) -> (JoinHandle<()>, Sender<()>) {
348 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
349 let stream_retry_interval = Duration::from_secs(30);
350 let periodic_event_update_interval = Duration::from_millis(
351 compactor_context
352 .storage_opts
353 .iceberg_compaction_pull_interval_ms,
354 );
355 let worker_num = compactor_context.compaction_executor.worker_num();
356
357 let max_task_parallelism: u32 = (worker_num as f32
358 * compactor_context.storage_opts.compactor_max_task_multiplier)
359 .ceil() as u32;
360
361 const MAX_PULL_TASK_COUNT: u32 = 4;
362 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
363
364 assert_ge!(
365 compactor_context.storage_opts.compactor_max_task_multiplier,
366 0.0
367 );
368
369 let join_handle = tokio::spawn(async move {
370 let pending_parallelism_budget = (max_task_parallelism as f32
372 * compactor_context
373 .storage_opts
374 .iceberg_compaction_pending_parallelism_budget_multiplier)
375 .ceil() as u32;
376 let mut task_queue =
377 IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);
378
379 let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));
381
382 let (task_completion_tx, mut task_completion_rx) =
384 tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
385 let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
386 let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();
389
390 let mut min_interval = tokio::time::interval(stream_retry_interval);
391 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
392
393 let mut log_throttler =
395 LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
396
397 'start_stream: loop {
399 let mut pull_task_ack = true;
402 tokio::select! {
403 _ = min_interval.tick() => {},
405 _ = &mut shutdown_rx => {
407 tracing::info!("Compactor is shutting down");
408 return;
409 }
410 }
411
412 let (request_sender, response_event_stream) = match hummock_meta_client
413 .subscribe_iceberg_compaction_event()
414 .await
415 {
416 Ok((request_sender, response_event_stream)) => {
417 tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
418 (request_sender, response_event_stream)
419 }
420
421 Err(e) => {
422 tracing::warn!(
423 error = %e.as_report(),
424 "Subscribing to iceberg compaction tasks failed with error. Will retry.",
425 );
426 continue 'start_stream;
427 }
428 };
429
430 if matches!(
431 flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
432 ReportSendResult::RestartStream
433 ) {
434 continue 'start_stream;
435 }
436
437 pin_mut!(response_event_stream);
438
439 let _executor = compactor_context.compaction_executor.clone();
440
441 let mut event_loop_iteration_now = Instant::now();
443 'consume_stream: loop {
444 {
445 compactor_context
447 .compactor_metrics
448 .compaction_event_loop_iteration_latency
449 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
450 event_loop_iteration_now = Instant::now();
451 }
452
453 let request_sender = request_sender.clone();
454 let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
455 Some(plan_completion) = task_completion_rx.recv() => {
457 let task_key = plan_completion.task_key;
458 let error_message = plan_completion.error_message;
459 tracing::debug!(
460 task_id = task_key.0,
461 plan_index = task_key.1,
462 success = error_message.is_none(),
463 "Plan completed, updating queue state"
464 );
465 task_queue.finish_running(task_key);
466
467 let completed_task_id = task_key.0;
468 let Entry::Occupied(mut tracker_entry) =
469 task_trackers.entry(completed_task_id)
470 else {
471 continue 'consume_stream;
472 };
473 tracker_entry.get_mut().record_completion(error_message);
474 if !tracker_entry.get().is_finished() {
475 continue 'consume_stream;
476 }
477
478 let report = tracker_entry.remove().into_report(completed_task_id);
479 if matches!(
480 send_or_buffer_iceberg_task_report(
481 &request_sender,
482 &mut pending_task_reports,
483 report,
484 ),
485 ReportSendResult::RestartStream
486 ) {
487 continue 'start_stream;
488 }
489 continue 'consume_stream;
490 }
491
492 _ = task_queue.wait_schedulable() => {
494 schedule_queued_tasks(
495 &mut task_queue,
496 &compactor_context,
497 &shutdown_map,
498 &task_completion_tx,
499 );
500 continue 'consume_stream;
501 }
502
503 _ = periodic_event_interval.tick() => {
504 let should_restart_stream = handle_meta_task_pulling(
506 &mut pull_task_ack,
507 &task_queue,
508 max_task_parallelism,
509 max_pull_task_count,
510 &request_sender,
511 &mut log_throttler,
512 );
513
514 if should_restart_stream {
515 continue 'start_stream;
516 }
517 continue;
518 }
519 event = response_event_stream.next() => {
520 event
521 }
522
523 _ = &mut shutdown_rx => {
524 tracing::info!("Iceberg Compactor is shutting down");
525 return
526 }
527 };
528
529 match event {
530 Some(Ok(SubscribeIcebergCompactionEventResponse {
531 event,
532 create_at: _create_at,
533 })) => {
534 let event = match event {
535 Some(event) => event,
536 None => continue 'consume_stream,
537 };
538
539 match event {
540 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
541 let task_id = iceberg_compaction_task.task_id;
542 let sink_id = iceberg_compaction_task.sink_id;
543 let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
545 .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
546 .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
547 .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
548 .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
549 .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
550 .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
551 .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
552 .target_binpack_group_size_mb(
553 compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
554 )
555 .min_group_size_mb(
556 compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
557 )
558 .min_group_file_count(
559 compactor_context.storage_opts.iceberg_compaction_min_group_file_count
560 )
561 .build() {
562 Ok(config) => config,
563 Err(e) => {
564 tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
565 let report = build_iceberg_task_report(
566 task_id,
567 sink_id,
568 Some(format!(
569 "Failed to build iceberg compactor runner config: {}",
570 e.as_report()
571 )),
572 );
573 if matches!(
574 send_or_buffer_iceberg_task_report(
575 &request_sender,
576 &mut pending_task_reports,
577 report,
578 ),
579 ReportSendResult::RestartStream
580 ) {
581 continue 'start_stream;
582 }
583 continue 'consume_stream;
584 }
585 };
586
587 let task_execution = match create_task_execution(
589 iceberg_compaction_task,
590 compactor_runner_config,
591 compactor_context.compactor_metrics.clone(),
592 ).await {
593 Ok(task_execution) => task_execution,
594 Err(e) => {
595 tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
596 let report = build_iceberg_task_report(
597 task_id,
598 sink_id,
599 Some(format!(
600 "Failed to create iceberg compaction task execution: {}",
601 e.as_report()
602 )),
603 );
604 if matches!(
605 send_or_buffer_iceberg_task_report(
606 &request_sender,
607 &mut pending_task_reports,
608 report,
609 ),
610 ReportSendResult::RestartStream
611 ) {
612 continue 'start_stream;
613 }
614 continue 'consume_stream;
615 }
616 };
617
618 let sink_id = task_execution.sink_id;
619 let plan_runners = task_execution.plan_runners;
620
621 if plan_runners.is_empty() {
622 tracing::info!(task_id, sink_id, "No plans to execute");
623 let report = build_iceberg_task_report(task_id, sink_id, None);
624 if matches!(
625 send_or_buffer_iceberg_task_report(
626 &request_sender,
627 &mut pending_task_reports,
628 report,
629 ),
630 ReportSendResult::RestartStream
631 ) {
632 continue 'start_stream;
633 }
634 continue 'consume_stream;
635 }
636
637 let total_plans = plan_runners.len();
639 let mut enqueued_count = 0;
640
641 for runner in plan_runners {
642 let meta = runner.to_meta();
643 let plan_index = meta.plan_index;
644 let required_parallelism = runner.required_parallelism();
645 let push_result = task_queue.push(meta, Some(runner));
646
647 match push_result {
648 PushResult::Added => {
649 enqueued_count += 1;
650 tracing::debug!(
651 task_id = task_id,
652 plan_index = plan_index,
653 required_parallelism = required_parallelism,
654 "Iceberg plan runner added to queue"
655 );
656 },
657 PushResult::RejectedCapacity => {
658 tracing::warn!(
659 task_id = task_id,
660 required_parallelism = required_parallelism,
661 pending_budget = pending_parallelism_budget,
662 enqueued_count = enqueued_count,
663 total_plans = total_plans,
664 "Iceberg plan runner rejected - queue capacity exceeded"
665 );
666 break;
668 },
669 PushResult::RejectedTooLarge => {
670 tracing::error!(
671 task_id = task_id,
672 required_parallelism = required_parallelism,
673 max_parallelism = max_task_parallelism,
674 "Iceberg plan runner rejected - parallelism exceeds max"
675 );
676 },
677 PushResult::RejectedInvalidParallelism => {
678 tracing::error!(
679 task_id = task_id,
680 required_parallelism = required_parallelism,
681 "Iceberg plan runner rejected - invalid parallelism"
682 );
683 },
684 PushResult::RejectedDuplicate => {
685 tracing::error!(
686 task_id = task_id,
687 plan_index = plan_index,
688 "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
689 );
690 }
691 }
692 }
693
694 if enqueued_count == 0 {
695 let report = build_iceberg_task_report(
696 task_id,
697 sink_id,
698 Some("Failed to enqueue all iceberg compaction plans".to_owned()),
699 );
700 if matches!(
701 send_or_buffer_iceberg_task_report(
702 &request_sender,
703 &mut pending_task_reports,
704 report,
705 ),
706 ReportSendResult::RestartStream
707 ) {
708 continue 'start_stream;
709 }
710 } else {
711 task_trackers.insert(
712 task_id,
713 IcebergTaskTracker::new(sink_id, enqueued_count),
714 );
715 }
716
717 tracing::info!(
718 task_id = task_id,
719 sink_id = sink_id,
720 total_plans = total_plans,
721 enqueued_count = enqueued_count,
722 "Enqueued {} of {} Iceberg plan runners",
723 enqueued_count,
724 total_plans
725 );
726 },
727 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
728 pull_task_ack = true;
730 },
731 }
732 }
733 Some(Err(e)) => {
734 tracing::warn!("Failed to consume stream. {}", e.message());
735 continue 'start_stream;
736 }
737 _ => {
738 continue 'start_stream;
740 }
741 }
742 }
743 }
744 });
745
746 (join_handle, shutdown_tx)
747}
748
749#[must_use]
752pub fn start_compactor(
753 compactor_context: CompactorContext,
754 hummock_meta_client: Arc<dyn HummockMetaClient>,
755 object_id_manager: Arc<ObjectIdManager>,
756 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
757) -> (JoinHandle<()>, Sender<()>) {
758 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
759 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
760 let stream_retry_interval = Duration::from_secs(30);
761 let task_progress = compactor_context.task_progress_manager.clone();
762 let periodic_event_update_interval = Duration::from_millis(1000);
763
764 let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
765 * compactor_context.storage_opts.compactor_max_task_multiplier)
766 .ceil() as u32;
767 let running_task_parallelism = Arc::new(AtomicU32::new(0));
768
769 const MAX_PULL_TASK_COUNT: u32 = 4;
770 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
771
772 assert_ge!(
773 compactor_context.storage_opts.compactor_max_task_multiplier,
774 0.0
775 );
776
777 let join_handle = tokio::spawn(async move {
778 let shutdown_map = CompactionShutdownMap::default();
779 let mut min_interval = tokio::time::interval(stream_retry_interval);
780 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
781
782 let mut log_throttler =
784 LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
785
786 'start_stream: loop {
788 let mut pull_task_ack = true;
791 tokio::select! {
792 _ = min_interval.tick() => {},
794 _ = &mut shutdown_rx => {
796 tracing::info!("Compactor is shutting down");
797 return;
798 }
799 }
800
801 let (request_sender, response_event_stream) =
802 match hummock_meta_client.subscribe_compaction_event().await {
803 Ok((request_sender, response_event_stream)) => {
804 tracing::debug!("Succeeded subscribe_compaction_event.");
805 (request_sender, response_event_stream)
806 }
807
808 Err(e) => {
809 tracing::warn!(
810 error = %e.as_report(),
811 "Subscribing to compaction tasks failed with error. Will retry.",
812 );
813 continue 'start_stream;
814 }
815 };
816
817 pin_mut!(response_event_stream);
818
819 let executor = compactor_context.compaction_executor.clone();
820 let object_id_manager = object_id_manager.clone();
821
822 let mut event_loop_iteration_now = Instant::now();
824 'consume_stream: loop {
825 {
826 compactor_context
828 .compactor_metrics
829 .compaction_event_loop_iteration_latency
830 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
831 event_loop_iteration_now = Instant::now();
832 }
833
834 let running_task_parallelism = running_task_parallelism.clone();
835 let request_sender = request_sender.clone();
836 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
837 _ = periodic_event_interval.tick() => {
838 let progress_list = get_task_progress(task_progress.clone());
839
840 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
841 event: Some(RequestEvent::HeartBeat(
842 HeartBeat {
843 progress: progress_list
844 }
845 )),
846 create_at: SystemTime::now()
847 .duration_since(std::time::UNIX_EPOCH)
848 .expect("Clock may have gone backwards")
849 .as_millis() as u64,
850 }) {
851 tracing::warn!(error = %e.as_report(), "Failed to report task progress");
852 continue 'start_stream;
854 }
855
856
857 let mut pending_pull_task_count = 0;
858 if pull_task_ack {
859 pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
861
862 if pending_pull_task_count > 0 {
863 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
864 event: Some(RequestEvent::PullTask(
865 PullTask {
866 pull_task_count: pending_pull_task_count,
867 }
868 )),
869 create_at: SystemTime::now()
870 .duration_since(std::time::UNIX_EPOCH)
871 .expect("Clock may have gone backwards")
872 .as_millis() as u64,
873 }) {
874 tracing::warn!(error = %e.as_report(), "Failed to pull task");
875
876 continue 'start_stream;
878 } else {
879 pull_task_ack = false;
880 }
881 }
882 }
883
884 let running_count = running_task_parallelism.load(Ordering::SeqCst);
885 let current_state = CompactionLogState {
886 running_parallelism: running_count,
887 pull_task_ack,
888 pending_pull_task_count,
889 };
890
891 if log_throttler.should_log(¤t_state) {
893 tracing::info!(
894 running_parallelism_count = %current_state.running_parallelism,
895 pull_task_ack = %current_state.pull_task_ack,
896 pending_pull_task_count = %current_state.pending_pull_task_count
897 );
898 log_throttler.update(current_state);
899 }
900
901 continue;
902 }
903 event = response_event_stream.next() => {
904 event
905 }
906
907 _ = &mut shutdown_rx => {
908 tracing::info!("Compactor is shutting down");
909 return
910 }
911 };
912
913 fn send_report_task_event(
914 compact_task: &CompactTask,
915 table_stats: TableStatsMap,
916 object_timestamps: HashMap<HummockSstableObjectId, u64>,
917 request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
918 ) {
919 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
920 event: Some(RequestEvent::ReportTask(ReportTask {
921 task_id: compact_task.task_id,
922 task_status: compact_task.task_status.into(),
923 sorted_output_ssts: compact_task
924 .sorted_output_ssts
925 .iter()
926 .map(|sst| sst.into())
927 .collect(),
928 table_stats_change: to_prost_table_stats_map(table_stats),
929 object_timestamps,
930 })),
931 create_at: SystemTime::now()
932 .duration_since(std::time::UNIX_EPOCH)
933 .expect("Clock may have gone backwards")
934 .as_millis() as u64,
935 }) {
936 let task_id = compact_task.task_id;
937 tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
938 }
939 }
940
941 match event {
942 Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
943 let event = match event {
944 Some(event) => event,
945 None => continue 'consume_stream,
946 };
947 let shutdown = shutdown_map.clone();
948 let context = compactor_context.clone();
949 let consumed_latency_ms = SystemTime::now()
950 .duration_since(std::time::UNIX_EPOCH)
951 .expect("Clock may have gone backwards")
952 .as_millis() as u64
953 - create_at;
954 context
955 .compactor_metrics
956 .compaction_event_consumed_latency
957 .observe(consumed_latency_ms as _);
958
959 let object_id_manager = object_id_manager.clone();
960 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
961
962 match event {
963 ResponseEvent::CompactTask(compact_task) => {
964 let compact_task = CompactTask::from(compact_task);
965 let parallelism =
966 calculate_task_parallelism(&compact_task, &context);
967
968 assert_ne!(parallelism, 0, "splits cannot be empty");
969
970 if (max_task_parallelism
971 - running_task_parallelism.load(Ordering::SeqCst))
972 < parallelism as u32
973 {
974 tracing::warn!(
975 "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
976 compact_task.task_id,
977 parallelism,
978 max_task_parallelism,
979 running_task_parallelism.load(Ordering::Relaxed),
980 );
981 let (compact_task, table_stats, object_timestamps) =
982 compact_done(
983 compact_task,
984 context.clone(),
985 vec![],
986 TaskStatus::NoAvailCpuResourceCanceled,
987 );
988
989 send_report_task_event(
990 &compact_task,
991 table_stats,
992 object_timestamps,
993 &request_sender,
994 );
995
996 continue 'consume_stream;
997 }
998
999 running_task_parallelism
1000 .fetch_add(parallelism as u32, Ordering::SeqCst);
1001 executor.spawn(async move {
1002 let (tx, rx) = tokio::sync::oneshot::channel();
1003 let task_id = compact_task.task_id;
1004 shutdown.lock().unwrap().insert(task_id, tx);
1005
1006 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
1007 context.clone(),
1008 compact_task,
1009 rx,
1010 object_id_manager.clone(),
1011 compaction_catalog_manager_ref.clone(),
1012 )
1013 .await;
1014
1015 shutdown.lock().unwrap().remove(&task_id);
1016 running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
1017
1018 send_report_task_event(
1019 &compact_task,
1020 table_stats,
1021 object_timestamps,
1022 &request_sender,
1023 );
1024
1025 let enable_check_compaction_result =
1026 context.storage_opts.check_compaction_result;
1027 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1028
1029 if enable_check_compaction_result && need_check_task {
1030 let read_table_ids = compact_task
1031 .get_table_ids_from_input_ssts()
1032 .collect::<Vec<_>>();
1033 match compaction_catalog_manager_ref.acquire(read_table_ids.into_iter().collect()).await {
1034 Ok(compaction_catalog_agent_ref) => {
1035 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1036 {
1037 Err(e) => {
1038 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
1039 }
1040 Ok(true) => (),
1041 Ok(false) => {
1042 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1043 }
1044 }
1045 },
1046 Err(e) => {
1047 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1048 }
1049 }
1050 }
1051 });
1052 }
1053 #[expect(deprecated)]
1054 ResponseEvent::VacuumTask(_) => {
1055 unreachable!("unexpected vacuum task");
1056 }
1057 #[expect(deprecated)]
1058 ResponseEvent::FullScanTask(_) => {
1059 unreachable!("unexpected scan task");
1060 }
1061 #[expect(deprecated)]
1062 ResponseEvent::ValidationTask(validation_task) => {
1063 let validation_task = ValidationTask::from(validation_task);
1064 executor.spawn(async move {
1065 validate_ssts(validation_task, context.sstable_store.clone())
1066 .await;
1067 });
1068 }
1069 ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
1070 .lock()
1071 .unwrap()
1072 .remove(&cancel_compact_task.task_id)
1073 {
1074 Some(tx) => {
1075 if tx.send(()).is_err() {
1076 tracing::warn!(
1077 "Cancellation of compaction task failed. task_id: {}",
1078 cancel_compact_task.task_id
1079 );
1080 }
1081 }
1082 _ => {
1083 tracing::warn!(
1084 "Attempting to cancel non-existent compaction task. task_id: {}",
1085 cancel_compact_task.task_id
1086 );
1087 }
1088 },
1089
1090 ResponseEvent::PullTaskAck(_pull_task_ack) => {
1091 pull_task_ack = true;
1093 }
1094 }
1095 }
1096 Some(Err(e)) => {
1097 tracing::warn!("Failed to consume stream. {}", e.message());
1098 continue 'start_stream;
1099 }
1100 _ => {
1101 continue 'start_stream;
1103 }
1104 }
1105 }
1106 }
1107 });
1108
1109 (join_handle, shutdown_tx)
1110}
1111
1112#[must_use]
1115pub fn start_shared_compactor(
1116 grpc_proxy_client: GrpcCompactorProxyClient,
1117 mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1118 context: CompactorContext,
1119) -> (JoinHandle<()>, Sender<()>) {
1120 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1121 let task_progress = context.task_progress_manager.clone();
1122 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1123 let periodic_event_update_interval = Duration::from_millis(1000);
1124
1125 let join_handle = tokio::spawn(async move {
1126 let shutdown_map = CompactionShutdownMap::default();
1127
1128 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1129 let executor = context.compaction_executor.clone();
1130 let report_heartbeat_client = grpc_proxy_client.clone();
1131 'consume_stream: loop {
1132 let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1133 _ = periodic_event_interval.tick() => {
1134 let progress_list = get_task_progress(task_progress.clone());
1135 let report_compaction_task_request = ReportCompactionTaskRequest{
1136 event: Some(ReportCompactionTaskEvent::HeartBeat(
1137 SharedHeartBeat {
1138 progress: progress_list
1139 }
1140 )),
1141 };
1142 if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1143 tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1144 }
1145 continue
1146 }
1147
1148
1149 _ = &mut shutdown_rx => {
1150 tracing::info!("Compactor is shutting down");
1151 return
1152 }
1153
1154 request = receiver.recv() => {
1155 request
1156 }
1157
1158 };
1159 match request {
1160 Some(request) => {
1161 let context = context.clone();
1162 let shutdown = shutdown_map.clone();
1163
1164 let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1165 executor.spawn(async move {
1166 let DispatchCompactionTaskRequest {
1167 tables,
1168 output_object_ids,
1169 task: dispatch_task,
1170 } = request.into_inner();
1171 let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1172 acc.insert(table.id, table);
1173 acc
1174 });
1175
1176 let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1177 output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1178 let shared_compactor_object_id_manager =
1179 SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1180 match dispatch_task.unwrap() {
1181 dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1182 let compact_task = CompactTask::from(&compact_task);
1183 let (tx, rx) = tokio::sync::oneshot::channel();
1184 let task_id = compact_task.task_id;
1185 shutdown.lock().unwrap().insert(task_id, tx);
1186
1187 let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1188 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1189 context.clone(),
1190 compact_task,
1191 rx,
1192 shared_compactor_object_id_manager,
1193 compaction_catalog_agent_ref.clone(),
1194 )
1195 .await;
1196 shutdown.lock().unwrap().remove(&task_id);
1197 let report_compaction_task_request = ReportCompactionTaskRequest {
1198 event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1199 compact_task: Some(PbCompactTask::from(&compact_task)),
1200 table_stats_change: to_prost_table_stats_map(table_stats),
1201 object_timestamps,
1202 })),
1203 };
1204
1205 match cloned_grpc_proxy_client
1206 .report_compaction_task(report_compaction_task_request)
1207 .await
1208 {
1209 Ok(_) => {
1210 let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1212 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1213 if enable_check_compaction_result && need_check_task {
1214 match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1215 Err(e) => {
1216 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1217 },
1218 Ok(true) => (),
1219 Ok(false) => {
1220 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1221 }
1222 }
1223 }
1224 }
1225 Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1226 }
1227
1228 }
1229 dispatch_compaction_task_request::Task::VacuumTask(_) => {
1230 unreachable!("unexpected vacuum task");
1231 }
1232 dispatch_compaction_task_request::Task::FullScanTask(_) => {
1233 unreachable!("unexpected scan task");
1234 }
1235 dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1236 let validation_task = ValidationTask::from(validation_task);
1237 validate_ssts(validation_task, context.sstable_store.clone()).await;
1238 }
1239 dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1240 match shutdown
1241 .lock()
1242 .unwrap()
1243 .remove(&cancel_compact_task.task_id)
1244 { Some(tx) => {
1245 if tx.send(()).is_err() {
1246 tracing::warn!(
1247 "Cancellation of compaction task failed. task_id: {}",
1248 cancel_compact_task.task_id
1249 );
1250 }
1251 } _ => {
1252 tracing::warn!(
1253 "Attempting to cancel non-existent compaction task. task_id: {}",
1254 cancel_compact_task.task_id
1255 );
1256 }}
1257 }
1258 }
1259 });
1260 }
1261 None => continue 'consume_stream,
1262 }
1263 }
1264 });
1265 (join_handle, shutdown_tx)
1266}
1267
1268fn get_task_progress(
1269 task_progress: Arc<
1270 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1271 >,
1272) -> Vec<CompactTaskProgress> {
1273 let mut progress_list = Vec::new();
1274 for (&task_id, progress) in &*task_progress.lock() {
1275 progress_list.push(progress.snapshot(task_id));
1276 }
1277 progress_list
1278}
1279
1280fn schedule_queued_tasks(
1282 task_queue: &mut IcebergTaskQueue,
1283 compactor_context: &CompactorContext,
1284 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1285 task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
1286) {
1287 while let Some(popped_task) = task_queue.pop() {
1288 let task_id = popped_task.meta.task_id;
1289 let plan_index = popped_task.meta.plan_index;
1290 let task_key = (task_id, plan_index);
1291
1292 let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1294
1295 let Some(runner) = popped_task.runner else {
1296 tracing::error!(
1297 task_id = task_id,
1298 plan_index = plan_index,
1299 "Popped task missing runner - this should not happen"
1300 );
1301 task_queue.finish_running(task_key);
1302 continue;
1303 };
1304
1305 let executor = compactor_context.compaction_executor.clone();
1306 let shutdown_map_clone = shutdown_map.clone();
1307 let completion_tx_clone = task_completion_tx.clone();
1308
1309 tracing::info!(
1310 task_id = task_id,
1311 plan_index = plan_index,
1312 unique_ident = ?unique_ident,
1313 required_parallelism = popped_task.meta.required_parallelism,
1314 "Starting iceberg compaction task from queue"
1315 );
1316
1317 executor.spawn(async move {
1318 let (tx, rx) = tokio::sync::oneshot::channel();
1319 {
1320 let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
1321 shutdown_guard.insert(task_key, tx);
1322 }
1323 let _cleanup_guard = scopeguard::guard(shutdown_map_clone.clone(), move |shutdown_map| {
1324 let mut shutdown_guard = shutdown_map.lock().unwrap();
1325 shutdown_guard.remove(&task_key);
1326 });
1327
1328 let result = Box::pin(runner.compact(rx)).await;
1329
1330 let completion = match result {
1331 Ok(_) => IcebergPlanCompletion {
1332 task_key,
1333 error_message: None,
1334 },
1335 Err(e) => {
1336 tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
1337 IcebergPlanCompletion {
1338 task_key,
1339 error_message: Some(e.to_report_string()),
1340 }
1341 }
1342 };
1343
1344 if completion_tx_clone.send(completion).is_err() {
1345 tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
1346 }
1347 });
1348 }
1349}
1350
1351fn handle_meta_task_pulling(
1354 pull_task_ack: &mut bool,
1355 task_queue: &IcebergTaskQueue,
1356 max_task_parallelism: u32,
1357 max_pull_task_count: u32,
1358 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1359 log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1360) -> bool {
1361 let mut pending_pull_task_count = 0;
1362 if *pull_task_ack {
1363 let current_running_parallelism = task_queue.running_parallelism_sum();
1365 pending_pull_task_count =
1366 (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1367
1368 if pending_pull_task_count > 0 {
1369 if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1370 event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1371 subscribe_iceberg_compaction_event_request::PullTask {
1372 pull_task_count: pending_pull_task_count,
1373 },
1374 )),
1375 create_at: SystemTime::now()
1376 .duration_since(std::time::UNIX_EPOCH)
1377 .expect("Clock may have gone backwards")
1378 .as_millis() as u64,
1379 }) {
1380 tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1381 return true; } else {
1383 *pull_task_ack = false;
1384 }
1385 }
1386 }
1387
1388 let running_count = task_queue.running_parallelism_sum();
1389 let waiting_count = task_queue.waiting_parallelism_sum();
1390 let available_count = max_task_parallelism.saturating_sub(running_count);
1391 let current_state = IcebergCompactionLogState {
1392 running_parallelism: running_count,
1393 waiting_parallelism: waiting_count,
1394 available_parallelism: available_count,
1395 pull_task_ack: *pull_task_ack,
1396 pending_pull_task_count,
1397 };
1398
1399 if log_throttler.should_log(¤t_state) {
1401 tracing::info!(
1402 running_parallelism_count = %current_state.running_parallelism,
1403 waiting_parallelism_count = %current_state.waiting_parallelism,
1404 available_parallelism = %current_state.available_parallelism,
1405 pull_task_ack = %current_state.pull_task_ack,
1406 pending_pull_task_count = %current_state.pending_pull_task_count
1407 );
1408 log_throttler.update(current_state);
1409 }
1410
1411 false }
1413
#[cfg(test)]
mod tests {
    use super::*;

    /// Log states compare equal exactly when all of their fields match.
    #[test]
    fn test_log_state_equality() {
        let compaction_state = |running_parallelism| CompactionLogState {
            running_parallelism,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state1 = compaction_state(10);
        let state2 = compaction_state(10);
        let state3 = compaction_state(11);
        assert_eq!(state1, state2);
        assert_ne!(state1, state3);

        // The iceberg states differ only in their waiting parallelism.
        let iceberg_state = |waiting_parallelism| IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_state1 = iceberg_state(5);
        let ice_state2 = iceberg_state(6);
        assert_ne!(ice_state1, ice_state2);
    }

    /// A state is logged when first seen and suppressed once it has been recorded.
    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let compaction_state = |running_parallelism| CompactionLogState {
            running_parallelism,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let state1 = compaction_state(10);
        let state2 = compaction_state(11);

        // Each new state logs once, then stays quiet after being recorded.
        for state in [&state1, &state2] {
            assert!(throttler.should_log(state));
            throttler.update(state.clone());
            assert!(!throttler.should_log(state));
        }
    }

    /// An unchanged state is logged again once the throttler's interval has elapsed.
    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let state = CompactionLogState {
            pending_pull_task_count: 2,
            pull_task_ack: true,
            running_parallelism: 10,
        };

        assert!(throttler.should_log(&state));
        throttler.update(state.clone());

        assert!(!throttler.should_log(&state));

        // Sleep past the 10 ms interval.
        std::thread::sleep(Duration::from_millis(15));

        assert!(throttler.should_log(&state));
    }
}