1mod compaction_executor;
16mod compaction_filter;
17pub mod compaction_utils;
18mod iceberg_compaction;
19use risingwave_hummock_sdk::compact_task::{CompactTask, ValidationTask};
20use risingwave_pb::compactor::{DispatchCompactionTaskRequest, dispatch_compaction_task_request};
21use risingwave_pb::hummock::PbCompactTask;
22use risingwave_pb::hummock::report_compaction_task_request::{
23 Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat,
24 ReportTask as ReportSharedTask,
25};
26use risingwave_pb::iceberg_compaction::{
27 SubscribeIcebergCompactionEventRequest, SubscribeIcebergCompactionEventResponse,
28 subscribe_iceberg_compaction_event_request,
29};
30use risingwave_rpc_client::GrpcCompactorProxyClient;
31use thiserror_ext::AsReport;
32use tokio::sync::mpsc;
33use tonic::Request;
34
35pub mod compactor_runner;
36mod context;
37pub mod fast_compactor_runner;
38mod iterator;
39mod shared_buffer_compact;
40pub(super) mod task_progress;
41
42use std::collections::hash_map::Entry;
43use std::collections::{HashMap, VecDeque};
44use std::marker::PhantomData;
45use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47use std::time::{Duration, SystemTime};
48
49use await_tree::{InstrumentAwait, SpanExt};
50pub use compaction_executor::CompactionExecutor;
51pub use compaction_filter::{
52 CompactionFilter, DummyCompactionFilter, MultiCompactionFilter, TtlCompactionFilter,
53};
54pub use context::{
55 CompactionAwaitTreeRegRef, CompactorContext, await_tree_key, new_compaction_await_tree_reg_ref,
56};
57use futures::{StreamExt, pin_mut};
58use iceberg_compaction::iceberg_compactor_runner::IcebergCompactorRunnerConfigBuilder;
60use iceberg_compaction::{
61 IcebergPlanCompletion, IcebergTaskQueue, IcebergTaskReport, IcebergTaskTracker, PushResult,
62 ReportSendResult, build_iceberg_task_report, create_task_execution,
63 flush_pending_iceberg_task_reports, send_or_buffer_iceberg_task_report,
64};
65pub use iterator::{ConcatSstableIterator, SstableStreamIterator};
66use more_asserts::assert_ge;
67use risingwave_hummock_sdk::table_stats::{TableStatsMap, to_prost_table_stats_map};
68use risingwave_hummock_sdk::{
69 HummockCompactionTaskId, HummockSstableObjectId, LocalSstableInfo, compact_task_to_string,
70};
71use risingwave_pb::hummock::compact_task::TaskStatus;
72use risingwave_pb::hummock::subscribe_compaction_event_request::{
73 Event as RequestEvent, HeartBeat, PullTask, ReportTask,
74};
75use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
76use risingwave_pb::hummock::{
77 CompactTaskProgress, PbSstableFilterType, ReportCompactionTaskRequest,
78 SubscribeCompactionEventRequest, SubscribeCompactionEventResponse,
79};
80use risingwave_rpc_client::HummockMetaClient;
81pub use shared_buffer_compact::compact;
82use tokio::sync::oneshot::Sender;
83use tokio::task::JoinHandle;
84use tokio::time::Instant;
85
86pub use self::compaction_utils::{
87 CompactionStatistics, RemoteBuilderFactory, TaskConfig, check_compaction_result,
88 check_flush_result,
89};
90pub use self::task_progress::TaskProgress;
91use super::multi_builder::CapacitySplitTableBuilder;
92use super::{
93 GetObjectId, HummockErrorInner, HummockResult, ObjectIdManager, SstableBuilderOptions,
94 Xor8FilterBuilder, Xor16FilterBuilder,
95};
96use crate::compaction_catalog_manager::{
97 CompactionCatalogAgentRef, CompactionCatalogManager, CompactionCatalogManagerRef,
98};
99use crate::hummock::compactor::compaction_utils::calculate_task_parallelism;
100use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done};
101use crate::hummock::compactor::iceberg_compaction::TaskKey;
102use crate::hummock::iterator::{Forward, HummockIterator};
103use crate::hummock::{
104 BlockedXor8FilterBuilder, BlockedXor16FilterBuilder, FilterBuilder,
105 SharedComapctorObjectIdManager, SstableWriterFactory, UnifiedSstableWriterFactory,
106 validate_ssts,
107};
108use crate::monitor::CompactorMetrics;
109
/// Heartbeat interval handed to `LogThrottler`: an unchanged compactor status is re-logged once
/// at least this much time has passed since the previous status log.
const COMPACTION_HEARTBEAT_LOG_INTERVAL: Duration = Duration::from_secs(60);
112
/// Snapshot of the Hummock compactor's task-pulling state.
///
/// `start_compactor` builds one on every periodic tick and hands it to a `LogThrottler`, which
/// logs it only when it differs from the previously logged snapshot (or the heartbeat is due).
#[derive(Clone, Debug, Eq, PartialEq)]
struct CompactionLogState {
    /// Value of the shared running-parallelism counter (sum of the parallelism of running tasks).
    running_parallelism: u32,
    /// `true` while no pull-task request is awaiting acknowledgement from meta.
    pull_task_ack: bool,
    /// Number of tasks requested in this tick's pull request; 0 when no pull was sent.
    pending_pull_task_count: u32,
}
120
/// Snapshot of the Iceberg compactor's scheduling state, used as the `LogThrottler` state type in
/// `start_iceberg_compactor` so unchanged status is not logged on every tick.
///
/// NOTE(review): the snapshot is built outside this view (presumably in
/// `handle_meta_task_pulling`); the field descriptions below are inferred from names — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
struct IcebergCompactionLogState {
    /// Parallelism of plans currently running (presumably from the task queue).
    running_parallelism: u32,
    /// Parallelism of plans waiting in the queue (presumably).
    waiting_parallelism: u32,
    /// Parallelism still free for new plans (presumably).
    available_parallelism: u32,
    /// Whether the last pull-task request has been acknowledged by meta.
    pull_task_ack: bool,
    /// Number of tasks requested by the latest pull (presumably 0 when none was sent).
    pending_pull_task_count: u32,
}
130
/// Suppresses repetitive status logs.
///
/// A state is worth logging when it differs from the last state passed to [`LogThrottler::update`],
/// or when at least `heartbeat_interval` has elapsed since the last update (so an unchanged state
/// is still re-logged periodically). Before the first update every state is worth logging.
struct LogThrottler<T: PartialEq> {
    /// State recorded by the most recent `update`; `None` until the first one.
    last_logged_state: Option<T>,
    /// Time of the most recent `update` (or of construction).
    last_heartbeat: Instant,
    /// Longest time an unchanged state may go without being logged.
    heartbeat_interval: Duration,
}

impl<T: PartialEq> LogThrottler<T> {
    /// Creates a throttler that has not logged anything yet.
    fn new(heartbeat_interval: Duration) -> Self {
        let last_heartbeat = Instant::now();
        Self {
            last_logged_state: None,
            last_heartbeat,
            heartbeat_interval,
        }
    }

