risingwave_storage/hummock/compactor/compactor_runner.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult,
    SstableBuilderOptions, SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
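
/// A runner for one split (key range) of a compaction task. It builds the merged input iterator
/// over its split and writes the compacted output SSTs through the inner [`Compactor`].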
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Box<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let kv_count = task
            .input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .map(|sst| sst.total_key_count)
            .sum::<u64>() as usize;
        let use_block_based_filter =
            BlockedXor16FilterBuilder::is_kv_count_too_large(kv_count) || task.target_level > 0;

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

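    /// Runs the compaction for this split and returns the split index together with the output
    /// SSTs and the compaction statistics.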
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

    /// Build the merge iterator based on the given input ssts.
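    ///
    /// Non-overlapping levels are read through a single `ConcatSstableIterator`; an overlapping
    /// level with too many SSTs is first partitioned into concatenable groups; otherwise each SST
    /// gets its own iterator. The merged stream is wrapped with the watermark-skipping iterators.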
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            // No need to filter the tables again here because the manager has already done it.
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    // Overlapping sstables may contain SSTs with the same user key (generated by memtable spill), so concatenation must be checked with the full key.
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        // The `Pk/NonPkPrefixSkipWatermarkIterator` is used to handle the table watermark state cleaning introduced
        // in https://github.com/risingwavelabs/risingwave/issues/13148
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress.clone(),
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

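/// Greedily partitions overlapping sstables into groups of non-overlapping sstables, so that each
/// group can be read with a `ConcatSstableIterator`. SSTs are sorted by their left bound; each SST
/// is appended to the existing group with the smallest right bound if that group ends before the
/// SST begins, otherwise a new group is opened.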
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Reverse the comparison so that the max-heap always yields the group with the smallest right bound.
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // Only the group with the smallest right bound is checked for every new sstable: if it does not end before the sstable, no other group will either.
        if let Some(mut prev_group) = groups.peek_mut() {
            if KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            ) {
                prev_group.max_right_bound.clone_from(&sst.key_range.right);
                prev_group.ssts.push(sst);
                continue;
            }
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Errors are never propagated to the caller; they are reported through the task status and handled by the hummock manager.
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Box<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // The number of splits (key ranges) equals the number of parallel compaction sub-tasks.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

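    // Scale the estimated memory by the number of splits, since every split runs concurrently as
    // its own sub-task.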
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?}  parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // If there is not enough memory for the task, cancel it and let the meta node
    // reschedule it, so that it does not occupy the compactor's resources.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {}  memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

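    // Fast path: hand the whole task to the fast compactor runner, which copies input blocks
    // where possible instead of merging key by key.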
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        // After a compaction is done, mutate the compaction task.
        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }

    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

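    // Drive all split runners concurrently and collect their outputs, stopping early on external
    // cancellation or the first failure.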
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

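    // On cancellation or failure, abort any split runners still in flight and discard partial
    // outputs; the failure status is reported through `compact_done` below.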
    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Sort by split/key range index.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    // After a compaction is done, mutate the compaction task.
    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Errors are never propagated to the caller; they are reported through the task status and handled by the hummock manager.
pub async fn compact(
    compactor_context: CompactorContext,
    compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Box<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let compact_table_ids = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(compact_table_ids.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != compact_table_ids.len() {
                let diff = compact_table_ids
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

/// Fills in the compact task and tries to report the task result to the meta node.
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<u32, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, level_label.as_str()])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, level_label.as_str()])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

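/// Consumes the merged input iterator, drops stale or filtered versions, optionally rewrites
/// values to remove stale columns, and feeds the surviving key-value pairs into `sst_builder`.
/// Returns statistics about the dropped data.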
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Keep table stats changes due to dropping KV.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> block id. For an object id, block id is updated in a monotonically increasing manner.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<u32, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
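
    // Main loop: visit each key-value pair in key order, decide whether to drop it, and otherwise
    // append it (possibly with stale columns removed) to the SST builder.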
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        // CRITICAL WARN: Because of memtable spill, several versions of the same user key may share the same `pure_epoch`. Do not change this code unless necessary.
        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id.table_id);
        }

        // Among keys with the same user key, keep only the latest one unless
        // `retain_multiple_version` is true.
        // In our design the frontend never accesses keys that have been deleted, so we don't need
        // to consider the epoch when the compaction filter matches (a match means the
        // materialized view has been dropped).
        // Because of memtable spill, a PUT key may share the same `pure_epoch` with a DELETE key.
        // Do not assume that the epoch of subsequent keys must be smaller than that of the
        // current key.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => {
                    target_table_ids.contains(&iter_key.user_key.table_id.table_id)
                }
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // May drop stale columns from the value (column-aware row encoding).
        let check_table_id = iter_key.user_key.table_id.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // Under the assumption that all values in the same (object, block) group share the same schema,
                        // if one value drops no columns during a compaction, there is no need to check other values in the same group.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            // Don't allow two SSTs to share the same user key.
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
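
    // Not part of the original source: a minimal companion sketch of the disjoint case. With
    // fully non-overlapping key ranges the greedy partition is expected to collapse all
    // sstables into a single concatenable group.
    #[tokio::test]
    async fn test_partition_non_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            // Leave a gap between consecutive sstables so that no key ranges overlap.
            let start_index = object_id * 2 * TEST_KEYS_COUNT;
            let end_index = start_index + TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        // Every sstable ends strictly before the next one begins, so one group should suffice.
        assert_eq!(table_infos.len(), 1);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }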
}