use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
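
/// Runs a single split (sub key range) of a [`CompactTask`]: it builds the merged
/// iterator over the split's input SSTs and compacts them into new output SSTs.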
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let use_block_based_filter = task.should_use_block_based_filter();

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

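    /// Compacts this split's key range: builds the merged SST iterator and feeds it to
    /// the compactor, returning `(split_index, output SSTs, compaction statistics)`.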
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

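    /// Builds the iterator over all input SSTs of this split: one concat iterator per
    /// non-overlapping level (or per overlapping group/SST), merged together and wrapped
    /// with the pk-prefix and non-pk-prefix watermark-skipping iterators.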
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

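        // Merge all per-level iterators and skip keys that fall below the per-table
        // safe-epoch watermarks (both pk-prefix and non-pk-prefix watermarks).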
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

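/// Greedily partitions overlapping SSTs into groups whose members do not overlap, so each
/// group can be read with a concat iterator. SSTs are sorted by left bound; each SST is
/// appended to the group with the smallest `max_right_bound` (the heap ordering is
/// reversed so that group is popped first) if that bound is strictly smaller than the
/// SST's left key, otherwise a new group is started.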
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

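/// Handles a compaction task with an already-acquired compaction catalog agent:
/// generates splits, reserves memory from the limiter, then either runs the fast
/// (copy-block) runner or one [`CompactorRunner`] per split in parallel, and finally
/// reports the result through [`compact_done`].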
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

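    // Reserve memory for the whole task up front; cancel the task if the limiter
    // cannot grant the requested quota.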
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
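
    // Normal path: spawn one `CompactorRunner` per split and run the splits in parallel,
    // registering each future with the await-tree registry when it is enabled.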
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

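    // Collect split results as they complete; stop early on external cancellation,
    // execution failure, or a join error, and abort the remaining splits below.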
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

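/// Entry point for handling a compaction task: acquires a compaction catalog agent for
/// the tables involved, prunes the table watermarks down to the tables being compacted,
/// and then delegates to [`compact_with_agent`]. The task is failed if the catalog
/// cannot be acquired for every table.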
pub async fn compact(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(table_ids_to_be_compacted.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
                let diff = table_ids_to_be_compacted
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
608 "Some table ids are not acquired."
609 );
610 return (
611 compact_done(
612 compact_task,
613 compactor_context.clone(),
614 vec![],
615 TaskStatus::ExecuteFailed,
616 ),
617 None,
618 );
619 }
620
621 compaction_catalog_agent_ref
622 }
623 Err(e) => {
624 tracing::warn!(
625 error = %e.as_report(),
626 "Failed to acquire compaction catalog agent"
627 );
628 return (
629 compact_done(
630 compact_task,
631 compactor_context.clone(),
632 vec![],
633 TaskStatus::ExecuteFailed,
634 ),
635 None,
636 );
637 }
638 };
639
640 {
642 compact_task
643 .pk_prefix_table_watermarks
644 .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
645
646 compact_task
647 .non_pk_prefix_table_watermarks
648 .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
649 }
650
651 compact_with_agent(
652 compactor_context,
653 compact_task,
654 shutdown_rx,
655 object_id_getter,
656 compaction_catalog_agent_ref,
657 )
658 .await
659}
660
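/// Fills in the result of a compaction task: records the task status, collects the
/// output SSTs and per-table drop statistics, bumps the write metrics, and returns the
/// finished task together with the table stats and object creation timestamps.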
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<TableId, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

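/// Core compaction loop: seeks `iter` to the task key range and walks it key by key,
/// dropping delete tombstones and stale versions according to `task_config` and the
/// compaction filter, optionally rewriting values whose columns are no longer in the
/// table schema, and appending the surviving entries to `sst_builder`.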
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
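    // Per SST object, the latest block id whose values were already verified to match
    // the table schema; later keys from the same block can skip the column-drop check.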
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

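        // Drop the entry if delete tombstones may be GC'ed for this task, or if it is an
        // older version of a user key that has already been emitted.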
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => target_table_ids.contains(&iter_key.user_key.table_id),
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

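        // For `Put` values, try to drop columns that are no longer part of the table
        // schema (column-aware row encoding); if anything was removed, write the
        // rewritten value instead of the original one.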
        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
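        // Each SST i covers keys [10 * i, 10 * i + 20), so adjacent SSTs overlap while
        // SSTs two apart do not; the greedy partitioning therefore alternates them into
        // two non-overlapping, concat-able groups.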
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}