use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult,
    SstableBuilderOptions, SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
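
/// Executes one split of a [`CompactTask`]: it owns the split's key range, builds the input
/// iterator over the selected input SSTs, and delegates the actual rewrite to
/// [`Compactor::compact_key_range`].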
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let kv_count = task
            .input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .map(|sst| sst.total_key_count)
            .sum::<u64>() as usize;
        let use_block_based_filter =
            BlockedXor16FilterBuilder::is_kv_count_too_large(kv_count) || task.target_level > 0;

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

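    /// Builds the input iterator for this split: a merge iterator over the input SSTs that
    /// overlap the split's key range, wrapped in pk-prefix and non-pk-prefix
    /// watermark-skipping iterators so keys below the table watermarks are filtered out.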
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress.clone(),
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

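/// Partitions a set of overlapping SSTs into groups such that the SSTs within each group have
/// non-overlapping, increasing key ranges and can therefore be read through a
/// [`ConcatSstableIterator`].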
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Compare in reverse so that the max-heap below pops the group with the
            // smallest right bound first.
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // Prefer the group with the smallest right bound: if it ends before the current SST
        // starts, append the SST to it; otherwise open a new group.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

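/// Handles a compaction task end to end with an already-acquired
/// [`CompactionCatalogAgentRef`]: it generates splits, reserves memory, runs either the fast
/// compactor or one [`CompactorRunner`] per split, and returns the finished task together with
/// dropped-key statistics and output object timestamps.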
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

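    // Two execution paths: tasks that qualify for the block-copy optimization are handled by
    // the fast compactor runner as a whole; otherwise one `CompactorRunner` is spawned per
    // split and their results are collected below.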
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

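    // Collect split results as they finish, bailing out on external cancellation or on the
    // first failed split; any remaining split futures are aborted below.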
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

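/// Entry point for handling a compaction task: acquires the compaction catalog for all tables
/// referenced by the task and then delegates to [`compact_with_agent`]. If the catalog cannot
/// be fully acquired, the task is finished as `ExecuteFailed`.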
pub async fn compact(
    compactor_context: CompactorContext,
    compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let compact_table_ids = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(compact_table_ids.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != compact_table_ids.len() {
                let diff = compact_table_ids
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

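/// Finalizes a compaction task locally: records the task status, collects the output SSTs and
/// their creation timestamps, aggregates the dropped-key statistics, and updates the write
/// metrics.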
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<u32, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

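/// Core compaction loop: drains the merged input iterator within the task's key range, drops
/// stale versions, tombstones and filtered keys, optionally strips columns that no longer
/// exist in the table schema, and feeds the surviving keys into the SST builder.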
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<u32, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
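    // Main loop: walk the merged input in key order. A new user key at or beyond the right
    // bound of the key range ends the loop; every other entry is either dropped (stale
    // version, tombstone, or filtered key) or written to the SST builder.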
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id.table_id);
        }

        // Drop the key if multi-version retention is disabled and it is either a delete
        // tombstone that may be garbage-collected or an older version of a user key that has
        // already been emitted.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => {
                    target_table_ids.contains(&iter_key.user_key.table_id.table_id)
                }
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        let check_table_id = iter_key.user_key.table_id.table_id;
        let mut is_value_rewritten = false;
        // If the table has a known schema, try to strip columns that were dropped from it.
        // `skip_schema_check` remembers, per object, the last block that was found to need no
        // rewriting, so the remaining keys of that block skip the check.
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // No droppable columns in this block; skip the check for its remaining keys.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}