// risingwave_storage/hummock/compactor/compactor_runner.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BinaryHeap, HashMap, HashSet};
16use std::sync::Arc;
17
18use await_tree::{InstrumentAwait, SpanExt};
19use bytes::Bytes;
20use futures::{FutureExt, StreamExt, stream};
21use itertools::Itertools;
22use risingwave_common::catalog::TableId;
23use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
24use risingwave_hummock_sdk::compact::{
25    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
26};
27use risingwave_hummock_sdk::compact_task::CompactTask;
28use risingwave_hummock_sdk::compaction_group::StateTableId;
29use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
30use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
31use risingwave_hummock_sdk::sstable_info::SstableInfo;
32use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
33use risingwave_hummock_sdk::{
34    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
35    full_key_can_concat,
36};
37use risingwave_pb::hummock::LevelType;
38use risingwave_pb::hummock::compact_task::TaskStatus;
39use thiserror_ext::AsReport;
40use tokio::sync::oneshot::Receiver;
41
42use super::iterator::MonitoredCompactorIterator;
43use super::task_progress::TaskProgress;
44use super::{CompactionStatistics, TaskConfig};
45use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
46use crate::hummock::compactor::compaction_utils::{
47    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
48    metrics_report_for_task, optimize_by_copy_block,
49};
50use crate::hummock::compactor::iterator::ConcatSstableIterator;
51use crate::hummock::compactor::task_progress::TaskProgressGuard;
52use crate::hummock::compactor::{
53    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
54    fast_compactor_runner,
55};
56use crate::hummock::iterator::{
57    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
58    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
59    ValueMeta, ValueSkipWatermarkIterator, ValueSkipWatermarkState,
60};
61use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
62use crate::hummock::utils::MemoryTracker;
63use crate::hummock::value::HummockValue;
64use crate::hummock::{
65    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
66    SstableStoreRef,
67};
68use crate::monitor::{CompactorMetrics, StoreLocalStatistic};
/// Executes one split (key range) of a compaction task: builds the merged
/// input iterator over the split's input SSTs and drives the inner
/// [`Compactor`] to produce the output SSTs for that range.
pub struct CompactorRunner {
    // The compaction task this runner is a part of.
    compact_task: CompactTask,
    // Compaction executor configured (options + `TaskConfig`) for this split.
    compactor: Compactor,
    // Store used to read the input SSTs.
    sstable_store: SstableStoreRef,
    // Key range of the split handled by this runner (right bound exclusive).
    key_range: KeyRange,
    // Index of the split within `compact_task.splits`.
    split_index: usize,
}
76
impl CompactorRunner {
    /// Creates a runner for the `split_index`-th split of `task`.
    ///
    /// Derives the SST builder options from the storage options and the task
    /// (compression algorithm code: 0 => None, 1 => Lz4, otherwise Zstd;
    /// estimated output capacity; vnode key-range budget) and assembles the
    /// `TaskConfig` used by the inner [`Compactor`].
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        options.max_vnode_key_range_bytes = task.effective_max_vnode_key_range_bytes();
        let use_block_based_filter = task.should_use_block_based_filter();

