risingwave_storage/hummock/compactor/compaction_utils.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::constants::hummock::CompactionFilterFlag;
use risingwave_hummock_sdk::compact_task::CompactTask;
use risingwave_hummock_sdk::compaction_group::StateTableId;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{EpochWithGap, KeyComparator, can_concat};
use risingwave_pb::hummock::compact_task::PbTaskType;
use risingwave_pb::hummock::{BloomFilterType, PbLevelType, PbTableSchema};
use tokio::time::Instant;

pub use super::context::CompactorContext;
use crate::compaction_catalog_manager::CompactionCatalogAgentRef;
use crate::hummock::compactor::{
    ConcatSstableIterator, MultiCompactionFilter, StateCleanUpCompactionFilter, TaskProgress,
    TtlCompactionFilter,
};
use crate::hummock::iterator::{
    Forward, HummockIterator, MergeIterator, NonPkPrefixSkipWatermarkIterator,
    NonPkPrefixSkipWatermarkState, PkPrefixSkipWatermarkIterator, PkPrefixSkipWatermarkState,
    UserIterator,
};
use crate::hummock::multi_builder::TableBuilderFactory;
use crate::hummock::sstable::DEFAULT_ENTRY_SIZE;
use crate::hummock::{
    CachePolicy, FilterBuilder, GetObjectId, HummockResult, MemoryLimiter, SstableBuilder,
    SstableBuilderOptions, SstableWriterFactory, SstableWriterOptions,
};
use crate::monitor::StoreLocalStatistic;

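/// A `TableBuilderFactory` used by compaction. Each `open_builder` call fetches a
/// new SST object id via `object_id_getter` (typically a remote RPC; its latency is
/// accumulated into `remote_rpc_cost`, in microseconds) and opens an
/// `SstableBuilder` on top of a writer created by `sstable_writer_factory`.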
pub struct RemoteBuilderFactory<W: SstableWriterFactory, F: FilterBuilder> {
    pub object_id_getter: Arc<dyn GetObjectId>,
    pub limiter: Arc<MemoryLimiter>,
    pub options: SstableBuilderOptions,
    pub policy: CachePolicy,
    pub remote_rpc_cost: Arc<AtomicU64>,
    pub compaction_catalog_agent_ref: CompactionCatalogAgentRef,
    pub sstable_writer_factory: W,
    pub _phantom: PhantomData<F>,
}

#[async_trait::async_trait]
impl<W: SstableWriterFactory, F: FilterBuilder> TableBuilderFactory for RemoteBuilderFactory<W, F> {
    type Filter = F;
    type Writer = W::Writer;

    async fn open_builder(&mut self) -> HummockResult<SstableBuilder<Self::Writer, Self::Filter>> {
        let timer = Instant::now();
        let table_id = self.object_id_getter.get_new_sst_object_id().await?;
        // Record the RPC cost in microseconds.
        let cost = (timer.elapsed().as_secs_f64() * 1000000.0).round() as u64;
        self.remote_rpc_cost.fetch_add(cost, Ordering::Relaxed);
        let writer_options = SstableWriterOptions {
            capacity_hint: Some(self.options.capacity + self.options.block_capacity),
            tracker: None,
            policy: self.policy,
        };
        let writer = self
            .sstable_writer_factory
            .create_sst_writer(table_id, writer_options)
            .await?;
        let builder = SstableBuilder::new(
            table_id,
            writer,
            Self::Filter::create(
                self.options.bloom_false_positive,
                self.options.capacity / DEFAULT_ENTRY_SIZE + 1,
            ),
            self.options.clone(),
            self.compaction_catalog_agent_ref.clone(),
            Some(self.limiter.clone()),
        );
        Ok(builder)
    }
}

/// `CompactionStatistics` counts the results of each compaction split.
#[derive(Default, Debug)]
pub struct CompactionStatistics {
    // to report per-table metrics
    pub delta_drop_stat: TableStatsMap,

    // to calculate delete ratio
    pub iter_total_key_counts: u64,
    pub iter_drop_key_counts: u64,
}

impl CompactionStatistics {
    #[allow(dead_code)]
    fn delete_ratio(&self) -> Option<u64> {
        if self.iter_total_key_counts == 0 {
            return None;
        }

        // Note: integer division truncates, so this yields 0 unless every
        // iterated key was dropped.
        Some(self.iter_drop_key_counts / self.iter_total_key_counts)
    }
}

#[derive(Clone, Default)]
pub struct TaskConfig {
    pub(crate) key_range: KeyRange,
    pub(crate) cache_policy: CachePolicy,
    pub(crate) gc_delete_keys: bool,
    pub(crate) retain_multiple_version: bool,
    /// `stats_target_table_ids` decides whether a dropped key should be counted as a table-stats
    /// change. For a divided SST as input, a dropped key shouldn't be counted if its table id
    /// doesn't belong to this divided SST. See `Compactor::compact_and_build_sst`.
    pub(crate) stats_target_table_ids: Option<HashSet<TableId>>,
    pub(crate) use_block_based_filter: bool,

    pub(crate) table_vnode_partition: BTreeMap<TableId, u32>,
    /// `TableId` -> `TableSchema`
    /// Schemas in `table_schemas` are at least as new as the ones used to create `input_ssts`.
    /// For a table whose schema exists in `table_schemas`, columns that appear in `input_ssts`
    /// but not in `table_schemas` can be safely dropped.
    pub(crate) table_schemas: HashMap<TableId, PbTableSchema>,
    /// `disable_drop_column_optimization` should only be set in benchmarks.
    pub(crate) disable_drop_column_optimization: bool,
}

impl TaskConfig {
    #[cfg(any(test, feature = "test"))]
    pub fn for_test(
        key_range: KeyRange,
        cache_policy: CachePolicy,
        gc_delete_keys: bool,
        use_block_based_filter: bool,
        table_schemas: HashMap<TableId, PbTableSchema>,
    ) -> Self {
        Self {
            key_range,
            cache_policy,
            gc_delete_keys,
            retain_multiple_version: false,
            stats_target_table_ids: None,
            use_block_based_filter,
            table_vnode_partition: BTreeMap::default(),
            table_schemas,
            disable_drop_column_optimization: false,
        }
    }

    #[cfg(any(test, feature = "test"))]
    pub fn with_disable_drop_column_optimization(mut self, disable: bool) -> Self {
        self.disable_drop_column_optimization = disable;
        self
    }
}

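/// Assembles the compaction filter chain from the task's `compaction_filter_mask`:
/// a `StateCleanUpCompactionFilter` when the `STATE_CLEAN` flag is set (drops keys
/// whose table id is no longer in `existing_table_ids`), and a `TtlCompactionFilter`
/// when the `TTL` flag is set (drops keys that have outlived their table's
/// `retention_seconds`, judged against `current_epoch_time`).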
pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter {
    let mut multi_filter = MultiCompactionFilter::default();
    let compaction_filter_flag =
        CompactionFilterFlag::from_bits(compact_task.compaction_filter_mask).unwrap_or_default();
    if compaction_filter_flag.contains(CompactionFilterFlag::STATE_CLEAN) {
        let state_clean_up_filter = Box::new(StateCleanUpCompactionFilter::new(
            HashSet::from_iter(compact_task.existing_table_ids.clone()),
        ));

        multi_filter.register(state_clean_up_filter);
    }

    if compaction_filter_flag.contains(CompactionFilterFlag::TTL) {
        let id_to_ttl = compact_task
            .table_options
            .iter()
            .filter_map(|(id, option)| {
                option
                    .retention_seconds
                    .and_then(|ttl| if ttl > 0 { Some((*id, ttl)) } else { None })
            })
            .collect();

        let ttl_filter = Box::new(TtlCompactionFilter::new(
            id_to_ttl,
            compact_task.current_epoch_time,
        ));
        multi_filter.register(ttl_filter);
    }

    multi_filter
}

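/// A cheap fallback for `generate_splits` that avoids preloading SST metadata:
/// it collects the left/right boundary user keys of every input SST, sorts and
/// dedups them, then cuts the sorted boundaries into roughly `parallelism` equal
/// groups. Returns an empty `Vec` when there are too few boundaries to split.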
fn generate_splits_fast(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> Vec<KeyRange> {
    let worker_num = context.compaction_executor.worker_num();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

    let parallelism = calculate_task_parallelism_impl(
        worker_num,
        parallel_compact_size,
        compaction_size,
        max_sub_compaction,
    );
    let mut indexes = vec![];
    for sst in sstable_infos {
        let key_range = &sst.key_range;
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.left).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
        indexes.push(
            FullKey {
                user_key: FullKey::decode(&key_range.right).user_key,
                epoch_with_gap: EpochWithGap::new_max_epoch(),
            }
            .encode(),
        );
    }
    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
    indexes.dedup();
    if indexes.len() <= parallelism {
        return vec![];
    }

    let mut splits = vec![];
    splits.push(KeyRange::default());
    let parallel_key_count = indexes.len() / parallelism;
    let mut last_split_key_count = 0;
    for key in indexes {
        if last_split_key_count >= parallel_key_count {
            splits.last_mut().unwrap().right = Bytes::from(key.clone());
            splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
            last_split_key_count = 0;
        }
        last_split_key_count += 1;
    }

    splits
}

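/// Decides the sub-compaction key ranges for a task. When the task is larger than
/// `parallel_compact_size`, the block metadata of every input SST is preloaded and
/// the key space is cut so that each split covers about
/// `max(compaction_size / parallelism, parallel_compact_size)` bytes of blocks.
/// Falls back to `generate_splits_fast` when there are too many files to preload,
/// and returns an empty `Vec` when no splitting is worthwhile.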
pub async fn generate_splits(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: &CompactorContext,
    max_sub_compaction: u32,
) -> HummockResult<Vec<KeyRange>> {
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    if compaction_size > parallel_compact_size {
        if sstable_infos.len() > context.storage_opts.compactor_max_preload_meta_file_count {
            return Ok(generate_splits_fast(
                sstable_infos,
                compaction_size,
                context,
                max_sub_compaction,
            ));
        }
        let mut indexes = vec![];
        // Preload the metadata and collect the smallest key of each block to split
        // the sub-compactions.
        for sstable_info in sstable_infos {
            indexes.extend(
                context
                    .sstable_store
                    .sstable(sstable_info, &mut StoreLocalStatistic::default())
                    .await?
                    .meta
                    .block_metas
                    .iter()
                    .map(|block| {
                        let data_size = block.len;
                        let full_key = FullKey {
                            user_key: FullKey::decode(&block.smallest_key).user_key,
                            epoch_with_gap: EpochWithGap::new_max_epoch(),
                        }
                        .encode();
                        (data_size as u64, full_key)
                    })
                    .collect_vec(),
            );
        }
        // Sort by key; since every data block has roughly the same size, we can
        // accumulate block sizes while scanning in key order.
        indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.1.as_ref(), b.1.as_ref()));
        let mut splits = vec![];
        splits.push(KeyRange::default());

        let parallelism = calculate_task_parallelism_impl(
            context.compaction_executor.worker_num(),
            parallel_compact_size,
            compaction_size,
            max_sub_compaction,
        );

        let sub_compaction_data_size =
            std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size);

        if parallelism > 1 {
            let mut last_buffer_size = 0;
            let mut last_key: Vec<u8> = vec![];
            let mut remaining_size = indexes.iter().map(|block| block.0).sum::<u64>();
            for (data_size, key) in indexes {
                if last_buffer_size >= sub_compaction_data_size
                    && !last_key.eq(&key)
                    && remaining_size > parallel_compact_size
                {
                    splits.last_mut().unwrap().right = Bytes::from(key.clone());
                    splits.push(KeyRange::new(Bytes::from(key.clone()), Bytes::default()));
                    last_buffer_size = data_size;
                } else {
                    last_buffer_size += data_size;
                }
                remaining_size -= data_size;
                last_key = key;
            }
            return Ok(splits);
        }
    }

    Ok(vec![])
}

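/// Estimates the capacity hint for each output SST: the smallest of the task's
/// `target_file_size`, the configured `sstable_size_mb`, and the total
/// uncompressed size of the input SSTs.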
pub fn estimate_task_output_capacity(context: CompactorContext, task: &CompactTask) -> usize {
    let max_target_file_size = context.storage_opts.sstable_size_mb as usize * (1 << 20);
    let total_input_uncompressed_file_size = task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .map(|table| table.uncompressed_file_size)
        .sum::<u64>();

    let capacity = std::cmp::min(task.target_file_size as usize, max_target_file_size);
    std::cmp::min(capacity, total_input_uncompressed_file_size as usize)
}

/// Compares the result of a compaction task with its input. The data seen by users
/// must not change after the compaction result is applied.
pub async fn check_compaction_result(
    compact_task: &CompactTask,
    context: CompactorContext,
    compaction_catalog_agent_ref: CompactionCatalogAgentRef,
) -> HummockResult<bool> {
    let mut table_ids_from_input_ssts = compact_task.get_table_ids_from_input_ssts();
    let need_clean_state_table = table_ids_from_input_ssts
        .any(|table_id| !compact_task.existing_table_ids.contains(&table_id));
    // This check method does not account for keys dropped by the compaction filter,
    // so skip the check (report success) when the filter may drop keys.
    if compact_task.contains_ttl() || need_clean_state_table {
        return Ok(true);
    }

    let mut table_iters = Vec::new();
    for level in &compact_task.input_ssts {
        if level.table_infos.is_empty() {
            continue;
        }

        // No need to filter the tables here; the manager has already done it.
        if level.level_type == PbLevelType::Nonoverlapping {
            debug_assert!(can_concat(&level.table_infos));

            table_iters.push(ConcatSstableIterator::new(
                compact_task.existing_table_ids.clone(),
                level.table_infos.clone(),
                KeyRange::inf(),
                context.sstable_store.clone(),
                Arc::new(TaskProgress::default()),
                context.storage_opts.compactor_iter_max_io_retry_times,
            ));
        } else {
            for table_info in &level.table_infos {
                table_iters.push(ConcatSstableIterator::new(
                    compact_task.existing_table_ids.clone(),
                    vec![table_info.clone()],
                    KeyRange::inf(),
                    context.sstable_store.clone(),
                    Arc::new(TaskProgress::default()),
                    context.storage_opts.compactor_iter_max_io_retry_times,
                ));
            }
        }
    }

    let iter = MergeIterator::for_compactor(table_iters);
    let left_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref.clone(),
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };
    let iter = ConcatSstableIterator::new(
        compact_task.existing_table_ids.clone(),
        compact_task.sorted_output_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        context.storage_opts.compactor_iter_max_io_retry_times,
    );
    let right_iter = {
        let skip_watermark_iter = PkPrefixSkipWatermarkIterator::new(
            iter,
            PkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.pk_prefix_table_watermarks.clone(),
            ),
        );

        let combine_iter = NonPkPrefixSkipWatermarkIterator::new(
            skip_watermark_iter,
            NonPkPrefixSkipWatermarkState::from_safe_epoch_watermarks(
                compact_task.non_pk_prefix_table_watermarks.clone(),
                compaction_catalog_agent_ref,
            ),
        );

        UserIterator::new(
            combine_iter,
            (Bound::Unbounded, Bound::Unbounded),
            u64::MAX,
            0,
            None,
        )
    };

    check_result(left_iter, right_iter).await
}

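/// Like `check_compaction_result`, but compares the data flushed into `sort_ssts`
/// against the in-memory `left_iter` it was flushed from.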
pub async fn check_flush_result<I: HummockIterator<Direction = Forward>>(
    left_iter: UserIterator<I>,
    existing_table_ids: Vec<StateTableId>,
    sort_ssts: Vec<SstableInfo>,
    context: CompactorContext,
) -> HummockResult<bool> {
    let iter = ConcatSstableIterator::new(
        existing_table_ids.clone(),
        sort_ssts.clone(),
        KeyRange::inf(),
        context.sstable_store.clone(),
        Arc::new(TaskProgress::default()),
        0,
    );
    let right_iter = UserIterator::new(
        iter,
        (Bound::Unbounded, Bound::Unbounded),
        u64::MAX,
        0,
        None,
    );
    check_result(left_iter, right_iter).await
}

async fn check_result<
    I1: HummockIterator<Direction = Forward>,
    I2: HummockIterator<Direction = Forward>,
>(
    mut left_iter: UserIterator<I1>,
    mut right_iter: UserIterator<I2>,
) -> HummockResult<bool> {
    left_iter.rewind().await?;
    right_iter.rewind().await?;
    let mut right_count = 0;
    let mut left_count = 0;
    while left_iter.is_valid() && right_iter.is_valid() {
        if left_iter.key() != right_iter.key() {
            tracing::error!(
                "The keys of input and output are not equal. key: {:?} vs {:?}",
                left_iter.key(),
                right_iter.key()
            );
            return Ok(false);
        }
        if left_iter.value() != right_iter.value() {
            tracing::error!(
                "The values of input and output are not equal. key: {:?}, value: {:?} vs {:?}",
                left_iter.key(),
                left_iter.value(),
                right_iter.value()
            );
            return Ok(false);
        }
        left_iter.next().await?;
        right_iter.next().await?;
        left_count += 1;
        right_count += 1;
    }
    while left_iter.is_valid() {
        left_count += 1;
        left_iter.next().await?;
    }
    while right_iter.is_valid() {
        right_count += 1;
        right_iter.next().await?;
    }
    if left_count != right_count {
        tracing::error!(
            "The key counts of input and output are not equal: {} vs {}",
            left_count,
            right_count
        );
        return Ok(false);
    }
    Ok(true)
}

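/// Decides whether the task qualifies for fast compaction (copying whole blocks
/// instead of merging key by key). This requires, among other conditions, that
/// fast compaction is enabled, all input SSTs use block-based bloom filters,
/// there are no range tombstones or TTLs, the task touches a single table across
/// exactly two input levels, and both the task size and the delete-key ratio are
/// below the configured thresholds.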
pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();

    let all_ssts_are_blocked_filter = sstable_infos
        .iter()
        .all(|table_info| table_info.bloom_filter_kind == BloomFilterType::Blocked);

    let delete_key_count = sstable_infos
        .iter()
        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
        .sum::<u64>();
    let total_key_count = sstable_infos
        .iter()
        .map(|table_info| table_info.total_key_count)
        .sum::<u64>();

    let single_table = compact_task.build_compact_table_ids().len() == 1;
    context.storage_opts.enable_fast_compaction
        && all_ssts_are_blocked_filter
        && !compact_task.contains_range_tombstone()
        && !compact_task.contains_ttl()
        && !compact_task.contains_split_sst()
        && single_table
        && compact_task.target_level > 0
        && compact_task.input_ssts.len() == 2
        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
        && delete_key_count * 100
            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
        && compact_task.task_type == PbTaskType::Dynamic
}

pub async fn generate_splits_for_task(
    compact_task: &mut CompactTask,
    context: &CompactorContext,
    optimize_by_copy_block: bool,
) -> HummockResult<()> {
    // Fast compaction runs single-threaded, so no splits are needed.
    if optimize_by_copy_block {
        return Ok(());
    }

    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();

    let splits = generate_splits(
        &sstable_infos,
        compaction_size,
        context,
        compact_task.max_sub_compaction,
    )
    .await?;
    if !splits.is_empty() {
        compact_task.splits = splits;
    }
    Ok(())
}

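/// Reports per-task read metrics: bytes and SST counts read from the current
/// (select) level and from the target (next) level, labelled by compaction group.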
pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
    let group_label = compact_task.compaction_group_id.to_string();
    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
    let select_table_infos = compact_task
        .input_ssts
        .iter()
        .filter(|level| level.level_idx != compact_task.target_level)
        .flat_map(|level| level.table_infos.iter())
        .collect_vec();
    let target_table_infos = compact_task
        .input_ssts
        .iter()
        .filter(|level| level.level_idx == compact_task.target_level)
        .flat_map(|level| level.table_infos.iter())
        .collect_vec();
    let select_size = select_table_infos
        .iter()
        .map(|table| table.sst_size)
        .sum::<u64>();
    context
        .compactor_metrics
        .compact_read_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_size);
    context
        .compactor_metrics
        .compact_read_sstn_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_table_infos.len() as u64);

    let target_level_read_bytes = target_table_infos.iter().map(|t| t.sst_size).sum::<u64>();
    let next_level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_read_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_level_read_bytes);
    context
        .compactor_metrics
        .compact_read_sstn_next_level
        .with_label_values(&[&group_label, &next_level_label])
        .inc_by(target_table_infos.len() as u64);
}

pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
    let optimize_by_copy_block = optimize_by_copy_block(compact_task, context);

    if optimize_by_copy_block {
        return 1;
    }

    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.sst_size)
        .sum::<u64>();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    calculate_task_parallelism_impl(
        context.compaction_executor.worker_num(),
        parallel_compact_size,
        compaction_size,
        compact_task.max_sub_compaction,
    )
}

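/// Parallelism is `ceil(compaction_size / parallel_compact_size)`, capped by both
/// `max_sub_compaction` and the number of compaction workers.
///
/// A minimal sketch of the arithmetic (the values below are illustrative, not
/// taken from any real task):
/// ```
/// let (worker_num, parallel_compact_size, compaction_size, max_sub_compaction) =
///     (8usize, 512u64 << 20, 2048u64 << 20, 4u32);
/// // ceil(2048 MiB / 512 MiB) = 4, capped by max_sub_compaction (4) and worker_num (8).
/// let parallelism = compaction_size.div_ceil(parallel_compact_size);
/// assert_eq!(worker_num.min(parallelism.min(max_sub_compaction as u64) as usize), 4);
/// ```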
pub fn calculate_task_parallelism_impl(
    worker_num: usize,
    parallel_compact_size: u64,
    compaction_size: u64,
    max_sub_compaction: u32,
) -> usize {
    let parallelism = compaction_size.div_ceil(parallel_compact_size);
    worker_num.min(parallelism.min(max_sub_compaction as u64) as usize)
}