1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24 ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28 subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::hash_map::Entry;
43use std::collections::{HashMap, VecDeque};
44use std::marker::PhantomData;
45use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, SystemTime};
48
49use await_tree::{InstrumentAwait, SpanExt};
50pub use compaction_executor::CompactionExecutor;
51pub use compaction_filter::{
52 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, TtlCompactionFilter,
53};
54pub use context::{
55 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
60use iceberg_compaction::{
61 IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
62 ReportSendResult, build_iceberg_task_report, create_task_execution,
63 flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
64};
65pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
66use more_asserts::assert_ge;
67use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
68use risingwave_hummock_sdk::{
69 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
70};
71use risingwave_pb::hummock::compact_task::TaskStatus;
72use risingwave_pb::hummock::subscribe_compaction_event_request::{
73 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
74};
75use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
76use risingwave_pb::hummock::{
77 CompactTaskProgress, PbSstableFilterType, ReportCompactionTaskRequest,
78 SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
79};
80use risingwave_rpc_client::HummockMetaClient;
81pub use shared_buffer_compact::compact;
82use tokio::sync::oneshot::Sender;
83use tokio::task::JoinHandle;
84use tokio::time::Instant;
85
86pub use self::compaction_utils::{
87 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
88 check_flush_result,
89};
90pub use self::task_progress::TaskProgress;
91use super::multi_builder::CapacitySplitTableBuilder;
92use super::{
93 GetObjectId, HummockResult, ObjectIdManager, SstableBuilderOptions, Xor8FilterBuilder,
94 Xor16FilterBuilder,
95};
96use crate::compaction_catalog_manager::{
97 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
98};
99use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
100use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
101use crate::hummock::compactor::iceberg_compaction::TaskKey;
102use crate::hummock::iterator::{Forward, HummockIterator};
103use crate::hummock::{
104 BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, FilterBuilder,
105 SharedComapctorObjectIdManager, SstableWriterFactory, UnifiedSstableWriterFactory,
106 validate_ssts,
107};
108use crate::monitor::CompactorMetrics;
109
/// Heartbeat interval handed to the [`LogThrottler`]s of the compactor event loops: once this
/// much time has passed since the last emitted log line, the state is logged again even if it
/// has not changed.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);
112
/// Snapshot of the task-pulling state that `start_compactor` logs on its periodic tick.
///
/// Snapshots are compared through a [`LogThrottler`], so a log line is only emitted when one of
/// these fields changes or the heartbeat interval elapses.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CompactionLogState {
    /// Value of the running-task parallelism counter when the snapshot was taken.
    running_parallelism: u32,
    /// Value of the event loop's `pull_task_ack` flag (`false` while a `PullTask` request is
    /// awaiting its `PullTaskAck`).
    pull_task_ack: bool,
    /// Number of tasks requested by the `PullTask` sent on this tick; `0` if none was sent.
    pending_pull_task_count: u32,
}
120
/// Snapshot of the Iceberg compactor's task-pulling state, used as the state type of the
/// [`LogThrottler`] created in `start_iceberg_compactor`.
///
/// NOTE(review): the snapshot is built outside this part of the file (the throttler is handed to
/// `handle_meta_task_pulling`), so the field descriptions below are inferred from the names and
/// should be confirmed against that function.
#[derive(Debug, Clone, PartialEq, Eq)]
struct IcebergCompactionLogState {
    /// Presumably the parallelism consumed by plans that are currently running.
    running_parallelism: u32,
    /// Presumably the parallelism requested by plans still waiting in the queue.
    waiting_parallelism: u32,
    /// Presumably the parallelism still free for new plans.
    available_parallelism: u32,
    /// Presumably whether the previous `PullTask` request has been acknowledged.
    pull_task_ack: bool,
    /// Presumably the number of tasks requested by the latest `PullTask`.
    pending_pull_task_count: u32,
}
130
/// Rate limiter for repetitive status logs.
///
/// A state is worth logging when it differs from the one that was logged last, or when the
/// heartbeat interval has passed since the last log line.
struct LogThrottler<T: PartialEq> {
    /// State recorded by the most recent [`LogThrottler::update`]; `None` until the first one.
    last_logged_state: Option<T>,
    /// Moment of the most recent `update` (or of construction if there was none yet).
    last_heartbeat: Instant,
    /// Maximum time an unchanged state stays silent.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Builds a throttler that has logged nothing yet; the heartbeat clock starts now.
    fn new(heartbeat_interval: Duration) -> Self {
        let created_at = Instant::now();
        Self {
            last_logged_state: None,
            last_heartbeat: created_at,
            heartbeat_interval,
        }
    }