        // The key range this runner is responsible for; the right bound is
        // treated as exclusive so adjacent splits do not overlap.
        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };

        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())),
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

    /// Runs compaction for this split: builds the input iterator and compacts
    /// its key range, returning `(split_index, output ssts, statistics)`.
    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

    /// Build the merge iterator based on the given input ssts.
    ///
    /// Per input level: a non-overlapping level becomes a single concat
    /// iterator; an overlapping level becomes either one concat iterator per
    /// sst, or — when the level has too many ssts — one per concat-able group
    /// produced by `partition_overlapping_sstable_infos`. The merged stream is
    /// then wrapped with the watermark-skipping iterators.
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();
        let compact_table_ids = self.compact_task.build_compact_table_ids();

        for level in &self.compact_task.input_ssts {
            if level.table_infos.is_empty() {
                continue;
            }

            // Keep only ssts that overlap this split's key range and contain at
            // least one table that belongs to the compaction.
            let tables = level
                .table_infos
                .iter()
                .filter(|table_info| {
                    let table_ids = &table_info.table_ids;
                    let exist_table = table_ids
                        .iter()
                        .any(|table_id| compact_table_ids.contains(table_id));

                    self.key_range.full_key_overlap(&table_info.key_range) && exist_table
                })
                .cloned()
                .collect_vec();
            // Do not need to filter the table because manager has done it.
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&level.table_infos));
                table_iters.push(ConcatSstableIterator::new(
                    compact_table_ids.clone(),
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                // Too many overlapping ssts: partition them into concat-able
                // groups so the merge fan-in stays bounded.
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    level.table_infos.len(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    // Overlapping sstables may contains ssts with same user key (generated by spilled), so we need to check concat with full key.
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        compact_table_ids.clone(),
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                // Small overlapping level: one iterator per sst.
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        compact_table_ids.clone(),
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        // The `Pk/NonPkPrefixSkipWatermarkIterator` is used to handle the table watermark state cleaning introduced
        // in https://github.com/risingwavelabs/risingwave/issues/13148
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            let pk_skip_watermark_iter = NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            ValueSkipWatermarkIterator::new(
                pk_skip_watermark_iter,
                ValueSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.value_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };

        Ok(combine_iter)
    }
}
275
/// Partitions the ssts of an overlapping level into groups whose members can
/// be concatenated (disjoint and ordered by full key).
///
/// Greedy algorithm: sort ssts by left bound, then for each sst try to append
/// it to the existing group with the smallest max right bound; if even that
/// group overlaps the sst, open a new group for it.
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    // A group of concat-able ssts plus the right bound of its last member.
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Pick group with the smallest right bound for every new sstable.
            // Note the reversed comparison: it turns the max-heap below into a
            // min-heap on `max_right_bound`.
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    // Process ssts in ascending order of their left bound.
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // Pick group with the smallest right bound for every new sstable. So do not check the larger one if the smallest one does not meet condition.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            // Mutating through `PeekMut` re-sifts the entry when the guard is
            // dropped, so the heap ordering invariant is preserved.
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        // Even the group with the smallest right bound overlaps: start a new group.
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}
327
/// Handles a compaction task and reports its status to hummock manager.
/// Always return `Ok` and let hummock manager handle errors.
///
/// Returns the mutated task (status and sorted output ssts filled in by
/// `compact_done`), the per-table stats deltas, the output object id ->
/// creation timestamp map, and the `MemoryTracker` held for the task when
/// memory admission succeeded.
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    // Times the whole task, labeled by compaction group and first input level.
    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    // Split the task's key space; on failure finish the task as ExecuteFailed.
    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // Number of splits (key ranges) is equal to number of compaction tasks
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

    // Estimated peak memory across all splits, used for admission control below.
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?}  parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // If the task does not have enough memory, it should cancel the task and let the meta
    // reschedule it, so that it does not occupy the compactor's resources.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {}  memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    // Pending-task gauges; the scopeguard reverts them on every exit path.
    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    // Fast path: single-split block-copy runner, cancellable via `shutdown_rx`.
    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        // After a compaction is done, mutate the compaction task.
        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    // Normal path: spawn one runner per split, optionally instrumented with
    // await-tree when a registry is configured.
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    // Collect split results as they complete; bail out on cancellation or the
    // first failure (remaining tasks are aborted below).
    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    // A partially-finished task produces no output: abort the stragglers and
    // drop whatever was already collected.
    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Sort by split/key range index.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    // After a compaction is done, mutate the compaction task.
    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}
