1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24 ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28 subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::hash_map::Entry;
43use std::collections::{HashMap, VecDeque};
44use std::marker::PhantomData;
45use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, SystemTime};
48
49use await_tree::{InstrumentAwait, SpanExt};
50pub use compaction_executor::CompactionExecutor;
51pub use compaction_filter::{
52 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, StateCleanUpCompactionFilter,
53 TtlCompactionFilter,
54};
55pub use context::{
56 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
57};
58use futures::{StreamExt, pin_mut};
59use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
61use iceberg_compaction::{
62 IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
63 ReportSendResult, build_iceberg_task_report, create_task_execution,
64 flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
65};
66pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
67use more_asserts::assert_ge;
68use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
69use risingwave_hummock_sdk::{
70 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
71};
72use risingwave_pb::hummock::compact_task::TaskStatus;
73use risingwave_pb::hummock::subscribe_compaction_event_request::{
74 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
75};
76use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
77use risingwave_pb::hummock::{
78 CompactTaskProgress, ReportCompactionTaskRequest, SubscribeCompactionEventRequest,
79 SubscribeCompactionEventResponse,
80};
81use risingwave_rpc_client::HummockMetaClient;
82pub use shared_buffer_compact::{compact, merge_imms_in_memory};
83use tokio::sync::oneshot::Sender;
84use tokio::task::JoinHandle;
85use tokio::time::Instant;
86
87pub use self::compaction_utils::{
88 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
89 check_flush_result,
90};
91pub use self::task_progress::TaskProgress;
92use super::multi_builder::CapacitySplitTableBuilder;
93use super::{
94 GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor16FilterBuilder,
95};
96use crate::compaction_catalog_manager::{
97 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
98};
99use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
100use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
101use crate::hummock::compactor::iceberg_compaction::TaskKey;
102use crate::hummock::iterator::{Forward, HummockIterator};
103use crate::hummock::{
104 BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory,
105 UnifiedSstableWriterFactory, validate_ssts,
106};
107use crate::monitor::CompactorMetrics;
108
/// Heartbeat interval handed to every `LogThrottler` created in this file: once this much time
/// has passed since the last emitted status line, the throttler allows another one even if the
/// logged state has not changed.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);
111
/// Snapshot of the values printed by the periodic status log of `start_compactor`.
///
/// It is compared (via `PartialEq`) against the previously logged snapshot so that an unchanged
/// state is not logged again on every tick.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    /// Value of the running-task parallelism counter when the snapshot was taken.
    running_parallelism: u32,
    /// Value of the `pull_task_ack` flag after the tick's pull attempt.
    pull_task_ack: bool,
    /// Number of tasks requested by the pull issued on this tick; 0 when no pull was issued.
    pending_pull_task_count: u32,
}
119
/// Snapshot type used as the state parameter of the `LogThrottler` created in
/// `start_iceberg_compactor`.
///
/// NOTE(review): the code that fills in these fields is not part of the visible source, so the
/// per-field notes below are inferred from the field names only — confirm against the constructor.
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    // Presumably the parallelism currently consumed by running plans — TODO confirm.
    running_parallelism: u32,
    // Presumably the parallelism requested by plans still waiting in the queue — TODO confirm.
    waiting_parallelism: u32,
    // Presumably the parallelism that is still free — TODO confirm.
    available_parallelism: u32,
    // Presumably mirrors the event loop's `pull_task_ack` flag — TODO confirm.
    pull_task_ack: bool,
    // Presumably the number of tasks requested by the latest pull — TODO confirm.
    pending_pull_task_count: u32,
}
129
/// Decides whether a recurring status line should be emitted again.
///
/// A line is due when the state differs from the one recorded by the last [`Self::update`]
/// (or nothing has been recorded yet), or when `heartbeat_interval` has elapsed since the last
/// recorded emission.
struct LogThrottler<T: PartialEq> {
    /// State recorded by the most recent `update`; `None` until the first call.
    last_logged_state: Option<T>,
    /// Time of construction or of the most recent `update`, whichever happened last.
    last_heartbeat: Instant,
    /// Longest time an unchanged state may go without being logged.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Creates a throttler with no recorded state whose heartbeat clock starts now.
    fn new(heartbeat_interval: Duration) -> Self {
        let last_heartbeat = Instant::now();
        Self {
            heartbeat_interval,
            last_heartbeat,
            last_logged_state: None,
        }
    }

    /// Returns `true` when `current_state` should be logged.
    ///
    /// This does not record anything; call [`Self::update`] after actually logging.
    fn should_log(&self, current_state: &T) -> bool {
        match &self.last_logged_state {
            // Same state as last time: only the heartbeat can make it due.
            Some(previous) if previous == current_state => {
                self.last_heartbeat.elapsed() >= self.heartbeat_interval
            }
            // Never logged before, or the state changed.
            _ => true,
        }
    }

