1use std::collections::{BinaryHeap, HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use bytes::Bytes;
20use futures::{FutureExt, StreamExt, stream};
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
24use risingwave_hummock_sdk::compact::{
25 compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
26};
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::compaction_group::StateTableId;
29use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
30use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
33use risingwave_hummock_sdk::{
34 HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
35 full_key_can_concat,
36};
37use risingwave_pb::hummock::LevelType;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use thiserror_ext::AsReport;
40use tokio::sync::oneshot::Receiver;
41
42use super::iterator::MonitoredCompactorIterator;
43use super::task_progress::TaskProgress;
44use super::{CompactionStatistics, TaskConfig};
45use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
46use crate::hummock::compactor::compaction_utils::{
47 build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
48 metrics_report_for_task, optimize_by_copy_block,
49};
50use crate::hummock::compactor::iterator::ConcatSstableIterator;
51use crate::hummock::compactor::task_progress::TaskProgressGuard;
52use crate::hummock::compactor::{
53 CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
54 fast_compactor_runner,
55};
56use crate::hummock::iterator::{
57 Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
58 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
59 ValueMeta, ValueSkipWatermarkIterator, ValueSkipWatermarkState,
60};
61use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
62use crate::hummock::utils::MemoryTracker;
63use crate::hummock::value::HummockValue;
64use crate::hummock::{
65 CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
66 SstableStoreRef,
67};
68use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
/// Runs compaction for a single split (key sub-range) of a `CompactTask`.
///
/// One `CompactorRunner` is created per entry in `compact_task.splits`; each
/// instance compacts only the key range assigned to its `split_index`.
pub struct CompactorRunner {
    // The task this runner belongs to (shared by all split runners).
    compact_task: CompactTask,
    // Low-level compactor holding builder options and the per-split `TaskConfig`.
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    // Key range of this split; constructed with an exclusive right bound.
    key_range: KeyRange,
    // Index of this split within `compact_task.splits`.
    split_index: usize,
}
76
impl CompactorRunner {
    /// Creates a runner bound to split `split_index` of `task`.
    ///
    /// Builder options (compression algorithm, output capacity, filter kind)
    /// are derived from the task and the node's storage options; the split's
    /// key range is taken from `task.splits` with an exclusive right bound so
    /// adjacent splits never overlap.
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        // Numeric codes from the task: 0 => None, 1 => Lz4, else => Zstd.
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        options.max_vnode_key_range_bytes = task.effective_max_vnode_key_range_bytes();
        let use_block_based_filter = task.should_use_block_based_filter();

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                // Compaction output is not read back immediately; avoid
                // polluting the block cache with it.
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

    /// Runs compaction over this split's key range.
    ///
    /// Builds the merged input iterator, delegates to
    /// `Compactor::compact_key_range`, and returns
    /// `(split_index, output ssts, statistics)`.
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

    /// Builds the forward iterator over all input SSTs of the task,
    /// restricted to this split's key range and wrapped with the
    /// watermark-skipping layers.
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        let compact_table_ids = self.compact_task.build_compact_table_ids();

        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            // Keep only SSTs that overlap this split's key range and contain
            // at least one table that belongs to the compaction.
            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| compact_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            if level.level_type == LevelType::Nonoverlapping {
                // Disjoint, ordered SSTs: one concat iterator covers the level.
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    compact_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                // Too many overlapping SSTs: partition them into internally
                // non-overlapping groups so the merge iterator fans in fewer
                // streams (one concat iterator per group instead of one
                // iterator per SST).
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    // Every group produced by the partitioning must itself be
                    // concat-able.
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        compact_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                // Small overlapping level: one single-SST iterator each.
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        compact_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        let combine_iter = {
            // Merge all inputs, then stack the three watermark filters so
            // entries below the safe-epoch watermarks are skipped:
            // pk-prefix -> non-pk-prefix -> value watermarks.
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            let pk_skip_watermark_iter = NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            ValueSkipWatermarkIterator::new(
                pk_skip_watermark_iter,
                ValueSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.value_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}
275
276pub fn partition_overlapping_sstable_infos(
277 mut origin_infos: Vec<SstableInfo>,
278) -> Vec<Vec<SstableInfo>> {
279 pub struct SstableGroup {
280 ssts: Vec<SstableInfo>,
281 max_right_bound: Bytes,
282 }
283
284 impl PartialEq for SstableGroup {
285 fn eq(&self, other: &SstableGroup) -> bool {
286 self.max_right_bound == other.max_right_bound
287 }
288 }
289 impl PartialOrd for SstableGroup {
290 fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
291 Some(self.cmp(other))
292 }
293 }
294 impl Eq for SstableGroup {}
295 impl Ord for SstableGroup {
296 fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
297 KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
299 }
300 }
301 let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
302 origin_infos.sort_by(|a, b| {
303 let x = &a.key_range;
304 let y = &b.key_range;
305 KeyComparator::compare_encoded_full_key(&x.left, &y.left)
306 });
307 for sst in origin_infos {
308 if let Some(mut prev_group) = groups.peek_mut()
310 && KeyComparator::encoded_full_key_less_than(
311 &prev_group.max_right_bound,
312 &sst.key_range.left,
313 )
314 {
315 prev_group.max_right_bound.clone_from(&sst.key_range.right);
316 prev_group.ssts.push(sst);
317 continue;
318 }
319 groups.push(SstableGroup {
320 max_right_bound: sst.key_range.right.clone(),
321 ssts: vec![sst],
322 });
323 }
324 assert!(!groups.is_empty());
325 groups.into_iter().map(|group| group.ssts).collect_vec()
326}
327
/// Executes a compaction task with an already-resolved catalog agent.
///
/// Flow: generate key-range splits -> reserve memory for the whole task ->
/// either run the fast copy-block runner over the whole task or spawn one
/// [`CompactorRunner`] per split and collect their outputs -> finalize via
/// [`compact_done`].
///
/// Returns `((finished task, per-table stats, per-object timestamps),
/// memory tracker)`; the tracker is `None` when split generation failed
/// before the reservation was attempted or when the reservation itself
/// failed.
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    // Task-duration metric, labelled by compaction group and input level.
    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    // Split the task's key space; failure aborts the task before any memory
    // is reserved.
    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // One runner per split.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    // Estimate per-split memory and scale by the split count; the whole
    // amount is reserved up front from the memory limiter below.
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // Reject the task (rather than risk running out of memory) when the
    // reservation cannot be satisfied.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    // Pending-task gauges; the scopeguard decrements them on every exit path.
    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    // Fast path: copy whole blocks instead of iterating key-by-key. Runs the
    // entire task in-line (no per-split spawning) and returns early.
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        // Race the runner against external cancellation.
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    // Normal path: spawn one runner per split, each optionally instrumented
    // with await-tree for diagnostics.
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    // Collect split results as they complete; stop at the first failure or on
    // external cancellation.
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    // A partially-failed task must not commit partial output: abort the
    // remaining split runners and discard whatever was already produced.
    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Results arrived in completion order; restore split order.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}
586
587pub async fn compact(
590 compactor_context: CompactorContext,
591 mut compact_task: CompactTask,
592 shutdown_rx: Receiver<()>,
593 object_id_getter: Arc<dyn GetObjectId>,
594 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
595) -> (
596 (
597 CompactTask,
598 HashMap<TableId, TableStats>,
599 HashMap<HummockSstableObjectId, u64>,
600 ),
601 Option<MemoryTracker>,
602) {
603 let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
604 let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
605 .acquire(table_ids_to_be_compacted.clone())
606 .await
607 {
608 Ok(compaction_catalog_agent_ref) => {
609 let acquire_table_ids: HashSet<StateTableId> =
610 compaction_catalog_agent_ref.table_ids().collect();
611 if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
612 let diff = table_ids_to_be_compacted
613 .into_iter()
614 .collect::<HashSet<_>>()
615 .symmetric_difference(&acquire_table_ids)
616 .cloned()
617 .collect::<Vec<_>>();
618 tracing::warn!(
619 dif= ?diff,
620 "Some table ids are not acquired."
621 );
622 return (
623 compact_done(
624 compact_task,
625 compactor_context.clone(),
626 vec![],
627 TaskStatus::ExecuteFailed,
628 ),
629 None,
630 );
631 }
632
633 compaction_catalog_agent_ref
634 }
635 Err(e) => {
636 tracing::warn!(
637 error = %e.as_report(),
638 "Failed to acquire compaction catalog agent"
639 );
640 return (
641 compact_done(
642 compact_task,
643 compactor_context.clone(),
644 vec![],
645 TaskStatus::ExecuteFailed,
646 ),
647 None,
648 );
649 }
650 };
651
652 {
654 compact_task
655 .pk_prefix_table_watermarks
656 .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
657
658 compact_task
659 .non_pk_prefix_table_watermarks
660 .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
661 }
662
663 compact_with_agent(
664 compactor_context,
665 compact_task,
666 shutdown_rx,
667 object_id_getter,
668 compaction_catalog_agent_ref,
669 )
670 .await
671}
672
673pub(crate) fn compact_done(
675 mut compact_task: CompactTask,
676 context: CompactorContext,
677 output_ssts: Vec<CompactOutput>,
678 task_status: TaskStatus,
679) -> (
680 CompactTask,
681 HashMap<TableId, TableStats>,
682 HashMap<HummockSstableObjectId, u64>,
683) {
684 let mut table_stats_map = TableStatsMap::default();
685 let mut object_timestamps = HashMap::default();
686 compact_task.task_status = task_status;
687 compact_task
688 .sorted_output_ssts
689 .reserve(compact_task.splits.len());
690 let mut compaction_write_bytes = 0;
691 for (
692 _,
693 ssts,
694 CompactionStatistics {
695 delta_drop_stat, ..
696 },
697 ) in output_ssts
698 {
699 add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
700 for sst_info in ssts {
701 compaction_write_bytes += sst_info.file_size();
702 object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
703 compact_task.sorted_output_ssts.push(sst_info.sst_info);
704 }
705 }
706
707 let group_label = compact_task.compaction_group_id.to_string();
708 let level_label = compact_task.target_level.to_string();
709 context
710 .compactor_metrics
711 .compact_write_bytes
712 .with_label_values(&[&group_label, &level_label])
713 .inc_by(compaction_write_bytes);
714 context
715 .compactor_metrics
716 .compact_write_sstn
717 .with_label_values(&[&group_label, &level_label])
718 .inc_by(compact_task.sorted_output_ssts.len() as u64);
719
720 (compact_task, table_stats_map, object_timestamps)
721}
722
/// Core compaction loop shared by the split runners.
///
/// Drains `iter` (already watermark-filtered), applies multi-version /
/// delete-key GC and `compaction_filter`, optionally strips columns that no
/// longer exist in a table's schema, and feeds every surviving entry into
/// `sst_builder`.
///
/// Returns statistics on iterated/dropped keys; per-table drop deltas (as
/// negative counts/sizes) end up in `CompactionStatistics::delta_drop_stat`.
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    // Position the iterator at the task's left bound, or at the very
    // beginning when the range is unbounded on the left.
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    // An empty right bound means "no upper limit": `FullKey::default()` is
    // treated as unbounded via the `max_key.is_empty()` check below.
    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    // Tracks the previously seen full key to distinguish a new user key from
    // an older version of the same user key.
    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Per-table drop stats; `last_table_stats` is flushed into
    // `table_stats_drop` whenever the iterator crosses a table boundary.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> latest block id whose values already passed the schema
    // check; lets the remaining values of that block skip the check.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    // table id -> set of column ids still present in the table schema.
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            // Stop once the (exclusive) right bound of the split is reached.
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        // Flush accumulated drop stats when crossing into another table.
        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

        // GC rules: tombstones may be dropped when `gc_delete_keys` is set,
        // and non-latest versions of a user key are dropped unless the task
        // must retain multiple versions.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => target_table_ids.contains(&iter_key.user_key.table_id),
                None => true,
            };
            if should_count {
                // Negative deltas: this key/value disappears after compaction.
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // Drop-column optimization: for a Put whose source block has not yet
        // passed the schema check, try to strip columns that no longer exist
        // in the table schema and write the rewritten value instead.
        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    // Blocks of an object are visited in ascending order.
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    // Value already matches the schema: remember this block so
                    // its remaining values skip the check.
                    if !task_config.disable_drop_column_optimization {
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    // Rewriting only removes columns, so size never grows.
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    // Flush stats of the final table and fold in the watermark-skip stats
    // collected by the iterator stack.
    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}
891
#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    /// Builds 10 ssts where each sst spans two "slots" of TEST_KEYS_COUNT
    /// keys, so sst i overlaps sst i+1 but not sst i+2; partitioning such a
    /// chain must therefore yield exactly two internally concat-able groups
    /// (even-indexed ssts and odd-indexed ssts).
    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            // Key range [start, start + 2 * TEST_KEYS_COUNT) overlaps the
            // next sst's range by TEST_KEYS_COUNT keys.
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}