Skip to main content

risingwave_storage/hummock/compactor/
compaction_utils.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21use bytes::Bytes;
22use itertools::Itertools;
23use risingwave_common::catalog::TableId;
24use risingwave_common::config::meta::default::compaction_config;
25use risingwave_common::constants::hummock::CompactionFilterFlag;
26use risingwave_hummock_sdk::compact_task::CompactTask;
27use risingwave_hummock_sdk::key::FullKey;
28use risingwave_hummock_sdk::key_range::KeyRange;
29use risingwave_hummock_sdk::sstable_info::SstableInfo;
30use risingwave_hummock_sdk::table_stats::TableStatsMap;
31use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
32use risingwave_pb::hummock::compact_task::PbTaskType;
33use risingwave_pb::hummock::{
34    PbLevelType, PbSstableFilterLayout, PbSstableFilterType, PbTableSchema,
35};
36use tokio::time::Instant;
37
38pub use super::context::CompactorContext;
39use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
40use crate::hummock::compactor::{
41    ConcatSstableIterator, MultiCompactionFilter, TaskProgress, TtlCompactionFilter,
42};
43use crate::hummock::iterator::{
44    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
45    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
46    UserIterator,
47};
48use crate::hummock::multi_builder::TableBuilderFactory;
49use crate::hummock::{
50    CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
51    SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
52};
53use crate::monitor::StoreLocalStatistic;
54
/// `TableBuilderFactory` that opens `SstableBuilder`s for SST objects whose ids are allocated
/// through `object_id_getter` and whose writers come from `sstable_writer_factory`.
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    /// Allocates the object id of every new SST opened by this factory.
    pub object_id_getter: Arc<dyn GetObjectId>,
    /// Memory limiter handed to every `SstableBuilder` this factory creates.
    pub limiter: Arc<MemoryLimiter>,
    /// Builder options; `capacity + block_capacity` is also used as the writer capacity hint.
    pub options: SstableBuilderOptions,
    /// Cache policy passed to the SST writers.
    pub policy: CachePolicy,
    /// Accumulated time, in microseconds, spent waiting for `object_id_getter` to return ids.
    pub remote_rpc_cost: Arc<AtomicU64>,
    /// Catalog agent forwarded to every `SstableBuilder`.
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    /// Creates the writer backing each new SST.
    pub sstable_writer_factory: W,
    /// Binds the filter builder type `F` to the factory; no `F` value is stored.
    pub _phantom: PhantomData<F>,
}
65
66#[async_trait::async_trait]
67impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
68    type Filter = F;
69    type Writer = W::Writer;
70
71    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
72        let timer = Instant::now();
73        let table_id = self.object_id_getter.get_new_sst_object_id().await?;
74        let cost = (timer.elapsed().as_secs_f64() * 1000000.0).round() as u64;
75        self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
76        let writer_options = SstableWriterOptions {
77            capacity_hint: Some(self.options.capacity + self.options.block_capacity),
78            tracker: None,
79            policy: self.policy,
80        };
81        let writer = self
82            .sstable_writer_factory
83            .create_sst_writer(table_id, writer_options)
84            .await?;
85        let builder = SstableBuilder::new(
86            table_id,
87            writer,
88            Self::Filter::create(self.options.filter_builder_options()),
89            self.options.clone(),
90            self.compaction_catalog_agent_ref.clone(),
91            Some(self.limiter.clone()),
92        );
93        Ok(builder)
94    }
95}
96
/// `CompactionStatistics` collects counters for the result of each compaction split.
///
/// All counters start at zero (empty for the stats map) through the derived `Default`.
#[derive(Default, Debug)]
pub struct CompactionStatistics {
    /// Per-table statistics delta, used to report per-table metrics.
    /// NOTE(review): by its name this tracks dropped entries per table — confirm at the writer.
    pub delta_drop_stat: TableStatsMap,

