risingwave_storage/hummock/compactor/
compactor_runner.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult,
    SstableBuilderOptions, SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
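
/// Executes one split (key range) of a compaction task: it builds the merged input iterator
/// over the split's input SSTs and drives `Compactor::compact_key_range` to produce the
/// output SSTs for that split.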
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
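        // Map the task's numeric compression setting onto a concrete algorithm:
        // 0 => no compression, 1 => LZ4, anything else => Zstd.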
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let kv_count = task
            .input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .map(|sst| sst.total_key_count)
            .sum::<u64>() as usize;
        let use_block_based_filter =
            BlockedXor16FilterBuilder::is_kv_count_too_large(kv_count) || task.target_level > 0;

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }
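
    /// Runs this split's compaction: builds the merged iterator over the input SSTs and
    /// compacts the key range, returning the split index, the output SSTs and the
    /// compaction statistics.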
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

    /// Builds the merge iterator based on the given input SSTs.
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            // No need to filter the tables because the manager has already done it.
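            // Three ways of turning this level's SSTs into iterators:
            // - a non-overlapping level can be read with a single concat iterator;
            // - a heavily overlapping level is first partitioned into concat-able groups so the
            //   merge iterator does not get one child per SST;
            // - otherwise, each SST gets its own iterator.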
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    // Overlapping sstables may contain ssts with the same user key (generated by
                    // spill), so concatenation has to be checked with the full key.
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        // The `Pk/NonPkPrefixSkipWatermarkIterator` is used to handle the table watermark state cleaning introduced
        // in https://github.com/risingwavelabs/risingwave/issues/13148
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress.clone(),
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}
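
/// Greedily partitions overlapping SSTs into groups whose members do not overlap with each
/// other, so that every group can be read by a single `ConcatSstableIterator`. SSTs are
/// visited in ascending order of their left bound; each SST is appended to the existing
/// group with the smallest right bound if that bound ends before the SST starts, otherwise
/// a new group is opened (see `test_partition_overlapping_level` for an example).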
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Reverse the comparison so the max-heap always pops the group with the smallest
            // right bound first.
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // Pick the group with the smallest right bound for every new sstable. If even that group
        // overlaps the sstable, so does every other group, so there is no need to check them.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Errors are never propagated to the caller; they are reflected in the returned task status
/// and left to the hummock manager to handle.
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // The number of splits (key ranges) determines the parallelism: one sub-task per split.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);
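
    // Estimate the peak memory needed for one split (block size, object-store receive buffer
    // and output capacity), then multiply by the number of splits to get the requirement for
    // the whole task.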
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?}  parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // If the task does not have enough memory, cancel it and let the meta node reschedule it,
    // so that it does not occupy the compactor's resources.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {}  memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });
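
    // Fast path: when the block-copy optimization applies, the whole task is handled by the
    // fast compactor runner without per-split parallelism.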
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        // After a compaction is done, mutate the compaction task.
        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
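
    // Normal path: spawn one `CompactorRunner` per split and collect the results as they
    // complete, aborting everything if any split fails or the task is cancelled.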
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Sort by split/key range index.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    // After a compaction is done, mutate the compaction task.
    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Errors are never propagated to the caller; they are reflected in the returned task status
/// and left to the hummock manager to handle.
pub async fn compact(
    compactor_context: CompactorContext,
    compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<u32, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let compact_table_ids = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(compact_table_ids.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != compact_table_ids.len() {
                let diff = compact_table_ids
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

/// Fills in the compact task with the output SSTs and the final task status, aggregates the
/// per-split drop statistics and object timestamps, and updates the compactor write metrics.
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<u32, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}
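
/// Consumes the merged input iterator and feeds the surviving key-value pairs into
/// `sst_builder`, applying delete-key GC, multi-version pruning, the compaction filter and
/// schema-based stale-column dropping along the way. Returns statistics about the keys that
/// were dropped.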
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Keep the table stats changes caused by dropping KV pairs.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // Maps object id -> block id of the latest block whose values are known to need no column
    // pruning, so the schema check can be skipped for the rest of that block. For a given
    // object id, block ids are observed in monotonically increasing order.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<u32, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
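
    // Walk every key from the merged iterator, decide whether to keep it, and append the
    // survivors (possibly with stale columns dropped) to the SST builder.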
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        // CRITICAL WARN: Because of memtable spill, several versions of the same user key may
        // share the same `pure_epoch`. Do not change this code unless necessary.
        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id.table_id);
        }

        // Among keys with the same user key, keep only the latest version unless
        // `retain_multiple_version` is true.
        // By design the frontend never accesses deleted keys, so the epoch does not need to be
        // considered when the compaction filter matches (a match means the MV has been dropped).
        // Because of memtable spill, a PUT key may share the same `pure_epoch` with a DELETE key.
        // Do not assume that the epoch of the keys behind must be smaller than that of the
        // current key.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => {
                    target_table_ids.contains(&iter_key.user_key.table_id.table_id)
                }
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // Value rewrite: drop columns that are no longer part of the table schema.
        let check_table_id = iter_key.user_key.table_id.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // Under the assumption that all values in the same (object, block) group
                        // share the same schema, if one value drops no columns during a
                        // compaction, there is no need to check the other values in the group.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            // Don't allow two SSTs to share the same user key.
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
}