    /// Records `current_state` as logged and restarts the heartbeat clock.
    fn update(&mut self, current_state: T) {
        self.last_heartbeat = Instant::now();
        self.last_logged_state = Some(current_state);
    }
}
158
/// Bundles everything the `compact_key_range*` methods need to turn an iterator into output
/// SSTs: the shared context, the SST builder options and the per-task configuration.
pub struct Compactor {
    /// Shared compactor context; the methods read metrics, the sstable store, the memory limiter
    /// and storage options from it.
    context: CompactorContext,
    /// Handed to the `RemoteBuilderFactory` as its `object_id_getter`.
    object_id_getter: Arc<dyn GetObjectId>,
    /// Per-task settings; read here for the filter kind, cache policy and vnode partitioning.
    task_config: TaskConfig,
    /// SST builder options, cloned into every builder factory.
    options: SstableBuilderOptions,
    /// Counter shared with the builder factory (as `remote_rpc_cost`); its value divided by
    /// 1_000_000 is reported to the `get_table_id_total_time_duration` metric.
    // NOTE(review): the division suggests microseconds converted to seconds — confirm at the writer.
    get_id_time: Arc<AtomicU64>,
}
168
/// Result tuple of one compaction unit: an index, the SSTs it produced and its statistics.
// NOTE(review): the alias is not used in the visible part of this file; the `usize` is presumably
// the split index of the sub-task — confirm against the callers.
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
170
171impl Compactor {
172 pub fn new(
174 context: CompactorContext,
175 options: SstableBuilderOptions,
176 task_config: TaskConfig,
177 object_id_getter: Arc<dyn GetObjectId>,
178 ) -> Self {
179 Self {
180 context,
181 options,
182 task_config,
183 get_id_time: Arc::new(AtomicU64::new(0)),
184 object_id_getter,
185 }
186 }
187
188 async fn compact_key_range(
193 &self,
194 iter: impl HummockIterator<Direction = Forward>,
195 compaction_filter: impl CompactionFilter,
196 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
197 task_progress: Option<Arc<TaskProgress>>,
198 task_id: Option<HummockCompactionTaskId>,
199 split_index: Option<usize>,
200 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
201 let compact_timer = if self.context.is_share_buffer_compact {
203 self.context
204 .compactor_metrics
205 .write_build_l0_sst_duration
206 .start_timer()
207 } else {
208 self.context
209 .compactor_metrics
210 .compact_sst_duration
211 .start_timer()
212 };
213
214 let (split_table_outputs, table_stats_map) = {
215 let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
216 if self.task_config.use_block_based_filter {
217 self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
218 factory,
219 iter,
220 compaction_filter,
221 compaction_catalog_agent_ref,
222 task_progress.clone(),
223 self.object_id_getter.clone(),
224 )
225 .instrument_await("compact".verbose())
226 .await?
227 } else {
228 self.compact_key_range_impl::<_, Xor16FilterBuilder>(
229 factory,
230 iter,
231 compaction_filter,
232 compaction_catalog_agent_ref,
233 task_progress.clone(),
234 self.object_id_getter.clone(),
235 )
236 .instrument_await("compact".verbose())
237 .await?
238 }
239 };
240
241 compact_timer.observe_duration();
242
243 Self::report_progress(
244 self.context.compactor_metrics.clone(),
245 task_progress,
246 &split_table_outputs,
247 self.context.is_share_buffer_compact,
248 );
249
250 self.context
251 .compactor_metrics
252 .get_table_id_total_time_duration
253 .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
254
255 debug_assert!(
256 split_table_outputs
257 .iter()
258 .all(|table_info| table_info.sst_info.table_ids.is_sorted())
259 );
260
261 if task_id.is_some() {
262 tracing::info!(
264 "Finish Task {:?} split_index {:?} sst count {}",
265 task_id,
266 split_index,
267 split_table_outputs.len()
268 );
269 }
270 Ok((split_table_outputs, table_stats_map))
271 }
272
273 pub fn report_progress(
274 metrics: Arc<CompactorMetrics>,
275 task_progress: Option<Arc<TaskProgress>>,
276 ssts: &Vec<LocalSstableInfo>,
277 is_share_buffer_compact: bool,
278 ) {
279 for sst_info in ssts {
280 let sst_size = sst_info.file_size();
281 if let Some(tracker) = &task_progress {
282 tracker.inc_ssts_uploaded();
283 tracker.dec_num_pending_write_io();
284 }
285 if is_share_buffer_compact {
286 metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
287 } else {
288 metrics.compaction_upload_sst_counts.inc();
289 }
290 }
291 }
292
293 async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
294 &self,
295 writer_factory: F,
296 iter: impl HummockIterator<Direction = Forward>,
297 compaction_filter: impl CompactionFilter,
298 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
299 task_progress: Option<Arc<TaskProgress>>,
300 object_id_getter: Arc<dyn GetObjectId>,
301 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
302 let builder_factory = RemoteBuilderFactory::<F, B> {
303 object_id_getter,
304 limiter: self.context.memory_limiter.clone(),
305 options: self.options.clone(),
306 policy: self.task_config.cache_policy,
307 remote_rpc_cost: self.get_id_time.clone(),
308 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
309 sstable_writer_factory: writer_factory,
310 _phantom: PhantomData,
311 };
312
313 let mut sst_builder = CapacitySplitTableBuilder::new(
314 builder_factory,
315 self.context.compactor_metrics.clone(),
316 task_progress.clone(),
317 self.task_config.table_vnode_partition.clone(),
318 self.context
319 .storage_opts
320 .compactor_concurrent_uploading_sst_count,
321 compaction_catalog_agent_ref,
322 );
323 let compaction_statistics = compact_and_build_sst(
324 &mut sst_builder,
325 &self.task_config,
326 self.context.compactor_metrics.clone(),
327 iter,
328 compaction_filter,
329 )
330 .instrument_await("compact_and_build_sst".verbose())
331 .await?;
332
333 let ssts = sst_builder
334 .finish()
335 .instrument_await("builder_finish".verbose())
336 .await?;
337
338 Ok((ssts, compaction_statistics))
339 }
340}
341
/// Spawns the iceberg compactor event loop on a new tokio task.
///
/// Returns the task's `JoinHandle` and a one-shot `Sender`; sending on it makes the loop log a
/// shutdown message and return.
///
/// The loop subscribes to iceberg compaction events through `hummock_meta_client`. For every
/// received compaction task it builds the runner config, creates the plan runners, pushes them
/// into an `IcebergTaskQueue` and tracks the task in `task_trackers`. When the tracker of a task
/// says it is finished, one report for that task is sent (or buffered) through the request
/// sender. The loop resubscribes whenever subscribing fails, the stream yields an error or ends,
/// or one of the report/pull helpers asks for a restart.
///
/// # Panics
///
/// Panics if `compactor_max_task_multiplier` is not `>= 0.0`.
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    // Period of the timer that gates every (re)subscription attempt.
    let stream_retry_interval = Duration::from_secs(30);
    // Period of the timer that triggers `handle_meta_task_pulling`.
    let periodic_event_update_interval = Duration::from_millis(
        compactor_context
            .storage_opts
            .iceberg_compaction_pull_interval_ms,
    );
    let worker_num = compactor_context.compaction_executor.worker_num();

    // Upper bound on task parallelism: worker count scaled by the configured multiplier,
    // rounded up.
    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // Second limit given to the queue, derived from the parallelism bound above.
        // NOTE(review): presumably caps the parallelism of plans waiting in the queue — confirm
        // against `IcebergTaskQueue::new`.
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
        let mut task_queue =
            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);

        // Shutdown senders keyed by `TaskKey`; in this function it is only handed to
        // `schedule_queued_tasks`.
        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));

        // Plan completions are delivered to the event loop through this channel.
        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
        // One tracker per task id that has at least one enqueued plan.
        let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
        // Reports handed to the send/flush helpers; declared outside `'start_stream` so the
        // buffer survives a resubscription.
        let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // Outer loop: one iteration per subscription to the meta event stream.
        'start_stream: loop {
            // Reset on every new stream so that a pull may be issued again.
            let mut pull_task_ack = true;
            // Wait for the retry timer before subscribing, unless shutdown is requested first.
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            // Hand the reports buffered so far to the fresh stream before consuming events.
            if matches!(
                flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
                ReportSendResult::RestartStream
            ) {
                continue 'start_stream;
            }

            pin_mut!(response_event_stream);

            // Not used below; the underscore prefix silences the unused-variable lint.
            let _executor = compactor_context.compaction_executor.clone();

            let mut event_loop_iteration_now = Instant::now();
            // Inner loop: one iteration per handled event on the current stream.
            'consume_stream: loop {
                {
                    // Record how long the previous iteration took, in milliseconds.
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                // Only the stream branch yields a value; every other branch either continues
                // one of the loops or returns.
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    Some(plan_completion) = task_completion_rx.recv() => {
                        let task_key = plan_completion.task_key;
                        let error_message = plan_completion.error_message;
                        tracing::debug!(
                            task_id = task_key.0,
                            plan_index = task_key.1,
                            success = error_message.is_none(),
                            "Plan completed, updating queue state"
                        );
                        task_queue.finish_running(task_key);

                        let completed_task_id = task_key.0;
                        // A completion for a task without a tracker is ignored.
                        let Entry::Occupied(mut tracker_entry) =
                            task_trackers.entry(completed_task_id)
                        else {
                            continue 'consume_stream;
                        };
                        tracker_entry.get_mut().record_completion(error_message);
                        if !tracker_entry.get().is_finished() {
                            continue 'consume_stream;
                        }

                        // The tracker reports the task as finished: drop it and report once.
                        let report = tracker_entry.remove().into_report(completed_task_id);
                        if matches!(
                            send_or_buffer_iceberg_task_report(
                                &request_sender,
                                &mut pending_task_reports,
                                report,
                            ),
                            ReportSendResult::RestartStream
                        ) {
                            continue 'start_stream;
                        }
                        continue 'consume_stream;
                    }

                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        // The helper's boolean result is treated as a request to resubscribe.
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        // Unlabeled: targets the innermost enclosing loop, `'consume_stream`.
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        // A response without an event payload is skipped.
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let sink_id = iceberg_compaction_task.sink_id;
                                // Runner config taken from the storage options; a build failure
                                // is reported for the task as an error.
                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .target_binpack_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
                                    )
                                    .min_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
                                    )
                                    .min_group_file_count(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
                                    )
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        let report = build_iceberg_task_report(
                                            task_id,
                                            sink_id,
                                            Some(format!(
                                                "Failed to build iceberg compactor runner config: {}",
                                                e.as_report()
                                            )),
                                        );
                                        if matches!(
                                            send_or_buffer_iceberg_task_report(
                                                &request_sender,
                                                &mut pending_task_reports,
                                                report,
                                            ),
                                            ReportSendResult::RestartStream
                                        ) {
                                            continue 'start_stream;
                                        }
                                        continue 'consume_stream;
                                    }
                                };

                                // Turn the task into its plan runners; a failure is reported for
                                // the task as an error.
                                let task_execution = match create_task_execution(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(task_execution) => task_execution,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        let report = build_iceberg_task_report(
                                            task_id,
                                            sink_id,
                                            Some(format!(
                                                "Failed to create iceberg compaction task execution: {}",
                                                e.as_report()
                                            )),
                                        );
                                        if matches!(
                                            send_or_buffer_iceberg_task_report(
                                                &request_sender,
                                                &mut pending_task_reports,
                                                report,
                                            ),
                                            ReportSendResult::RestartStream
                                        ) {
                                            continue 'start_stream;
                                        }
                                        continue 'consume_stream;
                                    }
                                };

                                // From here on the sink id comes from the created execution.
                                let sink_id = task_execution.sink_id;
                                let plan_runners = task_execution.plan_runners;

                                // Nothing to run: report the task immediately without an error.
                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, sink_id, "No plans to execute");
                                    let report = build_iceberg_task_report(task_id, sink_id, None);
                                    if matches!(
                                        send_or_buffer_iceberg_task_report(
                                            &request_sender,
                                            &mut pending_task_reports,
                                            report,
                                        ),
                                        ReportSendResult::RestartStream
                                    ) {
                                        continue 'start_stream;
                                    }
                                    continue 'consume_stream;
                                }

                                let total_plans = plan_runners.len();
                                // Number of plans the queue accepted for this task.
                                let mut enqueued_count = 0;

                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let plan_index = meta.plan_index;
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta, Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = plan_index,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            // Stop pushing; the remaining runners of this task
                                            // are dropped without being enqueued.
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        },
                                        PushResult::RejectedDuplicate => {
                                            tracing::error!(
                                                task_id = task_id,
                                                plan_index = plan_index,
                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
                                            );
                                        }
                                    }
                                }

                                // No plan was accepted: report the task as failed right away.
                                // Otherwise track it with the number of accepted plans.
                                if enqueued_count == 0 {
                                    let report = build_iceberg_task_report(
                                        task_id,
                                        sink_id,
                                        Some("Failed to enqueue all iceberg compaction plans".to_owned()),
                                    );
                                    if matches!(
                                        send_or_buffer_iceberg_task_report(
                                            &request_sender,
                                            &mut pending_task_reports,
                                            report,
                                        ),
                                        ReportSendResult::RestartStream
                                    ) {
                                        continue 'start_stream;
                                    }
                                } else {
                                    task_trackers.insert(
                                        task_id,
                                        IcebergTaskTracker::new(sink_id, enqueued_count),
                                    );
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    sink_id = sink_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                // The previous pull was acknowledged; the flag is handed to
                                // `handle_meta_task_pulling` on the next tick.
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    // The stream ended (`None`): resubscribe.
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}
749
750#[must_use]
753pub fn start_compactor(
754 compactor_context: CompactorContext,
755 hummock_meta_client: Arc<dyn HummockMetaClient>,
756 object_id_manager: Arc<ObjectIdManager>,
757 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
758) -> (JoinHandle<()>, Sender<()>) {
759 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
760 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
761 let stream_retry_interval = Duration::from_secs(30);
762 let task_progress = compactor_context.task_progress_manager.clone();
763 let periodic_event_update_interval = Duration::from_millis(1000);
764
765 let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
766 * compactor_context.storage_opts.compactor_max_task_multiplier)
767 .ceil() as u32;
768 let running_task_parallelism = Arc::new(AtomicU32::new(0));
769
770 const MAX_PULL_TASK_COUNT: u32 = 4;
771 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
772
773 assert_ge!(
774 compactor_context.storage_opts.compactor_max_task_multiplier,
775 0.0
776 );
777
778 let join_handle = tokio::spawn(async move {
779 let shutdown_map = CompactionShutdownMap::default();
780 let mut min_interval = tokio::time::interval(stream_retry_interval);
781 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
782
783 let mut log_throttler =
785 LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
786
787 'start_stream: loop {
789 let mut pull_task_ack = true;
792 tokio::select! {
793 _ = min_interval.tick() => {},
795 _ = &mut shutdown_rx => {
797 tracing::info!("Compactor is shutting down");
798 return;
799 }
800 }
801
802 let (request_sender, response_event_stream) =
803 match hummock_meta_client.subscribe_compaction_event().await {
804 Ok((request_sender, response_event_stream)) => {
805 tracing::debug!("Succeeded subscribe_compaction_event.");
806 (request_sender, response_event_stream)
807 }
808
809 Err(e) => {
810 tracing::warn!(
811 error = %e.as_report(),
812 "Subscribing to compaction tasks failed with error. Will retry.",
813 );
814 continue 'start_stream;
815 }
816 };
817
818 pin_mut!(response_event_stream);
819
820 let executor = compactor_context.compaction_executor.clone();
821 let object_id_manager = object_id_manager.clone();
822
823 let mut event_loop_iteration_now = Instant::now();
825 'consume_stream: loop {
826 {
827 compactor_context
829 .compactor_metrics
830 .compaction_event_loop_iteration_latency
831 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
832 event_loop_iteration_now = Instant::now();
833 }
834
835 let running_task_parallelism = running_task_parallelism.clone();
836 let request_sender = request_sender.clone();
837 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
838 _ = periodic_event_interval.tick() => {
839 let progress_list = get_task_progress(task_progress.clone());
840
841 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
842 event: Some(RequestEvent::HeartBeat(
843 HeartBeat {
844 progress: progress_list
845 }
846 )),
847 create_at: SystemTime::now()
848 .duration_since(std::time::UNIX_EPOCH)
849 .expect("Clock may have gone backwards")
850 .as_millis() as u64,
851 }) {
852 tracing::warn!(error = %e.as_report(), "Failed to report task progress");
853 continue 'start_stream;
855 }
856
857
858 let mut pending_pull_task_count = 0;
859 if pull_task_ack {
860 pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
862
863 if pending_pull_task_count > 0 {
864 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
865 event: Some(RequestEvent::PullTask(
866 PullTask {
867 pull_task_count: pending_pull_task_count,
868 }
869 )),
870 create_at: SystemTime::now()
871 .duration_since(std::time::UNIX_EPOCH)
872 .expect("Clock may have gone backwards")
873 .as_millis() as u64,
874 }) {
875 tracing::warn!(error = %e.as_report(), "Failed to pull task");
876
877 continue 'start_stream;
879 } else {
880 pull_task_ack = false;
881 }
882 }
883 }
884
885 let running_count = running_task_parallelism.load(Ordering::SeqCst);
886 let current_state = CompactionLogState {
887 running_parallelism: running_count,
888 pull_task_ack,
889 pending_pull_task_count,
890 };
891
892 if log_throttler.should_log(¤t_state) {
894 tracing::info!(
895 running_parallelism_count = %current_state.running_parallelism,
896 pull_task_ack = %current_state.pull_task_ack,
897 pending_pull_task_count = %current_state.pending_pull_task_count
898 );
899 log_throttler.update(current_state);
900 }
901
902 continue;
903 }
904 event = response_event_stream.next() => {
905 event
906 }
907
908 _ = &mut shutdown_rx => {
909 tracing::info!("Compactor is shutting down");
910 return
911 }
912 };
913
914 fn send_report_task_event(
915 compact_task: &CompactTask,
916 table_stats: TableStatsMap,
917 object_timestamps: HashMap<HummockSstableObjectId, u64>,
918 request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
919 ) {
920 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
921 event: Some(RequestEvent::ReportTask(ReportTask {
922 task_id: compact_task.task_id,
923 task_status: compact_task.task_status.into(),
924 sorted_output_ssts: compact_task
925 .sorted_output_ssts
926 .iter()
927 .map(|sst| sst.into())
928 .collect(),
929 table_stats_change: to_prost_table_stats_map(table_stats),
930 object_timestamps,
931 })),
932 create_at: SystemTime::now()
933 .duration_since(std::time::UNIX_EPOCH)
934 .expect("Clock may have gone backwards")
935 .as_millis() as u64,
936 }) {
937 let task_id = compact_task.task_id;
938 tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
939 }
940 }
941
942 match event {
943 Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
944 let event = match event {
945 Some(event) => event,
946 None => continue 'consume_stream,
947 };
948 let shutdown = shutdown_map.clone();
949 let context = compactor_context.clone();
950 let consumed_latency_ms = SystemTime::now()
951 .duration_since(std::time::UNIX_EPOCH)
952 .expect("Clock may have gone backwards")
953 .as_millis() as u64
954 - create_at;
955 context
956 .compactor_metrics
957 .compaction_event_consumed_latency
958 .observe(consumed_latency_ms as _);
959
960 let object_id_manager = object_id_manager.clone();
961 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
962
963 match event {
964 ResponseEvent::CompactTask(compact_task) => {
965 let compact_task = CompactTask::from(compact_task);
966 let parallelism =
967 calculate_task_parallelism(&compact_task, &context);
968
969 assert_ne!(parallelism, 0, "splits cannot be empty");
970
971 if (max_task_parallelism
972 - running_task_parallelism.load(Ordering::SeqCst))
973 < parallelism as u32
974 {
975 tracing::warn!(
976 "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
977 compact_task.task_id,
978 parallelism,
979 max_task_parallelism,
980 running_task_parallelism.load(Ordering::Relaxed),
981 );
982 let (compact_task, table_stats, object_timestamps) =
983 compact_done(
984 compact_task,
985 context.clone(),
986 vec![],
987 TaskStatus::NoAvailCpuResourceCanceled,
988 );
989
990 send_report_task_event(
991 &compact_task,
992 table_stats,
993 object_timestamps,
994 &request_sender,
995 );
996
997 continue 'consume_stream;
998 }
999
1000 running_task_parallelism
1001 .fetch_add(parallelism as u32, Ordering::SeqCst);
1002 executor.spawn(async move {
1003 let (tx, rx) = tokio::sync::oneshot::channel();
1004 let task_id = compact_task.task_id;
1005 shutdown.lock().unwrap().insert(task_id, tx);
1006
1007 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
1008 context.clone(),
1009 compact_task,
1010 rx,
1011 object_id_manager.clone(),
1012 compaction_catalog_manager_ref.clone(),
1013 )
1014 .await;
1015
1016 shutdown.lock().unwrap().remove(&task_id);
1017 running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
1018
1019 send_report_task_event(
1020 &compact_task,
1021 table_stats,
1022 object_timestamps,
1023 &request_sender,
1024 );
1025
1026 let enable_check_compaction_result =
1027 context.storage_opts.check_compaction_result;
1028 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1029
1030 if enable_check_compaction_result && need_check_task {
1031 let compact_table_ids = compact_task.build_compact_table_ids();
1032 match compaction_catalog_manager_ref.acquire(compact_table_ids.into_iter().collect()).await {
1033 Ok(compaction_catalog_agent_ref) => {
1034 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1035 {
1036 Err(e) => {
1037 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
1038 }
1039 Ok(true) => (),
1040 Ok(false) => {
1041 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1042 }
1043 }
1044 },
1045 Err(e) => {
1046 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1047 }
1048 }
1049 }
1050 });
1051 }
1052 #[expect(deprecated)]
1053 ResponseEvent::VacuumTask(_) => {
1054 unreachable!("unexpected vacuum task");
1055 }
1056 #[expect(deprecated)]
1057 ResponseEvent::FullScanTask(_) => {
1058 unreachable!("unexpected scan task");
1059 }
1060 #[expect(deprecated)]
1061 ResponseEvent::ValidationTask(validation_task) => {
1062 let validation_task = ValidationTask::from(validation_task);
1063 executor.spawn(async move {
1064 validate_ssts(validation_task, context.sstable_store.clone())
1065 .await;
1066 });
1067 }
1068 ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
1069 .lock()
1070 .unwrap()
1071 .remove(&cancel_compact_task.task_id)
1072 {
1073 Some(tx) => {
1074 if tx.send(()).is_err() {
1075 tracing::warn!(
1076 "Cancellation of compaction task failed. task_id: {}",
1077 cancel_compact_task.task_id
1078 );
1079 }
1080 }
1081 _ => {
1082 tracing::warn!(
1083 "Attempting to cancel non-existent compaction task. task_id: {}",
1084 cancel_compact_task.task_id
1085 );
1086 }
1087 },
1088
1089 ResponseEvent::PullTaskAck(_pull_task_ack) => {
1090 pull_task_ack = true;
1092 }
1093 }
1094 }
1095 Some(Err(e)) => {
1096 tracing::warn!("Failed to consume stream. {}", e.message());
1097 continue 'start_stream;
1098 }
1099 _ => {
1100 continue 'start_stream;
1102 }
1103 }
1104 }
1105 }
1106 });
1107
1108 (join_handle, shutdown_tx)
1109}
1110
1111#[must_use]
1114pub fn start_shared_compactor(
1115 grpc_proxy_client: GrpcCompactorProxyClient,
1116 mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1117 context: CompactorContext,
1118) -> (JoinHandle<()>, Sender<()>) {
1119 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1120 let task_progress = context.task_progress_manager.clone();
1121 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1122 let periodic_event_update_interval = Duration::from_millis(1000);
1123
1124 let join_handle = tokio::spawn(async move {
1125 let shutdown_map = CompactionShutdownMap::default();
1126
1127 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1128 let executor = context.compaction_executor.clone();
1129 let report_heartbeat_client = grpc_proxy_client.clone();
1130 'consume_stream: loop {
1131 let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1132 _ = periodic_event_interval.tick() => {
1133 let progress_list = get_task_progress(task_progress.clone());
1134 let report_compaction_task_request = ReportCompactionTaskRequest{
1135 event: Some(ReportCompactionTaskEvent::HeartBeat(
1136 SharedHeartBeat {
1137 progress: progress_list
1138 }
1139 )),
1140 };
1141 if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1142 tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1143 }
1144 continue
1145 }
1146
1147
1148 _ = &mut shutdown_rx => {
1149 tracing::info!("Compactor is shutting down");
1150 return
1151 }
1152
1153 request = receiver.recv() => {
1154 request
1155 }
1156
1157 };
1158 match request {
1159 Some(request) => {
1160 let context = context.clone();
1161 let shutdown = shutdown_map.clone();
1162
1163 let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1164 executor.spawn(async move {
1165 let DispatchCompactionTaskRequest {
1166 tables,
1167 output_object_ids,
1168 task: dispatch_task,
1169 } = request.into_inner();
1170 let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1171 acc.insert(table.id, table);
1172 acc
1173 });
1174
1175 let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1176 output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1177 let shared_compactor_object_id_manager =
1178 SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1179 match dispatch_task.unwrap() {
1180 dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1181 let compact_task = CompactTask::from(&compact_task);
1182 let (tx, rx) = tokio::sync::oneshot::channel();
1183 let task_id = compact_task.task_id;
1184 shutdown.lock().unwrap().insert(task_id, tx);
1185
1186 let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1187 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1188 context.clone(),
1189 compact_task,
1190 rx,
1191 shared_compactor_object_id_manager,
1192 compaction_catalog_agent_ref.clone(),
1193 )
1194 .await;
1195 shutdown.lock().unwrap().remove(&task_id);
1196 let report_compaction_task_request = ReportCompactionTaskRequest {
1197 event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1198 compact_task: Some(PbCompactTask::from(&compact_task)),
1199 table_stats_change: to_prost_table_stats_map(table_stats),
1200 object_timestamps,
1201 })),
1202 };
1203
1204 match cloned_grpc_proxy_client
1205 .report_compaction_task(report_compaction_task_request)
1206 .await
1207 {
1208 Ok(_) => {
1209 let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1211 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1212 if enable_check_compaction_result && need_check_task {
1213 match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1214 Err(e) => {
1215 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1216 },
1217 Ok(true) => (),
1218 Ok(false) => {
1219 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1220 }
1221 }
1222 }
1223 }
1224 Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1225 }
1226
1227 }
1228 dispatch_compaction_task_request::Task::VacuumTask(_) => {
1229 unreachable!("unexpected vacuum task");
1230 }
1231 dispatch_compaction_task_request::Task::FullScanTask(_) => {
1232 unreachable!("unexpected scan task");
1233 }
1234 dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1235 let validation_task = ValidationTask::from(validation_task);
1236 validate_ssts(validation_task, context.sstable_store.clone()).await;
1237 }
1238 dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1239 match shutdown
1240 .lock()
1241 .unwrap()
1242 .remove(&cancel_compact_task.task_id)
1243 { Some(tx) => {
1244 if tx.send(()).is_err() {
1245 tracing::warn!(
1246 "Cancellation of compaction task failed. task_id: {}",
1247 cancel_compact_task.task_id
1248 );
1249 }
1250 } _ => {
1251 tracing::warn!(
1252 "Attempting to cancel non-existent compaction task. task_id: {}",
1253 cancel_compact_task.task_id
1254 );
1255 }}
1256 }
1257 }
1258 });
1259 }
1260 None => continue 'consume_stream,
1261 }
1262 }
1263 });
1264 (join_handle, shutdown_tx)
1265}
1266
1267fn get_task_progress(
1268 task_progress: Arc<
1269 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1270 >,
1271) -> Vec<CompactTaskProgress> {
1272 let mut progress_list = Vec::new();
1273 for (&task_id, progress) in &*task_progress.lock() {
1274 progress_list.push(progress.snapshot(task_id));
1275 }
1276 progress_list
1277}
1278
1279fn schedule_queued_tasks(
1281 task_queue: &mut IcebergTaskQueue,
1282 compactor_context: &CompactorContext,
1283 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1284 task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
1285) {
1286 while let Some(popped_task) = task_queue.pop() {
1287 let task_id = popped_task.meta.task_id;
1288 let plan_index = popped_task.meta.plan_index;
1289 let task_key = (task_id, plan_index);
1290
1291 let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1293
1294 let Some(runner) = popped_task.runner else {
1295 tracing::error!(
1296 task_id = task_id,
1297 plan_index = plan_index,
1298 "Popped task missing runner - this should not happen"
1299 );
1300 task_queue.finish_running(task_key);
1301 continue;
1302 };
1303
1304 let executor = compactor_context.compaction_executor.clone();
1305 let shutdown_map_clone = shutdown_map.clone();
1306 let completion_tx_clone = task_completion_tx.clone();
1307
1308 tracing::info!(
1309 task_id = task_id,
1310 plan_index = plan_index,
1311 unique_ident = ?unique_ident,
1312 required_parallelism = popped_task.meta.required_parallelism,
1313 "Starting iceberg compaction task from queue"
1314 );
1315
1316 executor.spawn(async move {
1317 let (tx, rx) = tokio::sync::oneshot::channel();
1318 {
1319 let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
1320 shutdown_guard.insert(task_key, tx);
1321 }
1322 let _cleanup_guard = scopeguard::guard(shutdown_map_clone.clone(), move |shutdown_map| {
1323 let mut shutdown_guard = shutdown_map.lock().unwrap();
1324 shutdown_guard.remove(&task_key);
1325 });
1326
1327 let result = Box::pin(runner.compact(rx)).await;
1328
1329 let completion = match result {
1330 Ok(_) => IcebergPlanCompletion {
1331 task_key,
1332 error_message: None,
1333 },
1334 Err(e) => {
1335 tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
1336 IcebergPlanCompletion {
1337 task_key,
1338 error_message: Some(e.to_report_string()),
1339 }
1340 }
1341 };
1342
1343 if completion_tx_clone.send(completion).is_err() {
1344 tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
1345 }
1346 });
1347 }
1348}
1349
1350fn handle_meta_task_pulling(
1353 pull_task_ack: &mut bool,
1354 task_queue: &IcebergTaskQueue,
1355 max_task_parallelism: u32,
1356 max_pull_task_count: u32,
1357 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1358 log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1359) -> bool {
1360 let mut pending_pull_task_count = 0;
1361 if *pull_task_ack {
1362 let current_running_parallelism = task_queue.running_parallelism_sum();
1364 pending_pull_task_count =
1365 (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1366
1367 if pending_pull_task_count > 0 {
1368 if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1369 event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1370 subscribe_iceberg_compaction_event_request::PullTask {
1371 pull_task_count: pending_pull_task_count,
1372 },
1373 )),
1374 create_at: SystemTime::now()
1375 .duration_since(std::time::UNIX_EPOCH)
1376 .expect("Clock may have gone backwards")
1377 .as_millis() as u64,
1378 }) {
1379 tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1380 return true; } else {
1382 *pull_task_ack = false;
1383 }
1384 }
1385 }
1386
1387 let running_count = task_queue.running_parallelism_sum();
1388 let waiting_count = task_queue.waiting_parallelism_sum();
1389 let available_count = max_task_parallelism.saturating_sub(running_count);
1390 let current_state = IcebergCompactionLogState {
1391 running_parallelism: running_count,
1392 waiting_parallelism: waiting_count,
1393 available_parallelism: available_count,
1394 pull_task_ack: *pull_task_ack,
1395 pending_pull_task_count,
1396 };
1397
1398 if log_throttler.should_log(¤t_state) {
1400 tracing::info!(
1401 running_parallelism_count = %current_state.running_parallelism,
1402 waiting_parallelism_count = %current_state.waiting_parallelism,
1403 available_parallelism = %current_state.available_parallelism,
1404 pull_task_ack = %current_state.pull_task_ack,
1405 pending_pull_task_count = %current_state.pending_pull_task_count
1406 );
1407 log_throttler.update(current_state);
1408 }
1409
1410 false }
1412
#[cfg(test)]
mod tests {
    use super::*;

