1use std::collections::{BinaryHeap, HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use bytes::Bytes;
20use futures::{FutureExt, StreamExt, stream};
21use itertools::Itertools;
22use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
23use risingwave_hummock_sdk::compact::{
24 compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
25};
26use risingwave_hummock_sdk::compact_task::CompactTask;
27use risingwave_hummock_sdk::compaction_group::StateTableId;
28use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
29use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
30use risingwave_hummock_sdk::sstable_info::SstableInfo;
31use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
32use risingwave_hummock_sdk::{
33 HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
34 full_key_can_concat,
35};
36use risingwave_pb::hummock::LevelType;
37use risingwave_pb::hummock::compact_task::TaskStatus;
38use thiserror_ext::AsReport;
39use tokio::sync::oneshot::Receiver;
40
41use super::iterator::MonitoredCompactorIterator;
42use super::task_progress::TaskProgress;
43use super::{CompactionStatistics, TaskConfig};
44use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
45use crate::hummock::compactor::compaction_utils::{
46 build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
47 metrics_report_for_task, optimize_by_copy_block,
48};
49use crate::hummock::compactor::iterator::ConcatSstableIterator;
50use crate::hummock::compactor::task_progress::TaskProgressGuard;
51use crate::hummock::compactor::{
52 CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
53 fast_compactor_runner,
54};
55use crate::hummock::iterator::{
56 Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
57 NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
58 ValueMeta,
59};
60use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
61use crate::hummock::utils::MemoryTracker;
62use crate::hummock::value::HummockValue;
63use crate::hummock::{
64 BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult,
65 SstableBuilderOptions, SstableStoreRef,
66};
67use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
/// Compacts the key range of a single split of a [`CompactTask`].
///
/// `compact_with_agent` creates one runner per entry in `CompactTask::splits`, handing each
/// runner its own clone of the task.
pub struct CompactorRunner {
    /// The task this split belongs to.
    compact_task: CompactTask,
    /// Compactor configured with this split's key range and the task's SST builder options.
    compactor: Compactor,
    /// Store used to read the input SSTs.
    sstable_store: SstableStoreRef,
    /// This split's key range; built with `right_exclusive: true` in `CompactorRunner::new`.
    key_range: KeyRange,
    /// Index of this runner's split within `compact_task.splits`.
    split_index: usize,
}
75
76impl CompactorRunner {
77 pub fn new(
78 split_index: usize,
79 context: CompactorContext,
80 task: CompactTask,
81 object_id_getter: Box<dyn GetObjectId>,
82 ) -> Self {
83 let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
84 options.compression_algorithm = match task.compression_algorithm {
85 0 => CompressionAlgorithm::None,
86 1 => CompressionAlgorithm::Lz4,
87 _ => CompressionAlgorithm::Zstd,
88 };
89
90 options.capacity = estimate_task_output_capacity(context.clone(), &task);
91 let kv_count = task
92 .input_ssts
93 .iter()
94 .flat_map(|level| level.table_infos.iter())
95 .map(|sst| sst.total_key_count)
96 .sum::<u64>() as usize;
97 let use_block_based_filter =
98 BlockedXor16FilterBuilder::is_kv_count_too_large(kv_count) || task.target_level > 0;
99
100 let key_range = KeyRange {
101 left: task.splits[split_index].left.clone(),
102 right: task.splits[split_index].right.clone(),
103 right_exclusive: true,
104 };
105
106 let compactor = Compactor::new(
107 context.clone(),
108 options,
109 TaskConfig {
110 key_range: key_range.clone(),
111 cache_policy: CachePolicy::NotFill,
112 gc_delete_keys: task.gc_delete_keys,
113 retain_multiple_version: false,
114 stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
115 task_type: task.task_type,
116 use_block_based_filter,
117 table_vnode_partition: task.table_vnode_partition.clone(),
118 table_schemas: task
119 .table_schemas
120 .iter()
121 .map(|(k, v)| (*k, v.clone()))
122 .collect(),
123 disable_drop_column_optimization: false,
124 },
125 object_id_getter,
126 );
127
128 Self {
129 compactor,
130 compact_task: task,
131 sstable_store: context.sstable_store,
132 key_range,
133 split_index,
134 }
135 }
136
137 pub async fn run(
138 &self,
139 compaction_filter: impl CompactionFilter,
140 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
141 task_progress: Arc<TaskProgress>,
142 ) -> HummockResult<CompactOutput> {
143 let iter =
144 self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
145 let (ssts, compaction_stat) = self
146 .compactor
147 .compact_key_range(
148 iter,
149 compaction_filter,
150 compaction_catalog_agent_ref,
151 Some(task_progress),
152 Some(self.compact_task.task_id),
153 Some(self.split_index),
154 )
155 .await?;
156 Ok((self.split_index, ssts, compaction_stat))
157 }
158
159 fn build_sst_iter(
161 &self,
162 task_progress: Arc<TaskProgress>,
163 compaction_catalog_agent_ref: CompactionCatalogAgentRef,
164 ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
165 let compactor_iter_max_io_retry_times = self
166 .compactor
167 .context
168 .storage_opts
169 .compactor_iter_max_io_retry_times;
170 let mut table_iters = Vec::new();
171 for level in &self.compact_task.input_ssts {
172 if level.table_infos.is_empty() {
173 continue;
174 }
175
176 let tables = level
177 .table_infos
178 .iter()
179 .filter(|table_info| {
180 let table_ids = &table_info.table_ids;
181 let exist_table = table_ids
182 .iter()
183 .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));
184
185 self.key_range.full_key_overlap(&table_info.key_range) && exist_table
186 })
187 .cloned()
188 .collect_vec();
189 if level.level_type == LevelType::Nonoverlapping {
191 debug_assert!(can_concat(&level.table_infos));
192 table_iters.push(ConcatSstableIterator::new(
193 self.compact_task.existing_table_ids.clone(),
194 tables,
195 self.compactor.task_config.key_range.clone(),
196 self.sstable_store.clone(),
197 task_progress.clone(),
198 compactor_iter_max_io_retry_times,
199 ));
200 } else if tables.len()
201 > self
202 .compactor
203 .context
204 .storage_opts
205 .compactor_max_overlap_sst_count
206 {
207 let sst_groups = partition_overlapping_sstable_infos(tables);
208 tracing::warn!(
209 "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
210 level.table_infos.len(),
211 sst_groups.len()
212 );
213 for (idx, table_infos) in sst_groups.into_iter().enumerate() {
214 assert!(
216 full_key_can_concat(&table_infos),
217 "sst_group idx {:?} table_infos: {:?}",
218 idx,
219 table_infos
220 );
221 table_iters.push(ConcatSstableIterator::new(
222 self.compact_task.existing_table_ids.clone(),
223 table_infos,
224 self.compactor.task_config.key_range.clone(),
225 self.sstable_store.clone(),
226 task_progress.clone(),
227 compactor_iter_max_io_retry_times,
228 ));
229 }
230 } else {
231 for table_info in tables {
232 table_iters.push(ConcatSstableIterator::new(
233 self.compact_task.existing_table_ids.clone(),
234 vec![table_info],
235 self.compactor.task_config.key_range.clone(),
236 self.sstable_store.clone(),
237 task_progress.clone(),
238 compactor_iter_max_io_retry_times,
239 ));
240 }
241 }
242 }
243
244 let combine_iter = {
247 let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
248 MonitoredCompactorIterator::new(
249 MergeIterator::for_compactor(table_iters),
250 task_progress.clone(),
251 ),
252 PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
253 self.compact_task.pk_prefix_table_watermarks.clone(),
254 ),
255 );
256
257 NonPkPrefixSkipWatermarkIterator::new(
258 skip_watermark_iter,
259 NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
260 self.compact_task.non_pk_prefix_table_watermarks.clone(),
261 compaction_catalog_agent_ref,
262 ),
263 )
264 };
265
266 Ok(combine_iter)
267 }
268}
269
270pub fn partition_overlapping_sstable_infos(
271 mut origin_infos: Vec<SstableInfo>,
272) -> Vec<Vec<SstableInfo>> {
273 pub struct SstableGroup {
274 ssts: Vec<SstableInfo>,
275 max_right_bound: Bytes,
276 }
277
278 impl PartialEq for SstableGroup {
279 fn eq(&self, other: &SstableGroup) -> bool {
280 self.max_right_bound == other.max_right_bound
281 }
282 }
283 impl PartialOrd for SstableGroup {
284 fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
285 Some(self.cmp(other))
286 }
287 }
288 impl Eq for SstableGroup {}
289 impl Ord for SstableGroup {
290 fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
291 KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
293 }
294 }
295 let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
296 origin_infos.sort_by(|a, b| {
297 let x = &a.key_range;
298 let y = &b.key_range;
299 KeyComparator::compare_encoded_full_key(&x.left, &y.left)
300 });
301 for sst in origin_infos {
302 if let Some(mut prev_group) = groups.peek_mut() {
304 if KeyComparator::encoded_full_key_less_than(
305 &prev_group.max_right_bound,
306 &sst.key_range.left,
307 ) {
308 prev_group.max_right_bound.clone_from(&sst.key_range.right);
309 prev_group.ssts.push(sst);
310 continue;
311 }
312 }
313 groups.push(SstableGroup {
314 max_right_bound: sst.key_range.right.clone(),
315 ssts: vec![sst],
316 });
317 }
318 assert!(!groups.is_empty());
319 groups.into_iter().map(|group| group.ssts).collect_vec()
320}
321
/// Runs `compact_task` to completion using an already-acquired compaction catalog agent.
///
/// Steps:
/// 1. Reports task metrics, starts the `compact_task_duration` timer, and builds the compaction
///    filter.
/// 2. Generates the task's splits. On failure the task finishes with `ExecuteFailed`.
/// 3. Estimates the memory needed for all splits and tries to reserve it from
///    `context.memory_limiter`. On failure the task finishes with
///    `NoAvailMemoryResourceCanceled`.
/// 4. If `optimize_by_copy_block` holds, runs the single fast-path runner
///    (`fast_compactor_runner::CompactorRunner`). Otherwise it spawns one [`CompactorRunner`]
///    per split with `tokio::spawn` and collects their outputs.
///
/// If `shutdown_rx` resolves first, the task is marked `ManualCanceled`. In the per-split path,
/// any non-success status aborts all spawned split tasks and discards partial output.
///
/// Returns the result of [`compact_done`] together with the memory tracker obtained in step 3
/// (`None` if the task ended before or at the reservation).
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Box<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    // Labelled by compaction group and the level of the first input level.
    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    // Fills `compact_task.splits`; every later step depends on it.
    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // One runner per split in the non-fast path.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    // Per-split estimate (block size in bytes, S3 recv buffer defaulting to 6 MiB, output
    // capacity) multiplied by the number of splits.
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // Reserve the estimated memory up front; the tracker is handed back to the caller.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    // Pending gauges are incremented here and decremented by the scope guard on every exit path
    // below.
    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    // Fast path: a single runner handles the whole task; its output is recorded as split 0.
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    // Per-split path: spawn one runner per split, optionally registered in the await-tree.
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    // Collect split results as they finish; stop at the first failure or on shutdown.
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    // On failure, stop any still-running splits and drop whatever partial output was collected.
    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Splits finish in arbitrary order; restore split (key) order before recording output.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}