    /// Returns `true` if `current_state` should be logged: either it differs from the last
    /// logged state (or nothing was logged yet), or the heartbeat interval has elapsed.
    fn should_log(&self, current_state: &T) -> bool {
        let state_changed = match &self.last_logged_state {
            Some(previous) => previous != current_state,
            None => true,
        };
        // The clock is only consulted when the state itself gives no reason to log.
        state_changed || self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Records `current_state` as logged and restarts the heartbeat clock.
    fn update(&mut self, current_state: T) {
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = Instant::now();
    }
}
159
/// Executes the compaction of a single key range into SSTs, using the context, builder options
/// and task configuration it was constructed with.
pub struct Compactor {
    /// Shared compactor context; this file reads its metrics, SSTable store, memory limiter and
    /// storage options from it.
    context: CompactorContext,
    /// Source of object ids, handed to the SST builder factory.
    object_id_getter: Arc<dyn GetObjectId>,
    /// Per-task settings (filter kind, cache policy, vnode partitioning, ...).
    task_config: TaskConfig,
    /// Options cloned into the SST builder factory.
    options: SstableBuilderOptions,
    /// Counter shared with the builder factory as `remote_rpc_cost` and reported to the
    /// `get_table_id_total_time_duration` metric after a key range is compacted.
    /// NOTE(review): presumably accumulated microseconds spent fetching object ids — confirm.
    get_id_time: Arc<AtomicU64>,
}
169
/// Result of compacting one split: an index, the SSTs that were produced, and the statistics
/// gathered while producing them.
/// NOTE(review): the `usize` is presumably the split index of the task — confirm at the use site.
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
171
172impl Compactor {
173 pub fn new(
175 context: CompactorContext,
176 options: SstableBuilderOptions,
177 task_config: TaskConfig,
178 object_id_getter: Arc<dyn GetObjectId>,
179 ) -> Self {
180 Self {
181 context,
182 options,
183 task_config,
184 get_id_time: Arc::new(AtomicU64::new(0)),
185 object_id_getter,
186 }
187 }
188
189 async fn compact_key_range(
194 &self,
195 iter: impl HummockIterator<Direction = Forward>,
196 compaction_filter: impl CompactionFilter,
197 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
198 task_progress: Option<Arc<TaskProgress>>,
199 task_id: Option<HummockCompactionTaskId>,
200 split_index: Option<usize>,
201 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
202 let compact_timer = if self.context.is_share_buffer_compact {
204 self.context
205 .compactor_metrics
206 .write_build_l0_sst_duration
207 .start_timer()
208 } else {
209 self.context
210 .compactor_metrics
211 .compact_sst_duration
212 .start_timer()
213 };
214
215 let (split_table_outputs, table_stats_map) = {
216 let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
217 match (
218 self.task_config.sstable_filter_kind,
219 self.task_config.use_block_based_filter,
220 ) {
221 (PbSstableFilterType::SstableFilterXor8, true) => {
222 self.compact_key_range_impl::<_, BlockedXor8FilterBuilder>(
223 factory,
224 iter,
225 compaction_filter,
226 compaction_catalog_agent_ref,
227 task_progress.clone(),
228 self.object_id_getter.clone(),
229 )
230 .instrument_await("compact".verbose())
231 .await?
232 }
233 (PbSstableFilterType::SstableFilterXor8, false) => {
234 self.compact_key_range_impl::<_, Xor8FilterBuilder>(
235 factory,
236 iter,
237 compaction_filter,
238 compaction_catalog_agent_ref,
239 task_progress.clone(),
240 self.object_id_getter.clone(),
241 )
242 .instrument_await("compact".verbose())
243 .await?
244 }
245 (PbSstableFilterType::SstableFilterXor16, true) => {
246 self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
247 factory,
248 iter,
249 compaction_filter,
250 compaction_catalog_agent_ref,
251 task_progress.clone(),
252 self.object_id_getter.clone(),
253 )
254 .instrument_await("compact".verbose())
255 .await?
256 }
257 (PbSstableFilterType::SstableFilterXor16, false) => {
258 self.compact_key_range_impl::<_, Xor16FilterBuilder>(
259 factory,
260 iter,
261 compaction_filter,
262 compaction_catalog_agent_ref,
263 task_progress.clone(),
264 self.object_id_getter.clone(),
265 )
266 .instrument_await("compact".verbose())
267 .await?
268 }
269 (kind, _) => unreachable!("unsupported sstable filter kind in compactor: {kind:?}"),
270 }
271 };
272
273 compact_timer.observe_duration();
274
275 Self::report_progress(
276 self.context.compactor_metrics.clone(),
277 task_progress,
278 &split_table_outputs,
279 self.context.is_share_buffer_compact,
280 );
281
282 self.context
283 .compactor_metrics
284 .get_table_id_total_time_duration
285 .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
286
287 debug_assert!(
288 split_table_outputs
289 .iter()
290 .all(|table_info| table_info.sst_info.table_ids.is_sorted())
291 );
292
293 if task_id.is_some() {
294 tracing::info!(
296 "Finish Task {:?} split_index {:?} sst count {}",
297 task_id,
298 split_index,
299 split_table_outputs.len()
300 );
301 }
302 Ok((split_table_outputs, table_stats_map))
303 }
304
305 pub fn report_progress(
306 metrics: Arc<CompactorMetrics>,
307 task_progress: Option<Arc<TaskProgress>>,
308 ssts: &Vec<LocalSstableInfo>,
309 is_share_buffer_compact: bool,
310 ) {
311 for sst_info in ssts {
312 let sst_size = sst_info.file_size();
313 if let Some(tracker) = &task_progress {
314 tracker.inc_ssts_uploaded();
315 tracker.dec_num_pending_write_io();
316 }
317 if is_share_buffer_compact {
318 metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
319 } else {
320 metrics.compaction_upload_sst_counts.inc();
321 }
322 }
323 }
324
325 async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
326 &self,
327 writer_factory: F,
328 iter: impl HummockIterator<Direction = Forward>,
329 compaction_filter: impl CompactionFilter,
330 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
331 task_progress: Option<Arc<TaskProgress>>,
332 object_id_getter: Arc<dyn GetObjectId>,
333 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
334 let builder_factory = RemoteBuilderFactory::<F, B> {
335 object_id_getter,
336 limiter: self.context.memory_limiter.clone(),
337 options: self.options.clone(),
338 policy: self.task_config.cache_policy,
339 remote_rpc_cost: self.get_id_time.clone(),
340 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
341 sstable_writer_factory: writer_factory,
342 _phantom: PhantomData,
343 };
344
345 let mut sst_builder = CapacitySplitTableBuilder::new(
346 builder_factory,
347 self.context.compactor_metrics.clone(),
348 task_progress.clone(),
349 self.task_config.table_vnode_partition.clone(),
350 self.context
351 .storage_opts
352 .compactor_concurrent_uploading_sst_count,
353 compaction_catalog_agent_ref,
354 );
355 let compaction_statistics = compact_and_build_sst(
356 &mut sst_builder,
357 &self.task_config,
358 self.context.compactor_metrics.clone(),
359 iter,
360 compaction_filter,
361 )
362 .instrument_await("compact_and_build_sst".verbose())
363 .await?;
364
365 let ssts = sst_builder
366 .finish()
367 .instrument_await("builder_finish".verbose())
368 .await?;
369
370 Ok((ssts, compaction_statistics))
371 }
372}
373
/// Spawns the background event loop of the Iceberg compactor.
///
/// The spawned task repeatedly subscribes to the meta service via
/// `subscribe_iceberg_compaction_event`. While a stream is open it:
/// - records finished plans and reports a task once its tracker is finished,
/// - schedules queued plan runners whenever the queue becomes schedulable,
/// - calls `handle_meta_task_pulling` on every periodic tick,
/// - turns incoming `CompactTask` events into plan runners and enqueues them,
/// - marks the pull request as acknowledged on `PullTaskAck`.
///
/// Any failure of the stream restarts the subscription, at most once per 30 seconds.
///
/// Returns the `JoinHandle` of the spawned loop and a oneshot `Sender`; sending `()` on it makes
/// the loop return.
///
/// # Panics
///
/// Panics if `storage_opts.compactor_max_task_multiplier` is not `>= 0.0`.
#[must_use]
pub fn start_iceberg_compactor(
    compactor_context: CompactorContext,
    hummock_meta_client: Arc<dyn HummockMetaClient>,
) -> (JoinHandle<()>, Sender<()>) {
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    // Minimum delay between two attempts to (re)subscribe to the meta service.
    let stream_retry_interval = Duration::from_secs(30);
    let periodic_event_update_interval = Duration::from_millis(
        compactor_context
            .storage_opts
            .iceberg_compaction_pull_interval_ms,
    );
    let worker_num = compactor_context.compaction_executor.worker_num();

    // Total parallelism this node offers: executor workers scaled by the configured multiplier.
    let max_task_parallelism: u32 = (worker_num as f32
        * compactor_context.storage_opts.compactor_max_task_multiplier)
        .ceil() as u32;

    const MAX_PULL_TASK_COUNT: u32 = 4;
    let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);

    assert_ge!(
        compactor_context.storage_opts.compactor_max_task_multiplier,
        0.0
    );

    let join_handle = tokio::spawn(async move {
        // Budget for plans waiting in the queue, derived from the max parallelism.
        let pending_parallelism_budget = (max_task_parallelism as f32
            * compactor_context
                .storage_opts
                .iceberg_compaction_pending_parallelism_budget_multiplier)
            .ceil() as u32;
        let mut task_queue =
            IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);

        // Per-plan shutdown senders, keyed by `TaskKey`; handed to `schedule_queued_tasks`.
        let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));

        // Plans report their completion to this loop through the channel below.
        let (task_completion_tx, mut task_completion_rx) =
            tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
        // One tracker per task id that still has enqueued plans outstanding.
        let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
        // Reports kept across stream restarts; flushed right after each successful subscribe.
        let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();

        let mut min_interval = tokio::time::interval(stream_retry_interval);
        let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);

        let mut log_throttler =
            LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);

        // Outer loop: one iteration per subscription attempt.
        'start_stream: loop {
            // A fresh stream has no outstanding pull request.
            let mut pull_task_ack = true;
            // Wait for the retry interval (or stop on shutdown) before subscribing.
            tokio::select! {
                _ = min_interval.tick() => {},
                _ = &mut shutdown_rx => {
                    tracing::info!("Compactor is shutting down");
                    return;
                }
            }

            let (request_sender, response_event_stream) = match hummock_meta_client
                .subscribe_iceberg_compaction_event()
                .await
            {
                Ok((request_sender, response_event_stream)) => {
                    tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
                    (request_sender, response_event_stream)
                }

                Err(e) => {
                    tracing::warn!(
                        error = %e.as_report(),
                        "Subscribing to iceberg compaction tasks failed with error. Will retry.",
                    );
                    continue 'start_stream;
                }
            };

            // Deliver reports buffered from an earlier stream before doing anything else.
            if matches!(
                flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
                ReportSendResult::RestartStream
            ) {
                continue 'start_stream;
            }

            pin_mut!(response_event_stream);

            let _executor = compactor_context.compaction_executor.clone();

            let mut event_loop_iteration_now = Instant::now();
            // Inner loop: one iteration per event handled on the current stream.
            'consume_stream: loop {
                {
                    // Record how long the previous iteration took.
                    compactor_context
                        .compactor_metrics
                        .compaction_event_loop_iteration_latency
                        .observe(event_loop_iteration_now.elapsed().as_millis() as _);
                    event_loop_iteration_now = Instant::now();
                }

                let request_sender = request_sender.clone();
                // Every branch except the stream branch handles its event in place and continues;
                // only a stream item falls through to the `match event` below.
                let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
                    Some(plan_completion) = task_completion_rx.recv() => {
                        let task_key = plan_completion.task_key;
                        let error_message = plan_completion.error_message;
                        tracing::debug!(
                            task_id = task_key.0,
                            plan_index = task_key.1,
                            success = error_message.is_none(),
                            "Plan completed, updating queue state"
                        );
                        task_queue.finish_running(task_key);

                        let completed_task_id = task_key.0;
                        // Completions for task ids without a tracker are ignored.
                        let Entry::Occupied(mut tracker_entry) =
                            task_trackers.entry(completed_task_id)
                        else {
                            continue 'consume_stream;
                        };
                        tracker_entry.get_mut().record_completion(error_message);
                        if !tracker_entry.get().is_finished() {
                            continue 'consume_stream;
                        }

                        // The tracker is finished: drop it and report the task.
                        let report = tracker_entry.remove().into_report(completed_task_id);
                        if matches!(
                            send_or_buffer_iceberg_task_report(
                                &request_sender,
                                &mut pending_task_reports,
                                report,
                            ),
                            ReportSendResult::RestartStream
                        ) {
                            continue 'start_stream;
                        }
                        continue 'consume_stream;
                    }

                    _ = task_queue.wait_schedulable() => {
                        schedule_queued_tasks(
                            &mut task_queue,
                            &compactor_context,
                            &shutdown_map,
                            &task_completion_tx,
                        );
                        continue 'consume_stream;
                    }

                    _ = periodic_event_interval.tick() => {
                        let should_restart_stream = handle_meta_task_pulling(
                            &mut pull_task_ack,
                            &task_queue,
                            max_task_parallelism,
                            max_pull_task_count,
                            &request_sender,
                            &mut log_throttler,
                        );

                        if should_restart_stream {
                            continue 'start_stream;
                        }
                        continue;
                    }
                    event = response_event_stream.next() => {
                        event
                    }

                    _ = &mut shutdown_rx => {
                        tracing::info!("Iceberg Compactor is shutting down");
                        return
                    }
                };

                match event {
                    Some(Ok(SubscribeIcebergCompactionEventResponse {
                        event,
                        create_at: _create_at,
                    })) => {
                        // A response without an event carries nothing to do.
                        let event = match event {
                            Some(event) => event,
                            None => continue 'consume_stream,
                        };

                        match event {
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
                                let task_id = iceberg_compaction_task.task_id;
                                let sink_id = iceberg_compaction_task.sink_id;
                                // Build the runner configuration from storage options; a failure
                                // is reported back as a failed task.
                                let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
                                    .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
                                    .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
                                    .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
                                    .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
                                    .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
                                    .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
                                    .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
                                    .target_binpack_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
                                    )
                                    .min_group_size_mb(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
                                    )
                                    .min_group_file_count(
                                        compactor_context.storage_opts.iceberg_compaction_min_group_file_count
                                    )
                                    .build() {
                                    Ok(config) => config,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
                                        let report = build_iceberg_task_report(
                                            task_id,
                                            sink_id,
                                            Some(format!(
                                                "Failed to build iceberg compactor runner config: {}",
                                                e.as_report()
                                            )),
                                        );
                                        if matches!(
                                            send_or_buffer_iceberg_task_report(
                                                &request_sender,
                                                &mut pending_task_reports,
                                                report,
                                            ),
                                            ReportSendResult::RestartStream
                                        ) {
                                            continue 'start_stream;
                                        }
                                        continue 'consume_stream;
                                    }
                                };

                                // Turn the task into its plan runners; a failure is reported back
                                // as a failed task.
                                let task_execution = match create_task_execution(
                                    iceberg_compaction_task,
                                    compactor_runner_config,
                                    compactor_context.compactor_metrics.clone(),
                                ).await {
                                    Ok(task_execution) => task_execution,
                                    Err(e) => {
                                        tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
                                        let report = build_iceberg_task_report(
                                            task_id,
                                            sink_id,
                                            Some(format!(
                                                "Failed to create iceberg compaction task execution: {}",
                                                e.as_report()
                                            )),
                                        );
                                        if matches!(
                                            send_or_buffer_iceberg_task_report(
                                                &request_sender,
                                                &mut pending_task_reports,
                                                report,
                                            ),
                                            ReportSendResult::RestartStream
                                        ) {
                                            continue 'start_stream;
                                        }
                                        continue 'consume_stream;
                                    }
                                };

                                let sink_id = task_execution.sink_id;
                                let plan_runners = task_execution.plan_runners;

                                // Nothing to run: report the task without an error message.
                                if plan_runners.is_empty() {
                                    tracing::info!(task_id, sink_id, "No plans to execute");
                                    let report = build_iceberg_task_report(task_id, sink_id, None);
                                    if matches!(
                                        send_or_buffer_iceberg_task_report(
                                            &request_sender,
                                            &mut pending_task_reports,
                                            report,
                                        ),
                                        ReportSendResult::RestartStream
                                    ) {
                                        continue 'start_stream;
                                    }
                                    continue 'consume_stream;
                                }

                                let total_plans = plan_runners.len();
                                let mut enqueued_count = 0;

                                for runner in plan_runners {
                                    let meta = runner.to_meta();
                                    let plan_index = meta.plan_index;
                                    let required_parallelism = runner.required_parallelism();
                                    let push_result = task_queue.push(meta, Some(runner));

                                    match push_result {
                                        PushResult::Added => {
                                            enqueued_count += 1;
                                            tracing::debug!(
                                                task_id = task_id,
                                                plan_index = plan_index,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner added to queue"
                                            );
                                        },
                                        PushResult::RejectedCapacity => {
                                            tracing::warn!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                pending_budget = pending_parallelism_budget,
                                                enqueued_count = enqueued_count,
                                                total_plans = total_plans,
                                                "Iceberg plan runner rejected - queue capacity exceeded"
                                            );
                                            // Stop offering the remaining plans of this task.
                                            break;
                                        },
                                        PushResult::RejectedTooLarge => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                max_parallelism = max_task_parallelism,
                                                "Iceberg plan runner rejected - parallelism exceeds max"
                                            );
                                        },
                                        PushResult::RejectedInvalidParallelism => {
                                            tracing::error!(
                                                task_id = task_id,
                                                required_parallelism = required_parallelism,
                                                "Iceberg plan runner rejected - invalid parallelism"
                                            );
                                        },
                                        PushResult::RejectedDuplicate => {
                                            tracing::error!(
                                                task_id = task_id,
                                                plan_index = plan_index,
                                                "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
                                            );
                                        }
                                    }
                                }

                                if enqueued_count == 0 {
                                    // No plan made it into the queue: fail the task right away.
                                    let report = build_iceberg_task_report(
                                        task_id,
                                        sink_id,
                                        Some("Failed to enqueue all iceberg compaction plans".to_owned()),
                                    );
                                    if matches!(
                                        send_or_buffer_iceberg_task_report(
                                            &request_sender,
                                            &mut pending_task_reports,
                                            report,
                                        ),
                                        ReportSendResult::RestartStream
                                    ) {
                                        continue 'start_stream;
                                    }
                                } else {
                                    // Track only the plans that were actually enqueued.
                                    task_trackers.insert(
                                        task_id,
                                        IcebergTaskTracker::new(sink_id, enqueued_count),
                                    );
                                }

                                tracing::info!(
                                    task_id = task_id,
                                    sink_id = sink_id,
                                    total_plans = total_plans,
                                    enqueued_count = enqueued_count,
                                    "Enqueued {} of {} Iceberg plan runners",
                                    enqueued_count,
                                    total_plans
                                );
                            },
                            risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
                                pull_task_ack = true;
                            },
                        }
                    }
                    Some(Err(e)) => {
                        tracing::warn!("Failed to consume stream. {}", e.message());
                        continue 'start_stream;
                    }
                    // The stream ended: subscribe again.
                    _ => {
                        continue 'start_stream;
                    }
                }
            }
        }
    });

    (join_handle, shutdown_tx)
}
781
782#[must_use]
785pub fn start_compactor(
786 compactor_context: CompactorContext,
787 hummock_meta_client: Arc<dyn HummockMetaClient>,
788 object_id_manager: Arc<ObjectIdManager>,
789 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
790) -> (JoinHandle<()>, Sender<()>) {
791 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
792 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
793 let stream_retry_interval = Duration::from_secs(30);
794 let task_progress = compactor_context.task_progress_manager.clone();
795 let periodic_event_update_interval = Duration::from_millis(1000);
796
797 let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
798 * compactor_context.storage_opts.compactor_max_task_multiplier)
799 .ceil() as u32;
800 let running_task_parallelism = Arc::new(AtomicU32::new(0));
801
802 const MAX_PULL_TASK_COUNT: u32 = 4;
803 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
804
805 assert_ge!(
806 compactor_context.storage_opts.compactor_max_task_multiplier,
807 0.0
808 );
809
810 let join_handle = tokio::spawn(async move {
811 let shutdown_map = CompactionShutdownMap::default();
812 let mut min_interval = tokio::time::interval(stream_retry_interval);
813 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
814
815 let mut log_throttler =
817 LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
818
819 'start_stream: loop {
821 let mut pull_task_ack = true;
824 tokio::select! {
825 _ = min_interval.tick() => {},
827 _ = &mut shutdown_rx => {
829 tracing::info!("Compactor is shutting down");
830 return;
831 }
832 }
833
834 let (request_sender, response_event_stream) =
835 match hummock_meta_client.subscribe_compaction_event().await {
836 Ok((request_sender, response_event_stream)) => {
837 tracing::debug!("Succeeded subscribe_compaction_event.");
838 (request_sender, response_event_stream)
839 }
840
841 Err(e) => {
842 tracing::warn!(
843 error = %e.as_report(),
844 "Subscribing to compaction tasks failed with error. Will retry.",
845 );
846 continue 'start_stream;
847 }
848 };
849
850 pin_mut!(response_event_stream);
851
852 let executor = compactor_context.compaction_executor.clone();
853 let object_id_manager = object_id_manager.clone();
854
855 let mut event_loop_iteration_now = Instant::now();
857 'consume_stream: loop {
858 {
859 compactor_context
861 .compactor_metrics
862 .compaction_event_loop_iteration_latency
863 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
864 event_loop_iteration_now = Instant::now();
865 }
866
867 let running_task_parallelism = running_task_parallelism.clone();
868 let request_sender = request_sender.clone();
869 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
870 _ = periodic_event_interval.tick() => {
871 let progress_list = get_task_progress(task_progress.clone());
872
873 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
874 event: Some(RequestEvent::HeartBeat(
875 HeartBeat {
876 progress: progress_list
877 }
878 )),
879 create_at: SystemTime::now()
880 .duration_since(std::time::UNIX_EPOCH)
881 .expect("Clock may have gone backwards")
882 .as_millis() as u64,
883 }) {
884 tracing::warn!(error = %e.as_report(), "Failed to report task progress");
885 continue 'start_stream;
887 }
888
889
890 let mut pending_pull_task_count = 0;
891 if pull_task_ack {
892 pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
894
895 if pending_pull_task_count > 0 {
896 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
897 event: Some(RequestEvent::PullTask(
898 PullTask {
899 pull_task_count: pending_pull_task_count,
900 }
901 )),
902 create_at: SystemTime::now()
903 .duration_since(std::time::UNIX_EPOCH)
904 .expect("Clock may have gone backwards")
905 .as_millis() as u64,
906 }) {
907 tracing::warn!(error = %e.as_report(), "Failed to pull task");
908
909 continue 'start_stream;
911 } else {
912 pull_task_ack = false;
913 }
914 }
915 }
916
917 let running_count = running_task_parallelism.load(Ordering::SeqCst);
918 let current_state = CompactionLogState {
919 running_parallelism: running_count,
920 pull_task_ack,
921 pending_pull_task_count,
922 };
923
924 if log_throttler.should_log(¤t_state) {
926 tracing::info!(
927 running_parallelism_count = %current_state.running_parallelism,
928 pull_task_ack = %current_state.pull_task_ack,
929 pending_pull_task_count = %current_state.pending_pull_task_count
930 );
931 log_throttler.update(current_state);
932 }
933
934 continue;
935 }
936 event = response_event_stream.next() => {
937 event
938 }
939
940 _ = &mut shutdown_rx => {
941 tracing::info!("Compactor is shutting down");
942 return
943 }
944 };
945
946 fn send_report_task_event(
947 compact_task: &CompactTask,
948 table_stats: TableStatsMap,
949 object_timestamps: HashMap<HummockSstableObjectId, u64>,
950 request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
951 ) {
952 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
953 event: Some(RequestEvent::ReportTask(ReportTask {
954 task_id: compact_task.task_id,
955 task_status: compact_task.task_status.into(),
956 sorted_output_ssts: compact_task
957 .sorted_output_ssts
958 .iter()
959 .map(|sst| sst.into())
960 .collect(),
961 table_stats_change: to_prost_table_stats_map(table_stats),
962 object_timestamps,
963 })),
964 create_at: SystemTime::now()
965 .duration_since(std::time::UNIX_EPOCH)
966 .expect("Clock may have gone backwards")
967 .as_millis() as u64,
968 }) {
969 let task_id = compact_task.task_id;
970 tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
971 }
972 }
973
974 match event {
975 Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
976 let event = match event {
977 Some(event) => event,
978 None => continue 'consume_stream,
979 };
980 let shutdown = shutdown_map.clone();
981 let context = compactor_context.clone();
982 let consumed_latency_ms = SystemTime::now()
983 .duration_since(std::time::UNIX_EPOCH)
984 .expect("Clock may have gone backwards")
985 .as_millis() as u64
986 - create_at;
987 context
988 .compactor_metrics
989 .compaction_event_consumed_latency
990 .observe(consumed_latency_ms as _);
991
992 let object_id_manager = object_id_manager.clone();
993 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
994
995 match event {
996 ResponseEvent::CompactTask(compact_task) => {
997 let compact_task = CompactTask::from(compact_task);
998 let parallelism =
999 calculate_task_parallelism(&compact_task, &context);
1000
1001 assert_ne!(parallelism, 0, "splits cannot be empty");
1002
1003 if (max_task_parallelism
1004 - running_task_parallelism.load(Ordering::SeqCst))
1005 < parallelism as u32
1006 {
1007 tracing::warn!(
1008 "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
1009 compact_task.task_id,
1010 parallelism,
1011 max_task_parallelism,
1012 running_task_parallelism.load(Ordering::Relaxed),
1013 );
1014 let (compact_task, table_stats, object_timestamps) =
1015 compact_done(
1016 compact_task,
1017 context.clone(),
1018 vec![],
1019 TaskStatus::NoAvailCpuResourceCanceled,
1020 );
1021
1022 send_report_task_event(
1023 &compact_task,
1024 table_stats,
1025 object_timestamps,
1026 &request_sender,
1027 );
1028
1029 continue 'consume_stream;
1030 }
1031
1032 running_task_parallelism
1033 .fetch_add(parallelism as u32, Ordering::SeqCst);
1034 executor.spawn(async move {
1035 let (tx, rx) = tokio::sync::oneshot::channel();
1036 let task_id = compact_task.task_id;
1037 shutdown.lock().unwrap().insert(task_id, tx);
1038
1039 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
1040 context.clone(),
1041 compact_task,
1042 rx,
1043 object_id_manager.clone(),
1044 compaction_catalog_manager_ref.clone(),
1045 )
1046 .await;
1047
1048 shutdown.lock().unwrap().remove(&task_id);
1049 running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
1050
1051 send_report_task_event(
1052 &compact_task,
1053 table_stats,
1054 object_timestamps,
1055 &request_sender,
1056 );
1057
1058 let enable_check_compaction_result =
1059 context.storage_opts.check_compaction_result;
1060 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1061
1062 if enable_check_compaction_result && need_check_task {
1063 let read_table_ids = compact_task
1064 .get_table_ids_from_input_ssts()
1065 .collect::<Vec<_>>();
1066 match compaction_catalog_manager_ref.acquire(read_table_ids.into_iter().collect()).await {
1067 Ok(compaction_catalog_agent_ref) => {
1068 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1069 {
1070 Err(e) => {
1071 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
1072 }
1073 Ok(true) => (),
1074 Ok(false) => {
1075 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1076 }
1077 }
1078 },
1079 Err(e) => {
1080 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1081 }
1082 }
1083 }
1084 });
1085 }
1086 #[expect(deprecated)]
1087 ResponseEvent::VacuumTask(_) => {
1088 unreachable!("unexpected vacuum task");
1089 }
1090 #[expect(deprecated)]
1091 ResponseEvent::FullScanTask(_) => {
1092 unreachable!("unexpected scan task");
1093 }
1094 #[expect(deprecated)]
1095 ResponseEvent::ValidationTask(validation_task) => {
1096 let validation_task = ValidationTask::from(validation_task);
1097 executor.spawn(async move {
1098 validate_ssts(validation_task, context.sstable_store.clone())
1099 .await;
1100 });
1101 }
1102 ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
1103 .lock()
1104 .unwrap()
1105 .remove(&cancel_compact_task.task_id)
1106 {
1107 Some(tx) => {
1108 if tx.send(()).is_err() {
1109 tracing::warn!(
1110 "Cancellation of compaction task failed. task_id: {}",
1111 cancel_compact_task.task_id
1112 );
1113 }
1114 }
1115 _ => {
1116 tracing::warn!(
1117 "Attempting to cancel non-existent compaction task. task_id: {}",
1118 cancel_compact_task.task_id
1119 );
1120 }
1121 },
1122
1123 ResponseEvent::PullTaskAck(_pull_task_ack) => {
1124 pull_task_ack = true;
1126 }
1127 }
1128 }
1129 Some(Err(e)) => {
1130 tracing::warn!("Failed to consume stream. {}", e.message());
1131 continue 'start_stream;
1132 }
1133 _ => {
1134 continue 'start_stream;
1136 }
1137 }
1138 }
1139 }
1140 });
1141
1142 (join_handle, shutdown_tx)
1143}
1144
1145#[must_use]
1148pub fn start_shared_compactor(
1149 grpc_proxy_client: GrpcCompactorProxyClient,
1150 mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1151 context: CompactorContext,
1152) -> (JoinHandle<()>, Sender<()>) {
1153 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1154 let task_progress = context.task_progress_manager.clone();
1155 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1156 let periodic_event_update_interval = Duration::from_millis(1000);
1157
1158 let join_handle = tokio::spawn(async move {
1159 let shutdown_map = CompactionShutdownMap::default();
1160
1161 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1162 let executor = context.compaction_executor.clone();
1163 let report_heartbeat_client = grpc_proxy_client.clone();
1164 'consume_stream: loop {
1165 let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1166 _ = periodic_event_interval.tick() => {
1167 let progress_list = get_task_progress(task_progress.clone());
1168 let report_compaction_task_request = ReportCompactionTaskRequest{
1169 event: Some(ReportCompactionTaskEvent::HeartBeat(
1170 SharedHeartBeat {
1171 progress: progress_list
1172 }
1173 )),
1174 };
1175 if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1176 tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1177 }
1178 continue
1179 }
1180
1181
1182 _ = &mut shutdown_rx => {
1183 tracing::info!("Compactor is shutting down");
1184 return
1185 }
1186
1187 request = receiver.recv() => {
1188 request
1189 }
1190
1191 };
1192 match request {
1193 Some(request) => {
1194 let context = context.clone();
1195 let shutdown = shutdown_map.clone();
1196
1197 let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1198 executor.spawn(async move {
1199 let DispatchCompactionTaskRequest {
1200 tables,
1201 output_object_ids,
1202 task: dispatch_task,
1203 } = request.into_inner();
1204 let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1205 acc.insert(table.id, table);
1206 acc
1207 });
1208
1209 let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1210 output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1211 let shared_compactor_object_id_manager =
1212 SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1213 match dispatch_task.unwrap() {
1214 dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1215 let compact_task = CompactTask::from(&compact_task);
1216 let (tx, rx) = tokio::sync::oneshot::channel();
1217 let task_id = compact_task.task_id;
1218 shutdown.lock().unwrap().insert(task_id, tx);
1219
1220 let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1221 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1222 context.clone(),
1223 compact_task,
1224 rx,
1225 shared_compactor_object_id_manager,
1226 compaction_catalog_agent_ref.clone(),
1227 )
1228 .await;
1229 shutdown.lock().unwrap().remove(&task_id);
1230 let report_compaction_task_request = ReportCompactionTaskRequest {
1231 event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1232 compact_task: Some(PbCompactTask::from(&compact_task)),
1233 table_stats_change: to_prost_table_stats_map(table_stats),
1234 object_timestamps,
1235 })),
1236 };
1237
1238 match cloned_grpc_proxy_client
1239 .report_compaction_task(report_compaction_task_request)
1240 .await
1241 {
1242 Ok(_) => {
1243 let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1245 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1246 if enable_check_compaction_result && need_check_task {
1247 match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1248 Err(e) => {
1249 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1250 },
1251 Ok(true) => (),
1252 Ok(false) => {
1253 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1254 }
1255 }
1256 }
1257 }
1258 Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1259 }
1260
1261 }
1262 dispatch_compaction_task_request::Task::VacuumTask(_) => {
1263 unreachable!("unexpected vacuum task");
1264 }
1265 dispatch_compaction_task_request::Task::FullScanTask(_) => {
1266 unreachable!("unexpected scan task");
1267 }
1268 dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1269 let validation_task = ValidationTask::from(validation_task);
1270 validate_ssts(validation_task, context.sstable_store.clone()).await;
1271 }
1272 dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1273 match shutdown
1274 .lock()
1275 .unwrap()
1276 .remove(&cancel_compact_task.task_id)
1277 { Some(tx) => {
1278 if tx.send(()).is_err() {
1279 tracing::warn!(
1280 "Cancellation of compaction task failed. task_id: {}",
1281 cancel_compact_task.task_id
1282 );
1283 }
1284 } _ => {
1285 tracing::warn!(
1286 "Attempting to cancel non-existent compaction task. task_id: {}",
1287 cancel_compact_task.task_id
1288 );
1289 }}
1290 }
1291 }
1292 });
1293 }
1294 None => continue 'consume_stream,
1295 }
1296 }
1297 });
1298 (join_handle, shutdown_tx)
1299}
1300
1301fn get_task_progress(
1302 task_progress: Arc<
1303 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1304 >,
1305) -> Vec<CompactTaskProgress> {
1306 let mut progress_list = Vec::new();
1307 for (&task_id, progress) in &*task_progress.lock() {
1308 progress_list.push(progress.snapshot(task_id));
1309 }
1310 progress_list
1311}
1312
1313fn schedule_queued_tasks(
1315 task_queue: &mut IcebergTaskQueue,
1316 compactor_context: &CompactorContext,
1317 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1318 task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
1319) {
1320 while let Some(popped_task) = task_queue.pop() {
1321 let task_id = popped_task.meta.task_id;
1322 let plan_index = popped_task.meta.plan_index;
1323 let task_key = (task_id, plan_index);
1324
1325 let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());
1327
1328 let Some(runner) = popped_task.runner else {
1329 tracing::error!(
1330 task_id = task_id,
1331 plan_index = plan_index,
1332 "Popped task missing runner - this should not happen"
1333 );
1334 task_queue.finish_running(task_key);
1335 continue;
1336 };
1337
1338 let executor = compactor_context.compaction_executor.clone();
1339 let shutdown_map_clone = shutdown_map.clone();
1340 let completion_tx_clone = task_completion_tx.clone();
1341
1342 tracing::info!(
1343 task_id = task_id,
1344 plan_index = plan_index,
1345 unique_ident = ?unique_ident,
1346 required_parallelism = popped_task.meta.required_parallelism,
1347 "Starting iceberg compaction task from queue"
1348 );
1349
1350 executor.spawn(async move {
1351 let (tx, rx) = tokio::sync::oneshot::channel();
1352 {
1353 let mut shutdown_guard = shutdown_map_clone.lock().unwrap();
1354 shutdown_guard.insert(task_key, tx);
1355 }
1356 let _cleanup_guard = scopeguard::guard(shutdown_map_clone.clone(), move |shutdown_map| {
1357 let mut shutdown_guard = shutdown_map.lock().unwrap();
1358 shutdown_guard.remove(&task_key);
1359 });
1360
1361 let result = Box::pin(runner.compact(rx)).await;
1362
1363 let completion = match result {
1364 Ok(_) => IcebergPlanCompletion {
1365 task_key,
1366 error_message: None,
1367 },
1368 Err(e) => {
1369 tracing::warn!(error = %e.as_report(), task_id = task_key.0, plan_index = task_key.1, "Failed to compact iceberg runner");
1370 IcebergPlanCompletion {
1371 task_key,
1372 error_message: Some(e.to_report_string()),
1373 }
1374 }
1375 };
1376
1377 if completion_tx_clone.send(completion).is_err() {
1378 tracing::warn!(task_id = task_key.0, plan_index = task_key.1, "Failed to notify task completion - main loop may have shut down");
1379 }
1380 });
1381 }
1382}
1383
1384fn handle_meta_task_pulling(
1387 pull_task_ack: &mut bool,
1388 task_queue: &IcebergTaskQueue,
1389 max_task_parallelism: u32,
1390 max_pull_task_count: u32,
1391 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1392 log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1393) -> bool {
1394 let mut pending_pull_task_count = 0;
1395 if *pull_task_ack {
1396 let current_running_parallelism = task_queue.running_parallelism_sum();
1398 pending_pull_task_count =
1399 (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1400
1401 if pending_pull_task_count > 0 {
1402 if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1403 event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1404 subscribe_iceberg_compaction_event_request::PullTask {
1405 pull_task_count: pending_pull_task_count,
1406 },
1407 )),
1408 create_at: SystemTime::now()
1409 .duration_since(std::time::UNIX_EPOCH)
1410 .expect("Clock may have gone backwards")
1411 .as_millis() as u64,
1412 }) {
1413 tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1414 return true; } else {
1416 *pull_task_ack = false;
1417 }
1418 }
1419 }
1420
1421 let running_count = task_queue.running_parallelism_sum();
1422 let waiting_count = task_queue.waiting_parallelism_sum();
1423 let available_count = max_task_parallelism.saturating_sub(running_count);
1424 let current_state = IcebergCompactionLogState {
1425 running_parallelism: running_count,
1426 waiting_parallelism: waiting_count,
1427 available_parallelism: available_count,
1428 pull_task_ack: *pull_task_ack,
1429 pending_pull_task_count,
1430 };
1431
1432 if log_throttler.should_log(¤t_state) {
1434 tracing::info!(
1435 running_parallelism_count = %current_state.running_parallelism,
1436 waiting_parallelism_count = %current_state.waiting_parallelism,
1437 available_parallelism = %current_state.available_parallelism,
1438 pull_task_ack = %current_state.pull_task_ack,
1439 pending_pull_task_count = %current_state.pending_pull_task_count
1440 );
1441 log_throttler.update(current_state);
1442 }
1443
1444 false }
1446
#[cfg(test)]
mod tests {
    use super::*;

