risingwave_storage/hummock/compactor/compactor_runner.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult,
    SstableBuilderOptions, SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
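
/// Runs a single split (key range) of a compaction task: it builds the merged
/// input iterator over the split's input SSTs and drives a [`Compactor`]
/// configured for that key range to produce the output SSTs.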
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let kv_count = task
            .input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .map(|sst| sst.total_key_count)
            .sum::<u64>() as usize;
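        // Prefer the block-based xor16 filter when the key count is large or the
        // output targets a non-L0 level.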
        let use_block_based_filter =
            BlockedXor16FilterBuilder::is_kv_count_too_large(kv_count) || task.target_level > 0;

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

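    /// Compacts this runner's split: builds the merged input iterator and
    /// compacts its key range, returning the split index together with the
    /// produced SSTs and compaction statistics.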
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

    /// Builds the merge iterator over the given input SSTs.
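    ///
    /// For each input level, SSTs are first filtered down to those that overlap
    /// this split's key range and belong to a still-existing table.
    /// Non-overlapping levels are read through a single `ConcatSstableIterator`;
    /// overlapping levels get one iterator per SST, unless the level holds more
    /// SSTs than `compactor_max_overlap_sst_count`, in which case the SSTs are
    /// first partitioned into concatenable groups (see
    /// [`partition_overlapping_sstable_infos`]). The merged stream is then
    /// wrapped with the watermark-skipping iterators.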
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            // No need to filter the tables here because the manager has already done it.
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    // Overlapping sstables may contain ssts with the same user key (generated by memtable spill), so concatenation must be checked with the full key.
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        // The `Pk/NonPkPrefixSkipWatermarkIterator` is used to handle the table watermark state cleaning introduced
        // in https://github.com/risingwavelabs/risingwave/issues/13148
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress.clone(),
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

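/// Partitions the SSTs of an overlapping level into groups whose members do not
/// overlap one another, so that each group can be scanned with a single
/// `ConcatSstableIterator` instead of one iterator per SST.
///
/// Greedy strategy: sort the SSTs by left bound, keep a heap of groups ordered by
/// their current right bound (smallest first), and append each SST to the group
/// with the smallest right bound if that bound lies strictly before the SST's
/// left bound; otherwise open a new group.
///
/// A minimal sketch of the expected behavior. Key ranges are written as letter
/// pairs only for illustration (real bounds are encoded full keys) and `ssts`
/// is a hypothetical, already-constructed input:
///
/// ```ignore
/// // Input (sorted by left bound): [a, c], [b, d], [e, f]
/// // [a, c] opens group 1; [b, d] overlaps it, so group 2 is opened;
/// // [e, f] starts after both right bounds and joins the group ending first (group 1).
/// let groups = partition_overlapping_sstable_infos(ssts);
/// assert!(groups.iter().all(|group| full_key_can_concat(group)));
/// ```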
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Pick the group with the smallest right bound for every new sstable:
            // reverse the comparison so the max-heap behaves as a min-heap on the right bound.
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // The heap yields the group with the smallest right bound first; if even that group
        // overlaps the new sstable, no other group can accept it, so open a new group.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Always returns `Ok` and lets the hummock manager handle errors.
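///
/// The task is first cut into splits (key ranges); each split is compacted by its
/// own [`CompactorRunner`], or by the fast runner when the copy-block optimization
/// applies, and the per-split outputs are folded back into the task via
/// `compact_done`.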
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // The number of splits (key ranges) determines the parallelism: one runner per split.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

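    // Scale the estimate by the number of splits, since all split runners may
    // execute concurrently.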
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?}  parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // If there is not enough memory for the task, cancel it and let the meta node
    // reschedule it, so that it does not occupy the compactor's resources.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {}  memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        // After a compaction is done, mutate the compaction task.
        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

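    // Drive all split runners to completion, stopping early on external
    // cancellation, the first execution error, or a join failure.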
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Sort by split/key range index.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    // After a compaction is done, mutate the compaction task.
    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Always returns `Ok` and lets the hummock manager handle errors.
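///
/// Compared to [`compact_with_agent`], this entry point first acquires a
/// compaction catalog agent for the tables being compacted and prunes the task's
/// table watermarks down to those tables before delegating to
/// [`compact_with_agent`].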
pub async fn compact(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(table_ids_to_be_compacted.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
                let diff = table_ids_to_be_compacted
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

    // Rewrite the compact task's watermarks: keep only those of tables being compacted.
    {
        compact_task
            .pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));

        compact_task
            .non_pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
    }

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

/// Fills in the compact task and tries to report the task result to the meta node.
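///
/// Sets the final task status, folds the per-split outputs into
/// `sorted_output_ssts` along with per-table stats deltas and object timestamps,
/// and reports write-bytes / output-SST-count metrics for the compaction group
/// and target level.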
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<u32, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

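/// The core compaction loop: seeks the merged input iterator to the task's key
/// range and, for every entry, decides whether to drop it (stale version,
/// GC-able delete, or compaction-filter match), optionally rewrites the value to
/// drop columns no longer present in the table schema, and feeds the surviving
/// entries into the SST builder while tracking per-table drop statistics.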
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Keep track of table stats changes caused by dropped KVs.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> block id. For each object id, the recorded block id increases monotonically.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<u32, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        // CRITICAL WARN: Because of memtable spill, several versions of the same user key may share the same `pure_epoch`. Do not change this code unless necessary.
        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id.table_id);
        }

        // Among keys with the same user key, only keep the latest one unless `retain_multiple_version` is true.
        // In our design, the frontend avoids accessing keys that have been deleted, so we don't
        // need to consider the epoch when the compaction filter matches (a match means the
        // materialized view has been dropped).
        // Because of memtable spill, a PUT key may share the same `pure_epoch` with a DELETE key.
        // Do not assume that the epoch of subsequent keys must be smaller than that of the current key.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => {
                    target_table_ids.contains(&iter_key.user_key.table_id.table_id)
                }
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // Drop columns that are no longer part of the table schema, if any.
        let check_table_id = iter_key.user_key.table_id.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // Under the assumption that all values in the same (object, block) group share the same schema,
                        // if one value drops no columns during compaction, there is no need to check the other values in the same group.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            // Don't allow two SSTs to share the same user key.
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
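        // Each SST covers 2 * TEST_KEYS_COUNT keys while start indices are only
        // TEST_KEYS_COUNT apart, so every SST overlaps its immediate neighbor and
        // the greedy partition alternates them into two concatenable groups.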
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}