586
587/// Handles a compaction task and reports its status to hummock manager.
588/// Always return `Ok` and let hummock manager handle errors.
589pub async fn compact(
590    compactor_context: CompactorContext,
591    mut compact_task: CompactTask,
592    shutdown_rx: Receiver<()>,
593    object_id_getter: Arc<dyn GetObjectId>,
594    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
595) -> (
596    (
597        CompactTask,
598        HashMap<TableId, TableStats>,
599        HashMap<HummockSstableObjectId, u64>,
600    ),
601    Option<MemoryTracker>,
602) {
603    let table_ids_to_be_compacted = compact_task.build_compact_table_ids();
604    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
605        .acquire(table_ids_to_be_compacted.clone())
606        .await
607    {
608        Ok(compaction_catalog_agent_ref) => {
609            let acquire_table_ids: HashSet<StateTableId> =
610                compaction_catalog_agent_ref.table_ids().collect();
611            if acquire_table_ids.len() != table_ids_to_be_compacted.len() {
612                let diff = table_ids_to_be_compacted
613                    .into_iter()
614                    .collect::<HashSet<_>>()
615                    .symmetric_difference(&acquire_table_ids)
616                    .cloned()
617                    .collect::<Vec<_>>();
618                tracing::warn!(
619                    dif= ?diff,
620                    "Some table ids are not acquired."
621                );
622                return (
623                    compact_done(
624                        compact_task,
625                        compactor_context.clone(),
626                        vec![],
627                        TaskStatus::ExecuteFailed,
628                    ),
629                    None,
630                );
631            }
632
633            compaction_catalog_agent_ref
634        }
635        Err(e) => {
636            tracing::warn!(
637                error = %e.as_report(),
638                "Failed to acquire compaction catalog agent"
639            );
640            return (
641                compact_done(
642                    compact_task,
643                    compactor_context.clone(),
644                    vec![],
645                    TaskStatus::ExecuteFailed,
646                ),
647                None,
648            );
649        }
650    };
651
652    // rewrite compact_task watermarks
653    {
654        compact_task
655            .pk_prefix_table_watermarks
656            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
657
658        compact_task
659            .non_pk_prefix_table_watermarks
660            .retain(|table_id, _| table_ids_to_be_compacted.contains(table_id));
661    }
662
663    compact_with_agent(
664        compactor_context,
665        compact_task,
666        shutdown_rx,
667        object_id_getter,
668        compaction_catalog_agent_ref,
669    )
670    .await
671}
672
673/// Fills in the compact task and tries to report the task result to meta node.
674pub(crate) fn compact_done(
675    mut compact_task: CompactTask,
676    context: CompactorContext,
677    output_ssts: Vec<CompactOutput>,
678    task_status: TaskStatus,
679) -> (
680    CompactTask,
681    HashMap<TableId, TableStats>,
682    HashMap<HummockSstableObjectId, u64>,
683) {
684    let mut table_stats_map = TableStatsMap::default();
685    let mut object_timestamps = HashMap::default();
686    compact_task.task_status = task_status;
687    compact_task
688        .sorted_output_ssts
689        .reserve(compact_task.splits.len());
690    let mut compaction_write_bytes = 0;
691    for (
692        _,
693        ssts,
694        CompactionStatistics {
695            delta_drop_stat, ..
696        },
697    ) in output_ssts
698    {
699        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
700        for sst_info in ssts {
701            compaction_write_bytes += sst_info.file_size();
702            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
703            compact_task.sorted_output_ssts.push(sst_info.sst_info);
704        }
705    }
706
707    let group_label = compact_task.compaction_group_id.to_string();
708    let level_label = compact_task.target_level.to_string();
709    context
710        .compactor_metrics
711        .compact_write_bytes
712        .with_label_values(&[&group_label, &level_label])
713        .inc_by(compaction_write_bytes);
714    context
715        .compactor_metrics
716        .compact_write_sstn
717        .with_label_values(&[&group_label, &level_label])
718        .inc_by(compact_task.sorted_output_ssts.len() as u64);
719
720    (compact_task, table_stats_map, object_timestamps)
721}
722
723pub async fn compact_and_build_sst<F>(
724    sst_builder: &mut CapacitySplitTableBuilder<F>,
725    task_config: &TaskConfig,
726    compactor_metrics: Arc<CompactorMetrics>,
727    mut iter: impl HummockIterator<Direction = Forward>,
728    mut compaction_filter: impl CompactionFilter,
729) -> HummockResult<CompactionStatistics>
730where
731    F: TableBuilderFactory,
732{
733    if !task_config.key_range.left.is_empty() {
734        let full_key = FullKey::decode(&task_config.key_range.left);
735        iter.seek(full_key)
736            .instrument_await("iter_seek".verbose())
737            .await?;
738    } else {
739        iter.rewind().instrument_await("rewind".verbose()).await?;
740    };
741
742    let end_key = if task_config.key_range.right.is_empty() {
743        FullKey::default()
744    } else {
745        FullKey::decode(&task_config.key_range.right).to_vec()
746    };
747    let max_key = end_key.to_ref();
748
749    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
750    let mut local_stats = StoreLocalStatistic::default();
751
752    // Keep table stats changes due to dropping KV.
753    let mut table_stats_drop = TableStatsMap::default();
754    let mut last_table_stats = TableStats::default();
755    let mut last_table_id = None;
756    let mut compaction_statistics = CompactionStatistics::default();
757    // object id -> block id. For an object id, block id is updated in a monotonically increasing manner.
758    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
759    let schemas: HashMap<TableId, HashSet<i32>> = task_config
760        .table_schemas
761        .iter()
762        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
763        .collect();
764    while iter.is_valid() {
765        let iter_key = iter.key();
766        compaction_statistics.iter_total_key_counts += 1;
767
768        let is_new_user_key = full_key_tracker.observe(iter.key());
769        let mut drop = false;
770
771        // CRITICAL WARN: Because of memtable spill, there may be several versions of the same user-key share the same `pure_epoch`. Do not change this code unless necessary.
772        let value = iter.value();
773        let ValueMeta {
774            object_id,
775            block_id,
776        } = iter.value_meta();
777        if is_new_user_key {
778            if !max_key.is_empty() && iter_key >= max_key {
779                break;
780            }
781            if value.is_delete() {
782                local_stats.skip_delete_key_count += 1;
783            }
784        } else {
785            local_stats.skip_multi_version_key_count += 1;
786        }
787
788        if last_table_id != Some(iter_key.user_key.table_id) {
789            if let Some(last_table_id) = last_table_id.take() {
790                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
791            }
792            last_table_id = Some(iter_key.user_key.table_id);
793        }
794
795        // Among keys with same user key, only keep the latest key unless retain_multiple_version is true.
796        // In our design, frontend avoid to access keys which had be deleted, so we don't
797        // need to consider the epoch when the compaction_filter match (it
798        // means that mv had drop)
799        // Because of memtable spill, there may be a PUT key share the same `pure_epoch` with DELETE key.
800        // Do not assume that "the epoch of keys behind must be smaller than the current key."
801        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
802            || (!task_config.retain_multiple_version && !is_new_user_key)
803        {
804            drop = true;
805        }
806
807        if !drop && compaction_filter.should_delete(iter_key) {
808            drop = true;
809        }
810
811        if drop {
812            compaction_statistics.iter_drop_key_counts += 1;
813
814            let should_count = match task_config.stats_target_table_ids.as_ref() {
815                Some(target_table_ids) => target_table_ids.contains(&iter_key.user_key.table_id),
816                None => true,
817            };
818            if should_count {
819                last_table_stats.total_key_count -= 1;
820                last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
821                last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
822            }
823            iter.next()
824                .instrument_await("iter_next_in_drop".verbose())
825                .await?;
826            continue;
827        }
828
829        // May drop stale columns
830        let check_table_id = iter_key.user_key.table_id;
831        let mut is_value_rewritten = false;
832        if let HummockValue::Put(v) = value
833            && let Some(object_id) = object_id
834            && let Some(block_id) = block_id
835            && !skip_schema_check
836                .get(&object_id)
837                .map(|prev_block_id| {
838                    assert!(*prev_block_id <= block_id);
839                    *prev_block_id == block_id
840                })
841                .unwrap_or(false)
842            && let Some(schema) = schemas.get(&check_table_id)
843        {
844            let value_size = v.len();
845            match try_drop_invalid_columns(v, schema) {
846                None => {
847                    if !task_config.disable_drop_column_optimization {
848                        // Under the assumption that all values in the same (object, block) group should share the same schema,
849                        // if one value drops no columns during a compaction, no need to check other values in the same group.
850                        skip_schema_check.insert(object_id, block_id);
851                    }
852                }
853                Some(new_value) => {
854                    is_value_rewritten = true;
855                    let new_put = HummockValue::put(new_value.as_slice());
856                    sst_builder
857                        .add_full_key(iter_key, new_put, is_new_user_key)
858                        .instrument_await("add_rewritten_full_key".verbose())
859                        .await?;
860                    let value_size_change = value_size as i64 - new_value.len() as i64;
861                    assert!(value_size_change >= 0);
862                    last_table_stats.total_value_size -= value_size_change;
863                }
864            }
865        }
866
867        if !is_value_rewritten {
868            // Don't allow two SSTs to share same user key
869            sst_builder
870                .add_full_key(iter_key, value, is_new_user_key)
871                .instrument_await("add_full_key".verbose())
872                .await?;
873        }
874
875        iter.next().instrument_await("iter_next".verbose()).await?;
876    }
877
878    if let Some(last_table_id) = last_table_id.take() {
879        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
880    }
881    iter.collect_local_statistic(&mut local_stats);
882    add_table_stats_map(
883        &mut table_stats_drop,
884        &local_stats.skipped_by_watermark_table_stats,
885    );
886    local_stats.report_compactor(compactor_metrics.as_ref());
887    compaction_statistics.delta_drop_stat = table_stats_drop;
888
889    Ok(compaction_statistics)
890}
891
#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    /// Builds a chain of pairwise-overlapping SSTs and checks that
    /// `partition_overlapping_sstable_infos` splits them into groups whose
    /// members can be concatenated (i.e. are non-overlapping within a group).
    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;

        // SST `id` covers keys [id * 10, id * 10 + 20): each SST overlaps the
        // next one, so consecutive SSTs can never share a partition.
        let mut ssts = Vec::with_capacity(10);
        for id in 0..10usize {
            let lo = id * TEST_KEYS_COUNT;
            let hi = lo + 2 * TEST_KEYS_COUNT;
            let kvs = (lo..hi).map(|i| (test_key_of(i), HummockValue::put(test_value_of(i))));
            let info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                id as u64,
                kvs,
                sstable_store.clone(),
            )
            .await;
            ssts.push(info);
        }

        // An overlapping chain alternates between two groups, so exactly two
        // partitions are expected, each internally concatenable.
        let groups = partition_overlapping_sstable_infos(ssts);
        assert_eq!(groups.len(), 2);
        for group in groups {
            assert!(can_concat(&group));
        }
    }
}