use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult,
    SstableBuilderOptions, SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
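
/// Runner for one split of a [`CompactTask`] on the general (non fast-path)
/// compaction route: it merges the input SSTs that overlap the split's key
/// range and rewrites them through the shared [`Compactor`].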
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
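    /// Builds the runner for the split at `split_index`, taking the split's key
    /// range from `task.splits[split_index]` and deriving the SST builder
    /// options (compression algorithm, output capacity, filter type) from the
    /// task and the compactor context.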
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let kv_count = task
            .input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .map(|sst| sst.total_key_count)
            .sum::<u64>() as usize;
        let use_block_based_filter =
            BlockedXor16FilterBuilder::is_kv_count_too_large(kv_count) || task.target_level > 0;

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

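    /// Compacts this split's key range: builds the merged input iterator and
    /// feeds it to [`Compactor::compact_key_range`], returning the split index
    /// together with the produced SSTs and compaction statistics.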
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

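    /// Builds the input iterator for this split. A non-overlapping level becomes
    /// a single concat iterator; an overlapping level gets one iterator per SST,
    /// or one per partitioned group when the level holds too many SSTs. The
    /// merged stream is then wrapped with watermark-skipping iterators.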
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

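        // Wrap the merged input with watermark-skipping iterators so that keys
        // below the per-table pk-prefix and non-pk-prefix watermarks are
        // filtered out during compaction.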
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

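/// Partitions SSTs of an overlapping level into groups whose key ranges do not
/// overlap within each group, so every group can be read through a single
/// concat iterator. Input order does not matter; the SSTs are re-sorted by
/// their left bound internally.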
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
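    // Greedy assignment: the reversed `Ord` above makes the max-heap peek the
    // group with the smallest right bound. If even that group ends strictly
    // before the next SST begins, the SST extends that group; otherwise it
    // overlaps every existing group and starts a new one.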
    for sst in origin_infos {
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

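/// Runs a compaction task with an already-acquired catalog agent: generates the
/// task splits, reserves the estimated memory, then executes either the fast
/// (block-copy) runner or one [`CompactorRunner`] per split, and finalizes the
/// task via [`compact_done`]. Returns the finished task together with per-table
/// stats, output object timestamps, and the memory tracker held for the task.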
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?}  parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

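    // Reserve the estimated memory for all splits up front; if the limiter
    // cannot grant it, cancel the task instead of risking running out of memory
    // mid-compaction.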
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {}  memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

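    // Fast path: when `optimize_by_copy_block` is set, the dedicated fast
    // runner handles the whole task in a single run instead of spawning one
    // runner per split.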
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

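    // Await the spawned split runners as they finish; stop early and record a
    // failure status on the first error or on an external shutdown signal.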
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

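/// Entry point for running a compaction task: acquires the compaction catalog
/// for every table referenced by the task (failing the task if any id cannot be
/// acquired), drops watermarks of tables that are not being compacted, and then
/// delegates to [`compact_with_agent`].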
pub async fn compact(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(table_ids_to_be_compacted.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
                let diff = table_ids_to_be_compacted
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

    {
        compact_task
            .pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));

        compact_task
            .non_pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
    }

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

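/// Finalizes a compaction task locally: records `task_status`, collects the
/// output SSTs and per-table drop statistics from `output_ssts`, updates the
/// compactor write metrics, and returns the task together with the table stats
/// and output object timestamps.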
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<TableId, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

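/// Key-by-key compaction loop: drains `iter` within the task's key range,
/// dropping stale versions, GC-able delete tombstones, and keys rejected by
/// `compaction_filter`, prunes columns that are no longer part of the table
/// schema, and appends the surviving entries to `sst_builder`. Returns the
/// statistics accumulated during the scan.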
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

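        // Decide whether this entry can be dropped: delete tombstones when the
        // task is allowed to GC them, and older versions of a user key when
        // multiple versions need not be retained. Entries surviving these
        // checks may still be dropped by the compaction filter below.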
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => target_table_ids.contains(&iter_key.user_key.table_id),
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

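        // Schema-aware column pruning: for a Put value, try to drop columns that
        // no longer exist in the table schema. A block whose checked key needed
        // no rewrite is recorded in `skip_schema_check` so the remaining keys of
        // that block skip the check.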
        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
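        // Each SST covers 2 * TEST_KEYS_COUNT keys starting at object_id *
        // TEST_KEYS_COUNT, so SST i overlaps SST i + 1 but not SST i + 2: the
        // partitioning is expected to yield two groups (even- and odd-indexed
        // SSTs), each internally non-overlapping.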
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}