    /// Returns `true` if `current_state` changed since the last update or the heartbeat is due.
    fn should_log(&self, current_state: &T) -> bool {
        let state_changed = match &self.last_logged_state {
            Some(previous) => previous != current_state,
            None => true,
        };
        if state_changed {
            return true;
        }
        self.last_heartbeat.elapsed() >= self.heartbeat_interval
    }

    /// Records that `current_state` has just been logged, restarting the heartbeat clock.
    fn update(&mut self, current_state: T) {
        let now = Instant::now();
        self.last_logged_state = Some(current_state);
        self.last_heartbeat = now;
    }
}
159
/// Compacts key ranges into new SSTs (entry point: `compact_key_range`).
pub struct Compactor {
    /// Shared compactor context; this file reads its compactor metrics, sstable store, memory
    /// limiter, storage options and `is_share_buffer_compact` flag.
    context: CompactorContext,
    /// Source of new SST object ids, passed to the SST builder factory.
    object_id_getter: Arc<dyn GetObjectId>,
    /// Task configuration: filter kind, block-based-filter flag, cache policy, vnode partitioning.
    task_config: TaskConfig,
    /// Options used when building SSTs.
    options: SstableBuilderOptions,
    /// Accumulated object-id fetch cost, shared with the builder factory as `remote_rpc_cost`.
    /// It is divided by 1e6 before being observed into `get_table_id_total_time_duration`
    /// (NOTE(review): unit presumably microseconds -> seconds — confirm).
    get_id_time: Arc<AtomicU64>,
}
169
/// Result of compacting one split: `(index, built SSTs, compaction statistics)`.
/// NOTE(review): the `usize` is presumably the split index — confirm against callers.
pub type CompactOutput = (usize, Vec<LocalSstableInfo>, CompactionStatistics);
171
172impl Compactor {
173 pub fn new(
175 context: CompactorContext,
176 options: SstableBuilderOptions,
177 task_config: TaskConfig,
178 object_id_getter: Arc<dyn GetObjectId>,
179 ) -> Self {
180 Self {
181 context,
182 options,
183 task_config,
184 get_id_time: Arc::new(AtomicU64::new(0)),
185 object_id_getter,
186 }
187 }
188
189 async fn compact_key_range(
194 &self,
195 iter: impl HummockIterator<Direction = Forward>,
196 compaction_filter: impl CompactionFilter,
197 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
198 task_progress: Option<Arc<TaskProgress>>,
199 task_id: Option<HummockCompactionTaskId>,
200 split_index: Option<usize>,
201 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
202 let compact_timer = if self.context.is_share_buffer_compact {
204 self.context
205 .compactor_metrics
206 .write_build_l0_sst_duration
207 .start_timer()
208 } else {
209 self.context
210 .compactor_metrics
211 .compact_sst_duration
212 .start_timer()
213 };
214
215 let (split_table_outputs, table_stats_map) = {
216 let factory = UnifiedSstableWriterFactory::new(self.context.sstable_store.clone());
217 match (
218 self.task_config.sstable_filter_kind,
219 self.task_config.use_block_based_filter,
220 ) {
221 (PbSstableFilterType::SstableFilterXor8, true) => {
222 self.compact_key_range_impl::<_, BlockedXor8FilterBuilder>(
223 factory,
224 iter,
225 compaction_filter,
226 compaction_catalog_agent_ref,
227 task_progress.clone(),
228 self.object_id_getter.clone(),
229 )
230 .instrument_await("compact".verbose())
231 .await?
232 }
233 (PbSstableFilterType::SstableFilterXor8, false) => {
234 self.compact_key_range_impl::<_, Xor8FilterBuilder>(
235 factory,
236 iter,
237 compaction_filter,
238 compaction_catalog_agent_ref,
239 task_progress.clone(),
240 self.object_id_getter.clone(),
241 )
242 .instrument_await("compact".verbose())
243 .await?
244 }
245 (PbSstableFilterType::SstableFilterXor16, true) => {
246 self.compact_key_range_impl::<_, BlockedXor16FilterBuilder>(
247 factory,
248 iter,
249 compaction_filter,
250 compaction_catalog_agent_ref,
251 task_progress.clone(),
252 self.object_id_getter.clone(),
253 )
254 .instrument_await("compact".verbose())
255 .await?
256 }
257 (PbSstableFilterType::SstableFilterXor16, false) => {
258 self.compact_key_range_impl::<_, Xor16FilterBuilder>(
259 factory,
260 iter,
261 compaction_filter,
262 compaction_catalog_agent_ref,
263 task_progress.clone(),
264 self.object_id_getter.clone(),
265 )
266 .instrument_await("compact".verbose())
267 .await?
268 }
269 (kind, _) => unreachable!("unsupported sstable filter kind in compactor: {kind:?}"),
270 }
271 };
272
273 compact_timer.observe_duration();
274
275 Self::report_progress(
276 self.context.compactor_metrics.clone(),
277 task_progress,
278 &split_table_outputs,
279 self.context.is_share_buffer_compact,
280 );
281
282 self.context
283 .compactor_metrics
284 .get_table_id_total_time_duration
285 .observe(self.get_id_time.load(Ordering::Relaxed) as f64 / 1000.0 / 1000.0);
286
287 debug_assert!(
288 split_table_outputs
289 .iter()
290 .all(|table_info| table_info.sst_info.table_ids.is_sorted())
291 );
292
293 if task_id.is_some() {
294 tracing::info!(
296 "Finish Task {:?} split_index {:?} sst count {}",
297 task_id,
298 split_index,
299 split_table_outputs.len()
300 );
301 }
302 Ok((split_table_outputs, table_stats_map))
303 }
304
305 pub fn report_progress(
306 metrics: Arc<CompactorMetrics>,
307 task_progress: Option<Arc<TaskProgress>>,
308 ssts: &Vec<LocalSstableInfo>,
309 is_share_buffer_compact: bool,
310 ) {
311 for sst_info in ssts {
312 let sst_size = sst_info.file_size();
313 if let Some(tracker) = &task_progress {
314 tracker.inc_ssts_uploaded();
315 tracker.dec_num_pending_write_io();
316 }
317 if is_share_buffer_compact {
318 metrics.shared_buffer_to_sstable_size.observe(sst_size as _);
319 } else {
320 metrics.compaction_upload_sst_counts.inc();
321 }
322 }
323 }
324
325 async fn compact_key_range_impl<F: SstableWriterFactory, B: FilterBuilder>(
326 &self,
327 writer_factory: F,
328 iter: impl HummockIterator<Direction = Forward>,
329 compaction_filter: impl CompactionFilter,
330 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
331 task_progress: Option<Arc<TaskProgress>>,
332 object_id_getter: Arc<dyn GetObjectId>,
333 ) -> HummockResult<(Vec<LocalSstableInfo>, CompactionStatistics)> {
334 let builder_factory = RemoteBuilderFactory::<F, B> {
335 object_id_getter,
336 limiter: self.context.memory_limiter.clone(),
337 options: self.options.clone(),
338 policy: self.task_config.cache_policy,
339 remote_rpc_cost: self.get_id_time.clone(),
340 compaction_catalog_agent_ref: compaction_catalog_agent_ref.clone(),
341 sstable_writer_factory: writer_factory,
342 _phantom: PhantomData,
343 };
344
345 let mut sst_builder = CapacitySplitTableBuilder::new(
346 builder_factory,
347 self.context.compactor_metrics.clone(),
348 task_progress.clone(),
349 self.task_config.table_vnode_partition.clone(),
350 self.context
351 .storage_opts
352 .compactor_concurrent_uploading_sst_count,
353 compaction_catalog_agent_ref,
354 );
355 let compaction_statistics = compact_and_build_sst(
356 &mut sst_builder,
357 &self.task_config,
358 self.context.compactor_metrics.clone(),
359 iter,
360 compaction_filter,
361 )
362 .instrument_await("compact_and_build_sst".verbose())
363 .await?;
364
365 let ssts = sst_builder
366 .finish()
367 .instrument_await("builder_finish".verbose())
368 .await?;
369
370 Ok((ssts, compaction_statistics))
371 }
372}
373
374#[must_use]
377pub fn start_iceberg_compactor(
378 compactor_context: CompactorContext,
379 hummock_meta_client: Arc<dyn HummockMetaClient>,
380) -> (JoinHandle<()>, Sender<()>) {
381 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
382 let stream_retry_interval = Duration::from_secs(30);
383 let periodic_event_update_interval = Duration::from_millis(
384 compactor_context
385 .storage_opts
386 .iceberg_compaction_pull_interval_ms,
387 );
388 let worker_num = compactor_context.compaction_executor.worker_num();
389
390 let max_task_parallelism: u32 = (worker_num as f32
391 * compactor_context.storage_opts.compactor_max_task_multiplier)
392 .ceil() as u32;
393
394 const MAX_PULL_TASK_COUNT: u32 = 4;
395 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
396
397 assert_ge!(
398 compactor_context.storage_opts.compactor_max_task_multiplier,
399 0.0
400 );
401
402 let join_handle = tokio::spawn(async move {
403 let pending_parallelism_budget = (max_task_parallelism as f32
405 * compactor_context
406 .storage_opts
407 .iceberg_compaction_pending_parallelism_budget_multiplier)
408 .ceil() as u32;
409 let mut task_queue =
410 IcebergTaskQueue::new(max_task_parallelism, pending_parallelism_budget);
411
412 let shutdown_map = Arc::new(Mutex::new(HashMap::<TaskKey, Sender<()>>::new()));
414
415 let (task_completion_tx, mut task_completion_rx) =
417 tokio::sync::mpsc::unbounded_channel::<IcebergPlanCompletion>();
418 let mut task_trackers = HashMap::<u64, IcebergTaskTracker>::new();
419 let mut pending_task_reports = VecDeque::<IcebergTaskReport>::new();
422
423 let mut min_interval = tokio::time::interval(stream_retry_interval);
424 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
425
426 let mut log_throttler =
428 LogThrottler::<IcebergCompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
429
430 'start_stream: loop {
432 let mut pull_task_ack = true;
435 tokio::select! {
436 _ = min_interval.tick() => {},
438 _ = &mut shutdown_rx => {
440 tracing::info!("Compactor is shutting down");
441 return;
442 }
443 }
444
445 let (request_sender, response_event_stream) = match hummock_meta_client
446 .subscribe_iceberg_compaction_event()
447 .await
448 {
449 Ok((request_sender, response_event_stream)) => {
450 tracing::debug!("Succeeded subscribe_iceberg_compaction_event.");
451 (request_sender, response_event_stream)
452 }
453
454 Err(e) => {
455 tracing::warn!(
456 error = %e.as_report(),
457 "Subscribing to iceberg compaction tasks failed with error. Will retry.",
458 );
459 continue 'start_stream;
460 }
461 };
462
463 if matches!(
464 flush_pending_iceberg_task_reports(&request_sender, &mut pending_task_reports),
465 ReportSendResult::RestartStream
466 ) {
467 continue 'start_stream;
468 }
469
470 pin_mut!(response_event_stream);
471
472 let _executor = compactor_context.compaction_executor.clone();
473
474 let mut event_loop_iteration_now = Instant::now();
476 'consume_stream: loop {
477 {
478 compactor_context
480 .compactor_metrics
481 .compaction_event_loop_iteration_latency
482 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
483 event_loop_iteration_now = Instant::now();
484 }
485
486 let request_sender = request_sender.clone();
487 let event: Option<Result<SubscribeIcebergCompactionEventResponse, _>> = tokio::select! {
488 Some(plan_completion) = task_completion_rx.recv() => {
490 let task_key = plan_completion.task_key;
491 let error_message = plan_completion.error_message;
492 tracing::debug!(
493 task_id = task_key.0,
494 plan_index = task_key.1,
495 success = error_message.is_none(),
496 "Plan completed, updating queue state"
497 );
498 task_queue.finish_running(task_key);
499
500 let completed_task_id = task_key.0;
501 let Entry::Occupied(mut tracker_entry) =
502 task_trackers.entry(completed_task_id)
503 else {
504 continue 'consume_stream;
505 };
506 tracker_entry.get_mut().record_completion(error_message);
507 if !tracker_entry.get().is_finished() {
508 continue 'consume_stream;
509 }
510
511 let report = tracker_entry.remove().into_report(completed_task_id);
512 if matches!(
513 send_or_buffer_iceberg_task_report(
514 &request_sender,
515 &mut pending_task_reports,
516 report,
517 ),
518 ReportSendResult::RestartStream
519 ) {
520 continue 'start_stream;
521 }
522 continue 'consume_stream;
523 }
524
525 _ = task_queue.wait_schedulable() => {
527 schedule_queued_tasks(
528 &mut task_queue,
529 &compactor_context,
530 &shutdown_map,
531 &task_completion_tx,
532 );
533 continue 'consume_stream;
534 }
535
536 _ = periodic_event_interval.tick() => {
537 let should_restart_stream = handle_meta_task_pulling(
539 &mut pull_task_ack,
540 &task_queue,
541 max_task_parallelism,
542 max_pull_task_count,
543 &request_sender,
544 &mut log_throttler,
545 );
546
547 if should_restart_stream {
548 continue 'start_stream;
549 }
550 continue;
551 }
552 event = response_event_stream.next() => {
553 event
554 }
555
556 _ = &mut shutdown_rx => {
557 tracing::info!("Iceberg Compactor is shutting down");
558 return
559 }
560 };
561
562 match event {
563 Some(Ok(SubscribeIcebergCompactionEventResponse {
564 event,
565 create_at: _create_at,
566 })) => {
567 let event = match event {
568 Some(event) => event,
569 None => continue 'consume_stream,
570 };
571
572 match event {
573 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CompactTask(iceberg_compaction_task) => {
574 let task_id = iceberg_compaction_task.task_id;
575 let sink_id = iceberg_compaction_task.sink_id;
576 let compactor_runner_config = match IcebergCompactorRunnerConfigBuilder::default()
578 .max_parallelism((worker_num as f32 * compactor_context.storage_opts.iceberg_compaction_task_parallelism_ratio) as u32)
579 .min_size_per_partition(compactor_context.storage_opts.iceberg_compaction_min_size_per_partition_mb as u64 * 1024 * 1024)
580 .max_file_count_per_partition(compactor_context.storage_opts.iceberg_compaction_max_file_count_per_partition)
581 .enable_validate_compaction(compactor_context.storage_opts.iceberg_compaction_enable_validate)
582 .max_record_batch_rows(compactor_context.storage_opts.iceberg_compaction_max_record_batch_rows)
583 .enable_heuristic_output_parallelism(compactor_context.storage_opts.iceberg_compaction_enable_heuristic_output_parallelism)
584 .max_concurrent_closes(compactor_context.storage_opts.iceberg_compaction_max_concurrent_closes)
585 .target_binpack_group_size_mb(
586 compactor_context.storage_opts.iceberg_compaction_target_binpack_group_size_mb
587 )
588 .min_group_size_mb(
589 compactor_context.storage_opts.iceberg_compaction_min_group_size_mb
590 )
591 .min_group_file_count(
592 compactor_context.storage_opts.iceberg_compaction_min_group_file_count
593 )
594 .build() {
595 Ok(config) => config,
596 Err(e) => {
597 tracing::warn!(error = %e.as_report(), "Failed to build iceberg compactor runner config {}", task_id);
598 let report = build_iceberg_task_report(
599 task_id,
600 sink_id,
601 Some(format!(
602 "Failed to build iceberg compactor runner config: {}",
603 e.as_report()
604 )),
605 );
606 if matches!(
607 send_or_buffer_iceberg_task_report(
608 &request_sender,
609 &mut pending_task_reports,
610 report,
611 ),
612 ReportSendResult::RestartStream
613 ) {
614 continue 'start_stream;
615 }
616 continue 'consume_stream;
617 }
618 };
619
620 let task_execution = match create_task_execution(
622 iceberg_compaction_task,
623 compactor_runner_config,
624 compactor_context.compactor_metrics.clone(),
625 ).await {
626 Ok(task_execution) => task_execution,
627 Err(e) => {
628 tracing::warn!(error = %e.as_report(), task_id, "Failed to create plan runners");
629 let report = build_iceberg_task_report(
630 task_id,
631 sink_id,
632 Some(format!(
633 "Failed to create iceberg compaction task execution: {}",
634 e.as_report()
635 )),
636 );
637 if matches!(
638 send_or_buffer_iceberg_task_report(
639 &request_sender,
640 &mut pending_task_reports,
641 report,
642 ),
643 ReportSendResult::RestartStream
644 ) {
645 continue 'start_stream;
646 }
647 continue 'consume_stream;
648 }
649 };
650
651 let sink_id = task_execution.sink_id;
652 let plan_runners = task_execution.plan_runners;
653
654 if plan_runners.is_empty() {
655 tracing::info!(task_id, sink_id, "No plans to execute");
656 let report = build_iceberg_task_report(task_id, sink_id, None);
657 if matches!(
658 send_or_buffer_iceberg_task_report(
659 &request_sender,
660 &mut pending_task_reports,
661 report,
662 ),
663 ReportSendResult::RestartStream
664 ) {
665 continue 'start_stream;
666 }
667 continue 'consume_stream;
668 }
669
670 let total_plans = plan_runners.len();
672 let mut enqueued_count = 0;
673
674 for runner in plan_runners {
675 let meta = runner.to_meta();
676 let plan_index = meta.plan_index;
677 let required_parallelism = runner.required_parallelism();
678 let push_result = task_queue.push(meta, Some(runner));
679
680 match push_result {
681 PushResult::Added => {
682 enqueued_count += 1;
683 tracing::debug!(
684 task_id = task_id,
685 plan_index = plan_index,
686 required_parallelism = required_parallelism,
687 "Iceberg plan runner added to queue"
688 );
689 },
690 PushResult::RejectedCapacity => {
691 tracing::warn!(
692 task_id = task_id,
693 required_parallelism = required_parallelism,
694 pending_budget = pending_parallelism_budget,
695 enqueued_count = enqueued_count,
696 total_plans = total_plans,
697 "Iceberg plan runner rejected - queue capacity exceeded"
698 );
699 break;
701 },
702 PushResult::RejectedTooLarge => {
703 tracing::error!(
704 task_id = task_id,
705 required_parallelism = required_parallelism,
706 max_parallelism = max_task_parallelism,
707 "Iceberg plan runner rejected - parallelism exceeds max"
708 );
709 },
710 PushResult::RejectedInvalidParallelism => {
711 tracing::error!(
712 task_id = task_id,
713 required_parallelism = required_parallelism,
714 "Iceberg plan runner rejected - invalid parallelism"
715 );
716 },
717 PushResult::RejectedDuplicate => {
718 tracing::error!(
719 task_id = task_id,
720 plan_index = plan_index,
721 "Iceberg plan runner rejected - duplicate (task_id, plan_index)"
722 );
723 }
724 }
725 }
726
727 if enqueued_count == 0 {
728 let report = build_iceberg_task_report(
729 task_id,
730 sink_id,
731 Some("Failed to enqueue all iceberg compaction plans".to_owned()),
732 );
733 if matches!(
734 send_or_buffer_iceberg_task_report(
735 &request_sender,
736 &mut pending_task_reports,
737 report,
738 ),
739 ReportSendResult::RestartStream
740 ) {
741 continue 'start_stream;
742 }
743 } else {
744 task_trackers.insert(
745 task_id,
746 IcebergTaskTracker::new(sink_id, enqueued_count),
747 );
748 }
749
750 tracing::info!(
751 task_id = task_id,
752 sink_id = sink_id,
753 total_plans = total_plans,
754 enqueued_count = enqueued_count,
755 "Enqueued {} of {} Iceberg plan runners",
756 enqueued_count,
757 total_plans
758 );
759 },
760 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::PullTaskAck(_) => {
761 pull_task_ack = true;
763 },
764 risingwave_pb::iceberg_compaction::subscribe_iceberg_compaction_event_response::Event::CancelCompactTask(cancel_compact_task) => {
765 cancel_iceberg_task(
766 cancel_compact_task.task_id,
767 &mut task_queue,
768 &shutdown_map,
769 );
770 },
771 }
772 }
773 Some(Err(e)) => {
774 tracing::warn!("Failed to consume stream. {}", e.message());
775 continue 'start_stream;
776 }
777 _ => {
778 continue 'start_stream;
780 }
781 }
782 }
783 }
784 });
785
786 (join_handle, shutdown_tx)
787}
788
789#[must_use]
792pub fn start_compactor(
793 compactor_context: CompactorContext,
794 hummock_meta_client: Arc<dyn HummockMetaClient>,
795 object_id_manager: Arc<ObjectIdManager>,
796 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
797) -> (JoinHandle<()>, Sender<()>) {
798 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
799 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
800 let stream_retry_interval = Duration::from_secs(30);
801 let task_progress = compactor_context.task_progress_manager.clone();
802 let periodic_event_update_interval = Duration::from_millis(1000);
803
804 let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32
805 * compactor_context.storage_opts.compactor_max_task_multiplier)
806 .ceil() as u32;
807 let running_task_parallelism = Arc::new(AtomicU32::new(0));
808
809 const MAX_PULL_TASK_COUNT: u32 = 4;
810 let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT);
811
812 assert_ge!(
813 compactor_context.storage_opts.compactor_max_task_multiplier,
814 0.0
815 );
816
817 let join_handle = tokio::spawn(async move {
818 let shutdown_map = CompactionShutdownMap::default();
819 let mut min_interval = tokio::time::interval(stream_retry_interval);
820 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
821
822 let mut log_throttler =
824 LogThrottler::<CompactionLogState>::new(COMPACTION_HEARTBEAT_LOG_INTERVAL);
825
826 'start_stream: loop {
828 let mut pull_task_ack = true;
831 tokio::select! {
832 _ = min_interval.tick() => {},
834 _ = &mut shutdown_rx => {
836 tracing::info!("Compactor is shutting down");
837 return;
838 }
839 }
840
841 let (request_sender, response_event_stream) =
842 match hummock_meta_client.subscribe_compaction_event().await {
843 Ok((request_sender, response_event_stream)) => {
844 tracing::debug!("Succeeded subscribe_compaction_event.");
845 (request_sender, response_event_stream)
846 }
847
848 Err(e) => {
849 tracing::warn!(
850 error = %e.as_report(),
851 "Subscribing to compaction tasks failed with error. Will retry.",
852 );
853 continue 'start_stream;
854 }
855 };
856
857 pin_mut!(response_event_stream);
858
859 let executor = compactor_context.compaction_executor.clone();
860 let object_id_manager = object_id_manager.clone();
861
862 let mut event_loop_iteration_now = Instant::now();
864 'consume_stream: loop {
865 {
866 compactor_context
868 .compactor_metrics
869 .compaction_event_loop_iteration_latency
870 .observe(event_loop_iteration_now.elapsed().as_millis() as _);
871 event_loop_iteration_now = Instant::now();
872 }
873
874 let running_task_parallelism = running_task_parallelism.clone();
875 let request_sender = request_sender.clone();
876 let event: Option<Result<SubscribeCompactionEventResponse, _>> = tokio::select! {
877 _ = periodic_event_interval.tick() => {
878 let progress_list = get_task_progress(task_progress.clone());
879
880 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
881 event: Some(RequestEvent::HeartBeat(
882 HeartBeat {
883 progress: progress_list
884 }
885 )),
886 create_at: SystemTime::now()
887 .duration_since(std::time::UNIX_EPOCH)
888 .expect("Clock may have gone backwards")
889 .as_millis() as u64,
890 }) {
891 tracing::warn!(error = %e.as_report(), "Failed to report task progress");
892 continue 'start_stream;
894 }
895
896
897 let mut pending_pull_task_count = 0;
898 if pull_task_ack {
899 pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).min(max_pull_task_count);
901
902 if pending_pull_task_count > 0 {
903 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
904 event: Some(RequestEvent::PullTask(
905 PullTask {
906 pull_task_count: pending_pull_task_count,
907 }
908 )),
909 create_at: SystemTime::now()
910 .duration_since(std::time::UNIX_EPOCH)
911 .expect("Clock may have gone backwards")
912 .as_millis() as u64,
913 }) {
914 tracing::warn!(error = %e.as_report(), "Failed to pull task");
915
916 continue 'start_stream;
918 } else {
919 pull_task_ack = false;
920 }
921 }
922 }
923
924 let running_count = running_task_parallelism.load(Ordering::SeqCst);
925 let current_state = CompactionLogState {
926 running_parallelism: running_count,
927 pull_task_ack,
928 pending_pull_task_count,
929 };
930
931 if log_throttler.should_log(¤t_state) {
933 tracing::info!(
934 running_parallelism_count = %current_state.running_parallelism,
935 pull_task_ack = %current_state.pull_task_ack,
936 pending_pull_task_count = %current_state.pending_pull_task_count
937 );
938 log_throttler.update(current_state);
939 }
940
941 continue;
942 }
943 event = response_event_stream.next() => {
944 event
945 }
946
947 _ = &mut shutdown_rx => {
948 tracing::info!("Compactor is shutting down");
949 return
950 }
951 };
952
953 fn send_report_task_event(
954 compact_task: &CompactTask,
955 table_stats: TableStatsMap,
956 object_timestamps: HashMap<HummockSstableObjectId, u64>,
957 request_sender: &mpsc::UnboundedSender<SubscribeCompactionEventRequest>,
958 ) {
959 if let Err(e) = request_sender.send(SubscribeCompactionEventRequest {
960 event: Some(RequestEvent::ReportTask(ReportTask {
961 task_id: compact_task.task_id,
962 task_status: compact_task.task_status.into(),
963 sorted_output_ssts: compact_task
964 .sorted_output_ssts
965 .iter()
966 .map(|sst| sst.into())
967 .collect(),
968 table_stats_change: to_prost_table_stats_map(table_stats),
969 object_timestamps,
970 })),
971 create_at: SystemTime::now()
972 .duration_since(std::time::UNIX_EPOCH)
973 .expect("Clock may have gone backwards")
974 .as_millis() as u64,
975 }) {
976 let task_id = compact_task.task_id;
977 tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}");
978 }
979 }
980
981 match event {
982 Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => {
983 let event = match event {
984 Some(event) => event,
985 None => continue 'consume_stream,
986 };
987 let shutdown = shutdown_map.clone();
988 let context = compactor_context.clone();
989 let consumed_latency_ms = SystemTime::now()
990 .duration_since(std::time::UNIX_EPOCH)
991 .expect("Clock may have gone backwards")
992 .as_millis() as u64
993 - create_at;
994 context
995 .compactor_metrics
996 .compaction_event_consumed_latency
997 .observe(consumed_latency_ms as _);
998
999 let object_id_manager = object_id_manager.clone();
1000 let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone();
1001
1002 match event {
1003 ResponseEvent::CompactTask(compact_task) => {
1004 let compact_task = CompactTask::from(compact_task);
1005 let parallelism =
1006 calculate_task_parallelism(&compact_task, &context);
1007
1008 assert_ne!(parallelism, 0, "splits cannot be empty");
1009
1010 if (max_task_parallelism
1011 - running_task_parallelism.load(Ordering::SeqCst))
1012 < parallelism as u32
1013 {
1014 tracing::warn!(
1015 "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}",
1016 compact_task.task_id,
1017 parallelism,
1018 max_task_parallelism,
1019 running_task_parallelism.load(Ordering::Relaxed),
1020 );
1021 let (compact_task, table_stats, object_timestamps) =
1022 compact_done(
1023 compact_task,
1024 context.clone(),
1025 vec![],
1026 TaskStatus::NoAvailCpuResourceCanceled,
1027 );
1028
1029 send_report_task_event(
1030 &compact_task,
1031 table_stats,
1032 object_timestamps,
1033 &request_sender,
1034 );
1035
1036 continue 'consume_stream;
1037 }
1038
1039 running_task_parallelism
1040 .fetch_add(parallelism as u32, Ordering::SeqCst);
1041 executor.spawn(async move {
1042 let (tx, rx) = tokio::sync::oneshot::channel();
1043 let task_id = compact_task.task_id;
1044 shutdown.lock().unwrap().insert(task_id, tx);
1045
1046 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact(
1047 context.clone(),
1048 compact_task,
1049 rx,
1050 object_id_manager.clone(),
1051 compaction_catalog_manager_ref.clone(),
1052 )
1053 .await;
1054
1055 shutdown.lock().unwrap().remove(&task_id);
1056 running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst);
1057
1058 send_report_task_event(
1059 &compact_task,
1060 table_stats,
1061 object_timestamps,
1062 &request_sender,
1063 );
1064
1065 let enable_check_compaction_result =
1066 context.storage_opts.check_compaction_result;
1067 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1068
1069 if enable_check_compaction_result && need_check_task {
1070 let read_table_ids = compact_task
1071 .get_table_ids_from_input_ssts()
1072 .collect::<Vec<_>>();
1073 match compaction_catalog_manager_ref.acquire(read_table_ids.into_iter().collect()).await {
1074 Ok(compaction_catalog_agent_ref) => {
1075 match check_compaction_result(&compact_task, context.clone(), compaction_catalog_agent_ref).await
1076 {
1077 Err(e) => {
1078 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id);
1079 }
1080 Ok(true) => (),
1081 Ok(false) => {
1082 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1083 }
1084 }
1085 },
1086 Err(e) => {
1087 tracing::warn!(error = %e.as_report(), "failed to acquire compaction catalog agent");
1088 }
1089 }
1090 }
1091 });
1092 }
1093 #[expect(deprecated)]
1094 ResponseEvent::VacuumTask(_) => {
1095 unreachable!("unexpected vacuum task");
1096 }
1097 #[expect(deprecated)]
1098 ResponseEvent::FullScanTask(_) => {
1099 unreachable!("unexpected scan task");
1100 }
1101 #[expect(deprecated)]
1102 ResponseEvent::ValidationTask(validation_task) => {
1103 let validation_task = ValidationTask::from(validation_task);
1104 executor.spawn(async move {
1105 validate_ssts(validation_task, context.sstable_store.clone())
1106 .await;
1107 });
1108 }
1109 ResponseEvent::CancelCompactTask(cancel_compact_task) => match shutdown
1110 .lock()
1111 .unwrap()
1112 .remove(&cancel_compact_task.task_id)
1113 {
1114 Some(tx) => {
1115 if tx.send(()).is_err() {
1116 tracing::warn!(
1117 "Cancellation of compaction task failed. task_id: {}",
1118 cancel_compact_task.task_id
1119 );
1120 }
1121 }
1122 _ => {
1123 tracing::warn!(
1124 "Attempting to cancel non-existent compaction task. task_id: {}",
1125 cancel_compact_task.task_id
1126 );
1127 }
1128 },
1129
1130 ResponseEvent::PullTaskAck(_pull_task_ack) => {
1131 pull_task_ack = true;
1133 }
1134 }
1135 }
1136 Some(Err(e)) => {
1137 tracing::warn!("Failed to consume stream. {}", e.message());
1138 continue 'start_stream;
1139 }
1140 _ => {
1141 continue 'start_stream;
1143 }
1144 }
1145 }
1146 }
1147 });
1148
1149 (join_handle, shutdown_tx)
1150}
1151
1152#[must_use]
1155pub fn start_shared_compactor(
1156 grpc_proxy_client: GrpcCompactorProxyClient,
1157 mut receiver: mpsc::UnboundedReceiver<Request<DispatchCompactionTaskRequest>>,
1158 context: CompactorContext,
1159) -> (JoinHandle<()>, Sender<()>) {
1160 type CompactionShutdownMap = Arc<Mutex<HashMap<u64, Sender<()>>>>;
1161 let task_progress = context.task_progress_manager.clone();
1162 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
1163 let periodic_event_update_interval = Duration::from_millis(1000);
1164
1165 let join_handle = tokio::spawn(async move {
1166 let shutdown_map = CompactionShutdownMap::default();
1167
1168 let mut periodic_event_interval = tokio::time::interval(periodic_event_update_interval);
1169 let executor = context.compaction_executor.clone();
1170 let report_heartbeat_client = grpc_proxy_client.clone();
1171 'consume_stream: loop {
1172 let request: Option<Request<DispatchCompactionTaskRequest>> = tokio::select! {
1173 _ = periodic_event_interval.tick() => {
1174 let progress_list = get_task_progress(task_progress.clone());
1175 let report_compaction_task_request = ReportCompactionTaskRequest{
1176 event: Some(ReportCompactionTaskEvent::HeartBeat(
1177 SharedHeartBeat {
1178 progress: progress_list
1179 }
1180 )),
1181 };
1182 if let Err(e) = report_heartbeat_client.report_compaction_task(report_compaction_task_request).await{
1183 tracing::warn!(error = %e.as_report(), "Failed to report heartbeat");
1184 }
1185 continue
1186 }
1187
1188
1189 _ = &mut shutdown_rx => {
1190 tracing::info!("Compactor is shutting down");
1191 return
1192 }
1193
1194 request = receiver.recv() => {
1195 request
1196 }
1197
1198 };
1199 match request {
1200 Some(request) => {
1201 let context = context.clone();
1202 let shutdown = shutdown_map.clone();
1203
1204 let cloned_grpc_proxy_client = grpc_proxy_client.clone();
1205 executor.spawn(async move {
1206 let DispatchCompactionTaskRequest {
1207 tables,
1208 output_object_ids,
1209 task: dispatch_task,
1210 } = request.into_inner();
1211 let table_id_to_catalog = tables.into_iter().fold(HashMap::new(), |mut acc, table| {
1212 acc.insert(table.id, table);
1213 acc
1214 });
1215
1216 let mut output_object_ids_deque: VecDeque<_> = VecDeque::new();
1217 output_object_ids_deque.extend(output_object_ids.into_iter().map(Into::<HummockSstableObjectId>::into));
1218 let shared_compactor_object_id_manager =
1219 SharedComapctorObjectIdManager::new(output_object_ids_deque, cloned_grpc_proxy_client.clone(), context.storage_opts.sstable_id_remote_fetch_number);
1220 match dispatch_task.unwrap() {
1221 dispatch_compaction_task_request::Task::CompactTask(compact_task) => {
1222 let compact_task = CompactTask::from(&compact_task);
1223 let (tx, rx) = tokio::sync::oneshot::channel();
1224 let task_id = compact_task.task_id;
1225 shutdown.lock().unwrap().insert(task_id, tx);
1226
1227 let compaction_catalog_agent_ref = CompactionCatalogManager::build_compaction_catalog_agent(table_id_to_catalog);
1228 let ((compact_task, table_stats, object_timestamps), _memory_tracker)= compactor_runner::compact_with_agent(
1229 context.clone(),
1230 compact_task,
1231 rx,
1232 shared_compactor_object_id_manager,
1233 compaction_catalog_agent_ref.clone(),
1234 )
1235 .await;
1236 shutdown.lock().unwrap().remove(&task_id);
1237 let report_compaction_task_request = ReportCompactionTaskRequest {
1238 event: Some(ReportCompactionTaskEvent::ReportTask(ReportSharedTask {
1239 compact_task: Some(PbCompactTask::from(&compact_task)),
1240 table_stats_change: to_prost_table_stats_map(table_stats),
1241 object_timestamps,
1242 })),
1243 };
1244
1245 match cloned_grpc_proxy_client
1246 .report_compaction_task(report_compaction_task_request)
1247 .await
1248 {
1249 Ok(_) => {
1250 let enable_check_compaction_result = context.storage_opts.check_compaction_result;
1252 let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status == TaskStatus::Success;
1253 if enable_check_compaction_result && need_check_task {
1254 match check_compaction_result(&compact_task, context.clone(),compaction_catalog_agent_ref).await {
1255 Err(e) => {
1256 tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}", task_id);
1257 },
1258 Ok(true) => (),
1259 Ok(false) => {
1260 panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task));
1261 }
1262 }
1263 }
1264 }
1265 Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"),
1266 }
1267
1268 }
1269 dispatch_compaction_task_request::Task::VacuumTask(_) => {
1270 unreachable!("unexpected vacuum task");
1271 }
1272 dispatch_compaction_task_request::Task::FullScanTask(_) => {
1273 unreachable!("unexpected scan task");
1274 }
1275 dispatch_compaction_task_request::Task::ValidationTask(validation_task) => {
1276 let validation_task = ValidationTask::from(validation_task);
1277 validate_ssts(validation_task, context.sstable_store.clone()).await;
1278 }
1279 dispatch_compaction_task_request::Task::CancelCompactTask(cancel_compact_task) => {
1280 match shutdown
1281 .lock()
1282 .unwrap()
1283 .remove(&cancel_compact_task.task_id)
1284 { Some(tx) => {
1285 if tx.send(()).is_err() {
1286 tracing::warn!(
1287 "Cancellation of compaction task failed. task_id: {}",
1288 cancel_compact_task.task_id
1289 );
1290 }
1291 } _ => {
1292 tracing::warn!(
1293 "Attempting to cancel non-existent compaction task. task_id: {}",
1294 cancel_compact_task.task_id
1295 );
1296 }}
1297 }
1298 }
1299 });
1300 }
1301 None => continue 'consume_stream,
1302 }
1303 }
1304 });
1305 (join_handle, shutdown_tx)
1306}
1307
1308fn get_task_progress(
1309 task_progress: Arc<
1310 parking_lot::lock_api::Mutex<parking_lot::RawMutex, HashMap<u64, Arc<TaskProgress>>>,
1311 >,
1312) -> Vec<CompactTaskProgress> {
1313 let mut progress_list = Vec::new();
1314 for (&task_id, progress) in &*task_progress.lock() {
1315 progress_list.push(progress.snapshot(task_id));
1316 }
1317 progress_list
1318}
1319
/// Drains `task_queue`, spawning every popped iceberg compaction plan on the compaction executor.
///
/// Each plan is keyed by `(task_id, plan_index)`. For each plan:
/// - a oneshot shutdown sender is registered in `shutdown_map` *before* the plan is spawned, so
///   `cancel_iceberg_task` can signal it as soon as it is running;
/// - the spawned future removes that entry again when it finishes or is dropped (scopeguard);
/// - on completion an `IcebergPlanCompletion` is sent on `task_completion_tx`. It carries `None`
///   on success and the error's report string on failure, including cancellation.
///
/// A popped plan without a runner is logged, released via `finish_running`, and not spawned.
fn schedule_queued_tasks(
    task_queue: &mut IcebergTaskQueue,
    compactor_context: &CompactorContext,
    shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
    task_completion_tx: &tokio::sync::mpsc::UnboundedSender<IcebergPlanCompletion>,
) {
    while let Some(popped_task) = task_queue.pop() {
        let task_id = popped_task.meta.task_id;
        let plan_index = popped_task.meta.plan_index;
        let task_key = (task_id, plan_index);

        // Captured before `runner` is moved out below; only used for logging.
        let unique_ident = popped_task.runner.as_ref().map(|r| r.unique_ident());

        let Some(runner) = popped_task.runner else {
            tracing::error!(
                task_id = task_id,
                plan_index = plan_index,
                "Popped task missing runner - this should not happen"
            );
            // NOTE(review): pop() presumably marked this plan as running; release that slot
            // since there is nothing to execute — confirm against IcebergTaskQueue.
            task_queue.finish_running(task_key);
            continue;
        };

        let executor = compactor_context.compaction_executor.clone();
        let shutdown_map_clone = shutdown_map.clone();
        let completion_tx_clone = task_completion_tx.clone();
        let (tx, rx) = tokio::sync::oneshot::channel();

        // Register the cancellation channel before spawning, so a cancel request that arrives
        // right after scheduling can already find and signal this plan.
        {
            let mut shutdown_guard = shutdown_map.lock().unwrap();
            shutdown_guard.insert(task_key, tx);
        }

        tracing::info!(
            task_id = task_id,
            plan_index = plan_index,
            unique_ident = ?unique_ident,
            required_parallelism = popped_task.meta.required_parallelism,
            "Starting iceberg compaction task from queue"
        );

        executor.spawn(async move {
            // Removes this plan's shutdown entry when the future completes or is dropped.
            let _cleanup_guard = scopeguard::guard(shutdown_map_clone, move |shutdown_map| {
                let mut shutdown_guard = shutdown_map.lock().unwrap();
                shutdown_guard.remove(&task_key);
            });

            // NOTE(review): boxed, presumably to keep the compaction future's state off this
            // task's inline storage — confirm.
            let result = Box::pin(runner.compact(rx)).await;

            let completion = match result {
                Ok(_) => IcebergPlanCompletion {
                    task_key,
                    error_message: None,
                },
                Err(e) => {
                    // Cancellation is expected; log it quietly instead of as a failure.
                    if is_cancelled_iceberg_compaction_error(&e) {
                        tracing::info!(
                            task_id = task_key.0,
                            plan_index = task_key.1,
                            "Iceberg compaction plan cancelled"
                        );
                    } else {
                        tracing::warn!(
                            error = %e.as_report(),
                            task_id = task_key.0,
                            plan_index = task_key.1,
                            "Failed to compact iceberg runner"
                        );
                    }
                    IcebergPlanCompletion {
                        task_key,
                        error_message: Some(e.to_report_string()),
                    }
                }
            };

            // A send error means the completion receiver has been dropped.
            if completion_tx_clone.send(completion).is_err() {
                tracing::warn!(
                    task_id = task_key.0,
                    plan_index = task_key.1,
                    "Failed to notify task completion - main loop may have shut down"
                );
            }
        });
    }
}
1408
1409fn is_cancelled_iceberg_compaction_error(error: &crate::hummock::HummockError) -> bool {
1410 matches!(
1411 error.inner(),
1412 HummockErrorInner::CompactionExecutor(message) if message == "Plan cancelled"
1413 )
1414}
1415
1416fn cancel_iceberg_task(
1417 task_id: u64,
1418 task_queue: &mut IcebergTaskQueue,
1419 shutdown_map: &Arc<Mutex<HashMap<TaskKey, Sender<()>>>>,
1420) {
1421 let cancelled_waiting = task_queue.cancel_waiting_task(task_id);
1426
1427 let cancelled_running = {
1428 let mut shutdown_guard = shutdown_map.lock().unwrap();
1429 let task_keys: Vec<_> = shutdown_guard
1430 .keys()
1431 .filter(|(running_task_id, _)| *running_task_id == task_id)
1432 .copied()
1433 .collect();
1434
1435 for task_key in &task_keys {
1436 if let Some(tx) = shutdown_guard.remove(task_key)
1437 && tx.send(()).is_err()
1438 {
1439 tracing::debug!(
1440 task_id = task_key.0,
1441 plan_index = task_key.1,
1442 "Iceberg compaction plan shutdown receiver already closed during cancellation"
1443 );
1444 }
1445 }
1446
1447 task_keys.len()
1448 };
1449
1450 if cancelled_waiting == 0 && cancelled_running == 0 {
1451 tracing::warn!(
1452 task_id = task_id,
1453 "Attempting to cancel non-existent iceberg compaction task"
1454 );
1455 } else {
1456 tracing::info!(
1457 task_id = task_id,
1458 cancelled_waiting = cancelled_waiting,
1459 cancelled_running = cancelled_running,
1460 "Cancelled iceberg compaction task"
1461 );
1462 }
1463}
1464
1465fn handle_meta_task_pulling(
1468 pull_task_ack: &mut bool,
1469 task_queue: &IcebergTaskQueue,
1470 max_task_parallelism: u32,
1471 max_pull_task_count: u32,
1472 request_sender: &mpsc::UnboundedSender<SubscribeIcebergCompactionEventRequest>,
1473 log_throttler: &mut LogThrottler<IcebergCompactionLogState>,
1474) -> bool {
1475 let mut pending_pull_task_count = 0;
1476 if *pull_task_ack {
1477 let current_running_parallelism = task_queue.running_parallelism_sum();
1479 pending_pull_task_count =
1480 (max_task_parallelism - current_running_parallelism).min(max_pull_task_count);
1481
1482 if pending_pull_task_count > 0 {
1483 if let Err(e) = request_sender.send(SubscribeIcebergCompactionEventRequest {
1484 event: Some(subscribe_iceberg_compaction_event_request::Event::PullTask(
1485 subscribe_iceberg_compaction_event_request::PullTask {
1486 pull_task_count: pending_pull_task_count,
1487 },
1488 )),
1489 create_at: SystemTime::now()
1490 .duration_since(std::time::UNIX_EPOCH)
1491 .expect("Clock may have gone backwards")
1492 .as_millis() as u64,
1493 }) {
1494 tracing::warn!(error = %e.as_report(), "Failed to pull task - will retry on stream restart");
1495 return true; } else {
1497 *pull_task_ack = false;
1498 }
1499 }
1500 }
1501
1502 let running_count = task_queue.running_parallelism_sum();
1503 let waiting_count = task_queue.waiting_parallelism_sum();
1504 let available_count = max_task_parallelism.saturating_sub(running_count);
1505 let current_state = IcebergCompactionLogState {
1506 running_parallelism: running_count,
1507 waiting_parallelism: waiting_count,
1508 available_parallelism: available_count,
1509 pull_task_ack: *pull_task_ack,
1510 pending_pull_task_count,
1511 };
1512
1513 if log_throttler.should_log(¤t_state) {
1515 tracing::info!(
1516 running_parallelism_count = %current_state.running_parallelism,
1517 waiting_parallelism_count = %current_state.waiting_parallelism,
1518 available_parallelism = %current_state.available_parallelism,
1519 pull_task_ack = %current_state.pull_task_ack,
1520 pending_pull_task_count = %current_state.pending_pull_task_count
1521 );
1522 log_throttler.update(current_state);
1523 }
1524
1525 false }
1527
#[cfg(test)]
mod tests {
    use super::*;

