risingwave_storage/hummock/compactor/
compaction_utils.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::TableId;
24use risingwave_common::config::meta::default::compaction_config;
25use risingwave_common::constants::hummock::CompactionFilterFlag;
26use risingwave_hummock_sdk::compact_task::CompactTask;
27use risingwave_hummock_sdk::key::FullKey;
28use risingwave_hummock_sdk::key_range::KeyRange;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30use risingwave_hummock_sdk::table_stats::TableStatsMap;
31use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
32use risingwave_pb::hummock::compact_task::PbTaskType;
33use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbSstableFilterType, PbTableSchema};
34use tokio::time::Instant;
35
36pub use super::context::CompactorContext;
37use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
38use crate::hummock::compactor::{
39    ConcatSstableIterator, MultiCompactionFilter, TaskProgress, TtlCompactionFilter,
40};
41use crate::hummock::iterator::{
42    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
43    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
44    UserIterator,
45};
46use crate::hummock::multi_builder::TableBuilderFactory;
47use crate::hummock::{
48    CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
49    SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
50};
51use crate::monitor::StoreLocalStatistic;
52
53pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
54    pub object_id_getter: Arc<dyn GetObjectId>,
55    pub limiter: Arc<MemoryLimiter>,
56    pub options: SstableBuilderOptions,
57    pub policy: CachePolicy,
58    pub remote_rpc_cost: Arc<AtomicU64>,
59    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
60    pub sstable_writer_factory: W,
61    pub _phantom: PhantomData<F>,
62}
63
64#[async_trait::async_trait]
65impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
66    type Filter = F;
67    type Writer = W::Writer;
68
69    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
70        let timer = Instant::now();
71        let table_id = self.object_id_getter.get_new_sst_object_id().await?;
72        let cost = (timer.elapsed().as_secs_f64() * 1000000.0).round() as u64;
73        self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
74        let writer_options = SstableWriterOptions {
75            capacity_hint: Some(self.options.capacity + self.options.block_capacity),
76            tracker: None,
77            policy: self.policy,
78        };
79        let writer = self
80            .sstable_writer_factory
81            .create_sst_writer(table_id, writer_options)
82            .await?;
83        let builder = SstableBuilder::new(
84            table_id,
85            writer,
86            Self::Filter::create(self.options.filter_builder_options()),
87            self.options.clone(),
88            self.compaction_catalog_agent_ref.clone(),
89            Some(self.limiter.clone()),
90        );
91        Ok(builder)
92    }
93}
94
95/// `CompactionStatistics` will count the results of each compact split
96#[derive(Default, Debug)]
97pub struct CompactionStatistics {
98    // to report per-table metrics
99    pub delta_drop_stat: TableStatsMap,
100
101    // to calculate delete ratio
102    pub iter_total_key_counts: u64,
103    pub iter_drop_key_counts: u64,
104}
105
106impl CompactionStatistics {
107    #[expect(dead_code)]
108    fn delete_ratio(&self) -> Option<u64> {
109        if self.iter_total_key_counts == 0 {
110            return None;
111        }
112
113        Some(self.iter_drop_key_counts / self.iter_total_key_counts)
114    }
115}
116
117#[derive(Clone, Default)]
118pub struct TaskConfig {
119    pub(crate) key_range: KeyRange,
120    pub(crate) cache_policy: CachePolicy,
121    pub(crate) gc_delete_keys: bool,
122    pub(crate) retain_multiple_version: bool,
123    pub(crate) use_block_based_filter: bool,
124    pub(crate) sstable_filter_kind: PbSstableFilterType,
125
126    pub(crate) table_vnode_partition: BTreeMap<TableId, u32>,
127    /// `TableId` -> `TableSchema`
128    /// Schemas in `table_schemas` are at least as new as the one used to create `input_ssts`.
129    /// For a table with schema existing in `table_schemas`, its columns not in `table_schemas` but in `input_ssts` can be safely dropped.
130    pub(crate) table_schemas: HashMap<TableId, PbTableSchema>,
131    /// `disable_drop_column_optimization` should only be set in benchmark.
132    pub(crate) disable_drop_column_optimization: bool,
133}
134
135impl TaskConfig {
136    #[cfg(any(test, feature = "test"))]
137    pub fn for_test(
138        key_range: KeyRange,
139        cache_policy: CachePolicy,
140        gc_delete_keys: bool,
141        use_block_based_filter: bool,
142        table_schemas: HashMap<TableId, PbTableSchema>,
143    ) -> Self {
144        Self {
145            key_range,
146            cache_policy,
147            gc_delete_keys,
148            retain_multiple_version: false,
149            use_block_based_filter,
150            sstable_filter_kind: PbSstableFilterType::SstableFilterXor16,
151            table_vnode_partition: BTreeMap::default(),
152            table_schemas,
153            disable_drop_column_optimization: false,
154        }
155    }
156
157    #[cfg(any(test, feature = "test"))]
158    pub fn with_disable_drop_column_optimization(mut self, disable: bool) -> Self {
159        self.disable_drop_column_optimization = disable;
160        self
161    }
162}
163
164pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
165    let mut multi_filter = MultiCompactionFilter::default();
166    let compaction_filter_flag =
167        CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
168    // STATE_CLEAN is handled by compact-task table-id normalization and iterator-level block
169    // pruning. Keep the flag as a no-op here for compatibility with existing task configs.
170
171    if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
172        let id_to_ttl = compact_task
173            .table_options
174            .iter()
175            .filter_map(|(id, option)| {
176                option
177                    .retention_seconds
178                    .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
179            })
180            .collect();
181
182        let ttl_filter = Box::new(TtlCompactionFilter::new(
183            id_to_ttl,
184            compact_task.current_epoch_time,
185        ));
186        multi_filter.register(ttl_filter);
187    }
188
189    multi_filter
190}
191
192fn generate_splits_fast(
193    sstable_infos: &[&SstableInfo],
194    compaction_size: u64,
195    context: &CompactorContext,
196    max_sub_compaction: u32,
197) -> Vec<KeyRange> {
198    let worker_num = context.compaction_executor.worker_num();
199    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
200
201    let parallelism = calculate_task_parallelism_impl(
202        worker_num,
203        parallel_compact_size,
204        compaction_size,
205        max_sub_compaction,
206    );
207    let mut indexes = vec![];
208    for sst in sstable_infos {
209        let key_range = &sst.key_range;
210        indexes.push(
211            FullKey {
212                user_key: FullKey::decode(&key_range.left).user_key,
213                epoch_with_gap: EpochWithGap::new_max_epoch(),
214            }
215            .encode(),
216        );
217        indexes.push(
218            FullKey {
219                user_key: FullKey::decode(&key_range.right).user_key,
220                epoch_with_gap: EpochWithGap::new_max_epoch(),
221            }
222            .encode(),
223        );
224    }
225    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
226    indexes.dedup();
227    if indexes.len() <= parallelism {
228        return vec![];
229    }
230
231    let mut splits = vec![];
232    splits.push(KeyRange::default());
233    let parallel_key_count = indexes.len() / parallelism;
234    let mut last_split_key_count = 0;
235    for key in indexes {
236        if last_split_key_count >= parallel_key_count {
237            splits.last_mut().unwrap().right = Bytes::from(key.clone());
238            splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
239            last_split_key_count = 0;
240        }
241        last_split_key_count += 1;
242    }
243
244    splits
245}
246
247pub async fn generate_splits(
248    sstable_infos: &[&SstableInfo],
249    compaction_size: u64,
250    context: &CompactorContext,
251    max_sub_compaction: u32,
252) -> HummockResult<Vec<KeyRange>> {
253    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
254    if compaction_size > parallel_compact_size {
255        if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
256            return Ok(generate_splits_fast(
257                sstable_infos,
258                compaction_size,
259                context,
260                max_sub_compaction,
261            ));
262        }
263        let mut indexes = vec![];
264        // preload the meta and get the smallest key to split sub_compaction
265        for sstable_info in sstable_infos {
266            let sstable = context
267                .sstable_store
268                .sstable(sstable_info, &mut StoreLocalStatistic::default())
269                .await?;
270            indexes.extend(sstable.meta.block_metas.iter().map(|block| {
271                let data_size = block.len;
272                let full_key = FullKey {
273                    user_key: FullKey::decode(&block.smallest_key).user_key,
274                    epoch_with_gap: EpochWithGap::new_max_epoch(),
275                }
276                .encode();
277                (data_size as u64, full_key)
278            }));
279        }
280        // sort by key, as for every data block has the same size;
281        indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
282        let mut splits = vec![];
283        splits.push(KeyRange::default());
284
285        let parallelism = calculate_task_parallelism_impl(
286            context.compaction_executor.worker_num(),
287            parallel_compact_size,
288            compaction_size,
289            max_sub_compaction,
290        );
291
292        let sub_compaction_data_size =
293            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);
294
295        if parallelism > 1 {
296            let mut last_buffer_size = 0;
297            let mut last_key: Vec<u8> = vec![];
298            let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
299            for (data_size, key) in indexes {
300                if last_buffer_size >= sub_compaction_data_size
301                    && !last_key.eq(&key)
302                    && remaining_size > parallel_compact_size
303                {
304                    splits.last_mut().unwrap().right = Bytes::from(key.clone());
305                    splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
306                    last_buffer_size = data_size;
307                } else {
308                    last_buffer_size += data_size;
309                }
310                remaining_size -= data_size;
311                last_key = key;
312            }
313            return Ok(splits);
314        }
315    }
316
317    Ok(vec![])
318}
319
320pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
321    let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
322    let total_input_uncompressed_file_size = task
323        .read_input_ssts()
324        .map(|table| table.uncompressed_file_size)
325        .sum::<u64>();
326
327    let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
328    std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
329}
330
/// Estimates how many keys a single output SST of `output_capacity` bytes will hold.
///
/// The estimate uses the input key density (`total_key_count / total_size`). It is rounded up
/// and never exceeds `total_key_count`. This replaces the old task-level key-count heuristic
/// for filter-layout decisions: large tasks can still produce plain-filter output SSTs when
/// each output is small.
///
/// Edge cases, checked in this order:
/// - no keys gives 0;
/// - a zero `total_size` gives `total_key_count`;
/// - a zero `output_capacity` gives 0.
pub fn estimate_output_key_count_by_size(
    total_key_count: u64,
    total_size: u64,
    output_capacity: usize,
) -> usize {
    let estimated: u128 = match (total_key_count, total_size, output_capacity) {
        (0, _, _) => 0,
        (keys, 0, _) => u128::from(keys),
        (_, _, 0) => 0,
        (keys, size, capacity) => {
            // u128 keeps `keys * capacity` from overflowing.
            let keys = u128::from(keys);
            (keys * capacity as u128)
                .div_ceil(u128::from(size))
                .min(keys)
        }
    };
    usize::try_from(estimated).unwrap_or(usize::MAX)
}
354
355pub fn estimate_output_key_count_for_input_ssts<'a>(
356    input_ssts: impl Iterator<Item = &'a SstableInfo>,
357    output_capacity: usize,
358) -> usize {
359    let (total_key_count, total_uncompressed_size) =
360        input_ssts.fold((0u64, 0u64), |(key_count, size), sst| {
361            (
362                key_count + sst.total_key_count,
363                size + sst.uncompressed_file_size,
364            )
365        });
366
367    estimate_output_key_count_by_size(total_key_count, total_uncompressed_size, output_capacity)
368}
369
370pub fn estimate_output_key_count_for_task(task: &CompactTask, output_capacity: usize) -> usize {
371    estimate_output_key_count_for_input_ssts(task.read_input_ssts(), output_capacity)
372}
373
374pub fn blocked_xor_filter_key_count_threshold(
375    blocked_xor_filter_kv_count_threshold: Option<u64>,
376) -> usize {
377    blocked_xor_filter_kv_count_threshold
378        .unwrap_or(compaction_config::DEFAULT_BLOCKED_XOR_FILTER_KV_COUNT_THRESHOLD)
379        .try_into()
380        .unwrap_or(usize::MAX)
381}
382
383/// Compare result of compaction task and input. The data saw by user shall not change after applying compaction result.
384pub async fn check_compaction_result(
385    compact_task: &CompactTask,
386    context: CompactorContext,
387    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
388) -> HummockResult<bool> {
389    // This check method does not consider dropped keys by compaction filter.
390    if compact_task.contains_ttl() {
391        return Ok(true);
392    }
393
394    let mut table_iters = Vec::new();
395
396    for level in &compact_task.input_ssts {
397        if level.level_type == PbLevelType::Nonoverlapping {
398            let tables = level.read_sstable_infos().cloned().collect_vec();
399            if tables.is_empty() {
400                continue;
401            }
402            debug_assert!(can_concat(&tables));
403
404            table_iters.push(ConcatSstableIterator::new(
405                tables,
406                KeyRange::inf(),
407                context.sstable_store.clone(),
408                Arc::new(TaskProgress::default()),
409                context.storage_opts.compactor_iter_max_io_retry_times,
410            ));
411        } else {
412            for table_info in level.read_sstable_infos().cloned() {
413                table_iters.push(ConcatSstableIterator::new(
414                    vec![table_info],
415                    KeyRange::inf(),
416                    context.sstable_store.clone(),
417                    Arc::new(TaskProgress::default()),
418                    context.storage_opts.compactor_iter_max_io_retry_times,
419                ));
420            }
421        }
422    }
423
424    let iter = MergeIterator::for_compactor(table_iters);
425    let left_iter = {
426        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
427            iter,
428            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
429                compact_task.pk_prefix_table_watermarks.clone(),
430            ),
431        );
432
433        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
434            skip_watermark_iter,
435            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
436                compact_task.non_pk_prefix_table_watermarks.clone(),
437                compaction_catalog_agent_ref.clone(),
438            ),
439        );
440
441        UserIterator::new(
442            combine_iter,
443            (Bound::Unbounded, Bound::Unbounded),
444            u64::MAX,
445            0,
446            None,
447        )
448    };
449    let iter = ConcatSstableIterator::new(
450        compact_task.sorted_output_ssts.clone(),
451        KeyRange::inf(),
452        context.sstable_store.clone(),
453        Arc::new(TaskProgress::default()),
454        context.storage_opts.compactor_iter_max_io_retry_times,
455    );
456    let right_iter = {
457        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
458            iter,
459            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
460                compact_task.pk_prefix_table_watermarks.clone(),
461            ),
462        );
463
464        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
465            skip_watermark_iter,
466            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
467                compact_task.non_pk_prefix_table_watermarks.clone(),
468                compaction_catalog_agent_ref,
469            ),
470        );
471
472        UserIterator::new(
473            combine_iter,
474            (Bound::Unbounded, Bound::Unbounded),
475            u64::MAX,
476            0,
477            None,
478        )
479    };
480
481    check_result(left_iter, right_iter).await
482}
483
484pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
485    left_iter: UserIterator<I>,
486    sort_ssts: Vec<SstableInfo>,
487    context: CompactorContext,
488) -> HummockResult<bool> {
489    let iter = ConcatSstableIterator::new(
490        sort_ssts,
491        KeyRange::inf(),
492        context.sstable_store.clone(),
493        Arc::new(TaskProgress::default()),
494        0,
495    );
496    let right_iter = UserIterator::new(
497        iter,
498        (Bound::Unbounded, Bound::Unbounded),
499        u64::MAX,
500        0,
501        None,
502    );
503    check_result(left_iter, right_iter).await
504}
505
506async fn check_result<
507    I1: HummockIterator<Direction = Forward>,
508    I2: HummockIterator<Direction = Forward>,
509>(
510    mut left_iter: UserIterator<I1>,
511    mut right_iter: UserIterator<I2>,
512) -> HummockResult<bool> {
513    left_iter.rewind().await?;
514    right_iter.rewind().await?;
515    let mut right_count = 0;
516    let mut left_count = 0;
517    while left_iter.is_valid() && right_iter.is_valid() {
518        if left_iter.key() != right_iter.key() {
519            tracing::error!(
520                "The key of input and output not equal. key: {:?} vs {:?}",
521                left_iter.key(),
522                right_iter.key()
523            );
524            return Ok(false);
525        }
526        if left_iter.value() != right_iter.value() {
527            tracing::error!(
528                "The value of input and output not equal. key: {:?}, value: {:?} vs {:?}",
529                left_iter.key(),
530                left_iter.value(),
531                right_iter.value()
532            );
533            return Ok(false);
534        }
535        left_iter.next().await?;
536        right_iter.next().await?;
537        left_count += 1;
538        right_count += 1;
539    }
540    while left_iter.is_valid() {
541        left_count += 1;
542        left_iter.next().await?;
543    }
544    while right_iter.is_valid() {
545        right_count += 1;
546        right_iter.next().await?;
547    }
548    if left_count != right_count {
549        tracing::error!(
550            "The key count of input and output not equal: {} vs {}",
551            left_count,
552            right_count
553        );
554        return Ok(false);
555    }
556    Ok(true)
557}
558
559pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
560    let input_ssts = compact_task.read_input_ssts().collect_vec();
561    let compaction_size = input_ssts_size(&input_ssts);
562    optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size)
563}
564
565fn optimize_by_copy_block_with_input(
566    compact_task: &CompactTask,
567    context: &CompactorContext,
568    input_ssts: &[&SstableInfo],
569    compaction_size: u64,
570) -> bool {
571    let all_ssts_are_blocked_filter = input_ssts
572        .iter()
573        .all(|table_info| table_info.bloom_filter_kind == BloomFilterType::Blocked);
574    let current_filter_type = compact_task.sstable_filter_kind;
575    let all_ssts_match_filter_family = input_ssts
576        .iter()
577        .all(|table_info| table_info.filter_type_compatible_with(current_filter_type));
578    // Fast compaction path can only preserve blocked filters by copying block payloads (and their
579    // per-block filter bytes). If the output-SST-level heuristic now wants a plain filter, fall back
580    // to the normal compaction path to rebuild filters. This intentionally lets tasks that were
581    // previously classified as blocked by total task key count move back to plain output filters.
582    let output_capacity = estimate_task_output_capacity(context.clone(), compact_task);
583    let estimated_output_key_count =
584        estimate_output_key_count_for_input_ssts(input_ssts.iter().copied(), output_capacity);
585    let output_wants_blocked_filter =
586        compact_task.should_use_block_based_filter_for_output(estimated_output_key_count as u64);
587
588    let delete_key_count = input_ssts
589        .iter()
590        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
591        .sum::<u64>();
592    let total_key_count = input_ssts
593        .iter()
594        .map(|table_info| table_info.total_key_count)
595        .sum::<u64>();
596
597    let single_table = compact_task.get_table_ids_from_input_ssts().count() == 1;
598    context.storage_opts.enable_fast_compaction
599        && current_filter_type == PbSstableFilterType::SstableFilterXor16
600        && all_ssts_are_blocked_filter
601        && all_ssts_match_filter_family
602        && output_wants_blocked_filter
603        && !compact_task.contains_range_tombstone()
604        && !compact_task.contains_ttl()
605        && !compact_task.contains_split_sst()
606        && single_table
607        && compact_task.target_level > 0
608        && compact_task.input_ssts.len() == 2
609        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
610        && delete_key_count * 100
611            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
612        && compact_task.task_type == PbTaskType::Dynamic
613}
614
615pub async fn generate_splits_for_task(
616    compact_task: &mut CompactTask,
617    context: &CompactorContext,
618    optimize_by_copy_block: bool,
619) -> HummockResult<()> {
620    let input_ssts = compact_task.read_input_ssts().collect_vec();
621    let compaction_size = input_ssts_size(&input_ssts);
622
623    if !optimize_by_copy_block {
624        let splits = generate_splits(
625            &input_ssts,
626            compaction_size,
627            context,
628            compact_task.max_sub_compaction,
629        )
630        .await?;
631        if !splits.is_empty() {
632            compact_task.splits = splits;
633        }
634        return Ok(());
635    }
636
637    Ok(())
638}
639
640pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
641    let group_label = compact_task.compaction_group_id.to_string();
642    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
643
644    let (select_size, select_count) = read_sstable_size_and_count(
645        compact_task
646            .input_ssts
647            .iter()
648            .filter(|level| level.level_idx != compact_task.target_level)
649            .flat_map(|level| level.read_sstable_infos()),
650    );
651    let (target_level_read_bytes, target_count) = read_sstable_size_and_count(
652        compact_task
653            .input_ssts
654            .iter()
655            .filter(|level| level.level_idx == compact_task.target_level)
656            .flat_map(|level| level.read_sstable_infos()),
657    );
658
659    context
660        .compactor_metrics
661        .compact_read_current_level
662        .with_label_values(&[&group_label, &cur_level_label])
663        .inc_by(select_size);
664    context
665        .compactor_metrics
666        .compact_read_sstn_current_level
667        .with_label_values(&[&group_label, &cur_level_label])
668        .inc_by(select_count as u64);
669
670    let next_level_label = compact_task.target_level.to_string();
671    context
672        .compactor_metrics
673        .compact_read_next_level
674        .with_label_values(&[&group_label, &next_level_label])
675        .inc_by(target_level_read_bytes);
676    context
677        .compactor_metrics
678        .compact_read_sstn_next_level
679        .with_label_values(&[&group_label, &next_level_label])
680        .inc_by(target_count as u64);
681}
682
683fn read_sstable_size_and_count<'a>(
684    sstable_infos: impl IntoIterator<Item = &'a SstableInfo>,
685) -> (u64, usize) {
686    sstable_infos
687        .into_iter()
688        .fold((0, 0), |(size, count), table_info| {
689            (size + table_info.sst_size, count + 1)
690        })
691}
692
693pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
694    let input_ssts = compact_task.read_input_ssts().collect_vec();
695    let compaction_size = input_ssts_size(&input_ssts);
696    let optimize_by_copy_block =
697        optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size);
698
699    if optimize_by_copy_block {
700        return 1;
701    }
702
703    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
704    calculate_task_parallelism_impl(
705        context.compaction_executor.worker_num(),
706        parallel_compact_size,
707        compaction_size,
708        compact_task.max_sub_compaction,
709    )
710}
711
712fn input_ssts_size(input_ssts: &[&SstableInfo]) -> u64 {
713    input_ssts
714        .iter()
715        .map(|table_info| table_info.sst_size)
716        .sum()
717}
718
/// Computes the number of sub-compactions for a task of `compaction_size` bytes.
///
/// The result is one sub-task per started `parallel_compact_size` bytes, capped by both
/// `max_sub_compaction` and `worker_num`. A zero `compaction_size` yields 0.
///
/// A zero `parallel_compact_size` (e.g. `parallel_compact_size_mb = 0`) previously panicked on
/// division by zero. It is now treated as a 1-byte unit, so only the two caps limit the result.
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let size_based = compaction_size.div_ceil(parallel_compact_size.max(1));
    let capped = size_based.min(u64::from(max_sub_compaction));
    // `capped` is at most `u32::MAX`, so the cast cannot truncate.
    worker_num.min(capped as usize)
}
728
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::level::InputLevel;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_pb::hummock::compact_task::PbTaskType;
    use risingwave_pb::hummock::{
        PbBloomFilterType, PbLevelType, PbSstableFilterLayout, PbSstableFilterType,
    };

    use super::{
        CompactTask, CompactorContext, calculate_task_parallelism_impl,
        estimate_output_key_count_by_size, optimize_by_copy_block,
    };
    use crate::hummock::compactor::new_compaction_await_tree_reg_ref;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::monitor::CompactorMetrics;
    use crate::opts::StorageOpts;

    /// A 1 KiB XOR16 SST with a blocked filter, containing `total_key_count` keys of `table_id`.
    fn test_sstable(
        table_id: TableId,
        total_key_count: u64,
    ) -> risingwave_hummock_sdk::sstable_info::SstableInfo {
        SstableInfoInner {
            object_id: 1.into(),
            sst_id: 1.into(),
            table_ids: vec![table_id],
            total_key_count,
            sst_size: 1024,
            uncompressed_file_size: 1024,
            bloom_filter_kind: PbBloomFilterType::Blocked,
            filter_type: PbSstableFilterType::SstableFilterXor16,
            ..Default::default()
        }
        .into()
    }

    async fn test_context() -> CompactorContext {
        CompactorContext::new_local_compact_context(
            Arc::new(StorageOpts::default()),
            mock_sstable_store().await,
            Arc::new(CompactorMetrics::unused()),
            Some(new_compaction_await_tree_reg_ref(
                await_tree::Config::default(),
            )),
        )
    }

    /// A dynamic L1 -> L2 task over two single-SST levels of one table, otherwise eligible for
    /// the copy-block fast path, with the given filter layout and blocked-XOR threshold.
    fn test_compact_task(
        layout: PbSstableFilterLayout,
        blocked_xor_filter_kv_count_threshold: Option<u64>,
    ) -> CompactTask {
        let table_id = TableId::new(1);
        CompactTask {
            input_ssts: vec![
                InputLevel {
                    level_idx: 1,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![test_sstable(table_id, 10)],
                },
                InputLevel {
                    level_idx: 2,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![test_sstable(table_id, 10)],
                },
            ],
            existing_table_ids: vec![table_id],
            target_level: 2,
            target_file_size: 1024,
            task_type: PbTaskType::Dynamic,
            sstable_filter_kind: PbSstableFilterType::SstableFilterXor16,
            sstable_filter_layout: layout,
            blocked_xor_filter_kv_count_threshold,
            ..Default::default()
        }
    }

    #[test]
    fn test_estimate_output_key_count_by_size_scales_to_output_sst() {
        let estimated_key_count =
            estimate_output_key_count_by_size(1024 * 1024, 512 * 1024 * 1024, 128 * 1024 * 1024);

        assert_eq!(estimated_key_count, 256 * 1024);
        assert_eq!(estimate_output_key_count_by_size(100, 0, 0), 100);
    }

    #[test]
    fn test_estimate_output_key_count_by_size_edge_cases() {
        // No keys at all.
        assert_eq!(estimate_output_key_count_by_size(0, 1024, 1024), 0);
        // Zero output capacity with a non-empty input.
        assert_eq!(estimate_output_key_count_by_size(100, 1024, 0), 0);
        // Partial keys round up.
        assert_eq!(estimate_output_key_count_by_size(10, 3, 1), 4);
        // Never exceeds the input key count.
        assert_eq!(estimate_output_key_count_by_size(10, 1024, 4096), 10);
    }

    #[test]
    fn test_calculate_task_parallelism_impl_caps() {
        // Size-based: ceil(250 / 100) = 3.
        assert_eq!(calculate_task_parallelism_impl(8, 100, 250, 16), 3);
        // Capped by worker count.
        assert_eq!(calculate_task_parallelism_impl(2, 100, 250, 16), 2);
        // Capped by `max_sub_compaction`.
        assert_eq!(calculate_task_parallelism_impl(8, 100, 250, 1), 1);
        // Empty task.
        assert_eq!(calculate_task_parallelism_impl(8, 100, 0, 16), 0);
    }

    #[tokio::test]
    async fn test_optimize_by_copy_block_respects_plain_layout() {
        let context = test_context().await;
        let compact_task = test_compact_task(PbSstableFilterLayout::Plain, Some(1));

        assert!(!optimize_by_copy_block(&compact_task, &context));
    }

    #[tokio::test]
    async fn test_optimize_by_copy_block_respects_auto_threshold() {
        let context = test_context().await;
        let compact_task = test_compact_task(PbSstableFilterLayout::Auto, Some(1024));

        assert!(!optimize_by_copy_block(&compact_task, &context));
    }

    #[tokio::test]
    async fn test_optimize_by_copy_block_keeps_blocked_output_when_requested() {
        let context = test_context().await;
        let compact_task = test_compact_task(PbSstableFilterLayout::Blocked, Some(1024));

        assert!(optimize_by_copy_block(&compact_task, &context));
    }
}