use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult,
    SstableBuilderOptions, SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
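
/// Runs a single split of a compaction task: builds a merged iterator over the input SSTs
/// that overlap the split's key range and compacts them into new output SSTs.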
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
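    /// Creates a runner for split `split_index` of `task`, deriving the SST builder options
    /// (compression algorithm, output capacity, whether to use a block-based filter) from the
    /// task and the storage options.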
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let kv_count = task
            .input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .map(|sst| sst.total_key_count)
            .sum::<u64>() as usize;
        let use_block_based_filter =
            BlockedXor16FilterBuilder::is_kv_count_too_large(kv_count) || task.target_level > 0;

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

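    /// Compacts the key range owned by this split and returns the split index together with
    /// the produced SSTs and the compaction statistics.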
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

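    /// Builds an iterator over the input SSTs of this split.
    ///
    /// Non-overlapping levels are read through a single `ConcatSstableIterator`; overlapping
    /// levels get one iterator per SST, or one per partitioned group when the level contains
    /// too many overlapping SSTs. All iterators are merged and wrapped with watermark-skipping
    /// iterators.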
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

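        // Skip keys that have been dropped by table watermarks, handling the pk-prefix and
        // non-pk-prefix watermark representations separately.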
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress.clone(),
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

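/// Greedily partitions a set of overlapping SSTs into groups whose members do not overlap each
/// other, so that every group can be read with a concat iterator. SSTs are processed in
/// ascending order of their left bound and appended to the existing group with the smallest
/// right bound that they do not overlap; otherwise a new group is opened.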
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
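        // `Ord` above is reversed, so the max-heap yields the group with the smallest right
        // bound first; append the SST to that group if it ends strictly before the SST starts.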
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

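/// Handles a compaction task with an already-acquired compaction catalog agent: generates the
/// key-range splits, reserves memory for the task, runs either the fast (block-copy) runner or
/// one `CompactorRunner` per split, and finalizes the result via `compact_done`. Returns the
/// finished task, per-table stats, output object timestamps, and the memory tracker reserved
/// for the task (if any).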
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

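    // Fast path: when the task qualifies, hand the whole task to the fast runner, which can
    // copy compatible blocks directly instead of re-encoding every key-value pair.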
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

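    // Drive all split futures concurrently; stop early and abort the remaining splits on
    // failure or external cancellation.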
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

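/// Entry point for handling a compaction task: acquires the compaction catalog for all table
/// ids referenced by the task, prunes watermarks of tables that are not being compacted, and
/// then delegates to [`compact_with_agent`].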
pub async fn compact(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(table_ids_to_be_compacted.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
                let diff = table_ids_to_be_compacted
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

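    // Drop watermark entries for tables that are not part of this compaction task.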
    {
        compact_task
            .pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));

        compact_task
            .non_pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
    }

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

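/// Fills the compact task with the produced output SSTs and the final task status, updates the
/// compactor write metrics, and returns the finished task together with the per-table stats of
/// dropped data and the creation timestamps of the output objects.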
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<u32, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

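/// Core compaction loop: drains the merged input iterator, drops stale versions, tombstones,
/// and keys rejected by the compaction filter according to `task_config`, optionally rewrites
/// values to drop columns that are no longer part of the table schema, and feeds the surviving
/// key-value pairs into `sst_builder`.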
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<u32, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id.table_id);
        }

        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => {
                    target_table_ids.contains(&iter_key.user_key.table_id.table_id)
                }
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

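        // Try to drop columns that are no longer part of the table schema. Once a value in a
        // block is found to need no rewriting, remember its (object_id, block_id) so later
        // keys from the same block can skip the check.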
        let check_table_id = iter_key.user_key.table_id.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

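    /// Ten SSTs whose key ranges each overlap only their immediate neighbors should be split
    /// into two interleaved groups, each of which is internally non-overlapping.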
    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}