use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult,
    SstableBuilderOptions, SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
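
/// Compacts a single split (key range) of a compaction task: builds the input
/// iterator over the split and drives the shared `Compactor` to produce output SSTs.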
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let kv_count = task
            .input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .map(|sst| sst.total_key_count)
            .sum::<u64>() as usize;
        let use_block_based_filter =
            BlockedXor16FilterBuilder::is_kv_count_too_large(kv_count) || task.target_level > 0;

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

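    /// Compacts this runner's split and returns `(split_index, output SSTs, statistics)`.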
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

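    /// Builds the merge iterator over all input SSTs that overlap this split's key range.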
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
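            // SSTs in a non-overlapping level are sorted and disjoint, so the whole
            // level can be read through a single concat iterator.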
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

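        // Wrap the merged input with watermark-skipping iterators so that keys below
        // the per-table safe-epoch watermarks are dropped during compaction.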
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

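/// Greedily partitions overlapping SSTs into groups whose members have pairwise
/// disjoint key ranges, so that each group can be read by a concat iterator.
/// SSTs are sorted by left bound; each SST joins the existing group with the
/// smallest right bound if it starts strictly after that bound, and otherwise
/// opens a new group.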
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Reverse the comparison so that the max-heap below behaves as a
            // min-heap: `peek_mut` yields the group with the smallest right bound.
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // Append the SST to the group with the smallest right bound if they do not
        // overlap; otherwise open a new group for it.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

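/// Runs a compaction task against a prepared compaction catalog agent. Returns the
/// finished task together with per-table stats and output-object timestamps, plus
/// the memory tracker reserved for the task (if any). Failures are reported through
/// the task status rather than an error.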
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // Each split is compacted by its own runner, so the parallelism equals the
    // number of splits.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

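    // Reserve the estimated memory before doing any work; if the limiter cannot
    // grant it, cancel the task instead of risking an OOM on this compactor.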
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

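    // Fast path: when the task qualifies for block-copy optimization, run it on the
    // fast compactor runner as a single unit and return early.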
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }

    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        // Cancel any remaining split runners and discard partial output.
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Sort the output by split index so that SSTs are emitted in key-range order.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

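/// Acquires the compaction catalog for all tables involved in the task, prunes
/// watermarks of tables that are no longer present, and delegates to
/// `compact_with_agent`.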
pub async fn compact(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(table_ids_to_be_compacted.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
                let diff = table_ids_to_be_compacted
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

    {
        // Keep only the watermarks of tables that are actually part of this task.
        compact_task
            .pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));

        compact_task
            .non_pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
    }

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

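/// Finalizes a compaction task: records the task status, collects output SSTs and
/// per-table drop stats, and reports write metrics.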
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<TableId, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

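/// Core compaction loop: iterates the merged input, drops stale versions, delete
/// tombstones, and filtered keys, optionally rewrites values to shed columns that
/// were dropped from the table schema, and feeds the survivors into `sst_builder`.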
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Per-table statistics of dropped keys, flushed whenever the table id changes.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> latest block id whose values already passed the schema check, so
    // the check runs at most once per block.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        let value = iter.value();
        // The value meta records which object/block the entry was read from; it is
        // used below to skip repeated schema checks within the same block.
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

        // Unless multiple versions must be retained, keep only the latest version of
        // each user key, and also drop delete tombstones when `gc_delete_keys` is set.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => target_table_ids.contains(&iter_key.user_key.table_id),
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // If the value still carries columns that were dropped from the table schema,
        // rewrite it without them before handing it to the builder. Blocks that have
        // already passed the check are skipped.
        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    // No column needs to be dropped. Values in one block are assumed
                    // to share a schema, so the rest of this block can skip the check.
                    if !task_config.disable_drop_column_optimization {
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        // Each SST overlaps only its immediate neighbors, so partitioning should
        // yield exactly two interleaved groups, each internally concat-able.
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}