risingwave_storage/hummock/compactor/compactor_runner.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta, ValueSkipWatermarkIterator, ValueSkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

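/// Runs a single split (key range) of a compaction task: it builds the merged input iterator for
/// the split and compacts it into output sstables via `Compactor::compact_key_range`.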
pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        let use_block_based_filter = task.should_use_block_based_filter();

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                task_type: task.task_type,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

    /// Build the merge iterator based on the given input ssts.
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| self.compact_task.existing_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            // No need to filter the tables again because the manager has already done it.
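            // Three cases for building iterators over this level:
            // - Non-overlapping level: the ssts are disjoint and sorted, so one concat iterator
            //   over the whole level is enough.
            // - Overlapping level with too many ssts: partition them into concatenable groups to
            //   bound the fan-in of the merge iterator.
            // - Otherwise: one iterator per sst, merged below.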
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    self.compact_task.existing_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    // Overlapping levels may contain ssts with the same user key (generated by memtable spill), so concatenation must be checked with the full key.
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        self.compact_task.existing_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        // The `Pk/NonPkPrefixSkipWatermarkIterator` is used to handle the table watermark state cleaning introduced
        // in https://github.com/risingwavelabs/risingwave/issues/13148
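        // Nesting order, from the innermost iterator outwards: merge iterator over the input ssts
        // -> monitored compactor iterator -> pk-prefix watermark skipping -> non-pk-prefix
        // watermark skipping -> value watermark skipping.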
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            let non_pk_prefix_skip_watermark_iter = NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            ValueSkipWatermarkIterator::new(
                non_pk_prefix_skip_watermark_iter,
                ValueSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.value_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}

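/// Partitions overlapping sstables into groups whose members can be concatenated: the sstables are
/// sorted by their left bound, and each one is greedily appended to the group with the smallest
/// right bound that it does not overlap; otherwise a new group is opened.
///
/// A minimal usage sketch (illustrative only; constructing the `SstableInfo`s is elided here):
///
/// ```ignore
/// let groups = partition_overlapping_sstable_infos(overlapping_ssts);
/// // Every group returned is concatenable by full key.
/// for group in &groups {
///     assert!(full_key_can_concat(group));
/// }
/// ```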
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Pick group with the smallest right bound for every new sstable.
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // Pick the group with the smallest right bound for every new sstable; if even that group
        // overlaps the sstable, there is no need to check groups with larger right bounds.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Errors are never propagated to the caller: failures are recorded in the task status and left
/// to the hummock manager to handle.
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // The number of splits (key ranges) determines the parallelism of this compaction task.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

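    // Scale the estimated memory by the number of splits, since every split runs as its own
    // compaction runner in parallel and needs its own buffers.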
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // If there is not enough memory for the task, cancel it and let the meta node reschedule it,
    // so that it does not occupy the compactor's resources.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        // After a compaction is done, mutate the compaction task.
        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Sort by split/key range index.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    // After a compaction is done, mutate the compaction task.
    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Errors are never propagated to the caller: failures are recorded in the task status and left
/// to the hummock manager to handle.
pub async fn compact(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(table_ids_to_be_compacted.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
                let diff = table_ids_to_be_compacted
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

    // Rewrite the compact task watermarks to retain only the tables being compacted.
    {
        compact_task
            .pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));

        compact_task
            .non_pk_prefix_table_watermarks
            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
    }

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

/// Fills in the compact task and tries to report the task result to meta node.
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<TableId, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Keep table stats changes due to dropping KV.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> block id. For an object id, block id is updated in a monotonically increasing manner.
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        // CRITICAL WARN: Because of memtable spill, several versions of the same user key may share the same `pure_epoch`. Do not change this code unless necessary.
        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

        // Among keys with the same user key, only keep the latest one unless retain_multiple_version is true.
        // In our design, the frontend never accesses keys that have been deleted, so we don't
        // need to consider the epoch when the compaction filter matches (a match means the
        // materialized view has been dropped).
        // Because of memtable spill, a PUT key may share the same `pure_epoch` with a DELETE key.
        // Do not assume that the epoch of the following keys must be smaller than that of the current key.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            let should_count = match task_config.stats_target_table_ids.as_ref() {
                Some(target_table_ids) => target_table_ids.contains(&iter_key.user_key.table_id),
                None => true,
            };
            if should_count {
                last_table_stats.total_key_count -= 1;
                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            }
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // May drop stale columns
        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // Under the assumption that all values in the same (object, block) group should share the same schema,
                        // if one value drops no columns during a compaction, no need to check other values in the same group.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            // Don't allow two SSTs to share the same user key.
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
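
    #[tokio::test]
    async fn test_partition_non_overlapping_level() {
        // A minimal sketch, assuming the same test helpers as above: sstables with pairwise
        // disjoint key ranges should be collapsed into a single concatenable group.
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..5 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let groups = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(groups.len(), 1);
        for ssts in groups {
            assert!(can_concat(&ssts));
        }
    }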
}