1use std::collections::{BinaryHeap, HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use bytes::Bytes;
20use futures::{FutureExt, StreamExt, stream};
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
24use risingwave_hummock_sdk::compact::{
25 compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
26};
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::compaction_group::StateTableId;
29use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
30use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
33use risingwave_hummock_sdk::{
34 HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
35 full_key_can_concat,
36};
37use risingwave_pb::hummock::LevelType;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use thiserror_ext::AsReport;
40use tokio::sync::oneshot::Receiver;
41
42use super::iterator::MonitoredCompactorIterator;
43use super::task_progress::TaskProgress;
44use super::{CompactionStatistics, TaskConfig};
45use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
46use crate::hummock::compactor::compaction_utils::{
47 build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
48 metrics_report_for_task, optimize_by_copy_block,
49};
50use crate::hummock::compactor::iterator::ConcatSstableIterator;
51use crate::hummock::compactor::task_progress::TaskProgressGuard;
52use crate::hummock::compactor::{
53 CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
54 fast_compactor_runner,
55};
56use crate::hummock::iterator::{
57 Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
58 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
59 ValueMeta, ValueSkipWatermarkIterator, ValueSkipWatermarkState,
60};
61use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
62use crate::hummock::utils::MemoryTracker;
63use crate::hummock::value::HummockValue;
64use crate::hummock::{
65 CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
66 SstableStoreRef,
67};
68use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
/// Runner for a single split of a compaction task.
///
/// One `CompactorRunner` is created per entry in `CompactTask::splits`; each
/// runner compacts only the key range belonging to its split.
pub struct CompactorRunner {
    /// Full task description shared by all splits of the same compaction.
    compact_task: CompactTask,
    /// Executes the actual key-range compaction and SST building.
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    /// Key range covered by this split (right bound exclusive).
    key_range: KeyRange,
    /// Index of this runner's split within `compact_task.splits`.
    split_index: usize,
}
76
impl CompactorRunner {
    /// Builds a runner for split `split_index` of `task`.
    ///
    /// Derives SST builder options (compression, output capacity, vnode
    /// key-range budget) from the task and storage options, and narrows the
    /// runner to the split's key range (right bound exclusive).
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        // The task encodes the compression algorithm as an integer:
        // 0 = none, 1 = LZ4, anything else falls back to Zstd.
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        options.max_vnode_key_range_bytes = task.effective_max_vnode_key_range_bytes();
        let use_block_based_filter = task.should_use_block_based_filter();

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };
        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                // Compaction output is not expected to be read back soon:
                // skip filling the block cache.
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

    /// Runs compaction for this split: builds the merged input iterator and
    /// feeds it to the compactor, returning the split index together with the
    /// produced SSTs and compaction statistics.
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

    /// Constructs the merged, watermark-aware iterator over all input SSTs
    /// that overlap this split's key range.
    ///
    /// Per input level:
    /// - non-overlapping levels are read with a single concat iterator;
    /// - overlapping levels with more SSTs than
    ///   `compactor_max_overlap_sst_count` are first partitioned into
    ///   concatenatable groups to bound the merge fan-in;
    /// - otherwise each SST gets its own single-element concat iterator.
    ///
    /// The merged stream is then wrapped by the pk-prefix, non-pk-prefix and
    /// value watermark skipping iterators, in that nesting order.
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();

        for level in &self.compact_task.input_ssts {
            // Only read SSTs whose key range intersects this split.
            let tables = level
                .read_sstable_infos()
                .filter(|table_info| self.key_range.full_key_overlap(&table_info.key_range))
                .cloned()
                .collect_vec();
            if tables.is_empty() {
                continue;
            }

            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&tables));
                table_iters.push(ConcatSstableIterator::new(
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                // Too many overlapping SSTs for one iterator per SST: group
                // them into concatenatable runs to cap the merge fan-in.
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    sst_groups.iter().map(Vec::len).sum::<usize>(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    // Each group must be strictly concatenatable by full key.
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        let combine_iter = {
            // Innermost: merge all per-level iterators, with progress
            // monitoring; then skip rows below the pk-prefix watermarks.
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            let pk_skip_watermark_iter = NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            // Outermost: value-based watermark filtering.
            ValueSkipWatermarkIterator::new(
                pk_skip_watermark_iter,
                ValueSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.value_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}
260
/// Greedily partitions a set of (possibly overlapping) SSTs into groups whose
/// members have pairwise non-overlapping, ascending key ranges, so each group
/// can be read by a single concat iterator.
///
/// SSTs are scanned in ascending order of their left (encoded full-key)
/// bound; each SST is appended to the group whose current right bound is
/// smallest, provided that bound lies strictly before the SST's left bound;
/// otherwise a new group is opened.
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    // One output group plus its current (maximum) right bound.
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        // Reversed comparison (`other` vs `self`): `BinaryHeap` is a max-heap,
        // so this makes `peek` yield the group with the SMALLEST right bound,
        // i.e. the best candidate to extend with the next SST.
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    // Process SSTs in ascending order of their left bound.
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // Try to append to the group with the smallest right bound; this only
        // works when that bound ends strictly before the new SST starts.
        // Mutating through `peek_mut` re-sifts the heap when the guard drops.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        // No existing group can take this SST without overlap: open a new one.
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}
312
/// Executes a compaction task end-to-end with an already-resolved catalog
/// agent.
///
/// Flow: report metrics, build the compaction filter, generate key-range
/// splits, reserve task memory, then either run the fast (block-copy) runner
/// or spawn one [`CompactorRunner`] per split. The task can be cancelled at
/// any time via `shutdown_rx`. Every exit path finishes through
/// [`compact_done`], returning the finalized task, per-table stats delta,
/// output-object timestamps, and the memory tracker (if one was acquired).
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    // Duration timer labeled by compaction group and the first input level.
    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    // Split generation failure aborts the task before any work is done.
    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // One parallel runner per split.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    // Upper-bound memory estimate for the whole task (all splits), used to
    // admit or reject the task against the compactor memory limiter.
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // Reject the task if the memory quota cannot cover the estimate; the
    // tracker (when obtained) releases the reservation on drop.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    // Roll back the pending-task gauges on every exit path.
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    // Fast path: copy whole blocks without re-encoding. Runs as a single
    // future and returns directly from this branch.
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        // Race the runner against external cancellation.
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    // Normal path: spawn one runner per split, optionally instrumented with
    // await-tree tracing.
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    // Collect split results as they complete; bail out on the first failure
    // or on external cancellation.
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    // On any failure, abort in-flight splits and discard partial output.
    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Restore deterministic split order in the output.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}
571
572pub async fn compact(
575 compactor_context: CompactorContext,
576 compact_task: CompactTask,
577 shutdown_rx: Receiver<()>,
578 object_id_getter: Arc<dyn GetObjectId>,
579 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
580) -> (
581 (
582 CompactTask,
583 HashMap<TableId, TableStats>,
584 HashMap<HummockSstableObjectId, u64>,
585 ),
586 Option<MemoryTracker>,
587) {
588 let read_table_ids = compact_task.get_table_ids_from_input_ssts().collect_vec();
589 let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
590 .acquire(read_table_ids.clone())
591 .await
592 {
593 Ok(compaction_catalog_agent_ref) => {
594 let acquire_table_ids: HashSet<StateTableId> =
595 compaction_catalog_agent_ref.table_ids().collect();
596 if acquire_table_ids.len() != read_table_ids.len() {
597 let diff = read_table_ids
598 .into_iter()
599 .collect::<HashSet<_>>()
600 .symmetric_difference(&acquire_table_ids)
601 .cloned()
602 .collect::<Vec<_>>();
603 tracing::warn!(
604 dif= ?diff,
605 "Some table ids are not acquired."
606 );
607 return (
608 compact_done(
609 compact_task,
610 compactor_context.clone(),
611 vec![],
612 TaskStatus::ExecuteFailed,
613 ),
614 None,
615 );
616 }
617
618 compaction_catalog_agent_ref
619 }
620 Err(e) => {
621 tracing::warn!(
622 error = %e.as_report(),
623 "Failed to acquire compaction catalog agent"
624 );
625 return (
626 compact_done(
627 compact_task,
628 compactor_context.clone(),
629 vec![],
630 TaskStatus::ExecuteFailed,
631 ),
632 None,
633 );
634 }
635 };
636
637 compact_with_agent(
638 compactor_context,
639 compact_task,
640 shutdown_rx,
641 object_id_getter,
642 compaction_catalog_agent_ref,
643 )
644 .await
645}
646
647pub(crate) fn compact_done(
649 mut compact_task: CompactTask,
650 context: CompactorContext,
651 output_ssts: Vec<CompactOutput>,
652 task_status: TaskStatus,
653) -> (
654 CompactTask,
655 HashMap<TableId, TableStats>,
656 HashMap<HummockSstableObjectId, u64>,
657) {
658 let mut table_stats_map = TableStatsMap::default();
659 let mut object_timestamps = HashMap::default();
660 compact_task.task_status = task_status;
661 compact_task
662 .sorted_output_ssts
663 .reserve(compact_task.splits.len());
664 let mut compaction_write_bytes = 0;
665 for (
666 _,
667 ssts,
668 CompactionStatistics {
669 delta_drop_stat, ..
670 },
671 ) in output_ssts
672 {
673 add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
674 for sst_info in ssts {
675 compaction_write_bytes += sst_info.file_size();
676 object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
677 compact_task.sorted_output_ssts.push(sst_info.sst_info);
678 }
679 }
680 assert_output_table_ids_subset_of_read_input(&compact_task);
681
682 let group_label = compact_task.compaction_group_id.to_string();
683 let level_label = compact_task.target_level.to_string();
684 context
685 .compactor_metrics
686 .compact_write_bytes
687 .with_label_values(&[&group_label, &level_label])
688 .inc_by(compaction_write_bytes);
689 context
690 .compactor_metrics
691 .compact_write_sstn
692 .with_label_values(&[&group_label, &level_label])
693 .inc_by(compact_task.sorted_output_ssts.len() as u64);
694
695 (compact_task, table_stats_map, object_timestamps)
696}
697
698fn assert_output_table_ids_subset_of_read_input(compact_task: &CompactTask) {
700 let read_table_ids = compact_task
701 .get_table_ids_from_input_ssts()
702 .collect::<HashSet<_>>();
703
704 for output_sst in &compact_task.sorted_output_ssts {
705 assert!(
706 output_sst
707 .table_ids
708 .iter()
709 .all(|table_id| read_table_ids.contains(table_id)),
710 "compaction output contains table ids outside task read input: task_id={}, output_sst_id={}, read_table_ids={:?}, output_table_ids={:?}",
711 compact_task.task_id,
712 output_sst.sst_id,
713 read_table_ids,
714 output_sst.table_ids,
715 );
716 }
717}
718
/// Core compaction loop: drains `iter` in key order and feeds surviving
/// entries into `sst_builder`, honoring the task's key range, delete-key GC,
/// multi-version retention, the user compaction filter, and (optionally)
/// dropping columns no longer present in the table schema.
///
/// Returns statistics about scanned and dropped keys; per-table drop deltas
/// are accumulated into `CompactionStatistics::delta_drop_stat`.
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    // Position the iterator at the split's left bound (or at the very start).
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    // An empty right bound means "unbounded"; `max_key` stays empty then.
    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Per-table stats for dropped keys; flushed whenever the table id changes.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object_id -> last block_id whose rows already passed the schema check;
    // used to skip re-checking further rows from the same (object, block).
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    // table_id -> set of valid column ids, for dropping stale columns.
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            // Stop once we reach the (exclusive) right bound of the split.
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        // Flush per-table drop stats when crossing a table boundary.
        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

        // Drop tombstones when GC'ing deletes, and stale versions when only
        // the newest version per user key is retained.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            last_table_stats.total_key_count -= 1;
            last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
            last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        // Schema-based column pruning: only for Put values whose
        // (object, block) was not already found clean by a previous row.
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    // Blocks are visited in ascending order within an object.
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    // Row already conforms to the schema: remember this block
                    // so remaining rows from it skip the check.
                    if !task_config.disable_drop_column_optimization {
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    // Dropping columns can only shrink the value.
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    // Flush drop stats for the final table.
    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}
881
#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    /// Ten SSTs, each overlapping its successor by half its key range, should
    /// partition into exactly two concatenatable groups.
    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let first_key = object_id * TEST_KEYS_COUNT;
            let last_key = first_key + 2 * TEST_KEYS_COUNT;
            let sst = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (first_key..last_key)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(sst);
        }
        let groups = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(groups.len(), 2);
        for group in &groups {
            assert!(can_concat(group));
        }
    }
}
917}