use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta, ValueSkipWatermarkIterator, ValueSkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
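
/// Executes one split of a [`CompactTask`]: builds a merged, watermark-aware iterator over the
/// input SSTs that fall into the split's key range and feeds it to a [`Compactor`] to produce
/// the output SSTs for that split.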
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
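    /// Creates the runner for the `split_index`-th split of `task`, deriving the SSTable builder
    /// options (compression algorithm, output capacity, bloom filter type) from the task and the
    /// compactor's storage options.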
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let use_block_based_filter = task.should_use_block_based_filter();

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

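    /// Runs the compaction for this split: builds the input iterator and compacts it through the
    /// key range, returning the split index together with the output SSTs and statistics.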
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

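    /// Builds an iterator over all input SSTs of this split, merging per-level concat iterators
    /// and skipping keys that fall below the table watermarks.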
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

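        // Layer the merged iterator with watermark-skipping wrappers so that keys below the
        // per-table pk-prefix, non-pk-prefix and value watermarks are dropped during compaction.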
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            let non_pk_prefix_skip_watermark_iter = NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            ValueSkipWatermarkIterator::new(
                non_pk_prefix_skip_watermark_iter,
                ValueSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.value_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

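/// Greedily partitions `origin_infos` into groups of non-overlapping SSTs.
///
/// SSTs are sorted by their left bound; each SST is then appended to the existing group whose
/// right bound ends earliest, provided the SST starts strictly after that bound, and otherwise
/// starts a new group. Every returned group can therefore be read with a concat iterator.
///
/// Illustrative sketch only (`sst_a`, `sst_b` and `sst_c` are assumed `SstableInfo`s with key
/// ranges `[a, c]`, `[b, d]` and `[e, f]` respectively):
///
/// ```ignore
/// let groups = partition_overlapping_sstable_infos(vec![sst_a, sst_b, sst_c]);
/// // `sst_a` and `sst_c` can share one concat-able group; `sst_b` overlaps `sst_a` and
/// // therefore forms its own group.
/// assert_eq!(groups.len(), 2);
/// ```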
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Reverse the comparison so that the max-heap `BinaryHeap` always exposes the group
            // with the smallest right bound at its top.
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // Try to append the SST to the group that ends earliest, as long as they do not overlap.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

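/// Handles a compaction task with an already-acquired [`CompactionCatalogAgentRef`]: generates
/// the splits, reserves memory, runs one [`CompactorRunner`] per split (or the fast runner when
/// block copy is possible) and reports the resulting task status and table statistics.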
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // Reserve the estimated memory for the whole task up front; cancel the task if the
    // compactor does not have enough quota left.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

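    // Fast path: when the task qualifies for block-copy optimization, run the dedicated fast
    // compactor runner on the whole task instead of spawning one runner per split.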
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }

    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

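    // Collect the per-split results as they finish; any failure or external cancellation aborts
    // the remaining splits.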
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}
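
/// Handles a compaction task from the hummock manager: acquires the compaction catalog for the
/// tables involved, prunes watermarks of unrelated tables and delegates the actual work to
/// [`compact_with_agent`].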
pub async fn compact(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(table_ids_to_be_compacted.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
                let diff = table_ids_to_be_compacted
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

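    // Only the watermarks of tables that are actually compacted by this task are relevant; drop
    // the rest before running the compaction.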
    {
        compact_task
            .pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));

        compact_task
            .non_pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
    }

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

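/// Finalizes a compaction task: records the task status, collects the output SSTs and their
/// creation timestamps, aggregates the dropped-key table statistics and reports write metrics.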
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<TableId, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

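/// Core compaction loop shared by the compactor runners: drains `iter`, drops stale versions,
/// tombstones and keys rejected by `compaction_filter`, optionally strips columns that no longer
/// exist in the table schema, and feeds the surviving entries into `sst_builder`.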
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Per-table statistics deltas for the keys dropped during this compaction.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // Blocks whose rows are already known to match the current schema: once a row of a block
    // passes the schema check unchanged, the rest of that block can skip it.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

        // Drop the key if multiple versions are not retained and either it is a tombstone that
        // may be garbage-collected or it is an older version of a user key we have already seen.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => target_table_ids.contains(&iter_key.user_key.table_id),
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // Try to drop columns that have been removed from the table schema. If a value survives
        // the check unchanged, remember its block so that subsequent rows of the same block skip
        // the check.
        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // No stale column found in this block; skip the check for its remaining
                        // rows.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}