579
580pub async fn compact(
583 compactor_context: CompactorContext,
584 compact_task: CompactTask,
585 shutdown_rx: Receiver<()>,
586 object_id_getter: Box<dyn GetObjectId>,
587 compaction_catalog_manager_ref: CompactionCatalogManagerRef,
588) -> (
589 (
590 CompactTask,
591 HashMap<u32, TableStats>,
592 HashMap<HummockSstableObjectId, u64>,
593 ),
594 Option<MemoryTracker>,
595) {
596 let compact_table_ids = compact_task.build_compact_table_ids();
597 let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
598 .acquire(compact_table_ids.clone())
599 .await
600 {
601 Ok(compaction_catalog_agent_ref) => {
602 let acquire_table_ids: HashSet<StateTableId> =
603 compaction_catalog_agent_ref.table_ids().collect();
604 if acquire_table_ids.len() != compact_table_ids.len() {
605 let diff = compact_table_ids
606 .into_iter()
607 .collect::<HashSet<_>>()
608 .symmetric_difference(&acquire_table_ids)
609 .cloned()
610 .collect::<Vec<_>>();
611 tracing::warn!(
612 dif= ?diff,
613 "Some table ids are not acquired."
614 );
615 return (
616 compact_done(
617 compact_task,
618 compactor_context.clone(),
619 vec![],
620 TaskStatus::ExecuteFailed,
621 ),
622 None,
623 );
624 }
625
626 compaction_catalog_agent_ref
627 }
628 Err(e) => {
629 tracing::warn!(
630 error = %e.as_report(),
631 "Failed to acquire compaction catalog agent"
632 );
633 return (
634 compact_done(
635 compact_task,
636 compactor_context.clone(),
637 vec![],
638 TaskStatus::ExecuteFailed,
639 ),
640 None,
641 );
642 }
643 };
644
645 compact_with_agent(
646 compactor_context,
647 compact_task,
648 shutdown_rx,
649 object_id_getter,
650 compaction_catalog_agent_ref,
651 )
652 .await
653}
654
655pub(crate) fn compact_done(
657 mut compact_task: CompactTask,
658 context: CompactorContext,
659 output_ssts: Vec<CompactOutput>,
660 task_status: TaskStatus,
661) -> (
662 CompactTask,
663 HashMap<u32, TableStats>,
664 HashMap<HummockSstableObjectId, u64>,
665) {
666 let mut table_stats_map = TableStatsMap::default();
667 let mut object_timestamps = HashMap::default();
668 compact_task.task_status = task_status;
669 compact_task
670 .sorted_output_ssts
671 .reserve(compact_task.splits.len());
672 let mut compaction_write_bytes = 0;
673 for (
674 _,
675 ssts,
676 CompactionStatistics {
677 delta_drop_stat, ..
678 },
679 ) in output_ssts
680 {
681 add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
682 for sst_info in ssts {
683 compaction_write_bytes += sst_info.file_size();
684 object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
685 compact_task.sorted_output_ssts.push(sst_info.sst_info);
686 }
687 }
688
689 let group_label = compact_task.compaction_group_id.to_string();
690 let level_label = compact_task.target_level.to_string();
691 context
692 .compactor_metrics
693 .compact_write_bytes
694 .with_label_values(&[&group_label, level_label.as_str()])
695 .inc_by(compaction_write_bytes);
696 context
697 .compactor_metrics
698 .compact_write_sstn
699 .with_label_values(&[&group_label, level_label.as_str()])
700 .inc_by(compact_task.sorted_output_ssts.len() as u64);
701
702 (compact_task, table_stats_map, object_timestamps)
703}
704
/// Drains `iter` over `task_config.key_range` into `sst_builder`, dropping keys that should not
/// survive compaction.
///
/// Iteration starts at the range's left bound (or from the beginning if it is empty). It stops
/// at the first new user key that is `>=` the right bound (no bound if it is empty).
///
/// A key is dropped when:
/// - it is a tombstone, `gc_delete_keys` is set, and multiple versions are not retained; or
/// - it is an older version of the current user key and multiple versions are not retained; or
/// - `compaction_filter.should_delete` returns true.
///
/// `Put` values whose table has an entry in `task_config.table_schemas` may be re-encoded by
/// `try_drop_invalid_columns` before being written.
///
/// Returns statistics whose `delta_drop_stat` holds, per table, the negative key/size deltas of
/// dropped or shrunk entries, merged with the iterator's watermark-skip stats.
///
/// # Errors
///
/// Propagates errors from seeking/advancing `iter` and from `sst_builder.add_full_key`.
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    // An empty right bound means "no upper bound" (see the `max_key.is_empty()` check below).
    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Per-table stat deltas, flushed into `table_stats_drop` whenever the table id changes.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> block id whose values were found to need no column dropping.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    // table id -> set of column ids from the task's table schemas.
    let schemas: HashMap<u32, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        let value = iter.value();
        // Location (object and block) the current value was read from, when known.
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            // The right bound is only checked on user-key boundaries, so all versions of the last
            // in-range user key are still processed.
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id.table_id);
        }

        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            // Only tables listed in `stats_target_table_ids` (or all tables when unset) get
            // their stats adjusted.
            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => {
                    target_table_ids.contains(&iter_key.user_key.table_id.table_id)
                }
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        let check_table_id = iter_key.user_key.table_id.table_id;
        let mut is_value_rewritten = false;
        // Try to drop columns no longer in the table schema. This is skipped for values from a
        // block already recorded in `skip_schema_check`. The assert requires blocks of one
        // object to be visited in non-decreasing block-id order.
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // Assumes all values in the same (object, block) share a schema: once
                        // one needs no rewrite, the rest of that block is not checked.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    // Dropping columns can only shrink the value.
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    // Flush the stats of the final table seen.
    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}
875
#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    /// Ten SSTs, each spanning 20 keys and starting 10 keys after the previous one, so every
    /// SST overlaps its neighbours. They should split into exactly two groups of
    /// non-overlapping, concatenable SSTs.
    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;

        let mut overlapping_ssts = Vec::with_capacity(10);
        for object_id in 0..10 {
            let first_key = object_id * TEST_KEYS_COUNT;
            let kv_pairs = (first_key..first_key + 2 * TEST_KEYS_COUNT)
                .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i))));
            let sst = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                kv_pairs,
                sstable_store.clone(),
            )
            .await;
            overlapping_ssts.push(sst);
        }

        let groups = partition_overlapping_sstable_infos(overlapping_ssts);
        assert_eq!(groups.len(), 2);
        assert!(groups.iter().all(|group| can_concat(group)));
    }
}