    /// Number of keys seen by the compaction iterator; together with
    /// `iter_drop_key_counts` this is used to calculate the delete ratio.
    pub iter_total_key_counts: u64,
    /// Number of keys dropped by the compaction iterator (numerator of the delete ratio).
    pub iter_drop_key_counts: u64,
}
107
/// Settings describing how a single compaction task (or a test/benchmark run, see
/// `TaskConfig::for_test`) should be executed.
#[derive(Clone, Default)]
pub struct TaskConfig {
    /// Key range to compact.
    /// NOTE(review): presumably the range of one split of the task — confirm at the caller.
    pub(crate) key_range: KeyRange,
    /// Cache policy for the task. NOTE(review): presumably applied to output SST writes — confirm.
    pub(crate) cache_policy: CachePolicy,
    /// NOTE(review): presumably whether delete markers may be garbage-collected — confirm.
    pub(crate) gc_delete_keys: bool,
    /// NOTE(review): presumably whether older versions of a key are kept — confirm.
    /// `for_test` sets this to `false`.
    pub(crate) retain_multiple_version: bool,
    /// Resolved output SST filter layout from `CompactTask::sstable_filter_layout_for_output`.
    pub(crate) sstable_filter_layout: PbSstableFilterLayout,
    /// Filter type of the output SSTs; `for_test` uses `SstableFilterXor16`.
    pub(crate) sstable_filter_type: PbSstableFilterType,