    /// Log states are equal only when every field matches.
    #[test]
    fn test_log_state_equality() {
        // Regular compaction states that differ at most in `running_parallelism`.
        let compaction_state = |running_parallelism| CompactionLogState {
            running_parallelism,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let baseline = compaction_state(10);
        let identical = compaction_state(10);
        let more_running = compaction_state(11);
        assert_eq!(baseline, identical);
        assert_ne!(baseline, more_running);

        // Iceberg states that differ only in `waiting_parallelism`.
        let iceberg_state = |waiting_parallelism| IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let ice_baseline = iceberg_state(5);
        let ice_more_waiting = iceberg_state(6);
        assert_ne!(ice_baseline, ice_more_waiting);
    }

    /// With a long heartbeat interval, the throttler asks to log only when the state changes.
    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let compaction_state = |running_parallelism| CompactionLogState {
            running_parallelism,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let first = compaction_state(10);
        let second = compaction_state(11);

        // Nothing recorded yet: the first state must be logged.
        assert!(throttler.should_log(&first));
        throttler.update(first.clone());

        // Same state again: suppressed.
        assert!(!throttler.should_log(&first));

        // A different state must be logged.
        assert!(throttler.should_log(&second));
        throttler.update(second.clone());

        // And is suppressed once it has been recorded.
        assert!(!throttler.should_log(&second));
    }

    /// An unchanged state is logged again once the heartbeat interval has elapsed.
    #[test]
    fn test_log_throttler_heartbeat() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_millis(10));
        let steady = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttler.should_log(&steady));
        throttler.update(steady.clone());

        // Immediately afterwards the unchanged state is suppressed.
        assert!(!throttler.should_log(&steady));

        // Wait past the 10 ms interval.
        std::thread::sleep(Duration::from_millis(15));

        assert!(throttler.should_log(&steady));
    }
}