    /// Log states compare equal only when their fields match.
    #[test]
    fn test_log_state_equality() {
        let compaction_state = |running_parallelism| CompactionLogState {
            running_parallelism,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let baseline = compaction_state(10);
        let identical = compaction_state(10);
        let more_running = compaction_state(11);
        assert_eq!(baseline, identical);
        assert_ne!(baseline, more_running);

        let iceberg_state = |waiting_parallelism| IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let five_waiting = iceberg_state(5);
        let six_waiting = iceberg_state(6);
        assert_ne!(five_waiting, six_waiting);
    }

    /// A state is logged when it differs from the last recorded one and suppressed afterwards.
    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let compaction_state = |running_parallelism| CompactionLogState {
            running_parallelism,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        // Each new state must be logged once, then suppressed after it has been recorded.
        for state in [compaction_state(10), compaction_state(11)] {
            assert!(throttler.should_log(&state));
            throttler.update(state.clone());

            assert!(!throttler.should_log(&state));
        }
    }

    /// An unchanged state is logged again once the throttler's interval has elapsed.
    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let unchanged = CompactionLogState {
            pending_pull_task_count: 2,
            pull_task_ack: true,
            running_parallelism: 10,
        };

        assert!(throttler.should_log(&unchanged));
        throttler.update(unchanged.clone());

        assert!(!throttler.should_log(&unchanged));

        // Wait past the 10 ms interval configured above.
        std::thread::sleep(Duration::from_millis(15));

        assert!(throttler.should_log(&unchanged));
    }
}