risingwave_storage/hummock/compactor/compactor_runner.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::Bytes;
use futures::{FutureExt, StreamExt, stream};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::util::value_encoding::column_aware_row_encoding::try_drop_invalid_columns;
use risingwave_hummock_sdk::compact::{
    compact_task_to_string, estimate_memory_for_compact_task, statistics_compact_task,
};
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker};
use risingwave_hummock_sdk::key_range::{KeyRange, KeyRangeCommon};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap, add_table_stats_map};
use risingwave_hummock_sdk::{
    HummockSstableObjectId, KeyComparator, can_concat, compact_task_output_to_string,
    full_key_can_concat,
};
use risingwave_pb::hummock::LevelType;
use risingwave_pb::hummock::compact_task::TaskStatus;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Receiver;

use super::iterator::MonitoredCompactorIterator;
use super::task_progress::TaskProgress;
use super::{CompactionStatistics, TaskConfig};
use crate::compaction_catalog_manager::{CompactionCatalogAgentRef, CompactionCatalogManagerRef};
use crate::hummock::compactor::compaction_utils::{
    build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task,
    metrics_report_for_task, optimize_by_copy_block,
};
use crate::hummock::compactor::iterator::ConcatSstableIterator;
use crate::hummock::compactor::task_progress::TaskProgressGuard;
use crate::hummock::compactor::{
    CompactOutput, CompactionFilter, Compactor, CompactorContext, await_tree_key,
    fast_compactor_runner,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    ValueMeta, ValueSkipWatermarkIterator, ValueSkipWatermarkState,
};
use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
    CachePolicy, CompressionAlgorithm, GetObjectId, HummockResult, SstableBuilderOptions,
    SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

pub struct CompactorRunner {
    compact_task: CompactTask,
    compactor: Compactor,
    sstable_store: SstableStoreRef,
    key_range: KeyRange,
    split_index: usize,
}

impl CompactorRunner {
    pub fn new(
        split_index: usize,
        context: CompactorContext,
        task: CompactTask,
        object_id_getter: Arc<dyn GetObjectId>,
    ) -> Self {
        let mut options: SstableBuilderOptions = context.storage_opts.as_ref().into();
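        // `task.compression_algorithm` is a plain integer carried in the task plan:
        // 0 maps to no compression, 1 to Lz4, and anything else to Zstd.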
        options.compression_algorithm = match task.compression_algorithm {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            _ => CompressionAlgorithm::Zstd,
        };

        options.capacity = estimate_task_output_capacity(context.clone(), &task);
        options.max_vnode_key_range_bytes = task.effective_max_vnode_key_range_bytes();
        let use_block_based_filter = task.should_use_block_based_filter();

        let key_range = KeyRange {
            left: task.splits[split_index].left.clone(),
            right: task.splits[split_index].right.clone(),
            right_exclusive: true,
        };
        let compactor = Compactor::new(
            context.clone(),
            options,
            TaskConfig {
                key_range: key_range.clone(),
                cache_policy: CachePolicy::NotFill,
                gc_delete_keys: task.gc_delete_keys,
                retain_multiple_version: false,
                use_block_based_filter,
                table_vnode_partition: task.table_vnode_partition.clone(),
                table_schemas: task
                    .table_schemas
                    .iter()
                    .map(|(k, v)| (*k, v.clone()))
                    .collect(),
                disable_drop_column_optimization: false,
            },
            object_id_getter,
        );

        Self {
            compactor,
            compact_task: task,
            sstable_store: context.sstable_store,
            key_range,
            split_index,
        }
    }

    pub async fn run(
        &self,
        compaction_filter: impl CompactionFilter,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
        task_progress: Arc<TaskProgress>,
    ) -> HummockResult<CompactOutput> {
        let iter =
            self.build_sst_iter(task_progress.clone(), compaction_catalog_agent_ref.clone())?;
        let (ssts, compaction_stat) = self
            .compactor
            .compact_key_range(
                iter,
                compaction_filter,
                compaction_catalog_agent_ref,
                Some(task_progress),
                Some(self.compact_task.task_id),
                Some(self.split_index),
            )
            .await?;
        Ok((self.split_index, ssts, compaction_stat))
    }

    /// Build the merge iterator based on the given input ssts.
    fn build_sst_iter(
        &self,
        task_progress: Arc<TaskProgress>,
        compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    ) -> HummockResult<impl HummockIterator<Direction = Forward> + use<>> {
        let compactor_iter_max_io_retry_times = self
            .compactor
            .context
            .storage_opts
            .compactor_iter_max_io_retry_times;
        let mut table_iters = Vec::new();

        for level in &self.compact_task.input_ssts {
            let tables = level
                .read_sstable_infos()
                .filter(|table_info| self.key_range.full_key_overlap(&table_info.key_range))
                .cloned()
                .collect_vec();
            if tables.is_empty() {
                continue;
            }

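            // A non-overlapping level can be scanned with a single concat iterator.
            // For an overlapping level we normally open one iterator per SST; if
            // there are too many SSTs, we first partition them into concat-able
            // groups to bound the merge fan-in.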
            if level.level_type == LevelType::Nonoverlapping {
                debug_assert!(can_concat(&tables));
                table_iters.push(ConcatSstableIterator::new(
                    tables,
                    self.compactor.task_config.key_range.clone(),
                    self.sstable_store.clone(),
                    task_progress.clone(),
                    compactor_iter_max_io_retry_times,
                ));
            } else if tables.len()
                > self
                    .compactor
                    .context
                    .storage_opts
                    .compactor_max_overlap_sst_count
            {
                let sst_groups = partition_overlapping_sstable_infos(tables);
                tracing::warn!(
                    "COMPACT A LARGE OVERLAPPING LEVEL: try to partition {} ssts with {} groups",
                    sst_groups.iter().map(Vec::len).sum::<usize>(),
                    sst_groups.len()
                );
                for (idx, table_infos) in sst_groups.into_iter().enumerate() {
                    // Overlapping SSTs may contain entries with the same user key (generated by spill), so the concat check must use the full key.
                    assert!(
                        full_key_can_concat(&table_infos),
                        "sst_group idx {:?} table_infos: {:?}",
                        idx,
                        table_infos
                    );
                    table_iters.push(ConcatSstableIterator::new(
                        table_infos,
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            } else {
                for table_info in tables {
                    table_iters.push(ConcatSstableIterator::new(
                        vec![table_info],
                        self.compactor.task_config.key_range.clone(),
                        self.sstable_store.clone(),
                        task_progress.clone(),
                        compactor_iter_max_io_retry_times,
                    ));
                }
            }
        }

        // The `Pk/NonPkPrefixSkipWatermarkIterator` is used to handle the table watermark state cleaning introduced
        // in https://github.com/risingwavelabs/risingwave/issues/13148
        let combine_iter = {
            let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
                MonitoredCompactorIterator::new(
                    MergeIterator::for_compactor(table_iters),
                    task_progress,
                ),
                PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.pk_prefix_table_watermarks.clone(),
                ),
            );

            let pk_skip_watermark_iter = NonPkPrefixSkipWatermarkIterator::new(
                skip_watermark_iter,
                NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.non_pk_prefix_table_watermarks.clone(),
                    compaction_catalog_agent_ref.clone(),
                ),
            );

            ValueSkipWatermarkIterator::new(
                pk_skip_watermark_iter,
                ValueSkipWatermarkState::from_safe_epoch_watermarks(
                    self.compact_task.value_table_watermarks.clone(),
                    compaction_catalog_agent_ref,
                ),
            )
        };
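        // Resulting iterator stack, innermost first:
        //   MergeIterator (over the per-level concat/SST iterators)
        //     -> MonitoredCompactorIterator (task progress reporting)
        //     -> PkPrefixSkipWatermarkIterator (pk-prefix table watermarks)
        //     -> NonPkPrefixSkipWatermarkIterator (non-pk-prefix table watermarks)
        //     -> ValueSkipWatermarkIterator (value table watermarks)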

        Ok(combine_iter)
    }
}

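/// Greedily partitions possibly-overlapping SSTs into groups whose members can
/// be concatenated, i.e. within a group the SSTs are sorted and disjoint on
/// full key. Each SST (in order of left bound) joins the existing group with
/// the smallest right bound if it starts strictly after that bound, and opens
/// a new group otherwise.
///
/// Illustrative sketch with made-up key ranges: given SSTs `[a..f]`, `[b..g]`,
/// `[h..z]` sorted by left bound, `[a..f]` opens group 1; `[b..g]` overlaps
/// group 1's right bound `f`, so it opens group 2; `[h..z]` starts after the
/// smallest right bound `f`, so it extends group 1. The result is
/// group 1 = `[a..f], [h..z]` and group 2 = `[b..g]`.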
pub fn partition_overlapping_sstable_infos(
    mut origin_infos: Vec<SstableInfo>,
) -> Vec<Vec<SstableInfo>> {
    pub struct SstableGroup {
        ssts: Vec<SstableInfo>,
        max_right_bound: Bytes,
    }

    impl PartialEq for SstableGroup {
        fn eq(&self, other: &SstableGroup) -> bool {
            self.max_right_bound == other.max_right_bound
        }
    }
    impl PartialOrd for SstableGroup {
        fn partial_cmp(&self, other: &SstableGroup) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl Eq for SstableGroup {}
    impl Ord for SstableGroup {
        fn cmp(&self, other: &SstableGroup) -> std::cmp::Ordering {
            // Pick group with the smallest right bound for every new sstable.
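            // The comparison is reversed (`other` vs `self`) so that the max-heap
            // `BinaryHeap` behaves as a min-heap on `max_right_bound`.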
            KeyComparator::compare_encoded_full_key(&other.max_right_bound, &self.max_right_bound)
        }
    }
    let mut groups: BinaryHeap<SstableGroup> = BinaryHeap::default();
    origin_infos.sort_by(|a, b| {
        let x = &a.key_range;
        let y = &b.key_range;
        KeyComparator::compare_encoded_full_key(&x.left, &y.left)
    });
    for sst in origin_infos {
        // Every new SST is matched against the group with the smallest right bound: if that group does not qualify, no other group can.
        if let Some(mut prev_group) = groups.peek_mut()
            && KeyComparator::encoded_full_key_less_than(
                &prev_group.max_right_bound,
                &sst.key_range.left,
            )
        {
            prev_group.max_right_bound.clone_from(&sst.key_range.right);
            prev_group.ssts.push(sst);
            continue;
        }
        groups.push(SstableGroup {
            max_right_bound: sst.key_range.right.clone(),
            ssts: vec![sst],
        });
    }
    assert!(!groups.is_empty());
    groups.into_iter().map(|group| group.ssts).collect_vec()
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Never bails out early: failures are encoded in the returned task status and
/// left for the hummock manager to handle.
pub async fn compact_with_agent(
    compactor_context: CompactorContext,
    mut compact_task: CompactTask,
    mut shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let context = compactor_context.clone();
    let group_label = compact_task.compaction_group_id.to_string();
    metrics_report_for_task(&compact_task, &context);

    let timer = context
        .compactor_metrics
        .compact_task_duration
        .with_label_values(&[
            &group_label,
            &compact_task.input_ssts[0].level_idx.to_string(),
        ])
        .start_timer();

    let multi_filter = build_multi_compaction_filter(&compact_task);
    let mut task_status = TaskStatus::Success;
    let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

    if let Err(e) =
        generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await
    {
        tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
        task_status = TaskStatus::ExecuteFailed;
        return (
            compact_done(compact_task, context.clone(), vec![], task_status),
            None,
        );
    }

    let compact_task_statistics = statistics_compact_task(&compact_task);
    // The number of splits (key ranges) equals the number of parallel sub-tasks.
    let parallelism = compact_task.splits.len();
    assert_ne!(parallelism, 0, "splits cannot be empty");
    let mut output_ssts = Vec::with_capacity(parallelism);
    let mut compaction_futures = vec![];
    let mut abort_handles = vec![];
    let task_progress_guard =
        TaskProgressGuard::new(compact_task.task_id, context.task_progress_manager.clone());

    let capacity = estimate_task_output_capacity(context.clone(), &compact_task);

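    // Estimate the peak memory of a single split (driven by the block size, the
    // object-store receive buffer, and the estimated output capacity), then scale
    // by the number of splits, since every split runs its own runner concurrently.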
    let task_memory_capacity_with_parallelism = estimate_memory_for_compact_task(
        &compact_task,
        (context.storage_opts.block_size_kb as u64) * (1 << 10),
        context
            .storage_opts
            .object_store_config
            .s3
            .recv_buffer_size
            .unwrap_or(6 * 1024 * 1024) as u64,
        capacity as u64,
    ) * compact_task.splits.len() as u64;

    tracing::info!(
        "Ready to handle task: {} compact_task_statistics {:?} compression_algorithm {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}, {}",
        compact_task.task_id,
        compact_task_statistics,
        compact_task.compression_algorithm,
        parallelism,
        task_memory_capacity_with_parallelism,
        optimize_by_copy_block,
        compact_task_to_string(&compact_task),
    );

    // If there is not enough memory for the task, cancel it and let the meta node
    // reschedule it, so that it does not occupy the compactor's resources.
    let memory_detector = context
        .memory_limiter
        .try_require_memory(task_memory_capacity_with_parallelism);
    if memory_detector.is_none() {
        tracing::warn!(
            "Not enough memory to serve the task {} task_memory_capacity_with_parallelism {} memory_usage {} memory_quota {}",
            compact_task.task_id,
            task_memory_capacity_with_parallelism,
            context.memory_limiter.get_memory_usage(),
            context.memory_limiter.quota()
        );
        task_status = TaskStatus::NoAvailMemoryResourceCanceled;
        return (
            compact_done(compact_task, context.clone(), output_ssts, task_status),
            memory_detector,
        );
    }

    context.compactor_metrics.compact_task_pending_num.inc();
    context
        .compactor_metrics
        .compact_task_pending_parallelism
        .add(parallelism as _);
    let _release_metrics_guard =
        scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| {
            context.compactor_metrics.compact_task_pending_num.dec();
            context
                .compactor_metrics
                .compact_task_pending_parallelism
                .sub(parallelism as _);
        });

    if optimize_by_copy_block {
        let runner = fast_compactor_runner::CompactorRunner::new(
            context.clone(),
            compact_task.clone(),
            compaction_catalog_agent_ref.clone(),
            object_id_getter.clone(),
            task_progress_guard.progress.clone(),
            multi_filter,
        );

        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
            },

            ret = runner.run() => {
                match ret {
                    Ok((ssts, statistics)) => {
                        output_ssts.push((0, ssts, statistics));
                    }
                    Err(e) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                    }
                }
            }
        }

        // After a compaction is done, mutate the compaction task.
        let (compact_task, table_stats, object_timestamps) =
            compact_done(compact_task, context.clone(), output_ssts, task_status);
        let cost_time = timer.stop_and_record() * 1000.0;
        tracing::info!(
            "Finished fast compaction task in {:?}ms: {}",
            cost_time,
            compact_task_to_string(&compact_task)
        );
        return (
            (compact_task, table_stats, object_timestamps),
            memory_detector,
        );
    }
    for (split_index, _) in compact_task.splits.iter().enumerate() {
        let filter = multi_filter.clone();
        let compaction_catalog_agent_ref = compaction_catalog_agent_ref.clone();
        let compactor_runner = CompactorRunner::new(
            split_index,
            compactor_context.clone(),
            compact_task.clone(),
            object_id_getter.clone(),
        );
        let task_progress = task_progress_guard.progress.clone();
        let runner = async move {
            compactor_runner
                .run(filter, compaction_catalog_agent_ref, task_progress)
                .await
        };
        let traced = match context.await_tree_reg.as_ref() {
            None => runner.right_future(),
            Some(await_tree_reg) => await_tree_reg
                .register(
                    await_tree_key::CompactRunner {
                        task_id: compact_task.task_id,
                        split_index,
                    },
                    format!(
                        "Compaction Task {} Split {} ",
                        compact_task.task_id, split_index
                    ),
                )
                .instrument(runner)
                .left_future(),
        };
        let handle = tokio::spawn(traced);
        abort_handles.push(handle.abort_handle());
        compaction_futures.push(handle);
    }

    let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
    loop {
        tokio::select! {
            _ = &mut shutdown_rx => {
                tracing::warn!("Compaction task cancelled externally:\n{}", compact_task_to_string(&compact_task));
                task_status = TaskStatus::ManualCanceled;
                break;
            }
            future_result = buffered.next() => {
                match future_result {
                    Some(Ok(Ok((split_index, ssts, compact_stat)))) => {
                        output_ssts.push((split_index, ssts, compact_stat));
                    }
                    Some(Ok(Err(e))) => {
                        task_status = TaskStatus::ExecuteFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    Some(Err(e)) => {
                        task_status = TaskStatus::JoinHandleFailed;
                        tracing::warn!(
                            error = %e.as_report(),
                            "Compaction task {} failed with join handle error",
                            compact_task.task_id,
                        );
                        break;
                    }
                    None => break,
                }
            }
        }
    }

    if task_status != TaskStatus::Success {
        for abort_handle in abort_handles {
            abort_handle.abort();
        }
        output_ssts.clear();
    }
    // Sort by split/key range index.
    if !output_ssts.is_empty() {
        output_ssts.sort_by_key(|(split_index, ..)| *split_index);
    }

    // After a compaction is done, mutate the compaction task.
    let (compact_task, table_stats, object_timestamps) =
        compact_done(compact_task, context.clone(), output_ssts, task_status);
    let cost_time = timer.stop_and_record() * 1000.0;
    tracing::info!(
        "Finished compaction task in {:?}ms: {}",
        cost_time,
        compact_task_output_to_string(&compact_task)
    );
    (
        (compact_task, table_stats, object_timestamps),
        memory_detector,
    )
}

/// Handles a compaction task and reports its status to the hummock manager.
/// Never bails out early: failures are encoded in the returned task status and
/// left for the hummock manager to handle.
pub async fn compact(
    compactor_context: CompactorContext,
    compact_task: CompactTask,
    shutdown_rx: Receiver<()>,
    object_id_getter: Arc<dyn GetObjectId>,
    compaction_catalog_manager_ref: CompactionCatalogManagerRef,
) -> (
    (
        CompactTask,
        HashMap<TableId, TableStats>,
        HashMap<HummockSstableObjectId, u64>,
    ),
    Option<MemoryTracker>,
) {
    let read_table_ids = compact_task.get_table_ids_from_input_ssts().collect_vec();
    let compaction_catalog_agent_ref = match compaction_catalog_manager_ref
        .acquire(read_table_ids.clone())
        .await
    {
        Ok(compaction_catalog_agent_ref) => {
            let acquire_table_ids: HashSet<StateTableId> =
                compaction_catalog_agent_ref.table_ids().collect();
            if acquire_table_ids.len() != read_table_ids.len() {
                let diff = read_table_ids
                    .into_iter()
                    .collect::<HashSet<_>>()
                    .symmetric_difference(&acquire_table_ids)
                    .cloned()
                    .collect::<Vec<_>>();
                tracing::warn!(
                    diff = ?diff,
                    "Some table ids are not acquired."
                );
                return (
                    compact_done(
                        compact_task,
                        compactor_context.clone(),
                        vec![],
                        TaskStatus::ExecuteFailed,
                    ),
                    None,
                );
            }

            compaction_catalog_agent_ref
        }
        Err(e) => {
            tracing::warn!(
                error = %e.as_report(),
                "Failed to acquire compaction catalog agent"
            );
            return (
                compact_done(
                    compact_task,
                    compactor_context.clone(),
                    vec![],
                    TaskStatus::ExecuteFailed,
                ),
                None,
            );
        }
    };

    compact_with_agent(
        compactor_context,
        compact_task,
        shutdown_rx,
        object_id_getter,
        compaction_catalog_agent_ref,
    )
    .await
}

/// Fills in the compact task with the output SSTs and collects the table stats
/// and object timestamps to be reported to the meta node.
pub(crate) fn compact_done(
    mut compact_task: CompactTask,
    context: CompactorContext,
    output_ssts: Vec<CompactOutput>,
    task_status: TaskStatus,
) -> (
    CompactTask,
    HashMap<TableId, TableStats>,
    HashMap<HummockSstableObjectId, u64>,
) {
    let mut table_stats_map = TableStatsMap::default();
    let mut object_timestamps = HashMap::default();
    compact_task.task_status = task_status;
    compact_task
        .sorted_output_ssts
        .reserve(compact_task.splits.len());
    let mut compaction_write_bytes = 0;
    for (
        _,
        ssts,
        CompactionStatistics {
            delta_drop_stat, ..
        },
    ) in output_ssts
    {
        add_table_stats_map(&mut table_stats_map, &delta_drop_stat);
        for sst_info in ssts {
            compaction_write_bytes += sst_info.file_size();
            object_timestamps.insert(sst_info.sst_info.object_id, sst_info.created_at);
            compact_task.sorted_output_ssts.push(sst_info.sst_info);
        }
    }
    assert_output_table_ids_subset_of_read_input(&compact_task);

    let group_label = compact_task.compaction_group_id.to_string();
    let level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_write_bytes
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compaction_write_bytes);
    context
        .compactor_metrics
        .compact_write_sstn
        .with_label_values(&[&group_label, &level_label])
        .inc_by(compact_task.sorted_output_ssts.len() as u64);

    (compact_task, table_stats_map, object_timestamps)
}

/// Compactor output must only contain table ids that are readable in the normalized task input.
fn assert_output_table_ids_subset_of_read_input(compact_task: &CompactTask) {
    let read_table_ids = compact_task
        .get_table_ids_from_input_ssts()
        .collect::<HashSet<_>>();

    for output_sst in &compact_task.sorted_output_ssts {
        assert!(
            output_sst
                .table_ids
                .iter()
                .all(|table_id| read_table_ids.contains(table_id)),
            "compaction output contains table ids outside task read input: task_id={}, output_sst_id={}, read_table_ids={:?}, output_table_ids={:?}",
            compact_task.task_id,
            output_sst.sst_id,
            read_table_ids,
            output_sst.table_ids,
        );
    }
}

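/// Drains `iter` over the task's key range and feeds the surviving entries into
/// `sst_builder`. Stale versions and delete tombstones are dropped according to
/// `task_config` and `compaction_filter`, and stale columns may be stripped from
/// values based on `task_config.table_schemas`. Returns statistics about the
/// dropped data, keyed by table id.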
pub async fn compact_and_build_sst<F>(
    sst_builder: &mut CapacitySplitTableBuilder<F>,
    task_config: &TaskConfig,
    compactor_metrics: Arc<CompactorMetrics>,
    mut iter: impl HummockIterator<Direction = Forward>,
    mut compaction_filter: impl CompactionFilter,
) -> HummockResult<CompactionStatistics>
where
    F: TableBuilderFactory,
{
    if !task_config.key_range.left.is_empty() {
        let full_key = FullKey::decode(&task_config.key_range.left);
        iter.seek(full_key)
            .instrument_await("iter_seek".verbose())
            .await?;
    } else {
        iter.rewind().instrument_await("rewind".verbose()).await?;
    };

    let end_key = if task_config.key_range.right.is_empty() {
        FullKey::default()
    } else {
        FullKey::decode(&task_config.key_range.right).to_vec()
    };
    let max_key = end_key.to_ref();

    let mut full_key_tracker = FullKeyTracker::<Vec<u8>>::new(FullKey::default());
    let mut local_stats = StoreLocalStatistic::default();

    // Track table stats changes caused by dropped KVs.
    let mut table_stats_drop = TableStatsMap::default();
    let mut last_table_stats = TableStats::default();
    let mut last_table_id = None;
    let mut compaction_statistics = CompactionStatistics::default();
    // object id -> block id. For an object id, block id is updated in a monotonically increasing manner.
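    // An entry `(object_id, block_id)` records that some value in that block
    // dropped no columns, so later values from the same block can skip the
    // schema check (values within one block are assumed to share a schema).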
    let mut skip_schema_check: HashMap<HummockSstableObjectId, u64> = HashMap::default();
    let schemas: HashMap<TableId, HashSet<i32>> = task_config
        .table_schemas
        .iter()
        .map(|(table_id, schema)| (*table_id, schema.column_ids.iter().copied().collect()))
        .collect();
    while iter.is_valid() {
        let iter_key = iter.key();
        compaction_statistics.iter_total_key_counts += 1;

        let is_new_user_key = full_key_tracker.observe(iter.key());
        let mut drop = false;

        // CRITICAL WARN: Because of memtable spill, several versions of the same user key may share the same `pure_epoch`. Do not change this code unless necessary.
        let value = iter.value();
        let ValueMeta {
            object_id,
            block_id,
        } = iter.value_meta();
        if is_new_user_key {
            if !max_key.is_empty() && iter_key >= max_key {
                break;
            }
            if value.is_delete() {
                local_stats.skip_delete_key_count += 1;
            }
        } else {
            local_stats.skip_multi_version_key_count += 1;
        }

        if last_table_id != Some(iter_key.user_key.table_id) {
            if let Some(last_table_id) = last_table_id.take() {
                table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
            }
            last_table_id = Some(iter_key.user_key.table_id);
        }

        // Among keys with the same user key, keep only the latest one unless `retain_multiple_version` is true.
        // In our design, the frontend never accesses keys that have been deleted, so we don't
        // need to consider the epoch when the compaction filter matches (a match
        // means the MV has been dropped).
        // Because of memtable spill, a PUT key may share the same `pure_epoch` with a DELETE key.
        // Do not assume that the epochs of subsequent keys must be smaller than the current key's.
        if (!task_config.retain_multiple_version && task_config.gc_delete_keys && value.is_delete())
            || (!task_config.retain_multiple_version && !is_new_user_key)
        {
            drop = true;
        }

        if !drop && compaction_filter.should_delete(iter_key) {
            drop = true;
        }

        if drop {
            compaction_statistics.iter_drop_key_counts += 1;

            last_table_stats.total_key_count -= 1;
            last_table_stats.total_key_size -= iter_key.encoded_len() as i64;
            last_table_stats.total_value_size -= iter.value().encoded_len() as i64;
            iter.next()
                .instrument_await("iter_next_in_drop".verbose())
                .await?;
            continue;
        }

        // May drop stale columns
        let check_table_id = iter_key.user_key.table_id;
        let mut is_value_rewritten = false;
        if let HummockValue::Put(v) = value
            && let Some(object_id) = object_id
            && let Some(block_id) = block_id
            && !skip_schema_check
                .get(&object_id)
                .map(|prev_block_id| {
                    assert!(*prev_block_id <= block_id);
                    *prev_block_id == block_id
                })
                .unwrap_or(false)
            && let Some(schema) = schemas.get(&check_table_id)
        {
            let value_size = v.len();
            match try_drop_invalid_columns(v, schema) {
                None => {
                    if !task_config.disable_drop_column_optimization {
                        // Under the assumption that all values in the same (object, block) group should share the same schema,
                        // if one value drops no columns during a compaction, no need to check other values in the same group.
                        skip_schema_check.insert(object_id, block_id);
                    }
                }
                Some(new_value) => {
                    is_value_rewritten = true;
                    let new_put = HummockValue::put(new_value.as_slice());
                    sst_builder
                        .add_full_key(iter_key, new_put, is_new_user_key)
                        .instrument_await("add_rewritten_full_key".verbose())
                        .await?;
                    let value_size_change = value_size as i64 - new_value.len() as i64;
                    assert!(value_size_change >= 0);
                    last_table_stats.total_value_size -= value_size_change;
                }
            }
        }

        if !is_value_rewritten {
            // Don't allow two SSTs to share the same user key.
            sst_builder
                .add_full_key(iter_key, value, is_new_user_key)
                .instrument_await("add_full_key".verbose())
                .await?;
        }

        iter.next().instrument_await("iter_next".verbose()).await?;
    }

    if let Some(last_table_id) = last_table_id.take() {
        table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
    }
    iter.collect_local_statistic(&mut local_stats);
    add_table_stats_map(
        &mut table_stats_drop,
        &local_stats.skipped_by_watermark_table_stats,
    );
    local_stats.report_compactor(compactor_metrics.as_ref());
    compaction_statistics.delta_drop_stat = table_stats_drop;

    Ok(compaction_statistics)
}

#[cfg(test)]
pub mod tests {
    use risingwave_hummock_sdk::can_concat;

    use crate::hummock::compactor::compactor_runner::partition_overlapping_sstable_infos;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::hummock::test_utils::{
        default_builder_opt_for_test, gen_test_sstable_info, test_key_of, test_value_of,
    };
    use crate::hummock::value::HummockValue;

    #[tokio::test]
    async fn test_partition_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            let end_index = start_index + 2 * TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let table_infos = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(table_infos.len(), 2);
        for ssts in table_infos {
            assert!(can_concat(&ssts));
        }
    }
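
    // A minimal companion sketch (assumption: `test_key_of` yields keys that
    // sort by index, so consecutive index ranges are disjoint): when the input
    // SSTs do not overlap at all, partitioning should collapse them into a
    // single concat-able group.
    #[tokio::test]
    async fn test_partition_non_overlapping_level() {
        const TEST_KEYS_COUNT: usize = 10;
        let sstable_store = mock_sstable_store().await;
        let mut table_infos = vec![];
        for object_id in 0..10 {
            let start_index = object_id * TEST_KEYS_COUNT;
            // No `2 *` here: each object's key range ends where the next begins.
            let end_index = start_index + TEST_KEYS_COUNT;
            let table_info = gen_test_sstable_info(
                default_builder_opt_for_test(),
                object_id as u64,
                (start_index..end_index)
                    .map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
                sstable_store.clone(),
            )
            .await;
            table_infos.push(table_info);
        }
        let groups = partition_overlapping_sstable_infos(table_infos);
        assert_eq!(groups.len(), 1);
        assert!(can_concat(&groups[0]));
    }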
}