    /// Log states are plain values: equal fields compare equal, and changing any single field
    /// makes two states differ.
    #[test]
    fn test_log_state_equality() {
        let baseline = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        // Built independently (not cloned) so equality is really compared field by field.
        let identical = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let more_running = CompactionLogState {
            running_parallelism: 11,
            ..baseline
        };
        assert_eq!(baseline, identical);
        assert_ne!(baseline, more_running);

        let iceberg_baseline = IcebergCompactionLogState {
            running_parallelism: 10,
            waiting_parallelism: 5,
            available_parallelism: 15,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let iceberg_more_waiting = IcebergCompactionLogState {
            waiting_parallelism: 6,
            ..iceberg_baseline
        };
        assert_ne!(iceberg_baseline, iceberg_more_waiting);
    }

    /// Within the heartbeat interval, only a state change triggers a new log line.
    #[test]
    fn test_log_throttler_state_change_detection() {
        let mut throttler = LogThrottler::<CompactionLogState>::new(Duration::from_secs(60));
        let first = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };
        let second = CompactionLogState {
            running_parallelism: 11,
            ..first
        };

        for state in [&first, &second] {
            // A state that differs from the last logged one is always logged...
            assert!(throttler.should_log(state));
            throttler.update(state.clone());
            // ...and repeating it right away is suppressed.
            assert!(!throttler.should_log(state));
        }
    }

    /// An unchanged state is logged again once the heartbeat interval has elapsed.
    #[test]
    fn test_log_throttler_heartbeat() {
        let heartbeat = Duration::from_millis(10);
        let mut throttler = LogThrottler::<CompactionLogState>::new(heartbeat);
        let state = CompactionLogState {
            running_parallelism: 10,
            pull_task_ack: true,
            pending_pull_task_count: 2,
        };

        assert!(throttler.should_log(&state));
        throttler.update(state.clone());
        assert!(!throttler.should_log(&state));

        // Sleep past the heartbeat interval; the same state must now be logged again.
        std::thread::sleep(Duration::from_millis(15));
        assert!(throttler.should_log(&state));
    }
}