risingwave_storage/hummock/compactor/
compactor_runner.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult,
    SstableBuilderOptions, SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
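
/// Runs a single split (key range) of a compaction task. It holds the task metadata, the
/// `Compactor` configured for this split, and the sstable store used to read the input ssts.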
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
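    /// Builds a runner for the `split_index`-th key range of `task`: derives the sstable builder
    /// options (compression algorithm, output capacity, filter type) from the storage options and
    /// the task, and constructs the underlying `Compactor` with the per-split key range.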
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let kv_count = task
            .input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .map(|sst| sst.total_key_count)
            .sum::<u64>() as usize;
        let use_block_based_filter =
            BlockedXor16FilterBuilder::is_kv_count_too_large(kv_count) || task.target_level > 0;

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

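    /// Runs the compaction for this split: builds the merge iterator over the input ssts and
    /// compacts the key range into new ssts, returning the split index together with the output
    /// ssts and compaction statistics.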
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

    /// Build the merge iterator based on the given input ssts.
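    ///
    /// Non-overlapping levels are read through a single `ConcatSstableIterator`, while overlapping
    /// levels get one iterator per sst (or per partitioned group when the level contains more ssts
    /// than `compactor_max_overlap_sst_count`). The merged stream is then wrapped with the
    /// pk-prefix and non-pk-prefix skip-watermark iterators so that keys below the table
    /// watermarks are skipped.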
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            // No need to filter the tables here because the manager has already done it.
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    // Overlapping sstables may contain ssts with the same user key (generated by memtable spill), so we need to check concatenation with the full key.
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        // The `Pk/NonPkPrefixSkipWatermarkIterator` is used to handle the table watermark state cleaning introduced
        // in https://github.com/risingwavelabs/risingwave/issues/13148
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

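/// Greedily partitions overlapping sstables into groups whose members do not overlap, so that each
/// group can be read through a `ConcatSstableIterator`. Sstables are sorted by their left bound,
/// then each one is appended to the existing group with the smallest right bound that it does not
/// overlap, or starts a new group otherwise.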
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Pick group with the smallest right bound for every new sstable.
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // Pick the group with the smallest right bound for every new sstable, so there is no need to check larger groups if the smallest one does not meet the condition.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

/// Handles a compaction task and reports its status to hummock manager.
/// Always return `Ok` and let hummock manager handle errors.
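///
/// The task is split into one sub-compaction per key range, each driven by a `CompactorRunner`
/// running in parallel; when `optimize_by_copy_block` applies, the whole task is delegated to the
/// fast compactor runner instead. The task fails early if splits cannot be generated, and is
/// cancelled if the estimated memory for all splits cannot be reserved from the memory limiter.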
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // The number of splits (key ranges) equals the number of parallel sub-compactions.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?}  parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // If there is not enough memory for the task, cancel it and let the meta node
    // reschedule it, so that it does not occupy the compactor's resources.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {}  memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        // After a compaction is done, mutate the compaction task.
        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Sort by split/key range index.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    // After a compaction is done, mutate the compaction task.
    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

/// Handles a compaction task and reports its status to hummock manager.
/// Always return `Ok` and let hummock manager handle errors.
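///
/// Compared with [`compact_with_agent`], this entry point first acquires a compaction catalog
/// agent for all tables involved in the task and fails the task if some of them cannot be
/// acquired, then prunes the task's table watermarks to the tables being compacted before
/// delegating to [`compact_with_agent`].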
pub async fn compact(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(table_ids_to_be_compacted.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
                let diff = table_ids_to_be_compacted
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

    // Rewrite the compact_task watermarks: retain only the watermarks of tables being compacted.
    {
        compact_task
            .pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));

        compact_task
            .non_pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
    }

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

/// Fills in the compact task and tries to report the task result to meta node.
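///
/// Sets the task status, moves the output ssts into `sorted_output_ssts`, aggregates the per-table
/// stats dropped during compaction, records each output object's creation timestamp, and updates
/// the compactor write metrics.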
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<TableId, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

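/// Drives the main compaction loop over `iter` and feeds the surviving key-value pairs into
/// `sst_builder`: it seeks to the task's key range, drops older versions of a user key and (when
/// `gc_delete_keys` is set) delete tombstones unless `retain_multiple_version` is enabled, drops
/// keys rejected by `compaction_filter`, optionally rewrites values to remove columns no longer
/// present in the table schema, and accumulates per-table statistics for the dropped data.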
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Keep table stats changes due to dropping KV.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> block id. For an object id, block id is updated in a monotonically increasing manner.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        // CRITICAL WARN: Because of memtable spill, several versions of the same user key may share the same `pure_epoch`. Do not change this code unless necessary.
        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

        // Among keys with the same user key, only keep the latest version unless `retain_multiple_version` is true.
        // In our design, the frontend avoids accessing keys that have been deleted, so we don't
        // need to consider the epoch when the compaction filter matches (a match means the
        // materialized view has been dropped).
        // Because of memtable spill, a PUT key may share the same `pure_epoch` with a DELETE key.
        // Do not assume that "the epoch of keys behind must be smaller than the current key."
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => target_table_ids.contains(&iter_key.user_key.table_id),
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // Try to drop stale columns from the value according to the current table schema.
        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // Under the assumption that all values in the same (object, block) group should share the same schema,
                        // if one value drops no columns during a compaction, no need to check other values in the same group.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            // Don't allow two SSTs to share the same user key
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

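    /// Ten ssts each cover `2 * TEST_KEYS_COUNT` keys and start `TEST_KEYS_COUNT` keys apart, so
    /// every sst overlaps only its direct neighbors; partitioning them should therefore yield two
    /// groups whose members can each be concatenated.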
    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}