    /// Per-table partition count.
    /// NOTE(review): presumably the number of vnode partitions used to cut output SSTs — confirm.
    pub(crate) table_vnode_partition: BTreeMap<TableId, u32>,
    /// `TableId` -> `TableSchema`
    /// Schemas in `table_schemas` are at least as new as the one used to create `input_ssts`.
    /// For a table with schema existing in `table_schemas`, its columns not in `table_schemas` but in `input_ssts` can be safely dropped.
    pub(crate) table_schemas: HashMap<TableId, PbTableSchema>,
    /// `disable_drop_column_optimization` should only be set in benchmark.
    pub(crate) disable_drop_column_optimization: bool,
}
126
127impl TaskConfig {
128    /// Construct a compact task config for tests and benchmarks.
129    pub fn for_test(
130        key_range: KeyRange,
131        cache_policy: CachePolicy,
132        gc_delete_keys: bool,
133        sstable_filter_layout: PbSstableFilterLayout,
134        table_schemas: HashMap<TableId, PbTableSchema>,
135    ) -> Self {
136        Self {
137            key_range,
138            cache_policy,
139            gc_delete_keys,
140            retain_multiple_version: false,
141            sstable_filter_layout,
142            sstable_filter_type: PbSstableFilterType::SstableFilterXor16,
143            table_vnode_partition: BTreeMap::default(),
144            table_schemas,
145            disable_drop_column_optimization: false,
146        }
147    }
148
149    pub fn with_disable_drop_column_optimization(mut self, disable: bool) -> Self {
150        self.disable_drop_column_optimization = disable;
151        self
152    }
153}
154
155pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
156    let mut multi_filter = MultiCompactionFilter::default();
157    let compaction_filter_flag =
158        CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
159    // STATE_CLEAN is handled by compact-task table-id normalization and iterator-level block
160    // pruning. Keep the flag as a no-op here for compatibility with existing task configs.
161
162    if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
163        let id_to_ttl = compact_task
164            .table_options
165            .iter()
166            .filter_map(|(id, option)| {
167                option
168                    .retention_seconds
169                    .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
170            })
171            .collect();
172
173        let ttl_filter = Box::new(TtlCompactionFilter::new(
174            id_to_ttl,
175            compact_task.current_epoch_time,
176        ));
177        multi_filter.register(ttl_filter);
178    }
179
180    multi_filter
181}
182
183fn generate_splits_fast(
184    sstable_infos: &[&SstableInfo],
185    compaction_size: u64,
186    context: &CompactorContext,
187    max_sub_compaction: u32,
188) -> Vec<KeyRange> {
189    let worker_num = context.compaction_executor.worker_num();
190    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
191
192    let parallelism = calculate_task_parallelism_impl(
193        worker_num,
194        parallel_compact_size,
195        compaction_size,
196        max_sub_compaction,
197    );
198    let mut indexes = vec![];
199    for sst in sstable_infos {
200        let key_range = &sst.key_range;
201        indexes.push(
202            FullKey {
203                user_key: FullKey::decode(&key_range.left).user_key,
204                epoch_with_gap: EpochWithGap::new_max_epoch(),
205            }
206            .encode(),
207        );
208        indexes.push(
209            FullKey {
210                user_key: FullKey::decode(&key_range.right).user_key,
211                epoch_with_gap: EpochWithGap::new_max_epoch(),
212            }
213            .encode(),
214        );
215    }
216    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
217    indexes.dedup();
218    if indexes.len() <= parallelism {
219        return vec![];
220    }
221
222    let mut splits = vec![];
223    splits.push(KeyRange::default());
224    let parallel_key_count = indexes.len() / parallelism;
225    let mut last_split_key_count = 0;
226    for key in indexes {
227        if last_split_key_count >= parallel_key_count {
228            splits.last_mut().unwrap().right = Bytes::from(key.clone());
229            splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
230            last_split_key_count = 0;
231        }
232        last_split_key_count += 1;
233    }
234
235    splits
236}
237
238pub async fn generate_splits(
239    sstable_infos: &[&SstableInfo],
240    compaction_size: u64,
241    context: &CompactorContext,
242    max_sub_compaction: u32,
243) -> HummockResult<Vec<KeyRange>> {
244    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
245    if compaction_size > parallel_compact_size {
246        if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
247            return Ok(generate_splits_fast(
248                sstable_infos,
249                compaction_size,
250                context,
251                max_sub_compaction,
252            ));
253        }
254        let mut indexes = vec![];
255        // preload the meta and get the smallest key to split sub_compaction
256        for sstable_info in sstable_infos {
257            let sstable = context
258                .sstable_store
259                .sstable(sstable_info, &mut StoreLocalStatistic::default())
260                .await?;
261            indexes.extend(sstable.meta.block_metas.iter().map(|block| {
262                let data_size = block.len;
263                let full_key = FullKey {
264                    user_key: FullKey::decode(&block.smallest_key).user_key,
265                    epoch_with_gap: EpochWithGap::new_max_epoch(),
266                }
267                .encode();
268                (data_size as u64, full_key)
269            }));
270        }
271        // sort by key, as for every data block has the same size;
272        indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
273        let mut splits = vec![];
274        splits.push(KeyRange::default());
275
276        let parallelism = calculate_task_parallelism_impl(
277            context.compaction_executor.worker_num(),
278            parallel_compact_size,
279            compaction_size,
280            max_sub_compaction,
281        );
282
283        let sub_compaction_data_size =
284            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);
285
286        if parallelism > 1 {
287            let mut last_buffer_size = 0;
288            let mut last_key: Vec<u8> = vec![];
289            let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
290            for (data_size, key) in indexes {
291                if last_buffer_size >= sub_compaction_data_size
292                    && !last_key.eq(&key)
293                    && remaining_size > parallel_compact_size
294                {
295                    splits.last_mut().unwrap().right = Bytes::from(key.clone());
296                    splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
297                    last_buffer_size = data_size;
298                } else {
299                    last_buffer_size += data_size;
300                }
301                remaining_size -= data_size;
302                last_key = key;
303            }
304            return Ok(splits);
305        }
306    }
307
308    Ok(vec![])
309}
310
311pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
312    let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
313    let total_input_uncompressed_file_size = task
314        .read_input_ssts()
315        .map(|table| table.uncompressed_file_size)
316        .sum::<u64>();
317
318    let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
319    std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
320}
321
/// Estimates how many keys a single output SST of `output_capacity` bytes will contain.
///
/// The input key density (`total_key_count / total_size`) is scaled to `output_capacity`,
/// rounded up, and capped at `total_key_count`. Special cases:
/// - no keys → `0`;
/// - unknown (zero) input size → every key is assumed to land in one output;
/// - zero output capacity → `0`.
///
/// Results that do not fit in `usize` saturate to `usize::MAX`.
pub fn estimate_output_key_count_by_size(
    total_key_count: u64,
    total_size: u64,
    output_capacity: usize,
) -> usize {
    match (total_key_count, total_size, output_capacity) {
        (0, _, _) => 0,
        (keys, 0, _) => usize::try_from(keys).unwrap_or(usize::MAX),
        (_, _, 0) => 0,
        (keys, size, capacity) => {
            // Using per-output-SST density (rather than total task key count) lets large tasks
            // still choose plain filters when each output SST is small. u128 avoids overflow.
            let keys = u128::from(keys);
            let scaled = (keys * capacity as u128).div_ceil(u128::from(size));
            usize::try_from(scaled.min(keys)).unwrap_or(usize::MAX)
        }
    }
}
345
346pub fn estimate_output_key_count_for_input_ssts<'a>(
347    input_ssts: impl Iterator<Item = &'a SstableInfo>,
348    output_capacity: usize,
349) -> usize {
350    let (total_key_count, total_uncompressed_size) =
351        input_ssts.fold((0u64, 0u64), |(key_count, size), sst| {
352            (
353                key_count + sst.total_key_count,
354                size + sst.uncompressed_file_size,
355            )
356        });
357
358    estimate_output_key_count_by_size(total_key_count, total_uncompressed_size, output_capacity)
359}
360
361pub fn estimate_output_key_count_for_task(task: &CompactTask, output_capacity: usize) -> usize {
362    estimate_output_key_count_for_input_ssts(task.read_input_ssts(), output_capacity)
363}
364
365pub fn blocked_xor_filter_key_count_threshold(
366    blocked_xor_filter_kv_count_threshold: Option<u64>,
367) -> usize {
368    blocked_xor_filter_kv_count_threshold
369        .unwrap_or(compaction_config::DEFAULT_BLOCKED_XOR_FILTER_KV_COUNT_THRESHOLD)
370        .try_into()
371        .unwrap_or(usize::MAX)
372}
373
/// Verifies that applying the result of `compact_task` does not change the data visible to users.
///
/// Input side: each non-overlapping level is read through one `ConcatSstableIterator` (its SSTs
/// are debug-asserted to be concatenable). Every SST of any other level type gets its own
/// `ConcatSstableIterator`. All of them are combined with `MergeIterator::for_compactor`.
///
/// Output side: `sorted_output_ssts` is read through a single `ConcatSstableIterator`.
///
/// Both sides are wrapped in the same stack: the pk-prefix and non-pk-prefix watermark-skipping
/// iterators built from the task's watermarks, then a `UserIterator`. The two sides are then
/// compared entry by entry by `check_result`.
///
/// Returns `Ok(true)` immediately when the task contains TTL. This check does not model keys
/// dropped by the compaction filter, so the two sides could legitimately differ.
///
/// # Errors
/// Propagates any error raised while rewinding or advancing either side.
pub async fn check_compaction_result(
    compact_task: &CompactTask,
    context: CompactorContext,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> HummockResult<bool> {
    // This check method does not consider dropped keys by compaction filter.
    if compact_task.contains_ttl() {
        return Ok(true);
    }

    let mut table_iters = Vec::new();

    for level in &compact_task.input_ssts {
        if level.level_type == PbLevelType::Nonoverlapping {
            // SSTs of a non-overlapping level can be read back to back by one iterator.
            let tables = level.read_sstable_infos().cloned().collect_vec();
            if tables.is_empty() {
                continue;
            }
            debug_assert!(can_concat(&tables));

            table_iters.push(ConcatSstableIterator::new(
                tables,
                KeyRange::inf(),
                context.sstable_store.clone(),
                Arc::new(TaskProgress::default()),
                context.storage_opts.compactor_iter_max_io_retry_times,
            ));
        } else {
            // Any other level type: one iterator per SST; the merge iterator below orders them.
            for table_info in level.read_sstable_infos().cloned() {
                table_iters.push(ConcatSstableIterator::new(
                    vec![table_info],
                    KeyRange::inf(),
                    context.sstable_store.clone(),
                    Arc::new(TaskProgress::default()),
                    context.storage_opts.compactor_iter_max_io_retry_times,
                ));
            }
        }
    }

    // Input side: merged input SSTs -> pk-prefix watermark skip -> non-pk-prefix watermark skip
    // -> user iterator.
    let iter = MergeIterator::for_compactor(table_iters);
    let left_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref.clone(),
            ),
        );

        // NOTE(review): `u64::MAX` / `0` are presumably the read epoch and minimum epoch of
        // `UserIterator::new` — confirm against its signature.
        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };
    // Output side: the task's sorted output SSTs wrapped in the same iterator stack.
    let iter = ConcatSstableIterator::new(
        compact_task.sorted_output_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        context.storage_opts.compactor_iter_max_io_retry_times,
    );
    let right_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref,
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };

    check_result(left_iter, right_iter).await
}
474
475pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
476    left_iter: UserIterator<I>,
477    sort_ssts: Vec<SstableInfo>,
478    context: CompactorContext,
479) -> HummockResult<bool> {
480    let iter = ConcatSstableIterator::new(
481        sort_ssts,
482        KeyRange::inf(),
483        context.sstable_store.clone(),
484        Arc::new(TaskProgress::default()),
485        0,
486    );
487    let right_iter = UserIterator::new(
488        iter,
489        (Bound::Unbounded, Bound::Unbounded),
490        u64::MAX,
491        0,
492        None,
493    );
494    check_result(left_iter, right_iter).await
495}
496
497async fn check_result<
498    I1: HummockIterator<Direction = Forward>,
499    I2: HummockIterator<Direction = Forward>,
500>(
501    mut left_iter: UserIterator<I1>,
502    mut right_iter: UserIterator<I2>,
503) -> HummockResult<bool> {
504    left_iter.rewind().await?;
505    right_iter.rewind().await?;
506    let mut right_count = 0;
507    let mut left_count = 0;
508    while left_iter.is_valid() && right_iter.is_valid() {
509        if left_iter.key() != right_iter.key() {
510            tracing::error!(
511                "The key of input and output not equal. key: {:?} vs {:?}",
512                left_iter.key(),
513                right_iter.key()
514            );
515            return Ok(false);
516        }
517        if left_iter.value() != right_iter.value() {
518            tracing::error!(
519                "The value of input and output not equal. key: {:?}, value: {:?} vs {:?}",
520                left_iter.key(),
521                left_iter.value(),
522                right_iter.value()
523            );
524            return Ok(false);
525        }
526        left_iter.next().await?;
527        right_iter.next().await?;
528        left_count += 1;
529        right_count += 1;
530    }
531    while left_iter.is_valid() {
532        left_count += 1;
533        left_iter.next().await?;
534    }
535    while right_iter.is_valid() {
536        right_count += 1;
537        right_iter.next().await?;
538    }
539    if left_count != right_count {
540        tracing::error!(
541            "The key count of input and output not equal: {} vs {}",
542            left_count,
543            right_count
544        );
545        return Ok(false);
546    }
547    Ok(true)
548}
549
550pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
551    let input_ssts = compact_task.read_input_ssts().collect_vec();
552    let compaction_size = input_ssts_size(&input_ssts);
553    optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size)
554}
555
556fn optimize_by_copy_block_with_input(
557    compact_task: &CompactTask,
558    context: &CompactorContext,
559    input_ssts: &[&SstableInfo],
560    compaction_size: u64,
561) -> bool {
562    let all_ssts_are_blocked_filter = input_ssts
563        .iter()
564        .all(|table_info| table_info.filter_layout == PbSstableFilterLayout::Blocked);
565    let current_filter_type = compact_task.sstable_filter_type;
566    let all_ssts_match_filter_type = input_ssts
567        .iter()
568        .all(|table_info| table_info.filter_type == current_filter_type);
569    // Fast compaction path can only preserve blocked filters by copying block payloads (and their
570    // per-block filter bytes). If the output-SST-level heuristic now wants a plain filter, fall back
571    // to the normal compaction path to rebuild filters. This intentionally lets tasks that were
572    // previously classified as blocked by total task key count move back to plain output filters.
573    let output_capacity = estimate_task_output_capacity(context.clone(), compact_task);
574    let estimated_output_key_count =
575        estimate_output_key_count_for_input_ssts(input_ssts.iter().copied(), output_capacity);
576    let output_filter_layout =
577        compact_task.sstable_filter_layout_for_output(estimated_output_key_count as u64);
578
579    let delete_key_count = input_ssts
580        .iter()
581        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
582        .sum::<u64>();
583    let total_key_count = input_ssts
584        .iter()
585        .map(|table_info| table_info.total_key_count)
586        .sum::<u64>();
587
588    let single_table = compact_task.get_table_ids_from_input_ssts().count() == 1;
589    context.storage_opts.enable_fast_compaction
590        && matches!(
591            current_filter_type,
592            PbSstableFilterType::SstableFilterXor8 | PbSstableFilterType::SstableFilterXor16
593        )
594        && all_ssts_are_blocked_filter
595        && all_ssts_match_filter_type
596        && output_filter_layout == PbSstableFilterLayout::Blocked
597        && !compact_task.contains_range_tombstone()
598        && !compact_task.contains_ttl()
599        && !compact_task.contains_split_sst()
600        && single_table
601        && compact_task.target_level > 0
602        && compact_task.input_ssts.len() == 2
603        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
604        && delete_key_count * 100
605            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
606        && compact_task.task_type == PbTaskType::Dynamic
607}
608
609pub async fn generate_splits_for_task(
610    compact_task: &mut CompactTask,
611    context: &CompactorContext,
612    optimize_by_copy_block: bool,
613) -> HummockResult<()> {
614    let input_ssts = compact_task.read_input_ssts().collect_vec();
615    let compaction_size = input_ssts_size(&input_ssts);
616
617    if !optimize_by_copy_block {
618        let splits = generate_splits(
619            &input_ssts,
620            compaction_size,
621            context,
622            compact_task.max_sub_compaction,
623        )
624        .await?;
625        if !splits.is_empty() {
626            compact_task.splits = splits;
627        }
628        return Ok(());
629    }
630
631    Ok(())
632}
633
634pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
635    let group_label = compact_task.compaction_group_id.to_string();
636    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
637
638    let (select_size, select_count) = read_sstable_size_and_count(
639        compact_task
640            .input_ssts
641            .iter()
642            .filter(|level| level.level_idx != compact_task.target_level)
643            .flat_map(|level| level.read_sstable_infos()),
644    );
645    let (target_level_read_bytes, target_count) = read_sstable_size_and_count(
646        compact_task
647            .input_ssts
648            .iter()
649            .filter(|level| level.level_idx == compact_task.target_level)
650            .flat_map(|level| level.read_sstable_infos()),
651    );
652
653    context
654        .compactor_metrics
655        .compact_read_current_level
656        .with_label_values(&[&group_label, &cur_level_label])
657        .inc_by(select_size);
658    context
659        .compactor_metrics
660        .compact_read_sstn_current_level
661        .with_label_values(&[&group_label, &cur_level_label])
662        .inc_by(select_count as u64);
663
664    let next_level_label = compact_task.target_level.to_string();
665    context
666        .compactor_metrics
667        .compact_read_next_level
668        .with_label_values(&[&group_label, &next_level_label])
669        .inc_by(target_level_read_bytes);
670    context
671        .compactor_metrics
672        .compact_read_sstn_next_level
673        .with_label_values(&[&group_label, &next_level_label])
674        .inc_by(target_count as u64);
675}
676
677fn read_sstable_size_and_count<'a>(
678    sstable_infos: impl IntoIterator<Item = &'a SstableInfo>,
679) -> (u64, usize) {
680    sstable_infos
681        .into_iter()
682        .fold((0, 0), |(size, count), table_info| {
683            (size + table_info.sst_size, count + 1)
684        })
685}
686
687pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
688    let input_ssts = compact_task.read_input_ssts().collect_vec();
689    let compaction_size = input_ssts_size(&input_ssts);
690    let optimize_by_copy_block =
691        optimize_by_copy_block_with_input(compact_task, context, &input_ssts, compaction_size);
692
693    if optimize_by_copy_block {
694        return 1;
695    }
696
697    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
698    calculate_task_parallelism_impl(
699        context.compaction_executor.worker_num(),
700        parallel_compact_size,
701        compaction_size,
702        compact_task.max_sub_compaction,
703    )
704}
705
706fn input_ssts_size(input_ssts: &[&SstableInfo]) -> u64 {
707    input_ssts
708        .iter()
709        .map(|table_info| table_info.sst_size)
710        .sum()
711}
712
/// Computes how many sub-compactions a task of `compaction_size` bytes should be split into.
///
/// The task is cut into `ceil(compaction_size / parallel_compact_size)` pieces, capped by both
/// `worker_num` and `max_sub_compaction`. The result is `0` when `compaction_size`, `worker_num`
/// or `max_sub_compaction` is zero, so callers that divide by it must check for that.
///
/// A `parallel_compact_size` of zero (e.g. `parallel_compact_size_mb = 0` in the config) is
/// treated as one byte. The result is then as large as the caps allow, instead of a panic from
/// division by zero.
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let parallelism = compaction_size.div_ceil(parallel_compact_size.max(1));
    // Capped by a `u32`, so the conversion to `usize` cannot truncate.
    worker_num.min(parallelism.min(u64::from(max_sub_compaction)) as usize)
}
722
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use risingwave_common::catalog::TableId;
    use risingwave_hummock_sdk::level::InputLevel;
    use risingwave_hummock_sdk::sstable_info::SstableInfoInner;
    use risingwave_pb::hummock::compact_task::PbTaskType;
    use risingwave_pb::hummock::{PbLevelType, PbSstableFilterLayout, PbSstableFilterType};

    use super::{
        CompactTask, CompactorContext, estimate_output_key_count_by_size, optimize_by_copy_block,
    };
    use crate::hummock::compactor::new_compaction_await_tree_reg_ref;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
    use crate::monitor::CompactorMetrics;
    use crate::opts::StorageOpts;

    /// Builds a 1024-byte (both `sst_size` and uncompressed size) SST of `table_id` holding
    /// `total_key_count` keys, with a blocked filter of `filter_type`.
    fn test_sstable(
        table_id: TableId,
        total_key_count: u64,
        filter_type: PbSstableFilterType,
    ) -> risingwave_hummock_sdk::sstable_info::SstableInfo {
        SstableInfoInner {
            object_id: 1.into(),
            sst_id: 1.into(),
            table_ids: vec![table_id],
            total_key_count,
            sst_size: 1024,
            uncompressed_file_size: 1024,
            filter_type,
            filter_layout: PbSstableFilterLayout::Blocked,
            ..Default::default()
        }
        .into()
    }

    /// Local compactor context with default storage options and a mock sstable store.
    async fn test_context() -> CompactorContext {
        CompactorContext::new_local_compact_context(
            Arc::new(StorageOpts::default()),
            mock_sstable_store().await,
            Arc::new(CompactorMetrics::unused()),
            Some(new_compaction_await_tree_reg_ref(
                await_tree::Config::default(),
            )),
        )
    }

    /// Builds a `Dynamic` task compacting level 1 into level 2. Each level holds one
    /// `test_sstable` of table 1 with 10 keys, and the target file size is 1024 bytes.
    fn test_compact_task(
        layout: PbSstableFilterLayout,
        blocked_xor_filter_kv_count_threshold: Option<u64>,
        filter_type: PbSstableFilterType,
    ) -> CompactTask {
        let table_id = TableId::new(1);
        CompactTask {
            input_ssts: vec![
                InputLevel {
                    level_idx: 1,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![test_sstable(table_id, 10, filter_type)],
                },
                InputLevel {
                    level_idx: 2,
                    level_type: PbLevelType::Nonoverlapping,
                    table_infos: vec![test_sstable(table_id, 10, filter_type)],
                },
            ],
            existing_table_ids: vec![table_id],
            target_level: 2,
            target_file_size: 1024,
            task_type: PbTaskType::Dynamic,
            sstable_filter_type: filter_type,
            sstable_filter_layout: layout,
            blocked_xor_filter_kv_count_threshold,
            ..Default::default()
        }
    }

    #[test]
    fn test_estimate_output_key_count_by_size_scales_to_output_sst() {
        // 1Mi keys over 512 MiB of input, scaled to a 128 MiB output: a quarter of the keys.
        let estimated_key_count =
            estimate_output_key_count_by_size(1024 * 1024, 512 * 1024 * 1024, 128 * 1024 * 1024);

        assert_eq!(estimated_key_count, 256 * 1024);
        // Zero input size: every key is assumed to land in one output.
        assert_eq!(estimate_output_key_count_by_size(100, 0, 0), 100);
    }

    #[tokio::test]
    async fn test_optimize_by_copy_block_respects_layout_policy() {
        let context = test_context().await;
        // An explicitly plain output layout must never take the block-copy path.
        let compact_task = test_compact_task(
            PbSstableFilterLayout::Plain,
            Some(1),
            PbSstableFilterType::SstableFilterXor16,
        );

        assert!(!optimize_by_copy_block(&compact_task, &context));

        // NOTE(review): with `Auto` and a threshold of 1024, the small estimated per-output key
        // count presumably resolves to a plain layout — confirm against
        // `CompactTask::sstable_filter_layout_for_output`.
        let compact_task = test_compact_task(
            PbSstableFilterLayout::Auto,
            Some(1024),
            PbSstableFilterType::SstableFilterXor16,
        );

        assert!(!optimize_by_copy_block(&compact_task, &context));
    }

    #[tokio::test]
    async fn test_optimize_by_copy_block_supports_blocked_xor_filters() {
        let context = test_context().await;
        // Blocked Xor16 inputs with a blocked output layout qualify for block copying.
        let compact_task = test_compact_task(
            PbSstableFilterLayout::Blocked,
            Some(1024),
            PbSstableFilterType::SstableFilterXor16,
        );

        assert!(optimize_by_copy_block(&compact_task, &context));

        // The same holds for Xor8 filters.
        let compact_task = test_compact_task(
            PbSstableFilterLayout::Blocked,
            Some(1024),
            PbSstableFilterType::SstableFilterXor8,
        );

        assert!(optimize_by_copy_block(&compact_task, &context));
    }
}